In [None]:
pip install evaluate --quiet

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, DataCollatorWithPadding
from datasets import load_dataset, Dataset
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, f1_score, make_scorer
import evaluate
import torch

# Locating the dataset files

In [None]:
import os 

# Candidate dataset locations, keyed by the environment the notebook runs in.
DATASET_PATHS = {
    "local": {
        "train": "../../datasets/train_set.csv",
        "test": "../../datasets/test_set.csv"
    },
    "kaggle": {
        "train": "/kaggle/input/python-codes-time-complexity/train_set.csv",
        "test": "/kaggle/input/python-codes-time-complexity/test_set.csv"
    }
}


def upload_datasets(dataset_paths=DATASET_PATHS):
    """Return the train/test CSV paths of the first environment where both files exist.

    Args:
        dataset_paths: Mapping of environment name to a dict with 'train' and
            'test' file paths. Environments are probed in the mapping's order.

    Returns:
        tuple[str, str]: ``(train_path, test_path)`` of the first match.

    Raises:
        FileNotFoundError: If no environment has both files on disk.
    """
    for paths in dataset_paths.values():
        train_path, test_path = paths['train'], paths['test']
        if os.path.exists(train_path) and os.path.exists(test_path):
            return train_path, test_path

    # Raise (not return) the error so the failure surfaces here instead of as a
    # confusing unpacking error at the call site.
    raise FileNotFoundError(f"Datasets do not exist in the current paths: {dataset_paths}")
            

# Resolve the CSV locations once; the dataset-loading cell below reads both names.
train_set_path, test_set_path = upload_datasets()

# Metrics

### Ordering labels by Hierarchy

In [None]:
# Complexity class -> hierarchy rank; the rank is the label's 1-based position
# in the tuple below ('constant' = 1 ... 'np' = 7).
LABELS_HIERARCHY = {
    label: rank
    for rank, label in enumerate(
        ('constant', 'logn', 'linear', 'nlogn', 'quadratic', 'cubic', 'np'),
        start=1,
    )
}

# Number of distinct complexity classes.
N_CLASSES = len(LABELS_HIERARCHY)

# Dataset loading

In [None]:
# Each CSV is loaded as its own dataset. Both are read back through the 'train'
# key because a bare `data_files` path is exposed under that split name.
train_set, test_set = (
    load_dataset("csv", data_files=csv_path)['train']
    for csv_path in (train_set_path, test_set_path)
)

# Raw string labels, kept separately for fitting the label encoder.
train_labels, test_labels = train_set['complexity'], test_set['complexity']

# Checkpoint

In [None]:
# Hub identifiers of the pretrained models to fine-tune, one per line.
checkpoints = [
    "microsoft/codebert-base",
    "neulab/codebert-python",
    "microsoft/graphcodebert-base",
    "Salesforce/codet5-base",
    "Salesforce/codet5-base-codexglue-sum-python",
    "Salesforce/codet5p-220m-py",
    "Salesforce/codet5-base-multi-sum",
    "microsoft/unixcoder-base",
]

# Tokenizing

## Label tokenizing

In [None]:
# Fit the encoder on the training labels only; the same fitted instance is reused
# by `tokenize_data` (transform) and `predict` (inverse_transform).
# NOTE(review): LabelEncoder numbers classes in sorted (alphabetical) order, so the
# resulting ids are not the LABELS_HIERARCHY ranks; confirm wherever the ordinal
# distance between ids is given a meaning.
labelEncoder = LabelEncoder()
labelEncoder.fit(train_labels)

## Feature tokenizing

In [None]:
def tokenize_data(samples, tokenizer):
    """Tokenize a batch of code samples and attach their encoded class labels.

    Args:
        samples: Mapping with a 'code' entry (handed to the tokenizer) and a
            'complexity' entry (handed to ``labelEncoder.transform``).
        tokenizer: Callable tokenizer; invoked with truncation at 512 tokens.

    Returns:
        The tokenizer output with an additional 'labels' entry.
    """
    batch = tokenizer(samples['code'], truncation=True, max_length=512)
    encoded_labels = labelEncoder.transform(samples['complexity'])
    batch['labels'] = encoded_labels
    return batch


