## Add dataset "GPTSniffer" as input first

In [43]:
import torch
import torch.nn as nn
import torch.optim as optim

from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoModel, Trainer, TrainingArguments
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import confusion_matrix
import os
os.environ["OMP_NUM_THREADS"] = "1" # export OMP_NUM_THREADS=1
os.environ["OPENBLAS_NUM_THREADS"] = "1" # export OPENBLAS_NUM_THREADS=1 
os.environ["MKL_NUM_THREADS"] = "1" # export MKL_NUM_THREADS=1
os.environ["VECLIB_MAXIMUM_THREADS"] = "1" # export VECLIB_MAXIMUM_THREADS=1
import pandas as pd
import numpy as np
#from timm.optim.lion import Lion
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

from sklearn.metrics import accuracy_score

In [44]:
import random

# Set seeds for reproducibility
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# If using GPU, set the seed for the CUDA operations as well
torch.cuda.manual_seed_all(seed)

In [45]:
def get_code_without_comments(filepath):
    with open(filepath, 'rb') as f:
        lines = f.readlines()
    code_lines = []
    for line in tokenize.tokenize(lines.__iter__().__next__):
        if line.type != tokenize.COMMENT:
            code_lines.append(line.string)
    return ''.join(code_lines)

In [46]:
import re

def remove_java_comments(filepath):
    with open(filepath, 'r') as f:
        code = f.read()  # Read the entire file content

    # Remove single-line comments (//)
    code = re.sub(r'//.*', '', code)

    # Remove multi-line comments (/* ... */)
    code = re.sub(r'/\*.*?\*/', '', code, flags=re.DOTALL)

    return code


In [47]:
# define the dataset
class CodeDataset(Dataset):
    def __init__(self, directory):
        self.samples = []
        for filename in os.listdir(directory):
            label = int(filename.split('_')[0])
            code = remove_java_comments(os.path.join(directory, filename))
            self.samples.append((code, label))
            # with open(os.path.join(directory, filename), 'r') as f:
            #     code = f.read()
            #     self.samples.append((code, label))
    
    def __len__(self):
        return len(self.samples)

    def __getitem__(self, index):
        code, label = self.samples[index]
        inputs = tokenizer.encode_plus(code, padding='max_length', max_length=512, truncation=True)
        input_ids = inputs['input_ids']
        attention_mask = inputs['attention_mask']
        return {
            'input_ids': torch.tensor(input_ids, dtype=torch.long), 
            'attention_mask': torch.tensor(attention_mask, dtype=torch.long), 
            'labels': torch.tensor(label, dtype=torch.long)
        }

In [48]:
#from google.colab import drive
#drive.mount('/content/drive/',force_remount=False)
#DATA_PATH = '/content/drive/My Drive/Colab Notebooks/SourceSniffer/'
DATA_PATH = '/kaggle/input/gptsniffer/dataset-1/dataset-1/'
from os.path import join


# Set the directory where the training and testing data is stored
train_data_path = join(DATA_PATH,'training_data')
test_data_path = join(DATA_PATH,'testing_data') 


# Define the training dataset and dataloader
train_dataset = CodeDataset(train_data_path)
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)

# Define the testing dataset and dataloader
test_dataset = CodeDataset(test_data_path)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)


# Set device to GPU if available, otherwise use CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#device = torch.device('cpu')

In [49]:
class SupConLoss(nn.Module):
    def __init__(self, temperature=0.07):
        super(SupConLoss, self).__init__()
        self.temperature = temperature

    def forward(self, features, labels):
        """
        features: Tensor of shape [batch_size, feature_dim]
        labels: Tensor of shape [batch_size]
        """
        batch_size = features.size(0)
        mask = torch.eq(labels.unsqueeze(1), labels.unsqueeze(0)).float()  # Binary label mask
        features_normalized = features / features.norm(dim=1, keepdim=True)  # Normalize features
        
        logits = torch.div(
            torch.mm(features_normalized, features_normalized.T),
            self.temperature
        )  # Pairwise similarity logits
        logits_max = torch.max(logits, dim=1, keepdim=True).values
        logits -= logits_max.detach()  # Numerical stability

        exp_logits = torch.exp(logits) * mask  # Mask for same-class samples
        log_prob = logits - torch.log(exp_logits.sum(dim=1, keepdim=True) + 1e-12)

        # Compute mean log-probability for positive pairs
        mask_sum = mask.sum(dim=1)
        mean_log_prob_pos = (log_prob * mask).sum(dim=1) / (mask_sum + 1e-12)  # Avoid division by zero
        loss = -mean_log_prob_pos.mean()  # Average loss
        return loss


