# Evaluate GitHub Python

## Helper functions

In [45]:
# Load the test dataset
import json
import os

def load_json_from_folder(folder_path):
  """
  Loads all JSON files from a specified folder.

  Files are visited in sorted filename order so the returned list is
  reproducible (os.listdir returns entries in arbitrary order). A file whose
  content is not valid JSON is reported on stdout and skipped.

  Args:
    folder_path: The path to the folder containing the JSON files.

  Returns:
    A list with the parsed content of every ".json" file that decoded
    successfully, ordered by filename.
  """

  data_packages = []
  for filename in sorted(os.listdir(folder_path)):
    if not filename.endswith(".json"):
      continue
    filepath = os.path.join(folder_path, filename)
    try:
      # Read as UTF-8 explicitly instead of relying on the platform default.
      with open(filepath, 'r', encoding='utf-8') as f:
        data_packages.append(json.load(f))
    except json.JSONDecodeError as e:
      print(f"Error decoding JSON in file {filename}: {e}")

  return data_packages

def load_json_from_file(json_file):
    """
    Loads a single JSON file.

    Args:
        json_file: The path to the JSON file.

    Returns:
        The parsed JSON content, or None when the file does not contain
        valid JSON (the decoding error is printed).
    """
    try:
        with open(json_file, 'r') as handle:
            return json.load(handle)
    except json.JSONDecodeError as err:
        print(f"Error decoding JSON in file {json_file}: {err}")
        return None

def write_json_to_file(json_data, filename, indent=4):
  """
  Serializes JSON-compatible data into a file, replacing any existing content.

  Args:
    json_data: The data to serialize (for example a dictionary or a list).
    filename: The path of the file to write.
    indent: (Optional) Number of spaces per indentation level in the output.
            Defaults to 4.
  """
  with open(filename, 'w') as out_file:
    json.dump(json_data, fp=out_file, indent=indent)

In [2]:
import ast

def validate_code(code_str):
  """
  Validates Python code and returns error information if any.

  The code is only parsed, never executed.

  Args:
    code_str: The Python code as a string.

  Returns:
    None if the code parses, otherwise a string describing the error.
  """
  try:
    ast.parse(code_str)
  except (SyntaxError, ValueError) as e:
    # IndentationError is a SyntaxError subclass. ValueError is what Python
    # versions before 3.12 raise for source containing null bytes; catching it
    # keeps one malformed snippet from aborting a long evaluation loop.
    return str(e)  # Return the error message as a string
  return None  # No error

In [20]:
def replace_key_in_json(json_obj, key_to_replace, new_value):
  """
  Returns a copy of a JSON object in which one key is set to a new value.

  The input object is left untouched.

  Args:
    json_obj: The original JSON object.
    key_to_replace: The key whose value is set in the copy.
    new_value: The value stored under that key.

  Returns:
    The modified copy.
  """
  # Serializing and re-parsing produces an independent deep copy.
  serialized = json.dumps(json_obj)
  cloned = json.loads(serialized)
  cloned[key_to_replace] = new_value
  return cloned

## Load the dataset

In [4]:
# Folder holding the GitHub-Python evaluation JSON files (relative path)
github_python_dataset = "../github-python-test"

In [3]:
# The original paper uses splits 3 and 4 as the held-out test set
test_dataset = [
    load_json_from_file(os.path.join(github_python_dataset, split_file))
    for split_file in (
        'model-fixer.pred.evaluated.3.json',
        'model-fixer.pred.evaluated.4.json',
    )
]

In [4]:
# Sanity check: count the samples of both splits and validate the first snippet
total_samples = len(test_dataset[0]) + len(test_dataset[1])
first_code = test_dataset[0][0]['src']['string_format']  # source text of the first sample

print(f"Total number of samples: {total_samples}")
print(f"First code:\n {first_code}")
# validate_code returns None for valid code, otherwise the syntax-error text
print(f"Error (if any): {validate_code(first_code)}")

