In [1]:
import os

import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import multiprocess
import concurrent.futures

from datasets import load_dataset

# from pytest_runner import 
from adjustment_utils import adjust_pytest, run_pytest
from agent import CodeAgent
from client_interface.client import ClientInterface

In [2]:
dataset = load_dataset("vikp/python_functions_filtered")

In [3]:
df = pd.DataFrame(dataset["train"])
df = df.sort_values(by="quality_prob", 
                    ascending=False).\
        reset_index(drop=True)
df

Unnamed: 0,code,quality_prob,learning_prob
0,"def covariance(x, y):\n """"""Compute covarian...",0.985363,0.976558
1,"def LikelihoodRatioSignificance(LLnull, LLalt,...",0.981207,0.775137
2,"def spectralflux_wavelength_to_frequency(flux,...",0.977532,0.993036
3,"def plot_contributions(ax, top_n, bar_dims, ba...",0.975866,0.601798
4,"def scale_to_internal(vec, scaling_factor, sca...",0.975498,0.671538
...,...,...,...
58338,def add_assignment_as_district_col(partition):...,0.700075,0.625481
58339,"def get_range(value):\n """"""\n A simple t...",0.700075,0.654840
58340,"def forwardsAdditiveError(image, template):\n ...",0.700075,0.804175
58341,"def calculate_iou(bbox1, bbox2):\n """"""\n ...",0.700075,0.761028


In [4]:
df.code = df.code.str.replace(r'(\'\'\'[\s\S]*?\'\'\'|\"\"\"[\s\S]*?\"\"\")', "", regex=True)
df

Unnamed: 0,code,quality_prob,learning_prob
0,"def covariance(x, y):\n \n n_samples, ho...",0.985363,0.976558
1,"def LikelihoodRatioSignificance(LLnull, LLalt,...",0.981207,0.775137
2,"def spectralflux_wavelength_to_frequency(flux,...",0.977532,0.993036
3,"def plot_contributions(ax, top_n, bar_dims, ba...",0.975866,0.601798
4,"def scale_to_internal(vec, scaling_factor, sca...",0.975498,0.671538
...,...,...,...
58338,def add_assignment_as_district_col(partition):...,0.700075,0.625481
58339,def get_range(value):\n \n return range(...,0.700075,0.654840
58340,"def forwardsAdditiveError(image, template):\n ...",0.700075,0.804175
58341,"def calculate_iou(bbox1, bbox2):\n \n xm...",0.700075,0.761028


In [5]:
def create_tests(code):
    try:

        prompt = """You are a useful code assistant. A user will give you a code representing a python file. 
