# Generating Test Cases for Leetcode Dataset

- Code for generating test cases from comments using an LLM
- Validates the generated test cases by running each script in a subprocess
- Used to automatically verify LLM-generated code within the BICS v2 benchmark
- Original dataset found at [greengerong/leetcode @ HuggingFace](https://huggingface.co/datasets/greengerong/leetcode)

> .env

- Please include the following environment variables in your `.env` file:
```
HUGGINGFACE_API_KEY=...
OPENAI_API_KEY=...
```

In [None]:
!pip install tiktoken

Collecting tiktoken
  Downloading tiktoken-0.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tiktoken
Successfully installed tiktoken-0.7.0


In [None]:
!pip install datasets

Collecting datasets
  Downloading datasets-2.19.1-py3-none-any.whl (542 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m542.0/542.0 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m
Collecting xxhash (from datasets)
  Downloading xxhash-3.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (194 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m194.1/194.1 kB[0m [31m15.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting multiprocess (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl (134 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: xxhash, dill, multiprocess, datasets
Successfully installed datasets-

In [None]:
!pip install huggingface_hub



In [None]:
!pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1


## Dataset Examination

In [None]:
from datasets import load_dataset

# Load the LeetCode dataset from the Hugging Face Hub
dataset = load_dataset('greengerong/leetcode')

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading readme:   0%|          | 0.00/21.0 [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/16.1M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/2360 [00:00<?, ? examples/s]

In [None]:
len(dataset['train']['python'])

2360

In [None]:
from abc import ABC, abstractmethod

class Tokenizer(ABC):
    """Abstract interface for objects that can count the tokens in a string."""

    def __init__(self, *args, **kwargs):
        """
        Set up the tokenizer, forwarding every argument up the MRO.

        Parameters:
        *args: Positional arguments handed to the next `__init__` in the MRO.
        **kwargs: Keyword arguments handed to the next `__init__` in the MRO.
        """
        # Cooperative initialisation for subclasses that mix in another base class.
        # NOTE: when the next `__init__` in the MRO is `object.__init__`, passing
        # any argument here raises TypeError.
        super().__init__(*args, **kwargs)

    @abstractmethod
    def count_tokens(self, string: str) -> int:
        """
        Count the tokens contained in a string (must be implemented by subclasses).

        Parameters:
        string (str): The text whose tokens are counted.

        Returns:
        int: How many tokens the text contains.
        """

In [None]:
import tiktoken

class OpenAITokenizer(Tokenizer):
    """Token counter that delegates to a named `tiktoken` encoding."""

    def __init__(self, encoding_name):
        # Only the name is stored; the encoding itself is requested from
        # tiktoken each time `count_tokens` runs.
        self.encoding_name = encoding_name

    def count_tokens(self, string: str) -> int:
        """Encode `string` with the configured encoding and return the token count."""
        return len(tiktoken.get_encoding(self.encoding_name).encode(string))

In [None]:
openai_tokenizer = OpenAITokenizer(encoding_name='cl100k_base')

In [None]:
from tqdm import tqdm

# Fetch the column once. The original looked up `dataset['train']['python']`
# inside the loop, i.e. once per row, which repeats the same column lookup
# thousands of times for no benefit.
python_column = dataset['train']['python']

# Total number of tokens across every Python solution text in the train split.
num_tokens = 0
for solution_text in tqdm(python_column):
    num_tokens += openai_tokenizer.count_tokens(solution_text)

100%|██████████| 2360/2360 [00:13<00:00, 169.24it/s]


In [None]:
num_tokens

703330

In [None]:
print(dataset['train']['content'][0])

In [None]:
print(dataset['train']['python'][0])


    ```python
def twoSum(nums, target):
    map = {}
    for i, num in enumerate(nums):
        complement = target - num
        if complement in map:
            return [map[complement], i]
        map[num] = i
    return []
```
    
    The algorithm leverages a hash map (unordered_map in C++, HashMap in Java, dictionary in Python, and Map in JavaScript). It iterates through the given 'nums' array and calculates the complementary value (target - current value). If the complementary value is already in the hash map, it means that we found a solution, and we return those indices. If the complement is not in the hash map, we store the current element in the hash map with its index. If the algorithm doesn't find the solution, it returns an empty array or throws an exception (in Java).

This approach has a time complexity of O(n) and a space complexity of O(n) as well.
    


## Basic Code Preprocessing

In [None]:
import re

def _extract_fenced_code(text, language):
    """
    Return the body of the first Markdown code fence tagged `language`.

    Parameters:
    text: The text to search; None or an empty string yields None.
    language: The fence tag to look for (e.g. 'python', 'java').

    Returns:
    The fenced code with surrounding whitespace stripped, or None when no
    fence with that tag exists.
    """
    if not text:
        return None
    # `(?!\w)` makes the tag match as a whole word, so looking for 'java'
    # does not pick up a ```javascript fence.
    pattern = r'```' + re.escape(language) + r'(?!\w)(.*?)```'
    match = re.search(pattern, text, re.DOTALL)
    return match.group(1).strip() if match else None


def extract_python_code(text):
    """Return the code inside the first ```python fence of `text`, or None."""
    return _extract_fenced_code(text, 'python')


def extract_javascript_code(text):
    """Return the code inside the first ```javascript fence of `text`, or None."""
    return _extract_fenced_code(text, 'javascript')


def extract_java_code(text):
    """Return the code inside the first ```java fence of `text`, or None."""
    return _extract_fenced_code(text, 'java')


def extract_cpp_code(text):
    """Return the code inside the first ```cpp fence of `text`, or None."""
    return _extract_fenced_code(text, 'cpp')

In [None]:
def process_python_code(sample):
    """
    Add a 'python_code_only' field to a dataset row.

    The value is whatever `extract_python_code` returns for the row's
    'python' field. The same row object is returned after being updated.
    """
    code_only = extract_python_code(sample['python'])
    sample['python_code_only'] = code_only
    return sample
def process_javascript_code(sample):
    """
    Add a 'javascript_code_only' field to a dataset row.

    The value is whatever `extract_javascript_code` returns for the row's
    'javascript' field. The same row object is returned after being updated.
    """
    code_only = extract_javascript_code(sample['javascript'])
    sample['javascript_code_only'] = code_only
    return sample
def process_java_code(sample):
    """
    Add a 'java_code_only' field to a dataset row.

    The value is whatever `extract_java_code` returns for the row's 'java'
    field. The same row object is returned after being updated.
    """
    code_only = extract_java_code(sample['java'])
    sample['java_code_only'] = code_only
    return sample
def process_cpp_code(sample):
    """
    Add a 'c++_code_only' field to a dataset row.

    The value is whatever `extract_cpp_code` returns for the row's 'c++'
    field. The same row object is returned after being updated.
    """
    code_only = extract_cpp_code(sample['c++'])
    sample['c++_code_only'] = code_only
    return sample

In [None]:
# Apply the four "<language>_code_only" extraction steps one after another,
# in the same order as before: Python, JavaScript, Java, C++.
for add_code_only_column in (
    process_python_code,
    process_javascript_code,
    process_java_code,
    process_cpp_code,
):
    dataset = dataset.map(add_code_only_column)

Map:   0%|          | 0/2360 [00:00<?, ? examples/s]

Map:   0%|          | 0/2360 [00:00<?, ? examples/s]

Map:   0%|          | 0/2360 [00:00<?, ? examples/s]

Map:   0%|          | 0/2360 [00:00<?, ? examples/s]

In [None]:
dataset

DatasetDict({
    train: Dataset({
        features: ['id', 'slug', 'title', 'difficulty', 'content', 'java', 'c++', 'python', 'javascript', 'python_code_only', 'javascript_code_only', 'java_code_only', 'c++_code_only'],
        num_rows: 2360
    })
})

In [None]:
from huggingface_hub import HfApi, HfFolder
import os
from dotenv import load_dotenv
load_dotenv()

def upload_to_huggingface(dataset, repo_id='techandy42/multi_lang_leetcode'):
    """
    Push `dataset` to the Hugging Face Hub.

    Parameters:
    dataset: Object exposing `push_to_hub(repo_id, token=...)`.
    repo_id (str): Target repository on the Hub. Defaults to the repository
        this notebook originally uploaded to.

    Raises:
    ValueError: If the HUGGINGFACE_API_KEY environment variable is missing or
        empty, so the problem is reported before any upload is attempted.
    """
    token = os.getenv('HUGGINGFACE_API_KEY')
    if not token:
        raise ValueError(
            "HUGGINGFACE_API_KEY is not set; add it to your .env file before uploading."
        )

    # Kept from the original flow: hand the token to HfFolder as well as
    # passing it explicitly to push_to_hub below.
    HfFolder.save_token(token)

    # Push the dataset to the HuggingFace Hub
    dataset.push_to_hub(repo_id, token=token)

    print("Dataset uploaded successfully.")

In [None]:
upload_to_huggingface(dataset)

## Creating Test Cases (Python)

In [None]:
!pip install litellm

Collecting litellm
  Downloading litellm-1.38.10-py3-none-any.whl (4.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.5/4.5 MB[0m [31m13.6 MB/s[0m eta [36m0:00:00[0m
Collecting openai>=1.27.0 (from litellm)
  Downloading openai-1.30.3-py3-none-any.whl (320 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m320.6/320.6 kB[0m [31m23.6 MB/s[0m eta [36m0:00:00[0m
Collecting httpx<1,>=0.23.0 (from openai>=1.27.0->litellm)
  Downloading httpx-0.27.0-py3-none-any.whl (75 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.6/75.6 kB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
Collecting httpcore==1.* (from httpx<1,>=0.23.0->openai>=1.27.0->litellm)
  Downloading httpcore-1.0.5-py3-none-any.whl (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.9/77.9 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting h11<0.15,>=0.13 (from httpcore==1.*->httpx<1,>=0.23.0->openai>=1.27.0->litellm)
  Downloadin

In [None]:
from abc import ABC, abstractmethod

class Model(ABC):
    """Abstract interface for a language model that returns JSON completions."""

    def __init__(self, *args, **kwargs):
        """
        Initialize the Model with any number of arguments.

        Parameters:
        *args: A tuple of positional arguments.
        **kwargs: A dictionary of keyword arguments.
        """
        # Arguments are forwarded to the next `__init__` in the MRO, which is
        # useful when a subclass also extends another class with an __init__.
        super().__init__(*args, **kwargs)

    @abstractmethod
    def completion_json(self, prompt: str, pydantic_model=None) -> dict:
        """
        Abstract method to generate a completion from a prompt.

        Parameters:
        prompt (str): The prompt to generate a completion from.
        pydantic_model: Optional schema class the returned JSON should be
            validated against. Declared here so the abstract signature matches
            implementations that take a schema as their second argument.

        Returns:
        dict: The completion as a JSON object.
        """

    @abstractmethod
    def get_context_limit(self) -> int:
        """
        Abstract method to get the context limit of the model.

        Returns:
        int: The context limit.
        """

In [None]:
from pydantic import BaseModel, ValidationError
from typing import Type
import json
import re
from litellm import completion
from dotenv import load_dotenv
import os

load_dotenv()

# The original re-assigned os.environ["OPENAI_API_KEY"] from os.getenv(), which
# changes nothing when the key is set and raises an opaque TypeError (None is
# not a str) when it is not. Fail early with a readable message instead.
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set; add it to your .env file.")

# Module-level tokenizer used by OpenAIModel to measure prompt length.
openai_tokenizer = OpenAITokenizer(encoding_name="cl100k_base")

class OpenAIModel(Model):
    """`Model` implementation that sends prompts through litellm's `completion`."""

    def __init__(self, model_name, context_limit):
        """
        Parameters:
        model_name: Model identifier passed to `completion` (e.g. "openai/gpt-4o").
        context_limit: Largest prompt size, in tokens, accepted by `completion_json`.
        """
        self.model_name = model_name
        self.context_limit = context_limit

    def completion_json(self, prompt: str, pydantic_model: Type[BaseModel]) -> dict:
        """
        Ask the model for a JSON object and validate it against `pydantic_model`.

        Parameters:
        prompt (str): The user prompt sent to the model.
        pydantic_model: Pydantic model class the parsed JSON must satisfy.

        Returns:
        dict: The parsed JSON object (the raw dict, not the pydantic instance).

        Raises:
        ValueError: If the prompt is longer than the context limit, the reply
            is not valid JSON, the JSON is not an object, or validation fails.
        """
        if openai_tokenizer.count_tokens(prompt) > self.context_limit:
            raise ValueError("The prompt exceeds the context limit of the model.")

        # Define the messages to send to the model
        messages = [
            {
                "role": "system",
                "content": "Return output in JSON format.",
            },
            {
                "role": "user",
                "content": prompt,
            },
        ]

        # Generate the JSON output using the completion method
        response = completion(model=self.model_name, messages=messages)

        # Guard against a missing (None) content so `.strip()` cannot fail.
        content = (response.choices[0].message.content or "").strip()

        # If the reply contains a Markdown fence (```json ... ``` or a bare
        # ``` ... ```), keep only what is inside it; any prose around the
        # fence is dropped instead of being fed to the JSON parser.
        fenced = re.search(
            r'```(?:json)?\s*(.*?)\s*```', content, flags=re.DOTALL | re.IGNORECASE
        )
        if fenced:
            content = fenced.group(1)

        # Parse the JSON into a dictionary
        try:
            data = json.loads(content)
        except json.JSONDecodeError as e:
            raise ValueError("The content returned is not valid JSON") from e

        # `pydantic_model(**data)` needs a mapping; a JSON list or scalar would
        # otherwise surface as an unrelated TypeError.
        if not isinstance(data, dict):
            raise ValueError("The content returned is not a JSON object")

        # Validate the dictionary against the Pydantic model
        try:
            pydantic_model(**data)
        except ValidationError as e:
            raise ValueError(f"The JSON does not contain all necessary fields: {e}") from e

        return data

    def get_context_limit(self) -> int:
        """Return the context limit given at construction time."""
        return self.context_limit

# Smoke test: ask the model for a small JSON object and validate it against a schema.
# Inside a notebook `__name__` is "__main__", so this performs a real API call.
if __name__ == "__main__":
    class Person(BaseModel):
        # Schema the returned JSON must satisfy.
        name: str
        age: int

    prompt = f"""Create a fictional person with a name and an age.

JSON Output Format:
{{
    "name": str,
    "age": int,
}}
"""
    openai_model = OpenAIModel(context_limit=128000, model_name="openai/gpt-4o")
    data = openai_model.completion_json(prompt, Person)
    print(data)

{'name': 'Alex Carter', 'age': 29}


### Cost Estimation

In [None]:
# Prices are expressed per 1,000,000 tokens.
# NOTE(review): 5 / 15 are presumably the dollar rates of the model used below - confirm.
TOKENS_PER_PRICE_UNIT = 1_000_000
INPUT_PRICE_PER_UNIT = 5
OUTPUT_PRICE_PER_UNIT = 15

# Assumed token usage for a single question.
input_token_count = 1300
output_token_count = 700
print(f"Input Token Count: {input_token_count} / Output Token Count: {output_token_count}")

input_cost = (input_token_count / TOKENS_PER_PRICE_UNIT) * INPUT_PRICE_PER_UNIT
output_cost = (output_token_count / TOKENS_PER_PRICE_UNIT) * OUTPUT_PRICE_PER_UNIT
cost_per_single_question = input_cost + output_cost
question_count = len(dataset['train'])

print(f"Costs around ${cost_per_single_question:.3f} to create test cases for a single Python leetcode questions.")
print(f"Costs around ${cost_per_single_question * question_count:.3f} to create test cases for all Python leetcode questions.")

Input Token Count: 1300 / Output Token Count: 700
Costs around $0.017 to create test cases for a single Python leetcode questions.
Costs around $40.120 to create test cases for all Python leetcode questions.


### Test Cases Generation

In [None]:
import os

def create_folder_if_not_exists(folder_path):
    """
    Make sure `folder_path` exists, creating missing parent folders as needed.

    Prints one line saying whether the folder was created or was already there.
    """
    # Guard clause: nothing to create when the path is already present.
    if os.path.exists(folder_path):
        print(f"Folder already exists: {folder_path}")
        return

    os.makedirs(folder_path)
    print(f"Folder created: {folder_path}")

In [None]:
def replace_dash_with_underscore(string):
    """
    Turn a dash-separated slug into a name usable as a Python identifier prefix.

    Every '-' becomes '_'. The callers in this notebook use the result as the
    prefix of generated test-function names and as a file name, so a result
    that would start with a digit is additionally prefixed with '_': Python
    identifiers cannot begin with a digit, and a function named after such a
    slug would make the generated script fail to parse.

    Parameters:
    string (str): The slug to convert.

    Returns:
    str: The converted name. Inputs without a leading digit are only changed
    by the dash replacement.
    """
    converted = string.replace('-', '_')
    # `converted[:1]` is '' for an empty string, and ''.isdigit() is False.
    if converted[:1].isdigit():
        converted = '_' + converted
    return converted

In [None]:
def rename_test_cases(i: int, test_cases: dict, dataset):
    """
    Rename each generated test function to `<slug>_test<N>`, in place.

    Parameters:
    i (int): Index of the question in `dataset['train']`.
    test_cases (dict): Holds the parallel lists 'test_function_names' and
        'test_functions'; entries of 'test_functions' are overwritten with
        the renamed source.
    dataset: Dataset whose 'train' split provides the 'slug' column.

    Returns:
    None. A message is printed for every name that does not occur in its
    function source; that entry is left unchanged.
    """
    # Look the slug up once rather than on every loop iteration.
    name_prefix = replace_dash_with_underscore(dataset['train']['slug'][i])

    # Snapshot the pairs so the list can be overwritten while iterating.
    pairs = list(zip(test_cases['test_function_names'], test_cases['test_functions']))

    for test_no, (old_name, source) in enumerate(pairs):
        new_name = f"{name_prefix}_test{test_no+1}"

        hits = 0
        if old_name:
            # Replace whole identifiers only: a plain str.replace would also
            # rewrite `test1` inside `test10` or inside longer identifiers.
            whole_name = re.compile(
                r'(?<![A-Za-z0-9_])' + re.escape(old_name) + r'(?![A-Za-z0-9_])'
            )
            # A callable replacement inserts the new name verbatim.
            renamed, hits = whole_name.subn(lambda _match: new_name, source)

        if hits == 0:
            print(f"Test function name not found in source code: {old_name}")
        else:
            test_cases['test_functions'][test_no] = renamed

In [None]:
def create_source_code_with_test_cases(i: int, test_cases: dict, dataset):
    """
    Assemble a self-checking Python script for question `i`.

    The script contains, in order: a `results` list, the question's solution
    code, every test function, one `results.append(<slug>_test<N>())` call per
    test function, and a final assertion that every result is "SUCCESS".

    Parameters:
    i (int): Index of the question in `dataset['train']`.
    test_cases (dict): Must contain 'test_functions', a list of test-function
        sources already named `<slug>_test<N>`.
    dataset: Dataset whose 'train' split provides 'python_code_only' and 'slug'.

    Returns:
    str: The complete script source.

    Raises:
    ValueError: If there are no test functions. The original string slicing
        produced the invalid line `assert results == ]` in that case.
    """
    test_functions = test_cases['test_functions']
    if not test_functions:
        raise ValueError("No test functions were provided; cannot build a test script.")

    # Read each column once instead of once per loop iteration.
    train_split = dataset['train']
    solution_code = train_split['python_code_only'][i]
    name_prefix = replace_dash_with_underscore(train_split['slug'][i])

    parts = [f"results = []\n\n\n{solution_code}\n\n\n"]
    parts.extend(f"{test_function}\n\n" for test_function in test_functions)
    parts.extend(
        f"results.append({name_prefix}_test{test_no}())\n\n\n"
        for test_no in range(1, len(test_functions) + 1)
    )

    # One "SUCCESS" per test function, e.g. ["SUCCESS", "SUCCESS"].
    expected_results = "[" + ", ".join(['"SUCCESS"'] * len(test_functions)) + "]"

    parts.append(f"""try:
    assert results == {expected_results}
except Exception as e:
    raise e
""")

    return "".join(parts)

In [None]:
from pydantic import BaseModel
from typing import Optional
import os
import subprocess

def _build_test_case_prompt(i: int, dataset) -> str:
    """Build the few-shot prompt that asks the model for test functions for question `i`."""
    return f"""Instruction:
- You will be provided with a Python code documentation and source code.
- The documentation contains examples with sample inputs and expected outputs.
- Write Python test function for each example which calls the function or class in the source code with the sample inputs and checks if the actual output matches the expected output.
- If the output is returned by the function, store it in a variable and check if it is same as the expected output.
- If the function modifies the input as an output without returning any value, check the input if it is same as the expected output.
- If the test passes, return "SUCCESS" from the test function.
- If the test fails, return "FAILURE" from the test function.
- Each test function should have function signature def test_function_name() -> str.
- You should return all test functions as a list of string at the field "test_functions" within the JSON output.
- In addition, you should return the names of the test functions as a list of string at the field "test_function_names" within the JSON output. Do not include anything else than names of each function.
- For N examples in the documentation, you should have N test functions and N test function names.

Example Documentation:
{dataset['train']['content'][0]}


Example Source Code:
{dataset['train']['python_code_only'][0]}


Example Output Test Function No.1:
def test1():
    nums = [2, 7, 11, 15]
    target = 9
    result = twoSum(nums, target)
    if result == [0, 1]:
        return "SUCCESS"
    else:
        return "FAILURE"


Example Output Test Function No.2:
def test2():
    nums = [3, 2, 4]
    target = 6
    result = twoSum(nums, target)
    if result == [1, 2]:
        return "SUCCESS"
    else:
        return "FAILURE"


Example Output Test Function No.3:
def test3():
    nums = [3, 3]
    target = 6
    result = twoSum(nums, target)
    if result == [0, 1]:
        return "SUCCESS"
    else:
        return "FAILURE"


Actual Documentation:
{dataset['train']['content'][i]}


Acutal Source Code:
{dataset['train']['python_code_only'][i]}


Output JSON Format:
{{
    "test_functions": list[str], // each string is a Python test function
    "test_function_names": list[str], // each string is the name of the test function
}}
"""


def get_test_functions(i: int, dataset, model: Model, verbose: bool = False,
                       timeout: Optional[float] = 30.0) -> Optional[dict]:
    """
    Generate test functions for question `i` with `model` and validate them.

    The model's test functions are renamed, combined with the question's
    solution into a script under the `test_cases` folder, and that script is
    executed in a child Python process. The test functions are accepted only
    if the script exits with return code 0.

    Parameters:
    i (int): Index of the question in `dataset['train']`.
    dataset: Dataset providing 'content', 'python_code_only' and 'slug'.
    model (Model): Model used to generate the test functions.
    verbose (bool): When True, print the raw model output and the script.
    timeout (float | None): Seconds the script may run before it is stopped
        and treated as invalid; None disables the limit.

    Returns:
    The list of renamed test-function sources when the script passes,
    otherwise None.
    """
    import sys  # local import: this cell's import block does not provide `sys`

    prompt_create_python_test_cases = _build_test_case_prompt(i, dataset)

    class TestCases(BaseModel):
        # Schema the model's JSON reply is validated against.
        test_functions: list[str]
        test_function_names: list[str]

    test_cases = model.completion_json(prompt_create_python_test_cases, TestCases)

    if verbose:
        print("=" * 9 + " TEST CASES " + "=" * 9)
        print(test_cases)
        print("=" * 30)

    if len(test_cases['test_functions']) != len(test_cases['test_function_names']):
        print("Mismatch between number of test function names and test function definitions.")
        return None

    rename_test_cases(i, test_cases, dataset)

    source_code_with_test_cases = create_source_code_with_test_cases(i, test_cases, dataset)

    if verbose:
        print("=" * 2 + " SOURCE CODE + TEST CASES " + "=" * 2)
        print(source_code_with_test_cases)
        print("=" * 30)

    foldername = "test_cases"

    create_folder_if_not_exists(foldername)

    filename = f"{replace_dash_with_underscore(dataset['train']['slug'][i])}.py"

    filepath = os.path.join(foldername, filename)

    # Explicit UTF-8 so the write does not depend on the platform's default
    # encoding (UTF-8 is also what Python assumes when reading source files).
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(source_code_with_test_cases)

    # SECURITY NOTE: this executes model-generated code with the privileges of
    # the notebook process. Run it only in a disposable or sandboxed environment.
    try:
        # sys.executable is the interpreter running this notebook; the bare
        # 'python' command is only a fallback when it is unavailable.
        result = subprocess.run([sys.executable or 'python', filepath], timeout=timeout)
    except subprocess.TimeoutExpired:
        # Generated code may never terminate; treat that as invalid.
        print(f"Test cases timed out after {timeout} seconds.")
        return None
    except Exception as e:
        print(f"Error occurred: {e}")
        return None

    if result.returncode == 0:
        print("Test cases are valid.")
        return test_cases['test_functions']

    print("Test cases are invalid.")
    return None

In [None]:
# Sample run: generate and validate test cases for the question at index 1,
# printing the intermediate model output and the assembled script.
openai_model = OpenAIModel(context_limit=128000, model_name="openai/gpt-4o")
test_functions = get_test_functions(i=1, dataset=dataset, model=openai_model, verbose=True)
print(test_functions)

In [None]:
# How many questions (from index 0) to process, and how many extra attempts
# each question gets after its first failure.
NUM_QUESTIONS = 100
MAX_RETRIES = 3

openai_model = OpenAIModel(model_name="openai/gpt-4o", context_limit=128000)

# (question index, validated test functions) for every question that succeeded.
test_functions_list = []

for i in range(NUM_QUESTIONS):
    # Each question starts with a fresh retry budget. The original kept one
    # shared counter that was not reset after a success, so retries used on
    # one question reduced the budget of the following ones.
    for attempt in range(MAX_RETRIES + 1):
        print(f"\n\nCreating test cases for question no.{i+1}...")
        try:
            test_functions = get_test_functions(i, dataset, openai_model, verbose=False)
        except Exception as e:
            # An exception counts as a failed attempt, same as a None result.
            print(f"Error occurred on question no.{i+1}: {e}")
            test_functions = None

        if test_functions is not None:
            test_functions_list.append((i, test_functions))
            break

        if attempt < MAX_RETRIES:
            print(f"Retrying for question no.{i+1}...")
    else:
        # Reached only when no attempt succeeded (the loop never hit `break`).
        print(f"Failed after {MAX_RETRIES} retries on question no.{i+1}, moving on to next question...")

In [None]:
# Index the validated test functions by question index for direct lookup.
test_functions_map = {
    question_index: functions for question_index, functions in test_functions_list
}

In [None]:
def proess_python_test_cases(sample, i):
    """
    Copy up to three generated test functions for row `i` onto the row.

    Fills 'python_test_case1' .. 'python_test_case3' from
    `test_functions_map[i]`; a slot is None when the row has no entry in the
    map or fewer test functions than the slot number. Returns the same row.

    NOTE: the function name is misspelled ("proess"); it is kept as-is because
    the `dataset.map` call that follows refers to it by this name.
    """
    generated = test_functions_map[i] if i in test_functions_map else []
    for slot in range(3):
        column = f'python_test_case{slot + 1}'
        sample[column] = generated[slot] if len(generated) > slot else None
    return sample

dataset = dataset.map(proess_python_test_cases, with_indices=True)

Map:   0%|          | 0/2360 [00:00<?, ? examples/s]

In [None]:
upload_to_huggingface(dataset)

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/3 [00:00<?, ?ba/s]

README.md:   0%|          | 0.00/733 [00:00<?, ?B/s]

Dataset uploaded successfully.
