In [1]:
# Load IPython's autoreload extension and enable mode 2 so edited project
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

# NOTE(review): `%pwd` is not the last expression of this cell, so its value
# is presumably never displayed; the recorded output of this cell (an
# `[Errno 2]` message followed by a path) looks like it came from an earlier
# `%cd` -- confirm and re-run.
%pwd
%matplotlib inline

import pandas as pd

# Display settings applied for the whole notebook: row and column limits are
# lifted (None) and `expand_frame_repr` is switched off.
for _option, _value in (
    ("display.max_rows", None),
    ("display.max_columns", None),
    ("display.expand_frame_repr", False),
):
    pd.set_option(_option, _value)

[Errno 2] No such file or directory: '/home/benji/Documents/Uni/heidelberg/05/masterarbeit/impls/scripts/experiments'
/home/ben/coding/scripts/experiments/00datasets


In [4]:
import pathlib, sys
import logging

# Root folder that is handed to the dataset loader and to every repository
# helper in this notebook.
DATA_FOLDER = pathlib.Path("/raid/students/mdti4py/datasets/cdt4py")

logger = logging.getLogger(name=__name__)
logger.setLevel(level=logging.DEBUG)

# `logging.getLogger` returns the same logger object on every execution of
# this cell, so blindly calling `addHandler` again would stack one more
# stdout handler per re-run and print every message multiple times.
# Drop stdout handlers left over from earlier executions first; handlers
# pointing anywhere else are left untouched.
for existing in list(logger.handlers):
    if isinstance(existing, logging.StreamHandler) and getattr(existing, "stream", None) is sys.stdout:
        logger.removeHandler(existing)

handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(fmt=logging.Formatter('%(asctime)s | %(levelname)s : %(message)s'))
logger.addHandler(handler)

In [7]:
from scripts.infer.structure import AuthorRepo
from typet5.data import GitRepo


class CDT4PyRepo(GitRepo):
    """A `GitRepo` constructed from a dataset `AuthorRepo` entry."""

    def __init__(self, author_repo: AuthorRepo) -> None:
        """Take author and repository name from ``author_repo``.

        The remaining `GitRepo` fields are fixed to ``url=None``,
        ``stars=-1`` and ``forks=-1`` (presumably placeholders, since only
        author and repo are read from the entry -- confirm).
        """
        fixed_fields = dict(url=None, stars=-1, forks=-1)
        super().__init__(
            author=author_repo.author,
            name=author_repo.repo,
            **fixed_fields,
        )

    def authorname(self) -> str:
        """Return the identifier ``<author>/<name>`` for this repository."""
        return f"{self.author}/{self.name}"
        

class CDT4PyFlaskRepo(CDT4PyRepo):
    """Repository variant that is located under the ``flask`` subfolder."""

    def repo_dir(self, repos_dir: pathlib.Path) -> pathlib.Path:
        """Return ``repos_dir / "flask" / <author>/<name>``."""
        domain_root = repos_dir / "flask"
        return domain_root / self.authorname()
    
class CDT4PyNumpyRepo(CDT4PyRepo):
    """Repository variant that is located under the ``numpy`` subfolder."""

    def repo_dir(self, repos_dir: pathlib.Path) -> pathlib.Path:
        """Return ``repos_dir / "numpy" / <author>/<name>``."""
        domain_root = repos_dir / "numpy"
        return domain_root / self.authorname()

In [None]:
from typet5.data import GitRepo

import tqdm

from scripts.infer.structure import CrossDomainTypes4Py

dataset = CrossDomainTypes4Py(dataset_root=DATA_FOLDER)
test_set = dataset.test_set()
assert test_set

new_repos = []

# Every test-set entry is wrapped once per domain-specific repo class; the
# repos whose name mentions typeshed or stubs are then filtered out.
for cdt4py_repo in (CDT4PyFlaskRepo, CDT4PyNumpyRepo):
    downloaded_repos = [
        cdt4py_repo(dataset.author_repo(entry))
        for entry in test_set
    ]

    for r in tqdm.tqdm(downloaded_repos):
        r.read_last_update(DATA_FOLDER)

    new_domain_repos = [
        candidate
        for candidate in downloaded_repos
        if not ("typeshed" in candidate.name or "stub" in candidate.name)
    ]
    logger.info(f"{len(new_domain_repos)} / {len(downloaded_repos)} are not additionally related to stubbing")

    new_repos += new_domain_repos

In [None]:
loc_limit = 50000

small_repos = []
all_repos = []
for rep in tqdm.tqdm(new_repos):
    try:
        loc = rep.count_lines_of_code(DATA_FOLDER)
        if loc >= loc_limit:
            logger.warning(f"{rep.authorname()} does not pass, with {loc} LOC")
        else:
            small_repos.append(rep)
    except UnicodeDecodeError:
        # Files that cannot be decoded cannot be counted; the repo is skipped.
        logger.warning(f"{rep.authorname()} does not pass due to encoding error", exc_info=True)
    except Exception:
        logger.warning(f"{rep.authorname()} does not pass", exc_info=True)
    else:
        # Reached only when counting succeeded, regardless of the size limit.
        all_repos.append(rep)

print(f"{len(small_repos)}/{len(new_repos)} repos are within the size limit ({loc_limit} LOC).")

