# Google Drive

Prepare Connection to Google Drive to download Code Snippets and Ontology

In [1]:
import pandas as pd

## Ontology

Generate list of leave nodes as CSV for LLM Prompt Context.

In [2]:
!pip install -U -q rdflib

In [3]:
from rdflib import Graph, RDFS, RDF
import csv
import json

def clean_uri(uri):
    return uri.split('#')[-1] if '#' in uri else uri.split('/')[-1]

def build_class_hierarchy(rdf_file_path, format='xml'):
    g = Graph()
    g.parse(rdf_file_path, format=format)

    hierarchy = {}
    class_labels = set()
    comments = {}

    for s, p, o in g:
        if p == RDFS.subClassOf:
            child = clean_uri(str(s))
            parent = clean_uri(str(o))

            class_labels.add(child)
            class_labels.add(parent)

            if parent not in hierarchy:
                hierarchy[parent] = []
            hierarchy[parent].append(child)

        if p == RDFS.comment:
            comments[clean_uri(str(s))] = str(o)

    for label in class_labels:
        hierarchy.setdefault(label, [])

    return hierarchy, comments

def save_dict_to_text(data, output_file_path):
    with open(output_file_path, 'w', encoding='utf-8') as f:
        f.write(json.dumps(data, indent=2))

def extract_subtree(hierarchy, root):
    subtree = {}

    def dfs(node):
        if node not in hierarchy:
            return
        subtree[node] = hierarchy[node]
        for child in hierarchy[node]:
            dfs(child)

    dfs(root)
    return subtree

def extract_leaves_with_parents(subtree):
    leaves_with_parents = []
    for parent, children in subtree.items():
        for child in children:
            if child in subtree and not subtree[child]:
                leaves_with_parents.append((parent, child))
    return leaves_with_parents

code_patterns = {
    "IfCheckingInForLoop": "refers to an if statement inside a for loop",
    "IfElifCheckingInForLoop": "refers to an if-elif statement inside a for loop",
    "IfElifElseCheckingInForLoop": "refers to an if-elif-else statement inside a for loop",
    "IfElseCheckingInForLoop": "refers to an if-else statement inside a for loop",
    "IfCheckingInWhileLoop": "refers to an if statement inside a while loop",
    "IfElifCheckingInWhileLoop": "refers to an if-elif statement inside a while loop",
    "IfElifElseCheckingInWhileLoop": "refers to an if-elif-else statement inside a while loop",
    "IfElseCheckingInWhileLoop": "refers to an if-else statement inside a while loop",
    "IfChecking": "refers to an if statement which is not placed inside any loop",
    "IfElifChecking": "refers to an if-elif statement which is not placed inside any loop",
    "IfElifElseChecking": "refers to an if-elif-else statement which is not placed inside any loop",
    "IfElseChecking": "refers to an if-else statement which is not placed inside any loop",
    "NestedIfChecking": "refers to an if inside another if",
    "ListReferencing": "refers to the code in which one list is set equal to another, and a change to one of the lists causes the same change in the other list. ListReferencing should only be marked as present if a list is explicitly assigned to another list variable (e.g., list2 = list1).",
    "AccessingDictionary": "includes any operation that retrieves values from a dictionary, including direct indexing (dict[key]) and methods like .get(), .items(), .keys(), and .values().",
    "MixedNestedLoopIteration": "refers to nested loop such that a while-loop inside a for-loop or a for-loop inside a while-loop",
    "NestedForLoopIteration": "refers to nested loop such that a for-loop is inside another for-loop",
    "NestedWhileLoopIteration": "refers to nested loop such that a while-loop is inside another while-loop",
    "SingleForLoopIteration": "refers to a single use use of a for-loop that has no nested structure",
    "SingleWhileLoopIteration": "refers to a single use of a while-loop that has no nested structure",
    "CallingFunctionLibrary": "refers to ANY use of built-in Python functions (print, len, replace, etc.) and built-in methods of objects (like list.append() or string.replace(), etc.).",
    "CallingNestedFunction": "refers to calling a function that was defined inside another user-defined function, but the call itself can happen anywhere (inside or outside the enclosing function).",
    "DefiningNestedFunction": "refers to defining a function inside another user-defined function.",
    "CallingRecursiveFunction": "refers to calling a function that calls itself (recursion)",
    "DefiningRecursiveFunction": "refers to defining a function that calls itself within its own body (recursion).",
    "NestedFunctionCall": "refers to when one function call is placed as an argument to another function call (e.g., f(g(x)))",
    "DefiningStandardFunction": "refers to defining a function that: 1) Does not call any other user-defined functions 2) Is not nested inside another function 3) Does not call itself (not recursive) Note: A standard function may still call built-in functions. Additionally, DefiningStandardFunction can still be later called in nested patterns (NestedFunctionCall).",
    "CallingStandardFunction": "refers to calling a function that meets all the 3 following criteria:1) Does not call any other user-defined functions 2) Is not nested inside another function 3) Does not call itself (not recursive) Note: A standard function may still call built-in functions."
}


