In [1]:
import h5py
import pandas as pd
from sklearn.utils import resample
from transformers import RobertaTokenizer, RobertaForSequenceClassification
from transformers import Trainer, TrainingArguments
import torch
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score, roc_curve, auc
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import label_binarize

# Function to load and process the HDF5 file
def load_and_process_hdf5(file_path):
    with h5py.File(file_path, 'r') as hdf:
        # List all keys
        print(f"Keys in {file_path}: {list(hdf.keys())}")

        # Load the datasets into pandas Series
        cwe_119_data = pd.Series(hdf['CWE-119'][:], name='CWE-119')
        cwe_120_data = pd.Series(hdf['CWE-120'][:], name='CWE-120')
        cwe_469_data = pd.Series(hdf['CWE-469'][:], name='CWE-469')
        cwe_476_data = pd.Series(hdf['CWE-476'][:], name='CWE-476')
        cwe_other_data = pd.Series(hdf['CWE-other'][:], name='CWE-other')
        function_source_data = pd.Series(hdf['functionSource'][:], name='functionSource')

    # Create a DataFrame with the boolean columns
    df = pd.concat([cwe_119_data, cwe_120_data, cwe_469_data, cwe_476_data, cwe_other_data], axis=1)

    # Create a new column 'Class' based on the boolean columns
    def assign_class(row):
        if row['CWE-119']:
            return 0
        elif row['CWE-120']:
            return 1
        elif row['CWE-469']:
            return 2
        elif row['CWE-476']:
            return 3
        elif row['CWE-other']:
            return 4
        else:
            return -1  # In case none of the columns are True

    df['Class'] = df.apply(assign_class, axis=1)

    # Now eliminate rows where Class = -1 and the corresponding functionSource
    mask = df['Class'] != -1
    df_filtered = df[mask]
    function_source_filtered = function_source_data[mask]

    # Combine the filtered Class and functionSource data
    df_final = pd.concat([df_filtered['Class'], function_source_filtered], axis=1)

    return df_final

In [2]:
# Paths to your HDF5 files
train_hdf5_file_path = '/kaggle/input/vul-code/VDISC_train.hdf5'
test_hdf5_file_path = '/kaggle/input/vul-code/VDISC_test.hdf5'
validation_hdf5_file_path = '/kaggle/input/vul-code/VDISC_validate.hdf5'

# Process the training dataset
print("Processing Training Dataset:")
df_train_final = load_and_process_hdf5(train_hdf5_file_path)

# Downsample training set to 20,000 samples with the given proportions
train_sample_proportions = {0: 5942, 1: 5777, 4: 5582, 3: 2755, 2: 249}  # Based on your request
df_train_downsampled = pd.DataFrame()
for cls, n_samples in train_sample_proportions.items():
    class_data = df_train_final[df_train_final['Class'] == cls]
    class_downsampled = resample(class_data, replace=False, n_samples=n_samples, random_state=42)
    df_train_downsampled = pd.concat([df_train_downsampled, class_downsampled])

print("Final Training Data Class Distribution:")
print(df_train_downsampled['Class'].value_counts())

# Process the validation dataset and downsample to 3,900 samples
print("\nProcessing Validation Dataset:")
df_val_final = load_and_process_hdf5(validation_hdf5_file_path)
val_sample_proportions = {0: 1142, 1: 1099, 4: 1071, 3: 535, 2: 53}  # Recalculated for 3900 samples
df_val_downsampled = pd.DataFrame()
for cls, n_samples in val_sample_proportions.items():
    class_data = df_val_final[df_val_final['Class'] == cls]
    class_downsampled = resample(class_data, replace=False, n_samples=n_samples, random_state=42)
    df_val_downsampled = pd.concat([df_val_downsampled, class_downsampled])

print("Final Validation Data Class Distribution:")
print(df_val_downsampled['Class'].value_counts())

