In [2]:
import glob
import ast
import re
import os
import numpy as np

import tiktoken
import editdistance

from itertools import chain
# from transformers import AutoTokenizer

# tokenizer = AutoTokenizer.from_pretrained("openlm-research/code-llama-13b")

def extract_solution(fname):
    """Return the text enclosed by the two delimiter lines of a solution file.

    The file must contain exactly two lines that start with
    ``# ---------------------------------``. Everything strictly between
    those two lines is joined back with newlines and returned.

    Args:
        fname: path of the file to read.

    Returns:
        The lines between the two delimiter lines, as a single string.

    Raises:
        AssertionError: if the number of delimiter lines is not exactly two.
    """
    delimiter = "# ---------------------------------"
    with open(fname) as handle:
        lines = handle.read().split("\n")

    # Row numbers of every line that opens with the delimiter.
    marker_rows = [row for row, text in enumerate(lines) if text.startswith(delimiter)]
    assert len(marker_rows) == 2
    start, end = marker_rows

    return "\n".join(lines[start + 1:end])


class CodeProcessor(ast.NodeTransformer):
    """AST transformer that canonicalizes a program before comparison.

    Identifiers are replaced by ``var0``, ``var1``, ... in order of first
    appearance, and function return/argument annotations are removed. Names
    listed in ``imported_packages`` are left untouched when they appear as a
    function name or as an ``ast.Name`` node.

    Attributes:
        var_counter: index used for the next fresh ``var<N>`` name.
        var_mapping: original identifier -> canonical ``var<N>`` name.
        imported_packages: identifiers that must not be renamed.
    """

    def __init__(self, imported_packages):
        self.var_counter = 0
        self.var_mapping = {}
        self.imported_packages = imported_packages

    def visit_FunctionDef(self, node):
        """Strip the return annotation and rename the function, its positional args and body.

        Only ``node.args.args`` and ``node.body`` are processed here; decorators,
        defaults and the other argument kinds are not visited by this method.
        """
        node.returns = None

        # Anonymize variable names in function definition
        self.anonymize_funcid(node)
        node.args.args = [self.anonymize_arg(arg) for arg in node.args.args]
        node.body = [self.visit(child) for child in node.body]

        return node

    def visit_Name(self, node):
        """Rename a variable reference regardless of its context.

        Load, Store and Del contexts are all renamed. Returning ``None`` for a
        Del context (as an ``if Store / if Load`` chain would) makes
        ``NodeTransformer`` remove the target from ``del x`` and yields an
        invalid tree.
        """
        return self.anonymize_name(node)

    def visit_arg(self, node):
        """Remove the type annotation from an argument reached via generic traversal."""
        node.annotation = None
        return node

    def visit_Return(self, node):
        """Canonicalize the returned expression, if there is one."""
        # A bare ``return`` has ``value is None``; visiting None would raise
        # AttributeError inside ``generic_visit``.
        if node.value is not None:
            node.value = self.visit(node.value)
        return node

    def anonymize_funcid(self, node):
        """Rename a function definition unless its name is an imported package."""
        if node.name in self.imported_packages:
            return node
        if node.name not in self.var_mapping:
            self.var_mapping[node.name] = f'var{self.var_counter}'
            self.var_counter += 1
        node.name = self.var_mapping[node.name]
        return node

    def anonymize_name(self, node):
        """Rename an ``ast.Name`` unless its id is an imported package."""
        if node.id in self.imported_packages:
            return node
        if node.id not in self.var_mapping:
            self.var_mapping[node.id] = f'var{self.var_counter}'
            self.var_counter += 1
        node.id = self.var_mapping[node.id]
        return node

    def anonymize_arg(self, arg):
        """Rename a function argument and drop its annotation.

        Unlike ``anonymize_name``, this does not consult ``imported_packages``.
        """
        if arg.arg not in self.var_mapping:
            self.var_mapping[arg.arg] = f'var{self.var_counter}'
            self.var_counter += 1
        arg.arg = self.var_mapping[arg.arg]
        arg.annotation = None
        return arg

def extract_imports(code):
    """Collect names that follow ``import`` / ``from`` keywords in ``code``.

    Two regular expressions are applied to the raw text (no parsing):

    * ``import <dotted.name>`` - captures the dotted name right after every
      occurrence of ``import``. Because the pattern is not anchored, it also
      matches the ``import`` inside ``from x import y`` and captures ``y``.
    * ``from <dotted.name> import`` - captures the module being imported from.

    Returns:
        All ``import`` captures in order of appearance, followed by all
        ``from ... import`` captures in order of appearance.
    """
    plain_import = re.compile(r'import\s+(\w+(?:\.\w+)*)')
    from_import = re.compile(r'from\s+(\w+(?:\.\w+)*)\s+import\s+')

    # With a single capturing group, findall yields exactly the group-1 strings.
    return plain_import.findall(code) + from_import.findall(code)

def process_code_ast(code):
    """Canonicalize ``code`` and return the regenerated source.

    The source is parsed, the names found by ``extract_imports`` are protected
    from renaming, ``CodeProcessor`` rewrites the tree, and ``ast.unparse``
    turns it back into text.

    Raises:
        Whatever ``ast.parse`` or the transformer raises (e.g. SyntaxError).
    """
    module = ast.parse(code)
    protected_names = extract_imports(code)
    canonical_module = CodeProcessor(protected_names).visit(module)
    return ast.unparse(canonical_module)


def tokenize_code(code):
    """Tokenize ``code`` with the module-level ``tokenizer`` object.

    NOTE(review): the only visible assignment to ``tokenizer`` is commented
    out near the imports, so this presumably raises NameError unless the
    tokenizer is created elsewhere - confirm before use.
    """
    global tokenizer
    return tokenizer.tokenize(code)

def tokenize_openai(code):
    """Encode ``code`` with the tiktoken encoding registered for ``gpt-3.5-turbo``."""
    return tiktoken.encoding_for_model("gpt-3.5-turbo").encode(code)


def dir_listing(dirs):
    """List the non-hidden entries of every directory in ``dirs``.

    Args:
        dirs: iterable of directory paths.

    Returns:
        A flat list of ``os.path.join(directory, entry)`` for each entry whose
        name does not start with ``"."``, in ``os.listdir`` order per directory.
    """
    return [
        os.path.join(parent, entry)
        for parent in dirs
        for entry in os.listdir(parent)
        if not entry.startswith(".")
    ]