leaf_filter = [
    # Python Parser
    'UnaryOperation',
    'IndexingExpression',
    'SlicingExpression',
    # Educational
    'IndexingDictionary',
    'IndexingList',
    'IndexingString',
    'IndexingTuple',
    'SlicingList',
    'SlicingTuple',
    'SlicingString',
    'WhileLoopWithListIndexing',
    'WhileLoopWith*=',
    'WhileLoopWith+=',
    'ForLoopWithListIndexing',
    'ForLoopWith*=',
    'ForLoopWith+=',
    'ReplacingDictionaryElement',
    'ReplacingListElement',
    'ReplacingElement2DArray'
]
leaf_filter = None

def save_leaves_with_parents_csv(leaves_with_parents, output_csv_path, comments):
    with open(output_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Parent', 'Leaf', 'Description'])
        for parent, leaf in leaves_with_parents:

            if parent == "EducationalPython":
                parent = "Python"
            if parent == "Iteration":
                parent = "NonNestedIteration"
            if parent == "ModifyingStrigCase":
                parent = "ModifyingStringCase"
            if parent == "CallingStandardFunction" and leaf == "NestedCall":
                continue

            is_leaf_included = True
            if leaf_filter:
              is_leaf_included = leaf in leaf_filter

            if is_leaf_included:
              description = comments.get(leaf) or code_patterns.get(leaf, '')
              writer.writerow([parent, leaf, description])



if __name__ == "__main__":
    rdf_file = "data/final_ontology.owl"
    rdf_format = 'xml'

    hierarchy, comments = build_class_hierarchy(rdf_file, format=rdf_format)

    save_dict_to_text(hierarchy, "ontology/hierarchy.txt")

    educational_python_subtree = extract_subtree(hierarchy, "EducationalPython")
    python_subtree = extract_subtree(hierarchy, "Python")

    save_dict_to_text(educational_python_subtree, "ontology/hierarchy_educational_python.txt")
    save_dict_to_text(python_subtree, "ontology/hierarchy_python.txt")

    educational_python_leaves = extract_leaves_with_parents(educational_python_subtree)
    python_leaves = extract_leaves_with_parents(python_subtree)

    save_leaves_with_parents_csv(educational_python_leaves, "ontology/hierarchy_educational_python_leaves.csv", comments)
    save_leaves_with_parents_csv(python_leaves, "ontology/hierarchy_python_leaves.csv", comments)

    print("CSV leaf outputs with parent and description saved.")

CSV leaf outputs with parent and description saved.


## Code Snippets

Extract every code snippet. Store the snippets in a list. In the next phase, feed each snippet to the LLM to pull out its knowledge components (KCs).


Parsing Snippets

In [4]:
def split_code_exercises(input_file):
  with open(input_file, 'r') as f:
    code_exercises = f.read()
  code_snippets = code_exercises.split('#CODE_SNIPPET')
  for idx, snippet in enumerate(code_snippets):
    snippet = snippet.strip()
  code_snippets = [s.strip() for s in code_snippets if s.strip()]
  return code_snippets

def test_split_code_exercises():
  # input_file = "data/all_exercise_snippet.py"
  input_file = "data/all_exercise_snippet_with_prob_description.py"
  code_snippets = split_code_exercises(input_file)
  print(len(code_snippets))
  print(code_snippets[0])

# test_split_code_exercises()

## Batch Generation Classification with Window Sliding (CLAWS)

Main settings

In [5]:
import os
from pathlib import Path
import shutil
import csv
import json



# SNIPPET_TYPE = "code_only"
# PROMPT_INPUT_SNIPPET = "all_exercise_snippet.py"

# SNIPPET_TYPE = "data/code_with_problems"
# PROMPT_INPUT_SNIPPET = "data/all_exercise_snippet_with_prob_description.py"


# PROMPT_SESSION_ID = "window_10"
# PROMPT_WINDOW_SIZE = 10

# PROMPT_SESSION_ID = "window_5"
# PROMPT_WINDOW_SIZE = 5

# PROMPT_SESSION_ID = "no_window"
# PROMPT_WINDOW_SIZE = -1



SNIPPET_TYPE = "data/parsons_with_problems"
PROMPT_INPUT_SNIPPET = "data/parsons_with_prob_description.py"

#
# PROMPT_SESSION_ID = "parsons_window_10"
# PROMPT_WINDOW_SIZE = 10

PROMPT_SESSION_ID = "parsons_window_5"
PROMPT_WINDOW_SIZE = 5

PROMPT_OUTPUT_PATH = f"output/{PROMPT_SESSION_ID}/{SNIPPET_TYPE}/prompts"
CLAWS_OUTPUT_PATH = f"output/{PROMPT_SESSION_ID}/{SNIPPET_TYPE}/output"

### Generate Prompt from Templates


In [None]:
def get_code_exercises(input_file):
  snippets = split_code_exercises(input_file)
  return snippets



def chunk_list(lst, chunk_size):
  if chunk_size <= 0:
    return [lst]
  return [lst[i:i+chunk_size] for i in range(0, len(lst), chunk_size)]

def get_classes(window_size):
  print(f"Window Size = {window_size}")

  t = open("ontology/hierarchy_python_leaves.csv", "r").readlines()
  t = t[1:]
  print(f"Python Classes = {len(t)}")
  python_csv_lines = chunk_list(t, window_size)
  print(f"Educational Chunks Num. of Windows = {len(python_csv_lines)}")

  t = open("ontology/hierarchy_educational_python_leaves.csv", "r").readlines()
  t = t[1:]
  print(f"Educational Python Classes = {len(t)}")
  educational_python_csv_lines = chunk_list(t, window_size)
  print(f"Educational Python Num. of Windows = {len(educational_python_csv_lines)}")

  csv_lines = python_csv_lines + educational_python_csv_lines
  exit()
  return csv_lines



PROMPT_TEMPLATE = """
You are an annotation assistant.
Given a code snippet, evaluate all rows in the ontology classes CSV.
Provide a reason for how the class is being used in the snippet/ problem, evaluate, and give a usage score.

## Output
Respond as long as possible.
Score is between 0-5.
Return as a CSV (exercise_name,parent,leaf,reason,score).

## INPUT: Ontology Classes
```csv(parent,leaf,description)
{ontology}
```

## INPUT: Code Snippet
```code
{snippet}
```
"""



def generate_prompts(snippet_file, window_size, output_dir):
    shutil.rmtree(output_dir, ignore_errors=True)
    os.makedirs(output_dir, exist_ok=True)
    snippets = split_code_exercises(snippet_file)
    class_windows = get_classes(window_size)
    for code_idx, snippet in enumerate(snippets, start=1):
        for class_idx, csv_data in enumerate(class_windows, start=1):
            ontology = "\n".join([line.strip() for line in csv_data])
            prompt = PROMPT_TEMPLATE.format(ontology=ontology, snippet=snippet)

            # print(prompt)
            # break

            fname = f"{class_idx:03d}-{code_idx:03d}.txt"
            with open(Path(output_dir) / fname, "w", encoding="utf-8") as f:
                f.write(prompt)
    print(f"Generated {len(snippets)} code snippets x {len(class_windows)} ontology batches")
    print(f"Output path: {output_dir}")



generate_prompts(
    PROMPT_INPUT_SNIPPET,
    PROMPT_WINDOW_SIZE,
    PROMPT_OUTPUT_PATH
)

Window Size = 5
Python Classes = 98
Educational Chunks Num. of Windows = 20
Educational Python Classes = 108
Educational Python Num. of Windows = 22
Generated 115 code snippets x 42 ontology batches
Output path: output/parsons_window_5/data/parsons_with_problems/prompts


: 

### Prepare Batch Completions (OpenAI)

In [6]:
def create_jsonl_from_prompts(prompt_dir, jsonl_out):
    entries = []
    for prompt_file in sorted(Path(prompt_dir).glob("*.txt")):
        class_idx, code_idx = prompt_file.stem.split("-")
        with open(prompt_file, "r", encoding="utf-8") as f:
            prompt = f.read()
        entries.append({
            "custom_id": f"{class_idx}-{code_idx}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4.1-mini",
                "messages": [
                    {"role": "system", "content": prompt}
                ],
                "max_tokens": 32768,
                "temperature": 0,
                "top_p": 0.1,
                "store": True
            }
        })
    with open(jsonl_out, "w", encoding="utf-8") as f:
        for item in entries:
            f.write(json.dumps(item) + "\n")
    print(f"JSONL file created: {jsonl_out}")