In [50]:
# Model with Binary Classifier
class CodeBERTBinaryClassifier(nn.Module):
    def __init__(self, encoder_model, hidden_size=256, num_layers=2):
        super(CodeBERTBinaryClassifier, self).__init__()
        self.encoder = encoder_model
        # self.classifier = nn.Linear(self.encoder.config.hidden_size, 1)  # Binary classification

        # self.classifier = nn.Sequential(
        #     nn.Dropout(0.3),  # Dropout with 30%
        #     nn.Linear(self.encoder.config.hidden_size, 1)
        # )

        self.classifier = nn.Sequential(
            nn.Dropout(0.3),  # Dropout with 30%
            nn.Linear(self.encoder.config.hidden_size, 128),  # Hidden layer with 128 units
            nn.BatchNorm1d(128),  # Batch normalization for the hidden layer
            nn.ReLU(),  # ReLU activation for the hidden layer
            nn.Dropout(0.3),  # Dropout with 30%
            nn.Linear(128, 1)  # Output layer with 1 unit
        )
        
        # # Define an MLP classifier with hidden layers
        # layers = []
        # input_size = self.encoder.config.hidden_size  # Output size of the encoder
        # output_size = 1  # For binary classification
        
        # # Adding hidden layers to the MLP
        # for _ in range(num_layers):
        #     layers.append(nn.Linear(input_size, hidden_size))
        #     layers.append(nn.ReLU())  # Use ReLU activation after each hidden layer
        #     layers.append(nn.Dropout(0.1))  # Optional: dropout for regularization
        #     input_size = hidden_size  # The output size of the previous layer becomes input size
        
        # # Output layer
        # layers.append(nn.Linear(hidden_size, output_size))
        # layers.append(nn.Sigmoid())  # Sigmoid for binary classification output
        
        # # Combine all layers into a sequential module
        # self.classifier = nn.Sequential(*layers)

    def forward(self, input_ids, attention_mask):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        cls_output = outputs.last_hidden_state[:, 0, :]  # [CLS] token representation
        logits = self.classifier(cls_output.detach()).squeeze(-1)  # Squeeze for binary logit
        return logits, cls_output

In [51]:
# Define the tokenizer and the model
# tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
# base_model = AutoModel.from_pretrained("microsoft/codebert-base")

tokenizer = AutoTokenizer.from_pretrained("microsoft/graphcodebert-base")
base_model = AutoModel.from_pretrained("microsoft/graphcodebert-base")

model = CodeBERTBinaryClassifier(base_model)
model = model.to(device)

# optimizer = optim.AdamW(model.parameters(), lr=2e-5)
optimizer = optim.AdamW(
    [
        {"params": model.encoder.parameters(), "lr": 1e-5},  # Pre-trained layers
        {"params": model.classifier.parameters(), "lr": 1e-4},     # Task-specific head
    ],
    weight_decay=1e-2,
)
criterion = SupConLoss(temperature=0.07)

Some weights of RobertaModel were not initialized from the model checkpoint at microsoft/graphcodebert-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [52]:
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, inputs, targets):
        bce_loss = F.binary_cross_entropy_with_logits(inputs, targets, reduction='none')
        pt = torch.exp(-bce_loss)  # Probabilities
        focal_loss = self.alpha * (1 - pt) ** self.gamma * bce_loss
        return focal_loss.mean()

In [53]:
from tqdm import tqdm
import torch.nn.functional as F
# Define BCEWithLogitsLoss
classification_loss_fn = torch.nn.BCEWithLogitsLoss()
# Training Loop
alpha, beta = 2.0, 1.0  # Loss scaling factors
epochs = 6