def dedup_single_update(update_dir):
    """Compute pairwise token edit distances between the solutions of one update.

    Every file two directory levels below ``update_dir`` is treated as a
    solution file. Each solution is extracted, canonicalized with
    ``process_code_ast`` (falling back to the raw text if that fails),
    tokenized, and compared with every other solution.

    Args:
        update_dir: directory whose grandchildren are the solution files.

    Returns:
        A list with one dict per unordered pair ``(i, j)``, ``i < j``, holding
        ``file_a``/``file_b``, ``orig_a``/``orig_b`` (extracted text),
        ``proc_a``/``proc_b`` (canonicalized text) and ``distance`` (edit
        distance between the two token sequences).
    """
    update_files = dir_listing(dir_listing([update_dir]))

    orig_codes = []
    prog_codes = []
    for fname in update_files:
        solution = extract_solution(fname)
        orig_codes.append(solution)
        try:
            proc_solution = process_code_ast(solution)
        except Exception:
            # Best effort: a solution that cannot be parsed or transformed is
            # compared as-is. ``Exception`` (rather than a bare ``except``)
            # keeps KeyboardInterrupt / SystemExit working.
            proc_solution = solution
        prog_codes.append(proc_solution)

    # Tokenize each program once instead of once per pair.
    token_seqs = [tokenize_openai(code) for code in prog_codes]

    similarity_pairs = []
    num_prog = len(prog_codes)
    for i in range(num_prog):
        for j in range(i + 1, num_prog):
            similarity_pairs.append({
                "file_a": update_files[i],
                "orig_a": orig_codes[i],
                "proc_a": prog_codes[i],
                "file_b": update_files[j],
                "orig_b": orig_codes[j],
                "proc_b": prog_codes[j],
                "distance": editdistance.eval(token_seqs[i], token_seqs[j]),
            })

    return similarity_pairs




In [None]:

THRESHOLD = 5 # make a threshold
target_dedup_dir = "/u/zliu/tool-KE/data/prelim/dedup_Xi"

# Update directories sit three levels below the dedup root.
first_level = dir_listing([target_dedup_dir])
second_level = dir_listing(first_level)
update_dirs = dir_listing(second_level)

# Handle the updates one by one, then order all pairs by ascending distance.
all_sim_pairs = []
for update_dir in update_dirs:
    all_sim_pairs += dedup_single_update(update_dir)
all_sim_pairs.sort(key=lambda pair: pair["distance"])

In [None]:
# Pick the indices of (at most) 50 randomly chosen pairs for manual inspection.
# NOTE(review): no random seed is set in the visible cells, so the sample
# presumably changes on every run - confirm whether that is intended.
sample_ids = np.arange(0, len(all_sim_pairs))
np.random.shuffle(sample_ids)
sample_ids = sample_ids[:50]

In [None]:

from scipy.stats import describe

# Build the distance list once and reuse it for every statistic below.
pair_distances = [p["distance"] for p in all_sim_pairs]

print(len(all_sim_pairs))
print(describe(pair_distances))

median_edit_dist = np.median(pair_distances)
print(median_edit_dist)

In [None]:
sampled_sim_pairs = [all_sim_pairs[i] for i in sample_ids]

In [None]:
for p in sampled_sim_pairs[:10]:
        print("-" * 10)
        print("Dist", p["distance"])
        print("Prog A:", p["file_a"])
        # print("ORIGINAL")
        # print(p["orig_a"])
        print("CANONICALIZED")
        print(p["proc_a"])
        print("Prog B:", p["file_b"])
        # print("ORIGINAL")
        # print(p["orig_b"])
        print("CANONICALIZED")
        print(p["proc_b"])

In [None]:
for p in all_sim_pairs[:]:
        if p["distance"] >= 25:
                continue
        print("-" * 10)
        print("Dist", p["distance"])
        print("Prog A:", p["file_a"])
        # print("ORIGINAL")
        # print(p["orig_a"])
        print("CANONICALIZED")
        print(p["proc_a"])
        print("Prog B:", p["file_b"])
        # print("ORIGINAL")
        # print(p["orig_b"])
        print("CANONICALIZED")
        print(p["proc_b"])

In [None]:
import matplotlib.pyplot as plt
num_ps_distri = np.array([p["distance"] for p in all_sim_pairs if p["distance"] >= 25 ])
values, bins, bars = plt.hist(num_ps_distri, bins=50, ec="k", rwidth=1)

# num_ps_distri = np.array(unit_tests_pass_w_updates)
# values, bins, bars = plt.hist(num_ps_distri, ec="k")
# plt.xticks(np.arange(num_ps_distri.min(), num_ps_distri.max()+1, 20))
import seaborn as sns
sns.set_palette("tab10")
# plt.bar_label(bars)
plt.xlabel("Edit Distance")
plt.ylabel("Count")
plt.title("Edit Distance of Canonicalized Solutions")
plt.savefig("<save dir>")

In [None]:
values

In [None]:
threshold = 25
from collections import Counter, defaultdict
len([p["distance"] for p in all_sim_pairs if p['distance'] < threshold])

In [None]:
lower = 14
upper = lower + 10

for p in [p for p in all_sim_pairs if lower < p['distance'] < upper]:
    prog_syn_id_a = "/".join(p["file_a"].split("/")[-5:-1])
    specific_update_id_a = "/".join(p["file_a"].split("/")[-5:-2])
    
    print("-" * 10)
    print("Dist", p["distance"])
    print("Prog A:", p["file_a"])
    # print("ORIGINAL")
    # print(p["orig_a"])
    print("CANONICALIZED")
    print(p["proc_a"])
    prog_syn_id_b = "/".join(p["file_b"].split("/")[-5:-1])
    specific_update_id_b = "/".join(p["file_b"].split("/")[-5:-2])
    assert prog_syn_id_a != prog_syn_id_b
    assert specific_update_id_a == specific_update_id_b
    print("Prog B:", p["file_b"])
    # print("ORIGINAL")
    # print(p["orig_b"])
    print("CANONICALIZED")
    print(p["proc_b"])
print(len([p for p in all_sim_pairs if lower < p['distance'] < upper]))

In [None]:
PS_FILE_NAME = "prog_syn-content-w_ut.json"
U_FILE_NAME = "update-content-w_ut.json"

semifinal_data_root = "/u/zliu/tool-KE/data/prelim/CodeUpdateArena-after-PS"
all_update_paths = glob.glob(f"{semifinal_data_root}/**/{U_FILE_NAME}", recursive=True)

# Map "<api>/<update_type>/<update dir>" -> paths of all prog-syn files below it.
update2ps = {}
for update_path in all_update_paths:
    update_dir = os.path.dirname(update_path)
    specific_update_id = "/".join(update_dir.split("/")[-3:])
    # The unpacking raises ValueError unless the id has exactly three parts.
    api, update_type, _ = specific_update_id.split("/")

    update_ps_paths = glob.glob(f"{update_dir}/**/{PS_FILE_NAME}", recursive=True)
    update2ps[specific_update_id] = update_ps_paths


In [None]:
sum(len(vs) for vs in update2ps.values())

In [None]:
# human min: 24 
from collections import defaultdict, Counter
threshold = 25
a = [p for p in all_sim_pairs if p['distance'] < threshold]
np.random.shuffle(a)
dup_update2ps_graph = defaultdict(lambda: defaultdict(set))
dup_update2ps_pairs = defaultdict(set)
dup_update2ps = defaultdict(set)
edit_distance_df = []

