Note: This notebook works because the 'elice-ccc-project/dataset/new_dataset' folder contains a folder called '6822003'. For a general material_exercise_id (i.e., a number other than 6822003), you need a folder at 'elice-ccc-project/dataset/new_dataset/{material_exercise_id}', which you can create with OttTypeBeforeProcess in a few clicks.

\
This can either be done manually, or you can import OttTypeBeforeProcess's content as a package into this notebook and define a new function that creates a folder analogous to 'elice-ccc-project/dataset/new_dataset/6822003'.

# Setup

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive; every dataset path used in this
# notebook lives under /content/drive/MyDrive/.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install openai > /dev/null 2>&1

In [None]:
import copy
import datetime
import getpass
import os
import time
from os import defpath

import openai
import pandas as pd
from openai import AzureOpenAI

# Fields

Variables that are initialized to None are populated by running setup_datasets(exercise_id).

E.g if you run setup_datasets(6822003):
* ops_number_csv = ops_6822003.csv as a DataFrame
* exercise_running_log_csv = the exercise_running_log.csv file as a DataFrame, filtered to material_exercise_id = 6822003 and preprocessed (attempt counts, time differences, and the saved code in 'log_combined')
* material_exercise_csv = material_exercise.csv as a DataFrame
* prob = description of the problem with material_exercise_id = 6822003
* grouped = exercise_running_log_csv grouped by ['user_id', 'material_exercise_id']
* groups_dict = {name: group} pairs, where name is a (user_id, material_exercise_id) pair and group is the table of rows for that fixed user_id and material_exercise_id.

Note: As you can see, some of the variables do not really depend on '6822003', but I still initialize them to None until setup_datasets(6822003) has been run.


In [None]:
# Location of the attempt log; setup_datasets reads this file.
exercise_running_log_adress = "/content/drive/MyDrive/elice-ccc-project/dataset/new_dataset/exercise_running_log.csv"
# Placeholder: setup_datasets replaces it with the filtered, preprocessed log DataFrame.
exercise_running_log_csv = None

In [None]:
# Path prefix of the per-exercise ops file; setup_datasets appends "<exercise_id>.csv".
ops_number_csv_adress_incomplete = "/content/drive/MyDrive/elice-ccc-project/dataset/new_dataset/ops/ops_"
# Placeholder: setup_datasets replaces it with the ops DataFrame of the selected exercise.
ops_number_csv = None

In [None]:
# Location of the exercise catalogue; setup_datasets reads this file.
material_exercise_csv_adress = "/content/drive/MyDrive/elice-ccc-project/dataset/new_dataset/material_exercise.csv"
# Placeholder: setup_datasets replaces it with the catalogue DataFrame.
material_exercise_csv = None

In [None]:
# Displays the current value; on a fresh run this is still None because only
# setup_datasets assigns a DataFrame to it.
exercise_running_log_csv

In [None]:
# Placeholder: setup_datasets sets this from column index 3 of the selected
# exercise's row in material_exercise.csv (described above as the problem text).
prob = None

In [None]:
# Placeholders filled by setup_datasets:
#   grouped     - the log grouped by ['user_id', 'material_exercise_id']
#   groups_dict - {(user_id, material_exercise_id): rows of that group}
grouped = None
groups_dict = None

# Functions

In [None]:
def setup_datasets(exercise_id):
  """Load all data for one exercise into the notebook-level variables.

  Populates the globals ops_number_csv, exercise_running_log_csv,
  material_exercise_csv, prob, grouped and groups_dict.

  Args:
    exercise_id: the material_exercise_id, either as an int or as a numeric
      string (e.g. 6822003 or '6822003').

  Raises:
    ValueError: if material_exercise.csv has no row for exercise_id.
  """
  global ops_number_csv, exercise_running_log_csv, material_exercise_csv
  global prob, grouped, groups_dict

  # The id is needed both as text (file names) and as a number (column filters).
  exercise_key = str(exercise_id)
  exercise_number = int(exercise_id)

  # setup the ops_<exercise_id>.csv file, e.g. ops_6822003.csv
  ops_number_csv = pd.read_csv(ops_number_csv_adress_incomplete + exercise_key + '.csv')

  # setup the exercise_running_log.csv file, restricted to this exercise
  exercise_running_log_csv = pd.read_csv(exercise_running_log_adress)
  exercise_running_log_csv = exercise_running_log_csv[
      exercise_running_log_csv['material_exercise_id'] == exercise_number
  ]
  exercise_running_log_csv = preprocess_exercise_running_log(exercise_running_log_csv)
  add_running_logs(exercise_running_log_csv, exercise_key)

  # setup material_exercise_csv
  material_exercise_csv = pd.read_csv(material_exercise_csv_adress)

  # setup prob variable: column index 3 of the exercise's row
  exercise_rows = material_exercise_csv[
      material_exercise_csv['material_exercise_id'] == exercise_number
  ]
  if exercise_rows.empty:
    raise ValueError(
        f"material_exercise_id {exercise_key} not found in {material_exercise_csv_adress}"
    )
  prob = exercise_rows.iloc[0, 3]

  # setup grouped
  grouped = exercise_running_log_csv.groupby(['user_id', 'material_exercise_id'])

  # setup groups_dict
  groups_dict = {name: group for name, group in grouped}

