In [None]:
import pandas as pd
from transformers import pipeline
import math
import numpy as np
import torch
import os
from tqdm.notebook import tqdm

In [None]:
from transformers import AutoTokenizer, AutoModel

In [None]:
def columns_to_list(df, columns):
    """Return the rows of the selected columns as a list of tuples.

    Args:
        df: object indexable by column name whose columns expose ``to_list()``
            (a pandas DataFrame in this notebook).
        columns: iterable of column names; it fixes the order of the values
            inside every tuple.

    Returns:
        A list with one tuple per row, e.g. ``[(id0, loc0), (id1, loc1), ...]``.
        An empty ``columns`` yields an empty list.
    """
    per_column_values = (df[column_name].to_list() for column_name in columns)
    # zip(*...) transposes "one list per column" into "one tuple per row".
    return [row for row in zip(*per_column_values)]

In [None]:
def get_files_info(path):
    """Index the entries of a directory by the integer that ends their name.

    Every entry name is split on ``-`` (after dropping a trailing ``.npy``,
    if present). The last field is parsed as the integer id and the third
    field from the end as the integer start line.

    Args:
        path: directory to list.

    Returns:
        Dict mapping ``id -> (entry name as listed, start line)``. When two
        entries share an id, ``Conflict!`` is printed and the entry listed
        later replaces the earlier one.

    Raises:
        ValueError / IndexError: if an entry name does not follow the
            ``...-<start>-<x>-<id>`` pattern.
    """
    info_by_id = {}
    for entry in os.listdir(path):
        stem = entry[:-4] if entry.endswith(".npy") else entry
        fields = stem.split("-")
        file_id = int(fields[-1])
        first_line = int(fields[-3])
        if file_id in info_by_id:
            print("Conflict!")
        info_by_id[file_id] = (entry, first_line)
    return info_by_id

### Embeddings

In [None]:
# Load the tokenizer and the model weights of the "huggingface/CodeBERTa-small-v1"
# checkpoint; downloaded files are kept in the local cache/ directory.
tokenizer = AutoTokenizer.from_pretrained("huggingface/CodeBERTa-small-v1", cache_dir='cache/')
model = AutoModel.from_pretrained("huggingface/CodeBERTa-small-v1", cache_dir='cache/')
# Alternative checkpoint (disabled):
#tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base", cache_dir='cache2/')
#model = AutoModel.from_pretrained("microsoft/codebert-base", cache_dir='cache2/')
    
# Move the model to the GPU; tokenize_code() builds its input tensors on CUDA too.
model.cuda()
# NOTE(review): `pass` presumably only keeps the notebook from echoing the
# value returned by model.cuda() as the cell output.
pass

In [None]:
def tokenize_code(code):
    """Tokenize a code string into one or more model-sized rows on the GPU.

    The text is tokenized without special tokens by the module-level
    ``tokenizer``. Up to 510 tokens produce a single, unpadded row; longer
    inputs are cut into consecutive 510-token pieces, and the last piece is
    padded so that every row has 512 entries.

    Args:
        code: source text to tokenize.

    Returns:
        ``(batch, n_tokens)`` where ``batch`` is a dict with CUDA ``long``
        tensors ``"input_ids"`` and ``"attention_mask"`` (one row per piece)
        and ``n_tokens`` is the token count before special tokens were added.
    """
    # NOTE(review): 0 / 2 / 1 are presumably the <s>, </s> and <pad> ids of
    # the checkpoint's vocabulary - confirm against the tokenizer.
    first_id, last_id, pad_id = 0, 2, 1
    piece_size = 510  # room left once the two wrapping tokens are added

    encoding = tokenizer(code, add_special_tokens=False)
    token_ids = encoding["input_ids"]
    token_mask = encoding["attention_mask"]
    n_tokens = len(token_ids)

    if n_tokens > piece_size:
        id_rows, mask_rows = [], []
        for offset in range(0, n_tokens, piece_size):
            piece_ids = token_ids[offset:offset + piece_size]
            piece_mask = token_mask[offset:offset + piece_size]
            # Non-zero only for the final, shorter piece.
            missing = piece_size - len(piece_ids)
            id_rows.append([first_id] + piece_ids + [last_id] + [pad_id] * missing)
            mask_rows.append([1] + piece_mask + [1] + [0] * missing)
    else:
        id_rows = [[first_id] + token_ids + [last_id]]
        mask_rows = [[1] + token_mask + [1]]

    batch = {
        "input_ids": torch.tensor(id_rows).long().cuda(),
        "attention_mask": torch.tensor(mask_rows).long().cuda()
    }
    return batch, n_tokens
    

