In [None]:
import itertools as it
import random
from collections import defaultdict
from typing import Iterator

import pandas as pd
import seaborn as sns
from entitybert.selection import prepare_file_ranker_df
from tqdm import tqdm

In [None]:
# Seed the stdlib `random` module, which the lookup classes defined in this
# notebook use (via `random.choice`) for all of their sampling.
random.seed(0)

In [None]:
class ItemLookup[T]:
    def __init__(self):
        self._items: dict[int, list[T]] = defaultdict(list)

    def add_item(self, value: int, item: T):
        self._items[value].append(item)

    def within(self, value_range: range) -> Iterator[T]:
        for value in value_range:
            yield from self._items[value]


class File:
    """Record for one file: an id plus its ``lloc`` and ``entities`` values.

    No ``__eq__``/``__hash__`` is defined, so instances compare and hash by
    identity.
    """

    def __init__(self, id: int, lloc: int, entities: int):
        self.id, self.lloc, self.entities = id, lloc, entities

    def __repr__(self) -> str:
        shown = ", ".join(
            f"{name}={getattr(self, name)}" for name in ("id", "lloc", "entities")
        )
        return f"File({shown})"


class FileLookup:
    """Collection of files with unique ids, indexed by ``lloc`` and by ``entities``."""

    def __init__(self):
        self._files: dict[int, File] = {}
        self._by_lloc: ItemLookup[File] = ItemLookup()
        self._by_entities: ItemLookup[File] = ItemLookup()

    def add_file(self, file: File):
        """Register ``file`` in all indexes.

        Raises:
            ValueError: if a file with the same id was already added.
        """
        if file.id in self._files:
            raise ValueError("duplicate file id")
        self._files[file.id] = file
        for index, key in (
            (self._by_lloc, file.lloc),
            (self._by_entities, file.entities),
        ):
            index.add_item(key, file)

    def rand_file(self) -> File:
        """Return one registered file chosen with ``random.choice``."""
        candidates = [*self._files.values()]
        return random.choice(candidates)

    def within(self, lloc_range: range, entities_range: range) -> set[File]:
        """Return the files whose lloc is in ``lloc_range`` and whose entities is in ``entities_range``."""
        lloc_matches = set(self._by_lloc.within(lloc_range))
        entities_matches = set(self._by_entities.within(entities_range))
        return lloc_matches & entities_matches


class ProjectLookup:
    """Files grouped by project, with random sampling of similarly sized file pairs.

    All randomness comes from the stdlib ``random`` module, so seeding it makes
    the sampling reproducible.
    """

    def __init__(self):
        # project name -> that project's files
        self._projects: dict[str, FileLookup] = defaultdict(FileLookup)

    def add_file(self, project: str, file: File):
        """Register ``file`` under ``project`` (the project is created on first use)."""
        self._projects[project].add_file(file)

    def rand_project(self) -> str:
        """Return a project name chosen uniformly among the known projects."""
        return random.choice(list(self._projects.keys()))

    def rand_file(self, project: str) -> File:
        """Return a random file of ``project``."""
        return self._projects[project].rand_file()

    def rand_file_within_range(
        self, project: str, lloc_range: range, entities_range: range
    ) -> File | None:
        """Return a random file of ``project`` matching both ranges, or None if none does."""
        files = self._projects[project].within(lloc_range, entities_range)
        if not files:
            return None
        # ``files`` is a set of objects hashed by identity, so its iteration
        # order depends on memory addresses and changes from run to run.
        # Sorting by id (unique within a project) makes the draw depend only
        # on the random seed.
        return random.choice(sorted(files, key=lambda file: file.id))

    def rand_file_pair(
        self, lloc_tol: int, entities_tol: int
    ) -> tuple[File, File] | None:
        """Try once to draw a pair of distinct files with similar lloc and entities.

        A random file ``a`` is drawn from a random project; ``b`` is drawn from
        a second, independently chosen project (possibly the same one) among
        the files whose lloc and entities are within ``lloc_tol`` /
        ``entities_tol`` of ``a``'s values.

        Returns:
            ``(a, b)``, or None when no candidate exists or the candidate
            drawn has the same id as ``a``.
        """
        a_project = self.rand_project()
        b_project = self.rand_project()
        a_file = self.rand_file(a_project)
        # Inclusive windows around a_file's values, clamped at zero.
        lloc_range = range(max(0, a_file.lloc - lloc_tol), a_file.lloc + lloc_tol + 1)
        entities_range = range(
            max(0, a_file.entities - entities_tol), a_file.entities + entities_tol + 1
        )
        b_file = self.rand_file_within_range(b_project, lloc_range, entities_range)
        if b_file is None or a_file.id == b_file.id:
            return None
        return (a_file, b_file)

    def sample_n_pairs(
        self,
        lloc_tol: int,
        entities_tol: int,
        n: int,
        max_attempts: int | None = None,
    ) -> list[tuple[File, File]]:
        """Draw ``n`` pairs in which no file id appears more than once.

        Args:
            lloc_tol: maximum allowed lloc difference within a pair.
            entities_tol: maximum allowed entities difference within a pair.
            n: number of pairs to return.
            max_attempts: optional cap on the number of draws; None (the
                default) retries without limit.

        Returns:
            The pairs in the order they were drawn.

        Raises:
            ValueError: if ``n`` pairs would need more files than are registered.
            RuntimeError: if ``max_attempts`` draws did not yield ``n`` pairs.
        """
        # Every pair consumes two previously unused ids, so this request could
        # never be satisfied and the loop below would spin forever.
        total_files = sum(len(lookup._files) for lookup in self._projects.values())
        if 2 * n > total_files:
            raise ValueError(
                f"cannot sample {n} disjoint pairs from {total_files} files"
            )

        used_ids: set[int] = set()
        # A list keeps the draw order; a set of identity-hashed tuples would
        # be returned in an address-dependent order.
        pairs: list[tuple[File, File]] = []
        attempts = 0
        while len(pairs) < n:
            if max_attempts is not None and attempts >= max_attempts:
                raise RuntimeError(
                    f"found only {len(pairs)} of {n} pairs in {max_attempts} attempts"
                )
            attempts += 1
            pair = self.rand_file_pair(lloc_tol, entities_tol)
            if pair is None:
                continue
            a_file, b_file = pair
            if a_file.id in used_ids or b_file.id in used_ids:
                continue
            used_ids.update((a_file.id, b_file.id))
            pairs.append(pair)
        return pairs

