In [None]:
%load_ext autoreload
%autoreload 2

from typet5.utils import *
from typet5.static_analysis import UsageAnalysis, PythonProject
from typet5.function_dataset import data_project_from_dir

In [None]:
dataset = "ManyTypes4Py"

result_paths = {
    "CodeT5": get_eval_dir(dataset, ""),
    "TypeT5": get_eval_dir(dataset, "(implicit_imports, new) model-v7--TrainingConfig(drop_env_types=False, add_implicit_rel_imports=True)"),
}

In [None]:
ex_proj = PythonProject.parse_from_root(Path("/home/jiayi/Projects/type4py"))
analysis = UsageAnalysis(
    ex_proj, add_implicit_rel_imports=True, add_override_usages=True
)
pretty_print_dict(analysis.get_stats())


In [None]:
from typet5.data import (
    create_tokenized_srcsets,
    get_tk_dataset_name,
    load_tokenized_srcsets,
    TypeCheckSettings,
)
from typet5.tokenized_src import PreprocessArgs

pre_args = PreprocessArgs()
dataset = "InferTypes4Py"
sdata_name = get_tk_dataset_name(dataset, pre_args, False)
sdata_path = get_dataroot() / "TokenizedSrcSets" / sdata_name
create_tokenized_srcsets(
    dataset,
    sdata_path,
    func_only=False,
    pre_args=pre_args,
)
tk_dataset = load_tokenized_srcsets(sdata_path)
tk_dataset["test"].print_stats()


In [None]:
from typet5.static_analysis import ProjectPath

analysis.user2used[ProjectPath.from_str("type4py.type_check/MypyManager._build_tc_cmd")]


In [None]:
from typet5.utils import *

def show_subtokens(ids: list[int]):
    tks = [f"⦗{x}⦘" for x in DefaultTokenizer.convert_ids_to_tokens(ids)]
    print(" ".join(tks))

ex_code = """
def eval_on_dataset(
    model: <extra_id_0>,
    data: <extra_id_1>,
    window_size: <extra_id_2> = None,
)
"""

encoding = DefaultTokenizer.encode(ex_code)
print(encoding)
show_subtokens(encoding)

In [None]:
ex_output = """
<extra_id_0>ModelWrapper<extra_id_1>TokenizedSrcSet<extra_id_2>Optional[int]
"""
print(decoding := DefaultTokenizer.encode(ex_output))
show_subtokens(decoding)

In [None]:
from spot import proj_root
from typet5.static_analysis import ProjectPath, UsageAnalysis, PythonProject
from pprint import pprint


proj = PythonProject.parse_from_root(proj_root())
for caller, callees in UsageAnalysis(proj).user2used.items():
    if caller.module == "typet5.static_analysis":
        print(caller)
        for callee in callees:
            print("\t", callee.used, "" if callee.is_certain else "  (maybe)")


In [None]:
%load_ext autoreload
%autoreload 2

from typet5.tokenized_src import PreprocessArgs, proj_root
from typet5.function_dataset import repo_to_tk_srcs, dataset_from_repos

srcs = repo_to_tk_srcs(proj_root(), PreprocessArgs(drop_env_types=True))

from typet5.data import TokenizedSrcSet, CtxArgs

sdata = TokenizedSrcSet(proj_root(), srcs)
ctx_args = CtxArgs(1024, 128, 256, 512)
cdata = sdata.to_chunks(ctx_args)

In [None]:
from typet5.utils import *

print(decode_tokens(cdata.data["input_ids"][0]))


In [None]:
for src in [s for s in srcs if "static_analysis/PythonFunction" in str(s.file)][:10]:
    print(f"======= file: {src.file} ========")
    src.print_code(max_lines=250)


In [None]:
from typet5.data import PreprocessArgs, repr_modified_args

repr_modified_args(
    TrainingConfig(pre_args=PreprocessArgs(imports_in_preamble=False)), flatten=True
)


In [None]:
from typet5.static_analysis import cst, PythonModule, compute_module_usages, PythonProject

code1 = """
# root.file1

# global function
def gf(x):
    return x * x

# with inner function
def gf_with_inner(x):
    def inner(y):
        return y * y
    return inner(x)

# class
class C:
    def __init__(self, x):
        self.x = x
    
    def foo(self, y):
        return self.x + y

    @staticmethod
    def s_method(x):
        return x + 1
    
"""
code2 = """
# root.file2
from .file1 import gf
from root.file1 import gf_with_inner
import root.file1
import root.file1 as f1

def usage1(x):
    gf(x) + root.file1.C(5)
    foo(5)

def usage2(x):
    def inner():
        1 + gf_with_inner(x)
    return inner()

def usage_method1(x):
    x = f1.C(5)
    1 + x.foo(3)

def usage_method2(x):
    (1 + f1.C(5)).foo(3)

def usage_local():
    usage1(3)
    UsageClass(4)

@f1.C(1)
def usage_dec():
    pass

class UsageClass:
    def __init__(self, x):
        self.x = gf_with_inner(x)
        self.y = self.foo(5)

    def foo(self, y):
        return usage_local(f1.gf(y))

    @staticmethod
    def s_method(x):
        return x

class SubClass(UsageClass):
    def use(self):
        self.foo(5)
        f1.C.s_method(5)
"""

