# Preprocessing the model's responses (removing duplication and noise)

In [None]:
import re
import json

# Base paths of the RAW response files to clean, given without the ".json"
# extension (the loop below opens f"{bench}.json" and writes f"{bench}_clean.json").
path = [] # include path to RAW files 

####################################
# These are Regex patterns that I used for cleaning the RAW responses. 
def truncate_string(s):
    """Return the part of ``s`` that precedes the first ``@@ <marker>`` header.

    A header is ``@@``, optional whitespace, and then one of the known section
    names (or a further run of ``@``). When no header is present the input is
    returned unchanged.
    """
    header = re.search(
        r'@@\s*(Response|Test|Exception|Instance|Input|Additional Response|Reference|HumanEval Answer|Postprocess|Grading|Followup|Ground Truth|Human Response|Prompt|Example|Error|Alternate Response|Original|CoreDump|Solution|Explanation|Comment|Answer|Challenge|Expected Output|Output|Test Cases|Expected|Actual Output|Hint|Instruction|@|@@|@@@)',
        s,
    )
    if header is None:
        return s
    # Everything from the header onwards is treated as noise and dropped.
    return s[:header.start()]

def remove_icl_examples(text):
    """Strip the in-context examples that precede the actual task.

    Removes every span that starts at ``[Example 1]`` or ``public`` and runs
    (non-greedily, across newlines) up to and including the
    ``[Your Turn] Buggy Code:`` line.
    """
    icl_span = re.compile(
        r"(?:\[Example 1\]|public).*?\[Your Turn\] Buggy Code:\s*\n",
        re.DOTALL,
    )
    return icl_span.sub("", text)

def remove_icl_examples2(text):
    """Drop everything up to and including each ``# Fixed Function:`` line.

    The match is non-greedy and spans newlines, so when the marker occurs
    several times every leading span is removed in turn.
    """
    up_to_marker = re.compile(r".*?# Fixed Function:\s*\n", re.DOTALL)
    return up_to_marker.sub("", text)

def truncate_string2(s):
    """Return the part of ``s`` before the first "three newlines + marker" break.

    The break is three consecutive newlines, optional whitespace, and then one
    of the known continuation markers (a comment delimiter, ``import``, a
    section name, or ``@``). Without such a break the input is returned as is.
    """
    section_break = re.search(
        r'\n\n\n\s*(\n/\*|\n\*/|import|# Provide|Response|Original|Solution|Explanation|Comment|Answer|Challenge|Expected Output|Output|Test Cases|Expected|Actual Output|Hint|Instruction|@|@@|@@@)',
        s,
    )
    if section_break is None:
        return s
    return s[:section_break.start()]

def clean_string(s):
    """Remove ``@`` runs that follow a blank line, then trim the result.

    A non-whitespace token followed by a blank line and one or more ``@``
    groups is replaced by the token alone; the surrounding whitespace of the
    final string is stripped.
    """
    marker_after_blank_line = re.compile(r'(\S+)\s*\n\n\s*(@+\s*)+')
    # Keep only the token that preceded the blank line and the @ run.
    without_markers = marker_after_blank_line.sub(lambda hit: hit.group(1), s)
    return without_markers.strip()

def remove_after_second_occurrence(text, phrase="package humaneval.buggy;"):
    """Cut ``text`` right before the second occurrence of ``phrase``.

    The text is returned unchanged when ``phrase`` occurs fewer than two
    times. An empty ``phrase`` raises ``ValueError``.
    """
    before_first, first_hit, remainder = text.partition(phrase)
    if not first_hit:
        return text

    between, second_hit, _ = remainder.partition(phrase)
    if not second_hit:
        return text

    # Keep the first occurrence and whatever sits between it and the second one.
    return before_first + first_hit + between
####################################


# For each RAW file included in path, clean the responses and write the CLEAN file
for bench in path:
    with open(f"{bench}.json", "r") as raw_file:
        data = json.load(raw_file)

    for entry in data['data'].values():
        # Clean every response in place: drop the benchmark instruction text,
        # cut at the first "@@ <marker>" header, keep only the first class
        # (up to the second package declaration), then strip "@" noise.
        entry['output'][:] = [
            clean_string(
                remove_after_second_occurrence(
                    truncate_string(
                        raw_response.replace(
                            "Write a solution to the following coding problem:\nThe input is buggy code, you are given the logic of the function in the comment block. Base on that and fix the functionality to match the logic.",
                            "",
                        )
                    )
                )
            )
            for raw_response in entry['output']
        ]

    # Save the cleaned responses next to the raw file
    with open(f"{bench}_clean.json", "w") as json_file:
        json.dump(data, json_file, indent=4)