In [None]:
def preprocess_part(name):
    """Compute and save one embedding file per method of a dataset split.

    Reads ``data/{name}.csv`` (columns ``id``, ``methodLoc``, ``startLine``),
    finds the source file of every method in ``data/{name}`` through
    ``get_files_info``, extracts the method's lines, runs them through the
    module-level ``model`` and stores the first-position hidden state of
    every token row with ``np.save`` under ``data/embeddings/{name}/``.

    Rows whose ``methodLoc`` is NaN or whose id has no matching source file
    are skipped.

    Args:
        name: split name, e.g. ``"train"``, ``"val"`` or ``"test"``.
    """
    df = pd.read_csv(f"data/{name}.csv")
    code_parts = columns_to_list(df, ["id", "methodLoc", "startLine"])
    id2file = get_files_info(f"data/{name}")
    os.makedirs(f"data/embeddings/{name}", exist_ok=True)

    # The CSV start line is not used: the start line encoded in the file name
    # (returned by get_files_info) is the one that is applied.
    for file_id, loc, _csv_start_line in tqdm(code_parts, smoothing=0.01):
        if math.isnan(loc) or file_id not in id2file:
            continue
        filename, start = id2file[file_id]
        with open(f"data/{name}/{filename}", "r") as f:
            # Skip everything before the 1-based start line.
            for _ in range(int(start) - 1):
                f.readline()
            lines = [f.readline() for _ in range(int(loc))]
        # readline() keeps each line's trailing newline, so the pieces are
        # concatenated as-is; joining with "\n" would insert a blank line
        # between every pair of source lines.
        code = "".join(lines)
        tokenized, _n_tokens = tokenize_code(code)
        with torch.no_grad():
            embs = model(**tokenized)[0].cpu().numpy()
        # Keep only the hidden state at position 0 of every row.
        embs = embs[:, 0, :]

        np.save(f'data/embeddings/{name}/{filename}', embs)

In [None]:
# Compute and save the embeddings of the "val" split (data/val.csv -> data/embeddings/val/).
preprocess_part("val")

In [None]:
# Compute and save the embeddings of the "test" split (data/test.csv -> data/embeddings/test/).
preprocess_part("test")

In [None]:
# Compute and save the embeddings of the "train" split (data/train.csv -> data/embeddings/train/).
preprocess_part("train")

### Metrics prediction

In [None]:
from torch import nn
from torch.nn import functional as F

class EmbeddingCombiner(nn.Module):
    """Pool a stack of embedding rows into one vector with multi-head attention.

    ``model`` encodes every input row into ``n_heads * head_size`` features.
    ``attention`` produces ``n_heads`` scores per row; the scores are
    soft-maxed across rows, and each head's weight scales its own
    ``head_size``-wide slice of the encoded features before the rows are
    summed.

    Args:
        embedding_size: width of every input row.
        hidden_size: width of the hidden layers of both sub-networks.
        head_size: number of encoded features governed by one attention head.
        n_heads: number of attention heads.
    """

    def __init__(self, embedding_size=768, hidden_size=512, head_size=128, n_heads=4):
        super().__init__()
        encoder_layers = [
            nn.Linear(embedding_size, hidden_size),
            nn.ELU(),
            nn.Linear(hidden_size, head_size * n_heads),
        ]
        self.model = nn.Sequential(*encoder_layers)
        scorer_layers = [
            nn.Linear(embedding_size, hidden_size),
            nn.ELU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ELU(),
            nn.Linear(hidden_size, n_heads),
        ]
        self.attention = nn.Sequential(*scorer_layers)
        self.n_heads = n_heads
        self.head_size = head_size

    def forward(self, x):
        """Return the pooled vector of ``n_heads * head_size`` features.

        Assumes ``x`` is a 2-D tensor ``(n_rows, embedding_size)`` - TODO confirm
        against callers.
        """
        encoded = self.model(x)
        if x.shape[0] == 1:
            # A single row needs no weighting: its encoding is the result.
            return encoded[0]

        out_width = self.n_heads * self.head_size
        head_weights = F.softmax(self.attention(x), dim=0)        # (n_rows, n_heads)
        widened = head_weights.unsqueeze(-1).expand(-1, self.n_heads, self.head_size)
        feature_weights = widened.reshape(-1, out_width)          # (n_rows, out_width)
        weighted = encoded * feature_weights.expand_as(encoded)
        return weighted.sum(0)
    