In [None]:
def preprocess_exercise_running_log(df):
  """Return a cleaned copy of the attempt log with per-attempt features.

  Keeps only rows whose user_id appears in the global ops_number_csv, parses
  'created_datetime', orders the rows by user, exercise and time, and adds:
    * 'attempt_count' - 1-based position of the row within its
      (user_id, material_exercise_id) group;
    * 'time_diff'     - seconds since the previous row of the same group,
      0 for the first row of a group.

  The input frame is left untouched.
  """
  attempt_keys = ['user_id', 'material_exercise_id']

  # Restrict to users present in the ops file; copy so the caller's frame is not modified.
  is_known_user = df['user_id'].isin(ops_number_csv['user_id'])
  logs = df.loc[is_known_user].copy()

  # Parse timestamps, then put each user's attempts in chronological order.
  logs['created_datetime'] = pd.to_datetime(logs['created_datetime'])
  logs = logs.sort_values(by=attempt_keys + ['created_datetime'])

  # Derive both features from the ordered rows before attaching them.
  per_attempt_series = logs.groupby(attempt_keys)
  attempt_number = per_attempt_series.cumcount() + 1
  seconds_since_previous = per_attempt_series['created_datetime'].diff().dt.total_seconds()

  logs['attempt_count'] = attempt_number
  # The first attempt of a group has no predecessor, so its gap is reported as 0.
  logs['time_diff'] = seconds_since_previous.fillna(0)

  return logs

In [None]:
def add_running_logs(df, exercise_id, dataset_dir='/content/drive/MyDrive/elice-ccc-project/dataset/new_dataset'):
  """Attach each user's saved source file to the last row of that user in df.

  Adds a 'log_combined' column to df in place. It is '' everywhere except on
  the last row of each user_id, which receives the content of
  '{dataset_dir}/{exercise_id}/{exercise_id}_{user_id}.py' when that file exists.
  Missing files are reported with a printed message and leave '' in place.

  Args:
    df: DataFrame with a 'user_id' column; modified in place.
    exercise_id: exercise identifier used in the folder and file names.
    dataset_dir: folder that contains the per-exercise folders. Defaults to the
      Google Drive location used by this notebook.

  Returns:
    None.
  """
  df['log_combined'] = ''  # Initialize the column with empty strings

  for user_id, user_rows in df.groupby('user_id'):
      # Only the user's last row (in df order) carries the code.
      last_index = user_rows.index[-1]
      title = f'{dataset_dir}/{exercise_id}/{exercise_id}_{user_id}.py'

      if os.path.exists(title):
          # Read explicitly as UTF-8 so non-ASCII text in student code does not
          # depend on the platform's default encoding.
          with open(title, 'r', encoding='utf-8') as file:
              df.at[last_index, 'log_combined'] = file.read()
      else:
          print(f"{title} does not exist.")