# Running unit tests on the cleaned responses

In [None]:
import json
import subprocess

# Base paths of the CLEAN response files to evaluate, given without the
# ".json" extension (the loop below opens f"{bench}.json").
path = [] # include path to CLEAN files 

# NOTE(review): not referenced anywhere else in the visible code.
benchmark_dir = ""
# Maps project name -> {rank of the patch: verdict string}; filled by the
# evaluation loop below and dumped to f"{bench}_results.json".
test_results = {}

# For responses that start with "public class"
def replace_public_class(java_file, new_class_code, pattern):
    """Overwrite ``java_file`` with its own header followed by a new class.

    Only the ``import`` and ``package`` lines of the existing file are kept
    (in their original order); a blank line and ``new_class_code`` are written
    after them.

    Args:
        java_file: Path of the Java source file to rewrite in place.
        new_class_code: Full text of the replacement class.
        pattern: Unused; accepted so all ``replace_*`` helpers share a signature.
    """
    with open(java_file, "r") as source:
        header_lines = [
            raw_line
            for raw_line in source
            if raw_line.strip().startswith(("import ", "package "))
        ]

    rebuilt = "".join(header_lines) + "\n" + new_class_code

    with open(java_file, "w") as target:
        target.write(rebuilt)

# For responses that start with "public static"
def replace_public_static(java_file, new_class_code, pattern):
    """Splice a replacement method into the class stored in ``java_file``.

    The rewritten file consists of:
      * every ``import`` / ``package`` line of the original file,
      * the lines from the ``public class`` line up to (not including) the
        first line that starts with the text of ``pattern`` before its first
        ``{``,
      * a blank line, ``new_class_code``, and a closing ``}``.

    Args:
        java_file: Path of the Java source file to rewrite in place.
        new_class_code: Text of the replacement method.
        pattern: Original buggy code; the part before its first ``{`` is used
            to find where the buggy method starts.
    """
    with open(java_file, "r") as source:
        original_lines = source.readlines()

    kept = []
    inside_class = False
    reached_method = False
    for raw_line in original_lines:
        stripped = raw_line.strip()

        if stripped.startswith(("import ", "package ")):
            kept.append(raw_line)
            continue

        if stripped.startswith(pattern.split("{")[0]):
            # From here on the original method body (and the rest) is dropped.
            reached_method = True
            continue

        if reached_method:
            continue

        if inside_class or stripped.startswith("public class"):
            inside_class = True
            kept.append(raw_line)

    rebuilt = "".join(kept) + "\n" + new_class_code + "\n}"

    with open(java_file, "w") as target:
        target.write(rebuilt)

# For responses that start with "package"
def replace_class_body(java_file, new_class_code, pattern):
    """Replace the whole content of ``java_file`` with ``new_class_code``.

    Args:
        java_file: Path of the Java source file to overwrite.
        new_class_code: Complete file content to write.
        pattern: Unused; accepted so all ``replace_*`` helpers share a signature.
    """
    # The file is read first although its content is not used: this makes a
    # missing file raise instead of being silently created by the write below.
    with open(java_file, "r") as existing:
        existing.readlines()

    with open(java_file, "w") as target:
        target.write(new_class_code)

# Runs the JUnit tests of one project through Maven and returns a verdict string
def test_response(project_name, timeout=10):
    """Run the JUnit tests of ``project_name`` through Maven and classify the run.

    Executes ``mvn test -Dtest=TEST_<project_name>.java`` in the current
    working directory and inspects the combined stdout/stderr.

    Args:
        project_name: Name of the benchmark program; the test class is
            expected to be called ``TEST_<project_name>``.
        timeout: Seconds to wait for Maven before giving up (default 10, the
            value that used to be hard-coded).

    Returns:
        One of:
          * ``"plausible"``    - the build succeeded (all tests passed),
          * ``"wrong"``        - the build failed with test failures, or the
                                 output matched none of the known markers,
          * ``"uncompilable"`` - the build failed without test failures, or
                                 Maven could not be started at all,
          * ``"timeout"``      - Maven exceeded ``timeout`` or reported a
                                 timeout in its output.
    """
    command = ["mvn", "test", f"-Dtest=TEST_{project_name}.java"]
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        # A patch that hangs (e.g. an infinite loop) is a timeout, not a
        # compilation problem; previously this case fell into the generic
        # handler below and was reported as "uncompilable".
        print(f"Timed out after {timeout}s running tests for {project_name}")
        return "timeout"
    except Exception as e:
        # Best effort: any other failure to launch Maven is reported and
        # counted against the patch, as before.
        print(f"Error running tests for {project_name}: {e}")
        return "uncompilable"

    output_upper = (result.stdout + result.stderr).upper()

    if "BUILD FAILURE" in output_upper:
        # Surefire prints "Failures:" when tests ran and failed; without it
        # the build broke before the tests could run.
        if "FAILURES:" in output_upper:
            return "wrong"
        return "uncompilable"
    if "TIMEOUT" in output_upper:
        return "timeout"
    if "BUILD SUCCESS" in output_upper:
        return "plausible"
    return "wrong"