for p in a:
    prog_syn_id_a = "/".join(p["file_a"].split("/")[-5:-1])
    specific_update_id_a = "/".join(p["file_a"].split("/")[-5:-2])
    
    # print("-" * 10)
    # print("Dist", p["distance"])
    # print("Prog A:", p["file_a"])
    # print("ORIGINAL")
    # print(p["orig_a"])
    # print("CANONICALIZED")
    # print(p["proc_a"])
    prog_syn_id_b = "/".join(p["file_b"].split("/")[-5:-1])
    specific_update_id_b = "/".join(p["file_b"].split("/")[-5:-2])
    assert prog_syn_id_a != prog_syn_id_b
    assert specific_update_id_a == specific_update_id_b
    # print("Prog B:", p["file_b"])
    # print("ORIGINAL")
    # print(p["orig_b"])
    # print("CANONICALIZED")
    # print(p["proc_b"])

    # if prog_syn_id_a in dup_update2ps[specific_update_id_a] or prog_syn_id_b in dup_update2ps[specific_update_id_a][prog_syn_id_a]:
    #     # assert prog_syn_id_b not in dup_update2ps[specific_update_id_a]
    #     dup_update2ps[specific_update_id_a][prog_syn_id_a].add(prog_syn_id_b)
    # elif prog_syn_id_b in dup_update2ps[specific_update_id_a]:
    #     dup_update2ps[specific_update_id_a][prog_syn_id_b].add(prog_syn_id_a)
    # else:
    #     dup_update2ps[specific_update_id_a][prog_syn_id_a].add(prog_syn_id_b)
    dup_update2ps_graph[specific_update_id_a][prog_syn_id_a].add(prog_syn_id_b)
    dup_update2ps[specific_update_id_a].add(prog_syn_id_a)
    dup_update2ps[specific_update_id_a].add(prog_syn_id_b)
    dup_update2ps_pairs[specific_update_id_a].add((prog_syn_id_a, prog_syn_id_b, p["distance"]))
    # dup_update2ps[specific_update_id_a][prog_syn_id_b]
    # dup_update2ps[specific_update_id_a].add(prog_syn_id_b)
# dup_update2ps = {k: list(v) for k, v in dup_update2ps.items()}
print("original #PS:")
num_ps2count = dict(Counter(len(update2ps[specific_update]) for specific_update in dup_update2ps_pairs.keys()))
print(num_ps2count)
ps_threshold = 5
print(f"Count(#PS > {ps_threshold})")
print(sum(v for k, v in num_ps2count.items() if k > ps_threshold))
print(f"Count(#PS <= {ps_threshold})")
print(sum(v for k, v in num_ps2count.items() if k <= ps_threshold))

# For every update that has at least one near-duplicate pair, work out which
# prog-syn (PS) ids survive deduplication.
import itertools
dedup_update2ps_graph = {}
count_below_x = 0
x = 2
non_dup_update = 0
update2dedup_count = {}
update2dedup_ps = {}
dup_update2dedup_count = {}
dup_update2original_count = {}
update2new_ps = {}
dup_update2dedup_ps = {}
legal_dup_distances = []
for specific_update_id, dup_ps_pairs in dup_update2ps_pairs.items():
    # PS id = the four path components that precede the file name.
    ps = ["/".join(p.split("/")[-5:-1]) for p in update2ps[specific_update_id]]
    assert len(set(ps)) == len(ps)
    ps = set(ps)
    dup_update2original_count[specific_update_id] = len(ps)
    # Every PS id that appears in at least one below-threshold pair.
    unique_dup_ps = set(itertools.chain(*[[a, b] for a,b,d in dup_ps_pairs]))
    dup_update2ps[specific_update_id] = unique_dup_ps
    
    dup_update2dedup_count[specific_update_id] = len(ps) - len(unique_dup_ps)
    # Both dicts reference the SAME set object, so the in-place ``|=`` below
    # is visible through dup_update2dedup_ps as well.
    update2dedup_ps[specific_update_id] = dup_update2dedup_ps[specific_update_id] = ps - unique_dup_ps
    
    # Only updates left with exactly 1 or 2 PS get a pair added back.
    if len(update2dedup_ps[specific_update_id]) == 0 or len(update2dedup_ps[specific_update_id]) > 2:
        continue
    
    # Re-admit the flagged pair with the largest distance.
    sorted_by_distance = sorted(dup_ps_pairs, key=lambda x: x[-1])
    p_1, p_2, distance = sorted_by_distance[-1]
    legal_dup_distances.append(distance)
    
    update2dedup_ps[specific_update_id] |= set([p_1, p_2])
    
# NOTE(review): the value printed is the number of PS ids KEPT for updates that
# had duplicates, although the label says "duplicate" - confirm the wording.
print(f"Total duplicate #PS: {sum(len(v) for v in update2dedup_ps.values())}")

for specific_update_id, ps in update2ps.items():
    if specific_update_id in update2dedup_ps:
        continue
    update2dedup_ps[specific_update_id] = ["/".join(p.split("/")[-5:-1]) for p in ps]
update2dedup_ps = {k: vs for k, vs in update2dedup_ps.items() if len(vs) > 0}
print(f"#Update After: {len(update2dedup_ps)}")
print(f"Total #PS after dedup {sum([len(vs) for vs in update2dedup_ps.values()])}")
print(f"#PS demo before dedupe: {Counter([len(vs) for vs in update2ps.values()])}")
print(f"#PS demo after dedupe: {Counter([len(vs) for vs in update2dedup_ps.values()])}")
import matplotlib.pyplot as plt

# Histogram of how many prog-syn examples each update keeps after dedup.
num_ps_distri = np.array([len(vs) for vs in update2dedup_ps.values()])
values, bins, bars = plt.hist(num_ps_distri, ec="k", rwidth=1)

plt.bar_label(bars)
# The labels describe the plotted quantity (#PS per update); the previous
# "Pass w. Update" text was left over from a different plot.
plt.xlabel("#PS")
plt.ylabel("Count")
plt.title("#PS / update (after dedup)")

In [None]:
import pandas as pd

df = []

# for update, ps_ids in update2dedup_ps.items():
#     api, update_type, _ = update.split("/")
#     pacakge = api.split(".")[0]
#     if len(update_type.split("-")) == 2:
#         action, place = update_type.split("-")
#         aspect = None
#     else:
#         assert len(update_type.split("-")) == 3
#         action, place, aspect = update_type.split("-")
    
#     df.append({
#         "pacakge": pacakge,
#         "api": api,
#         "update_type": update_type,
#         "[action]": action,
#         "[locus]": place,
#         "[aspect]": aspect,
#         "[locus]-[aspect]": f"{place}-{aspect}",
#         "#PS": len(ps_ids)
#     })
# df = pd.DataFrame(df)
all_ps_paths = glob.glob(f"<save root for prog syn>/**/{PS_FILE_NAME}", recursive=True)
all_ps_paths[0]

In [None]:
all_ps_paths[0].split("/")[-5:-1]

In [None]:

import json
import tiktoken

def is_testing_try_catch(unit_test):
    """Return True when ``unit_test`` contains both ``"try:"`` and ``"except"``."""
    return "try:" in unit_test and "except" in unit_test

# Build one summary row per prog-syn example of the deduplicated dataset.
progsyn_records = []
gpt4_tokenizer = tiktoken.encoding_for_model("gpt-4")