# Process the test dataset and downsample to 3,900 samples
print("\nProcessing Test Dataset:")
df_test_final = load_and_process_hdf5(test_hdf5_file_path)
test_sample_proportions = {0: 1142, 1: 1099, 4: 1071, 3: 535, 2: 53}  # Recalculated for 3900 samples
df_test_downsampled = pd.DataFrame()
for cls, n_samples in test_sample_proportions.items():
    class_data = df_test_final[df_test_final['Class'] == cls]
    class_downsampled = resample(class_data, replace=False, n_samples=n_samples, random_state=42)
    df_test_downsampled = pd.concat([df_test_downsampled, class_downsampled])

print("Final Test Data Class Distribution:")
print(df_test_downsampled['Class'].value_counts())


Processing Training Dataset:
Keys in /kaggle/input/vul-code/VDISC_train.hdf5: ['CWE-119', 'CWE-120', 'CWE-469', 'CWE-476', 'CWE-other', 'functionSource']
Final Training Data Class Distribution:
Class
0    5942
1    5777
4    5582
3    2755
2     249
Name: count, dtype: int64

Processing Validation Dataset:
Keys in /kaggle/input/vul-code/VDISC_validate.hdf5: ['CWE-119', 'CWE-120', 'CWE-469', 'CWE-476', 'CWE-other', 'functionSource']
Final Validation Data Class Distribution:
Class
0    1142
1    1099
4    1071
3     535
2      53
Name: count, dtype: int64

Processing Test Dataset:
Keys in /kaggle/input/vul-code/VDISC_test.hdf5: ['CWE-119', 'CWE-120', 'CWE-469', 'CWE-476', 'CWE-other', 'functionSource']
Final Test Data Class Distribution:
Class
0    1142
1    1099
4    1071
3     535
2      53
Name: count, dtype: int64


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report, roc_auc_score
from sklearn.preprocessing import label_binarize
import numpy as np
import pandas as pd

# Custom Dataset class to handle encodings and labels
class CodeBERTDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long).clone().detach()
        return item

    def __len__(self):
        return len(self.labels)

# Function to check and clean NaN values in DataFrames
def clean_data(df):
    df = df.dropna()  # Drop rows with NaN values
    return df

# Function to tokenize data
def tokenize_function(df, tokenizer):
    return tokenizer(
        df['functionSource'].astype(str).tolist(),
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors='pt'
    )

# Define a simple neural network for stacking
class StackingNN(nn.Module):
    def __init__(self, input_size, num_classes):
        super(StackingNN, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, num_classes)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)  # Output logits
        return self.softmax(x)  # Output probabilities

# Clean and process the datasets to ensure no NaN values
df_train_downsampled = clean_data(df_train_downsampled)
df_val_downsampled = clean_data(df_val_downsampled)
df_test_downsampled = clean_data(df_test_downsampled)

# Initialize tokenizer and models
graphcodebert_tokenizer = RobertaTokenizer.from_pretrained("microsoft/graphcodebert-base")
graphcodebert_model = RobertaForSequenceClassification.from_pretrained("microsoft/graphcodebert-base", num_labels=5)

unixcoder_tokenizer = RobertaTokenizer.from_pretrained("microsoft/unixcoder-base")
unixcoder_model = RobertaForSequenceClassification.from_pretrained("microsoft/unixcoder-base", num_labels=5)

# Tokenize the data
train_encodings_graphcodebert = tokenize_function(df_train_downsampled, graphcodebert_tokenizer)
train_encodings_unixcoder = tokenize_function(df_train_downsampled, unixcoder_tokenizer)

val_encodings_graphcodebert = tokenize_function(df_val_downsampled, graphcodebert_tokenizer)
val_encodings_unixcoder = tokenize_function(df_val_downsampled, unixcoder_tokenizer)

test_encodings_graphcodebert = tokenize_function(df_test_downsampled, graphcodebert_tokenizer)
test_encodings_unixcoder = tokenize_function(df_test_downsampled, unixcoder_tokenizer)

# Prepare labels
train_labels = df_train_downsampled['Class'].tolist()
val_labels = df_val_downsampled['Class'].tolist()
test_labels = df_test_downsampled['Class'].tolist()