code3 = """
# root.file3
from .file1 import *

def usage1(x):
    gf(5)
    C(5)
    
"""

project = PythonProject.from_modules(
    [
        PythonModule.from_cst(cst.parse_module(code1), "root.file1"),
        PythonModule.from_cst(cst.parse_module(code2), "root.file2"),
        PythonModule.from_cst(cst.parse_module(code3), "root.file3"),
    ]
)

for u in compute_module_usages(project.modules["root.file3"]):
    print(str(u))


In [None]:
from typet5.static_analysis import build_project_namespaces

build_project_namespaces(project)


In [None]:
from typet5.static_analysis import compute_module_usages

compute_module_usages(project.modules["root.file3"])


In [None]:
from typet5.static_analysis import UsageAnalysis, build_project_namespaces

build_project_namespaces(project)

analysis = UsageAnalysis(project)
analysis.caller2callees[ProjectPath("root.file2", "SubClass.use")]


In [None]:
import libcst as cst

from typet5.tokenized_src import TokenizedSrc, PreprocessArgs
from typet5.utils import Path, decode_tokens

ex_code = '''# document comment 1
  # document comment 2
"""String document commnet"""
import os; import spot;
from sys import argv, exit
# after import
@wraps(function)
def catch_permission_denied(function):
    import some.inner.imports
    """
    Decorator to catch :class:`psycopg2.ProgrammingError` exceptions with the
    ``INSUFFICIENT_PRIVILEGE`` error code and rethrow them as
    :class:`~werkzeug.exceptions.Forbidden` exceptions instead.
    """
    @wraps(function)
    def decorated(x: str, y: int) -> str:
        try:
            # comment 1
            # comment 1 cont
            return function(*args, **kwargs)

        except InsufficientPrivilege as error:
            LOG.error("Forbidden: %s", error) # comment 2
            raise Forbidden()

    return decorated
'''
pre_args = PreprocessArgs(stub_in_preamble=True)
ex_src = TokenizedSrc.parse(ex_code, Path("test_file"), Path("test_repo"), pre_args)
print(decode_tokens(ex_src.tokenized_code))


In [None]:
from typet5.data import src_to_chunks_, CtxArgs, PreprocessArgs
from ipywidgets import interactive

pre_args = PreprocessArgs(stub_in_preamble=True)
ex_src = TokenizedSrc.parse(ex_code, Path("test_file"), Path("test_repo"), pre_args)


def print_code(
    preamble: int,
    left: int,
    right: int,
    ctx_size: int,
    max_labels: int,
    chunk_id: int,
    inline_prev: bool,
):
    chunks = []
    args = CtxArgs(
        ctx_size,
        preamble,
        left,
        right,
        max_labels=max_labels,
        inline_prev_gold=inline_prev,
    )
    src_to_chunks_(chunks, [], ex_src, (0, len(ex_src.types)), args)
    print(decode_tokens(chunks[chunk_id]["input_ids"]))


interactive(
    print_code,
    preamble=(1, 100),
    left=(1, 200),
    right=(1, 100),
    ctx_size=(1, 500),
    max_labels=(1, 10),
    chunk_id=(0, 1),
    inline_prev=True,
)


In [None]:
%load_ext autoreload
%autoreload 2

import os
import pickle
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import *

import pandas as pd
import plotly.express as px

from typet5.data import GitRepo, ModuleRemapUnpickler
from typet5.type_env import (
    AnnotPath,
    MypyChecker,
    SelectAnnotations,
    TypeInfAction,
    TypeInfEnv,
    TypeInfState,
    mypy_checker,
)
from typet5.utils import cst, proj_root, read_file, seq_flatten, tqdm, write_file

os.chdir(proj_root())

datadir = Path(os.getenv("datadir"))
repos_dir = datadir / "SPOT-data/repos"

useful_repos_path = proj_root() / "scripts" / "useful_repos.pkl"
rename_module = lambda n: "typet5.data" if n == "typet5.data_prepare" else n
with useful_repos_path.open("rb") as f:
    useful_repos: list[GitRepo] = ModuleRemapUnpickler(f, rename_module).load()

In [None]:
# loading pre-trained model and tokenizer
from typet5.utils import get_data_dir

model_dir = "Salesforce/codet5-base"
# model_dir = datadir / "checkpoints/saved/SPOT-CodeT5-no_margin/"