all_ps_paths = glob.glob(f"/u/zliu/tool-KE/data/prelim/CodeUpdateArena-after-dedup/**/{PS_FILE_NAME}", recursive=True)
for ps_path in all_ps_paths:
    path_parts = ps_path.split("/")
    api, update_type, _, _ = path_parts[-5:-1]
    update_id = "/".join(path_parts[-5:-2])
    progsyn_id = "/".join(path_parts[-5:-1])

    # The update file lives two levels above the prog-syn file.
    update_path = "/".join(path_parts[:-2]) + "/" + U_FILE_NAME
    assert os.path.exists(update_path)

    package = api.split(".")[0]
    # update_type is "<action>-<locus>" or "<action>-<locus>-<aspect>".
    type_parts = update_type.split("-")
    assert len(type_parts) in (2, 3)
    action, place = type_parts[:2]
    aspect = type_parts[2] if len(type_parts) == 3 else None

    # Context managers close the files even if parsing fails.
    with open(ps_path, "r") as ps_file:
        ps_content = json.load(ps_file)
    with open(update_path, "r") as update_file:
        update_content = json.load(update_file)

    # Unit tests that exercise try/except are excluded from the statistics.
    kept_tests = [u for u in ps_content["unit_tests"] if not is_testing_try_catch(u)]
    kept_test_lengths = [len(gpt4_tokenizer.encode(u)) for u in kept_tests]

    progsyn_records.append({
        "update_id": update_id,
        "progsyn_id": progsyn_id,
        "package": package,
        "api": api,
        "update_type": update_type,
        "[action]": action,
        "[locus]": place,
        "[aspect]": aspect,
        "[locus]-[aspect]": f"{place}-{aspect}",
        "#token(update docstring)": len(gpt4_tokenizer.encode(update_content["update_docstring"])),
        "#token(scenario)": len(gpt4_tokenizer.encode(ps_content["scenario"])),
        "#token(problem)": len(gpt4_tokenizer.encode(ps_content["problem"])),
        "#token(reference solution)": len(gpt4_tokenizer.encode(ps_content["solution_new"])),
        # np.mean([]) is NaN plus a RuntimeWarning; return NaN directly instead.
        "Avg. #token(prog_syn unit test)": np.mean(kept_test_lengths) if kept_test_lengths else np.nan,
        "#progsyn_unit_tests": len(kept_tests),
    })
progsyn_df = pd.DataFrame(progsyn_records)
progsyn_df.to_csv("<save dir>/prog-syn-summary.csv")

In [None]:
import matplotlib.pyplot as plt
num_ps_distri = np.array([int(x) for x in progsyn_df.groupby("update_id").describe()["#token(scenario)"]["count"].to_list()])
values, bins, bars = plt.hist(num_ps_distri, ec="k", rwidth=1)

# num_ps_distri = np.array(unit_tests_pass_w_updates)
# values, bins, bars = plt.hist(num_ps_distri, ec="k")
# plt.xticks(np.arange(num_ps_distri.min(), num_ps_distri.max()+1, 20))

plt.bar_label(bars)
plt.xlabel("#PS")
plt.ylabel("Count")
plt.title("#PS / update")
plt.savefig("/u/zliu/tool-KE/plot_neurips/histogram-ps-per-update.pdf")

In [None]:

import json
import tiktoken

def is_testing_try_catch(unit_test):
    """Return True when ``unit_test`` contains both ``"try:"`` and ``"except"``."""
    return "try:" in unit_test and "except" in unit_test

# Build one summary row per update of the deduplicated dataset.
update_records = []
gpt4_tokenizer = tiktoken.encoding_for_model("gpt-4")

all_update_paths = glob.glob(f"/u/zliu/tool-KE/data/prelim/CodeUpdateArena-after-dedup/**/{U_FILE_NAME}", recursive=True)
for update_path in all_update_paths:
    path_parts = update_path.split("/")
    api, update_type, _ = path_parts[-4:-1]
    update_id = "/".join(path_parts[-4:-1])

    package = api.split(".")[0]
    # update_type is "<action>-<locus>" or "<action>-<locus>-<aspect>".
    type_parts = update_type.split("-")
    assert len(type_parts) in (2, 3)
    action, place = type_parts[:2]
    aspect = type_parts[2] if len(type_parts) == 3 else None

    # Context manager closes the file even if parsing fails.
    with open(update_path, "r") as update_file:
        update_content = json.load(update_file)

    unit_test_lengths = [len(gpt4_tokenizer.encode(u)) for u in update_content["unit_tests"]]

    update_records.append({
        "update_id": update_id,
        "package": package,
        "api": api,
        "update_type": update_type,
        "[action]": action,
        "[locus]": place,
        "[aspect]": aspect,
        "[locus]-[aspect]": f"{place}-{aspect}",
        "#token(update docstring)": len(gpt4_tokenizer.encode(update_content["update_docstring"])),
        "#token(update implementation)": len(gpt4_tokenizer.encode(update_content["new_impl"])),
        # np.mean([]) is NaN plus a RuntimeWarning; return NaN directly instead.
        "Avg. #token(update unit test)": np.mean(unit_test_lengths) if unit_test_lengths else np.nan,
        "#update_unit_tests": len(unit_test_lengths),
    })
update_df = pd.DataFrame(update_records)
update_df.to_csv("/u/zliu/tool-KE/tables/update-summary.csv")

In [None]:
import itertools
from scipy.stats import describe
from collections import Counter
describe(list(Counter([p for p, a in update_df.groupby(["package", "api"]).describe().index]).values()))

In [None]:
update_df["#update_unit_tests"]

In [None]:
# `df` is the empty list assigned earlier (`df = []`), so indexing it raises.
# The "#token(reference solution)" column only exists in `progsyn_df`.
progsyn_df["#token(reference solution)"].describe()

In [None]:
# `df` is the empty list assigned earlier (`df = []`) and has no `groupby`.
# NOTE(review): `update_df` is assumed here (rows per API = updates per API);
# use `progsyn_df` instead if prog-syn examples per API were intended.
update_df.groupby("api").describe()["#token(update docstring)"]["count"].describe()

In [None]:
# `df` is the empty list assigned earlier (`df = []`), so `df["update_type"]` raises.
# NOTE(review): `update_df` is assumed (share of updates per update type);
# switch to `progsyn_df` if the chart should count prog-syn examples.
labels, sizes = list(zip(*Counter(update_df["update_type"]).items()))
palette_color = sns.color_palette("colorblind")

# plotting data on chart
plt.pie(sizes, labels=labels, colors=palette_color, autopct='%.0f%%')
plt.savefig("/u/zliu/tool-KE/plot_neurips/pie-chart-by-update-type.pdf")
plt.show()

In [None]:
# `df` is the empty list assigned earlier (`df = []`), so `df["package"]` raises.
# NOTE(review): `update_df` is assumed (share of updates per package);
# switch to `progsyn_df` if the chart should count prog-syn examples.
labels, sizes = list(zip(*Counter(update_df["package"]).items()))
palette_color = sns.color_palette("colorblind")

# plotting data on chart
plt.pie(sizes, labels=labels, colors=palette_color, autopct='%.0f%%')
plt.savefig("/u/zliu/tool-KE/plot_neurips/pie-chart-by-package.pdf")
plt.show()

In [None]:
describe(legal_dup_distances)

In [None]:
np.std(legal_dup_distances)

In [None]:
import json
ps2unit_tests = {}
def is_testing_try_catch(unit_test):
    """Return True when ``unit_test`` contains both ``"try:"`` and ``"except"``."""
    return "try:" in unit_test and "except" in unit_test
        
# update id -> PS ids left with fewer than `n_test_threshold` usable unit tests.
update2try = defaultdict(set)
n_test_threshold = 3

