# In [None]:
# CodeT5+-Base classifier using the code_x_glue_cc_defect_detection dataset

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import T5Tokenizer, T5EncoderModel, AutoTokenizer
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from datasets import load_dataset
import time

# Dataset class
class CodeDataset(Dataset):
    """Map-style dataset that tokenizes one code snippet per lookup.

    Every item is an ``(input_ids, attention_mask, label)`` triple.
    Tokenization is performed lazily inside ``__getitem__`` rather than once
    up front.
    """

    def __init__(self, codes, labels, tokenizer, max_length):
        """Store the raw samples and the tokenization settings.

        Args:
            codes: Indexable collection of source-code strings.
            labels: Indexable collection of labels, read with the same index
                as ``codes``.
            tokenizer: Callable accepting ``truncation``, ``padding``,
                ``max_length`` and ``return_tensors`` keyword arguments and
                returning a mapping with ``'input_ids'`` and
                ``'attention_mask'`` entries.
            max_length: Length passed to the tokenizer for truncation and
                ``'max_length'`` padding.
        """
        self.codes = codes
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of code samples."""
        return len(self.codes)

    def __getitem__(self, idx):
        """Tokenize sample ``idx`` and return ``(input_ids, attention_mask, label)``."""
        code = self.codes[idx]
        label = self.labels[idx]
        encodings = self.tokenizer(
            code,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors="pt",
        )
        # Remove only the leading batch axis (the tokenizer output is expected
        # to be shaped (1, max_length)). A bare squeeze() would also collapse
        # the sequence axis when max_length == 1 and return a 0-d tensor.
        input_ids = encodings['input_ids'].squeeze(0)
        attention_mask = encodings['attention_mask'].squeeze(0)
        return input_ids, attention_mask, label

# Model class
class CodeClassifier(nn.Module):
    """Dropout + linear classification head on top of a pretrained encoder."""

    def __init__(self, encoder, num_classes, hidden_dim):
        """Build the classifier.

        Args:
            encoder: Callable invoked as
                ``encoder(input_ids=..., attention_mask=...)`` whose result
                exposes ``last_hidden_state``.
            num_classes: Number of output logits.
            hidden_dim: Not used by the model; kept only so existing call
                sites that pass it keep working.
        """
        super().__init__()
        self.encoder = encoder

        # Size the head from the encoder's own configuration instead of
        # assuming 768, so encoders of other widths work too. Fall back to
        # the previous constant when no width is advertised.
        encoder_config = getattr(encoder, "config", None)
        feature_dim = (
            getattr(encoder_config, "hidden_size", None)
            or getattr(encoder_config, "d_model", None)
            or 768
        )

        self.fc = nn.Linear(feature_dim, num_classes)
        self.dropout = nn.Dropout(0.2)

    def forward(self, input_ids, attention_mask):
        """Return classification logits for a batch of token ids."""
        encoder_outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state
        # The hidden state at sequence position 0 stands in for the whole
        # sequence (treated like a [CLS] token; whether position 0 really is
        # a special token depends on the tokenizer - TODO confirm).
        pooled_output = encoder_outputs[:, 0]
        pooled_output = self.dropout(pooled_output)
        logits = self.fc(pooled_output)
        return logits

# Load and preprocess the dataset
def load_code_x_glue_defect_detection():
    """Load the defect-detection dataset and return its three splits.

    Returns:
        A 3-tuple ``(train, validation, test)`` where each element is a
        ``(codes, labels)`` pair taken from the split's ``'func'`` and
        ``'target'`` columns.
    """
    splits = load_dataset('code_x_glue_cc_defect_detection')
    return tuple(
        (splits[split_name]['func'], splits[split_name]['target'])
        for split_name in ('train', 'validation', 'test')
    )

# Encode labels
def encode_labels(labels):
    """Fit a ``LabelEncoder`` on ``labels`` and encode them.

    Returns:
        ``(encoded_labels, encoder)``: the encoded labels together with the
        fitted encoder, so the same mapping can be applied to other splits.
    """
    label_encoder = LabelEncoder()
    label_encoder.fit(labels)
    return label_encoder.transform(labels), label_encoder

# Prepare DataLoader
def create_dataloaders(train_data, valid_data, tokenizer, max_length, batch_size):
    """Wrap the training and validation samples in ``DataLoader`` objects.

    Args:
        train_data: Sequence whose items 0 and 1 are the training codes and labels.
        valid_data: Sequence whose items 0 and 1 are the validation codes and labels.
        tokenizer: Tokenizer handed to each ``CodeDataset``.
        max_length: Token length handed to each ``CodeDataset``.
        batch_size: Batch size used by both loaders.

    Returns:
        ``(train_loader, val_loader)``; only the training loader shuffles.
    """
    def _as_dataset(split):
        return CodeDataset(split[0], split[1], tokenizer, max_length)

    training_set = _as_dataset(train_data)
    validation_set = _as_dataset(valid_data)
    return (
        DataLoader(training_set, batch_size=batch_size, shuffle=True),
        DataLoader(validation_set, batch_size=batch_size),
    )

# Train the model
def train_model(model, train_loader, val_loader, device, num_epochs=5, learning_rate=2e-5):
    """Fine-tune ``model`` with cross-entropy loss and the NAdam optimizer.

    After every epoch the mean training loss and the validation accuracy are
    printed; the total wall-clock training time is printed at the end.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` and
            returning logits. It is moved to ``device`` in place.
        train_loader: Iterable of ``(input_ids, attention_mask, labels)``
            tensor batches used for optimisation.
        val_loader: Iterable of batches passed to ``evaluate_model`` after
            each epoch.
        device: Device the model and batches are moved to.
        num_epochs: Number of passes over ``train_loader``.
        learning_rate: Learning rate given to the optimizer.
    """
    # Move the model first: torch.optim recommends constructing optimizers
    # only after the parameters live on their final device.
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.NAdam(model.parameters(), lr=learning_rate)

    # Print total trainable parameters
    total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total trainable parameters: {total_params:,}")

    # Start timing
    start_time = time.time()

    for epoch in range(num_epochs):
        # evaluate_model() leaves the model in eval mode, so training mode
        # has to be re-enabled at the start of every epoch.
        model.train()
        total_loss = 0.0
        num_batches = 0
        for input_ids, attention_mask, labels in train_loader:
            input_ids, attention_mask, labels = input_ids.to(device), attention_mask.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            num_batches += 1

        # Count batches while iterating so an empty loader reports a loss of
        # 0 instead of raising ZeroDivisionError.
        avg_train_loss = total_loss / max(num_batches, 1)
        print(f'Epoch {epoch+1}, Loss: {avg_train_loss:.4f}')

        # Validation
        val_metrics = evaluate_model(model, val_loader, device, silent=True)
        print(f'Validation Accuracy: {val_metrics["accuracy"]:.4f}')

    # End timing
    end_time = time.time()
    training_time = end_time - start_time
    print(f"Total training time: {training_time:.2f} seconds")

# Evaluate the model
def evaluate_model(model, data_loader, device, silent=False, class_names=None):
    """Compute the loss and classification metrics of ``model`` on ``data_loader``.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` and
            returning logits. It is switched to eval mode and left there.
        data_loader: Iterable of ``(input_ids, attention_mask, label)``
            tensor batches.
        device: Device each batch is moved to before the forward pass.
        silent: When true, skip all printing and plotting and only return
            the metrics.
        class_names: Display names for the report and confusion matrix;
            entry ``i`` names class id ``i``. Defaults to
            ``['False', 'True']``.

    Returns:
        Dict with keys ``"loss"`` (mean per-batch cross-entropy),
        ``"accuracy"``, ``"precision"``, ``"recall"`` and ``"f1"`` (the last
        three are weighted averages over classes).

    Raises:
        ValueError: If ``data_loader`` yields no batches.
    """
    if class_names is None:
        # The script encodes targets with LabelEncoder, which numbers classes
        # in sorted order, so id 0 is the "False" class and id 1 is "True"
        # (assumes boolean/0-1 targets). The previous ['True', 'False'] order
        # labelled the report and heatmap the wrong way round.
        class_names = ['False', 'True']
    else:
        class_names = list(class_names)
    class_ids = list(range(len(class_names)))

    model.eval()
    pred_labels = []
    true_labels = []
    total_loss = 0.0
    num_batches = 0
    criterion = nn.CrossEntropyLoss()

    with torch.no_grad():
        for input_ids, attention_mask, label in data_loader:
            input_ids, attention_mask, label = input_ids.to(device), attention_mask.to(device), label.to(device)
            outputs = model(input_ids, attention_mask)
            total_loss += criterion(outputs, label).item()
            pred_labels.extend(torch.argmax(outputs, dim=1).cpu().tolist())
            true_labels.extend(label.cpu().tolist())
            num_batches += 1

    if num_batches == 0:
        # Fail with a clear message instead of a ZeroDivisionError below.
        raise ValueError("evaluate_model received an empty data_loader")

    # zero_division=0 keeps the numeric result of the default behaviour
    # (score 0 for a class with no predictions) without the warning.
    accuracy = accuracy_score(true_labels, pred_labels)
    precision = precision_score(true_labels, pred_labels, average='weighted', zero_division=0)
    recall = recall_score(true_labels, pred_labels, average='weighted', zero_division=0)
    f1 = f1_score(true_labels, pred_labels, average='weighted', zero_division=0)
    avg_loss = total_loss / num_batches

    metrics = {
        "loss": avg_loss,
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1
    }

    if not silent:
        print("Evaluation Metrics:")
        print(f'Loss: {avg_loss:.4f}')
        print(f'Accuracy: {accuracy:.4f}')
        print(f'Precision: {precision:.4f}')
        print(f'Recall: {recall:.4f}')
        print(f'F1 Score: {f1:.4f}')

        # Generate classification report. Passing labels= pins one row per
        # known class, so a split in which only one class occurs no longer
        # breaks on the target_names length check.
        report = classification_report(
            true_labels,
            pred_labels,
            labels=class_ids,
            target_names=class_names,
            zero_division=0,
        )
        print("Classification Report: \n", report)

        # Compute confusion matrix with a fixed class order so its shape
        # always matches the tick labels.
        conf_matrix = confusion_matrix(true_labels, pred_labels, labels=class_ids)

        # Plot confusion matrix
        plt.figure(figsize=(10, 8))
        sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
        plt.xlabel('Predicted Labels')
        plt.ylabel('True Labels')
        plt.title('Confusion Matrix')
        plt.show()

    return metrics

# Save the model
def save_model(model, path='code_classifier.pth'):
    """Write the model's ``state_dict`` (weights only) to ``path``."""
    weights = model.state_dict()
    torch.save(weights, path)