In [None]:
def create_prompt(group, examples=None):
    """Build the user prompt sent to the model for one group of attempts.

    The prompt contains the problem statement (the notebook-level ``prob``),
    the code stored in 'log_combined' on the group's last row, the task
    instructions, and the catalogue of error types.

    The previous version returned this same prompt and then carried an
    unreachable tail (a one-shot example and a per-attempt listing) after the
    ``return``. That tail and the per-row strings that only fed it were never
    part of the result and have been removed.

    Args:
        group: DataFrame of attempts; must have at least one row and a
            'log_combined' column.
        examples: Not used; kept so existing calls keep working.

    Returns:
        The prompt as a single string.

    Raises:
        ValueError: if group has no rows.
    """
    # Previously an empty group failed later with an UnboundLocalError on the
    # leaked loop variable; fail early with a message that names the cause.
    if len(group) == 0:
        raise ValueError("Cannot build a prompt from an empty group of attempts.")

    errors_text = """
Type Category Description and occurrence examples:

(A) Input
Errors arising from the inability to properly store input values properly.
1. When not all given input values are received.
2. When the data type of the variable for the input value is incorrect.

(B) Output
Errors arising from non-compliance with the required output format.
1. When the output format of the value in the variable is incorrect.
2. When an incorrect string literal is output.

(C) Variable
Errors arising from incorrect use of variables.
1. When the value stored in the variable is incorrect.
2. When the data type of the variable is incorrectly specified.

(D) Computation
Errors caused by incorrect calculations.
1. When calculating using incorrect values.
2. When calculating using incorrect operations.

(E) Condition
Errors caused by incorrect use of conditional statements.
1. When the conditional operation in the declaration part of the conditional statement is incorrect.
2. When the condition in the declaration part of the conditional statement is insufficient.

(F) Branching
Errors caused by incorrect branching of the program.
1. When the break in the loop is written incorrectly.
2. When a conditional statement that should be written as if-else is written as if-if.

(G) Loop
Errors caused by incorrect use of loops.
1. When the condition in the declaration part of the loop is incorrect.
2. When the variable used in the declaration part of the loop is incorrect.

(H) Array/String
Errors caused by incorrect arrays or strings.
1. When arrays or strings are initialized incorrectly.
2. When referencing an incorrect index when using an array or string.

(I) Function
Errors caused by incorrect user-defined functions.
1. When the parameters or return values of user-defined functions are incorrectly defined.
2. When the arguments are incorrect when calling user-defined functions.

(J) Conceptual
Errors caused by incorrect concepts for problem-solving.
1. When solving a different problem than the one presented.
2. When the necessary loops or conditional statements are not written to solve the presented problem.
"""

    # Same value the old code obtained from the loop variable left over after
    # iterating the rows: the 'log_combined' entry of the last row.
    latest_code = group['log_combined'].iloc[-1]

    full_prompt = (
        "Based on the following details about a student's attempts on this coding problem:\n"
        f"{prob}\n\n"
        "and their coding attempt:\n"
        f"{latest_code}\n\n"
        "Output a fully correct solution to the problem.\n\n"
        "Then from the listed error types, choose the errors which upon introducing into the code will test a student's concepts for that problem.\n"
        "- Error Type: The category of the error should be clear.\n"
        "- Description: A brief description of how that error relates to the concept.\n"
        "-\n\n"
        "Below are example error types and descriptions for reference. The error classification needs to be from within these errors only.\n"
        f"{errors_text}"
    )
    return full_prompt


In [None]:
def get_prediction_for_group(group_name, model='gpt-4o'):
  """Ask the model for a solution and suitable error types for one group, and print the reply.

  Args:
    group_name: (user_id, material_exercise_id) tuple identifying an entry of
      the notebook-level groups_dict.
    model: chat model / deployment name passed to the client.

  Returns:
    None. The reply, or a message explaining why no request was made, is printed.
  """
  # groups_dict stays None until setup_datasets has been run.
  if groups_dict is None:
    print("Datasets are not loaded; run setup_datasets(exercise_id) first.")
    return

  # Report unknown groups in every case. Previously the 'not found' branch was
  # attached to the tuple check, so a tuple missing from groups_dict printed nothing.
  if not isinstance(group_name, tuple) or group_name not in groups_dict:
    print(f"Group {group_name} not found.")
    return

  prompt = create_prompt(groups_dict[group_name])
  try:
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are an AI assistant trained to classify errors in code. Respond in a professional tone. Based on the programming problem given come up with a coding solution to the coding problem and then from the list of errors choose which introducing which errors will enhance a student's understading of concepts. So output the fully functional solution and then the errors from the list which will be helpful to test concepts of student of that problem."},
            {"role": "user", "content": prompt}
        ]
    )
    print(f"Group: {group_name}")
    print(response.choices[0].message.content)
  except Exception as e:
    # Best effort: report the failure so that other groups can still be processed.
    print(f"Error processing group {group_name}: {e}")

# Setup the API

In [None]:
# Never store the API key in the notebook: it ends up in version control and in
# every shared copy. The key that used to be written here should be treated as
# leaked and rotated.
# The key is taken from the AZURE_OPENAI_API_KEY environment variable and is
# requested interactively (without echo) when the variable is not set.
if not os.environ.get("AZURE_OPENAI_API_KEY"):
    os.environ["AZURE_OPENAI_API_KEY"] = getpass.getpass("Azure OpenAI API key: ")