Your task is to take said code and generate a complete working testing file using Pytest. 
You may assume the original code can be found in a `source.py` file residing in the same directory as the test file, you must import it as such.
You *MUST* always provide the full test code with no other explanations.
You *MUST* use only one assertion per test. Always aim for full code coverage. 
"""
        from adjustment_utils import modify_pytest_code, run_pytest
        from agent import CodeAgent
        model = CodeAgent(prompt)

        results = model.generate_response(code, n= 3)
        test_results =[]
        for result in results:
            try:
                res = run_pytest(code, result)
                if not len(res["stderr"]):
                    if (res["failed_assertions"] == 0) and (res["coverage"] > 90):
                        return (code, result)
                    res["pytest_code"] = result
                    test_results.append(res)
            except Exception as e:
                continue

        try:
            test_results = sorted(test_results, key=lambda x: x["coverage"], reverse=True)

            pytest_code = ""
            adjusted_results = []
            for test_result in test_results:

                try:
                    adjusted = modify_pytest_code(code, test_result["pytest_code"])

                    res = run_pytest(code, adjusted)
                    res["pytest_code"] = result

                    if (not len(res["stderr"])) and (res["failed_assertions"]==0):
                        if res["coverage"] > 95:
                            return (code, adjusted)
                        adjusted_results.append(res)
                except Exception as e:
                    continue

            pytest_code = max(adjusted_results, key=lambda x: x["coverage"])["pytest_code"]
            return (code, pytest_code)
        except:
            return None
    
    except:
        return None

In [6]:
step_size = 1000

for j in range(46_000, 59_000, step_size):


    input_code = df.code[j:j+step_size]
    num_tasks = len(input_code)

    with multiprocess.Pool(24) as pool:
        results = list(tqdm(pool.imap(create_tests, input_code), total=num_tasks))


    results = [result for result in results if result is not None]
    df_results = pd.DataFrame(results)
    df_results.columns = ["original_code", "pytest_code"]
    df_results = df_results.dropna().reset_index(drop=True)
    # df_results["coverage"] = df_results.apply(lambda x: run_pytest(x.iloc[0], x.iloc[1])["coverage"], axis=1)



    def get_cov(row):
        if (row is None):
            return (None, None, None)

        code, pytest_code = row

        if (code is None) or (pytest_code is None):
            return (None, None, None)
        
        from adjustment_utils import run_pytest
        
        coverage = run_pytest(code, pytest_code, random_subdir=True)["coverage"]
        return (code, pytest_code, coverage)

    num_tasks = len(results)

    with multiprocess.Pool(24) as pool:
        results_cov = list(tqdm(pool.imap(get_cov, results), total=num_tasks))

    df_cov = pd.DataFrame(results_cov).dropna()
    df_cov.columns = ["original_code", "pytest_code", "coverage"]
    df_cov = df_cov.sort_values(by="coverage", ascending=False).reset_index(drop=True)
    df_cov.to_csv(f"{j}_{j + step_size}_dataset_results.csv", index=False)

    print(f"Saved {j}_{j + step_size}_dataset_results.csv with {df_cov[df_cov.coverage > 99].shape[0]} 100% coverage tests")

  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/966 [00:00<?, ?it/s]

Saved 46000_47000_dataset_results.csv with 737 100% coverage tests


  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/960 [00:00<?, ?it/s]

Saved 47000_48000_dataset_results.csv with 699 100% coverage tests


  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/952 [00:00<?, ?it/s]

Saved 48000_49000_dataset_results.csv with 709 100% coverage tests


  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/965 [00:00<?, ?it/s]

Saved 49000_50000_dataset_results.csv with 753 100% coverage tests


  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/941 [00:00<?, ?it/s]

Saved 50000_51000_dataset_results.csv with 728 100% coverage tests


  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/956 [00:00<?, ?it/s]

Saved 51000_52000_dataset_results.csv with 731 100% coverage tests


  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/956 [00:00<?, ?it/s]

Saved 52000_53000_dataset_results.csv with 716 100% coverage tests


  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/959 [00:00<?, ?it/s]

Saved 53000_54000_dataset_results.csv with 732 100% coverage tests


  0%|          | 0/1000 [00:00<?, ?it/s]

In [1]:
results

NameError: name 'results' is not defined

In [8]:
results

[]

In [8]:
from adjustment_utils import modify_pytest_code

code, test_result = results[0]

adjusted = modify_pytest_code(code, test_result["pytest_code"])

TypeError: cannot unpack non-iterable NoneType object

In [20]:
print(adjusted)

import sys
sys.path.insert(0, '../')
from source import extract_stresses_and_forces
import numpy as np

def test_extract_stresses_and_forces():
    fit_data = ...
    values = ...
    args = ...
    dft_forces, ip_forces, dft_stresses, ip_stresses = extract_stresses_and_forces(fit_data, values, args)
    assert np.allclose(dft_forces, ip_forces), 'Forces are not the same'


In [7]:
results

[TypeError("expected string or bytes-like object, got 'dict'"),
 TypeError("expected string or bytes-like object, got 'dict'")]

In [17]:
# [idx for idx, res in enumerate(results) if res and run_pytest(res[0], res[1])["coverage"] > 90]

In [11]:
results = [result for result in results if result is not None]
df_results = pd.DataFrame(results)
df_results.columns = ["original_code", "pytest_code"]
df_results = df_results.dropna().reset_index(drop=True)
# df_results["coverage"] = df_results.apply(lambda x: run_pytest(x.iloc[0], x.iloc[1])["coverage"], axis=1)
df_results

Unnamed: 0,original_code,pytest_code
0,"def extract_stresses_and_forces(fit_data, valu...",import pytest\nfrom source import extract_stre...
1,import torch\n\ndef mvdigamma(vec: torch.Float...,import torch\nimport pytest\nfrom source impor...
2,"import torch\n\ndef amplitude_to_db(x, ref=1.0...",import torch\nimport pytest\nfrom source impor...
3,def HHV_modified_Dulong(mass_fractions):\n ...,import pytest\nimport sys\nimport os\nsys.path...
4,"def intersect_ray_plane(o,w,p,n):\n \n #...",# source.py\n\nimport torch\n\nclass Ray:\n ...
5,"def HamCenter1D_Hamiltonian(t, u, PARAMETERS =...",import pytest\nfrom source import *\nimport nu...
6,"def update_means(means, X, perm_out):\n r\n...",import os\nimport pytest\nimport numpy as np\n...
7,"def luminance_newhall1943(V, **kwargs):\n \...","import sys\nsys.path.append(""."") # This will ..."
8,"import torch\n\ndef batchwise_cdist(samples1, ...",import pytest\nimport torch\nfrom torch.testin...
9,"def get_bandwidth(n, var_ret, var_noise, kerne...",# import the function to be tested\nfrom sourc...


In [12]:
# df_results.to_csv("3.5k_3.5k_dataset_results.csv", index=False)

In [15]:
def get_cov(row):
    if (row is None):
        return (None, None, None)

    code, pytest_code = row

    if (code is None) or (pytest_code is None):
        return (None, None, None)
    
    from adjustment_utils import run_pytest, modify_pytest_code
    
    coverage = run_pytest(code, pytest_code, random_subdir=True)["coverage"]
    return (code, pytest_code, coverage)

num_tasks = len(results)



# def modify_code(row):
#     if (row is None):
#         return (None, None, None)
#     from adjustment_utils import run_pytest, modify_pytest_code
#     code, pytest_code = row

#     try:
#         working_test = modify_pytest_code(code, pytest_code)
#     except Exception as e:
#         return code, e
    
# #     return code, working_test


# with multiprocess.Pool(24) as pool:
#     modified_results = list(tqdm(pool.imap(modify_code, results[:100]), total=num_tasks))


with multiprocess.Pool(24) as pool:
    results_cov = list(tqdm(pool.imap(get_cov, results), total=num_tasks))


  0%|          | 0/846 [00:00<?, ?it/s]

In [16]:
results_cov

[("def fail_safe(temperature, neutrons_produced_per_second, threshold):\n    \n\n    result = temperature * neutrons_produced_per_second\n\n    if result < threshold * 0.9:\n        return 'LOW'\n    if 0.9 * threshold <= result <= 1.1 * threshold:\n        return 'NORMAL'\n    return 'DANGER'",
  "# This is the test_source.py file\nimport pytest\nfrom source import Source\n\nclass TestSource:\n\n    def test_fail_safe_low(self):\n        assert Source.fail_safe(20, 50, 1000) == 'LOW'\n\n    def test_fail_safe_normal(self):\n        assert Source.fail_safe(50, 50, 1000) == 'NORMAL'\n\n    def test_fail_safe_danger(self):\n        assert Source.fail_safe(100, 50, 1000) == 'DANGER'",
  14.0),
 ('import torch\n\ndef psnr(original_img, resoluted_img):\n    \n\n    # fisrt computing th emse\n    resoluted_img = (resoluted_img * 255).round().clamp(0, 255) / 255\n    subs = original_img - resoluted_img\n    mse = subs.pow(2).mean([-3, -2, -1])\n\n    # formular for psnr (https://en.wikipedia.

In [20]:
df_cov = pd.DataFrame(results_cov).dropna()
df_cov.columns = ["original_code", "pytest_code", "coverage"]
df_cov = df_cov.sort_values(by="coverage", ascending=False).reset_index(drop=True)
df_cov[df_cov.coverage > 85].to_csv("5k_6k_dataset_results.csv", index=False)
# df_cov[df_cov.coverage > 95]

In [16]:
print(df_cov[df_cov.coverage > 90].iloc[1, 1])

# test_source.py
import pytest
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..')) # This line is to import the parent directory, where source.py is located
from source import calculate_acceleration

# Test 1: Check if function returns acceleration when input is a list and the correct number of elements are given
def test_calculate_acceleration1():
    result = calculate_acceleration([1, 2, 3, 4], 1)
    assert result == 20, "Failure: The function did not return the expected value"

# Test 2: Check if function raises a TypeError when the input is not a list
def test_calculate_acceleration2():
    with pytest.raises(TypeError):
        result = calculate_acceleration("string", 1)

# Test 3: Check if function raises a ValueError when the list does not contain enough elements
def test_calculate_acceleration3():
    with pytest.raises(ValueError):
        result = calculate_acceleration([1], 1)

# Test 4: Check if function raises a ValueError when the list c

In [13]:
df_results.pytest_code.isna().sum()

0

In [17]:
idx = 1200

print(results[idx][0])

def time_to_sample_number(seconds, frequency):
    
    return seconds * frequency + 0.5


In [18]:
print(results[idx][1])

# Import the function to test
from source import time_to_sample_number

# Define a test case
def test_time_to_sample_number():
    # Test with known input
    assert time_to_sample_number(10, 2) == 30
    # Test with another known input
    assert time_to_sample_number(5, 3) == 15
    # Test with zero frequency
    assert time_to_sample_number(10, 0) == 0
    # Test with negative frequency
    assert time_to_sample_number(10, -2) == -5


In [19]:
run_pytest(results[idx][0], results[idx][1])

{'coverage': 100.0,
 'stderr': '',
 'failed_assertions': 1,
 'fails': 'FAILED test_source.py::test_time_to_sample_number - assert 20.5 == 30'}

In [32]:
code = results[idx][0]
test_result = results[idx][1]

from adjustment_utils import modify_pytest_code

adjusted = modify_pytest_code(code, test_result)

In [33]:
run_pytest(results[idx][0], adjusted)

{'coverage': 100.0,
 'stderr': '',
 'failed_assertions': 0,
 'fails': ''}

In [61]:
print(adjusted)

from source import scale_from_internal
import pytest

def test_scale_from_internal():
    with pytest.raises(TypeError):
        assert scale_from_internal([1, 2, 3], 2, 1) == [2, 4, 5]
    assert scale_from_internal([4, 5, 6], None, 3) == [7, 8, 9]
    assert scale_from_internal([7, 8, 9], 1, None) == [8, 9, 10]


In [29]:
all = pd.concat([pd.read_csv(f"generated/{file}") for file in os.listdir("generated")]).reset_index(drop=True)

In [30]:
all#[all.coverage]

Unnamed: 0,original_code,pytest_code,coverage
0,"def overlap_with(intervals, start, end):\n ...",# Let's assume the source file is named 'sourc...,100.0
1,"def compute_loss(criterion, outputs, labels, b...","import torch\n\ndef compute_loss(criterion, ou...",100.0
2,def compute_amp_fraction(df_shape_features):\n...,# test_source.py\n\nimport pytest\nfrom source...,100.0
3,import torch\n\ndef gradient_to_contrastive_ex...,import pytest\nfrom source import gradient_to_...,100.0
4,"def _mask_border_keypoints(image_shape, keypoi...",import pytest\nimport numpy as np\nfrom source...,100.0
...,...,...,...
1519,"def demand_response_resource_capacity(df, reso...",import pytest\nimport pandas as pd\nfrom sourc...,0.0
1520,"def maml_inner_step(input, output, model, opti...","import pytest\nimport sys\nsys.path.append("".""...",0.0
1521,"def net(f, c='close', o = 1):\n r\n new_...",import pytest\nimport pandas as pd\nfrom sourc...,0.0
1522,"def vapour_pressure_deficit(svp, vp):\n r\n...","import sys\nsys.path.append(""."")\nimport sourc...",0.0


In [37]:
pytest_code = """import pytest
from torch import tensor
import numpy as np
from source import amplitude_to_db # imports the code from source file