# Load the model
def load_model(path, encoder, num_classes, map_location='cpu'):
    """Rebuild a ``CodeClassifier`` and restore its weights from ``path``.

    Args:
        path: File previously written by ``save_model``.
        encoder: Encoder instance to wrap; its weights are overwritten by the
            checkpoint's encoder weights.
        num_classes: Number of output classes of the classification head.
        map_location: Forwarded to ``torch.load``. The default ``'cpu'`` lets
            a checkpoint saved from a CUDA model be restored on a host
            without a GPU; ``load_state_dict`` copies the values into the
            model's existing parameters, so the model's device is unchanged.

    Returns:
        The classifier with the checkpoint weights loaded.
    """
    model = CodeClassifier(encoder, num_classes, 512)
    # NOTE: torch.load unpickles the file - only load checkpoints from a
    # trusted source.
    state_dict = torch.load(path, map_location=map_location)
    model.load_state_dict(state_dict)
    return model

# Main script
if __name__ == "__main__":
    # ---- Data: every split comes back as a (codes, labels) pair ----
    train_split, valid_split, test_split = load_code_x_glue_defect_detection()
    train_codes, train_labels = train_split
    valid_codes, valid_labels = valid_split
    test_codes, test_labels = test_split

    # Fit the label encoder on the training labels, then reuse the same
    # mapping for the validation and test labels.
    train_labels, label_encoder = encode_labels(train_labels)
    valid_labels = label_encoder.transform(valid_labels)
    test_labels = label_encoder.transform(test_labels)

    # ---- Run settings ----
    pretrained_model_name = "Salesforce/codet5p-220m"  # Replace with your pre-trained model name
    max_length = 512
    batch_size = 8
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # ---- Tokenizer and data loaders ----
    tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name)
    train_loader, val_loader = create_dataloaders(
        (train_codes, train_labels),
        (valid_codes, valid_labels),
        tokenizer,
        max_length,
        batch_size,
    )
    # The test split is only evaluated, so it gets a plain unshuffled loader.
    test_dataset = CodeDataset(test_codes, test_labels, tokenizer, max_length)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # ---- Model: pretrained encoder plus classification head ----
    encoder_model = T5EncoderModel.from_pretrained(pretrained_model_name)
    num_classes = len(label_encoder.classes_)
    model = CodeClassifier(encoder_model, num_classes, 512)

    train_model(model, train_loader, val_loader, device, num_epochs=5)

    # ---- Checkpoint round trip: save, then reload for evaluation ----
    checkpoint_path = 'code_classifier.pth'
    save_model(model, checkpoint_path)
    loaded_model = load_model(checkpoint_path, encoder_model, num_classes)
    loaded_model.to(device)

    # ---- Full evaluation on all three splits ----
    print("Train Dataset:")
    train_metrics = evaluate_model(loaded_model, train_loader, device)

    print("Validation Dataset:")
    val_metrics = evaluate_model(loaded_model, val_loader, device)

    print("Test Dataset:")
    test_metrics = evaluate_model(loaded_model, test_loader, device)

    # ---- Tab-separated summary, one row per split ----
    print("\neval_loss\teval_accuracy\teval_f1\teval_precision\teval_recall")
    print(f"train\t{train_metrics['loss']:.6f}\t{train_metrics['accuracy']:.6f}\t{train_metrics['f1']:.6f}\t{train_metrics['precision']:.6f}\t{train_metrics['recall']:.6f}")
    print(f"val\t{val_metrics['loss']:.6f}\t{val_metrics['accuracy']:.6f}\t{val_metrics['f1']:.6f}\t{val_metrics['precision']:.6f}\t{val_metrics['recall']:.6f}")
    print(f"test\t{test_metrics['loss']:.6f}\t{test_metrics['accuracy']:.6f}\t{test_metrics['f1']:.6f}\t{test_metrics['precision']:.6f}\t{test_metrics['recall']:.6f}")
