# **CVE TO MITRE BERT (WITHOUT CLUSTER)**


# USED THIS CODE FOR THE **MODEL**

In [None]:

import os
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertModel
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from tqdm import tqdm
import transformers
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Only let the transformers library log errors.
transformers.logging.set_verbosity_error()

# Use CUDA when PyTorch reports it as available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

# Read the CVE table and the MITRE technique table.
lama_df = pd.read_csv('20cluster_based_evaluation_results_with_similarity5.csv')
mitre_df = pd.read_csv('Processed_mitre.csv')

# Normalise both text columns: missing cells become "" and every value is a str.
for frame, column in ((lama_df, 'cve_text'), (mitre_df, 'Processed_Technique')):
    frame[column] = frame[column].fillna("").astype(str)

# Similarity threshold and TF-IDF weight settings.
similarity_threshold = 0.6
tfidf_weight = 0.5

# Fit one TF-IDF vocabulary over CVE and MITRE texts together, then split the
# resulting matrix back into its CVE rows (first) and MITRE rows (rest).
vectorizer = TfidfVectorizer(stop_words='english')
cve_documents = lama_df['cve_text'].tolist()
all_texts = cve_documents + mitre_df['Processed_Technique'].tolist()
tfidf_matrix = vectorizer.fit_transform(all_texts)
num_cve_rows = len(cve_documents)
cve_tfidf = tfidf_matrix[:num_cve_rows]
mitre_tfidf = tfidf_matrix[num_cve_rows:]