for specific_update_id, dedup_ps in update2dedup_ps.items():
    for ps_id in dedup_ps:
        ps_path = f"{semifinal_data_root}/{ps_id}/{PS_FILE_NAME}"
        assert os.path.exists(ps_path)
        # Context manager closes the file even if parsing fails.
        with open(ps_path, "r") as ps_file:
            ps_content = json.load(ps_file)

        unit_tests = ps_content["unit_tests"]
        pass_w_update = ps_content["unit_tests_pass_w_update"]
        # Keep the tests flagged as passing; the flag keys are converted with
        # int() to index `unit_tests`.
        trimmed_unit_tests = [unit_tests[int(i)] for i, pass_flag in pass_w_update.items() if pass_flag]
        # remove unit tests that are testing try catch
        trimmed_unit_tests = [u for u in trimmed_unit_tests if not is_testing_try_catch(u)]
        if len(trimmed_unit_tests) < n_test_threshold:
            update2try[specific_update_id].add(ps_id)
        ps2unit_tests[ps_id] = trimmed_unit_tests

In [None]:
print(f"#UnitTest demographics {Counter(len(vs) for vs in ps2unit_tests.values())}")
print(f"#PS removed {sum(len(vs) for vs in update2try.values())}")

In [None]:
# How many updates are we left with if we remove those things
update2detry = {}
for specific_update_id, dedup_ps in update2dedup_ps.items():
    try_ps = update2try[specific_update_id]
    assert len(set(dedup_ps)) == len(dedup_ps)
    update2detry[specific_update_id] = set(dedup_ps) - try_ps
print(f"#Update removed: {sum(len(vs) < 3 for vs in update2detry.values())}")

In [None]:
def all_pass(pass_dict):
    """Return True when every value of ``pass_dict`` is truthy (True for an empty dict)."""
    for passed in pass_dict.values():
        if not passed:
            return False
    return True
def accuracy(pass_dict):
    """Return the mean of the values of ``pass_dict`` (fraction of truthy flags)."""
    flags = list(pass_dict.values())
    return np.mean(flags)
def pass_at_k(n: int, c: int, k: int):
    """Estimate pass@k from ``n`` samples of which ``c`` are correct.

    :param n: total number of samples
    :param c: number of correct samples
    :param k: k in pass@$k$
    :return: ``1.0`` when fewer than ``k`` samples are incorrect, otherwise
        ``1 - prod(1 - k / i)`` for ``i`` in ``n - c + 1 .. n``.
    """
    num_incorrect = n - c
    if num_incorrect < k:
        return 1.0
    denominators = np.arange(num_incorrect + 1, n + 1)
    return 1.0 - np.prod(1.0 - k / denominators)

In [None]:
accuracy(test_reports[1].pass_w_update)

In [None]:
accuracy(test_reports[1].pass_w_update)

In [None]:
all_ps_after_dedup = set(itertools.chain(*update2dedup_ps.values()))

In [None]:
len(all_ps_after_dedup)

In [None]:
def aggregate_test_reports_wo_trycatch(test_reports, exclude_trycatch=False, min_unit_tests=None):
    """Aggregate the per-sample test reports of one problem into summary metrics.

    Each report is read through its ``unit_tests``, ``pass_w_update`` and
    ``pass_wo_update`` attributes.

    Args:
        test_reports: sequence of reports, one per generated sample.
        exclude_trycatch: when True, the positions of the unit tests of the
            FIRST report that satisfy ``is_testing_try_catch`` are deleted from
            the pass dictionaries of every report before scoring.
        min_unit_tests: when given together with ``exclude_trycatch``, return
            ``None`` if fewer than this many unit tests remain.

    Returns:
        ``None`` if the problem is filtered out, otherwise a dict with
        ``n_test``, ``unsolved (%)``, the pass@k family for k in 1, 2, 5,
        ``affected`` and ``accuracies`` (percentages).
    """
    from copy import deepcopy

    n = len(test_reports)

    # Positions (within the first report) of the try/except style unit tests.
    ids = []
    if exclude_trycatch:
        ids = [
            position
            for position, unit_test in enumerate(test_reports[0].unit_tests.values())
            if is_testing_try_catch(unit_test)
        ]
        if min_unit_tests is not None:
            remaining = len(test_reports[0].unit_tests.values()) - len(ids)
            if remaining < min_unit_tests:
                return None

    c_old = c_new = 0
    c_old_excl = c_new_excl = 0
    accuracies, affected = [], []

    for test_report in test_reports:
        # Work on copies so the deletions never touch the loaded reports.
        pass_w_update = deepcopy(test_report.pass_w_update)
        pass_wo_update = deepcopy(test_report.pass_wo_update)
        for idx in ids:
            del pass_w_update[idx]
            del pass_wo_update[idx]

        new_ok = all_pass(pass_w_update)
        old_ok = all_pass(pass_wo_update)

        c_old_excl += old_ok and not new_ok
        c_new_excl += new_ok and not old_ok
        c_old += old_ok
        c_new += new_ok

        affected.append(sum(pass_w_update.values()) != sum(pass_wo_update.values()))
        accuracies.append(accuracy(pass_w_update))

    # Samples that pass exclusively with neither the old nor the new API.
    c_unsolved = n - c_old_excl - c_new_excl

    ret = {"n_test": len(test_reports[0].unit_tests) - len(ids)}
    ret[f"unsolved (%)"] = c_unsolved / n * 100

    def _scaled_pass_at_k(correct, k):
        # NaN when the counts are inconsistent, otherwise pass@k as a percentage.
        if n == 0 or correct > n:
            return np.nan
        return pass_at_k(n, correct, k) * 100

    for k in [1, 2, 5]:
        ret[f"pass@{k}(new)"] = _scaled_pass_at_k(c_new, k)
        ret[f"UPass@{k}"] = _scaled_pass_at_k(c_new_excl, k)
        ret[f"pass@{k}(old)"] = _scaled_pass_at_k(c_old, k)
        ret[f"pass@{k}(old excl.)"] = _scaled_pass_at_k(c_old_excl, k)

    ret["affected"] = np.mean(affected) * 100
    ret["accuracies"] = np.mean(accuracies) * 100
    return ret

In [None]:
import pickle
import pandas as pd
["gpt-4", "deepseek-coder-7b-instruct-v1.5", "CodeLlama-7b-Instruct-hf", "deepseek-coder-6.7b-instruct"]
# based on update2detry recalculate GPT-4 results
prepend_result_root = "/u/zliu/tool-KE/evaluation_output/prepend_n=5"
REPORT_FILE_NAME = "test_reports.pkl"
model_name = "gpt-4"
all_report_paths = list(glob.glob(f"{prepend_result_root}/**/{model_name}/{REPORT_FILE_NAME}", recursive=True))