import torch
from transformers import (
    DataCollatorForSeq2Seq,
    RobertaTokenizer,
    T5ForConditionalGeneration,
)
from transformers.models.t5 import T5ForConditionalGeneration

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
tokenizer: RobertaTokenizer = RobertaTokenizer.from_pretrained(model_dir)
model: T5ForConditionalGeneration = T5ForConditionalGeneration.from_pretrained(
    model_dir
).to(device)
max_target_length = 128


In [None]:
from typet5.data import mask_type_annots, output_ids_as_types, tokenize_masked

test_code = """
@dataclass
class GitRepo:
    author: str
    name: str
    url: str
    stars: int
    forks: int

    def authorname(self):
        return self.author + "__" + self.name

    def repo_dir(self, repos_dir: Path) -> Path:
        return repos_dir / "downloaded" / self.authorname()

    def download(self, repos_dir: Path, timeout=None) -> bool:
        pass
"""


def run_model(code: str, num_beams=16):
    masked = mask_type_annots((Path("no_source"), code))
    tks = tokenize_masked(masked, tokenizer, device)
    input_ids = tks["input_ids"]
    with torch.no_grad():
        loss = model.forward(**tks).loss
        dec = model.generate(
            input_ids,
            max_length=max_target_length,
            num_beams=num_beams,
            # do_sample=True,
        )[0]
    return {
        "loss": loss,
        "predicted_types": output_ids_as_types(dec, tokenizer),
        "labels": output_ids_as_types(tks["labels"][0], tokenizer),
        "generation": tokenizer.decode(dec),
        "input_ids": input_ids[0],
        "output_ids": dec,
        "annots_info": masked["annots_info"],
    }


result = run_model(test_code, num_beams=10)
result["loss"]


In [None]:
from spot import PythonType
from typet5.type_env import apply_annotations


def type_to_annot(ty: PythonType) -> str:
    return cst.Annotation(cst.parse_expression(str(ty)))


def run_aug_model(src: Path, cwd: Path):
    result = run_model(read_file(src), num_beams=10)
    pred_annots = {
        info.path: type_to_annot(t)
        for info, t in zip(result["annots_info"], result["predicted_types"])
    }
    m1 = apply_annotations(cst.parse_module(read_file(src)), pred_annots)
    write_file(src, m1.code)
    checker_r = MypyChecker.check_project(src, cwd)
    pos_to_preds = {
        info.annot_range: str(ty)
        for info, ty in zip(result["annots_info"], result["predicted_types"])
    }
    return {
        "model_result": result,
        "module": m1,
        "checker_feedback": checker_r,
        "pos_to_preds": pos_to_preds,
    }


aug_r = run_aug_model(inference_dir / "env_code_2.py", inference_dir)


In [None]:
from typet5.utils import patch_code_with_extra

print("---- predicted types ----")
print(aug_r["model_result"]["predicted_types"])
print("---- model output ----")
print(tokenizer.decode(aug_r["model_result"]["output_ids"], skip_special_tokens=False))
print("---- checker_feedback ----")
print(aug_r["checker_feedback"].output_str)

print("---- new input ----")
new_input = patch_code_with_extra(
    aug_r["module"].code,
    aug_r["pos_to_preds"],
    aug_r["checker_feedback"].error_dict["env_code_2.py"],
)
print(new_input)


In [None]:
import pickle

from typet5.utils import Path, run_long_task, DefaultTokenizer, not_none, CountedAcc
from spot import proj_root
from typet5.function_dataset import guess_src_root

datadir = Path(not_none(os.getenv("datadir")))
repos_dir = datadir / "SPOT-data/repos/"

repos_split_path = proj_root() / "data/repos_split.pkl"
with repos_split_path.open("rb") as f:
    repos_split = pickle.load(f)

root_is_src = list[bool]()
for repo in repos_split["train"]:
    rd = repo.repo_dir(repos_dir)
    root_is_src.append(guess_src_root(rd).name == "src")

CountedAcc(sum(root_is_src), len(root_is_src))


In [None]:
src_in_root = 0
package_in_root = 0
setup_in_root = 0
n_proj = 0

weird_repos = []
setup_files = []

for repo in repos_split["train"]:
    rd: Path = repo.repo_dir(repos_dir)
    n_proj += 1
    files = list(rd.iterdir())
    if rd / "src" in files:
        src_in_root += 1
    elif rd / (pname := rd.name.split("__")[-1]) in files:
        package_in_root += 1
    elif rd / "setup.cfg" in files:
        setup_in_root += 1
        setup_files.append(rd / "setup.cfg")
    else:
        weird_repos.append(repo)

print("n_projects:", n_proj)
print("src_in_root:", src_in_root)
print("package_in_root:", package_in_root)
print("setup_in_root:", setup_in_root)
print("weird_repos:", len(weird_repos))


In [None]:
for repo in weird_repos[:10]:
    rd: Path = repo.repo_dir(repos_dir)
    print("Repo:", rd.relative_to(repos_dir))
    for f in rd.iterdir():
        print(f.relative_to(rd))