# For every CLEAN response file: take each response, splice it into the buggy Java class, run the JUnit tests, and record the verdict
for bench in path:
    with open(f"{bench}.json", "r") as f:
        data = json.load(f)

    # Start from an empty mapping for every input file; otherwise the results
    # file of a later bench would also contain the projects of earlier ones.
    test_results = {}

    for project_name, details in data['data'].items():
        print(project_name)
        buggy_file = f"src/main/java/humaneval/buggy/{project_name}.java"

        # Remember the original buggy source so it can be restored after each patch.
        with open(buggy_file, "r") as f:
            buggy_code = f.readlines()

        test_results[project_name] = {}
        pattern = details['input']
        for rank, patch in enumerate(details["output"]):
            # Choose how to splice the patch in, based on how the response starts:
            # a whole file, a whole class, or (by default) a single method.
            patch_text = str(patch)
            if patch_text.startswith("package"):
                install_patch = replace_class_body
            elif patch_text.startswith("public class"):
                install_patch = replace_public_class
            else:
                install_patch = replace_public_static

            try:
                install_patch(buggy_file, patch, pattern)
                correctness = test_response(project_name)
            finally:
                # Always put the original code back, even when installing the
                # patch raised, so the next patch starts from the buggy file.
                with open(buggy_file, "w") as f:
                    f.writelines(buggy_code)

            test_results[project_name][rank] = correctness
            print(f"Test result for {project_name} fix {rank}: {correctness}")

    with open(f"{bench}_results.json", "w") as f:
        json.dump(test_results, f, indent=2)

print("✅ All tests completed. Results saved to <bench>_results.json for each input file")

        

# For getting Pass@k results from RESULT files

In [None]:
import json

# Base paths of the RESULT files to score, given without the ".json"
# extension (the loop below opens f"{bench}.json").
path = [] # include path to RESULT files 

# Go through every file included in path and print its Pass@k results
for bench in path:
    with open(f"{bench}.json", "r") as results_file:
        data = json.load(results_file)

    print(bench)

    # One list of verdicts per problem, in the order the ranks were stored.
    ranked_verdicts = [list(attempts.values()) for attempts in data.values()]

    # A problem counts for Pass@k when any of its first k patches is plausible.
    pass1 = sum('plausible' in verdicts[:1] for verdicts in ranked_verdicts)
    pass5 = sum('plausible' in verdicts[:5] for verdicts in ranked_verdicts)
    pass10 = sum('plausible' in verdicts[:10] for verdicts in ranked_verdicts)

    # The denominator is the fixed benchmark size, not the number of entries read.
    print(f"Pass@1:{pass1}/163, Pass@5:{pass5}/163, Pass@10:{pass10}/163")

# Training Scripts

In [None]:
import os
from peft import get_peft_model, LoraConfig
import torch
from datasets import load_dataset
from peft import (
    LoraConfig, PromptEncoderConfig, PrefixTuningConfig, IA3Config,
    get_peft_model,
)
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForSeq2Seq
import sys
from datasets import Dataset

