In [11]:
import torch
import argparse
import datasets
from datasets import load_dataset
from trl import SFTTrainer, DataCollatorForCompletionOnlyLM
from transformers import AutoModelForCausalLM, TrainingArguments, HfArgumentParser, AutoTokenizer, TrainerCallback
from huggingface_hub import login
import matplotlib.pyplot as plt
from peft import LoraConfig
import nltk
import numpy as np
from sklearn.metrics import accuracy_score
from tqdm.auto import tqdm
import pandas as pd

In [2]:
from dotenv import load_dotenv
import os
load_dotenv()
hf_token = os.getenv("HF_TOKEN") # make a .env for this and put your access token as HF_TOKEN=whateverYourAccessTokenIs

In [3]:
model_id = "meta-llama/Llama-3.2-1B-Instruct"
device = "cuda" if torch.cuda.is_available() else "cpu"
#print("GPU available ", torch.cuda.is_available())

In [4]:
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    token=hf_token
).to(device)

tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_token)

In [None]:
def run_model(model, tokenizer, messages, max_new_tokens=5, verbose=False):
    input_text = tokenizer.apply_chat_template(messages, tokenize=False)

    if verbose: print("\n###input_text:###\n", input_text)

    input_ids = tokenizer(input_text, return_tensors="pt").input_ids.to(model.device)

    if verbose: print("\n###input_ids:###\n", input_ids)

    terminators = [
      tokenizer.eos_token_id,
      tokenizer.convert_tokens_to_ids("<|eot_id|>")
    ]

    output = model.generate(
        input_ids,
        max_new_tokens=max_new_tokens,
        eos_token_id=terminators,
        do_sample=False,
    )


    # Decode the output and return the response without special tokens
    response = tokenizer.decode(output[0], skip_special_tokens=True)

    if verbose: print("\n###response:###\n", response)
    
    start_marker = "python\n"
    end_marker = "```"
    assistant_response = response.split(start_marker)[1].split(end_marker)[0] # grab just the code snippet
    #assistant_response = response.split("\n")[-1].strip()
    #assistant_response = response
    return assistant_response

In [14]:
data = pd.read_json("lc_hard.json", lines=False)
data

Unnamed: 0,desc,skeleton,examples,ref,test
0,\nGiven n non-negative integers representing a...,"\ndef trap(self, height: List[int]) -> int:\n",[],"[\ndef trap(self, height: List[int]) -> int:\n...","{'input': [], 'output': []}"
1,\nGiven an array of integers heights represent...,"\ndef largestRectangleArea(self, heights: List...",[],"[\ndef largestRectangleArea(self, heights: Lis...","{'input': [], 'output': []}"
2,\nGiven two sorted arrays nums1 and nums2 of s...,"\ndef findMedianSortedArrays(self, nums1: List...",[],"[\ndef findMedianSortedArrays(self, nums1: Lis...","{'input': [], 'output': []}"
3,\nGiven two strings s and t of lengths m and n...,"\ndef minWindow(self, s: str, t: str) -> str:\n",[],"[\ndef minWindow(self, s: str, t: str) -> str:...","{'input': [], 'output': []}"
4,"\nYou are given an array of integers nums, the...","\ndef maxSlidingWindow(self, nums: List[int], ...",[],"[\ndef maxSlidingWindow(self, nums: List[int],...","{'input': [], 'output': []}"
5,\nYou are given an array of k linked-lists lis...,\n# Definition for singly-linked list.\n# clas...,[],"[\ndef mergeKLists(self, lists: List[Optional[...","{'input': [], 'output': []}"
6,"\nGiven the head of a linked list, reverse the...",\n# Definition for singly-linked list.\n# clas...,[],"[\ndef reverseKGroup(self, head: Optional[List...","{'input': [], 'output': []}"
7,\nA path in a binary tree is a sequence of nod...,\n# Definition for a binary tree node.\n# clas...,[],"[\ndef traverse(self, node, storer):\n ...","{'input': [], 'output': []}"
8,"\nGiven an integer array nums, return the numb...","\ndef reversePairs(self, nums: List[int]) -> i...",[],"[\ndef reversePairs(self, nums: List[int]) -> ...","{'input': [], 'output': []}"
9,\nGiven an m x n board of characters and a lis...,"\ndef findWords(self, board: List[List[str]], ...",[],"[\ndef findWords(self, board: List[List[str]],...","{'input': [], 'output': []}"


In [36]:
def apply_lc_prompt(desc, skel):
    prompt = (
        "Your task is to complete the following problem in Python. You are provided with a function signature and a description. Output your completed version of the function. "
        f"Description: {desc}"
        "Below is the starting point for your code. \n"
        f"{skel}"
    )

    return prompt.strip()

In [37]:
dataset = data.copy()
dataset["prompt"] = dataset.apply(lambda x: apply_lc_prompt(x["desc"], x["ref"]), axis=1)
#print(dataset.iloc[0].to_dict())

In [20]:
#import nltk.translate.bleu_score


def eval_bleu(model, tokenizer, dataset, max_new_tokens=1000):
    outputs = []

    for row in tqdm(dataset.to_dict(orient="records")):
        messages = [
            {"role": "system", "content": ""},
            {"role": "user", "content": row["prompt"]},
        ]

        output = run_model(model=model, tokenizer=tokenizer, messages=messages, max_new_tokens=max_new_tokens)

        outputs.append(output)
    
    r, h = [], []
    for idx, row in tqdm(enumerate(dataset.to_dict(orient="records"))):
        refs_in_dataset = row["ref"]
        references = []
        for real_code_solution in refs_in_dataset:
            references.append(real_code_solution.split())
        hypothesis = outputs[idx].split()
        
        r.append(references)
        h.append(hypothesis)
    
    bleu_score = nltk.translate.bleu_score.corpus_bleu(r, h, weights=(1,0,0,0))
    return bleu_score, outputs