@pytest.fixture(scope="module")
def x():
    return tensor([1., 2., 3.])

@pytest.fixture(scope="module")
def ref():
    return 1.

@pytest.fixture(scope="module")
def amin():
    return 1e-7

# test the function with a tensor input for x and a scalar input for ref and amin
def test_amplitude_to_db_tensor_ref_scalar_amin(x, ref, amin):
    output = amplitude_to_db(x, ref=ref, amin=amin)
    expected_output = tensor([-inf, -inf, -inf])  # since x is always positive and ref=1.0
    assert np.allclose(output, expected_output) 
    
# test the function with a scalar input for x and a tensor input for ref and amin
def test_amplitude_to_db_scalar_ref_tensor_amin(x, ref, amin):
    output = amplitude_to_db(x.item(), ref=torch.tensor(ref), amin=amin)
    expected_output = tensor([-inf, -inf, -inf])  # since x is always positive and ref=1.0
    assert np.allclose(output, expected_output) 
    
# test the function with a tensor input for x and a scalar input for ref but tensor input for amin
def test_amplitude_to_db_tensor_ref_scalar_amin_tensor(x, ref, amin):
    output = amplitude_to_db(x, ref=ref, amin=torch.tensor(amin))
    expected_output = tensor([-inf, -inf, -inf])  # since x is always positive and ref=1.0
    assert np.allclose(output, expected_output) 
    