Total number of samples: 15055
First code:
 def test_pp_no_constraint ( self ) :
    filenames = [ tests . get_data_path ( ( "str" , "str" , "str" ) ) ]
    pp_constraints = pp . _convert_constraints ( None )
    pp_loader = iris . fileformats . rules . Loader ( pp . load , { } ,
        convert , pp . _load_rules )
    cubes = list ( load_cubes ( filenames , None , pp_loader , pp_constraints )
    self . assertEqual ( len ( cubes ) , 152 )

Error (if any): '(' was never closed (<unknown>, line 6)


## Prepare the model

In [11]:
import torch
import sys
import os
from datasets import load_dataset
from transformers import pipeline

In [8]:
# Prepare token
torch.cuda.empty_cache()  # release cached, currently unused GPU memory
hf_token = os.environ.get('HF_TOKEN')  # None when HF_TOKEN is not set in the environment

In [65]:
# Prepare instruction
# System prompt for the Python syntax fixer.
# NOTE: the backslash line continuations sit inside the string literal, so the
# leading spaces of every continued line become part of the prompt text.
python_syntax_fixer_instruction = "You are an expert Python code fixer. \
             You will receive input in the following format: \n\n \
             [Fix] | <error code>\n \
             <python code snippet>\n\n \
             Your task is to ONLY provide the corrected Python code with NO explanations or additional text. \n \
             Do not include the original error code in your response and do not format the code. \
             Treat the code snippet as regular text. Do NOT put any prefix, only plain text as code only."

In [66]:
# Load the model and instruction
# NOTE(review): instruct_model_id is assigned but not referenced in the visible
# part of the notebook; the pipeline below is built from model_id.
instruct_model_id = "meta-llama/Llama-3.2-3B-Instruct"

model_id = "meta-llama/Llama-3.2-3B-Instruct"
# Text-generation pipeline used by fix_code; weights are requested in bfloat16
# and device_map="auto" leaves device placement to the library.
pipe = pipeline(
    "text-generation",
    model=model_id,
    token=hf_token,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Some parameters are on the meta device because they were offloaded to the cpu.


## Process fixing code

In [67]:
def fix_code(code_snippet):
    """
    Asks the model for a corrected version of a Python snippet.

    The user message has the form "[Fix] | <error>\\n<code>", where <error> is
    whatever validate_code returns for the snippet.

    Args:
        code_snippet: The Python source text to fix.

    Returns:
        The content of the last message in the generated conversation.
    """
    code_error = validate_code(code_snippet)
    conversation = [
        {"role": "system", "content": python_syntax_fixer_instruction},
        {"role": "user", "content": f"[Fix] | {code_error}\n{code_snippet}"},
    ]

    generation = pipe(conversation, max_new_tokens=512, pad_token_id=pipe.tokenizer.eos_token_id)

    reply = generation[0]["generated_text"][-1]
    return reply["content"]

In [75]:
from tqdm.notebook import tqdm

FIXING_ATTEMP_COUNT = 10

def perform_fixing_code(dataset):
    """
    Runs the LLM fixer over every sample and records each attempt.

    For each sample, fix_code is called on the ORIGINAL snippet up to
    FIXING_ATTEMP_COUNT times, stopping at the first attempt that parses.

    Args:
        dataset: Iterable of sample dicts providing item['src']['string_format']
            and item['orig_err_obj']['msg'].

    Returns:
        A list with one copy of each sample whose "pred" key holds the list of
        attempts. Each attempt is {"string_format": <code>, "err_obj": ...},
        where err_obj is 0 when the attempt parses and otherwise a dict with
        "msg" and "msg_detailed".
    """
    results = []
    for item in tqdm(dataset, desc="Fixing code"):
        code_snippet = item['src']['string_format']

        fixing_attempts = []
        # Sub-progress bar for fixing attempts; the context manager closes it.
        with tqdm(total=FIXING_ATTEMP_COUNT, desc="Fixing attempts", leave=False) as pbar:
            for _ in range(FIXING_ATTEMP_COUNT):
                fixed_code = fix_code(code_snippet)
                remain_error = validate_code(fixed_code)
                if remain_error is None:
                    err_obj = 0
                else:
                    # "msg" repeats the sample's original error category; the
                    # parser message for this attempt goes in "msg_detailed".
                    err_obj = {"msg": item["orig_err_obj"]["msg"], "msg_detailed": remain_error}
                fixing_attempts.append({"string_format": fixed_code, "err_obj": err_obj})
                pbar.update(1)

                # If the code is already fixed, no need to retry. The bar is
                # not advanced further: that would push it past its total.
                if remain_error is None:
                    break

        results.append(replace_key_in_json(item, "pred", fixing_attempts))
    return results

In [None]:
# Perform fix on dataset (first loaded split, i.e. the *.evaluated.3.json samples)
fixed_set_1 = perform_fixing_code(test_dataset[0])

# Write the result to files (2-space indented JSON next to the input data)
write_json_to_file(
    fixed_set_1,
    os.path.join(github_python_dataset, 'model-fixer.pred.evaluated-llm.3.json'),
    2
)

In [77]:
# Perform fix on dataset (second loaded split, i.e. the *.evaluated.4.json samples)
fixed_set_2 = perform_fixing_code(test_dataset[1])

# Write the result to files (2-space indented JSON next to the input data)
write_json_to_file(
    fixed_set_2,
    os.path.join(github_python_dataset, 'model-fixer.pred.evaluated-llm.4.json'),
    2
)

## Evaluate the result

In [15]:
from pathlib import Path
from collections import defaultdict
import json

def get_test_result(pred_dir, pred_fname, pred_set):
    """
    Prints and stores top-1 repair accuracy for the held-out splits 3 and 4.

    For each split the file "<pred_dir>/<pred_fname>.<pred_set>.<split>.json"
    is read. A sample counts as repaired when its FIRST prediction (index 0)
    has err_obj == 0. Accuracy is reported in total and per original error
    type; every message containing "indent" is grouped as "indentation error".
    Error types without any successful prediction are reported too.

    The accuracies (percentages) are written to
    "<pred_dir>/stats.<pred_set>.json".

    Args:
        pred_dir: Folder containing the prediction files; also receives the stats file.
        pred_fname: Common file-name prefix of the prediction files.
        pred_set: Name of the prediction set, used in input and output file names.

    Returns:
        None.
    """
    test_splits = (3, 4)  # held-out test set; a tuple keeps the order fixed

    def collate_eval():
        """Collects a (split, progid, k) hit for every error-free prediction."""
        success = []
        denom = 0
        success_by_group = defaultdict(list)
        denom_by_group = defaultdict(int)
        for split in test_splits:
            print ('split', split)
            pred_path = Path(str(pred_dir)) / pred_fname
            pred_eval_path = f'{pred_path}.{pred_set}.{split}.json'
            print(pred_eval_path)
            with open(pred_eval_path) as f:  # close the file once parsed
                eval_objs = json.load(f)
            for eval_obj in eval_objs:
                progid = eval_obj['progid']
                orig_err_type = eval_obj['orig_err_obj']['msg']
                if 'indent' in orig_err_type:
                    orig_err_type = 'indentation error'
                denom += 1
                denom_by_group[orig_err_type] += 1
                for k, pred_obj in enumerate(eval_obj['pred']):
                    if pred_obj['err_obj'] == 0:
                        # Tuples instead of '-'-joined strings: a progid that
                        # contains '-' can no longer break the later unpacking.
                        hit = (split, progid, k)
                        success.append(hit)
                        success_by_group[orig_err_type].append(hit)
        return success, denom, success_by_group, denom_by_group

    def print_stats(hits, _denom):
        """Prints and returns the top-1 accuracy (in percent) for the hits."""
        top1 = {(split, progid) for split, progid, k in hits if k == 0}
        # An empty group or dataset yields 0% rather than a ZeroDivisionError.
        acc = len(top1) / float(_denom) * 100 if _denom else 0.0
        print ('   acc: {} ({:.1f}%) | denom {}'.format(len(top1), acc, _denom))
        return acc

    success, denom, success_by_group, denom_by_group = collate_eval()
    acc_dict = {}
    print ('Total')
    acc_dict['total'] = print_stats(success, denom)
    print ('-'*50)
    # Iterate over every error type seen, so a type with no success shows 0%.
    for err_type in denom_by_group:
        print (f'{err_type.capitalize()}')
        acc_dict[err_type] = print_stats(success_by_group[err_type], denom_by_group[err_type])

    with open(Path(pred_dir)/f'stats.{pred_set}.json', 'w') as f:
        json.dump(acc_dict, f, indent=2)

In [21]:
# Show LLM test result (reads the model-fixer.pred.evaluated-llm.{3,4}.json files)
get_test_result(github_python_dataset, 'model-fixer.pred', 'evaluated-llm')

split 3
../github-python-test/model-fixer.pred.evaluated-llm.3.json
split 4
../github-python-test/model-fixer.pred.evaluated-llm.4.json
Total
   acc: 12295 (81.7%) | denom 15055
--------------------------------------------------
Unbalanced (){}[]
   acc: 3282 (82.1%) | denom 3999
Invalid syntax
   acc: 3986 (83.9%) | denom 4749
Indentation error
   acc: 5027 (79.7%) | denom 6307


In [20]:
# Show BIFI test result (reads the original model-fixer.pred.evaluated.{3,4}.json files)
get_test_result(github_python_dataset, 'model-fixer.pred', 'evaluated')

split 3
../github-python-test/model-fixer.pred.evaluated.3.json
split 4
../github-python-test/model-fixer.pred.evaluated.4.json
Total
   acc: 13503 (89.7%) | denom 15055
--------------------------------------------------
Unbalanced (){}[]
   acc: 3892 (97.3%) | denom 3999
Invalid syntax
   acc: 4287 (90.3%) | denom 4749
Indentation error
   acc: 5324 (84.4%) | denom 6307


# Evaluate Prutor-DeepFix

## Helper functions

In [8]:
import sqlite3

def get_error_code_snippets_from_db(db_path):
    """
    Fetches every row of the Code table whose errorcount is non-zero.

    Args:
    db_path: Path to the SQLite database file.

    Returns:
    A list of row tuples, one per erroneous program.
    """
    connection = sqlite3.connect(db_path)
    try:
        rows = connection.execute("SELECT * FROM Code WHERE errorcount != 0")
        return rows.fetchall()
    finally:
        # Always release the connection, even when the query fails.
        connection.close()

In [14]:
import subprocess

def compile_code(code):
  """
  Compiles the given C code using gcc and returns the error message
  if compilation fails.

  The source is written to "temp.c" inside a fresh temporary directory and gcc
  is run with that directory as its working directory. Diagnostics therefore
  still name the file "temp.c", while nothing in the caller's working
  directory is created, overwritten or deleted. The temporary directory,
  including any produced binary, is removed afterwards.

  Args:
    code: The C code as a string.

  Returns:
    An empty string if compilation is successful,
    the error message (gcc's stderr) otherwise.

  Raises:
    FileNotFoundError: If the gcc executable cannot be found.
  """
  import os
  import tempfile

  with tempfile.TemporaryDirectory() as work_dir:
    with open(os.path.join(work_dir, 'temp.c'), 'w') as f:
      f.write(code)

    # Relative file names + cwd keep the messages free of the temp path.
    result = subprocess.run(['gcc', 'temp.c', '-o', 'temp'],
                            capture_output=True, text=True, cwd=work_dir)

  if result.returncode == 0:
    return ""  # No error message
  return result.stderr  # Return the error message

In [42]:
import re

def remove_backticks(text):
  """
  Removes Markdown code-fence formatting from a string.

  Each fenced block (an opening line of three backticks plus an optional
  language tag, then the content, then three closing backticks) is replaced by
  its content. The tag may be any text without backticks up to the end of the
  line, so tags such as "c", "C", "c++" or "python3" are all handled. Text
  outside fenced blocks is returned unchanged.

  Args:
    text: The string containing code blocks enclosed in backticks.

  Returns:
    The string with backtick code formatting removed.
  """
  # [^\n`]* = optional language tag; (.*?) = block content, as short as possible
  pattern = r'```[^\n`]*\n(.*?)```'
  return re.sub(pattern, r'\1', text, flags=re.DOTALL)

## Load dataset

In [9]:
# SQLite database file of the Prutor/DeepFix dataset (relative path)
prutor_deepfix_dataset_db = '../prutor-deepfix-09-12-2017/prutor-deepfix-09-12-2017.db'

In [10]:
# Load dataset: every row of the Code table whose errorcount is non-zero
data = get_error_code_snippets_from_db(prutor_deepfix_dataset_db)

In [15]:
# Show the structure of the first record. Judging by the output, a row looks like
# (program id, user id, problem id, source code, compiler error text, error count)
# -- column names are not visible here, TODO confirm against the DB schema.
data[0]

('prog19941',
 'user801',
 'prob90',
 '#include <stdio.h>\nint main(){\n    int k,n;\n    scanf("%d%d",&k,&n);\n    int a[n];\n    int i=0;\n    for(i=0;i<n;i++)\n    scanf("%d",&a[i]);\n    int flag=0;\n    int m=0;\n    while(m<n)\n    {\n        int j=0;\n        while(j<n)\n        {\n            //if(m!=j)\n            {\n                if (a[j]==(k-[ai]))\n                //if ((a[m]+a[j])==k)\n                flag=1;\n               \n            }  \n            j++;    \n        }\n        m++;\n    }\n    if (flag==1)\n    printf("lucky");\n    else \n    printf("unlucky");\n    return 0;\n}',
 'In function ‘main’:\n18:30: error: expected expression before ‘[’ token\n                 if (a[j]==(k-[ai]))\n                              ^\n18:31: error: ‘ai’ undeclared (first use in this function)\n                 if (a[j]==(k-[ai]))\n                               ^\n18:31: note: each undeclared identifier is reported only once for each function it appears in',
 2)

In [19]:
# Check compilation - should CONTAIN an error (index 3 of a row is the source code)
compile_code(data[0][3])

'temp.c: In function ‘main’:\ntemp.c:18:30: error: expected expression before ‘[’ token\n   18 |                 if (a[j]==(k-[ai]))\n      |                              ^\ntemp.c:18:31: error: ‘ai’ undeclared (first use in this function); did you mean ‘i’?\n   18 |                 if (a[j]==(k-[ai]))\n      |                               ^~\n      |                               i\ntemp.c:18:31: note: each undeclared identifier is reported only once for each function it appears in\n'

## Prepare model

In [20]:
import torch
import sys
import os
from datasets import load_dataset
from transformers import pipeline

In [21]:
# Prepare token
torch.cuda.empty_cache()  # release cached, currently unused GPU memory
hf_token = os.environ.get('HF_TOKEN')  # None when HF_TOKEN is not set in the environment

In [36]:
# Prepare instruction
# System prompt for the C/C++ fixer.
# NOTE: the backslash line continuations sit inside the string literal, so the
# leading spaces of every continued line become part of the prompt text.
cpp_syntax_fixer_instruction = "You are an expert C/C++ code fixer. \
             You will receive input in the following format: \n\n \
             [Fix] | <error code>\n \
             <code snippet>\n\n \
             Your task is to ONLY provide the corrected C/C++ code with NO explanations or additional text. \n \
             Do not include the original error code in your response and do not format the code. \
             Treat the code snippet as regular text. Do NOT put any prefix, only plain text as code only."

In [23]:
# Load the model and instruction
# NOTE(review): instruct_model_id is assigned but not referenced in the visible
# part of the notebook; the pipeline below is built from model_id.
instruct_model_id = "meta-llama/Llama-3.2-3B-Instruct"

model_id = "meta-llama/Llama-3.2-3B-Instruct"
# Text-generation pipeline used by fix_code; weights are requested in bfloat16
# and device_map="auto" leaves device placement to the library.
pipe = pipeline(
    "text-generation",
    model=model_id,
    token=hf_token,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Some parameters are on the meta device because they were offloaded to the cpu.


## Process fixing code

In [37]:
def fix_code(code_snippet):
    """
    Asks the model for a corrected version of a C/C++ snippet.

    The user message has the form "[Fix] | <error>\\n<code>", where <error> is
    whatever compile_code returns for the snippet.

    Note: this definition reuses the name fix_code of the Python fixer defined
    earlier in the notebook and replaces it once this cell has run.

    Args:
        code_snippet: The C/C++ source text to fix.

    Returns:
        The content of the last message in the generated conversation.
    """
    code_error = compile_code(code_snippet)
    conversation = [
        {"role": "system", "content": cpp_syntax_fixer_instruction},
        {"role": "user", "content": f"[Fix] | {code_error}\n{code_snippet}"},
    ]

    generation = pipe(conversation, max_new_tokens=512, pad_token_id=pipe.tokenizer.eos_token_id)

    reply = generation[0]["generated_text"][-1]
    return reply["content"]

### Fix 1 sample

In [38]:
# Fix 1 record
input_code = data[0][3]  # index 3 of a row is the program's source code
output_code = fix_code(input_code)  # raw model reply, before code fences are stripped

In [53]:
# Show input code and check compilation (compiler errors are expected here)
print(input_code)
print('--- ERROR ---')
print(compile_code(input_code))

#include <stdio.h>
int main(){
    int k,n;
    scanf("%d%d",&k,&n);
    int a[n];
    int i=0;
    for(i=0;i<n;i++)
    scanf("%d",&a[i]);
    int flag=0;
    int m=0;
    while(m<n)
    {
        int j=0;
        while(j<n)
        {
            //if(m!=j)
            {
                if (a[j]==(k-[ai]))
                //if ((a[m]+a[j])==k)
                flag=1;
               
            }  
            j++;    
        }
        m++;
    }
    if (flag==1)
    printf("lucky");
    else 
    printf("unlucky");
    return 0;
}
--- ERROR ---
temp.c: In function ‘main’:
temp.c:18:30: error: expected expression before ‘[’ token
   18 |                 if (a[j]==(k-[ai]))
      |                              ^
temp.c:18:31: error: ‘ai’ undeclared (first use in this function); did you mean ‘i’?
   18 |                 if (a[j]==(k-[ai]))
      |                               ^~
      |                               i
temp.c:18:31: note: each undeclared identifier is reported only onc

In [54]:
# Show output code and check compilation
output_code = remove_backticks(output_code)  # strip Markdown code fences from the model reply

print(output_code)
print('--- ERROR ---')
print(compile_code(output_code))  # an empty string means gcc reported no error

#include <stdio.h>

int main(){
    int k,n;
    scanf("%d%d",&k,&n);
    int a[n];
    int i=0;
    for(i=0;i<n;i++)
    scanf("%d",&a[i]);
    int flag=0;
    int m=0;
    while(m<n)
    {
        int j=0;
        while(j<n)
        {
            if (a[j]==(k-(a[i])))
            flag=1;
            j++;    
        }
        m++;
        i++; // Increment i to avoid infinite loop
    }
    if (flag==1)
    printf("lucky");
    else 
    printf("unlucky");
    return 0;
}

--- ERROR ---



# WIP

In [None]:
import torch
from transformers import pipeline

# Scratch setup: builds the pipeline again for the same model id, this time
# without passing an explicit token argument.
model_id = "meta-llama/Llama-3.2-3B-Instruct"
pipe = pipeline(
    "text-generation",
    model=model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

In [None]:
# Scratch call of the pipeline with a system prompt only.
# NOTE(review): no user message (i.e. no code to fix) is added before calling pipe.
messages = [
    {"role": "system", "content": "You are a Python syntax error correction expert. You will receive code snippets with syntax errors, prefixed with [Fix]. Each input will have two parts separated by a vertical bar (|). You will only fix the provided code, without any additional explanation."},
]
outputs = pipe(
    messages,
    max_new_tokens=128,
)
print(outputs[0]["generated_text"][-1])  # last message of the returned conversation

In [11]:
from transformers import pipeline

# NOTE(review): this cell uses torch but only imports pipeline, so it relies on
# torch having been imported by an earlier cell.
model_id = "meta-llama/Llama-3.2-3B-Instruct"
pipe = pipeline(
    "text-generation",
    model=model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

# Hand-written example: an error label plus a snippet with an unclosed "(".
error_code = "SyntaxError: invalid syntax"
code_snippet = """
def greet(name):
  print("Hello, " + name 

greet("Alice")
"""

# System prompt describing the "[Fix] | <error>\n<code>" input format. The
# backslash continuations are inside the string literal, so the leading spaces
# of each continued line are part of the prompt.
messages = [
    { 
        "role": "system", 
        "content": \
            "You are an expert Python code fixer. \
             You will receive input in the following format: \n\n \
             [Fix] | <error code>\n \
             <python code snippet>\n\n \
             Your task is to ONLY provide the corrected Python code with NO explanations or additional text. \n \
             Do not include the original error code in your response and do not format the code. \
             Treat the code snippet as regular text."
    }
]

# User turn in the format announced by the system prompt.
messages.append({"role": "user", "content": f"""
[Fix] | {error_code}
{code_snippet}
"""})

outputs = pipe(messages, max_new_tokens=256)
print(outputs[0]["generated_text"][-1])  # last message of the returned conversation

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Some parameters are on the meta device because they were offloaded to the cpu.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


{'role': 'assistant', 'content': 'def greet(name):\n  print("Hello, " + name)'}


In [13]:
# Text content of the last message in the generated conversation
fixed_code = outputs[0]["generated_text"][-1]['content']
fixed_code

'def greet(name):\n  print("Hello, " + name)'

In [14]:
# The hand-written broken snippet: expected to report a syntax error
validate_code(code_snippet)

"'(' was never closed (<unknown>, line 3)"

In [15]:
# The model's reply: validate_code returns None when it parses without error
validate_code(fixed_code)