# The endpoint is not secret; keep the project default unless one is already configured.
os.environ.setdefault("AZURE_OPENAI_ENDPOINT", 'https://elice-ccc.openai.azure.com/')

api_key = os.getenv("AZURE_OPENAI_API_KEY")
azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")

# Client used by get_prediction_for_group.
client = AzureOpenAI(
  api_key=api_key,
  api_version="2024-05-01-preview",
  azure_endpoint=azure_endpoint
)



```
# This is formatted as code
```

# Get Predictions

If get_prediction_for_group stops working after you change exercise_id, take a look at the first note before the "Setup" section.

For any further debugging, the relevant variables are listed in the "Fields" section.

In [None]:
# Student and exercise to analyse; together they form the
# (user_id, material_exercise_id) key looked up in groups_dict.
user_id = 12931635
exercise_id = 6822003
# Load the data for this exercise into the notebook-level variables.
setup_datasets(str(exercise_id))

In [None]:
# Build the prompt for this student's attempts, send it to the model and print the reply.
get_prediction_for_group((user_id, exercise_id))

Group: (12931635, 6822003)
Here is the fully functional solution to the given problem:

```python
import random                           ## DO NOT CHANGE THIS CODE

final_winner = None

def get_random_type():                  ## DO NOT CHANGE THIS CODE
    types = ['water', 'fire', 'grass']  ## DO NOT CHANGE THIS CODE
    return random.choice(types)         ## DO NOT CHANGE THIS CODE

def create_pokemon(type_number=None):
    power = random.randint(1, 100)      ## DO NOT CHANGE THIS CODE
    if type_number is None:
        pokemon_type = get_random_type()
    else:
        types = ['water', 'fire', 'grass']
        pokemon_type = types[type_number]
    return (power, pokemon_type)

def battle(pokemon1, pokemon2):
    power1, type1 = pokemon1
    power2, type2 = pokemon2

    if type1 == 'water' and type2 == 'fire':
        power1 += 10
    elif type1 == 'fire' and type2 == 'grass':
        power1 += 10
    elif type1 == 'grass' and type2 == 'water':
        power1 += 10
    elif type2

In [None]:
import ast


# Small sample program used to look at the AST that Python produces for a
# function with an if/else. The source is only parsed, never executed, so the
# input() calls inside it are not run.
code = """
def loop(a, b):
    if a > b:
      return a + b
    else:
      return a - b

x = int(input())
y = int(input())
print(loop(x, y))
"""

# Parse the source and print the textual form of the resulting tree.
tree = ast.parse(code)
print(ast.dump(tree))


Module(body=[FunctionDef(name='loop', args=arguments(posonlyargs=[], args=[arg(arg='a'), arg(arg='b')], kwonlyargs=[], kw_defaults=[], defaults=[]), body=[If(test=Compare(left=Name(id='a', ctx=Load()), ops=[Gt()], comparators=[Name(id='b', ctx=Load())]), body=[Return(value=BinOp(left=Name(id='a', ctx=Load()), op=Add(), right=Name(id='b', ctx=Load())))], orelse=[Return(value=BinOp(left=Name(id='a', ctx=Load()), op=Sub(), right=Name(id='b', ctx=Load())))])], decorator_list=[]), Assign(targets=[Name(id='x', ctx=Store())], value=Call(func=Name(id='int', ctx=Load()), args=[Call(func=Name(id='input', ctx=Load()), args=[], keywords=[])], keywords=[])), Assign(targets=[Name(id='y', ctx=Store())], value=Call(func=Name(id='int', ctx=Load()), args=[Call(func=Name(id='input', ctx=Load()), args=[], keywords=[])], keywords=[])), Expr(value=Call(func=Name(id='print', ctx=Load()), args=[Call(func=Name(id='loop', ctx=Load()), args=[Name(id='x', ctx=Load()), Name(id='y', ctx=Load())], keywords=[])], key

In [None]:
class GeneralBranchingErrorIntroducer(ast.NodeTransformer):
    """AST transformer that rewrites branching constructs found in function bodies.

    Only statements that sit directly in a function body are rewritten:
      * ``if``/``else`` becomes two consecutive ``if`` statements, the second
        one testing the negated condition;
      * ``while`` becomes an ``if``/``else`` whose ``else`` branch runs a copy
        of the loop under the negated condition;
      * ``for`` gets ``if False: continue`` prepended to its body.

    The tree is modified in place. Newly created nodes carry no source
    positions, so call ``ast.fix_missing_locations`` before compiling the result.
    """

    def visit_FunctionDef(self, node):
        """Rewrite the top-level statements of a function body and return the function node."""
        rewritten = []
        for stmt in node.body:
            if isinstance(stmt, ast.If):
                # Replace 'if-else' with two 'if' statements
                rewritten.extend(self.handle_if(stmt))
            elif isinstance(stmt, ast.While):
                # Introduce branching error in 'while' loop
                rewritten.append(self.handle_while(stmt))
            elif isinstance(stmt, ast.For):
                # Introduce branching error in 'for' loop
                rewritten.append(self.handle_for(stmt))
            else:
                # Visit other statements as well so that function definitions
                # nested inside them are processed, as they already were inside
                # if/while/for statements.
                rewritten.append(self.visit(stmt))
        node.body = rewritten
        return node

    def handle_if(self, node):
        """Return the statements that replace an ``if`` node (a list of one or two nodes)."""
        # Process nested function definitions whether or not there is an else branch.
        self.generic_visit(node)
        if not node.orelse:
            return [node]

        # Copy the condition so that the two statements do not share the same
        # node objects; shared nodes would be changed together by any later
        # in-place edit of the tree.
        negated_condition = ast.UnaryOp(op=ast.Not(), operand=copy.deepcopy(node.test))
        positive_branch = ast.If(test=node.test, body=node.body, orelse=[])
        negative_branch = ast.If(test=negated_condition, body=node.orelse, orelse=[])
        return [positive_branch, negative_branch]

    def handle_while(self, node):
        """Wrap a ``while`` loop in an ``if``/``else`` whose else branch loops on the negated condition."""
        self.generic_visit(node)
        # The duplicated loop and the guarding condition are deep copies, so
        # every node object appears only once in the resulting tree.
        negated_loop = ast.While(
            test=ast.UnaryOp(op=ast.Not(), operand=copy.deepcopy(node.test)),
            body=copy.deepcopy(node.body),
            orelse=copy.deepcopy(node.orelse)
        )
        return ast.If(test=copy.deepcopy(node.test), body=[node], orelse=[negated_loop])

    def handle_for(self, node):
        """Prepend ``if False: continue`` to the body of a ``for`` loop and return the loop."""
        self.generic_visit(node)
        # NOTE(review): the inserted test is the constant False, so the
        # 'continue' never executes and the loop behaves as before - confirm
        # whether a condition that can actually be true was intended.
        additional_check = ast.If(
            test=ast.Constant(value=False),
            body=[ast.Continue()],
            orelse=[]
        )
        node.body.insert(0, additional_check)
        return node

In [None]:
# Three small functions, one per construct handled by GeneralBranchingErrorIntroducer.
code_if = """
def loop_if(a, b):
    if a > b:
        return a + b
    else:
        return a - b
"""

code_while = """
def loop_while(a, b):
    while a > b:
        a -= 1
    return a
"""

code_for = """
def loop_for(n):
    for i in range(n):
        print(i)
"""


def _apply_branching_errors(source, transformer):
    """Parse source, run transformer over it, and return (parsed_tree, transformed_tree, new_source)."""
    parsed_tree = ast.parse(source)
    transformed_tree = transformer.visit(parsed_tree)
    # Nodes created by the transformer have no line/column information yet.
    ast.fix_missing_locations(transformed_tree)
    return parsed_tree, transformed_tree, ast.unparse(transformed_tree)


transformer = GeneralBranchingErrorIntroducer()

# Each snippet is transformed exactly once. The cell used to run the whole
# parse/transform/unparse pipeline twice with identical results.
tree_if, new_tree_if, new_code_if = _apply_branching_errors(code_if, transformer)
tree_while, new_tree_while, new_code_while = _apply_branching_errors(code_while, transformer)
tree_for, new_tree_for, new_code_for = _apply_branching_errors(code_for, transformer)

# Cell output: the regenerated source of the three transformed functions.
new_code_if, new_code_while, new_code_for

('def loop_if(a, b):\n    if a > b:\n        return a + b\n    if not a > b:\n        return a - b',
 'def loop_while(a, b):\n    if a > b:\n        while a > b:\n            a -= 1\n    else:\n        while not a > b:\n            a -= 1\n    return a',
 'def loop_for(n):\n    for i in range(n):\n        if False:\n            continue\n        print(i)')

**TOY EXP**