# test the function with a scalar input for x and a tensor input for both ref and amin
def test_amplitude_to_db_scalar_ref_tensor_amin_tensor(x, ref, amin):
    output = amplitude_to_db(x.item(), ref=torch.tensor(ref), amin=torch.tensor(amin))
    expected_output = tensor([-inf, -inf, -inf])  # since x is always positive and ref=1.0
    assert np.allclose(output, expected_output) 
    
# test the function with a tensor input for all arguments
def test_amplitude_to_db_tensor_ref_tensor_amin_tensor(x, ref, amin):
    output = amplitude_to_db(x.item(), ref=torch.tensor(ref), amin=torch.tensor(amin))
    expected_output = tensor([-inf, -inf, -inf])  # since x is always positive and ref=1.0
    assert np.allclose(output, expected_output) 
    
# test the function with a tensor input for all arguments but different types of ref and amin inputs
def test_amplitude_to_db_tensor_ref_float_amin_tensor(x, ref):
    output = amplitude_to_db(x.item(), ref=ref.numpy().astype(np.float32), amin=torch.tensor(1e-7))
    expected_output = tensor([-inf, -inf, -inf])  # since x is always positive and ref=1.0
    assert np.allclose(output, expected_output) 
    """


code = """import torch

def amplitude_to_db(x, ref=1.0, amin=1e-7):
    
    x = x.pow(2.)
    x = torch.clamp(x, min=amin)
    return 10.0 * (torch.log10(x) - torch.log10(torch.tensor(ref,
                                                             device=x.device,
                                                             requires_grad=False,
                                                             dtype=x.dtype)))"""


run_pytest(code, pytest_code)

{'coverage': 100.0,
 'stderr': '',
 'failed_assertions': 6,
 'fails': "FAILED test_source.py::test_amplitude_to_db_tensor_ref_scalar_amin - NameError: name 'inf' is not defined\nFAILED test_source.py::test_amplitude_to_db_scalar_ref_tensor_amin - RuntimeError: a Tensor with 3 elements cannot be converted to Scalar\nFAILED test_source.py::test_amplitude_to_db_tensor_ref_scalar_amin_tensor - NameError: name 'torch' is not defined\nFAILED test_source.py::test_amplitude_to_db_scalar_ref_tensor_amin_tensor - RuntimeError: a Tensor with 3 elements cannot be converted to Scalar\nFAILED test_source.py::test_amplitude_to_db_tensor_ref_tensor_amin_tensor - RuntimeError: a Tensor with 3 elements cannot be converted to Scalar\nFAILED test_source.py::test_amplitude_to_db_tensor_ref_float_amin_tensor - RuntimeError: a Tensor with 3 elements cannot be converted to Scalar"}

In [29]:
from adjustment_utils import modify_pytest_code

In [30]:
code, pytest_code = results[1]

In [31]:
print(modify_pytest_code(code, pytest_code))

import pytest
import numpy as np
from source import HamCenter1D_Hamiltonian

def test_HamCenter1D_Hamiltonian():
    t = 1
    u = np.array([1, 2])
    PARAMETERS = [1]
    result = HamCenter1D_Hamiltonian(t, u, PARAMETERS)
    assert np.isclose(result, 0.5 * 1 * (2 * 2 + 1 * 1)), 'The function HamCenter1D_Hamiltonian did not return the expected result'
if __name__ == '__main__':
    test_HamCenter1D_Hamiltonian()


In [34]:
print(pytest_code)

# test_HamCenter1D_Hamiltonian.py
import pytest
import numpy as np
from source import HamCenter1D_Hamiltonian

def test_HamCenter1D_Hamiltonian():
    # unit test case
    t = 1  # time
    u = np.array([1, 2])  # position
    PARAMETERS = [1]  # Hamiltonian Model Parameter
    result = HamCenter1D_Hamiltonian(t, u, PARAMETERS)
    assert np.isclose(result, 0.5*1*(2*2 + 1*1)), "The function HamCenter1D_Hamiltonian did not return the expected result"

if __name__ == "__main__":
    test_HamCenter1D_Hamiltonian()