def main():
    """Fine-tune the base model with a PEFT (LoRA) configuration on bug-fix pairs.

    Steps, in order:
      1. Load the causal LM and its tokenizer.
      2. Load and shuffle the dataset, keep up to 30,000 samples whose buggy
         plus fixed function fit in 1250 tokens, and split them 24,000/6,000.
      3. Turn every sample into an instruction prompt and mask the prompt part
         of the labels with -100.
      4. Wrap the model with the PEFT config, train with ``Trainer``, and call
         ``model.save_pretrained(output_dir)``.

    Takes no arguments and returns nothing; all settings are hard-coded below.
    """
    os.environ["TOKENIZERS_PARALLELISM"] = "false"
    
    model_name_or_path = "codellama/CodeLlama-7b-hf"

    # Uncomment this for QLoRA
    # model = AutoModelForCausalLM.from_pretrained(model_name_or_path, device_map="auto",quantization_config={"load_in_4bit": True, "bnb_4bit_compute_dtype": torch.bfloat16})

    model = AutoModelForCausalLM.from_pretrained(model_name_or_path, device_map="auto", torch_dtype=torch.bfloat16)

    tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)

    # Loading original dataset
    dataset = load_dataset("zxliu/ReAPR-Automatic-Program-Repair-via-Retrieval-Augmented-Large-Language-Models", split="train")

    # Random seed (so far this seed yields the best result)
    dataset = dataset.shuffle(seed=42)

    # Select 30k dataset, with max token = 1250
    def count_tokens(buggy_function, fixed_function):
        """Return the combined token count of the buggy and the fixed function."""
        buggy_tokens = tokenizer(buggy_function, return_tensors="pt", truncation=False).input_ids.size(1)
        fixed_tokens = tokenizer(fixed_function, return_tensors="pt", truncation=False).input_ids.size(1)
        return buggy_tokens + fixed_tokens
    
    selected_samples = []
    max_samples = 30000

    # Walk the shuffled dataset and keep the first max_samples that fit the token budget.
    for sample in dataset:
        if count_tokens(sample['buggy_function'], sample['fixed_function']) <= 1250:
            selected_samples.append(sample)
            if len(selected_samples) >= max_samples:
                break  # Stop once we have 30,000 samples

    # Keep only the two text columns used for training.
    data_dict = {
        "buggy_function": [sample["buggy_function"] for sample in selected_samples],
        "fixed_function": [sample["fixed_function"] for sample in selected_samples],
    }

    selected_dataset = Dataset.from_dict(data_dict)

    # Instruction dataset, splitting 80/20 for training and evaluation
    # NOTE(review): the absolute sizes add up to max_samples; presumably this
    # fails if fewer than 30,000 samples passed the filter above - confirm.
    split_dataset = selected_dataset.train_test_split(train_size=24000, test_size=6000, seed=42)
    train_dataset = split_dataset["train"]
    eval_dataset = split_dataset["test"]

    tokenizer.add_eos_token = True

    if model_name_or_path == "deepseek-ai/deepseek-coder-6.7b-base":
        tokenizer.pad_token_id = 32018 # this follows the previous study
    else:
        tokenizer.pad_token_id = 0
    tokenizer.padding_side = "right"

    # Tokenize the datasets for training
    def tokenize(prompt, add_eos_token=True):
        """Tokenize ``prompt`` (truncated to 1350 tokens) and attach labels.

        An EOS token is appended when the sequence does not already end with
        one, is shorter than 1350 tokens, and ``add_eos_token`` is true.
        ``labels`` starts out as a copy of ``input_ids``.
        """
        result = tokenizer(
            prompt,
            truncation=True,
            max_length=1350,
            padding=False,
            return_tensors=None,
        )
        if (
                result["input_ids"][-1] != tokenizer.eos_token_id
                and len(result["input_ids"]) < 1350
                and add_eos_token
            ):
                result["input_ids"].append(tokenizer.eos_token_id)
                result["attention_mask"].append(1)

        result["labels"] = result["input_ids"].copy()

        return result

    # Generate Prompt using standard prompting approach
    def full_prompt_generation(data):
        """Build the tokenized training example for one dataset row.

        The full prompt (instruction + response) is tokenized, and the labels
        covering the first ``question_len`` positions are replaced by -100.
        """
        full_prompt =f"""You are an expert model in fixing program bugs. Your job is to deliver the most accurate fixes given a buggy program.

        @@ Instruction:
        {data["buggy_function"]}

        @@ Response:
        {data["fixed_function"]}
    """
        full = tokenize(full_prompt)

        # Same text without the response; only its token count is used.
        question = tokenize(f"""You are an expert model in fixing program bugs. Your job is to deliver the most accurate fixes given a buggy program.

        @@ Instruction:
        {data["buggy_function"]}

        @@ Response:
        """)

        # NOTE(review): tokenize() ends the question with an EOS token (unless
        # it hit the length limit), so question_len presumably counts one
        # position more than the prompt occupies inside `full` - confirm
        # whether masking that extra token is intended.
        question_len = len(question['input_ids'])

        # setting this helps model focus only on generating the fixed instead of reproducing input
        full["labels"] = [
            -100
        ] * question_len + full["labels"][
            question_len:
        ]  
        return full

    # Replace the two text columns by the tokenized fields returned above.
    tokenized_train_dataset = train_dataset.map(full_prompt_generation, remove_columns=['buggy_function','fixed_function'])
    tokenized_val_dataset = eval_dataset.map(full_prompt_generation, remove_columns=['buggy_function','fixed_function'])


    model.train()  # put model back into training mode

    # Uncomment this for QLoRA
    # NOTE(review): prepare_model_for_kbit_training is not among the visible
    # imports; it would have to be imported before uncommenting.
    # model = prepare_model_for_kbit_training(model) 

    # configs 
    # Active configuration: LoRA. The commented blocks below are the
    # alternative PEFT methods (P-Tuning, IA3) that can be swapped in.
    config = LoraConfig(
        r=16,
        lora_alpha=16,
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM",
    )

    # config = PromptEncoderConfig(
    #         peft_type="P_TUNING",
    #         task_type="CAUSAL_LM",
    #         num_virtual_tokens=100,
    #         encoder_hidden_size=2048,
    #         encoder_reparameterization_type= "MLP"
    # )

    # config = IA3Config(
    #         peft_type="IA3",
    #         task_type="CAUSAL_LM",
    #         )

    model = get_peft_model(model, config)

    output_dir = "/drive/MyDrive/my_model"

    # No checkpoints are written during training (save_strategy="no"); the
    # model is saved once at the end of this function.
    # NOTE(review): eval_steps=0.2 is presumably read as a fraction of the
    # total training steps - confirm against the TrainingArguments docs.
    training_args = TrainingArguments(
            per_device_train_batch_size=3,
            per_device_eval_batch_size=3,
            gradient_accumulation_steps=1,
            warmup_ratio=0.05,
            num_train_epochs= 3,
            learning_rate=1e-5,
            lr_scheduler_type="cosine",
            fp16=False,
            bf16= True,
            optim="adamw_torch",
            evaluation_strategy="steps", 
            save_strategy="no",
            eval_steps=0.2,
            output_dir=output_dir,
            load_best_model_at_end=False,
            group_by_length=True, 
            report_to="none", 
            run_name=None, 
            gradient_checkpointing=True,
            dataloader_drop_last=True,
            dataloader_pin_memory=True,
            disable_tqdm = False,
            dataloader_num_workers=4,
        )

    trainer = Trainer(
        model=model,
        train_dataset=tokenized_train_dataset,
        eval_dataset=tokenized_val_dataset,
        args=training_args,
        data_collator=DataCollatorForSeq2Seq(
            tokenizer, pad_to_multiple_of=8, return_tensors="pt", padding=True
        ),
    )

    model.config.use_cache = False

    # for running in linux, speed up 
    # NOTE(review): this compares version strings lexicographically (e.g.
    # "10.0" would sort below "2"). Also, `trainer` was built above with the
    # un-compiled model object, so the compiled wrapper is only bound to the
    # local name `model` - confirm that training actually uses it.
    if torch.__version__ >= "2" and sys.platform != "win32":
        print("compiling the model")
        model = torch.compile(model)

    # Marks every parameter reachable from `model` as trainable.
    # NOTE(review): presumably this also re-enables gradients for base weights
    # that get_peft_model left frozen - confirm this is intended.
    for param in model.parameters():
        param.requires_grad = True

    trainer.train()

    model.save_pretrained(output_dir)