alpha = 0.75  # Weight for positive class
gamma = 2.0   # Focusing parameter
focal_loss_fn = FocalLoss(alpha=alpha, gamma=gamma)

model.train()
for epoch in range(epochs):
    epoch_loss = 0
    for batch in tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{epochs}"):
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        optimizer.zero_grad()
        logits, features = model(input_ids, attention_mask)

        features = F.normalize(features, p=2, dim=1)
        supcon_loss  = criterion(features, labels.float())
        # BCE loss on logits
        bce_loss = nn.BCEWithLogitsLoss()(logits.squeeze(-1), labels.float())
        # Combine the two losses
        # total_loss = supcon_loss + bce_loss
        total_loss = alpha * supcon_loss + beta * bce_loss
        
        # Backpropagation and optimizer step
        total_loss.backward()
        optimizer.step()
        epoch_loss += total_loss.item()
    print(f"Epoch {epoch+1}/{epochs} Loss: {epoch_loss/len(train_dataloader):.4f}")
    torch.save(model.state_dict(), f'gpt_classifier_{epoch}.pth')

Epoch 1/6: 100%|██████████| 150/150 [01:08<00:00,  2.20it/s]


Epoch 1/6 Loss: 1.9065


Epoch 2/6: 100%|██████████| 150/150 [01:08<00:00,  2.18it/s]


Epoch 2/6 Loss: 1.7302


Epoch 3/6: 100%|██████████| 150/150 [01:08<00:00,  2.18it/s]


Epoch 3/6 Loss: 1.7328


Epoch 4/6: 100%|██████████| 150/150 [01:08<00:00,  2.18it/s]


Epoch 4/6 Loss: 1.7161


Epoch 5/6: 100%|██████████| 150/150 [01:08<00:00,  2.18it/s]


Epoch 5/6 Loss: 1.7352


Epoch 6/6: 100%|██████████| 150/150 [01:08<00:00,  2.18it/s]


Epoch 6/6 Loss: 1.7249


In [54]:
torch.save(model.state_dict(), "codebert_binary_classifier.pth")

In [55]:
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm

def infer_test_dataset(model, test_dataset, batch_size=8, device="cuda"):
    model.eval()  # Set the model to evaluation mode
    dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    predictions = []
    logits_list = []

    with torch.no_grad():  # Disable gradient computation for inference
        for batch in tqdm(dataloader, desc="Inferencing"):
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)

            # Forward pass through the model
            logits, _ = model(input_ids, attention_mask)
            logits = logits.cpu()  # Move logits back to CPU for processing
            probabilities = torch.sigmoid(logits)
            preds = (probabilities > 0.5).long()  # Threshold at 0.5 for binary labels

            predictions.extend(preds.tolist())
            logits_list.extend(probabilities.tolist())

    return predictions, logits_list

In [56]:
# checkpoint = torch.load("/kaggle/working/gpt_classifier_5.pth")
# model.load_state_dict(checkpoint)


# Ground truth labels (from your test dataset)
true_labels = [sample['labels'].item() for sample in test_dataset]

predictions, logits = infer_test_dataset(model, test_dataset, batch_size=8, device=device)

# Analyze predictions
# for i, (pred, logit, true) in enumerate(zip(predictions, logits, true_labels)):
#     print(f"Sample {i}: Predicted Label = {pred}, Logit = {logit}, True Label = {true}")

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Calculate metrics
accuracy = accuracy_score(true_labels, predictions)
precision = precision_score(true_labels, predictions)
recall = recall_score(true_labels, predictions)
f1 = f1_score(true_labels, predictions)
conf_matrix = confusion_matrix(true_labels, predictions)

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print("Confusion Matrix:")
print(conf_matrix)

Inferencing: 100%|██████████| 35/35 [00:04<00:00,  7.41it/s]

Accuracy: 0.7766
Precision: 0.8070
Recall: 0.7023
F1 Score: 0.7510
Confusion Matrix:
[[120  22]
 [ 39  92]]