# Dataset objects
train_dataset_graphcodebert = CodeBERTDataset(train_encodings_graphcodebert, train_labels)
val_dataset_graphcodebert = CodeBERTDataset(val_encodings_graphcodebert, val_labels)

train_dataset_unixcoder = CodeBERTDataset(train_encodings_unixcoder, train_labels)
val_dataset_unixcoder = CodeBERTDataset(val_encodings_unixcoder, val_labels)

# Test dataset objects (you missed these in the original code)
test_dataset_graphcodebert = CodeBERTDataset(test_encodings_graphcodebert, test_labels)
test_dataset_unixcoder = CodeBERTDataset(test_encodings_unixcoder, test_labels)

# Step 1: Define training arguments
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy='epoch',  # Add evaluation strategy to perform evaluation
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    learning_rate=2e-5,
    logging_dir='./logs',
    logging_steps=10,
    save_steps=500,
    save_total_limit=2,
    report_to="none"  # Disable Weights & Biases (W&B) logging
)

# Step 2: Train GraphCodeBERT and UniXcoder using Trainer
trainer_graphcodebert = Trainer(
    model=graphcodebert_model, 
    args=training_args,
    train_dataset=train_dataset_graphcodebert,
    eval_dataset=val_dataset_graphcodebert,  # Add validation dataset
    tokenizer=graphcodebert_tokenizer
)

trainer_unixcoder = Trainer(
    model=unixcoder_model, 
    args=training_args,
    train_dataset=train_dataset_unixcoder,
    eval_dataset=val_dataset_unixcoder,  # Add validation dataset
    tokenizer=unixcoder_tokenizer
)

# Train both models
trainer_graphcodebert.train()
trainer_unixcoder.train()

# Step 3: Get predictions for both models on the train set
graphcodebert_train_preds = trainer_graphcodebert.predict(train_dataset_graphcodebert).predictions
unixcoder_train_preds = trainer_unixcoder.predict(train_dataset_unixcoder).predictions

# Apply softmax to get probabilities
graphcodebert_train_probs = torch.softmax(torch.tensor(graphcodebert_train_preds), dim=1).numpy()
unixcoder_train_probs = torch.softmax(torch.tensor(unixcoder_train_preds), dim=1).numpy()

# Combine predictions to form stacking input
stacked_train_data = np.hstack((graphcodebert_train_probs, unixcoder_train_probs))

# Step 4: Create and train the StackingNN model
stacking_model = StackingNN(input_size=stacked_train_data.shape[1], num_classes=5)
optimizer = optim.Adam(stacking_model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

# Convert to torch tensor
X_train_tensor = torch.tensor(stacked_train_data, dtype=torch.float32)
y_train_tensor = torch.tensor(train_labels, dtype=torch.long)

# Create DataLoader
train_loader = DataLoader(torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor), batch_size=16, shuffle=True)

# Train the neural network
for epoch in range(10):  # Number of epochs can be adjusted
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = stacking_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

# Step 5: Get predictions on the test set from both base models
graphcodebert_test_preds = trainer_graphcodebert.predict(test_dataset_graphcodebert).predictions
unixcoder_test_preds = trainer_unixcoder.predict(test_dataset_unixcoder).predictions

# Apply softmax to get probabilities for test set
graphcodebert_test_probs = torch.softmax(torch.tensor(graphcodebert_test_preds), dim=1).numpy()
unixcoder_test_probs = torch.softmax(torch.tensor(unixcoder_test_preds), dim=1).numpy()

# Combine predictions for the test set
stacked_test_data = np.hstack((graphcodebert_test_probs, unixcoder_test_probs))
X_test_tensor = torch.tensor(stacked_test_data, dtype=torch.float32)

# Step 6: Get the final predictions from the stacking neural network
with torch.no_grad():
    final_test_probs = stacking_model(X_test_tensor)
    final_predictions = torch.argmax(final_test_probs, dim=1).numpy()

# Step 7: Evaluate the final predictions
accuracy = accuracy_score(test_labels, final_predictions)
precision, recall, f1, _ = precision_recall_fscore_support(test_labels, final_predictions, average='weighted')
classification_report_final = classification_report(test_labels, final_predictions, digits=4)