main()


# Inference Scripts

In [None]:
import os
import json
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
from peft import AutoPeftModelForCausalLM

os.environ["TOKENIZERS_PARALLELISM"] = "false"
device = "cuda"
model_name_or_path = "codellama/CodeLlama-7b-hf"
tokenizer_name_or_path = "codellama/CodeLlama-7b-hf"



# Load the tokenizer from its own setting; it was previously loaded from
# model_name_or_path, which left tokenizer_name_or_path without any effect.
tokenizer = AutoTokenizer.from_pretrained(tokenizer_name_or_path)
tokenizer.pad_token_id = 0 # unk. we want this to be different from the eos token
tokenizer.padding_side = "right"

# Directory of the trained adapter; only used by the commented-out PEFT loader below.
ADAPTER_PATH = "/drive/MyDrive/my_model_lora"

# For loading PEFT-trained models (uncomment exactly one loader)

# QLORA
# model = AutoModelForCausalLM.from_pretrained(model_name_or_path, device_map="auto",quantization_config={"load_in_4bit": True, "bnb_4bit_compute_dtype": torch.float16})

# LoRA, IA3, PTuning
# model = AutoPeftModelForCausalLM.from_pretrained(ADAPTER_PATH,device_map="auto",torch_dtype=torch.bfloat16)