def recal_df(report_paths, dedup=False, ps_after_dedup=set(), exclude_trycatch=False, min_unit_tests=None, model=None):
    """Recompute the evaluation table from pickled test reports.

    Args:
        report_paths: paths of ``test_reports.pkl`` files. The prog-syn id is
            read from path components ``[-6:-2]`` and the update id from
            ``[-6:-3]``.
        dedup: when True, keep only prog-syn ids contained in ``ps_after_dedup``.
        ps_after_dedup: collection of prog-syn ids that survived deduplication
            (only read, never mutated).
        exclude_trycatch: forwarded to ``aggregate_test_reports_wo_trycatch``.
        min_unit_tests: forwarded to ``aggregate_test_reports_wo_trycatch``.
        model: value stored in the ``model`` column. Defaults to the
            module-level ``model_name`` for backward compatibility.

    Returns:
        A DataFrame with one row per retained report (empty if none is kept).
        Also prints the number of updates that have at least 3 rows.
    """
    if model is None:
        # Historical behaviour: the label comes from the notebook-level global.
        model = model_name

    all_eval_results = []
    for report_path in report_paths:
        path_parts = report_path.split("/")
        prog_syn_id = "/".join(path_parts[-6:-2])
        # Cheap filter first, so discarded reports are never unpickled.
        if dedup and prog_syn_id not in ps_after_dedup:
            continue

        # NOTE: pickle can execute arbitrary code; only load trusted reports.
        with open(report_path, "rb") as report_file:
            test_reports = pickle.load(report_file)
        eval_result = aggregate_test_reports_wo_trycatch(test_reports, exclude_trycatch=exclude_trycatch, min_unit_tests=min_unit_tests)
        if eval_result is None:
            continue

        specific_update_id = "/".join(path_parts[-6:-3])
        api_path, update_type, _ = specific_update_id.split("/")

        # update_type is "<action>-<location>" or "<action>-<location>-<aspect>".
        type_parts = update_type.split("-")
        assert len(type_parts) in (2, 3)
        action, location = type_parts[:2]
        aspect = type_parts[2] if len(type_parts) == 3 else None

        eval_result["api_path"] = api_path
        eval_result["update_type"] = update_type
        eval_result["[location]-[aspect]"] = f"{location}-{aspect}"
        eval_result["[action]"] = f"{action}"
        eval_result["model"] = f"{model}"

        eval_result["package"] = api_path.split(".")[0]
        eval_result["specific_update_id"] = specific_update_id
        eval_result["prog_syn_id"] = prog_syn_id
        all_eval_results.append(eval_result)

    df = pd.DataFrame(all_eval_results)

    # An empty frame has no "specific_update_id" column, so guard the count.
    if all_eval_results:
        num_update = int((df["specific_update_id"].value_counts() >= 3).sum())
    else:
        num_update = 0
    print(f"#Update: {num_update}")
    return df


In [None]:
# 682.000000 - 608.000000

In [None]:
recal_df(all_report_paths, dedup=False, ps_after_dedup=all_ps_after_dedup, exclude_trycatch=False, min_unit_tests=None).describe()

In [None]:

final_table = recal_df(list(glob.glob(f"/u/zliu/tool-KE/evaluation_output_dedup/prepend_n=5/**/gpt-4/{REPORT_FILE_NAME}", recursive=True)), 
         dedup=False, ps_after_dedup=all_ps_after_dedup, exclude_trycatch=True, min_unit_tests=3)

In [None]:
final_table = recal_df(list(glob.glob(f"/u/zliu/tool-KE/evaluation_output_dedup/prepend_n=5/**/gpt-4/{REPORT_FILE_NAME}", recursive=True)), 
         dedup=False, ps_after_dedup=all_ps_after_dedup, exclude_trycatch=True, min_unit_tests=3)

In [None]:
tunable_model_names = ["deepseek-coder-7b-instruct-v1.5", "CodeLlama-7b-Instruct-hf", "deepseek-coder-6.7b-instruct"]
model_name = tunable_model_names[2]
recal_df(list(glob.glob(f"/u/zliu/tool-KE/evaluation_output_dedup/FT-2_n=5/**/{model_name}/{REPORT_FILE_NAME}", recursive=True)), 
         dedup=False, ps_after_dedup=all_ps_after_dedup, exclude_trycatch=False, min_unit_tests=3).describe()

In [None]:
final_table[:258].describe()

In [None]:
final_table[final_table["package"] == "torch"].describe()

In [None]:
labels, sizes = list(zip(*Counter(final_table["update_type"]).items()))
plt.pie(sizes, labels=labels, autopct='%1.1f%%')
plt.show()

In [None]:
labels, sizes = list(zip(*Counter(final_table["package"]).items()))
plt.pie(sizes, labels=labels, autopct='%1.1f%%')
plt.show()

In [None]:

["gpt-4", "deepseek-coder-7b-instruct-v1.5", "CodeLlama-7b-Instruct-hf", "deepseek-coder-6.7b-instruct"]
model_name = "deepseek-coder-6.7b-instruct"
recal_df(list(glob.glob(f"{prepend_result_root}/**/{model_name}/{REPORT_FILE_NAME}", recursive=True)), 
         dedup=True, ps_after_dedup=all_ps_after_dedup, exclude_trycatch=True, min_unit_tests=3).describe()

In [None]:
"/".join(all_failed_prog_syns[0].split("/")[:-1])

In [None]:
os.listdir(f"{semifinal_data_root}/{all_failed_prog_syns[0]}")

In [None]:
from src.utils.prompt_tool import CodeGenTemplate, InstructTemplate
from omegaconf import OmegaConf
from data.prelim.manager_update import UpdateManagerV21
from data.prelim.manager_prog_syn import ProgSynManagerV21

# ProgSyn ids of the rows whose "pass@5(new)" value is exactly 0.
failed_mask = final_table["pass@5(new)"] == 0
all_failed_prog_syns = final_table[failed_mask]["prog_syn_id"].to_list()

# Where the data and the prepend (n=5) evaluation outputs are read from,
# and where the per-ProgSyn inspection files are written to.
semifinal_data_root = "/u/zliu/tool-KE/data/prelim/CodeUpdateArena-after-PS"
prepend_result_root = "/u/zliu/tool-KE/evaluation_output/prepend_n=5"
ps_target_root = "/u/zliu/tool-KE/data/prelim/gpt4_failed"

# File names of the ProgSyn / update content files inside their directories.
PS_FILE_NAME = "prog_syn-content-w_ut.json"
U_FILE_NAME = "update-content-w_ut.json"

# Marker line placed before and after a solution in the generated files; it is the
# same string extract_solution (defined at the top of this notebook) searches for.
delimiter = "# ---------------------------------"

update_cfg = OmegaConf.load("configs/update_generation_v2-1.yaml")
update_cfg.new_impl.include_unit_tests = True
progsyn_cfg = OmegaConf.load("configs/prog_syn_generation_v2-1.yaml")

os.makedirs(ps_target_root, exist_ok=True)

def _load_json(path):
    """Parse the JSON file at ``path``; the handle is closed before returning."""
    with open(path, "r") as fp:
        return json.load(fp)


def _write_text(path, text):
    """Overwrite ``path`` with ``text``; the handle is closed (and flushed) before returning."""
    with open(path, "w") as fp:
        fp.write(text)


def _as_comment_fields(content, keys):
    """Return ``(key, value)`` pairs in which every newline of ``value`` is followed by ``#``.

    Together with the ``# "key": value`` rendering this keeps multi-line values
    commented out inside the generated ``.py`` files.
    """
    return [(key, content[key].replace("\n", "\n#")) for key in keys]


def _render_comment_fields(fields):
    """Render ``(key, value)`` pairs as ``# "key": value``, one pair per line."""
    return "\n".join([f"""# "{key}": {value}""" for key, value in fields])