def set_tokenizer(checkpoint):
    """Load a tokenizer for `checkpoint` and tokenize the train/test datasets with it.

    If the checkpoint's own tokenizer cannot be loaded, the name is cut down to its
    first two dash-separated parts (e.g. "Salesforce/codet5-base-multi-sum" ->
    "Salesforce/codet5-base") and that tokenizer is loaded instead.

    Args:
        checkpoint: Hub identifier of the pretrained checkpoint.

    Returns:
        tuple: ``(tokenizer, data_collator, X_train, X_eval)`` where the last two
        are the notebook-level `train_set` / `test_set` after `tokenize_data`.

    Raises:
        Exception: Whatever `AutoTokenizer.from_pretrained` raises when neither
            the checkpoint nor its fallback name yields a tokenizer.
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    except Exception as load_error:
        # `Exception` (not a bare `except`) so Ctrl-C / kernel interrupts are not
        # swallowed and turned into a second download attempt.
        fallback = "-".join(checkpoint.split("-")[:2])
        if fallback == checkpoint:
            # Nothing shorter to try; retrying the same name would fail again.
            raise
        print(f"Tokenizer for '{checkpoint}' could not be loaded ({load_error!r}); using '{fallback}' instead.")
        tokenizer = AutoTokenizer.from_pretrained(fallback)

    X_train = train_set.map(lambda x: tokenize_data(x, tokenizer), batched=True)
    X_eval = test_set.map(lambda x: tokenize_data(x, tokenizer), batched=True)

    # Collator for batch padding
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
    return tokenizer, data_collator, X_train, X_eval

# Evaluating

### Writing the custom metric *Hierarchy Complexity Score*

In [None]:
def hc_score(y_true, y_pred, n_classes=None):
    """Hierarchy Complexity Score: mean absolute label distance, scaled by class count.

    Computes ``sum(|y_pred - y_true|) / n_classes / n_samples``.

    Args:
        y_true: Sequence or array of true ordinal labels.
        y_pred: Sequence or array of predicted ordinal labels, same length.
        n_classes: Number of classes used for scaling. ``None`` (the default)
            means the notebook-level `N_CLASSES`, looked up at call time so that
            re-running the constants cell is picked up.

    Returns:
        float: The score; 0.0 when every prediction matches.

    Raises:
        AssertionError: If the two inputs differ in length.
    """
    assert len(y_true) == len(y_pred), f"The amount of y_true labels: {len(y_true)} does not equal to the amount of y_pred: {len(y_pred)}."

    if n_classes is None:
        n_classes = N_CLASSES

    # Convert up front: plain lists do not support `-`, and unsigned integer
    # arrays would wrap around instead of producing a negative difference.
    true_values = np.asarray(y_true, dtype=np.float64)
    pred_values = np.asarray(y_pred, dtype=np.float64)

    n_samples = len(y_true)

    return float((np.sum(np.abs(pred_values - true_values)) / n_classes) / n_samples)

## Computing metrics

In [None]:
def compute_metrics(eval_preds):
    """Compute accuracy, macro F1 and the hierarchy score for one evaluation pass.

    Args:
        eval_preds: Pair ``(logits, labels)``. `logits` may be a tuple, in which
            case its first element is used.

    Returns:
        dict: ``{"accuracy", "f1_macro", "hierarchy_score"}``.

    Raises:
        KeyError: If `labelEncoder` was fitted on a class that is missing from
            `LABELS_HIERARCHY`.
    """
    logits, labels = eval_preds
    if isinstance(logits, tuple):
        logits = logits[0]
    preds = np.argmax(logits, axis=-1)

    # Calculate accuracy
    accuracy = accuracy_score(labels, preds)
    # Calculate F-1 Macro
    f1_macro_score = f1_score(labels, preds, average='macro')

    # Calculate Hierarchy Score.
    # The encoder ids follow the sorted order of the class names, not the
    # complexity hierarchy, so translate ids -> hierarchy ranks before measuring
    # the distance between prediction and truth.
    id_to_rank = np.array([LABELS_HIERARCHY[name] for name in labelEncoder.classes_])
    hierarchy_score = hc_score(id_to_rank[np.asarray(labels)], id_to_rank[preds])

    return {
        "accuracy": accuracy,
        "f1_macro": f1_macro_score,
        "hierarchy_score": hierarchy_score
    }

# Model

In [None]:
def set_model(checkpoint, lora=None):
    """Return the model to train for `checkpoint`.

    Args:
        checkpoint: Hub identifier passed to
            ``AutoModelForSequenceClassification.from_pretrained``.
        lora: Optional ready-made model; when given it is returned unchanged and
            `checkpoint` is not loaded.

    Returns:
        `lora` if provided, otherwise a sequence-classification model with
        `N_CLASSES` output labels.
    """
    if lora is not None:
        return lora
    # Use the shared constant so the head size cannot drift from the label set.
    return AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=N_CLASSES)

# Hyperparameters

In [None]:
def set_training_args(checkpoint, batch_size=16):
    """Build the `TrainingArguments` used for fine-tuning `checkpoint`.

    Args:
        checkpoint: Checkpoint name; results go to ``training_results/<checkpoint>/``.
        batch_size: Per-device batch size for both training and evaluation.

    Returns:
        TrainingArguments: Three epochs in bf16, with evaluation, checkpointing
        and logging once per epoch, and the best model reloaded at the end.
    """
    settings = {
        "output_dir": f"training_results/{checkpoint}/",
        # Evaluate, save and log on the same per-epoch schedule.
        "eval_strategy": "epoch",
        "save_strategy": "epoch",
        "logging_strategy": "epoch",
        "bf16": True,
        "report_to": 'none',
        "num_train_epochs": 3,
        "per_device_train_batch_size": batch_size,
        "per_device_eval_batch_size": batch_size,
        "gradient_accumulation_steps": 2,
        "load_best_model_at_end": True,
    }
    return TrainingArguments(**settings)

# LoRA

from peft import LoraConfig, get_peft_model

# LoRA adapter settings for the T5-based checkpoint loaded below.
# The module names are architecture-specific: T5-style attention blocks name
# their projections "q"/"k"/"v"/"o" and the sequence-classification head
# "classification_head". Check `model.named_modules()` before reusing this
# config with a different architecture.
config = LoraConfig(
    task_type="SEQ_CLS",
    r=16,
    lora_alpha=16,
    target_modules=["q", "v"],
    lora_dropout=0.1,
    bias='none',
    # Keep the freshly initialised classification head fully trainable.
    modules_to_save=['classification_head'],
)

model_lora = get_peft_model(model=set_model("Salesforce/codet5p-770m-py"), peft_config=config)
model_lora.print_trainable_parameters()

# Trainer 

In [None]:
# Remove the `training_results` directory, which `set_training_args` uses as the
# root of every run's `output_dir`, so each full run starts from an empty tree.
!rm -rf training_results

In [None]:
def mass_train_models(checkpoints):
    """Fine-tune and evaluate every checkpoint in `checkpoints`, one after another.

    For each checkpoint the data is tokenized, a model and training arguments are
    built, a `Trainer` is run, and the final evaluation metrics are saved under
    the "test" split.

    Args:
        checkpoints: Sequence of hub identifiers to train.

    Returns:
        tuple: ``(trainer, tokenizer)`` of the last checkpoint trained, so later
        cells can run inference; ``(None, None)`` if `checkpoints` is empty.
    """
    num_of_checkpoints = len(checkpoints)
    trainer, tokenizer = None, None

    for idx, checkpoint in enumerate(checkpoints, start=1):

        # Collecting
        tokenizer, data_collator, train_tokenized, eval_tokenized = set_tokenizer(checkpoint)
        model = set_model(checkpoint)
        training_args = set_training_args(checkpoint=checkpoint, batch_size=4)

        # Building
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_tokenized,
            eval_dataset=eval_tokenized,
            data_collator=data_collator,
            tokenizer=tokenizer,
            compute_metrics=compute_metrics,
        )

        # Train
        print(f"CHECKPOINT[{idx}/{num_of_checkpoints}]: {checkpoint}")
        trainer.train()

        # Save metrics; prefix the keys with "test" so they match the split name
        # they are stored under instead of the default "eval".
        test_metrics = trainer.evaluate(eval_dataset=eval_tokenized, metric_key_prefix="test")
        trainer.save_metrics(split="test", metrics=test_metrics)

    return trainer, tokenizer


# Keep the last trainer/tokenizer at notebook level: the inference cell reads
# `trainer` and `tokenizer` as globals.
trainer, tokenizer = mass_train_models(checkpoints[4:])

# Flushing CUDA

In [None]:
!pip install GPUtil

import torch
from GPUtil import showUtilization as gpu_usage
from numba import cuda

def free_gpu_cache():
    """Release cached GPU memory held by PyTorch and print usage before and after.

    Only memory that is no longer referenced can be returned; tensors and models
    still reachable from notebook variables stay allocated.

    The CUDA context is deliberately left intact. Tearing it down through numba
    (`cuda.close()`) also invalidates the context PyTorch is using, which breaks
    any later GPU work in the same kernel, such as the inference cells below.
    """
    import gc

    print("Initial GPU Usage")
    gpu_usage()

    # Drop unreachable Python objects first so their GPU blocks become free,
    # then hand PyTorch's cached blocks back to the driver.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    print("GPU Usage after emptying the cache")
    gpu_usage()

free_gpu_cache()

Initial GPU Usage
| ID | GPU | MEM |
------------------
|  0 |  0% | 50% |
|  1 |  0% | 37% |


# Inference

In [None]:
# Report how many CUDA devices PyTorch can see in this session.
print(torch.cuda.device_count())

In [None]:
# `torch.device` builds a device handle that works with `.to(device)`.
# `torch.cuda.device` is a context manager for switching the active GPU and
# does not accept "cpu", so it cannot express the CPU fallback.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
def predict(inputs):
    """Predict the complexity class of one or more code snippets.

    Relies on the notebook-level names `tokenizer`, `trainer` and `labelEncoder`
    being defined before it is called.

    Args:
        inputs: A single source-code string, or an iterable of such strings.

    Returns:
        Array of decoded complexity labels, one per input snippet.
    """
    samples = [inputs] if isinstance(inputs, str) else list(inputs)

    # Tokenize with the same truncation used for training; no tensors or device
    # moves are needed here because the trainer's collator pads each batch and
    # the trainer places it on the right device.
    encoded = tokenizer(samples, truncation=True, max_length=512)
    dataset = Dataset.from_dict(dict(encoded))

    # Predicting & decoding inputs
    output = trainer.predict(test_dataset=dataset)
    # Some models return a tuple of outputs; the logits come first
    # (same handling as in `compute_metrics`).
    logits = output.predictions[0] if isinstance(output.predictions, tuple) else output.predictions
    preds = labelEncoder.inverse_transform(np.ravel(np.argmax(logits, axis=-1)))

    return preds

In [None]:
# Sample input for the classifier: a bracket-matching routine that walks the
# string once while pushing to / popping from a stack.
test_sample = """
class Solution:
    def isValid(self, s: str) -> bool:
        bracketMap = {"(": ")", "[": "]", "{": "}"}
        openSet = set(["(", "[", "{"])
        stack = []
        for char in s:
            if char in openSet:
                stack.append(char)
            elif stack and char == bracketMap[stack[-1]]:
                stack.pop()
            else:
                return False
        return stack == []
        """

# Run the snippet through `predict` and display the decoded label.
predict(test_sample)