# For loading Base models
model = AutoModelForCausalLM.from_pretrained(model_name_or_path,device_map=device,torch_dtype=torch.bfloat16)

# This class is used for generating Inference Prompt
class generate_prompt:
    """Builds the inference prompt that wraps an instruction.

    The prompt is a fixed system sentence, an ``@@ Instruction`` section
    holding the instruction, and an empty ``@@ Response`` section for the
    model to complete.

    NOTE(review): the training cell of this file writes the section headers as
    ``@@ Instruction:`` / ``@@ Response:`` (with a colon and indentation),
    whereas this template has neither - confirm the difference is intended.
    """

    PROMPT_TEMPLATE = (
        "You are an expert model in fixing program bugs. Your job is to deliver the most accurate fixes given a buggy program.\n"
        "\n"
        "@@ Instruction\n"
        "{instruction}\n"
        "\n"
        "@@ Response\n"
    )

    @staticmethod
    def generate_prompt(instruction: str) -> str:
        """Return the prompt template with ``instruction`` filled in."""
        fields = {"instruction": instruction}
        return generate_prompt.PROMPT_TEMPLATE.format_map(fields)

# Accompany with class generate_prompt
def generate_benchmark_prompt(text: str) -> str:
    """Wrap a benchmark problem in the task description and the inference prompt.

    ``text`` is inserted into the active benchmark instruction, and the result
    is passed to ``generate_prompt.generate_prompt`` as the instruction.
    """
    # This prompt will change depending on the Benchmark, below is for benchmark that includes additional examples (RQ2, RQ3)
    BENCHMARK_PROMPT = (
        "Write a solution to the following coding problem:\n"
        "    The input is buggy code, you are given the logic of the function in the comment block. Base on that and fix the functionality to match the logic.\n"
        "    {problem}"
    )

    # This is for benchmark with no context (RQ1)
    # BENCHMARK_PROMPT = """Write a solution to the following coding problem:
    # The input is buggy code, fix it accordingly.
    # {problem}"""

    # This is for benchmark with Few-Shot settings (RQ2,RQ3)
    # BENCHMARK_PROMPT = """You are given some examples in the comment block on how to fix the target buggy code. Base on those examples, fix the last Buggy Code and only provide your response for the last Fixed Code section.
    # {problem}"""

    instruction = BENCHMARK_PROMPT.format(problem=text)
    return generate_prompt.generate_prompt(instruction=instruction)

# Benchmark whose prompts are sent to the model (depends on which benchmark is being tested)
with open('3_examples_benchmark.json', 'r') as benchmark_file:
    output = json.load(benchmark_file)
# Zero-example version of the same benchmark; its input length bounds the response length
with open('0_examples_benchmark.json', 'r') as reference_file:
    output2 = json.load(reference_file)

model.to("cuda")

# The EOS id does not change between prompts, so look it up once.
eos_id = tokenizer.convert_tokens_to_ids(tokenizer.eos_token)

# This follows the same settings as the previous study, except for max_length
for benchmark in output['data']:
    inputs = tokenizer(generate_benchmark_prompt(output['data'][benchmark]['input']), truncation=True,
                    max_length=1800,
                    padding=False,
                    return_tensors="pt",
                )

    # Number of prompt tokens, read from the tensor itself so it does not
    # depend on the tokenizer exposing per-sample encodings via inputs[0].
    prompt_length = inputs['input_ids'].shape[1]

    # Maximum expected length of the response: the token count of the
    # zero-example input plus a margin of 256. This avoids too much noise.
    reference_length = tokenizer(output2['data'][benchmark]['input'], return_tensors="pt", truncation=False).input_ids.size(1)

    generated_ids = model.generate(
        input_ids=inputs['input_ids'].cuda(),
        attention_mask=inputs['attention_mask'].cuda(),
        max_new_tokens=reference_length + 256,
        num_beams=10,
        num_return_sequences=10,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=eos_id,
    )

    # Decode only the newly generated part of each returned sequence.
    output['data'][benchmark]['output'] = [
        tokenizer.decode(generated_id[prompt_length:], skip_special_tokens=True, clean_up_tokenization_spaces=False)
        for generated_id in generated_ids
    ]

with open('codellama_qlora_3_icls.json', "w") as f:
    json.dump(output, f, indent=2)