# For every failed ProgSyn, write one ref_solution.py and one
# predicted_solution-<i>.py per GPT-4 generation into ps_target_root. Each file
# holds the update description, the new implementation, the problem statement,
# the solution wrapped between two `delimiter` lines, and the unit tests.
for prog_syn_id in all_failed_prog_syns:

    # The unpacking below requires the update id to have exactly three
    # "/"-separated parts, i.e. a prog_syn_id is "<api>/<update_type>/<update>/<prog_syn>".
    specific_update_id = "/".join(prog_syn_id.split("/")[:-1])
    update_dir = f"{semifinal_data_root}/{specific_update_id}"
    api, update_type, _ = specific_update_id.split("/")

    update_content = _load_json(f"{update_dir}/{U_FILE_NAME}")
    u_manager = UpdateManagerV21(cfg=update_cfg, api_path=api, update_tag=update_type)
    u_manager.load_from_dir(save_dir=update_dir,)
    update_info = _as_comment_fields(
        update_content,
        ["update_description", "rationale",
         "new_function_signature", "update_docstring"],
    )
    new_impl = update_content["new_impl"]

    ps_save_dir = f"{semifinal_data_root}/{prog_syn_id}"
    ps_target_dir = f"{ps_target_root}/{prog_syn_id}"
    os.makedirs(ps_target_dir, exist_ok=True)

    assert os.path.exists(f"{ps_save_dir}/{PS_FILE_NAME}"), f"missing {ps_save_dir}/{PS_FILE_NAME}"
    ps_content = _load_json(f"{ps_save_dir}/{PS_FILE_NAME}")
    # The pass flags are stored as a dict keyed by the stringified test index.
    unit_tests_pass_w_update = [
        ps_content["unit_tests_pass_w_update"][str(i)]
        for i in range(len(ps_content["unit_tests_pass_w_update"]))
    ]
    overall_pass_w_update = np.mean(unit_tests_pass_w_update)

    ps_info = _as_comment_fields(
        ps_content,
        ["scenario", "problem",
         "solution_signature",],
    )

    unit_tests = ps_content["unit_tests"]

    # Shared preamble of the reference file and of every predicted file.
    solution_header = [
        "\n".join(ps_content["imports"]),
        _render_comment_fields(update_info),
        new_impl,
        u_manager.update_enforce_statement,
        _render_comment_fields(ps_info),
        f"# PS path: {ps_save_dir}",
    ]

    # Reference file: only tests that do not test try/catch and that passed
    # with the update at generation time are listed.
    passing_test_sections = [
        "\n".join([f"# Unit test {i}", f"# pass_w_updates (gen.): {unit_tests_pass_w_update[i]}", unit_test])
        for i, unit_test in enumerate(unit_tests)
        if not is_testing_try_catch(unit_test) and unit_tests_pass_w_update[i]
    ]
    solution_trail = [
        f"# Overall pass_w_update (gen.) {overall_pass_w_update}",
        "# Unit tests",
        *passing_test_sections,
    ]
    ref_solution_file = solution_header + [
        "# Reference solution",
        delimiter,
        ps_content["solution_new"],
        delimiter,
    ] + solution_trail
    _write_text(f"{ps_target_dir}/ref_solution.py", "\n\n".join(ref_solution_file))

    generated_texts = _load_json(f"{prepend_result_root}/{prog_syn_id}/gpt-4/generated_texts.json")
    # NOTE(security): unpickling executes arbitrary code; only load report files
    # that were produced by this project's own evaluation runs.
    with open(f"{prepend_result_root}/{prog_syn_id}/gpt-4/test_reports.pkl", "rb") as reports_fp:
        test_reports = pickle.load(reports_fp)
    generated_programs = list(map(InstructTemplate.solution_extractor, generated_texts))

    for p_i, generated_program in enumerate(generated_programs):
        test_report = test_reports[p_i]
        # Mean pass flag over the tests that do not test try/catch.
        overall_pass_w_update = np.mean([
            test_report.pass_w_update[i]
            for i, unit_test in test_report.unit_tests.items()
            if not is_testing_try_catch(unit_test)
        ])

        experiment_test_sections = [
            "\n".join([
                f"# Unit test {i}",
                f"# pass_w_updates (experiment): {test_report.pass_w_update[i]}",
                f"# pass_wo_updates (experiment): {test_report.pass_wo_update[i]}",
                unit_test,
            ])
            for i, unit_test in test_report.unit_tests.items()
            if not is_testing_try_catch(unit_test)
        ]
        solution_trail = [
            # This value comes from the experiment's test report, not from
            # generation, so it is labelled "(experiment)" like the per-test lines.
            f"# Overall pass_w_update (experiment) {overall_pass_w_update}",
            "# Unit tests",
            *experiment_test_sections,
        ]
        predict_solution_file = solution_header + [
            # This file holds the model's program; it was previously mislabelled
            # "# Reference solution" (copy-paste from the reference file).
            "# Predicted solution",
            delimiter,
            generated_program,
            delimiter,
        ] + solution_trail
        _write_text(f"{ps_target_dir}/predicted_solution-{p_i}.py", "\n\n".join(predict_solution_file))

In [None]:
# Number of distinct "specific_update_id" values among rows whose "pass@5(new)" is 0.
len(set(final_table[final_table["pass@5(new)"] == 0]["specific_update_id"].to_list()))
# [ps_content["unit_tests_pass_w_update"][str(i)] for i in range(len(ps_content["unit_tests_pass_w_update"]))]

In [None]:
from src.utils.prompt_tool import CodeGenTemplate, InstructTemplate

In [None]:
# Extract the solution from each generated text.
# NOTE(review): generated_texts is assigned inside the loop of an earlier cell, so
# this sees whatever the last iteration left behind.
generated_programs = [InstructTemplate.solution_extractor(text) for text in generated_texts]

In [None]:
PS_FILE_NAME = "prog_syn-content-w_ut.json"
U_FILE_NAME = "update-content-w_ut.json"
semifinal_data_root = "/u/zliu/tool-KE/data/prelim/CodeUpdateArena-after-PS"

# Every update content file anywhere under the semifinal data root.
all_update_paths = glob.glob(f"{semifinal_data_root}/**/update-content-w_ut.json", recursive=True)

# update id (last three directory components) -> set of ProgSyn ids found beneath it
update2ps_id = {}
for update_path in all_update_paths:
    update_dir = os.path.dirname(update_path)
    specific_update_id = "/".join(update_dir.split("/")[-3:])
    api, update_type, _ = specific_update_id.split("/")

    update_ps_paths = glob.glob(f"{update_dir}/**/{PS_FILE_NAME}", recursive=True)
    update2ps_id[specific_update_id] = {
        "/".join(ps_path.split("/")[-5:-1]) for ps_path in update_ps_paths
    }
    # Every ProgSyn file must map to its own id; a collision would shrink the set.
    assert len(update_ps_paths) == len(update2ps_id[specific_update_id])

In [None]:
# Spot check: the ProgSyn ids recorded for one specific update.
update2ps_id['math.sin/modify-output-semantics/update-0']

In [None]:
# Number of entries in the "prog_syn_id" column of final_table.
len(final_table["prog_syn_id"])

In [None]:
# Group the ProgSyn ids in final_table by the update they belong to
# (a ProgSyn id with its last "/"-separated component removed).
ps_by_update = defaultdict(set)
for prog_syn_id in final_table["prog_syn_id"].to_list():
    specific_update_id = "/".join(prog_syn_id.split("/")[:-1])
    ps_by_update[specific_update_id].add(prog_syn_id)

