# Vulnerability Classification Using Self-Supervised Learning

### Group: Vulnerability Classification Using Self-Supervised Learning

> #### Install the required libraries:

In [None]:
%pip install gdown scikit-learn transformers pinecone unidecode torch peft datasets matplotlib python-dotenv ollama

> #### Import the required libraries:

In [None]:
# Metrics and performance evaluation
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score, accuracy_score, classification_report  # Tools for evaluating classification models

# Hugging Face Transformers for pre-trained models and tokenization
from transformers import AutoTokenizer, AutoModelForMaskedLM, Trainer, TrainingArguments, TrainerCallback  # Pre-trained models and training utilities
from transformers import DataCollatorForLanguageModeling  # Data collation for masked language modeling tasks

# Parameter-efficient fine-tuning (PEFT) tools
from peft import LoraConfig, get_peft_model, PeftModel  # LoRA configurations and PEFT model utilities

# Data handling and preprocessing
from torch.utils.data import DataLoader, TensorDataset  # PyTorch data handling utilities
from sklearn.model_selection import train_test_split  # Train-test splitting
from sklearn.preprocessing import LabelEncoder  # Encoding target labels

# Pinecone for vector database operations
from pinecone import ServerlessSpec, Pinecone  # Pinecone serverless setup and API access

# Datasets and visualizations
from datasets import load_dataset  # Loading pre-built datasets
from sklearn.manifold import TSNE  # Dimensionality reduction for visualization
import matplotlib.pyplot as plt  # Plotting utilities

# Environment variable management
from dotenv import load_dotenv  # Load environment variables from a .env file

# PyTorch and related utilities
import torch.nn as nn  # Neural network modules
import torch  # Core PyTorch library

# File management and downloading
import gdown  # Download files from Google Drive
import json  # Handle JSON data
import os  # OS-level operations

# Miscellaneous utilities
import unidecode  # ASCII transliteration of Unicode text
import numpy as np  # Numerical operations
import pandas as pd  # Data handling with DataFrames
from tqdm.auto import tqdm  # Progress bar for loops
import time  # Time measurement
import re  # Regular expression operations

# Ollama API
import ollama  # Interaction with the Ollama library for model inference


# 1. Developed Model

> The fine-tuning process used the "roberta-base" model, a BERT derivative developed by Facebook. Fine-tuning employed
> the LoRA (Low-Rank Adaptation) technique with rank-8 update matrices to accommodate computational resource constraints. The dataset consisted of CVE descriptions curated by the MITRE Corporation.
>
> The fine-tuned model was used to generate a vector database hosted on Pinecone, containing representations of the 25 most prevalent CWEs from the MITRE database. For retrieval, a top_k=10 strategy was applied, so that the ten most similar entries were selected as context for classification.
> 
> To enhance classification accuracy, an example builder was implemented to generate question-and-answer pairs illustrating correct CVE-to-CWE mappings. This was integrated into a Retrieval-Augmented Generation (RAG) strategy, where the vector database provided context information, and the examples served as in-context learning data for a Llama model.
> 
> This system lets the Llama model predict the CWE category for a given CVE description by combining retrieved context with in-context examples.
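To make the `top_k` retrieval step concrete, the following is a minimal sketch of nearest-neighbour search by cosine similarity. It is not Pinecone's implementation (Pinecone performs the search server-side); the function names and the three-dimensional toy vectors are assumptions for illustration, whereas the embeddings in this notebook have 768 dimensions.

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query, items, k=10):
    """Return the k (label, score) pairs most similar to the query vector."""
    scored = [(label, cosine_similarity(query, vec)) for label, vec in items]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]

# Hypothetical toy index: (CWE label, embedding) pairs.
toy_index = [
    ("CWE-79", [1.0, 0.0, 0.0]),
    ("CWE-89", [0.9, 0.1, 0.0]),
    ("CWE-22", [0.0, 1.0, 0.0]),
]
nearest = top_k([1.0, 0.05, 0.0], toy_index, k=2)
print([label for label, _ in nearest])
```

In the RAG setting described above, the labels and texts attached to the returned neighbours would be placed in the prompt as context.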

# 1.1. Base-model fine-tuning

## 1.1.1. Extract, transform and load data

### Dataset Download
This section ensures that the necessary dataset is downloaded and stored locally. The dataset is available via Google Drive, 
and the `gdown` library is used for downloading. 

The dataset is saved in the following location:
- **Directory**: `./app/data`
- **Filename**: `cve_dataset.csv`

If the directory does not exist, it will be created automatically. The dataset URL and its purpose are provided for transparency.
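When the notebook is re-run, it may be useful to skip the download if the file is already present. The sketch below shows one way to do that; `download_if_missing` and the stand-in downloader are hypothetical helpers, and in this notebook the `download` argument would wrap `gdown.download`.

```python
import os
import tempfile

def download_if_missing(url, path, download):
    """Call download(url, path) only when path does not exist yet."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    if os.path.exists(path):
        return False  # cached copy found, nothing downloaded
    download(url, path)
    return True

# Stand-in downloader for demonstration only; in the notebook this could be
# lambda u, p: gdown.download(u, p, quiet=False)
def fake_download(url, path):
    with open(path, "w") as handle:
        handle.write("cve_id,description\n")

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "data", "cve_dataset.csv")
    first = download_if_missing("https://example.invalid/file", target, fake_download)
    second = download_if_missing("https://example.invalid/file", target, fake_download)
print(first, second)
```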

In [None]:
# Define the data directory and dataset URL
data_dir = "./app/data"
cve_dataset_url_download = "https://drive.google.com/uc?id=1vGQWNhupeN08fvtuHEBBuhH8sef8XJUu"
cve_dataset_path = os.path.join(data_dir, "cve_dataset.csv")

# Create the data directory if it does not exist
os.makedirs(data_dir, exist_ok=True)

# Download the dataset from the provided URL
# Arguments:
# - `cve_dataset_url_download` (str): The URL of the dataset to download.
# - `cve_dataset_path` (str): The local file path where the dataset will be saved.
# Returns:
# - The file path of the downloaded dataset.
gdown.download(cve_dataset_url_download, cve_dataset_path, quiet=False)

### Data Preprocessing
This section processes the CVE dataset to ensure it is clean and focused on relevant information. The steps include:
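1. **Year extraction and filtering**: The year is extracted from each CVE ID. The filter that keeps only CVEs reported after 2000 is currently commented out in the code, so all years are retained.
2. **Removing missing descriptions**: CVEs without a description are excluded, since the description text is what the model is trained on.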
3. **Dropping unnecessary columns**: Columns `cwe` and `year` are removed to focus on the `cve_id` and `description`.
4. **Resetting the DataFrame index**: This ensures a clean, continuous index after filtering.
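The year extraction and filtering steps can be sketched in plain Python as follows. This is an illustration, not the notebook's pandas code: `cve_year` is a hypothetical helper that returns `None` for malformed IDs instead of raising, and the rows are placeholder data.

```python
import re

# CVE IDs have the form CVE-YYYY-NNNN, with at least four digits in the sequence part.
CVE_ID_PATTERN = re.compile(r"^CVE-(\d{4})-\d{4,}$")

def cve_year(cve_id):
    """Return the year encoded in a CVE ID, or None if the ID is malformed."""
    match = CVE_ID_PATTERN.match(cve_id.strip())
    return int(match.group(1)) if match else None

# Placeholder rows for illustration.
rows = [
    {"cve_id": "CVE-1999-0001", "description": "Old entry"},
    {"cve_id": "CVE-2021-44228", "description": "Newer entry"},
    {"cve_id": "CVE-2020-0001", "description": None},
]

# Keep rows that have a description and a year after 2000.
kept = [
    row for row in rows
    if row["description"] is not None and (cve_year(row["cve_id"]) or 0) > 2000
]
print([row["cve_id"] for row in kept])
```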

#### CWE Codes of Interest
The predefined list of CWE codes (`cwe_list`) includes common software weaknesses, which may be referenced in later steps of the analysis.

In [None]:
# Define the list of CWE codes of interest
# Arguments:
# - `cwe_list` (list): List of CWE codes that may be used in future steps for filtering or analysis.
# Purpose:
# - This ensures we have a predefined focus on specific types of vulnerabilities.
cwe_list = [
    "CWE-79", "CWE-787", "CWE-89", "CWE-352", "CWE-22", "CWE-125", "CWE-78",
    "CWE-416", "CWE-862", "CWE-434", "CWE-94", "CWE-20", "CWE-77", "CWE-287",
    "CWE-269", "CWE-502", "CWE-200", "CWE-863", "CWE-918", "CWE-119", "CWE-476",
    "CWE-798", "CWE-190", "CWE-400", "CWE-306"
]

# Read the CVE dataset
# Arguments:
# - `cve_dataset_path` (str): Path to the CSV file containing the CVE dataset.
# Returns:
# - A pandas DataFrame containing the loaded dataset.
cve_dataset = pd.read_csv(cve_dataset_path)

# Extract the year from the CVE ID
# The CVE ID format is "CVE-YYYY-NNNN", so the second part is the year.
cve_dataset['year'] = cve_dataset['cve_id'].apply(lambda x: int(x.split('-')[1]))
# Optional filter (currently disabled): uncomment to keep only entries with years greater than 2000.
# cve_dataset = cve_dataset[cve_dataset['year'] > 2000]

# Remove entries with missing descriptions
# The 'description' column contains text describing the vulnerability.
cve_dataset.dropna(subset=['description'], inplace=True)

# Drop unnecessary columns to focus on relevant data
# - 'cwe': The CWE column is dropped because it is not used in subsequent analysis.
# - 'year': The year column is only a helper for the optional year filter and is not needed afterwards.
cve_dataset.drop(columns=['cwe', 'year'], inplace=True)

# Reset the index to ensure a clean DataFrame
cve_dataset.reset_index(drop=True, inplace=True)

print(len(cve_dataset))


## 1.1.2. Split dataset for LoRA fine-tuning

### Data Splitting and Storage
This section splits the preprocessed dataset into training and testing sets for fine-tuning models. The steps include:
1. **Splitting the dataset**: The data is divided into 80% for training and 20% for testing, ensuring reproducibility with a fixed random seed.
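2. **Saving to disk**: The `description` column is saved as plain text, both as `.csv` files and as `.txt` copies (the `.txt` files are the ones loaded for fine-tuning). The CSV files follow this structure: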
    - **Training dataset**: `./datasets/finetune/train.csv`
    - **Testing dataset**: `./datasets/finetune/test.csv`
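To illustrate why a fixed random seed makes the split reproducible, here is a small standard-library sketch. It is not scikit-learn's `train_test_split` algorithm and will not produce the same partition; `split_train_test` is a hypothetical helper.

```python
import random

def split_train_test(items, test_size=0.2, seed=42):
    """Shuffle a copy of items with a fixed seed and cut off the test share."""
    shuffled = list(items)
    random.Random(seed).shuffle(shuffled)
    n_test = int(round(len(shuffled) * test_size))
    return shuffled[n_test:], shuffled[:n_test]

descriptions = [f"description {i}" for i in range(10)]
train_a, test_a = split_train_test(descriptions)
train_b, test_b = split_train_test(descriptions)

# Same seed, same partition; every item lands in exactly one of the two sets.
print(len(train_a), len(test_a), train_a == train_b)
```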

In [None]:
# Create directories for storing fine-tuning datasets
# Arguments:
# - Directory path: `./datasets/finetune`
# Purpose:
# - Ensure the directory structure is in place for saving the train and test datasets.
os.makedirs("./datasets/finetune", exist_ok=True)

# Split the dataset into training and testing sets
# Arguments:
# - `cve_dataset` (DataFrame): The filtered and preprocessed CVE dataset.
# - `test_size` (float): Proportion of the dataset to include in the test split (20% here).
# - `random_state` (int): Random seed to ensure reproducibility of the split.
# Returns:
# - `cve_train_data` (DataFrame): Training portion of the dataset.
# - `cve_test_data` (DataFrame): Testing portion of the dataset.
cve_train_data, cve_test_data = train_test_split(
    cve_dataset,
    test_size=0.2,
    random_state=42
)

# Print the sizes of the training and testing datasets for validation
print(f"Training set size: {len(cve_train_data)}")
print(f"Testing set size: {len(cve_test_data)}")

# Save the training and testing datasets to CSV files
# The descriptions are extracted and saved as plain text for fine-tuning purposes.
# Arguments:
# - `index=False`: Avoids saving the DataFrame index.
# - `header=False`: Ensures only text is saved, suitable for certain NLP frameworks.

# Save training dataset
cve_train_data['description'].to_csv("./datasets/finetune/train.csv", index=False, header=False)
# Save testing dataset
cve_test_data['description'].to_csv("./datasets/finetune/test.csv", index=False, header=False)

# Save the same 'description' column as .txt files, which the fine-tuning step loads
cve_train_data['description'].to_csv("./datasets/finetune/train.txt", index=False, header=False)
cve_test_data['description'].to_csv("./datasets/finetune/test.txt", index=False, header=False)


## 1.1.3. LoRA Fine-tuning Pipeline

### Fine-Tuning with LoRA
This section fine-tunes the RoBERTa model using LoRA for masked language modeling. Key steps include:
1. **Model and LoRA configuration**: The base model is adapted with LoRA for efficient fine-tuning.
2. **Dataset preparation**: Training and testing datasets are tokenized with padding and truncation.
3. **Training**: The model is trained using a Trainer object with mixed-precision training enabled.
4. **Metrics and Saving**: Training metrics are logged, and the fine-tuned model is saved for later use.
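As a rough check on why LoRA is cheaper than full fine-tuning, the arithmetic below counts the parameters in the low-rank factors for the configuration used in this notebook (rank 8, `lora_alpha` 16, adapters on the query, key and value projections). It assumes the standard `roberta-base` dimensions (hidden size 768, 12 encoder layers) and counts only the LoRA factors, so it is an estimate rather than the figure PEFT itself reports.

```python
def lora_trainable_params(d_in, d_out, rank):
    """Parameters in the two low-rank factors A (rank x d_in) and B (d_out x rank)."""
    return rank * (d_in + d_out)

hidden_size = 768       # roberta-base hidden size
num_layers = 12         # roberta-base encoder layers
adapted_per_layer = 3   # query, key, value projections
rank, alpha = 8, 16

per_matrix = lora_trainable_params(hidden_size, hidden_size, rank)
total_lora = per_matrix * adapted_per_layer * num_layers
full_matrix = hidden_size * hidden_size
scaling = alpha / rank  # factor applied to the low-rank update

print(per_matrix, total_lora, full_matrix, scaling)
# 12288 442368 589824 2.0
```

Each adapted projection therefore trains 12,288 parameters instead of the 589,824 weights of the full 768 x 768 matrix.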

In [None]:
# Define the version for model outputs
# Arguments:
# - `MODEL_VERSION` (int): Version number for the fine-tuned model.
MODEL_VERSION = 1
output_dir = f"./models/model_{MODEL_VERSION}"
os.makedirs(output_dir, exist_ok=True)

# Define base model and tokenizer
# The RoBERTa base model is used for masked language modeling (MLM).
model_name = "roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
base_model = AutoModelForMaskedLM.from_pretrained(model_name)

# LoRA Configuration
# LoRA adapts specific layers of the base model with low-rank modifications for efficient fine-tuning.
# Note: PEFT's TaskType enum has no masked-language-modeling entry, so `task_type` is left unset
# and `get_peft_model` returns the generic PeftModel wrapper around the MLM model.
lora_config = LoraConfig(
    r=8,              # Rank
    lora_alpha=16,    # Scaling factor
    lora_dropout=0.1, # Dropout rate for LoRA
    target_modules=["query", "key", "value"],  # Targeted attention layers
)
model = get_peft_model(base_model, lora_config)

# Load and preprocess dataset
# The dataset is loaded from the previously saved text files.
# Arguments:
# - `data_files`: Paths to train and test datasets.
dataset = load_dataset(
    "text", 
    data_files={"train": "./datasets/finetune/train.txt", "test": "./datasets/finetune/test.txt"}
)

# Preprocessing function for tokenizing text
# Arguments:
# - `examples` (dict): Dictionary containing text data.
# Returns:
# - Tokenized dataset with padding and truncation.
def preprocess_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=128)

# Apply preprocessing to the dataset
tokenized_datasets = dataset.map(preprocess_function, batched=True, remove_columns=["text"])

# Data collator for dynamic masking during training
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=True,
    mlm_probability=0.15,  # Probability of masking tokens
)

# Training arguments
training_args = TrainingArguments(
    output_dir=f"{output_dir}/results_lora",  # Directory for training results
    evaluation_strategy="epoch",  # Evaluate at the end of each epoch
    learning_rate=5e-4,  # Learning rate
    per_device_train_batch_size=16,  # Batch size for training
    per_device_eval_batch_size=16,   # Batch size for evaluation
    num_train_epochs=3,  # Number of training epochs
    weight_decay=0.01,  # Weight decay for regularization
    save_total_limit=2,  # Limit on saved models
    logging_steps=100,  # Logging frequency
    save_steps=500,  # Save model every 500 steps
    report_to="none",  # Disable reporting to external platforms
    fp16=True,  # Enable mixed-precision training
)

# Callback for saving metrics
class SaveMetricsCallback(TrainerCallback):
    def __init__(self, output_dir):
        self.output_dir = output_dir
        self.metrics = {"training": [], "evaluation": []}

    def on_log(self, args, state, control, logs=None, **kwargs):
        if logs:
            self.metrics["training"].append({"step": state.global_step, "logs": logs})

    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
        if metrics:
            self.metrics["evaluation"].append({"step": state.global_step, "metrics": metrics})

    def on_train_end(self, args, state, control, **kwargs):
        metrics_file = os.path.join(self.output_dir, "training_metrics.json")
        with open(metrics_file, "w") as f:
            json.dump(self.metrics, f, indent=4)
        print(f"Saved metrics to {metrics_file}")

# Initialize callback for saving metrics
metrics_output_dir = f"{output_dir}/metrics"
os.makedirs(metrics_output_dir, exist_ok=True)
save_metrics_callback = SaveMetricsCallback(metrics_output_dir)

# Initialize Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    callbacks=[save_metrics_callback],
)

# Train the model
trainer.train()

# Save the fine-tuned model and tokenizer
model.save_pretrained(f"{output_dir}/fine_tuned_lora_mlm")
tokenizer.save_pretrained(f"{output_dir}/fine_tuned_lora_mlm")

## 1.1.4. Training plots

### Training Metrics Visualization
This section visualizes key metrics from the training logs:
1. **Training Loss**: Tracks the model's loss function value over training steps.
2. **Gradient Norm**: Visualizes the norm of gradients, which provides insights into the stability of the training process.
3. **Learning Rate**: Shows the learning rate schedule during training.

Each plot provides:
- **Steps (x-axis)**: The training step where the metric was logged.
- **Metric value (y-axis)**: The corresponding value of the metric.

#### Observations
- **Loss**: Should generally decrease over steps, indicating model improvement.
- **Grad Norm**: Large spikes may indicate instability in gradients.
- **Learning Rate**: Should follow the defined schedule, aiding in optimization.
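Not every log entry contains every metric (evaluation entries, for example, carry `eval_loss` but no `loss`), so it can help to pair each metric with the steps at which it was actually logged. The sketch below assumes entries shaped like the `log_history` list in `trainer_state.json`; `metric_series` is a hypothetical helper and the sample values are made up.

```python
def metric_series(log_history, key):
    """Return parallel (steps, values) lists for the entries that contain key."""
    pairs = [(entry["step"], entry[key]) for entry in log_history if key in entry]
    steps = [step for step, _ in pairs]
    values = [value for _, value in pairs]
    return steps, values

# Made-up log entries for illustration.
sample_history = [
    {"step": 100, "loss": 2.1, "grad_norm": 1.4, "learning_rate": 4.9e-4},
    {"step": 200, "loss": 1.8, "grad_norm": 1.1, "learning_rate": 4.8e-4},
    {"step": 200, "eval_loss": 1.7},
]

loss_steps, losses = metric_series(sample_history, "loss")
eval_steps, eval_losses = metric_series(sample_history, "eval_loss")
print(loss_steps, losses, eval_steps, eval_losses)
```

Plotting each metric against its own step list avoids misaligned x and y arrays when a metric is missing from some entries.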

In [None]:
# File path for the training state JSON file
# This file contains logs from the training process, including loss, gradient norms, and learning rates.
# The checkpoint number (25608) comes from the original run; adjust it to match the checkpoints produced by yours.
file_path = f"./models/model_{MODEL_VERSION}/results_lora/checkpoint-25608/trainer_state.json"

# Load the JSON data
# Arguments:
# - `file_path` (str): Path to the `trainer_state.json` file.
# Returns:
# - `data` (dict): Parsed JSON data containing training logs.
with open(file_path, "r") as file:
    data = json.load(file)

# Extract relevant metrics from the log history
# - `steps`: Training steps where metrics were logged.
# - `losses`: Training loss values at corresponding steps.
# - `grad_norms`: Gradient norms (if available) at corresponding steps.
# - `learning_rates`: Learning rates at corresponding steps.
log_history = data.get("log_history", [])
steps = [entry["step"] for entry in log_history if "loss" in entry]
losses = [entry["loss"] for entry in log_history if "loss" in entry]
grad_norms = [entry["grad_norm"] for entry in log_history if "grad_norm" in entry]
learning_rates = [entry["learning_rate"] for entry in log_history if "learning_rate" in entry]

# Visualization of training metrics
# Create three subplots: Training Loss, Gradient Norm, and Learning Rate over training steps.
fig, axes = plt.subplots(1, 3, figsize=(16, 6))

# Plot training loss
axes[0].plot(steps, losses, label="Training Loss", color="orange")
axes[0].set_title("Training Loss Over Steps")
axes[0].set_xlabel("Steps")
axes[0].set_ylabel("Loss")
axes[0].legend()
axes[0].grid(True)

# Plot gradient norms
axes[1].plot(steps[:len(grad_norms)], grad_norms, label="Grad Norm", color="blue")
axes[1].set_title("Grad Norm Over Steps")
axes[1].set_xlabel("Steps")
axes[1].set_ylabel("Grad Norm")
axes[1].legend()
axes[1].grid(True)

# Plot learning rates
axes[2].plot(steps[:len(learning_rates)], learning_rates, label="Learning Rate", color="green")
axes[2].set_title("Learning Rate Over Steps")
axes[2].set_xlabel("Steps")
axes[2].set_ylabel("Learning Rate")
axes[2].legend()
axes[2].grid(True)

# Adjust layout and display the plots
plt.tight_layout()
plt.show()

# 1.2. RAG Pipeline

## 1.2.1. Loading base and fine-tuned models

### Loading Base and Fine-Tuned Models
This section initializes the base model (`roberta-base`) and loads the fine-tuned LoRA adapter for inference or further analysis.

#### Steps:
1. **Base Model**:
    - The base model, RoBERTa, is loaded with its tokenizer.
    - Hidden states output is enabled for advanced downstream tasks.
2. **Fine-Tuned Model**:
    - The LoRA adapter, fine-tuned for masked language modeling, is loaded and integrated with the base model.
    - A separate tokenizer associated with the fine-tuned model is initialized.

#### Paths:
- **Base Model**: `roberta-base` from Hugging Face model hub.
- **Fine-Tuned Model**: Located at `./models/model_{MODEL_VERSION}/fine_tuned_lora_mlm`.

These models can now be used for tasks such as inference or evaluation.

In [None]:
# Define base model and tokenizer
# Arguments:
# - `base_model_name` (str): Name of the base model from Hugging Face's model hub.
# Returns:
# - `tokenizer_bm`: Tokenizer for the base model.
# - `base_model`: Pretrained RoBERTa model for masked language modeling.
base_model_name = "roberta-base"
tokenizer_bm = AutoTokenizer.from_pretrained(base_model_name)
base_model = AutoModelForMaskedLM.from_pretrained(
    base_model_name, 
    output_hidden_states=True  # Enables output of hidden states for additional tasks.
)

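# Load fine-tuned model and tokenizer
# Arguments:
# - `adapter_path` (str): Path to the fine-tuned LoRA adapter model.
# Returns:
# - `tokenizer_ftm`: Tokenizer corresponding to the fine-tuned model.
# - `fine_tuned_model`: A second copy of the base model combined with the fine-tuned LoRA adapter.
# Note: PeftModel.from_pretrained injects the adapter layers into the model it receives,
# so a separate copy is loaded here to keep `base_model` free of LoRA weights.
adapter_path = f"./models/model_{MODEL_VERSION}/fine_tuned_lora_mlm"
tokenizer_ftm = AutoTokenizer.from_pretrained(adapter_path)
adapter_host_model = AutoModelForMaskedLM.from_pretrained(
    base_model_name,
    output_hidden_states=True
)
fine_tuned_model = PeftModel.from_pretrained(adapter_host_model, adapter_path)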

## 1.2.2. Defining the model encoders

### Encoders for Embedding Extraction
In this section, we implement two functions for extracting text embeddings:
1. **`encoder_fine_tuned`**: Uses the fine-tuned model (with LoRA) to generate embeddings. Inputs are processed in batches, which improves efficiency for large datasets.
2. **`encoder_base_model`**: Uses the base model (without fine-tuning) to generate embeddings. This method processes inputs individually.

#### Notes
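- **Hidden States**: Both functions extract the model's final hidden states and average them over the token dimension to obtain one vector per input. In the batched encoder, this average also covers padding positions, because the attention mask is not applied to the mean.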
- **Use of `torch.no_grad`**: Ensures that operations are performed without storing gradients, saving memory.
- **Input Size**: The maximum size for each input is limited to 128 tokens, truncating as necessary.

These functions can be used to compare representations generated by fine-tuned models and base models, providing insights into the improvements brought by fine-tuning.
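Because batched inputs are padded to a common length, a plain mean over the sequence axis mixes padding vectors into the embedding. The NumPy sketch below shows masked mean pooling as one possible alternative; it is not what the encoders in this notebook do, `masked_mean_pool` is a hypothetical helper, and the same idea would carry over to PyTorch tensors using the tokenizer's `attention_mask`.

```python
import numpy as np

def masked_mean_pool(hidden, attention_mask):
    """Average token vectors per sequence, ignoring positions where the mask is 0.

    hidden: array of shape (batch, seq_len, dim)
    attention_mask: array of shape (batch, seq_len), 1 for real tokens, 0 for padding
    """
    mask = attention_mask[:, :, None].astype(hidden.dtype)
    summed = (hidden * mask).sum(axis=1)
    counts = np.clip(mask.sum(axis=1), 1.0, None)  # avoid division by zero
    return summed / counts

# Toy batch: the second sequence has one real token and one padding position.
hidden = np.array([[[1.0, 3.0], [3.0, 5.0]],
                   [[2.0, 2.0], [9.0, 9.0]]])
mask = np.array([[1, 1],
                 [1, 0]])

pooled = masked_mean_pool(hidden, mask)  # padding ignored
plain = hidden.mean(axis=1)              # padding included
print(pooled)
print(plain)
```

For the second sequence the masked result is the single real token vector, while the plain mean is pulled toward the padding vector.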


In [None]:
# Set the model to evaluation mode
fine_tuned_model.eval()

# Function to encode text inputs using the fine-tuned model
# Arguments:
# - `inputs` (list of str): List of text inputs to encode.
# - `tokenizer` (Tokenizer): Tokenizer associated with the fine-tuned model.
# - `model` (Model): The fine-tuned model to use for encoding.
# - `batch_size` (int): Number of inputs to process in each batch.
# Returns:
# - `embeddings` (list of numpy arrays): List of embeddings for the input text.
def encoder_fine_tuned(inputs, tokenizer, model, batch_size=16):
    embeddings = []
    for i in range(0, len(inputs), batch_size):
        # Process inputs in batches
        batch_inputs = inputs[i:i + batch_size]
        tokens = tokenizer(
            batch_inputs, 
            return_tensors="pt", 
            truncation=True, 
            padding=True, 
            max_length=128
        )
        with torch.no_grad():
            outputs = model(**tokens)
            hidden_states = outputs.hidden_states  # Extract hidden states from the model
            batch_embeddings = hidden_states[-1].mean(dim=1).cpu().numpy()  # Average embeddings over sequence length
            embeddings.extend(batch_embeddings)
    return embeddings

In [None]:
base_model.eval()

# Base model encoder
# Arguments:
# - `inputs` (list[str]): List of text inputs to encode.
# - `tokenizer` (AutoTokenizer): Tokenizer corresponding to the base model.
# - `model` (AutoModelForMaskedLM): Base model to generate embeddings.
# Returns:
# - `embeddings` (list[np.array]): List of embeddings generated by the model.
def encoder_base_model(inputs, tokenizer, model):
    embeddings = []
    for text in inputs:  # `text` avoids shadowing the built-in `input`
        tokens = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
        with torch.no_grad():
            outputs = model(input_ids=tokens['input_ids'])
            hidden_states = outputs.hidden_states  # Extract hidden states
            embedding = hidden_states[-1].mean(dim=1).squeeze().cpu().numpy()  # Mean of last hidden state
            embeddings.append(embedding)
    return embeddings

## 1.2.3. Generate t-SNE plots

### Download CWE-to-CVE Mapping Dataset
This section downloads a dataset that maps CWE (Common Weakness Enumeration) codes to CVE (Common Vulnerabilities and Exposures) entries.

#### Dataset Details
- **Source URL**: [Google Drive Link](https://drive.google.com/file/d/1FLnrXFRYinbhttmcZFnqq15t7DAykfJD)
- **Local Path**: `./app/data/cwe2cve_dataset.csv`

#### Steps
1. Check if the directory exists; if not, create it.
2. Download the file using the `gdown` library.
3. Save the dataset locally for further processing.

This dataset is essential for linking vulnerabilities (CVE) to their corresponding weaknesses (CWE), facilitating downstream analysis.

In [None]:
# Define the URL and local file path for the dataset
# Arguments:
# - `cwe2cve_dataset_url_download` (str): URL to download the CWE-to-CVE mapping dataset.
# - `cwe2cve_dataset_path` (str): Local path to save the downloaded dataset.
cwe2cve_dataset_url_download = "https://drive.google.com/uc?id=1FLnrXFRYinbhttmcZFnqq15t7DAykfJD"
cwe2cve_dataset_path = "./app/data/cwe2cve_dataset.csv"

# Create the directory if it doesn't exist
os.makedirs(os.path.dirname(cwe2cve_dataset_path), exist_ok=True)

# Download the dataset from Google Drive using gdown
gdown.download(cwe2cve_dataset_url_download, cwe2cve_dataset_path, quiet=False)

### Filtering and Embedding Extraction
This section processes the CWE-to-CVE dataset to extract embeddings for both base and fine-tuned models.

#### Steps
1. **Load the dataset**:
   - The CWE-to-CVE mapping dataset is loaded from the previously downloaded CSV file.
2. **Filter the dataset**:
   - Only entries with CWE IDs present in the predefined `cwe_list` are retained.
3. **Extract embeddings**:
   - **Base Model**: Uses the base model to generate embeddings for the CVE descriptions.
   - **Fine-Tuned Model**: Uses the fine-tuned model (with LoRA) to generate embeddings for the same descriptions.

#### Outputs
- `embedding_base_model`: List of embeddings generated by the base model.
- `embedding_fine_tuned_model`: List of embeddings generated by the fine-tuned model.

These embeddings can be used for downstream tasks, such as clustering, classification, or comparison of model performance.

In [None]:
# Load the CWE-to-CVE dataset
# Arguments:
# - `cwe2cve_dataset_path` (str): Path to the locally downloaded CWE-to-CVE mapping dataset.
# Returns:
# - `cwe2cve_dataset` (DataFrame): Loaded dataset for further filtering and processing.
cwe2cve_dataset = pd.read_csv(cwe2cve_dataset_path)

# Filter the dataset to include only rows with CWE IDs from the predefined list
# Arguments:
# - `cwe_list` (list): List of CWE IDs to filter the dataset.
# Returns:
# - Filtered DataFrame containing only relevant rows.
cwe2cve_dataset = cwe2cve_dataset[cwe2cve_dataset['cwe_id'].isin(cwe_list)]

# Extract embeddings using the base model
# Arguments:
# - `inputs` (list[str]): List of CVE descriptions from the filtered dataset.
# - `tokenizer_bm` (AutoTokenizer): Tokenizer for the base model.
# - `base_model` (AutoModelForMaskedLM): Base model instance.
# Returns:
# - `embedding_base_model` (list[np.array]): List of embeddings generated by the base model.
embedding_base_model = encoder_base_model(
    inputs=cwe2cve_dataset['cve_description'].to_list(), 
    tokenizer=tokenizer_bm, 
    model=base_model
)

# Extract embeddings using the fine-tuned model
# Arguments:
# - `inputs` (list[str]): List of CVE descriptions from the filtered dataset.
# - `tokenizer_ftm` (AutoTokenizer): Tokenizer for the fine-tuned model.
# - `fine_tuned_model` (PeftModel): Fine-tuned model instance.
# Returns:
# - `embedding_fine_tuned_model` (list[np.array]): List of embeddings generated by the fine-tuned model.
embedding_fine_tuned_model = encoder_fine_tuned(
    inputs=cwe2cve_dataset['cve_description'].to_list(), 
    tokenizer=tokenizer_ftm, 
    model=fine_tuned_model
)

### Visualizing Embeddings with t-SNE
This section visualizes the embeddings generated by the base and fine-tuned models using t-SNE, a dimensionality reduction technique.

#### Steps
1. **Label Encoding**:
   - CWE IDs are converted to numeric values for visualization purposes using `LabelEncoder`.
2. **t-SNE Dimensionality Reduction**:
   - The embeddings are reduced to two dimensions for both the base and fine-tuned models.
   - Parameters:
     - `perplexity=30`: Controls the balance between local and global aspects of the data.
     - `n_iter=1000`: Number of optimization iterations (recent scikit-learn releases rename this parameter to `max_iter`).
3. **Scatter Plot**:
   - Each point represents an embedding projected onto two dimensions.
   - Colors correspond to different CWE IDs, aiding in cluster identification.

#### Outputs
- **Left Plot**: t-SNE visualization for the base model embeddings.
- **Right Plot**: t-SNE visualization for the fine-tuned model embeddings.

#### Observations
- Clusters indicate similarities in the embeddings for CWE IDs.
- Improved clustering in the fine-tuned model plot may suggest better representation learning.
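The label encoding step assigns integers to the distinct CWE IDs in sorted order, which is how scikit-learn's `LabelEncoder` orders its classes. The plain-Python sketch below mirrors that behaviour for illustration; `encode_labels` is a hypothetical helper. Since the IDs are strings, the order is lexicographic rather than numeric, so the colorbar values do not follow the numeric order of the CWE IDs.

```python
def encode_labels(labels):
    """Map each distinct label to an integer index in sorted order."""
    classes = sorted(set(labels))
    index = {label: i for i, label in enumerate(classes)}
    return [index[label] for label in labels], classes

codes, classes = encode_labels(["CWE-79", "CWE-20", "CWE-119", "CWE-79"])
print(classes)  # ['CWE-119', 'CWE-20', 'CWE-79']
print(codes)    # [2, 1, 0, 2]
```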

In [19]:
# Extract CWE labels from the dataset
# Arguments:
# - `cwe2cve_dataset` (DataFrame): The filtered CWE-to-CVE dataset.
# Returns:
# - `labels` (np.array): Array of CWE IDs.
labels = np.array(cwe2cve_dataset["cwe_id"].tolist())

# Convert CWE labels to numeric form using LabelEncoder
# Arguments:
# - `labels` (list[str]): List of CWE IDs.
# Returns:
# - `numeric_labels` (np.array): Numeric representation of CWE IDs.
label_encoder = LabelEncoder()
numeric_labels = label_encoder.fit_transform(labels)

# Apply t-SNE to reduce the dimensionality of the embeddings from the base model
# Arguments:
# - `embedding_base_model` (list[np.array]): List of embeddings from the base model.
# Returns:
# - `embeddings_tsne_base` (np.array): 2D t-SNE projections of base model embeddings.
tsne_base = TSNE(n_components=2, random_state=42, perplexity=30, n_iter=1000)
embeddings_tsne_base = tsne_base.fit_transform(np.array(embedding_base_model))

# Apply t-SNE to reduce the dimensionality of the embeddings from the fine-tuned model
# Arguments:
# - `embedding_fine_tuned_model` (list[np.array]): List of embeddings from the fine-tuned model.
# Returns:
# - `embeddings_tsne_fine_tuned` (np.array): 2D t-SNE projections of fine-tuned model embeddings.
tsne_fine_tuned = TSNE(n_components=2, random_state=42, perplexity=30, n_iter=1000)
embeddings_tsne_fine_tuned = tsne_fine_tuned.fit_transform(np.array(embedding_fine_tuned_model))

# Plotting the t-SNE visualizations
fig, axes = plt.subplots(1, 2, figsize=(20, 7))

# t-SNE visualization for the base model
scatter_base = axes[0].scatter(
    embeddings_tsne_base[:, 0], embeddings_tsne_base[:, 1],
    c=numeric_labels, cmap='viridis', alpha=0.7, s=50
)
axes[0].set_title('t-SNE - Base Model')
axes[0].set_xlabel('1st Dimension')
axes[0].set_ylabel('2nd Dimension')
cbar_base = fig.colorbar(scatter_base, ax=axes[0])
cbar_base.set_label('CWE ID')

# t-SNE visualization for the fine-tuned model
scatter_fine_tuned = axes[1].scatter(
    embeddings_tsne_fine_tuned[:, 0], embeddings_tsne_fine_tuned[:, 1],
    c=numeric_labels, cmap='viridis', alpha=0.7, s=50
)
axes[1].set_title('t-SNE - Fine-Tuned Model')
axes[1].set_xlabel('1st Dimension')
axes[1].set_ylabel('2nd Dimension')
cbar_fine_tuned = fig.colorbar(scatter_fine_tuned, ax=axes[1])
cbar_fine_tuned.set_label('CWE ID')

plt.tight_layout()
plt.show()


## 1.2.4. Generate the vector database using Pinecone

### Pinecone Initialization
This section initializes the Pinecone client for interacting with Pinecone's vector database.

#### Steps
1. **Load Environment Variables**:
   - The `load_dotenv` function reads environment variables from a `.env` file.
   - Ensure the `.env` file contains a valid `PINECONE_API_KEY`.
2. **Retrieve API Key**:
   - Use `os.getenv("PINECONE_API_KEY")` to securely fetch the API key.
3. **Initialize Pinecone**:
   - The `Pinecone` client is created using the retrieved API key for future operations.

#### Requirements
- A `.env` file with the `PINECONE_API_KEY` value.

#### Outputs
- `pc`: A Pinecone client instance, ready for use with operations like creating or querying vector indices.
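`os.getenv` returns `None` when a variable is missing, which would only surface later as an authentication error. A small guard such as the one sketched below can fail early with a clearer message; `require_env` and the variable name `EXAMPLE_API_KEY` are hypothetical and used here so that no real secret is involved.

```python
import os

def require_env(name):
    """Return the value of an environment variable or raise a clear error."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"{name} is not set; add it to your .env file or environment")
    return value

# Demonstration with a dummy variable.
os.environ["EXAMPLE_API_KEY"] = "dummy-value"
api_key = require_env("EXAMPLE_API_KEY")
print(api_key)
```

In this notebook the call would be `require_env("PINECONE_API_KEY")`, placed after `load_dotenv()`.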

In [20]:
# Load environment variables from a .env file
# This step reads the environment variables defined in the .env file into the script.
# Arguments:
# - `.env` file: Should contain `PINECONE_API_KEY` with your Pinecone API key.
# Returns:
# - None, but sets the environment variables accessible via `os.getenv`.
load_dotenv()

# Retrieve Pinecone API key from environment variables
# Arguments:
# - `PINECONE_API_KEY` (str): API key for authenticating with Pinecone.
# Returns:
# - `pc_api_key` (str): The retrieved API key.
pc_api_key = os.getenv("PINECONE_API_KEY")

# Initialize Pinecone client
# Arguments:
# - `api_key` (str): Pinecone API key required for authentication.
# Returns:
# - `pc` (Pinecone): Pinecone client instance to interact with the Pinecone service.
pc = Pinecone(api_key=pc_api_key)

### Generate Embedding for CVE Description
This section demonstrates how to generate an embedding for a specific CVE description using the fine-tuned model.

#### Steps
1. **Define the CVE description**:
   - A sample description containing references to vulnerabilities and CWE IDs is provided.
2. **Generate the embedding**:
   - The `encoder_fine_tuned` function processes the description and returns the corresponding embedding vector.
3. **Print the embedding dimensionality**:
   - The length of the embedding vector indicates the feature space size of the model.

#### Outputs
- **Embedding Dimensionality**: Represents the number of features in the embedding vector.

In [21]:
# Generate embedding for a CVE description using the fine-tuned model

# Define a sample CVE description
# This description contains details about vulnerabilities and CWE references.
cve_description = (
    "Large language model (LLM) management tool does not validate the format of a digest value (CWE-1287) "
    "from a private, untrusted model registry, enabling relative path traversal (CWE-23), a.k.a. Probllama"
)

# Generate embedding from the CVE description
# Arguments:
# - `cve_description` (list[str]): A single-item list containing the CVE description.
# - `tokenizer_ftm` (AutoTokenizer): Tokenizer for the fine-tuned model.
# - `fine_tuned_model` (PeftModel): Fine-tuned model to generate the embedding.
# Returns:
# - `embedding` (list[np.array]): List containing the embedding vector for the CVE description.
embedding = encoder_fine_tuned([cve_description], tokenizer_ftm, fine_tuned_model)

# Print the length of the generated embedding
# This indicates the dimensionality of the embedding vector.
print(f"Embedding dimensionality: {len(embedding[0])}")


Embedding dimensionality: 768


### Pinecone Index Initialization
This section demonstrates how to initialize a vector database index in Pinecone for storing and querying embeddings.

#### Steps
1. **Check Existing Indexes**:
   - Lists all existing indexes to check whether the desired index already exists.
2. **Define Index Specification**:
   - Specifies the cloud provider and region for hosting the index.
3. **Determine Embedding Dimensionality**:
   - Uses the length of the embedding vector to set the index dimension.
4. **Create Index**:
   - Creates a new index with the specified name, dimension, and similarity metric if it does not already exist.
5. **Wait for Index Readiness**:
   - Polls the index status until it is ready for use.
6. **Describe Index Statistics**:
   - Provides details about the index, such as its size and configuration.

#### Outputs
- **Index Name**: `fine-tuned-vectorial-database`
- **Similarity Metric**: `cosine`
- **Cloud**: AWS
- **Region**: us-east-1

The index is now ready for vector insertion and search operations.
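
Step 5 polls until the index reports ready. A polling loop without a deadline can hang if the index never becomes ready, so a bounded variant may be useful. The sketch below assumes a hypothetical helper `wait_until`; the `is_ready` callable stands in for a check such as `pc.describe_index(index_name).status["ready"]`.

```python
import time
from typing import Callable


def wait_until(is_ready: Callable[[], bool], timeout_s: float = 60.0, interval_s: float = 1.0) -> bool:
    """Poll `is_ready` until it returns True or `timeout_s` seconds elapse.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if is_ready():
            return True
        time.sleep(interval_s)
    return False


# Demo with a fake readiness check that succeeds on the third call.
calls = {"n": 0}


def fake_ready() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


became_ready = wait_until(fake_ready, timeout_s=2.0, interval_s=0.01)
timed_out = wait_until(lambda: False, timeout_s=0.05, interval_s=0.01)
```

Returning a boolean instead of raising leaves the decision to the caller, who can retry, abort, or log the timeout.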

In [22]:
# Define the name of the Pinecone index
# Arguments:
# - `index_name` (str): Name of the index to be created or accessed.
index_name = "fine-tuned-vectorial-database"

# List existing indexes on the Pinecone service
# Arguments:
# - None
# Returns:
# - `existing_indexes` (list[str]): List of names of existing indexes.
existing_indexes = [
    index_info["name"] for index_info in pc.list_indexes()
]

# Define the Pinecone serverless specification
# Arguments:
# - `cloud` (str): Cloud provider to host the index (e.g., AWS).
# - `region` (str): Region where the index will be hosted.
# Returns:
# - `spec` (ServerlessSpec): Pinecone serverless specification object.
spec = ServerlessSpec(
    cloud="aws", 
    region="us-east-1"
)

# Determine the dimensionality of the embeddings
# Arguments:
# - `embedding` (list[np.array]): List containing the embedding vector.
# Returns:
# - `dimension` (int): The length of the embedding vector.
dimension = len(embedding[0])

# Create the index if it does not already exist
if index_name not in existing_indexes:
    # Create a new index
    # Arguments:
    # - `index_name` (str): Name of the new index.
    # - `dimension` (int): Dimensionality of the vectors.
    # - `metric` (str): Similarity metric for vector search (e.g., cosine).
    # - `spec` (ServerlessSpec): Serverless configuration for the index.
    pc.create_index(
        index_name,
        dimension=dimension,
        metric="cosine",
        spec=spec
    )

    # Wait for the index to become ready
    while not pc.describe_index(index_name).status["ready"]:
        time.sleep(1)

# Access the index
# Arguments:
# - `index_name` (str): Name of the index to access.
# Returns:
# - `index` (Index): Pinecone index instance.
index = pc.Index(index_name)

# Allow some time for initialization
time.sleep(1)

# Describe the index statistics
# Arguments:
# - None
# Returns:
# - Index statistics, including size and configuration.
stats = index.describe_index_stats()
print(stats)

{'dimension': 768,
 'index_fullness': 0.0,
 'namespaces': {'': {'vector_count': 25}},
 'total_vector_count': 25}


### CWE Data Preparation
This section processes a dataset to prepare unique CWE details with metadata for further analysis.

#### Steps
1. **Load Dataset**:
   - Load the CWE-to-CVE dataset into a DataFrame.
2. **Drop Duplicates**:
   - Retain only unique rows based on `cwe_id` and `cwe_description`.
3. **Filter by CWE List**:
   - Filter the dataset to include only CWEs in the predefined list (`cwe_list`).
4. **Add Unique IDs**:
   - Assign a unique ID to each row starting from 1.
5. **Normalize Descriptions**:
   - Remove accents and special characters from `cwe_description` using the `unidecode` library.
6. **Generate Metadata**:
   - Create a `metadata` column containing dictionaries with CWE ID and description.

#### Outputs
- **Filtered and Processed DataFrame**:
  - Columns:
    - `cwe_id`: CWE ID (e.g., CWE-79).
    - `cwe_description`: Description of the CWE without accents or special characters.
    - `id`: Unique string ID for each row.
    - `metadata`: Dictionary containing `id` and `description`.


In [23]:
# Load the CWE-to-CVE dataset
# Arguments:
# - `cwe2cve_dataset_path` (str): Path to the dataset file.
# Returns:
# - `cwe2cve_dataset` (DataFrame): Loaded dataset.
cwe2cve_dataset = pd.read_csv(cwe2cve_dataset_path)

# Select unique CWE IDs and descriptions
# Arguments:
# - `cwe2cve_dataset` (DataFrame): Original dataset.
# Returns:
# - `data` (DataFrame): Dataset with unique CWE IDs and descriptions.
data = cwe2cve_dataset[['cwe_id', 'cwe_description']].drop_duplicates()

# Filter the dataset to include only CWEs from the predefined list
# Arguments:
# - `cwe_list` (list): List of CWE IDs to retain.
# Returns:
# - Filtered `data` DataFrame.
cwe_list = ["CWE-79", "CWE-787", "CWE-89", "CWE-352", "CWE-22", "CWE-125", "CWE-78", "CWE-416", "CWE-862", "CWE-434",
            "CWE-94", "CWE-20", "CWE-77", "CWE-287", "CWE-269", "CWE-502", "CWE-200", "CWE-863", "CWE-918", "CWE-119",
            "CWE-476", "CWE-798", "CWE-190", "CWE-400", "CWE-306"]
data = data[data['cwe_id'].isin(cwe_list)].reset_index(drop=True)

# Assign unique IDs to each row
# Arguments:
# - `data` (DataFrame): Filtered dataset.
# Returns:
# - Updated `data` DataFrame with an 'id' column.
data['id'] = range(1, len(data) + 1)
data['id'] = data['id'].astype(str)

# Remove accents and special characters from descriptions
# Arguments:
# - `data['cwe_description']` (Series): CWE descriptions.
# Returns:
# - Updated descriptions without accents or special characters.
data['cwe_description'] = data['cwe_description'].apply(unidecode.unidecode)

# Create metadata column
# Arguments:
# - `data` (DataFrame): Dataset with CWE details.
# Returns:
# - Updated `data` with a 'metadata' column containing dictionaries.
data['metadata'] = data.apply(lambda x: {
    "id": x["cwe_id"],
    "description": x["cwe_description"]
}, axis=1)

# Print the final dataset
print(data)

     cwe_id                                    cwe_description  id  \
0    CWE-20                          Improper Input Validation   1   
1    CWE-22  Improper Limitation of a Pathname to a Restric...   2   
2    CWE-77  Improper Neutralization of Special Elements us...   3   
3    CWE-78  Improper Neutralization of Special Elements us...   4   
4    CWE-79  Improper Neutralization of Input During Web Pa...   5   
5    CWE-89  Improper Neutralization of Special Elements us...   6   
6    CWE-94  Improper Control of Generation of Code ('Code ...   7   
7   CWE-119  Improper Restriction of Operations within the ...   8   
8   CWE-125                                 Out-of-bounds Read   9   
9   CWE-190                     Integer Overflow or Wraparound  10   
10  CWE-200  Exposure of Sensitive Information to an Unauth...  11   
11  CWE-269                      Improper Privilege Management  12   
12  CWE-287                            Improper Authentication  13   
13  CWE-306       Mi

### Batch Processing and Pinecone Index Upsertion
This section processes data in batches and upserts embeddings into the Pinecone index.

#### Steps
1. **Preprocess Queries**:
   - Cleans the description strings by converting to lowercase and removing non-alphanumeric characters.
2. **Batch Embedding Generation**:
   - Processes descriptions in batches using the fine-tuned model to generate embeddings.
3. **Upsert Vectors**:
   - Prepares the batch data, including IDs, embeddings, and metadata, for insertion into the Pinecone index.

#### Parameters
- **Batch Size**: `1` (number of items processed per batch).
- **Index**: Pinecone index initialized earlier.

#### Outputs
- **Index Updates**:
  - Vectors (ID, embedding, metadata) are upserted into the Pinecone index.

#### Notes
- **Assertions**:
  - Ensures the number of embeddings matches the batch size to avoid inconsistencies.
- **Scalability**:
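  - Batching limits how much text is embedded and sent per request; with `batch_size = 1` each request carries one vector, and a larger value reduces the number of upsert calls.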
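
The slicing logic generalizes to batch sizes larger than one. The sketch below, using a hypothetical generator `batch_ranges`, shows how the `(start, end)` bounds can be derived so that the last batch may be shorter than the rest.

```python
from typing import Iterator, Tuple


def batch_ranges(total: int, batch_size: int) -> Iterator[Tuple[int, int]]:
    """Yield (start, end) index pairs that cover range(total) in chunks of batch_size."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, total, batch_size):
        yield start, min(total, start + batch_size)


# 25 items in batches of 10: two full batches and one short batch.
ranges = list(batch_ranges(25, 10))
print(ranges)  # [(0, 10), (10, 20), (20, 25)]
```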

In [24]:
# Define batch size and data size for processing
batch_size = 1
data_size = len(data)

print(data_size)

# Preprocess a query string for vectorization
# Arguments:
# - `query` (str): The input query string.
# Returns:
# - `query` (str): Cleaned and preprocessed query string.
def preprocess_query(query: str) -> str:
    query = query.lower().strip()  # Convert to lowercase and remove leading/trailing spaces
    query = ''.join(char for char in query if char.isalnum() or char.isspace())  # Keep alphanumeric and spaces
    return query

# Process data in batches and upsert into Pinecone index
# Arguments:
# - `data` (DataFrame): Rows to index; each row has an `id` and a `metadata` dict.
# - `batch_size` (int): Number of items to process in each batch.
# - `data_size` (int): Total size of the data to process.
# - `index` (Index): Pinecone index instance for upserting vectors.
for i in tqdm(range(0, data_size, batch_size)):
    # Define batch range
    i_end = min(data_size, i + batch_size)
    batch = data[i:i_end]
    
    # Preprocess descriptions in the batch
    # Arguments:
    # - `x["description"]`: Text description from the metadata.
    # Returns:
    # - `chunks` (list[str]): Preprocessed descriptions.
    chunks = [preprocess_query(f'{x["description"]}') for x in batch["metadata"]]
    print(chunks)  # Optional: Print preprocessed chunks for debugging
    
    # Generate embeddings for the batch by passing the fine-tuned tokenizer and model to `encoder_base_model`
    # Arguments:
    # - `chunks` (list[str]): Preprocessed descriptions.
    # - `tokenizer_ftm` (AutoTokenizer): Tokenizer for the fine-tuned model.
    # - `fine_tuned_model` (PeftModel): Fine-tuned model instance.
    # Returns:
    # - `embeds` (list[np.array]): List of embeddings for the batch.
    embeds = encoder_base_model(chunks, tokenizer_ftm, fine_tuned_model)
    
    # Ensure the number of embeddings matches the batch size
    assert len(embeds) == (i_end - i), "Mismatch between embeddings and batch size"
    
    # Prepare data for upsertion into Pinecone index
    # Arguments:
    # - `batch["id"]`: List of IDs for the batch.
    # - `embeds` (list[np.array]): List of embeddings for the batch.
    # - `batch["metadata"]`: List of metadata corresponding to the batch.
    # Returns:
    # - `to_upsert` (list[tuple]): List of tuples containing ID, embedding, and metadata.
    to_upsert = list(zip(batch["id"], embeds, batch["metadata"]))
    
    # Upsert the batch into the Pinecone index
    # Arguments:
    # - `vectors` (list[tuple]): List of vectors to upsert into the index.
    index.upsert(vectors=to_upsert)

25


  0%|          | 0/25 [00:00<?, ?it/s]

['improper input validation']


  4%|▍         | 1/25 [00:00<00:11,  2.08it/s]

['improper limitation of a pathname to a restricted directory path traversal']


  8%|▊         | 2/25 [00:00<00:08,  2.66it/s]

['improper neutralization of special elements used in a command command injection']


 12%|█▏        | 3/25 [00:01<00:07,  2.84it/s]

['improper neutralization of special elements used in an os command os command injection']


 16%|█▌        | 4/25 [00:01<00:06,  3.08it/s]

['improper neutralization of input during web page generation crosssite scripting']


 20%|██        | 5/25 [00:01<00:06,  3.13it/s]

['improper neutralization of special elements used in an sql command sql injection']


 24%|██▍       | 6/25 [00:01<00:05,  3.28it/s]

['improper control of generation of code code injection']


 28%|██▊       | 7/25 [00:02<00:05,  3.37it/s]

['improper restriction of operations within the bounds of a memory buffer']


 32%|███▏      | 8/25 [00:02<00:04,  3.43it/s]

['outofbounds read']


 36%|███▌      | 9/25 [00:02<00:04,  3.44it/s]

['integer overflow or wraparound']


 40%|████      | 10/25 [00:03<00:04,  3.36it/s]

['exposure of sensitive information to an unauthorized actor']


 44%|████▍     | 11/25 [00:03<00:04,  3.46it/s]

['improper privilege management']


 48%|████▊     | 12/25 [00:03<00:03,  3.42it/s]

['improper authentication']


 52%|█████▏    | 13/25 [00:04<00:05,  2.30it/s]

['missing authentication for critical function']


 56%|█████▌    | 14/25 [00:04<00:04,  2.61it/s]

['crosssite request forgery csrf']


 60%|██████    | 15/25 [00:05<00:03,  2.73it/s]

['uncontrolled resource consumption']


 64%|██████▍   | 16/25 [00:05<00:03,  2.95it/s]

['use after free']


 68%|██████▊   | 17/25 [00:05<00:02,  3.06it/s]

['unrestricted upload of file with dangerous type']


 72%|███████▏  | 18/25 [00:05<00:02,  3.17it/s]

['null pointer dereference']


 76%|███████▌  | 19/25 [00:06<00:01,  3.35it/s]

['deserialization of untrusted data']


 80%|████████  | 20/25 [00:06<00:01,  3.42it/s]

['outofbounds write']


 84%|████████▍ | 21/25 [00:06<00:01,  3.50it/s]

['use of hardcoded credentials']


 88%|████████▊ | 22/25 [00:07<00:00,  3.41it/s]

['missing authorization']


 92%|█████████▏| 23/25 [00:07<00:00,  3.51it/s]

['incorrect authorization']


 96%|█████████▌| 24/25 [00:07<00:00,  3.60it/s]

['serverside request forgery ssrf']


100%|██████████| 25/25 [00:07<00:00,  3.19it/s]
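
The printed chunks show a side effect of `preprocess_query`: because punctuation is dropped rather than replaced, hyphenated terms are fused (`Cross-Site` becomes `crosssite`, `Out-of-bounds` becomes `outofbounds`). Whether that helps or hurts retrieval depends on the tokenizer and was not measured here. A hypothetical variant, `preprocess_query_spaced`, that replaces punctuation with a space instead might look like this:

```python
import re


def preprocess_query_spaced(query: str) -> str:
    """Lowercase, replace each run of non-alphanumeric characters with one space, and trim."""
    query = query.lower()
    query = re.sub(r"[^a-z0-9]+", " ", query)  # ASCII-only; assumes accents were already removed
    return query.strip()


spaced = preprocess_query_spaced("Cross-Site Request Forgery (CSRF)")
print(spaced)  # cross site request forgery csrf
```

If such a variant were adopted, the same function would have to be applied both when indexing and when querying, so that stored vectors and query vectors come from identically normalized text.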


## 1.2.5. Example of a retrieval from the database

### Retrieve Documents from Pinecone Index
This function retrieves documents from the Pinecone index that are most similar to a given query.

#### Steps
1. **Preprocess Query**:
   - The input query string is preprocessed (lowercased and cleaned) using the `preprocess_query` function.
2. **Generate Query Embedding**:
   - The preprocessed query is encoded into a vector by calling `encoder_base_model` with the base tokenizer (`tokenizer_bm`) and the fine-tuned model (`fine_tuned_model`).
3. **Query Pinecone Index**:
   - The query vector is used to search the Pinecone index for the top-k most similar vectors.
4. **Extract Metadata**:
   - Metadata from the matching documents is extracted and returned.

#### Parameters
- **`query`**: The text query for which similar documents are to be retrieved.
- **`top_k`**: Number of top results to return.

#### Outputs
- **`docs`**: A list of metadata corresponding to the top-k matching documents.
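
Conceptually, a cosine-metric index ranks stored vectors by their cosine similarity to the query vector and returns the `top_k` best. The brute-force sketch below illustrates that ranking on toy 2-D vectors. It is not how Pinecone is implemented, and the names `cosine` and `top_k_matches` as well as the toy vectors are assumptions made for illustration.

```python
import heapq
import math
from typing import Dict, List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two equal-length vectors (0.0 if either has zero norm)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k_matches(query: Sequence[float], vectors: Dict[str, Sequence[float]], k: int) -> List[Tuple[str, float]]:
    """Return the k (id, score) pairs with the highest cosine similarity to `query`."""
    scored = ((vec_id, cosine(query, vec)) for vec_id, vec in vectors.items())
    return heapq.nlargest(k, scored, key=lambda pair: pair[1])


# Toy 2-D "embeddings" keyed by CWE ID (values are made up).
toy_index = {
    "CWE-79": [1.0, 0.0],
    "CWE-89": [0.8, 0.6],
    "CWE-416": [0.0, 1.0],
}
matches = top_k_matches([1.0, 0.1], toy_index, k=2)
print([vec_id for vec_id, _ in matches])  # ['CWE-79', 'CWE-89']
```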

In [25]:
# Retrieve documents from Pinecone index based on a query

def get_docs(query: str, top_k: int) -> list[dict]:
    """
    Retrieve the top-k documents from the Pinecone index that are most similar to the query.

    Arguments:
    - `query` (str): The input query string to search for.
    - `top_k` (int): The number of top documents to retrieve.

    Returns:
    - `docs` (list[dict]): Metadata dictionaries (`id`, `description`) of the top-k matching documents.
    """
    # Preprocess and encode the query into an embedding
    xq = encoder_base_model([preprocess_query(query)], tokenizer_bm, fine_tuned_model)
    xq = xq[0].tolist()  # Convert the embedding to a list for querying Pinecone
    
    # Query the Pinecone index using the embedding
    # Arguments:
    # - `vector` (list[float]): Query vector generated from the input query.
    # - `top_k` (int): Number of top matches to retrieve.
    # - `include_metadata` (bool): Include metadata in the query results.
    # Returns:
    # - `res` (dict): Query results containing matches and their metadata.
    res = index.query(vector=xq, top_k=top_k, include_metadata=True)
    
    # Extract metadata from the matching results
    docs = [x["metadata"] for x in res["matches"]]
    return docs

### Querying Pinecone Index and Displaying Results
This section demonstrates how to query the Pinecone index for the most similar documents and format the results for display.

#### Steps
1. **Define the Query**:
   - A CVE description is used as the input query.
2. **Retrieve Documents**:
   - The `get_docs` function retrieves the top-k matching documents based on semantic similarity to the query.
3. **Format Results**:
   - Each document's ID and description are extracted and formatted for readability.
4. **Display Results**:
   - The results are joined with a "---" separator for clear distinction between documents and printed to the console.

#### Outputs
- **Formatted Results**:
   - Example output format:
     ```
     Id: CWE-526
     Description: A vulnerability in the product allows...
     ---
     Id: CWE-624
     Description: An issue in the authentication module leads...
     ```

#### Notes
- Ensure that the `docs` list contains dictionaries with `id` and `description` keys. 
- If the metadata structure differs, adjust the key names accordingly in the code.
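
One way to make the formatting step tolerant of missing keys is `dict.get` with a placeholder. A small sketch, assuming a hypothetical helper `format_docs` and the placeholder string `"<missing>"`:

```python
from typing import Dict, List


def format_docs(docs: List[Dict[str, str]], sep: str = "\n---\n") -> str:
    """Format metadata dicts as 'Id: ...' / 'Description: ...' blocks joined by `sep`."""
    blocks = [
        f"Id: {doc.get('id', '<missing>')}\nDescription: {doc.get('description', '<missing>')}"
        for doc in docs
    ]
    return sep.join(blocks)


sample_docs = [
    {"id": "CWE-798", "description": "Use of Hard-coded Credentials"},
    {"id": "CWE-306"},  # description deliberately omitted
]
formatted = format_docs(sample_docs)
print(formatted)
```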


In [27]:
# Example CVE description to query the Pinecone index
cve = "Admin password in cleartext in a cookie"
query = cve

# Retrieve the top-k matching documents from the index
docs = get_docs(query, top_k=5)

# Format the ID and description of each retrieved document
# Arguments:
# - `docs` (list[dict]): List of metadata dictionaries from the matching documents.
# Returns:
# - `docs_text` (list[str]): List of formatted strings containing ID and description.
docs_text = [
    f"Id: {doc['id']}\nDescription: {doc['description']}" for doc in docs
]

# Join the results with "---" separator for better readability
# Arguments:
# - `docs_text` (list[str]): List of formatted document strings.
# Returns:
# - `top_k_responses` (str): Formatted string with "---" separating each document.
top_k_responses = "\n---\n".join(docs_text)

# Print the results
print(top_k_responses)

Id: CWE-918
Description: Server-Side Request Forgery (SSRF)
---
Id: CWE-306
Description: Missing Authentication for Critical Function
---
Id: CWE-798
Description: Use of Hard-coded Credentials
---
Id: CWE-352
Description: Cross-Site Request Forgery (CSRF)
---
Id: CWE-434
Description: Unrestricted Upload of File with Dangerous Type


# 1.3. Downstream task

## 1.3.1. Generate question/response pairs for in-context learning

### Supervised Dataset Preparation and In-Context Examples
This section splits the dataset into training and test sets and formats the training rows as in-context examples for prompting.

#### Steps
1. **Directory Creation**:
   - Creates the directory `./datasets/supervised` to store the supervised dataset.
2. **Data Splitting**:
   - Splits the dataset into training and testing sets with an 80-20 split if the files do not already exist.
   - Saves the splits as `train.csv` and `test.csv`.
3. **In-Context Example Creation**:
   - Formats each row of the training dataset as a question-answer pair.
   - Example:
     ```
     CVE:<description of the vulnerability>?
     <CWE-ID>:<name of the weakness>
     ```
4. **Joining Examples**:
   - Joins the examples with `---` as a separator so that individual examples stay distinguishable inside the prompt.

#### Outputs
- **Training and Testing Splits**:
  - `train.csv` and `test.csv` saved in `./datasets/supervised`.
- **In-Context Examples**:
  - Printed question-answer pairs, ready to be placed in a prompt.
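
Joining every training row yields a very long string, and most models accept only a limited context. A sketch of building a bounded few-shot block, assuming a hypothetical helper `build_few_shot_block` that takes plain `(cve_description, cwe_id, cwe_description)` tuples and skips rows with an empty CVE description:

```python
from typing import Iterable, Optional, Tuple

Row = Tuple[Optional[str], str, str]  # (cve_description, cwe_id, cwe_description)


def build_few_shot_block(rows: Iterable[Row], max_examples: int = 3) -> str:
    """Format up to `max_examples` usable rows as question/answer pairs joined by '---'."""
    examples = []
    for cve_description, cwe_id, cwe_description in rows:
        if len(examples) >= max_examples:
            break
        if not cve_description or not cve_description.strip():
            continue  # skip rows without a usable CVE description
        examples.append(f"CVE:{cve_description.strip()}?\n{cwe_id}:{cwe_description}")
    return "\n---\n".join(examples)


toy_rows = [
    ("database file under web root.", "CWE-219", "Storage of File with Sensitive Data Under Web Root"),
    (None, "CWE-356", "Product UI does not Warn User of Unsafe Actions"),
    ("Bypass GUI and access restricted dialog box.", "CWE-422", "Unprotected Windows Messaging Channel ('Shatter')"),
]
few_shot = build_few_shot_block(toy_rows, max_examples=2)
print(few_shot)
```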

In [28]:
# Create a directory for supervised datasets if it does not exist
os.makedirs("./datasets/supervised", exist_ok=True)

# Check if the supervised dataset files already exist
if not any(os.scandir('./datasets/supervised/')):

    # Split the data into training and testing sets
    # Arguments:
    # - `cwe2cve_dataset` (DataFrame): Dataset containing CVE and CWE mappings.
    # - `test_size` (float): Proportion of data for the test set.
    # - `random_state` (int): Seed for reproducibility.
    # Returns:
    # - `train_data` (DataFrame): Training portion of the dataset.
    # - `test_data` (DataFrame): Testing portion of the dataset.
    train_data, test_data = train_test_split(
        cwe2cve_dataset, 
        test_size=0.2, 
        random_state=42
    )

    # Reset the index of the dataframes
    train_data = train_data.reset_index(drop=True)
    test_data = test_data.reset_index(drop=True)

    # Print the number of samples in each split
    print(f"Training samples: {len(train_data)}, Testing samples: {len(test_data)}")

    # Save the splits to CSV files
    train_data.to_csv('./datasets/supervised/train.csv', index=False)
    test_data.to_csv('./datasets/supervised/test.csv', index=False)

else:
    # Load the existing training and testing data from CSV files
    train_data = pd.read_csv('./datasets/supervised/train.csv')
    test_data = pd.read_csv('./datasets/supervised/test.csv')

# Create examples for in-context supervised learning
# Arguments:
# - `train_data` (DataFrame): Training dataset.
# Returns:
# - `train_examples` (list[str]): List of formatted question-answer examples.
train_examples = []
for _, row in train_data.iterrows():
    question = f"CVE:{row['cve_description']}?"
    answer = f"{row['cwe_id']}:{row['cwe_description']}"
    train_examples.append(f"{question}\n{answer}")

# Join the examples with "---" as a separator for in-context use
# Arguments:
# - `train_examples` (list[str]): List of question-answer examples.
# Returns:
# - `in_context_examples` (str): In-context formatted string of examples.
in_context_examples = "\n---\n".join(train_examples)

# Print the in-context examples
print(in_context_examples)

CVE:database file under web root.?
CWE-219:Storage of File with Sensitive Data Under Web Root
---
CVE:Spoofed entries in web server log file via carriage returns?
CWE-93:Improper Neutralization of CRLF Sequences ('CRLF Injection')
---
CWE-356:Product UI does not Warn User of Unsafe Actions
---
CVE:MIE. MFV too? bypass AV/security with fields that should not be quoted, duplicate quotes, missing leading/trailing quotes.?
CWE-149:Improper Neutralization of Quoting Syntax
---
CVE:Bypass GUI and access restricted dialog box.?
CWE-422:Unprotected Windows Messaging Channel ('Shatter')
---
CVE:malformed inputs cause accesses of uninitialized or previously-deleted objects, leading to memory corruption?
CWE-787:Out-of-bounds Write
---
CVE:Product does not warn user about a certificate if it has already been accepted for a different site. Possibly resultant.?
CWE-356:Product UI does not Warn User of Unsafe Actions
---
CVE:Port scan triggers CPU consumption with processes that attempt to read data

## 1.3.2. Instantiate a Llama agent to classify CVEs into CWE classes

### CWE Classification using Chatbot and Context-Driven Approach
This function leverages a chatbot model (`llama3`) to classify CVEs into CWE categories using a provided context.

#### Steps
1. **Define CWE List**:
   - Specifies a fixed list of CWE categories for classification.
2. **Prepare Context (RAG)**:
   - Formats relevant documents into a readable format using "---" separators.
3. **System Message**:
   - Guides the chatbot to follow strict classification rules, including:
     - Context-driven analysis.
     - Strict output format: `CWE-<ID>: <Description>`.
     - Using only CWEs from the predefined list.
4. **Chat Interaction**:
   - Sends the query and context to the chatbot with a low temperature (`0.1`) to make responses more consistent across runs.
5. **Return Classification**:
   - Outputs the most relevant CWE classification for the given query.

#### Notes
- The system message asks the model to stay consistent and to avoid extraneous explanations or commentary; this is an instruction, not a guarantee.
- Only the specified CWEs are used for classification to maintain focus.
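
A prompt can only request a format; the model may still add commentary or name a CWE outside the list. A minimal sketch of post-validating the reply, assuming a hypothetical helper `extract_allowed_cwe` and a shortened allow-list for the demo:

```python
import re
from typing import Iterable, Optional

CWE_PATTERN = re.compile(r"CWE-\d+")


def extract_allowed_cwe(reply: str, allowed_ids: Iterable[str]) -> Optional[str]:
    """Return the first 'CWE-<ID>' found in `reply` that is in `allowed_ids`, else None."""
    allowed = set(allowed_ids)
    for candidate in CWE_PATTERN.findall(reply):
        if candidate in allowed:
            return candidate
    return None


allowed_demo = ["CWE-79", "CWE-89", "CWE-798"]
ok = extract_allowed_cwe("CWE-798: Use of Hard-coded Credentials", allowed_demo)
chatty = extract_allowed_cwe("Sure! The best match is CWE-89: SQL Injection.", allowed_demo)
outside = extract_allowed_cwe("CWE-526: something else", allowed_demo)
print(ok, chatty, outside)  # CWE-798 CWE-89 None
```

A `None` result can then be counted as an invalid prediction instead of being silently parsed.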

In [29]:
# Function to generate a CWE classification based on a CVE query and RAG context
def generate(query: str, docs: list[dict]):
    """
    Classifies a CVE query to the most relevant CWE category using a context-driven approach.

    Arguments:
    - `query` (str): CVE description to classify.
    - `docs` (list[dict]): List of relevant documents retrieved from the Pinecone index.

    Returns:
    - `chat_response["message"]["content"]` (str): The classified CWE in the strict output format.
    """

    # List of predefined CWE categories for classification
    cwe_list = [
        "CWE-79: Improper Neutralization of Input During Web Page Generation ('Cross-site Scripting')",
        "CWE-787: Out-of-bounds Write",
        "CWE-89: Improper Neutralization of Special Elements used in an SQL Command ('SQL Injection')",
        "CWE-352: Cross-Site Request Forgery (CSRF)",
        "CWE-22: Improper Limitation of a Pathname to a Restricted Directory ('Path Traversal')",
        "CWE-125: Out-of-bounds Read",
        "CWE-78: Improper Neutralization of Special Elements used in an OS Command ('OS Command Injection')",
        "CWE-416: Use After Free",
        "CWE-862: Missing Authorization",
        "CWE-434: Unrestricted Upload of File with Dangerous Type",
        "CWE-94: Improper Control of Generation of Code ('Code Injection')",
        "CWE-20: Improper Input Validation",
        "CWE-77: Improper Neutralization of Special Elements used in a Command ('Command Injection')",
        "CWE-287: Improper Authentication",
        "CWE-269: Improper Privilege Management",
        "CWE-502: Deserialization of Untrusted Data",
        "CWE-200: Exposure of Sensitive Information to an Unauthorized Actor",
        "CWE-863: Incorrect Authorization",
        "CWE-918: Server-Side Request Forgery (SSRF)",
        "CWE-119: Improper Restriction of Operations within the Bounds of a Memory Buffer",
        "CWE-476: NULL Pointer Dereference",
        "CWE-798: Use of Hard-coded Credentials",
        "CWE-190: Integer Overflow or Wraparound",
        "CWE-400: Uncontrolled Resource Consumption",
        "CWE-306: Missing Authentication for Critical Function"
    ]

    # Format relevant documents (RAG context)
    docs_text = [f"Id: {doc['id']}\nDescription: {doc['description']}" for doc in docs]
    rag = "\n---\n".join(docs_text)

    # System message for the chatbot
    system_message = (
        "You are a highly skilled cybersecurity expert specializing in the classification of CVE vulnerabilities. "
        "Your primary objective is to map each CVE to its most relevant CWE category by analyzing the provided context and examples. "
        "Follow these guidelines to ensure accurate and consistent classifications:\n\n"
        "1. Context-Driven Analysis: Use the provided RAG context as the primary source of information for understanding and categorizing the vulnerabilities.\n"
        "2. Example-Based Learning: Refer to the examples provided to identify patterns and logical reasoning used in prior classifications.\n"
        "3. Strict Output Format: Always format your response exactly as follows:\n"
        "   CWE-<ID>: <Description>\n\n"
        "4. Focus and Precision: Avoid explanations, justifications, or any additional commentary outside the required format.\n\n"
        "5. You should only respond using CWEs from the following list: "
        + ", ".join(f'"{cwe}"' for cwe in cwe_list)
        + "\n"
        "### Resources Provided:\n"
        "CONTEXT:\n"
        + rag
        + "\nEXAMPLES:\n"
        + "\n---\n".join(train_examples[0:100])  # First 100 training examples only, to limit prompt length
    )

    # Construct the message for the chatbot
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": query}
    ]

    # Call the chatbot with the provided messages and options
    chat_response = ollama.chat(
        model="llama3",
        messages=messages,
        options={
            "temperature": 0.1
        }
    )

    # Return the chatbot's classification response
    return chat_response["message"]["content"]


## 1.3.3. Evaluate the model on the test dataset

### CWE Classification Evaluation
This section evaluates the performance of the CWE classification system on a test dataset.

#### Steps
1. **Dataset Preparation**:
   - A copy of the first test row (`test_data[0:1]`) is stored in `results` to hold predictions; widen the slice to evaluate more rows.
2. **Define the Evaluation Function**:
   - The `evaluate` function:
     1. Constructs a query and question based on the CVE details (ID and description).
     2. Retrieves relevant documents (`docs`) using the `get_docs` function.
     3. Generates a CWE classification (`out`) using the `generate` function.
     4. Parses and returns the predicted CWE ID and description.
3. **Apply the Function**:
   - The `evaluate` function is applied to each row in the test dataset using `apply`, with results stored in new columns (`cwe_id_pred` and `cwe_description_pred`).

#### Outputs
- **Updated Dataset**:
  - The `results` DataFrame includes the following new columns:
    - `cwe_id_pred`: Predicted CWE ID.
    - `cwe_description_pred`: Predicted CWE description.


In [30]:
# Evaluate the performance of CWE classification for each row in the test dataset

results = test_data[0:1].copy()  # Copy only the first test row; widen the slice to evaluate more rows

def evaluate(row):
    """
    Evaluate the CWE classification for a given row in the dataset.

    Arguments:
    - `row` (pd.Series): A row from the test dataset containing CVE details.

    Returns:
    - pd.Series: Predicted CWE ID and CWE description.
    """
    # Construct the query and question for classification
    query = f"{row['cve_id']}: {row['cve_description']}"
    question = f"Which CWE category best classifies CVE '{row['cve_id']}': {row['cve_description']}?"
    
    # Retrieve relevant documents from the Pinecone index
    docs = get_docs(query, top_k=10)
    
    # Generate CWE classification using the chatbot
    out = generate(query=question, docs=docs)
    print(out)  # Print the chatbot's output for debugging
    
    # Parse the predicted CWE ID and description; `partition` tolerates replies without a colon
    cwe_id_pred, _, cwe_description_pred = out.partition(':')
    return pd.Series({
        'cwe_id_pred': cwe_id_pred.strip(),
        'cwe_description_pred': cwe_description_pred.strip()
    })

# Apply the evaluation function to each row of the test dataset
# Arguments:
# - `results` (DataFrame): Copy of the test dataset.
# - `evaluate` (function): Function to perform CWE classification.
# Returns:
# - Updated DataFrame with predicted CWE ID and description.
results[['cwe_id_pred', 'cwe_description_pred']] = results.apply(evaluate, axis=1)

CWE-798: Use of Hard-coded Credentials


# 1.4. Plot results

### Evaluate CWE Classification Performance
This section evaluates the performance of CWE classification at the class level and overall.

#### Steps
1. **Filter Valid Predictions**:
   - Rows with valid predicted CWE IDs (`CWE-<ID>`) are retained.
2. **Extract Labels**:
   - True (`y_true`) and predicted (`y_pred`) CWE IDs are extracted as lists.
3. **Calculate Class-Level Metrics**:
   - For each unique CWE ID (`Class`):
     - **Occurrences**: Number of true labels for the class.
     - **Correct Predictions**: Number of correctly predicted labels for the class.
     - **Accuracy**: Correct predictions divided by occurrences for the class (this is per-class recall).
4. **Summarize Results**:
   - Create a DataFrame summarizing accuracy, occurrences, and correct predictions for each class.
5. **Overall Statistics**:
   - Compute mean accuracy across all classes, total occurrences, and total correct predictions.

#### Outputs
- **Class-Level Results (`df_results`)**:
  - Columns:
    - `Class`: CWE ID.
    - `Accuracy`: Accuracy for the class.
    - `Occurrences`: Total occurrences in true labels.
    - `Correct`: Number of correct predictions.
- **Overall Statistics**:
  - Mean Accuracy.
  - Total Occurrences.
  - Total Correct Predictions.
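
The per-class computation described above can be sketched on a handful of made-up labels. The CWE IDs below are illustrative only and are not results from the test set, and `per_class_accuracy` is a hypothetical helper name that does not appear elsewhere in this notebook.

```python
from collections import Counter

def per_class_accuracy(y_true, y_pred):
    """Return {class: (accuracy, occurrences, correct)} for classes present in y_true."""
    occurrences = Counter(y_true)
    correct = Counter(t for t, p in zip(y_true, y_pred) if t == p)
    return {
        cls: (correct[cls] / n, n, correct[cls])
        for cls, n in sorted(occurrences.items())
    }

# Illustrative labels only (not real results)
y_true = ["CWE-79", "CWE-79", "CWE-89", "CWE-89", "CWE-89", "CWE-798"]
y_pred = ["CWE-79", "CWE-89", "CWE-89", "CWE-89", "CWE-79", "CWE-798"]

summary = per_class_accuracy(y_true, y_pred)
for cls, (acc, n, ok) in summary.items():
    print(f"{cls}: accuracy={acc:.2f} occurrences={n} correct={ok}")
```

Classes that appear only among the predictions have no occurrences in the true labels, so they are left out of the summary, mirroring the `Occurrences != 0` filter.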


In [None]:
# Filter results to include only valid CWE predictions
# Arguments:
# - `results` (DataFrame): DataFrame containing original and predicted CWE classifications.
# Returns:
# - `df_cleaned` (DataFrame): DataFrame with valid CWE predictions (e.g., matching "CWE-<ID>" format).
df_cleaned = results[results['cwe_id_pred'].str.match(r"CWE-\d+$", na=False)]

# Extract true and predicted CWE IDs as lists
# Arguments:
# - `df_cleaned` (DataFrame): DataFrame with valid predictions.
# Returns:
# - `y_true` (list): List of true CWE IDs.
# - `y_pred` (list): List of predicted CWE IDs.
y_true = df_cleaned['cwe_id'].to_list()
y_pred = df_cleaned['cwe_id_pred'].to_list()

# Get the sorted list of all unique classes
# Arguments:
# - `y_true` (list): List of true CWE IDs.
# - `y_pred` (list): List of predicted CWE IDs.
# Returns:
# - `classes` (list): Sorted list of unique CWE IDs from true and predicted values.
classes = sorted(set(y_true + y_pred))

# Calculate accuracy for each class
valores = []
for cls in classes:
    # Find indices of true labels matching the current class
    true_indices = [i for i, label in enumerate(y_true) if label == cls]
    
    # Count correct predictions for the current class
    correct_predictions = sum(y_pred[i] == cls for i in true_indices)
    
    # Count total occurrences of the current class in true labels
    total_true = len(true_indices)
    
    # Calculate accuracy for the current class
    accuracy = correct_predictions / total_true if total_true > 0 else 0
    
    # Append results to the list
    valores.append((cls, accuracy, total_true, correct_predictions))

# Create a DataFrame to summarize results
# Arguments:
# - `valores` (list[tuple]): List containing class, accuracy, occurrences, and correct predictions.
# Returns:
# - `df_results` (DataFrame): DataFrame summarizing evaluation metrics per class.
df_results = pd.DataFrame(valores, columns=["Class", "Accuracy", "Occurrences", "Correct"])
df_results = df_results.sort_values(by="Class")
df_results = df_results[df_results['Occurrences'] != 0].reset_index(drop=True)

# Calculate overall statistics
# Mean accuracy across all classes
mean_accuracy = df_results['Accuracy'].mean()

# Total occurrences of all classes in true labels
total_occurences = df_results['Occurrences'].sum()

# Total correct predictions across all classes
total_correct_predictions = df_results['Correct'].sum()

# Print evaluation results
print(df_results)

print(f"Mean Accuracy: {mean_accuracy:.4f}")
print("Occurrences:", total_occurences)
print("Correct Predictions:", total_correct_predictions)

### Confusion Matrix for CWE Classification
This section visualizes the confusion matrix to evaluate the performance of CWE classification.

#### Steps
1. **Mapping Classes to Indices**:
   - Creates a mapping (`class_to_index`) to convert class labels (CWE IDs) into numeric indices.
   - Generates the reverse mapping (`index_to_class`) to convert indices back into labels.
2. **Convert Labels to Numeric**:
   - Transforms true (`y_true`) and predicted (`y_pred`) labels into numeric format using `class_to_index`.
3. **Compute Confusion Matrix**:
   - Uses `confusion_matrix` from `sklearn.metrics` to compute the confusion matrix from numeric labels.
4. **Plot the Confusion Matrix**:
   - Uses `ConfusionMatrixDisplay` to visualize the confusion matrix.
   - The matrix displays the number of true and predicted instances for each class.

#### Outputs
- **Confusion Matrix**:
  - Rows: Actual CWE classes.
  - Columns: Predicted CWE classes.
  - Diagonal values represent correct predictions.
  - Off-diagonal values represent misclassifications.
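
As a small illustration of the layout described above, the sketch below builds a confusion matrix from made-up labels. It passes the string labels straight to scikit-learn's `confusion_matrix` together with its `labels` argument to fix the row and column order. This is an alternative shortcut shown for illustration, not the index mapping used in this notebook.

```python
from sklearn.metrics import confusion_matrix

# Illustrative labels only (not real results)
y_true = ["CWE-79", "CWE-79", "CWE-89", "CWE-89", "CWE-89", "CWE-798"]
y_pred = ["CWE-79", "CWE-89", "CWE-89", "CWE-89", "CWE-79", "CWE-798"]

# Sorted union of labels fixes the order of rows and columns
labels = sorted(set(y_true + y_pred))
cm = confusion_matrix(y_true, y_pred, labels=labels)

# Rows are actual classes, columns are predicted classes
for label, row in zip(labels, cm):
    print(f"{label:>8}: {row.tolist()}")

# The diagonal holds the correct predictions
correct = int(cm.trace())
total = int(cm.sum())
print(f"{correct} of {total} predictions are on the diagonal")
```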


In [None]:
# Generate mappings for unique classes to indices and vice-versa
# Arguments:
# - `unique_classes` (list): Sorted list of unique CWE IDs.
# Returns:
# - `class_to_index` (dict): Mapping from class label to numeric index.
# - `index_to_class` (dict): Mapping from numeric index to class label.
unique_classes = sorted(set(y_true + y_pred))
class_to_index = {cls: idx for idx, cls in enumerate(unique_classes)}
index_to_class = {idx: cls for cls, idx in class_to_index.items()}

# Convert true and predicted labels to numeric form
# Arguments:
# - `y_true` (list): List of true CWE labels.
# - `y_pred` (list): List of predicted CWE labels.
# Returns:
# - `y_true_numeric` (list[int]): Numeric representation of true labels.
# - `y_pred_numeric` (list[int]): Numeric representation of predicted labels.
y_true_numeric = [class_to_index[label] for label in y_true]
y_pred_numeric = [class_to_index[label] for label in y_pred]

# Compute the confusion matrix
# Arguments:
# - `y_true_numeric` (list[int]): Numeric true labels.
# - `y_pred_numeric` (list[int]): Numeric predicted labels.
# Returns:
# - `conf_matrix` (ndarray): Confusion matrix.
conf_matrix = confusion_matrix(y_true_numeric, y_pred_numeric)

# Plot the confusion matrix
fig, ax = plt.subplots(figsize=(12, 8))
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=unique_classes)
disp.plot(ax=ax, cmap='Blues', xticks_rotation=90)
plt.title("Confusion Matrix")
plt.show()

### Evaluation Metrics for CWE Classification
This section computes standard evaluation metrics for CWE classification.

#### Metrics
1. **Accuracy**:
   - Proportion of correctly classified instances among all predictions.
   - Formula: `Correct Predictions / Total Samples`
2. **Precision (Weighted)**:
   - Per-class proportion of true positives among all instances predicted as that class, averaged with weights proportional to each class's number of true instances (support).
   - Formula (for each class): `TP / (TP + FP)`
3. **Recall (Weighted)**:
   - Per-class proportion of true positives among all actual instances of that class, averaged by support. In single-label multiclass classification this value equals the accuracy.
   - Formula (for each class): `TP / (TP + FN)`
4. **F1-Score (Weighted)**:
   - Harmonic mean of precision and recall, computed per class and averaged by support.
   - Formula (for each class): `2 * (Precision * Recall) / (Precision + Recall)`

#### Steps
1. **Compute Metrics**:
   - `accuracy_score`: Computes overall accuracy.
   - `precision_score`: Computes weighted precision.
   - `recall_score`: Computes weighted recall.
   - `f1_score`: Computes weighted F1-score.
2. **Print Results**:
   - Metrics are formatted to four decimal places for clarity.
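
To make the support-weighted averaging concrete, here is a minimal pure-Python sketch that applies the per-class formulas above to made-up labels. The function name `weighted_metrics` and the labels are assumptions for illustration. The notebook itself relies on the scikit-learn functions listed in the steps.

```python
from collections import Counter

def weighted_metrics(y_true, y_pred):
    """Support-weighted precision, recall and F1, plus plain accuracy."""
    support = Counter(y_true)      # true instances per class
    predicted = Counter(y_pred)    # predicted instances per class
    tp = Counter(t for t, p in zip(y_true, y_pred) if t == p)
    n = len(y_true)

    precision = recall = f1 = 0.0
    for cls, count in support.items():
        p = tp[cls] / predicted[cls] if predicted[cls] else 0.0  # TP / (TP + FP)
        r = tp[cls] / count                                      # TP / (TP + FN)
        f = 2 * p * r / (p + r) if (p + r) else 0.0
        weight = count / n  # class weight = share of true instances
        precision += weight * p
        recall += weight * r
        f1 += weight * f

    accuracy = sum(tp.values()) / n
    return accuracy, precision, recall, f1

# Illustrative labels only (not real results)
y_true = ["CWE-79", "CWE-79", "CWE-89", "CWE-89", "CWE-89", "CWE-798"]
y_pred = ["CWE-79", "CWE-89", "CWE-89", "CWE-89", "CWE-89", "CWE-798"]

accuracy, precision, recall, f1 = weighted_metrics(y_true, y_pred)
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision (Weighted): {precision:.4f}")
print(f"Recall (Weighted): {recall:.4f}")
print(f"F1-Score (Weighted): {f1:.4f}")
```

Weighting each class's recall by its support cancels the per-class denominators, which is why weighted recall and accuracy coincide in this example.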

In [None]:
# Calculate evaluation metrics
# Arguments:
# - `y_true_numeric` (list[int]): Numeric representation of true labels.
# - `y_pred_numeric` (list[int]): Numeric representation of predicted labels.
# Returns:
# - `accuracy` (float): Overall accuracy of the classification.
# - `precision_weighted` (float): Weighted precision score.
# - `recall_weighted` (float): Weighted recall score.
# - `f1_weighted` (float): Weighted F1 score.
accuracy = accuracy_score(y_true_numeric, y_pred_numeric)
precision_weighted = precision_score(y_true_numeric, y_pred_numeric, average='weighted', zero_division=0)
recall_weighted = recall_score(y_true_numeric, y_pred_numeric, average='weighted', zero_division=0)
f1_weighted = f1_score(y_true_numeric, y_pred_numeric, average='weighted', zero_division=0)

# Print the metrics with four decimal places
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision (Weighted): {precision_weighted:.4f}")
print(f"Recall (Weighted): {recall_weighted:.4f}")
print(f"F1-Score (Weighted): {f1_weighted:.4f}")

## 2. Base Model

A feedforward multilayer artificial neural network (ANN) that classifies CVEs, represented by their embedding vectors, into their corresponding CWEs.

### 2.1. Import data for supervised learning
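
The label handling in this step (CWE label → numeric ID → class index → one-hot row, with test labels unseen in training filtered out) can be sketched on a few made-up labels. The labels and the helper `cwe_number` are assumptions for illustration and are not read from the CSV files.

```python
import re
import numpy as np

def cwe_number(label):
    """Extract the numeric part of a label such as 'CWE-79'."""
    match = re.search(r"CWE-(\d+)", label)
    if match is None:
        raise ValueError(f"Not a CWE label: {label!r}")
    return int(match.group(1))

# Illustrative labels only
train_labels = ["CWE-79", "CWE-89", "CWE-79", "CWE-798"]
test_labels = ["CWE-89", "CWE-22", "CWE-798"]  # CWE-22 never appears in training

# Class indices are defined by the training set only
unique_ids = sorted({cwe_number(label) for label in train_labels})
cwe_to_index = {cwe: idx for idx, cwe in enumerate(unique_ids)}

# Rows of the identity matrix are the one-hot vectors
train_idx = np.array([cwe_to_index[cwe_number(label)] for label in train_labels])
one_hot_train = np.eye(len(cwe_to_index))[train_idx]

# Unknown test classes are marked with -1 and dropped
test_idx = np.array([cwe_to_index.get(cwe_number(label), -1) for label in test_labels])
keep = test_idx != -1
one_hot_test = np.eye(len(cwe_to_index))[test_idx[keep]]

print(cwe_to_index)
print(one_hot_train)
print(one_hot_test)
```

The same `keep` mask has to be applied to the test descriptions so that inputs and targets stay aligned.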

In [None]:
number_of_cwes = 25

### Train data
# Step 1: Load the CSV file
file_path_train = "./datasets/supervised/train.csv"
data_train = pd.read_csv(file_path_train)

# Step 2: Separate X and Y
X_train = [s[:-1] if s.endswith('.') else s for s in data_train.iloc[:, 1].values]  # Keep only the CVE description
Y_train = data_train.iloc[:, 2:].values  # Latter two columns for targets

# Collect the unique numeric CWE IDs present in the training labels
unique_cwe_ids = sorted(set([int(re.search(r'CWE-(\d{1,4})', row[0]).group(1)) for row in Y_train]))

# Create a mapping from CWE ID to an index (0 to 24)
cwe_to_index = {cwe: idx for idx, cwe in enumerate(unique_cwe_ids)}

# Map the numeric CWE IDs in Y_train to their indices
numeric_cwe_train = np.array([int(re.search(r'CWE-(\d{1,4})', row[0]).group(1)) for row in Y_train])
mapped_indices = np.array([cwe_to_index[cwe] for cwe in numeric_cwe_train])

# Create one-hot encoded matrix for the unique CWE IDs
one_hot_encoded_fixed_train = np.zeros((len(mapped_indices), number_of_cwes))

# Set the appropriate positions
for i, index in enumerate(mapped_indices):
    one_hot_encoded_fixed_train[i, index] = 1

### Test data
# Load test data
file_path_test = "./datasets/supervised/test.csv"
data_test = pd.read_csv(file_path_test)

# Step 2: Separate X and Y
X_test = [s[:-1] if s.endswith('.') else s for s in data_test.iloc[:, 1].values]  # CVE descriptions
Y_test = data_test.iloc[:, 2:].values  # Target columns

# Extract numeric part from 'CWE-' using list comprehension and regex
numeric_cwe_test = np.array([int(re.search(r'CWE-(\d{1,4})', row[0]).group(1)) for row in Y_test])

# Map numeric CWE IDs to their indices using the training mapping
mapped_indices_test = np.array([
    cwe_to_index[cwe] if cwe in cwe_to_index else -1  # Use -1 for unknown CWE IDs
    for cwe in numeric_cwe_test
])

# Filter out rows with unknown CWE IDs (-1), if needed
valid_indices = mapped_indices_test != -1
mapped_indices_test = mapped_indices_test[valid_indices]
X_test = np.array(X_test)[valid_indices]

# Create one-hot encoded matrix for the unique CWE IDs
one_hot_encoded_fixed_test = np.zeros((len(mapped_indices_test), number_of_cwes))

# Set the appropriate positions
for i, index in enumerate(mapped_indices_test):
    one_hot_encoded_fixed_test[i, index] = 1

### 2.2. Transform train and test data into their embedding format

In [None]:
# Generate embeddings for a list of CVE descriptions
embeddings_train = encoder_fine_tuned(X_train)
embeddings_test = encoder_fine_tuned(X_test)

### 2.3. ANN Definition

Hyperparameters

In [None]:
input_size = 768               # Input vector size (V_CVE_size)
num_classes = number_of_cwes   # Number of output classes (N_CWE)
hidden_sizes = [256, 128, 64]  # Sizes of hidden layers
activation_function = nn.ReLU  # Activation function to be used
batch_size = 64                # Batch size
learning_rate = 1e-3           # Learning rate
num_epochs = 10000             # Number of training epochs
dropout_prob = 0.5             # Dropout probability for regularization

Neural network model
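
With the hyperparameters above, the fully connected layers have sizes 768 → 256 → 128 → 64 → 25. As a quick sanity check on model size, the sketch below counts the trainable parameters (weights plus biases) of each linear layer. `count_parameters` is a hypothetical helper used only for this arithmetic.

```python
def count_parameters(input_size, hidden_sizes, num_classes):
    """Count weights and biases of each fully connected layer."""
    sizes = [input_size, *hidden_sizes, num_classes]
    # A layer mapping n_in -> n_out has n_in * n_out weights and n_out biases
    per_layer = [n_in * n_out + n_out for n_in, n_out in zip(sizes, sizes[1:])]
    return per_layer, sum(per_layer)

per_layer, total = count_parameters(768, [256, 128, 64], 25)
print(per_layer)  # [196864, 32896, 8256, 1625]
print(total)      # 239641
```

Activation and dropout layers add no trainable parameters, so the total depends only on the linear layers.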

In [None]:
# Build the neural network model dynamically
layers = []

# Input layer
layers.append(nn.Linear(input_size, hidden_sizes[0]))
layers.append(activation_function())

# Hidden layers
for i in range(len(hidden_sizes) - 1):
    layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]))
    layers.append(activation_function())
    layers.append(nn.Dropout(dropout_prob))

# Output layer (raw logits: nn.CrossEntropyLoss applies log-softmax internally,
# so no Softmax layer is added here)
layers.append(nn.Linear(hidden_sizes[-1], num_classes))

# Create the sequential model
model = nn.Sequential(*layers)

In [None]:
print("Neural Network Model:")
print(model)

Loss function
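
`nn.CrossEntropyLoss` applies log-softmax to its input internally, so it expects raw logits rather than probabilities. The sketch below is a NumPy re-implementation written only for illustration (it is not PyTorch's code). It shows what happens when probabilities are fed in instead: softmax is effectively applied twice, and the loss cannot approach zero even for confident, correct predictions.

```python
import numpy as np

def log_softmax(z):
    """Row-wise log-softmax with the usual max-shift for numerical stability."""
    z = z - z.max(axis=1, keepdims=True)
    return z - np.log(np.exp(z).sum(axis=1, keepdims=True))

def cross_entropy(scores, targets):
    """Mean negative log-likelihood of the target classes, given raw scores."""
    log_probs = log_softmax(scores)
    return float(-log_probs[np.arange(len(targets)), targets].mean())

# Two confident and correct predictions over three classes
logits = np.array([[10.0, 0.0, 0.0],
                   [0.0, 10.0, 0.0]])
targets = np.array([0, 1])

loss_on_logits = cross_entropy(logits, targets)

# Feeding probabilities applies softmax a second time inside the loss
probs = np.exp(log_softmax(logits))
loss_on_probs = cross_entropy(probs, targets)

print(f"Loss on logits:        {loss_on_logits:.4f}")
print(f"Loss on probabilities: {loss_on_probs:.4f}")
```

Because probabilities lie in [0, 1], the second softmax can never produce a sharply peaked distribution, which flattens the gradients and slows training.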

In [None]:
# Loss function definition

loss_fn = nn.CrossEntropyLoss()

Optimization method

In [None]:
# Defining the optimization method
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

### 2.4. ANN training

Prepare data for training

In [None]:
num_samples = 10

X_train = torch.tensor(embeddings_train, dtype=torch.float32)
y_train = torch.tensor(one_hot_encoded_fixed_train, dtype=torch.float32)  # One-hot targets; converted to class indices in the training loop

# Create a TensorDataset from X_train and y_train
train_dataset = TensorDataset(X_train, y_train)

X_test = torch.tensor(embeddings_test, dtype=torch.float32)
y_test = torch.tensor(one_hot_encoded_fixed_test, dtype=torch.float32)  # One-hot targets; converted to class indices during evaluation

# Create a TensorDataset from X_test and y_test
test_dataset = TensorDataset(X_test, y_test)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Store data for training and the accuracy of the model

In [None]:
# Lists to store loss and accuracy for plotting
train_losses = []
train_accuracies = []
val_accuracies = []

Choose the GPU, if available

In [None]:
# Check if CUDA is available and use the GPU if possible
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move the model to the selected device (GPU or CPU)
model = model.to(device)

Training the model

In [None]:
for epoch in range(num_epochs):
    # Training phase
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    for batch_X, batch_y in train_loader:
        # Move the batch to the selected device
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)

        # Convert one-hot encoded batch_y to class indices
        batch_y_indices = torch.argmax(batch_y, dim=1)

        # Forward pass
        outputs = model(batch_X)

        # Compute loss (CrossEntropyLoss takes class indices as targets)
        loss = loss_fn(outputs, batch_y_indices)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Get predicted class indices (outputs have shape [batch_size, num_classes])
        predicted = torch.argmax(outputs, dim=1)

        # Count correct predictions
        total += batch_y_indices.size(0)
        correct += (predicted == batch_y_indices).sum().item()

    avg_train_loss = total_loss / len(train_loader)
    train_accuracy = 100 * correct / total
    train_losses.append(avg_train_loss)
    train_accuracies.append(train_accuracy)

    # Save the model after each epoch
    torch.save(model.state_dict(), "base_model.pth")

    print(f'Epoch [{epoch+1}/{num_epochs}], '
          f'Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}%')

### 2.5. ANN analysis

Data from training process

In [None]:
# Visualization of Loss and Accuracy

# Plot Loss over epochs
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.title('Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

# Plot Accuracy over epochs
plt.subplot(1, 2, 2)
plt.plot(train_accuracies, label='Train Accuracy')
plt.title('Accuracy over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.legend()

# Save the figure as a PNG with high DPI
plt.tight_layout()  # Adjust layout to prevent overlap
plt.savefig("training_metrics.png", dpi=600, bbox_inches='tight')  # Save as PNG with 600 DPI

plt.show()

In [None]:
# Create the figure and axis
fig, ax1 = plt.subplots(figsize=(12, 6))

# Plot the train loss on the left y-axis
# (color is set once via the keyword to avoid a redundant format-string color)
ax1.plot(train_losses, linestyle='-', color='blue', label='Train Loss')
ax1.set_xlabel('Epoch', fontsize=16)  # x-axis label font size
ax1.set_ylabel('Loss', color='blue', fontsize=20)  # y-axis label font size
ax1.tick_params(axis='both', labelsize=12)  # Tick label font size
ax1.tick_params(axis='y', labelcolor='blue')

# Create a second y-axis to plot train accuracy
ax2 = ax1.twinx()
ax2.plot(train_accuracies, linestyle='-', color='green', label='Train Accuracy')
ax2.set_ylabel('Accuracy (%)', color='green', fontsize=14)  # y-axis label font size
ax2.tick_params(axis='both', labelsize=12)  # Tick label font size
ax2.tick_params(axis='y', labelcolor='green')

fig.tight_layout()
plt.show()

Model final evaluation

In [None]:
# Evaluate on Test Data and Compute Performance Metrics

model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for test_X, test_y in test_loader:
        
        # Move the test data to the same device as the model
        test_X, test_y = test_X.to(device), test_y.to(device)
        
        outputs = model(test_X)
        _, predicted = torch.max(outputs.data, 1)
        all_preds.extend(predicted.cpu().numpy())

        # Convert one-hot encoded labels to class indices
        labels_indices = torch.argmax(test_y, dim=1)  # Class indices for true labels
        
        all_labels.extend(labels_indices.cpu().numpy())

# Compute metrics
test_accuracy = accuracy_score(all_labels, all_preds) * 100
precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

print(f'Test Accuracy: {test_accuracy:.2f}%')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')

In [None]:
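# Compute and display the classification report
# Each class index is labeled with its CWE ID, using the mapping built from the training data
class_names = [f"CWE-{cwe}" for cwe in unique_cwe_ids]
report = classification_report(
    all_labels,
    all_preds,
    labels=list(range(len(class_names))),  # Keep classes that are missing from the test predictions
    target_names=class_names,
    zero_division=0,
)
print("\nClassification Report:")
print(report)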

Save the base model

In [None]:
# Save the trained model
torch.save(model.state_dict(), "base_model.pth")
print("Model saved as 'base_model.pth'")