# Compute AUC score for multi-class classification
test_labels_binarized = label_binarize(test_labels, classes=[0, 1, 2, 3, 4])
auc_score = roc_auc_score(test_labels_binarized, final_test_probs.numpy(), multi_class='ovr')

print(f"Stacked Neural Network Accuracy: {accuracy:.4f}")
print(f"Stacked Neural Network Precision: {precision:.4f}")
print(f"Stacked Neural Network Recall: {recall:.4f}")
print(f"Stacked Neural Network F1-Score: {f1:.4f}")
print(f"Stacked Neural Network AUC-Score: {auc_score:.4f}")
print("\nFinal Classification Report:\n", classification_report_final)


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/539 [00:00<?, ?B/s]



pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/graphcodebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


tokenizer_config.json:   0%|          | 0.00/1.11k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/938k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/444k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/691 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/504M [00:00<?, ?B/s]

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/unixcoder-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}


Epoch,Training Loss,Validation Loss
1,0.7881,0.638707
2,0.5565,0.612684
3,0.4215,0.624774


  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}


Epoch,Training Loss,Validation Loss
1,0.7566,0.629912
2,0.4889,0.572526
3,0.3696,0.593555


  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]).clone().detach() for key, val in self.encodings.items()}


Stacked Neural Network Accuracy: 0.8177
Stacked Neural Network Precision: 0.8155
Stacked Neural Network Recall: 0.8177
Stacked Neural Network F1-Score: 0.8154
Stacked Neural Network AUC-Score: 0.8826

Final Classification Report:
               precision    recall  f1-score   support

           0     0.8936    0.8748    0.8841      1142
           1     0.8415    0.8790    0.8598      1099
           2     0.5263    0.1887    0.2778        53
           3     0.7708    0.7607    0.7658       535
           4     0.7424    0.7535    0.7479      1071

    accuracy                         0.8177      3900
   macro avg     0.7549    0.6913    0.7071      3900
weighted avg     0.8155    0.8177    0.8154      3900



In [4]:
import os

# Plot AUC-ROC curves for each class and save the figure
def plot_roc_auc_curve(test_labels, predictions, num_classes=5):
    test_labels_bin = label_binarize(test_labels, classes=[0, 1, 2, 3, 4])  # Binarize the labels for multi-class ROC
    fpr = dict()
    tpr = dict()
    roc_auc = dict()

    for i in range(num_classes):
        fpr[i], tpr[i], _ = roc_curve(test_labels_bin[:, i], predictions[:, i])
        roc_auc[i] = roc_auc_score(test_labels_bin[:, i], predictions[:, i])

    # Plot ROC curve for each class
    plt.figure(figsize=(10, 8))
    colors = ['aqua', 'darkorange', 'cornflowerblue', 'green', 'red']
    for i in range(num_classes):
        plt.plot(fpr[i], tpr[i], color=colors[i], lw=2,
                 label=f'Class {i} ROC curve (area = {roc_auc[i]:.2f})')

    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) for each class')
    plt.legend(loc="lower right")

    # Save the figure to a file
    output_dir = './roc_auc_plots'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    plt.savefig(os.path.join(output_dir, 'roc_auc_curve.png'))  # Save the figure as PNG
    plt.show()

# Compute metrics function with ROC-AUC curve plotting and saving
def compute_metrics(p):
    preds = np.argmax(p.predictions, axis=1)
    
    precision, recall, f1, _ = precision_recall_fscore_support(p.label_ids, preds, average='weighted')
    acc = accuracy_score(p.label_ids, preds)
    
    # Compute ROC-AUC for each class using one-vs-rest approach
    label_binarized = label_binarize(p.label_ids, classes=[0, 1, 2, 3, 4])
    auc_score = roc_auc_score(
        y_true=label_binarized,
        y_score=p.predictions,
        multi_class="ovr"
    )

    # Plot ROC curves for each class and save to a file
    plot_roc_auc_curve(p.label_ids, p.predictions)

    return {
        'accuracy': acc,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'roc_auc': auc_score
    }