# Keep only the updates that have at least 3 ProgSyns left.
final_update2dedup_ps = {
    update_key: ps_ids
    for update_key, ps_ids in ps_by_update.items()
    if 3 <= len(ps_ids)
}

In [None]:
# Show the column names of final_table.
final_table.columns

In [None]:
# Describe the distribution of the number of ProgSyns kept per update.
describe(list(map(len, final_update2dedup_ps.values())))

In [None]:
# Pin one update id for ad-hoc inspection.
# NOTE(review): this overwrites the specific_update_id left behind by earlier loops.
specific_update_id = 'math.sin/modify-output-semantics/update-0'


In [None]:
# Number of updates kept in final_update2dedup_ps.
len(final_update2dedup_ps)

In [None]:
# Total number of ProgSyns across all kept updates.
sum(len(ps_ids) for ps_ids in final_update2dedup_ps.values())

In [None]:
def copyanything(src, dst):
    """Copy ``src`` to ``dst``, whether ``src`` is a directory tree or a single file.

    A directory is copied recursively with ``shutil.copytree`` (merging into an
    existing ``dst``). If ``src`` turns out not to be a directory, it is copied
    with ``shutil.copy`` instead.

    Args:
        src: Path of the file or directory to copy.
        dst: Destination path.

    Raises:
        OSError: Any copy error other than "``src`` is not a directory", e.g.
            ``FileNotFoundError`` when ``src`` does not exist.
    """
    # errno was referenced below without ever being imported, so the file
    # fallback raised NameError instead of copying.
    import errno
    import shutil

    try:
        shutil.copytree(src, dst, dirs_exist_ok=True)
    except OSError as exc:
        # copytree reports a non-directory source as ENOTDIR (EINVAL on some platforms).
        if exc.errno in (errno.ENOTDIR, errno.EINVAL):
            shutil.copy(src, dst)
        else:
            raise

In [None]:
# Data roots: the tree before deduplication and the tree after it.
semifinal_data_root = "/u/zliu/tool-KE/data/prelim/CodeUpdateArena-after-PS"
dedup_data_root = "/u/zliu/tool-KE/data/prelim/CodeUpdateArena-after-dedup"

# Evaluation output roots for the prepend (n=5) experiment, before / after dedup.
# NOTE(review): neither is used in the cells visible here — confirm they are still needed.
old_expr_dir = "/u/zliu/tool-KE/evaluation_output/prepend_n=5"
new_expr_dir = "/u/zliu/tool-KE/evaluation_output_dedup/prepend_n=5"

# Direction of the copy performed by the following cell.
source_root = semifinal_data_root
target_root = dedup_data_root

In [None]:
import shutil

# update id -> {old ProgSyn id: re-indexed ProgSyn id}
update2ps_shrink_map = {}
for specific_update_id in final_update2dedup_ps:
    assert os.path.exists(f"{source_root}/{specific_update_id}/{U_FILE_NAME}")

    # Mirror the update content file into the target tree.
    os.makedirs(f"{target_root}/{specific_update_id}", exist_ok=True)
    shutil.copyfile(
        f"{source_root}/{specific_update_id}/{U_FILE_NAME}",
        f"{target_root}/{specific_update_id}/{U_FILE_NAME}",
    )

    # Order the kept ProgSyns by the integer after their last "-", then
    # renumber them consecutively from 0.
    remaining_ps = list(final_update2dedup_ps[specific_update_id])
    remaining_ps.sort(key=lambda ps_id: int(ps_id.split("-")[-1]))
    reindexed_remaining_ps = [
        f"{specific_update_id}/ProgSyn-{new_idx}" for new_idx, _ in enumerate(remaining_ps)
    ]

    ps_shrink_map = {old_id: new_id for old_id, new_id in zip(remaining_ps, reindexed_remaining_ps)}
    update2ps_shrink_map[specific_update_id] = ps_shrink_map

    for old_ps_dir, new_ps_dir in ps_shrink_map.items():
        # The copy step is disabled (kept below as comments); this pass only
        # checks that every re-indexed directory already exists in the target tree.
        assert os.path.exists(f"{target_root}/{new_ps_dir}")
        # os.makedirs(f"{target_root}/{new_ps_dir}", exist_ok=True)
        # copyanything(f"{source_root}/{old_ps_dir}", f"{target_root}/{new_ps_dir}")

In [None]:
# json.dump(update2ps_shrink_map, open(f"{dedup_data_root}/update2ps_shrink_map.json", "w"))

In [None]:
dedup_data_root = "/u/zliu/tool-KE/data/prelim/CodeUpdateArena-after-dedup"

# Every update content file under the dedup root, and the id of each one
# (the three path components preceding the file name).
all_update_paths = glob.glob(f"{dedup_data_root}/**/{U_FILE_NAME}", recursive=True)
all_update_ids = [
    "/".join(update_file.split("/")[-4:-1]) for update_file in all_update_paths
]

# Ids must be unique, otherwise two files would share one update id.
assert len(set(all_update_ids)) == len(all_update_ids)
len(all_update_ids)

In [None]:
# The code that created `random_ids` had been left commented out, so this cell
# raised NameError on a fresh kernel and the shuffle was unseeded. Create the
# permutation once with a fixed seed; a permutation that already exists in the
# kernel is reused so an existing sample is not reshuffled.
# NOTE(review): a sample saved earlier from the unseeded shuffle will not be
# reproduced by this seed.
if "random_ids" not in globals():
    random_ids = np.random.default_rng(0).permutation(len(all_update_ids))

# Positions of the 50 sampled updates, in ascending order.
sorted(random_ids[:50])

In [None]:
# Update ids at the first 50 positions of random_ids, taken in ascending position order.
sampled_update_ids = [all_update_ids[i] for i in sorted(random_ids[:50])]

In [None]:
# Persist the sampled update ids. The file is opened in a `with` block so it is
# flushed and closed right away instead of whenever the handle is garbage-collected.
with open("/u/zliu/tool-KE/evaluation_output_dedup/specificity/sampled_update_ids.json", "w") as sampled_ids_file:
    json.dump(sampled_update_ids, sampled_ids_file)

In [None]:
# Print every update id found on disk that has no entry in update2ps_shrink_map.
for update_id in all_update_ids:
    if update_id in update2ps_shrink_map:
        continue
    print(update_id)

In [None]:
# Histogram (10 bins) of the values stored in dedupe_update2ps, with a count label on each bar.
# NOTE(review): dedupe_update2ps is not defined in the cells visible here — confirm where it comes from.
import matplotlib.pyplot as plt
num_ps_distri = np.array([p for p in dedupe_update2ps.values()])
values, bins, bars = plt.hist(num_ps_distri, bins=10, ec="k", rwidth=0.6)

# num_ps_distri = np.array(unit_tests_pass_w_updates)
# values, bins, bars = plt.hist(num_ps_distri, ec="k")
# plt.xticks(np.arange(num_ps_distri.min(), num_ps_distri.max()+1, 20))

plt.bar_label(bars)
# NOTE(review): the axis label and title say "Pass w. Update", but the plotted variable is
# named num_ps_distri — presumably left over from the commented-out variant above; verify.
plt.xlabel("Pass w. Update (during generation)")
plt.ylabel("Count")
plt.title("Pass w. Update  distribution")