class RegressionNN(nn.Module):
    """Three-layer perceptron with ELU activations and a single output unit.

    Args:
        input_size: width of the input feature vectors.
        hidden_size: width of both hidden layers.
    """

    def __init__(self, input_size=512, hidden_size=256):
        super().__init__()
        layers = [
            nn.Linear(input_size, hidden_size),
            nn.ELU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ELU(),
            nn.Linear(hidden_size, 1),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Map ``x`` (last dimension ``input_size``) to outputs with last dimension 1."""
        prediction = self.model(x)
        return prediction

In [None]:
from torch.utils.data import Dataset, DataLoader
import time

In [None]:
class EmbeddingDataset(Dataset):
    """Pairs of (saved embedding array, target metric value) for one split.

    Targets come from ``data/{name}_metrics.csv``; embeddings are the ``.npy``
    files found in ``data/embeddings/{name}`` and are loaded lazily on access.

    Only samples that have an embedding file are kept. ``preprocess_part``
    skips methods it cannot read, so an id may appear in the metrics CSV
    without a matching file; keeping such ids would make ``__getitem__``
    raise ``KeyError`` for indices that ``__len__`` reports as valid.

    Args:
        name: split name, e.g. ``"train"``.
        target_name: column of the metrics CSV used as regression target.

    Attributes set here: ``code_parts`` (list of ``(id, target)``),
    ``id2file`` (id -> (file name, start line)), ``name``, and ``avg_y`` /
    ``std_y`` (mean and standard deviation of the kept targets).
    """

    def __init__(self, name, target_name):
        df = pd.read_csv(f"data/{name}_metrics.csv")
        self.id2file = get_files_info(f"data/embeddings/{name}")
        self.name = name

        all_parts = columns_to_list(df, ["id", target_name])
        self.code_parts = [(i, y) for i, y in all_parts if i in self.id2file]
        n_dropped = len(all_parts) - len(self.code_parts)
        if n_dropped:
            print(f"{name}: skipped {n_dropped} samples without a saved embedding")

        targets = [y for _, y in self.code_parts]
        self.avg_y = np.mean(targets)
        self.std_y = np.std(targets)

    def __len__(self):
        """Number of samples that have an embedding file."""
        return len(self.code_parts)

    def __getitem__(self, idx):
        """Return ``(embedding array loaded from disk, target value)`` for ``idx``."""
        i, y = self.code_parts[idx]
        filename, _ = self.id2file[i]
        x = np.load(f"data/embeddings/{self.name}/{filename}")
        return x, y
    
class MyDataLoader:
    """Draws random batches (with replacement) from an indexable dataset.

    Args:
        dataset: object supporting ``len()`` and integer indexing, where each
            item is an ``(x, y)`` pair.
        batch_size: number of samples per batch.
    """

    def __init__(self, dataset, batch_size):
        self.dataset, self.batch_size = dataset, batch_size

    def get_batch(self):
        """Return ``(xs, ys)``: two lists of ``batch_size`` items each.

        Indices are drawn uniformly with ``np.random.randint``, so the same
        sample may appear more than once in a batch.
        """
        picks = np.random.randint(0, len(self.dataset), self.batch_size)
        pairs = [self.dataset[k] for k in picks]
        xs = [x for x, _ in pairs]
        ys = [y for _, y in pairs]
        return xs, ys

In [None]:
def model_inference(encoder, regression, dataset, batches, batch_size, norm):
    """Estimate MSE, MAE and R2 of the model on randomly drawn batches.

    Args:
        encoder: module that pools one sample's embedding rows into a vector.
        regression: module that maps the stacked vectors to one value each.
        dataset: dataset to sample from; its ``avg_y`` is the baseline
            prediction used for R2.
        batches: number of random batches to evaluate.
        batch_size: samples per batch.
        norm: ``(offset, scale)`` pair; predictions are mapped back to the
            target scale as ``scale * prediction + offset``.

    Returns:
        ``(mse, mae, r2)`` as Python floats. MSE and MAE are averaged over the
        batches. R2 is ``1 - SSE / SST`` with both sums pooled over every
        evaluated sample; averaging per-batch R2 values is not the R2 of the
        sample and explodes on batches whose targets barely vary. ``r2`` is
        NaN when all evaluated targets equal ``dataset.avg_y``.
    """
    dataloader = MyDataLoader(dataset, batch_size=batch_size)
    mse, mae = 0.0, 0.0
    sq_err_total, sq_dev_total = 0.0, 0.0
    with torch.no_grad():
        for _ in range(batches):
            xs, ys = dataloader.get_batch()
            # Samples have different numbers of rows, so each one is pooled
            # on its own before the fixed-size vectors are stacked.
            pooled = torch.stack([
                encoder(torch.tensor(np.array(x), dtype=torch.float32, device="cuda"))
                for x in xs
            ])
            ys_pred = norm[1] * regression(pooled).view(-1) + norm[0]
            ys = torch.tensor(np.array(ys), dtype=torch.float32, device="cuda")

            sq_err = (ys_pred - ys) ** 2
            mse += sq_err.mean().item()
            mae += (ys_pred - ys).abs().mean().item()
            sq_err_total += sq_err.sum().item()
            sq_dev_total += ((dataset.avg_y - ys) ** 2).sum().item()
    mse /= batches
    mae /= batches
    r2 = 1 - sq_err_total / sq_dev_total if sq_dev_total > 0 else float("nan")
    return mse, mae, r2


def train_model(target_name, middle_size=512, head_size=128, n_heads=4, hidden_size=256,
                lr=1e-4, epoch=90, batch_size=64, batches_per_epoch=1000, batches_per_eval=100):
    """Train an EmbeddingCombiner + RegressionNN pair to predict one metric.

    Targets are standardised with the mean and standard deviation of the
    train split; the same pair is handed to ``model_inference`` so that the
    reported errors are on the original target scale.

    Args:
        target_name: metric column used as regression target.
        middle_size: hidden width of the EmbeddingCombiner sub-networks.
        head_size: encoded features per attention head.
        n_heads: number of attention heads.
        hidden_size: hidden width of the RegressionNN.
        lr: Adam learning rate.
        epoch: number of epochs.
        batch_size: samples per batch (training and evaluation).
        batches_per_epoch: random training batches per epoch.
        batches_per_eval: random batches used for each evaluation.

    Returns:
        ``(train_errors, test_errors, val_errors)``: three lists holding one
        ``(mse, mae, r2)`` tuple per epoch.
    """
    print(f"Training model for {target_name} with prarms {(middle_size, head_size, n_heads, hidden_size, lr)}")
    encoder_model = EmbeddingCombiner(768, middle_size, head_size, n_heads)
    # The combiner emits head_size * n_heads features, so that product (not
    # middle_size) is the regression input width. The two only coincide for
    # some parameter combinations; otherwise the first forward pass failed
    # with a shape mismatch.
    regression_model = RegressionNN(head_size * n_heads, hidden_size)
    encoder_model.cuda()
    regression_model.cuda()
    optim = torch.optim.Adam(list(encoder_model.parameters()) + list(regression_model.parameters()), lr=lr)

    train_dataset = EmbeddingDataset("train", target_name)
    test_dataset = EmbeddingDataset("test", target_name)
    val_dataset = EmbeddingDataset("val", target_name)

    norm = train_dataset.avg_y, train_dataset.std_y

    train_errors = []
    test_errors = []
    val_errors = []
    # The loader keeps no state between batches, so one instance serves every epoch.
    dataloader = MyDataLoader(train_dataset, batch_size=batch_size)
    for i in range(epoch):
        start = time.time()

        print(f"Epoch {i+1}")
        print("Train")
        # Training step
        for _ in tqdm(range(batches_per_epoch)):
            xs, ys = dataloader.get_batch()
            # Samples have different numbers of embedding rows: pool each one
            # separately, then stack the fixed-size vectors.
            tx = []
            for x in xs:
                x = torch.tensor(np.array(x), dtype=torch.float32, device="cuda")
                tx.append(encoder_model(x))
            xs = torch.stack(tx)
            ys_pred = regression_model(xs).view(-1)
            ys = torch.tensor(np.array(ys), dtype=torch.float32, device="cuda")
            ys = (ys - norm[0]) / norm[1]
            loss = F.mse_loss(ys_pred, ys)
            optim.zero_grad()
            loss.backward()
            optim.step()

        print("Test")
        mse, mae, r2 = model_inference(encoder_model, regression_model, train_dataset, batches_per_eval, batch_size, norm)
        train_errors.append((mse, mae, r2))

        mse, mae, r2 = model_inference(encoder_model, regression_model, val_dataset, batches_per_eval, batch_size, norm)
        val_errors.append((mse, mae, r2))

        mse, mae, r2 = model_inference(encoder_model, regression_model, test_dataset, batches_per_eval, batch_size, norm)
        test_errors.append((mse, mae, r2))
        print("ERRORS INFO")
        print(f"Train    | mse: {train_errors[-1][0]}, mae: {train_errors[-1][1]}, r2: {train_errors[-1][2]}")
        print(f"Validate | mse: {val_errors[-1][0]}, mae: {val_errors[-1][1]}, r2: {val_errors[-1][2]}")
        print(f"Epoch time: {(time.time() - start) / 60}")
    return train_errors, test_errors, val_errors

In [None]:
#train_errors, test_errors, val_errors = train_model("methodRfc")

In [None]:
#train_errors, test_errors, val_errors = train_model("methodRfc", middle_size=1024, hidden_size=512, n_heads=8)

In [None]:
#train_errors, test_errors, val_errors = train_model("methodRfc", middle_size=1024, hidden_size=1024, n_heads=8)

In [None]:
#train_errors, test_errors, val_errors = train_model("methodRfc", middle_size=256, hidden_size=256, n_heads=4, head_size=64)

In [None]:
#train_errors, test_errors, val_errors = train_model("methodRfc", middle_size=256, hidden_size=128, n_heads=4, head_size=64)

In [None]:
def train_for_all_metrics(metrics, epoch=50, is_complex=False):
    """Train one model per metric and save its per-epoch error histories.

    For every metric ``m`` the train / test / val histories returned by
    ``train_model`` are written to ``logs/{m}/train.npy``, ``test.npy`` and
    ``val.npy``.

    Args:
        metrics: iterable of metric (target column) names.
        epoch: number of epochs per model.
        is_complex: kept so existing callers keep working; it has no effect.
            ``train_model`` takes no such parameter, and forwarding it made
            every call fail with ``TypeError``.
    """
    for m in tqdm(metrics):
        os.makedirs(f"logs/{m}", exist_ok=True)
        train_errors, test_errors, val_errors = train_model(
            m, epoch=epoch, middle_size=1024, hidden_size=1024, n_heads=8,
            lr=1e-3, batches_per_epoch=500, batch_size=128,
        )
        # np.save appends the ".npy" extension to these paths.
        np.save(f'logs/{m}/train', np.array(train_errors))
        np.save(f'logs/{m}/test', np.array(test_errors))
        np.save(f'logs/{m}/val', np.array(val_errors))

In [None]:
# Target column names trained by train_for_all_metrics(simple_metrics).
simple_metrics = [
    'methodAnonymousClassesQty',
    'methodAssignmentsQty',
    'methodCbo',
    'methodComparisonsQty',
    'methodLambdasQty',
    'methodLoc',
    'methodLoopQty',
    'methodMathOperationsQty',
    'methodMaxNestedBlocks',
    'methodNumbersQty',
    'methodParametersQty',
    'methodParenthesizedExpsQty',
    'methodReturnQty',
    'methodRfc',
    'methodStringLiteralsQty',
    'methodSubClassesQty',
    'methodTryCatchQty',
    'methodUniqueWordsQty',
    'methodVariablesQty',
    'methodWmc',
]

In [None]:
# Target column names trained by train_for_all_metrics(complex_metrics).
complex_metrics = [
    'CyclomaticComplexity',
    'HalsteadDifficultyMethod',
    'DesignComplexity',
    'HalsteadEffortMethod',
    'HalsteadVolumeMethod',
    'HalsteadBugsMethod',
    'HalsteadLengthMethod',
    'HalsteadVocabularyMethod',
    'EssentialCyclomaticComplexity',
    'ControlDensity',
    'QCPCorrectness',
    'QCPMaintainability',
    'QCPReliability',
]

In [None]:
# Train one model per entry of simple_metrics; error histories go to logs/<metric>/.
train_for_all_metrics(simple_metrics)

In [None]:
# Train one model per entry of complex_metrics; error histories go to logs/<metric>/.
train_for_all_metrics(complex_metrics)

In [None]:
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
def build_plots(metrics):
    """Save an R2 / MSE learning-curve figure for every metric.

    For each metric ``m`` the histories ``logs/{m}/train.npy``, ``test.npy``
    and ``val.npy`` are loaded (rows are epochs; column 0 is plotted as MSE
    and column 2 as R2) and a two-panel figure is written to
    ``logs/plots/{m}.jpg``.

    Args:
        metrics: iterable of metric names whose logs exist on disk.
    """
    os.makedirs('logs/plots', exist_ok=True)
    # (legend label, log file stem, line colour), in drawing order.
    curves = (
        ("Train", "train", (0.8, 0., 0.)),
        ("Validate", "val", (0., 0.8, 0.)),
        ("Test", "test", (0.8, 0.8, 0.)),
    )
    # (panel index, panel title, column of the error history).
    panels = ((0, "R2", 2), (1, "MSE", 0))

    for m in metrics:
        errors = {
            "train": np.load(f'logs/{m}/train.npy'),
            "test": np.load(f'logs/{m}/test.npy'),
            "val": np.load(f'logs/{m}/val.npy'),
        }
        n_epochs = len(errors["train"])
        xs = list(range(n_epochs))

        fig, axis = plt.subplots(2, 1, figsize=(8, 8))
        for panel_idx, title, column in panels:
            ax = axis[panel_idx]
            ax.set_xlim(0, n_epochs)
            ax.grid()
            ax.set_title(title)
            for label, split, color in curves:
                ax.plot(xs, errors[split][:, column], label=label, color=color)
            # A legend per panel: plt.legend() only decorates the current
            # (last created) axes, which left the R2 panel without one.
            ax.legend()
        # R2 is clipped to a fixed window so early, very negative values do
        # not flatten the rest of the curve.
        axis[0].set_ylim(-1.25, 1.25)

        fig.suptitle(f"{m}", fontsize=16)
        fig.tight_layout()
        fig.savefig(f'logs/plots/{m}.jpg')
        plt.close(fig)

In [None]:
# Write one learning-curve figure per metric to logs/plots/.
build_plots(simple_metrics)
build_plots(complex_metrics)

In [None]:
def print_bests(metrics):
    """Print the test R2 and MSE of every metric, averaged over the last three epochs.

    Reads ``logs/{m}/test.npy`` for each metric ``m`` (rows are epochs;
    column 0 is reported as MSE and column 2 as R2). Also makes sure the
    ``logs/plots`` directory exists.

    Args:
        metrics: iterable of metric names whose test log exists on disk.
    """
    os.makedirs('logs/plots', exist_ok=True)
    for metric_name in metrics:
        history = np.load(f'logs/{metric_name}/test.npy')
        tail_mean = history[-3:].mean(axis=0)
        mse_value, r2_value = tail_mean[0], tail_mean[2]
        print(f"{metric_name} | R2: {r2_value} | MSE: {mse_value}")

In [None]:
# Report the test R2 / MSE (mean of the last three epochs) of every metric.
print_bests(simple_metrics)
print_bests(complex_metrics)