In [None]:
# Shallow copy, so changes to the list `test_repos` leave `small_repos` intact.
test_repos = list(small_repos)

In [None]:
# Bin by lines of code
import pandas as pd

kloc = loc_limit // 1000

# Histogram bin edges, one every 4 kLOC. The final edge is pinned to
# `loc_limit` itself: a plain `range(0, kloc + 1, 4)` stops at the largest
# multiple of 4 kLOC (48000 for the 50000 limit), so repositories between
# that edge and the limit would lie outside every bin and silently vanish
# from the histogram.
# NOTE(review): repositories above `loc_limit` are still outside the bins,
# which also affects the "All Test Repositories" plot -- confirm intended.
LOC_BIN_WIDTH = 4000
LOC_BINS = list(range(0, loc_limit, LOC_BIN_WIDTH)) + [loc_limit]
print(LOC_BINS)

def loc_binning(repos: list[GitRepo], title: str) -> None:
    """Plot a histogram of the repositories' lines of code.

    Args:
        repos: Repositories to plot; each must provide ``authorname()`` and
            a ``lines_of_code`` attribute.
        title: Title of the resulting plot.

    The bin edges come from the module-level ``LOC_BINS``.
    """
    rows = []
    for repo in repos:
        rows.append((repo.authorname(), repo.lines_of_code))

    loc_frame = pd.DataFrame(rows, columns=["Repository", "Lines of Code"])
    loc_frame.plot.hist(ylabel="Frequency", bins=LOC_BINS, title=title)

In [None]:
# One histogram for every repository whose line count succeeded, one for
# the subset that stayed below the size limit.
loc_binning(repos=all_repos, title="All Test Repositories")
loc_binning(repos=test_repos, title="Sub 50kLOC Test Repositories")

In [9]:
import operator

def count_repo_annots(
    repo: GitRepo, min_annots_per_loc: float = 0.05
) -> tuple[GitRepo, dict] | None:
    """Collect a repository's annotations and keep it only if densely annotated.

    Args:
        repo: Repository to inspect. Its annotations are collected relative
            to ``DATA_FOLDER``; ``repo.lines_of_code`` is read afterwards
            (presumably populated by an earlier ``count_lines_of_code``
            call -- confirm).
        min_annots_per_loc: Exclusive lower bound for
            ``repo.n_type_annots / repo.lines_of_code``.

    Returns:
        ``(repo, annotations)`` when the ratio exceeds the bound, otherwise
        ``None``. ``None`` is also returned for a repository with zero lines
        of code and when any step raises; failures are logged as warnings.
    """
    try:
        annotations = repo.collect_annotations(DATA_FOLDER)

        # Use the repository that was passed in. The previous code divided by
        # the notebook-global `rep` (a leftover loop variable), so every repo
        # was measured against the size of some unrelated repository.
        lines_of_code = repo.lines_of_code
        if lines_of_code == 0:
            # No code means no meaningful density; reject without an error.
            return None

        if repo.n_type_annots / lines_of_code > min_annots_per_loc:
            return repo, annotations
        return None
    except Exception as e:
        logger.warning(f"Failed to count annotations for {repo.name}. Exception: {e}")
        return None

NameError: name 'small_repos' is not defined

In [None]:
from concurrent.futures import ProcessPoolExecutor, as_completed

# Run the annotation counting in four worker processes and gather the
# results in completion order.
with ProcessPoolExecutor(max_workers=4) as executor:
    fs = [executor.submit(count_repo_annots, candidate) for candidate in small_repos]
    repo2annotations = [
        finished.result()
        for finished in tqdm.tqdm(as_completed(fs), total=len(fs))
    ]

# `count_repo_annots` returns None for rejected repositories; keep only the
# (repo, annotations) pairs and extract the repositories from them.
repo2annotations: list[tuple[CDT4PyRepo, dict]] = [
    pair for pair in repo2annotations if pair is not None
]
useful_repos: list[CDT4PyRepo] = [pair[0] for pair in repo2annotations]

logger.info(f"{len(useful_repos)}/{len(small_repos)} repos are parsable, have enough portions of type annotations")

# The collected annotation dicts are not needed beyond this point.
del repo2annotations

In [8]:
# Bin by relative annotation count
def type_slots_filled(repos: list[GitRepo], title: str) -> None:
    """Plot a histogram of how many type slots each repository has filled.

    Args:
        repos: Repositories to plot; each must provide ``authorname()``,
            ``n_type_annots`` and ``n_type_places``.
        title: Title of the resulting plot.

    The plotted value per repository is
    ``n_type_annots / n_type_places * 100``; bin edges run from 0 to 100
    in steps of 10.
    """
    rows = []
    for repo in repos:
        label = repo.authorname()
        filled = repo.n_type_annots / repo.n_type_places * 100
        rows.append((label, filled))

    filled_frame = pd.DataFrame(rows, columns=["Repository", "Annotation Frequency"])
    decile_edges = list(range(0, 100 + 1, 10))
    filled_frame.plot.hist(ylabel="Frequency", bins=decile_edges, title=title)

In [None]:
# The plotted quantity is n_type_annots / n_type_places * 100, i.e. a
# percentage of filled slots, not a percentile rank -- label it accordingly.
type_slots_filled(useful_repos, title="Percentage of Type Slots Filled")