In [None]:
def is_ascii(text: str) -> bool:
    """Return True when every character of ``text`` is ASCII.

    The empty string counts as ASCII. Uses ``str.isascii`` so no encoded copy
    of the text has to be built just to answer the question.
    """
    return text.isascii()

In [None]:
# Read one database path per line, sorted. Lines that are empty after
# stripping trailing whitespace (e.g. a blank last line) are skipped so that
# no empty path reaches the loader.
with open("_data/dbs_test.txt") as f:
    db_paths = sorted(path for line in f if (path := line.rstrip()))

In [None]:
# Load one frame per database and tag every row with the path it came from.
# NOTE(review): the columns used below ("content", "filename", "loc", "lloc",
# "entities", "commits") are assumed to be produced by prepare_file_ranker_df.
dfs = []

for db_path in tqdm(db_paths):
    project_df = prepare_file_ranker_df(db_path)
    project_df.insert(0, "project", db_path)
    dfs.append(project_df)

# Combine the frames, keep only rows whose content is pure ASCII, then sort,
# renumber the index and narrow the numeric columns.
df = pd.concat(dfs, ignore_index=True)
ascii_mask = [is_ascii(content) for content in df["content"]]
df = (
    df[ascii_mask]
    .sort_values(["project", "filename"])
    .reset_index(drop=True)
    .astype({"loc": "int32", "lloc": "int32", "entities": "int32", "commits": "int32"})
)
df

In [None]:
def filter_df(
    df: pd.DataFrame,
    global_quantile: float,
    columns: tuple[str, ...] = ("lloc", "commits"),
) -> pd.DataFrame:
    """Keep the rows that reach a quantile threshold in every selected column.

    Args:
        df: frame to filter; it is not modified.
        global_quantile: quantile in [0, 1] computed per column over all rows
            of ``df``.
        columns: columns that must each be at or above their own threshold.
            Defaults to ``("lloc", "commits")``.

    Returns:
        The rows of ``df`` (original index preserved) whose value in every
        column of ``columns`` is >= that column's quantile.
    """
    selected = list(columns)
    thresholds = df[selected].quantile(global_quantile)
    # The comparison aligns the threshold Series with the column labels.
    meets_all = (df[selected] >= thresholds).all(axis=1)
    return df[meets_all]

In [None]:
# Subset of files at or above the 0.5 quantile (median) thresholds.
df_filtered_50p = filter_df(df, global_quantile=0.5)
df_filtered_50p

In [None]:
# Subset of files at or above the 0.75 quantile thresholds.
df_filtered_75p = filter_df(df, global_quantile=0.75)
df_filtered_75p

In [None]:
# Use the 0.75-quantile subset as the working frame for the cells that follow.
df_filtered = df_filtered_75p

In [None]:
# lloc matching tolerance: 1/64 of the column's standard deviation, shown
# before and after rounding to a whole number.
lloc_std = df_filtered["lloc"].std()
lloc_tol_raw = lloc_std / 64
print(lloc_tol_raw)
lloc_tol = round(lloc_tol_raw)
print(lloc_tol)

In [None]:
# entities matching tolerance: 1/64 of the column's standard deviation, shown
# before and after rounding to a whole number.
entities_std = df_filtered["entities"].std()
entities_tol_raw = entities_std / 64
print(entities_tol_raw)
entities_tol = round(entities_tol_raw)
print(entities_tol)

In [None]:
# Register every filtered file under its project, using the frame's index
# label as the file id.
project_lookup = ProjectLookup()

# Iterate over just the needed columns instead of iterrows(), which builds a
# full Series per row and does not preserve column dtypes. lloc and entities
# are cast to built-in int, like the id, so File always holds plain ints
# however pandas boxes the values.
for ix, project, lloc, entities in zip(
    df_filtered.index,
    df_filtered["project"],
    df_filtered["lloc"],
    df_filtered["entities"],
):
    project_lookup.add_file(project, File(int(ix), int(lloc), int(entities)))

In [None]:
# Draw 1200 file pairs whose lloc and entities values differ by at most the
# tolerances computed above; no file id is used in more than one pair.
pairs = project_lookup.sample_n_pairs(entities_tol=entities_tol, lloc_tol=lloc_tol, n=1200)

In [None]:
# Build one output record per sampled pair, looking both files up in `df` by
# their id (which is the frame's index label).
out_rows = []

for position, (file_a, file_b) in enumerate(pairs):
    sides = {"a": df.loc[file_a.id], "b": df.loc[file_b.id]}
    record = {"position": position}
    # Field-major order yields project_a, project_b, filename_a, ... columns.
    for field in ("project", "filename", "content"):
        for side, source_row in sides.items():
            record[f"{field}_{side}"] = source_row[field]
    out_rows.append(record)

# Index by position, prepend the constant sequence label, and write the CSV.
out_df = pd.DataFrame.from_records(out_rows, index="position")
out_df.insert(0, "sequence", "testset-largefiles-75p")
out_df.to_csv("testset-largefiles-75p.csv")
out_df