create_jsonl_from_prompts(
  PROMPT_OUTPUT_PATH,
  Path(PROMPT_OUTPUT_PATH) / "class_bulk.jsonl"
)

JSONL file created: output/parsons_window_5/data/parsons_with_problems/prompts/class_bulk.jsonl


In [7]:
def preview_file(filename, num_chars=100):
    """
    Prints the first num_chars characters of a file.

    Args:
        filename (str): The path to the file.
        num_chars (int): The number of characters to preview (default is 100).
    """
    try:
        with open(filename, 'r') as file:
            content = file.read()
            lines_count = len(content.split('\n'))
            print(f"File '{filename}' has {lines_count} lines.")
            print(content[:num_chars])
    except FileNotFoundError:
        print(f"Error: File '{filename}' not found.")
    except Exception as e:
         print(f"An error occurred: {e}")



preview_file(Path(PROMPT_OUTPUT_PATH) / "class_bulk.jsonl")

File 'output/parsons_window_5/data/parsons_with_problems/prompts/class_bulk.jsonl' has 4831 lines.
{"custom_id": "001-001", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-4.1


### --- Test Generation

In [8]:
!pip install -U -q openai python-dotenv

from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()

client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

def execute_prompt(prompt_file):
  with open(prompt_file, "r") as f:
    PROMPT = f.read()

  # print(PROMPT)
  # exit()

  response = client.responses.create(
    model="gpt-4.1-mini",
    input=[
      {
        "role": "system",
        "content": [
          {
            "type": "input_text",
            "text": PROMPT
          }
        ]
      }
    ],
    text={
      "format": {
        "type": "text"
      }
    },
    reasoning={},
    tools=[],
    temperature=0,
    max_output_tokens=32768,
    top_p=0.1,
    store=True
  )
  return response



In [9]:
def test_execute_prompt():
  response = execute_prompt(Path(PROMPT_OUTPUT_PATH) / "001-001.txt")
  content = response.output_text
  print(content)

test_execute_prompt()

exercise_name,parent,leaf,reason,score
ps_hello,BuiltInFunction,min(),The code snippet does not use the min() function or any functionality related to it.,0
ps_hello,BuiltInFunction,tuple(),The code snippet does not use the tuple() function or any tuple-related operations.,0
ps_hello,BuiltInFunction,open(),The code snippet does not use the open() function or any file operations.,0
ps_hello,BuiltInFunction,input(),The code snippet does not use the input() function or any user input operations.,0
ps_hello,BuiltInFunction,next(),The code snippet does not use the next() function or any iterator operations.,0


### Upload and Create new Batch

In [10]:
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()

client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

In [11]:
def upload_jsonl(openai_client, file_path):
    return openai_client.files.create(file=open(file_path, "rb"), purpose="batch")

def create_batch(openai_client, input_file_id):
    return openai_client.batches.create(
        input_file_id=input_file_id,
        endpoint="/v1/chat/completions",
        completion_window="24h"
    )



file_obj = upload_jsonl(
  client,
  Path(PROMPT_OUTPUT_PATH) / "class_bulk.jsonl"
)
# old_batch = batch
batch = create_batch(client, file_obj.id)
batch

Batch(id='batch_68c803f0da9481909445db299ebad875', completion_window='24h', created_at=1757938672, endpoint='/v1/chat/completions', input_file_id='file-WeFowKvSESs5tLeM4xGLi3', object='batch', status='validating', cancelled_at=None, cancelling_at=None, completed_at=None, error_file_id=None, errors=None, expired_at=None, expires_at=1758025072, failed_at=None, finalizing_at=None, in_progress_at=None, metadata=None, output_file_id=None, request_counts=BatchRequestCounts(completed=0, failed=0, total=0), usage={'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'input_tokens_details': {'cached_tokens': 0}, 'output_tokens_details': {'reasoning_tokens': 0}})

### View Batch Detail or Cancel

In [13]:
from openai.types import Batch, BatchRequestCounts

def view_batch(openai_client, batch_id):
    return openai_client.batches.retrieve(batch_id)

def cancel_batch(openai_client, batch_id):
    return openai_client.batches.cancel(batch_id)



# # no_window
# # code_only
# batch_no_co = type('', (), {'id': "batch_682475a840c88190ab9550734eb1cbfb"})()
# # code_with_problems
# batch_no_cwp = Batch(id='batch_682496e7b9908190bff5a16f0a6e2632', completion_window='24h', created_at=1747228391, endpoint='/v1/chat/completions', input_file_id='file-Q3Arh24bcJ1wXdXzePboqu', object='batch', status='in_progress', cancelled_at=None, cancelling_at=None, completed_at=None, error_file_id=None, errors=None, expired_at=None, expires_at=1747314791, failed_at=None, finalizing_at=None, in_progress_at=1747228393, metadata=None, output_file_id=None, request_counts=BatchRequestCounts(completed=0, failed=0, total=528))

# # window_5
# # code_only
# batch_5_co = Batch(id='batch_682499cb18f881908cec309fb7bff025', completion_window='24h', created_at=1747229131, endpoint='/v1/chat/completions', input_file_id='file-CzxwsBTMTWXh7cFist2GBD', object='batch', status='in_progress', cancelled_at=None, cancelling_at=None, completed_at=None, error_file_id=None, errors=None, expired_at=None, expires_at=1747315531, failed_at=None, finalizing_at=None, in_progress_at=1747229138, metadata=None, output_file_id=None, request_counts=BatchRequestCounts(completed=0, failed=0, total=11004))
# # code_with_problems
# batch_5_cwp = Batch(id='batch_68249b22fb488190bf5e19575a541c36', completion_window='24h', created_at=1747229474, endpoint='/v1/chat/completions', input_file_id='file-6UHuPFyN2usGHqGNDfd5Gi', object='batch', status='validating', cancelled_at=None, cancelling_at=None, completed_at=None, error_file_id=None, errors=None, expired_at=None, expires_at=1747315874, failed_at=None, finalizing_at=None, in_progress_at=None, metadata=None, output_file_id=None, request_counts=BatchRequestCounts(completed=0, failed=0, total=0))

batch_view = view_batch(client, batch.id)
# cancel_batch(client, batch.id)

batch_view

Batch(id='batch_68c803f0da9481909445db299ebad875', completion_window='24h', created_at=1757938672, endpoint='/v1/chat/completions', input_file_id='file-WeFowKvSESs5tLeM4xGLi3', object='batch', status='completed', cancelled_at=None, cancelling_at=None, completed_at=1757942888, error_file_id=None, errors=None, expired_at=None, expires_at=1758025072, failed_at=None, finalizing_at=1757941219, in_progress_at=1757938737, metadata=None, output_file_id='file-SswGHckrd7f5SuSL316s8V', request_counts=BatchRequestCounts(completed=4830, failed=0, total=4830), usage={'input_tokens': 1338037, 'output_tokens': 904676, 'total_tokens': 2242713, 'input_tokens_details': {'cached_tokens': 0}, 'output_tokens_details': {'reasoning_tokens': 0}})

### Download Results and Save as Combined (CSV)

TODO: Mark non-compliance to retry the generation

In [14]:
def download_batch_result(openai_client, output_file_id, output_path):
    content = openai_client.files.content(output_file_id)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(content.text)
    print(f"Downloaded to: {output_path}")



def combine_results_to_csv(jsonl_file, csv_out):
    rows = []
    with open(jsonl_file, "r", encoding="utf-8") as f:
        count = 0
        for line in f:
            count += 1
            data = json.loads(line)
            custom_id = data.get("custom_id")
            content = data.get("response", {}).get("body", {}).get("choices", [{}])[0].get("message", {}).get("content", "")
            try:
                for row in csv.reader(content.splitlines()):
                    if row and row[0] != "exercise_name":
                        rows.append(row)
            except csv.Error as e:
                print(f"CSV parsing error in entry {count}: {e}")
                continue
    with open(csv_out, "w", encoding="utf-8", newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["exercise_name", "parent", "leaf_class", "reason", "score"])
        writer.writerows(rows)
    print(f"Combined CSV saved to: {csv_out}")





os.makedirs(CLAWS_OUTPUT_PATH, exist_ok=True)
download_batch_result(
  client,
  batch_view.output_file_id,
  Path(CLAWS_OUTPUT_PATH) / "batch_output.jsonl"
)
combine_results_to_csv(
  Path(CLAWS_OUTPUT_PATH) / "batch_output.jsonl",
  Path(CLAWS_OUTPUT_PATH) / "result.csv"
)

Downloaded to: output/parsons_window_5/data/parsons_with_problems/output/batch_output.jsonl
Combined CSV saved to: output/parsons_window_5/data/parsons_with_problems/output/result.csv