In [38]:
df = dataset.copy()
bleu_score, outputs = eval_bleu(model, tokenizer, df)
print(f"Bleu: {bleu_score}")
df["output"] = outputs
display(df)


The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
  2%|▏         | 1/50 [00:08<06:37,  8.12s/it]The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
  4%|▍         | 2/50 [00:15<06:15,  7.83s/it]The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
  6%|▌         | 3/50 [00:26<07:16,  9.29s/it]The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Ple

Bleu: 0.24062617172853393





Unnamed: 0,desc,skeleton,examples,ref,test,prompt,output
0,\nGiven n non-negative integers representing a...,"\ndef trap(self, height: List[int]) -> int:\n",[],"[\ndef trap(self, height: List[int]) -> int:\n...","{'input': [], 'output': []}",Your task is to complete the following problem...,"def trap(self, height: List[int]) -> int:\n ..."
1,\nGiven an array of integers heights represent...,"\ndef largestRectangleArea(self, heights: List...",[],"[\ndef largestRectangleArea(self, heights: Lis...","{'input': [], 'output': []}",Your task is to complete the following problem...,def largestRectangleArea(heights: List[int]) -...
2,\nGiven two sorted arrays nums1 and nums2 of s...,"\ndef findMedianSortedArrays(self, nums1: List...",[],"[\ndef findMedianSortedArrays(self, nums1: Lis...","{'input': [], 'output': []}",Your task is to complete the following problem...,"def findMedianSortedArrays(self, nums1: List[i..."
3,\nGiven two strings s and t of lengths m and n...,"\ndef minWindow(self, s: str, t: str) -> str:\n",[],"[\ndef minWindow(self, s: str, t: str) -> str:...","{'input': [], 'output': []}",Your task is to complete the following problem...,from collections import defaultdict\n\ndef min...
4,"\nYou are given an array of integers nums, the...","\ndef maxSlidingWindow(self, nums: List[int], ...",[],"[\ndef maxSlidingWindow(self, nums: List[int],...","{'input': [], 'output': []}",Your task is to complete the following problem...,from collections import deque\n\ndef maxSlidin...
5,\nYou are given an array of k linked-lists lis...,\n# Definition for singly-linked list.\n# clas...,[],"[\ndef mergeKLists(self, lists: List[Optional[...","{'input': [], 'output': []}",Your task is to complete the following problem...,"from typing import List, Optional\n\n# Definit..."
6,"\nGiven the head of a linked list, reverse the...",\n# Definition for singly-linked list.\n# clas...,[],"[\ndef reverseKGroup(self, head: Optional[List...","{'input': [], 'output': []}",Your task is to complete the following problem...,"def reverseKGroup(self, head: Optional[ListNod..."
7,\nA path in a binary tree is a sequence of nod...,\n# Definition for a binary tree node.\n# clas...,[],"[\ndef traverse(self, node, storer):\n ...","{'input': [], 'output': []}",Your task is to complete the following problem...,"class Solution:\n def maxPathSum(self, root..."
8,"\nGiven an integer array nums, return the numb...","\ndef reversePairs(self, nums: List[int]) -> i...",[],"[\ndef reversePairs(self, nums: List[int]) -> ...","{'input': [], 'output': []}",Your task is to complete the following problem...,"from bisect import bisect_left, bisect_right\n..."
9,\nGiven an m x n board of characters and a lis...,"\ndef findWords(self, board: List[List[str]], ...",[],"[\ndef findWords(self, board: List[List[str]],...","{'input': [], 'output': []}",Your task is to complete the following problem...,"def findWords(self, board: List[List[str]], wo..."


In [39]:
print(df.iloc[0].to_dict()["output"])

def trap(self, height: List[int]) -> int:
    """
    Given n non-negative integers representing an elevation map where the width of each bar is 1, 
    compute how much water it can trap after raining.

    Args:
    height (List[int]): A list of non-negative integers representing the elevation map.

    Returns:
    int: The amount of water that can be trapped.
    """
    left = 0
    right = len(height) - 1
    left_max = height[left]
    right_max = height[right]
    water = 0

    while left < right:
        # If the left maximum is less than the right maximum, move the left pointer to the right
        if left_max < right_max:
            left += 1
            left_max = max(left_max, height[left])
            water += left_max - height[left]
        # If the left maximum is not less than the right maximum, move the right pointer to the left
        else:
            right -= 1
            right_max = max(right_max, height[right])
            water += right_max - height[right]



In [None]:
def eval_test_case(code, test_inputs, expected_outputs, function_name):
    namespace = {}

    try:
        exec(code, namespace)

        func = namespace.get(function_name)
        if not callable(func):
            raise ValueError(f"Function '{function_name}' is not defined or callable")
        
        passed = 0
        total = len(test_inputs)

        for test_input, expected_output in zip(test_inputs, expected_outputs):
            try:
                result = func(*test_input)
                if result == expected_output:
                    passed += 1
            except Exception as e:
                print(f"Test with input {test_input} on {function_name} failed due to error: {e}")
        
        return passed / total if total > 0 else 0.0
    
    except Exception as e:
        # code messed up, penalize
        return 0.0

In [None]:
#TODO: process dataset to run eval_test_case 