# Define a custom Dataset class for the Siamese network
class TextPairDataset(Dataset):
    """Tokenized (CVE text, MITRE technique) pairs for the Siamese network.

    Item ``idx`` pairs the CVE text at position ``idx`` with the MITRE
    technique at position ``idx % len(mitre_techniques)``, i.e. the technique
    sequence is cycled when it is shorter than the CVE sequence.

    Args:
        cve_texts: Iterable of CVE description strings.
        mitre_techniques: Iterable of MITRE technique strings.
        tokenizer: Callable invoked as ``tokenizer(text, return_tensors='pt',
            max_length=..., truncation=True, padding='max_length')`` whose
            result exposes ``'input_ids'`` and ``'attention_mask'`` tensors
            with a leading batch dimension of 1.
        max_length: Sequence length passed to the tokenizer.
    """

    def __init__(self, cve_texts, mitre_techniques, tokenizer, max_length=128):
        # Copy into plain lists so __getitem__ always indexes by POSITION.
        # A pandas Series is indexed by label with ``series[idx]``, which raises
        # KeyError (or returns the wrong row) when its index is not 0..n-1,
        # e.g. after filtering or shuffling the DataFrame.
        self.cve_texts = list(cve_texts)
        self.mitre_techniques = list(mitre_techniques)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of CVE texts (one pair per CVE)."""
        return len(self.cve_texts)

    def _encode(self, text):
        """Tokenize one text, padded/truncated to ``max_length``."""
        return self.tokenizer(
            text,
            return_tensors='pt',
            max_length=self.max_length,
            truncation=True,
            padding='max_length',
        )

    def __getitem__(self, idx):
        """Return the token ids and attention masks of pair ``idx``."""
        inputs_cve = self._encode(self.cve_texts[idx])
        inputs_mitre = self._encode(self.mitre_techniques[idx % len(self.mitre_techniques)])

        # squeeze(0) drops the batch dimension added by return_tensors='pt'
        # so the DataLoader can stack items into a batch itself.
        return {
            'input_ids_cve': inputs_cve['input_ids'].squeeze(0),
            'attention_mask_cve': inputs_cve['attention_mask'].squeeze(0),
            'input_ids_mitre': inputs_mitre['input_ids'].squeeze(0),
            'attention_mask_mitre': inputs_mitre['attention_mask'].squeeze(0)
        }

# Define the Siamese Network with BERT for similarity scoring
class SiameseBertSimilarityModel(nn.Module):
    """Siamese scorer: one shared encoder plus one shared linear projection.

    Both inputs go through the same ``bert`` encoder and the same ``dense``
    layer; the result is the cosine similarity of the two projected vectors.

    Args:
        bert_model: Encoder called as ``bert_model(input_ids=...,
            attention_mask=...)`` whose result has a ``pooler_output``
            attribute. The projection is ``nn.Linear(768, 256)``, so the
            pooled vectors are expected to have 768 features.
    """

    def __init__(self, bert_model):
        super().__init__()
        self.bert = bert_model
        self.dense = nn.Linear(768, 256)
        self.cosine_similarity = nn.CosineSimilarity(dim=1)

    def _embed(self, input_ids, attention_mask):
        """Encode one side of the pair and project its pooled output."""
        pooled = self.bert(input_ids=input_ids, attention_mask=attention_mask).pooler_output
        return self.dense(pooled)

    def forward(self, input_ids_cve, attention_mask_cve, input_ids_mitre, attention_mask_mitre):
        """Return one cosine-similarity score per (CVE, MITRE) pair in the batch."""
        cve_embedding = self._embed(input_ids_cve, attention_mask_cve)
        mitre_embedding = self._embed(input_ids_mitre, attention_mask_mitre)
        return self.cosine_similarity(cve_embedding, mitre_embedding)

# Tokenizer and encoder both come from the 'bert-base-uncased' checkpoint.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')
bert_model.to(device)  # Module.to moves the parameters in place
model = SiameseBertSimilarityModel(bert_model)
model.to(device)

# MSE loss, Adam at lr=2e-5, and a scheduler that multiplies the learning rate
# by 0.1 on every scheduler.step() call.
criterion = nn.MSELoss()
optimizer = Adam(model.parameters(), lr=2e-5)
scheduler = StepLR(optimizer, step_size=1, gamma=0.1)

# Training pairs are served in shuffled batches of 16.
train_dataset = TextPairDataset(
    lama_df['cve_text'],
    mitre_df['Processed_Technique'],
    tokenizer,
)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=16)

# Training loop
epochs = 3
for epoch in range(epochs):
    model.train()
    train_loss = 0
    all_labels = []
    all_preds = []

    with tqdm(train_loader, desc=f"Training Epoch {epoch+1}", leave=False) as progress_bar:
        for step, batch in enumerate(progress_bar, start=1):
            input_ids_cve = batch['input_ids_cve'].to(device)
            attention_mask_cve = batch['attention_mask_cve'].to(device)
            input_ids_mitre = batch['input_ids_mitre'].to(device)
            attention_mask_mitre = batch['attention_mask_mitre'].to(device)

            # Forward pass. Every pair is given a target similarity of 1.
            # NOTE(review): there are no negative pairs, so the loss can be
            # driven to ~0 without learning to discriminate — consider adding
            # mismatched pairs with a lower target.
            similarity_scores = model(input_ids_cve, attention_mask_cve, input_ids_mitre, attention_mask_mitre)
            # ones_like already matches the device/dtype of similarity_scores.
            loss = criterion(similarity_scores, torch.ones_like(similarity_scores))

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            all_labels.extend([1] * input_ids_cve.size(0))
            all_preds.extend(similarity_scores.cpu().detach().numpy())

            # Update the bar inside the batch loop so the running mean loss is
            # visible while the epoch is in progress (previously it was only
            # set once, after the last batch).
            progress_bar.set_postfix({'Loss': train_loss / step})

    # Step the scheduler once per epoch
    scheduler.step()
    print(f"Epoch {epoch+1} completed, Average Training Loss: {train_loss / len(train_loader)}")

# Save the trained model and tokenizer
model_dir = 'Mitre_SiameseBert_trained_model_similarity'
os.makedirs(model_dir, exist_ok=True)
model.bert.save_pretrained(model_dir)
tokenizer.save_pretrained(model_dir)
# save_pretrained only stores the BERT encoder. Persist the trained dense
# projection as well; otherwise any later run has to recreate it with random
# weights and its similarity scores no longer reflect this training.
torch.save(model.dense.state_dict(), os.path.join(model_dir, 'dense_layer.pt'))
print("Siamese BERT model and tokenizer saved.")


Using device: cpu


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]



Epoch 1 completed, Average Training Loss: 0.00013913426931593366




Epoch 2 completed, Average Training Loss: 4.696037284118491e-06




Epoch 3 completed, Average Training Loss: 4.227787390186494e-06




Epoch 3 completed, Average Training Loss: 4.227787390186494e-06
Siamese BERT model and tokenizer saved.
Siamese BERT model and tokenizer saved.


TypeError: Concatenation operation is not implemented for NumPy arrays, use np.concatenate() instead. Please do not rely on this error; it may not be given on all Python implementations.

TypeError: Concatenation operation is not implemented for NumPy arrays, use np.concatenate() instead. Please do not rely on this error; it may not be given on all Python implementations.

# Mapping the Training Set with the Saved Model and Saving CSV

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from transformers import BertTokenizer, BertModel
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import torch.nn as nn

# Load model
model_dir = '/content/Untitled Folder'
tokenizer = BertTokenizer.from_pretrained(model_dir)
bert_model = BertModel.from_pretrained(model_dir)

lama_df = pd.read_csv('PRE_FRI_TRAIN_CVE_BERTresults.csv')
mitre_df = pd.read_csv('Processed_mitre_recommidtaion.csv')

# Empty CSV cells are read as NaN (a float), which is not valid text input for
# the TF-IDF vectorizer or the tokenizer. Normalise the two text columns the
# same way the training cell does: NaN -> "" and every value cast to str.
lama_df['cve_text'] = lama_df['cve_text'].fillna("").astype(str)
mitre_df['Processed_Technique'] = mitre_df['Processed_Technique'].fillna("").astype(str)

class SiameseBertSimilarityModel(nn.Module):
    """Siamese scorer: shared encoder + shared linear projection + cosine similarity.

    Args:
        bert_model: Encoder called as ``bert_model(input_ids=...,
            attention_mask=...)`` whose result has a ``pooler_output``
            attribute. The projection is ``nn.Linear(768, 256)``, so the
            pooled vectors are expected to have 768 features.
    """

    # The constructor must be spelled ``__init__`` (double underscores). With
    # the previous ``_init_`` spelling Python never called it, so construction
    # with ``bert_model`` failed and ``bert``/``dense`` were never created.
    def __init__(self, bert_model):
        super().__init__()
        self.bert = bert_model
        self.dense = nn.Linear(768, 256)
        self.cosine_similarity = nn.CosineSimilarity(dim=1)

    def forward(self, input_ids_cve, attention_mask_cve, input_ids_mitre, attention_mask_mitre):
        """Return one cosine-similarity score per (CVE, MITRE) pair in the batch."""
        cve_output = self.bert(input_ids=input_ids_cve, attention_mask=attention_mask_cve).pooler_output
        mitre_output = self.bert(input_ids=input_ids_mitre, attention_mask=attention_mask_mitre).pooler_output
        # Both sides share the same projection layer.
        cve_embedding = self.dense(cve_output)
        mitre_embedding = self.dense(mitre_output)
        return self.cosine_similarity(cve_embedding, mitre_embedding)

# Initialize the pretrained model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SiameseBertSimilarityModel(bert_model).to(device)

# BertModel.from_pretrained restores only the encoder; the dense projection
# created in the model constructor starts with random weights. Restore it from
# a 'dense_layer.pt' state dict stored next to the checkpoint when one exists,
# and say so loudly when it does not, because scores then depend on a random
# projection and differ from run to run.
dense_weights_path = os.path.join(model_dir, 'dense_layer.pt')
if os.path.exists(dense_weights_path):
    model.dense.load_state_dict(torch.load(dense_weights_path, map_location=device))
else:
    print(f"Warning: '{dense_weights_path}' not found; the dense projection layer is randomly initialised.")
model.eval()

# Preselect relevant MITRE techniques based on TF-IDF
def preselect_mitre(cve_texts, mitre_texts, top_n=5):
    """Shortlist MITRE candidates for every CVE by TF-IDF cosine similarity.

    The vectorizer is fitted on ``cve_texts`` only; ``mitre_texts`` are then
    transformed with that same vocabulary.

    Args:
        cve_texts: CVE description strings.
        mitre_texts: MITRE technique strings.
        top_n: Number of candidates kept per CVE.

    Returns:
        A list with one array per CVE holding the positions (into
        ``mitre_texts``) of its ``top_n`` most similar techniques, most
        similar first.
    """
    tfidf = TfidfVectorizer(stop_words='english')
    cve_matrix = tfidf.fit_transform(cve_texts)
    mitre_matrix = tfidf.transform(mitre_texts)

    def best_candidates(row):
        scores = cosine_similarity(cve_matrix[row], mitre_matrix).flatten()
        # argsort is ascending: keep the last top_n, then flip to descending.
        return scores.argsort()[-top_n:][::-1]

    return [best_candidates(row) for row in range(cve_matrix.shape[0])]

# Batch size, minimum score for a match, and cap on matches kept per CVE.
batch_size, similarity_threshold, max_matches_per_cve = 128, 0.6, 3

# Pull both text columns out as plain lists, then shortlist five MITRE
# candidates for every CVE.
cve_texts = lama_df['cve_text'].tolist()
mitre_texts = mitre_df['Processed_Technique'].tolist()
preselected_mitre_indices = preselect_mitre(cve_texts, mitre_texts, top_n=5)

class CVEMITREDataset(Dataset):
    """Flat dataset of (CVE, preselected MITRE candidate) pairs.

    Every text is tokenized once at construction. Item ``idx`` is the
    ``idx``-th pair when the per-CVE candidate lists in
    ``preselected_indices`` are concatenated in CVE order.

    Args:
        cve_texts: CVE description strings.
        mitre_texts: MITRE technique strings.
        preselected_indices: One sequence per CVE with positions into
            ``mitre_texts``.
        tokenizer: Callable invoked as ``tokenizer(text, return_tensors='pt',
            max_length=..., truncation=True, padding='max_length')`` whose
            result exposes ``'input_ids'`` and ``'attention_mask'`` tensors
            with a leading batch dimension of 1.
        max_length: Sequence length passed to the tokenizer.
    """

    # The special methods must use double underscores. With the previous
    # ``_init_`` / ``_len_`` / ``_getitem_`` spellings Python never called
    # them, so the dataset could neither be constructed with arguments nor
    # be indexed or measured by a DataLoader.
    def __init__(self, cve_texts, mitre_texts, preselected_indices, tokenizer, max_length=128):
        self.cve_texts = cve_texts
        self.mitre_texts = mitre_texts
        self.preselected_indices = preselected_indices
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.tokenized_cve = [self._encode(text) for text in cve_texts]
        self.tokenized_mitre = [self._encode(text) for text in mitre_texts]
        # Flatten the candidate lists once so __len__ and __getitem__ are O(1)
        # instead of re-scanning every CVE on each lookup. int() turns numpy
        # integers into plain Python ints.
        self._pairs = [
            (cve_idx, int(mitre_idx))
            for cve_idx, indices in enumerate(preselected_indices)
            for mitre_idx in indices
        ]

    def _encode(self, text):
        """Tokenize one text, padded/truncated to ``max_length``."""
        return self.tokenizer(
            text,
            return_tensors='pt',
            max_length=self.max_length,
            truncation=True,
            padding='max_length',
        )

    def __len__(self):
        """Return the total number of (CVE, candidate) pairs."""
        return len(self._pairs)

    def __getitem__(self, idx):
        """Return tensors and source positions for pair ``idx``.

        Raises:
            IndexError: If ``idx`` is outside the range of available pairs.
        """
        cve_idx, mitre_idx = self._pairs[idx]
        inputs_cve = self.tokenized_cve[cve_idx]
        inputs_mitre = self.tokenized_mitre[mitre_idx]

        # squeeze(0) drops the batch dimension added by return_tensors='pt'.
        return {
            'input_ids_cve': inputs_cve['input_ids'].squeeze(0),
            'attention_mask_cve': inputs_cve['attention_mask'].squeeze(0),
            'input_ids_mitre': inputs_mitre['input_ids'].squeeze(0),
            'attention_mask_mitre': inputs_mitre['attention_mask'].squeeze(0),
            'cve_idx': cve_idx,
            'mitre_idx': mitre_idx
        }

# Serve the (CVE, candidate) pairs in fixed order so results map back by index.
pair_dataset = CVEMITREDataset(cve_texts, mitre_texts, preselected_mitre_indices, tokenizer)
dataloader = DataLoader(pair_dataset, batch_size=batch_size, shuffle=False)

# Score every pair and keep, per CVE, the candidates that reach the threshold.
cve_similarities = {i: [] for i in range(len(cve_texts))}
for batch in tqdm(dataloader, desc="Processing"):
    input_ids_cve = batch['input_ids_cve'].to(device)
    attention_mask_cve = batch['attention_mask_cve'].to(device)
    input_ids_mitre = batch['input_ids_mitre'].to(device)
    attention_mask_mitre = batch['attention_mask_mitre'].to(device)

    # Inference only: no gradients needed.
    with torch.no_grad():
        similarity_scores = model(input_ids_cve, attention_mask_cve, input_ids_mitre, attention_mask_mitre)

    batch_rows = zip(
        batch['cve_idx'].tolist(),
        batch['mitre_idx'].tolist(),
        similarity_scores.tolist(),
    )
    for cve_idx, mitre_idx, score in batch_rows:
        if score >= similarity_threshold:
            cve_similarities[cve_idx].append((mitre_idx, score))

# Result columns start empty and are filled per CVE below.
lama_df['Matched_Techniques'] = ''
lama_df['Similarity_Scores'] = ''

technique_column = mitre_df['Processed_Technique']
for idx, similarities in cve_similarities.items():
    # Best score first, truncated to max_matches_per_cve entries.
    sorted_similarities = sorted(similarities, key=lambda pair: pair[1], reverse=True)
    sorted_similarities = sorted_similarities[:max_matches_per_cve]
    matched_techniques = [technique_column.iloc[mitre_idx] for mitre_idx, _ in sorted_similarities]
    similarity_scores = [score for _, score in sorted_similarities]

    lama_df.at[idx, 'Matched_Techniques'] = '; '.join(matched_techniques)
    lama_df.at[idx, 'Similarity_Scores'] = '; '.join(str(score) for score in similarity_scores)

# Map Tactics and Mitigations to Matched Techniques
def _text_or_unknown(value):
    """Return ``value`` as a string, or 'Unknown' when it is missing (NaN/None)."""
    return 'Unknown' if pd.isna(value) else str(value)


def map_tactics_and_mitigations(matched_techniques, mitre_df):
    """Look up the tactic and mitigation of each matched technique.

    Args:
        matched_techniques: Technique names joined with ``'; '``.
        mitre_df: DataFrame with ``'Processed_Technique'``, ``'Tactic'`` and
            ``'mitigation'`` columns.

    Returns:
        A ``(tactics, mitigations)`` tuple of ``'; '``-joined strings, one
        entry per technique in input order. A technique that is not present
        in ``mitre_df`` — or whose cell is empty — yields ``'Unknown'``.
    """
    tactics = []
    mitigations = []
    for technique in matched_techniques.split('; '):
        # Compute the row filter once and reuse it for both columns.
        rows = mitre_df.loc[mitre_df['Processed_Technique'] == technique, ['Tactic', 'mitigation']]
        if rows.empty:
            tactics.append('Unknown')
            mitigations.append('Unknown')
            continue
        first_match = rows.iloc[0]
        # Missing cells are NaN floats; joining them as-is raises TypeError.
        tactics.append(_text_or_unknown(first_match['Tactic']))
        mitigations.append(_text_or_unknown(first_match['mitigation']))
    return '; '.join(tactics), '; '.join(mitigations)

# Add new columns
lama_df['Mapped_Tactics'] = ''
lama_df['Mapped_Mitigations'] = ''
lama_df['Label'] = ''

# Process each row to map tactics, mitigations, and determine logical mapping
for idx, row in lama_df.iterrows():
    matched_techniques = row['Matched_Techniques']
    similarity_scores = row['Similarity_Scores']

    # CVEs without any match hold '' here, not NaN. pd.notna('') is True, so
    # the old check sent them down the mapping branch where float('') raised
    # ValueError. Treat empty and missing alike: nothing to map, label 'No'.
    if pd.isna(matched_techniques) or matched_techniques == '':
        lama_df.at[idx, 'Label'] = 'No'
        continue

    # Map tactics and mitigations
    tactics, mitigations = map_tactics_and_mitigations(matched_techniques, mitre_df)
    lama_df.at[idx, 'Mapped_Tactics'] = tactics
    lama_df.at[idx, 'Mapped_Mitigations'] = mitigations

    # Determine label based on similarity score threshold; skip empty pieces
    # so a blank score string parses to an empty list instead of failing.
    if isinstance(similarity_scores, str):
        scores = [float(piece) for piece in similarity_scores.split('; ') if piece]
    else:
        scores = []
    lama_df.at[idx, 'Label'] = 'Yes' if any(score > 0.75 for score in scores) else 'No'

# Save final mapping results
lama_df.to_csv("optimized_lama6_bert_mapping_mitre_tactics_mitigations.csv", index=False)
print("Updated DataFrame with tactics, mitigations, and label saved to 'optimized_lama6_bert_mapping_mitre_tactics_mitigations.csv'.")
print(lama_df.head())

# Testing

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from transformers import BertTokenizer, BertModel
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import torch.nn as nn
from sklearn.metrics import classification_report, precision_score, recall_score, f1_score, accuracy_score

# Load files
test_mapping_path = 'test_Expirt_cluster_with_keywords.csv'
expert_label_path = 'CVE_Dataset_with_Binary_Logical_Match_Column.csv'
mitre_path = 'Processed_mitre_recommidtaion.csv'

test_mapping_df = pd.read_csv(test_mapping_path)
expert_label_df = pd.read_csv(expert_label_path)
mitre_df = pd.read_csv(mitre_path)

# Empty CSV cells are read as NaN (a float), which is not valid text input for
# the TF-IDF vectorizer or the tokenizer. Normalise the two text columns the
# same way the training cell does: NaN -> "" and every value cast to str.
test_mapping_df['cve_text'] = test_mapping_df['cve_text'].fillna("").astype(str)
mitre_df['Processed_Technique'] = mitre_df['Processed_Technique'].fillna("").astype(str)

# Load pretrained model
model_dir = '/content/Untitled Folder'
tokenizer = BertTokenizer.from_pretrained(model_dir)
bert_model = BertModel.from_pretrained(model_dir)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Revised Siamese BERT Model with weighted cosine similarity
class SiameseBertSimilarityModel(nn.Module):
    """Siamese scorer whose cosine similarity is scaled by a constant weight.

    Args:
        bert_model: Encoder called as ``bert_model(input_ids=...,
            attention_mask=...)`` whose result has a ``pooler_output``
            attribute. The projection is ``nn.Linear(768, 256)``, so the
            pooled vectors are expected to have 768 features.
        weight: Factor the cosine similarity is multiplied by; with the
            default the returned scores lie in [-0.84, 0.84].
    """

    # The constructor must be spelled ``__init__`` (double underscores). With
    # the previous ``_init_`` spelling Python never called it, so construction
    # with ``bert_model`` failed and no attribute was ever created.
    def __init__(self, bert_model, weight=0.84):
        super().__init__()
        self.bert = bert_model
        self.dense = nn.Linear(768, 256)
        self.cosine_similarity = nn.CosineSimilarity(dim=1)
        self.weight = weight

    def forward(self, input_ids_cve, attention_mask_cve, input_ids_mitre, attention_mask_mitre):
        """Return one weighted cosine-similarity score per pair in the batch."""
        cve_output = self.bert(input_ids=input_ids_cve, attention_mask=attention_mask_cve).pooler_output
        mitre_output = self.bert(input_ids=input_ids_mitre, attention_mask=attention_mask_mitre).pooler_output

        # Dense layer for embeddings (shared by both sides)
        cve_embedding = self.dense(cve_output)
        mitre_embedding = self.dense(mitre_output)

        # Compute cosine similarity and apply weight
        return self.cosine_similarity(cve_embedding, mitre_embedding) * self.weight

model = SiameseBertSimilarityModel(bert_model, weight=0.84).to(device)

# BertModel.from_pretrained restores only the encoder; the dense projection
# created in the model constructor starts with random weights. Restore it from
# a 'dense_layer.pt' state dict stored next to the checkpoint when one exists,
# and say so loudly when it does not, because scores then depend on a random
# projection and differ from run to run.
dense_weights_path = os.path.join(model_dir, 'dense_layer.pt')
if os.path.exists(dense_weights_path):
    model.dense.load_state_dict(torch.load(dense_weights_path, map_location=device))
else:
    print(f"Warning: '{dense_weights_path}' not found; the dense projection layer is randomly initialised.")
model.eval()

# Function to preselect MITRE techniques using TF-IDF
def preselect_mitre(cve_texts, mitre_texts, top_n=5):
    """Shortlist MITRE candidates for every CVE by TF-IDF cosine similarity.

    The vectorizer is fitted on ``cve_texts`` only; ``mitre_texts`` are then
    transformed with that same vocabulary.

    Args:
        cve_texts: CVE description strings.
        mitre_texts: MITRE technique strings.
        top_n: Number of candidates kept per CVE.

    Returns:
        A list with one array per CVE holding the positions (into
        ``mitre_texts``) of its ``top_n`` most similar techniques, most
        similar first.
    """
    tfidf = TfidfVectorizer(stop_words='english')
    cve_matrix = tfidf.fit_transform(cve_texts)
    mitre_matrix = tfidf.transform(mitre_texts)

    def best_candidates(row):
        scores = cosine_similarity(cve_matrix[row], mitre_matrix).flatten()
        # argsort is ascending: keep the last top_n, then flip to descending.
        return scores.argsort()[-top_n:][::-1]

    return [best_candidates(row) for row in range(cve_matrix.shape[0])]

# Dataset class for batching
class CVEMITREDataset(Dataset):
    """Flat dataset of (CVE, preselected MITRE candidate) pairs.

    Every text is tokenized once at construction. Item ``idx`` is the
    ``idx``-th pair when the per-CVE candidate lists in
    ``preselected_indices`` are concatenated in CVE order.

    Args:
        cve_texts: CVE description strings.
        mitre_texts: MITRE technique strings.
        preselected_indices: One sequence per CVE with positions into
            ``mitre_texts``.
        tokenizer: Callable invoked as ``tokenizer(text, return_tensors='pt',
            max_length=..., truncation=True, padding='max_length')`` whose
            result exposes ``'input_ids'`` and ``'attention_mask'`` tensors
            with a leading batch dimension of 1.
        max_length: Sequence length passed to the tokenizer.
    """

    # The special methods must use double underscores. With the previous
    # ``_init_`` / ``_len_`` / ``_getitem_`` spellings Python never called
    # them, so the dataset could neither be constructed with arguments nor
    # be indexed or measured by a DataLoader.
    def __init__(self, cve_texts, mitre_texts, preselected_indices, tokenizer, max_length=128):
        self.cve_texts = cve_texts
        self.mitre_texts = mitre_texts
        self.preselected_indices = preselected_indices
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.tokenized_cve = [self._encode(text) for text in cve_texts]
        self.tokenized_mitre = [self._encode(text) for text in mitre_texts]
        # Flatten the candidate lists once so __len__ and __getitem__ are O(1)
        # instead of re-scanning every CVE on each lookup. int() turns numpy
        # integers into plain Python ints.
        self._pairs = [
            (cve_idx, int(mitre_idx))
            for cve_idx, indices in enumerate(preselected_indices)
            for mitre_idx in indices
        ]

    def _encode(self, text):
        """Tokenize one text, padded/truncated to ``max_length``."""
        return self.tokenizer(
            text,
            return_tensors='pt',
            max_length=self.max_length,
            truncation=True,
            padding='max_length',
        )

    def __len__(self):
        """Return the total number of (CVE, candidate) pairs."""
        return len(self._pairs)

    def __getitem__(self, idx):
        """Return tensors and source positions for pair ``idx``.

        Raises:
            IndexError: If ``idx`` is outside the range of available pairs.
        """
        cve_idx, mitre_idx = self._pairs[idx]
        inputs_cve = self.tokenized_cve[cve_idx]
        inputs_mitre = self.tokenized_mitre[mitre_idx]

        # squeeze(0) drops the batch dimension added by return_tensors='pt'.
        return {
            'input_ids_cve': inputs_cve['input_ids'].squeeze(0),
            'attention_mask_cve': inputs_cve['attention_mask'].squeeze(0),
            'input_ids_mitre': inputs_mitre['input_ids'].squeeze(0),
            'attention_mask_mitre': inputs_mitre['attention_mask'].squeeze(0),
            'cve_idx': cve_idx,
            'mitre_idx': mitre_idx
        }

# Pull the text columns out as plain lists and shortlist five MITRE
# candidates for every test CVE.
test_cve_texts = test_mapping_df['cve_text'].tolist()
mitre_texts = mitre_df['Processed_Technique'].tolist()
test_preselected_indices = preselect_mitre(test_cve_texts, mitre_texts, top_n=5)

# Batch size, minimum score for a match, and cap on matches kept per CVE.
batch_size, similarity_threshold, max_matches_per_cve = 32, 0.838, 3

# Pairs are served in fixed order so results can be mapped back by index.
test_dataset = CVEMITREDataset(
    test_cve_texts,
    mitre_texts,
    test_preselected_indices,
    tokenizer,
)
test_dataloader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

# Map test dataset: per CVE, collect the candidates that reach the threshold.
test_similarities = {i: [] for i in range(len(test_cve_texts))}

for batch in tqdm(test_dataloader, desc="Mapping Test Data"):
    input_ids_cve = batch['input_ids_cve'].to(device)
    attention_mask_cve = batch['attention_mask_cve'].to(device)
    input_ids_mitre = batch['input_ids_mitre'].to(device)
    attention_mask_mitre = batch['attention_mask_mitre'].to(device)

    # Inference only: no gradients needed.
    with torch.no_grad():
        similarity_scores = model(input_ids_cve, attention_mask_cve, input_ids_mitre, attention_mask_mitre)

    for i, score in enumerate(similarity_scores):
        cve_idx = batch['cve_idx'][i].item()
        mitre_idx = batch['mitre_idx'][i].item()
        if score.item() >= similarity_threshold:
            test_similarities[cve_idx].append((mitre_idx, score.item()))

# Populate results
test_mapping_df['Matched_Techniques'] = ''
test_mapping_df['Similarity_Scores'] = ''
test_mapping_df['Tactics'] = ''
test_mapping_df['Mitigations'] = ''

for idx, similarities in test_similarities.items():
    # Best score first, truncated to max_matches_per_cve entries.
    sorted_similarities = sorted(similarities, key=lambda x: x[1], reverse=True)[:max_matches_per_cve]
    # Fetch the matched MITRE rows once instead of once per output column.
    matched_rows = mitre_df.iloc[[mitre_idx for mitre_idx, _ in sorted_similarities]]
    matched_techniques = list(matched_rows['Processed_Technique'])
    similarity_scores = [score for _, score in sorted_similarities]
    # Empty Tactic/mitigation cells are NaN floats and would make '; '.join
    # raise TypeError; write 'Unknown' for them instead.
    matched_tactics = ['Unknown' if pd.isna(value) else str(value) for value in matched_rows['Tactic']]
    matched_mitigations = ['Unknown' if pd.isna(value) else str(value) for value in matched_rows['mitigation']]

    test_mapping_df.at[idx, 'Matched_Techniques'] = '; '.join(matched_techniques)
    test_mapping_df.at[idx, 'Similarity_Scores'] = '; '.join(map(str, similarity_scores))
    test_mapping_df.at[idx, 'Tactics'] = '; '.join(matched_tactics)
    test_mapping_df.at[idx, 'Mitigations'] = '; '.join(matched_mitigations)

# Generate predictions with binary labels
def calculate_predicted_label(scores, threshold=None):
    """Turn a ``'; '``-joined score string into a binary prediction.

    Args:
        scores: Similarity scores joined with ``'; '``. An empty string or
            any non-string value (e.g. NaN/None from a missing cell) means
            "no scores".
        threshold: Minimum score that counts as a match. Defaults to the
            module-level ``similarity_threshold``.

    Returns:
        1 if at least one score reaches the threshold, otherwise 0. A score
        that cannot be parsed as a float also yields 0.
    """
    if threshold is None:
        threshold = similarity_threshold
    # Missing cells arrive as NaN floats, which have no .split(); treat every
    # non-string the same as an empty string.
    if not isinstance(scores, str) or not scores:
        return 0
    try:
        # ">=" matches the filter that decided which scores were recorded, so
        # a score exactly on the threshold is labelled consistently.
        return 1 if any(float(score) >= threshold for score in scores.split('; ')) else 0
    except ValueError:
        return 0

test_mapping_df['Predicted_Label'] = test_mapping_df['Similarity_Scores'].apply(calculate_predicted_label)

# Compare with expert labels
# NOTE(review): the two tables are compared purely by row position — confirm
# both CSVs list the CVEs in the same order.
expert_labels = expert_label_df['Logical Match'].tolist()
predicted_labels = test_mapping_df['Predicted_Label'].tolist()

# Ensure label alignment. Raised explicitly because assert statements are
# removed when Python runs with -O.
if len(expert_labels) != len(predicted_labels):
    raise ValueError("Mismatch between expert and predicted labels.")

# Calculate evaluation metrics
print("Classification Report:")
print(classification_report(expert_labels, predicted_labels))

precision = precision_score(expert_labels, predicted_labels, average='weighted')
recall = recall_score(expert_labels, predicted_labels, average='weighted')
f1 = f1_score(expert_labels, predicted_labels, average='weighted')
accuracy = accuracy_score(expert_labels, predicted_labels)

print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1-Score: {f1:.2f}")
print(f"Accuracy: {accuracy:.2f}")

# Save results with tactics and mitigations. The message is built from the
# same path that is written, so it can no longer name a different file.
output_path = "final_test_mapping_results_bert_try3.csv"
test_mapping_df.to_csv(output_path, index=False)
print(f"Mapped test dataset with tactics and mitigations saved to '{output_path}'.")

# Expert (with Cluster)

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from transformers import BertTokenizer, BertModel
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import torch.nn as nn
from sklearn.metrics import classification_report, precision_score, recall_score, f1_score, accuracy_score

# Load files
test_mapping_path = 'test_Expirt_cluster_with_keywords.csv'
expert_label_path = 'CVE_Dataset_with_Binary_Logical_Match_Column.csv'
mitre_path = 'Processed_mitre_recommidtaion.csv'

test_mapping_df = pd.read_csv(test_mapping_path)
expert_label_df = pd.read_csv(expert_label_path)
mitre_df = pd.read_csv(mitre_path)

# Empty CSV cells are read as NaN (a float), which is not valid text input for
# the TF-IDF vectorizer or the tokenizer. Normalise the two text columns the
# same way the training cell does: NaN -> "" and every value cast to str.
test_mapping_df['cve_text'] = test_mapping_df['cve_text'].fillna("").astype(str)
mitre_df['Processed_Technique'] = mitre_df['Processed_Technique'].fillna("").astype(str)

# Load pretrained model
model_dir = '/content/Untitled Folder'
tokenizer = BertTokenizer.from_pretrained(model_dir)
bert_model = BertModel.from_pretrained(model_dir)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Revised Siamese BERT Model with weighted cosine similarity
class SiameseBertSimilarityModel(nn.Module):
    """Siamese scorer whose cosine similarity is scaled by a constant weight.

    Args:
        bert_model: Encoder called as ``bert_model(input_ids=...,
            attention_mask=...)`` whose result has a ``pooler_output``
            attribute. The projection is ``nn.Linear(768, 256)``, so the
            pooled vectors are expected to have 768 features.
        weight: Factor the cosine similarity is multiplied by; with the
            default the returned scores lie in [-0.84, 0.84].
    """

    # The constructor must be spelled ``__init__`` (double underscores). With
    # the previous ``_init_`` spelling Python never called it, so construction
    # with ``bert_model`` failed and no attribute was ever created.
    def __init__(self, bert_model, weight=0.84):
        super().__init__()
        self.bert = bert_model
        self.dense = nn.Linear(768, 256)
        self.cosine_similarity = nn.CosineSimilarity(dim=1)
        self.weight = weight

    def forward(self, input_ids_cve, attention_mask_cve, input_ids_mitre, attention_mask_mitre):
        """Return one weighted cosine-similarity score per pair in the batch."""
        cve_output = self.bert(input_ids=input_ids_cve, attention_mask=attention_mask_cve).pooler_output
        mitre_output = self.bert(input_ids=input_ids_mitre, attention_mask=attention_mask_mitre).pooler_output

        # Dense layer for embeddings (shared by both sides)
        cve_embedding = self.dense(cve_output)
        mitre_embedding = self.dense(mitre_output)

        # Compute cosine similarity and apply weight
        return self.cosine_similarity(cve_embedding, mitre_embedding) * self.weight

model = SiameseBertSimilarityModel(bert_model, weight=0.84).to(device)

# BertModel.from_pretrained restores only the encoder; the dense projection
# created in the model constructor starts with random weights. Restore it from
# a 'dense_layer.pt' state dict stored next to the checkpoint when one exists,
# and say so loudly when it does not, because scores then depend on a random
# projection and differ from run to run.
dense_weights_path = os.path.join(model_dir, 'dense_layer.pt')
if os.path.exists(dense_weights_path):
    model.dense.load_state_dict(torch.load(dense_weights_path, map_location=device))
else:
    print(f"Warning: '{dense_weights_path}' not found; the dense projection layer is randomly initialised.")
model.eval()

# Function to preselect MITRE techniques using TF-IDF
def preselect_mitre(cve_texts, mitre_texts, top_n=5):
    """Shortlist MITRE candidates for every CVE by TF-IDF cosine similarity.

    The vectorizer is fitted on ``cve_texts`` only; ``mitre_texts`` are then
    transformed with that same vocabulary.

    Args:
        cve_texts: CVE description strings.
        mitre_texts: MITRE technique strings.
        top_n: Number of candidates kept per CVE.

    Returns:
        A list with one array per CVE holding the positions (into
        ``mitre_texts``) of its ``top_n`` most similar techniques, most
        similar first.
    """
    tfidf = TfidfVectorizer(stop_words='english')
    cve_matrix = tfidf.fit_transform(cve_texts)
    mitre_matrix = tfidf.transform(mitre_texts)

    def best_candidates(row):
        scores = cosine_similarity(cve_matrix[row], mitre_matrix).flatten()
        # argsort is ascending: keep the last top_n, then flip to descending.
        return scores.argsort()[-top_n:][::-1]

    return [best_candidates(row) for row in range(cve_matrix.shape[0])]

# Dataset class for batching
class CVEMITREDataset(Dataset):
    """Flat dataset of (CVE, preselected MITRE candidate) pairs.

    Every text is tokenized once at construction. Item ``idx`` is the
    ``idx``-th pair when the per-CVE candidate lists in
    ``preselected_indices`` are concatenated in CVE order.

    Args:
        cve_texts: CVE description strings.
        mitre_texts: MITRE technique strings.
        preselected_indices: One sequence per CVE with positions into
            ``mitre_texts``.
        tokenizer: Callable invoked as ``tokenizer(text, return_tensors='pt',
            max_length=..., truncation=True, padding='max_length')`` whose
            result exposes ``'input_ids'`` and ``'attention_mask'`` tensors
            with a leading batch dimension of 1.
        max_length: Sequence length passed to the tokenizer.
    """

    # The special methods must use double underscores. With the previous
    # ``_init_`` / ``_len_`` / ``_getitem_`` spellings Python never called
    # them, so the dataset could neither be constructed with arguments nor
    # be indexed or measured by a DataLoader.
    def __init__(self, cve_texts, mitre_texts, preselected_indices, tokenizer, max_length=128):
        self.cve_texts = cve_texts
        self.mitre_texts = mitre_texts
        self.preselected_indices = preselected_indices
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.tokenized_cve = [self._encode(text) for text in cve_texts]
        self.tokenized_mitre = [self._encode(text) for text in mitre_texts]
        # Flatten the candidate lists once so __len__ and __getitem__ are O(1)
        # instead of re-scanning every CVE on each lookup. int() turns numpy
        # integers into plain Python ints.
        self._pairs = [
            (cve_idx, int(mitre_idx))
            for cve_idx, indices in enumerate(preselected_indices)
            for mitre_idx in indices
        ]

    def _encode(self, text):
        """Tokenize one text, padded/truncated to ``max_length``."""
        return self.tokenizer(
            text,
            return_tensors='pt',
            max_length=self.max_length,
            truncation=True,
            padding='max_length',
        )

    def __len__(self):
        """Return the total number of (CVE, candidate) pairs."""
        return len(self._pairs)

    def __getitem__(self, idx):
        """Return tensors and source positions for pair ``idx``.

        Raises:
            IndexError: If ``idx`` is outside the range of available pairs.
        """
        cve_idx, mitre_idx = self._pairs[idx]
        inputs_cve = self.tokenized_cve[cve_idx]
        inputs_mitre = self.tokenized_mitre[mitre_idx]

        # squeeze(0) drops the batch dimension added by return_tensors='pt'.
        return {
            'input_ids_cve': inputs_cve['input_ids'].squeeze(0),
            'attention_mask_cve': inputs_cve['attention_mask'].squeeze(0),
            'input_ids_mitre': inputs_mitre['input_ids'].squeeze(0),
            'attention_mask_mitre': inputs_mitre['attention_mask'].squeeze(0),
            'cve_idx': cve_idx,
            'mitre_idx': mitre_idx
        }

# Pull the text columns out as plain lists and shortlist five MITRE
# candidates for every test CVE.
test_cve_texts = test_mapping_df['cve_text'].tolist()
mitre_texts = mitre_df['Processed_Technique'].tolist()
test_preselected_indices = preselect_mitre(test_cve_texts, mitre_texts, top_n=5)

# Batch size, minimum score for a match, and cap on matches kept per CVE.
batch_size, similarity_threshold, max_matches_per_cve = 32, 0.838, 3

# Pairs are served in fixed order so results can be mapped back by index.
test_dataset = CVEMITREDataset(
    test_cve_texts,
    mitre_texts,
    test_preselected_indices,
    tokenizer,
)
test_dataloader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

# Map test dataset: per CVE, collect the candidates that reach the threshold.
test_similarities = {i: [] for i in range(len(test_cve_texts))}

for batch in tqdm(test_dataloader, desc="Mapping Test Data"):
    input_ids_cve = batch['input_ids_cve'].to(device)
    attention_mask_cve = batch['attention_mask_cve'].to(device)
    input_ids_mitre = batch['input_ids_mitre'].to(device)
    attention_mask_mitre = batch['attention_mask_mitre'].to(device)

    # Inference only: no gradients needed.
    with torch.no_grad():
        similarity_scores = model(input_ids_cve, attention_mask_cve, input_ids_mitre, attention_mask_mitre)

    for i, score in enumerate(similarity_scores):
        cve_idx = batch['cve_idx'][i].item()
        mitre_idx = batch['mitre_idx'][i].item()
        if score.item() >= similarity_threshold:
            test_similarities[cve_idx].append((mitre_idx, score.item()))

# Populate results
test_mapping_df['Matched_Techniques'] = ''
test_mapping_df['Similarity_Scores'] = ''
test_mapping_df['Tactics'] = ''
test_mapping_df['Mitigations'] = ''

for idx, similarities in test_similarities.items():
    # Best score first, truncated to max_matches_per_cve entries.
    sorted_similarities = sorted(similarities, key=lambda x: x[1], reverse=True)[:max_matches_per_cve]
    # Fetch the matched MITRE rows once instead of once per output column.
    matched_rows = mitre_df.iloc[[mitre_idx for mitre_idx, _ in sorted_similarities]]
    matched_techniques = list(matched_rows['Processed_Technique'])
    similarity_scores = [score for _, score in sorted_similarities]
    # Empty Tactic/mitigation cells are NaN floats and would make '; '.join
    # raise TypeError; write 'Unknown' for them instead.
    matched_tactics = ['Unknown' if pd.isna(value) else str(value) for value in matched_rows['Tactic']]
    matched_mitigations = ['Unknown' if pd.isna(value) else str(value) for value in matched_rows['mitigation']]

    test_mapping_df.at[idx, 'Matched_Techniques'] = '; '.join(matched_techniques)
    test_mapping_df.at[idx, 'Similarity_Scores'] = '; '.join(map(str, similarity_scores))
    test_mapping_df.at[idx, 'Tactics'] = '; '.join(matched_tactics)
    test_mapping_df.at[idx, 'Mitigations'] = '; '.join(matched_mitigations)

# Generate predictions with binary labels
def calculate_predicted_label(scores, threshold=None):
    """Turn a ``'; '``-joined score string into a binary prediction.

    Args:
        scores: Similarity scores joined with ``'; '``. An empty string or
            any non-string value (e.g. NaN/None from a missing cell) means
            "no scores".
        threshold: Minimum score that counts as a match. Defaults to the
            module-level ``similarity_threshold``.

    Returns:
        1 if at least one score reaches the threshold, otherwise 0. A score
        that cannot be parsed as a float also yields 0.
    """
    if threshold is None:
        threshold = similarity_threshold
    # Missing cells arrive as NaN floats, which have no .split(); treat every
    # non-string the same as an empty string.
    if not isinstance(scores, str) or not scores:
        return 0
    try:
        # ">=" matches the filter that decided which scores were recorded, so
        # a score exactly on the threshold is labelled consistently.
        return 1 if any(float(score) >= threshold for score in scores.split('; ')) else 0
    except ValueError:
        return 0

test_mapping_df['Predicted_Label'] = test_mapping_df['Similarity_Scores'].apply(calculate_predicted_label)

# Compare with expert labels
# NOTE(review): the two tables are compared purely by row position — confirm
# both CSVs list the CVEs in the same order.
expert_labels = expert_label_df['Logical Match'].tolist()
predicted_labels = test_mapping_df['Predicted_Label'].tolist()

# Ensure label alignment. Raised explicitly because assert statements are
# removed when Python runs with -O.
if len(expert_labels) != len(predicted_labels):
    raise ValueError("Mismatch between expert and predicted labels.")

# Calculate evaluation metrics
print("Classification Report:")
print(classification_report(expert_labels, predicted_labels))

precision = precision_score(expert_labels, predicted_labels, average='weighted')
recall = recall_score(expert_labels, predicted_labels, average='weighted')
f1 = f1_score(expert_labels, predicted_labels, average='weighted')
accuracy = accuracy_score(expert_labels, predicted_labels)

print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1-Score: {f1:.2f}")
print(f"Accuracy: {accuracy:.2f}")

# Save results with tactics and mitigations. The message is built from the
# same path that is written, so it can no longer name a different file.
output_path = "final_test_mapping_results_bert_try3.csv"
test_mapping_df.to_csv(output_path, index=False)
print(f"Mapped test dataset with tactics and mitigations saved to '{output_path}'.")