**Title**: CVSS prediction\
**Description**: Load all pre-trained models to predict CVSS score\
**Developer**: Teck Lim\
**Create date**: 04/06/2021

# Import packages

In [None]:
import os
import pandas as pd
import json
import matplotlib.pyplot as plt
import numpy as np
import math
from google.colab import drive

!pip install transformers

In [None]:
# Mount Google Drive inside the Colab VM so the shared-drive data and model
# folders can be read. The paths used later are relative ('./gdrive/...'), so
# they only resolve when the working directory is /content.
# NOTE(review): confirm the notebook is always run from /content.
drive.mount('/content/gdrive')

# CVSS Calculator

In [None]:
def round_up(input):
    """Round a score up to the next tenth without floating-point drift.

    The value is scaled to an integer count of 1e-5 units first, so that a
    number that is already a whole tenth (within 1e-5) is returned unchanged
    instead of being bumped to the next tenth by binary rounding noise.
    This appears to follow the Roundup helper from the CVSS v3.1
    specification.

    Args:
        input: Numeric score to round.

    Returns:
        The score as a float rounded up to one decimal place.
    """
    scaled = round(input * 100000)
    tenths, leftover = divmod(scaled, 10000)
    if leftover == 0:
        # Already an exact multiple of 0.1 at 1e-5 resolution.
        return scaled / 100000.0
    return (tenths + 1) / 10.0

def get_av_score(metric):
    """Return the numeric weight of an Attack Vector value.

    Args:
        metric: 'network', 'adjacent_network', 'local' or 'physical'.
            The comparison is exact; no case normalisation happens here.

    Returns:
        The weight as a float.

    Raises:
        ValueError: If metric is not one of the recognised values.
    """
    weights = {
        'network': 0.85,
        'adjacent_network': 0.62,
        'local': 0.55,
        'physical': 0.20,
    }
    try:
        return weights[metric]
    except (KeyError, TypeError):
        # TypeError covers unhashable inputs, which are equally invalid.
        raise ValueError('Invalid metric value') from None

def get_ac_score(metric):
    """Return the numeric weight of an Attack Complexity value.

    Args:
        metric: 'low' or 'high' (exact, lower-case match).

    Returns:
        0.77 for 'low', 0.44 for 'high'.

    Raises:
        ValueError: If metric is neither 'low' nor 'high'.
    """
    if metric not in ('low', 'high'):
        raise ValueError('Invalid metric value')
    return 0.77 if metric == 'low' else 0.44

def get_pr_score(metric, s):
    """Return the numeric weight of a Privileges Required value.

    The weight for 'low' and 'high' depends on the scope: a scope of
    'changed' selects the larger weight, any other scope value selects the
    smaller one. The scope itself is not validated here.

    Args:
        metric: 'none', 'low' or 'high' (exact, lower-case match).
        s: Scope value; only compared against 'changed'.

    Returns:
        The weight as a float.

    Raises:
        ValueError: If metric is not one of the recognised values.
    """
    # metric -> (weight when scope is not 'changed', weight when 'changed')
    weights = {
        'none': (0.85, 0.85),
        'low': (0.62, 0.68),
        'high': (0.27, 0.50),
    }
    try:
        default_weight, changed_weight = weights[metric]
    except (KeyError, TypeError):
        raise ValueError('Invalid metric value') from None
    return changed_weight if s == 'changed' else default_weight

def get_ui_score(metric):
    """Return the numeric weight of a User Interaction value.

    Args:
        metric: 'none' or 'required' (exact, lower-case match).

    Returns:
        0.85 for 'none', 0.62 for 'required'.

    Raises:
        ValueError: If metric is neither of the recognised values.
    """
    for name, weight in (('none', 0.85), ('required', 0.62)):
        if metric == name:
            return weight
    raise ValueError('Invalid metric value')

def get_c_score(metric):
    """Return the numeric weight of a Confidentiality impact value.

    Args:
        metric: 'high', 'low' or 'none' (exact, lower-case match).

    Returns:
        0.56, 0.22 or 0 respectively.

    Raises:
        ValueError: If metric is not one of the recognised values.
    """
    weights = {'high': 0.56, 'low': 0.22, 'none': 0}
    try:
        return weights[metric]
    except (KeyError, TypeError):
        raise ValueError('Invalid metric value') from None

def get_i_score(metric):
    """Return the numeric weight of an Integrity impact value.

    Args:
        metric: 'high', 'low' or 'none' (exact, lower-case match).

    Returns:
        0.56, 0.22 or 0 respectively.

    Raises:
        ValueError: If metric is not one of the recognised values.
    """
    for level, weight in (('high', 0.56), ('low', 0.22), ('none', 0)):
        if metric == level:
            return weight
    raise ValueError('Invalid metric value')

def get_a_score(metric):
    """Return the numeric weight of an Availability impact value.

    Args:
        metric: 'high', 'low' or 'none' (exact, lower-case match).

    Returns:
        0.56, 0.22 or 0 respectively.

    Raises:
        ValueError: If metric is not one of the recognised values.
    """
    levels = ('high', 'low', 'none')
    weights = (0.56, 0.22, 0)
    if metric not in levels:
        raise ValueError('Invalid metric value')
    return weights[levels.index(metric)]

def calculcate_iss(c, i, a):
    """Combine the three impact metrics into the impact sub-score (ISS).

    Computes 1 - (1 - C) * (1 - I) * (1 - A), where C, I and A are the
    weights returned by get_c_score, get_i_score and get_a_score.

    Args:
        c: Confidentiality value ('high', 'low' or 'none').
        i: Integrity value ('high', 'low' or 'none').
        a: Availability value ('high', 'low' or 'none').

    Returns:
        The impact sub-score.

    Raises:
        ValueError: Propagated from the weight lookups on an invalid value.
    """
    unaffected = 1 - get_c_score(c)
    unaffected *= 1 - get_i_score(i)
    unaffected *= 1 - get_a_score(a)
    return 1 - unaffected

def calculate_impact(s, c, i, a):
    """Compute the (unrounded) impact score for the given scope and C/I/A.

    The impact sub-score is computed first, so an invalid c, i or a is
    reported before an invalid scope.

    NOTE(review): the changed-scope expression (exponent 15 on iss - 0.02)
    looks like the CVSS v3.0 equation, while v3.1 uses a different term;
    confirm which version the dataset targets.

    Args:
        s: Scope, 'unchanged' or 'changed' (exact, lower-case match).
        c: Confidentiality value.
        i: Integrity value.
        a: Availability value.

    Returns:
        The impact score as a float; it can be negative for a changed scope
        when the sub-score is very small.

    Raises:
        ValueError: If s, c, i or a is not a recognised value.
    """
    iss = calculcate_iss(c, i, a)
    if s not in ('unchanged', 'changed'):
        raise ValueError('Invalid metric value')
    if s == 'unchanged':
        return 6.42 * iss
    linear_term = 7.52 * (iss - 0.029)
    power_term = 3.25 * (iss - 0.02)**15
    return linear_term - power_term

def calculate_exploitability(av, ac, pr, ui, s):
    """Compute the (unrounded) exploitability score.

    The result is 8.22 multiplied by the Attack Vector, Attack Complexity,
    Privileges Required and User Interaction weights, in that order.

    Args:
        av: Attack Vector value.
        ac: Attack Complexity value.
        pr: Privileges Required value.
        ui: User Interaction value.
        s: Scope value; only used to pick the Privileges Required weight.

    Returns:
        The exploitability score as a float.

    Raises:
        ValueError: Propagated from the weight lookups on an invalid value.
    """
    factors = (
        get_av_score(av),
        get_ac_score(ac),
        get_pr_score(pr, s),
        get_ui_score(ui),
    )
    score = 8.22
    for factor in factors:
        score *= factor
    return score

def calculate_scores(av, ac, pr, ui, s, c, i, a):
    """Compute the CVSS base, impact and exploitability scores.

    All metric values are lower-cased before use, so callers may pass them
    in any letter case (e.g. 'Network' or 'NETWORK').

    Args:
        av: Attack Vector ('network', 'adjacent_network', 'local', 'physical').
        ac: Attack Complexity ('low', 'high').
        pr: Privileges Required ('none', 'low', 'high').
        ui: User Interaction ('none', 'required').
        s: Scope ('unchanged', 'changed').
        c: Confidentiality impact ('high', 'low', 'none').
        i: Integrity impact ('high', 'low', 'none').
        a: Availability impact ('high', 'low', 'none').

    Returns:
        A tuple (base, impact, exploitability): the base score rounded up to
        one decimal with round_up, and the impact and exploitability scores
        rounded to one decimal with round().

    Raises:
        ValueError: If any metric value is not recognised.
        AttributeError: If a metric is not a string (no .lower()).
    """
    av, ac, pr, ui, s, c, i, a = (
        metric.lower() for metric in (av, ac, pr, ui, s, c, i, a)
    )

    impact = calculate_impact(s, c, i, a)
    exploitability = calculate_exploitability(av, ac, pr, ui, s)

    # The zero-impact case must short-circuit the scope branches: a
    # vulnerability with no impact has a base score of 0 regardless of how
    # exploitable it is. (Previously this was a separate `if`, so the scope
    # branch below always overwrote the 0.)
    if impact <= 0:
        base = 0
    elif s == 'unchanged':
        base = min(impact + exploitability, 10)
    elif s == 'changed':
        base = min(1.08 * (impact + exploitability), 10)
    else:
        # calculate_impact already rejects other scopes; kept so `base` can
        # never be left unbound.
        raise ValueError('Invalid metric value')
    return round_up(base), round(impact, 1), round(exploitability, 1)

In [None]:
# Sample to validate the calculator.
# Working the calculator's formulas by hand for these metric values gives
# (base, impact, exploitability) = (4.6, 3.4, 1.2); the cell output should match.
calculate_scores('Network', 'High', 'Low', 'Required', 'Unchanged', 'Low', 'Low', 'Low')

## Validation

In [None]:
# Load the training split; the validation loop in the next cell reads the
# metric columns and the three score columns from this frame.
file_path = './gdrive/Shareddrives/ucsd_drive/Data/cve_train.csv'
df_train = pd.read_csv(file_path)

In [None]:
# Cross-check the calculator against the scores stored in the training data.
# For every row the three scores are recomputed from the eight metric columns;
# rows that cannot be scored, or whose recomputed scores differ from the
# stored ones, are printed. No output means the calculator reproduces the data.
for idx, row in df_train.iterrows():
    base_score = row['base_score']
    exploitability_score = row['exploitability_score']
    impact_score = row['impact_score']

    try:
        # calculate_scores lower-cases its inputs itself. Reading the metrics
        # inside the try block means a missing (non-string) value is reported
        # like any other bad row instead of aborting the whole loop.
        cal_base_score, cal_impact_score, cal_exploitability_score = calculate_scores(
            row['attack_vector'],
            row['attack_complexity'],
            row['privileges_required'],
            row['user_interaction'],
            row['scope'],
            row['confidentiality'],
            row['integrity'],
            row['availability'],
        )
    except Exception as e:
        print('Index: {}, {}'.format(idx, row['cve_id']))
        # Show why the row failed so it is distinguishable from a mismatch.
        print('Error: {}'.format(e))
        continue

    if base_score != cal_base_score or exploitability_score != cal_exploitability_score or impact_score != cal_impact_score:
        print('Index: {}, {}'.format(idx, row['cve_id']))
        print('Base score: {}, {}'.format(base_score, cal_base_score))
        print('Exploitability score: {}, {}'.format(exploitability_score, cal_exploitability_score))
        print('Impact score: {}, {}'.format(impact_score, cal_impact_score))

# Load the pre-trained models

In [None]:
# Directories holding one saved classifier (model + tokenizer) per CVSS metric.
# Each is passed to from_pretrained() in the model-loading cell below.
av_output_dir = './gdrive/Shareddrives/ucsd_drive/Model/AV'  # attack vector
ac_output_dir = './gdrive/Shareddrives/ucsd_drive/Model/AC'  # attack complexity
ui_output_dir = './gdrive/Shareddrives/ucsd_drive/Model/UI'  # user interaction
pr_output_dir = './gdrive/Shareddrives/ucsd_drive/Model/PR'  # privileges required
s_output_dir = './gdrive/Shareddrives/ucsd_drive/Model/SC'  # scope
c_output_dir = './gdrive/Shareddrives/ucsd_drive/Model/CI'  # confidentiality impact
i_output_dir = './gdrive/Shareddrives/ucsd_drive/Model/II'  # integrity impact
a_output_dir = './gdrive/Shareddrives/ucsd_drive/Model/AI'  # availability impact

In [None]:
import torch

# Pick the torch device that every model and input tensor is moved to:
# the CPU when CUDA is unavailable, otherwise the GPU.
if not torch.cuda.is_available():
    print('No GPU available, using the CPU instead.')
    device = torch.device('cpu')
else:
    device = torch.device('cuda')
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))

In [None]:
from transformers import BertForSequenceClassification, BertTokenizer

def _load_classifier(model_dir):
    """Load the saved classifier and tokenizer from model_dir.

    The model is loaded with output_hidden_states=True (text_to_embedding
    reads the hidden states) and moved to the globally selected device.

    Returns:
        A (model, tokenizer) pair.
    """
    model = BertForSequenceClassification.from_pretrained(model_dir, output_hidden_states=True)
    tokenizer = BertTokenizer.from_pretrained(model_dir)
    model.to(device)
    return model, tokenizer


# One classifier per CVSS base metric, loaded in the same order as before.
av_model, av_tokenizer = _load_classifier(av_output_dir)
ac_model, ac_tokenizer = _load_classifier(ac_output_dir)
pr_model, pr_tokenizer = _load_classifier(pr_output_dir)
ui_model, ui_tokenizer = _load_classifier(ui_output_dir)
s_model, s_tokenizer = _load_classifier(s_output_dir)
c_model, c_tokenizer = _load_classifier(c_output_dir)
i_model, i_tokenizer = _load_classifier(i_output_dir)
a_model, a_tokenizer = _load_classifier(a_output_dir)

print('All models loaded')

In [None]:
import torch
def text_to_embedding(tokenizer, model, max_len, in_text):
    """Run one text through a classifier; return its logits and an embedding.

    The text is tokenized with special tokens added, then padded/truncated to
    max_len. The model is put in eval mode and run without gradient tracking
    on the module-level `device`.

    Args:
        tokenizer: Tokenizer matching `model`.
        model: Classifier whose output exposes `.logits` and `.hidden_states`
            (i.e. it must have been loaded with output_hidden_states=True).
        max_len: Sequence length to pad/truncate to.
        in_text: The text to encode.

    Returns:
        A (logits, vec) pair of NumPy arrays: the classification logits for
        the single-item batch, and the hidden-state vector at layer index 12,
        batch item 0, token position 0 (the leading special token).
        NOTE(review): index 12 presumably means the final encoder layer of a
        12-layer BERT; confirm if a different model size is ever used.
    """
    encoding = tokenizer.encode_plus(
        in_text,
        add_special_tokens=True,      # adds '[CLS]' and '[SEP]'
        max_length=max_len,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    token_ids = encoding['input_ids']
    mask = encoding['attention_mask']

    model.eval()

    token_ids = token_ids.to(device)
    mask = mask.to(device)

    with torch.no_grad():
        output = model(input_ids=token_ids, token_type_ids=None, attention_mask=mask)

    class_logits = output.logits.detach().cpu().numpy()
    # hidden_states[layer][batch item][token position]
    embedding = output.hidden_states[12][0][0].detach().cpu().numpy()

    return class_logits, embedding

# Predictions

## Predict function

In [None]:
import textwrap

def print_custom(text, enabled=True):
    """Print text unless output has been switched off.

    Args:
        text: Value to print.
        enabled: When falsy, nothing is printed.
    """
    if not enabled:
        return
    print(text)

def predict(input_text, enabled=True):
    """Predict the eight CVSS base metrics for a description and score them.

    Each metric has its own classifier. The class with the largest logit
    selects the label; any index past the listed labels falls back to the
    last label. The predicted labels are then fed to calculate_scores.

    Args:
        input_text: Vulnerability description to classify.
        enabled: When falsy, the progress report is not printed.

    Returns:
        A pair of tuples: (base, impact, exploitability) scores and the
        predicted labels (av, ac, pr, ui, scope, c, i, a) in lower case.
    """
    wrapper = textwrap.TextWrapper(initial_indent='  ', subsequent_indent='  ', width=120)
    print_custom('Description: \n\n{}'.format(wrapper.fill(input_text)), enabled)

    print_custom('\nPredictions:\n', enabled)

    # One entry per metric, in calculate_scores argument order:
    # (tokenizer, model, labels by class index, report line template).
    impact_levels = ('none', 'low', 'high')
    classifiers = (
        (av_tokenizer, av_model, ('network', 'adjacent_network', 'local', 'physical'), '  AV: {}\t\t{}'),
        (ac_tokenizer, ac_model, ('low', 'high'), '  AC: {}\t\t{}'),
        (pr_tokenizer, pr_model, ('none', 'low', 'high'), '  PR: {}\t\t{}'),
        (ui_tokenizer, ui_model, ('none', 'required'), '  UI: {}\t\t{}'),
        (s_tokenizer, s_model, ('unchanged', 'changed'), '  S : {}\t\t{}'),
        (c_tokenizer, c_model, impact_levels, '  C : {}\t\t{}'),
        (i_tokenizer, i_model, impact_levels, '  I : {}\t\t{}'),
        (a_tokenizer, a_model, impact_levels, '  A : {}\t\t{}'),
    )

    predicted = []
    for tokenizer, model, labels, template in classifiers:
        logits, _ = text_to_embedding(tokenizer, model, 512, input_text)
        best_class = int(np.argmax(logits, axis=1)[0])
        # Indices beyond the known labels map to the last label.
        label = labels[min(best_class, len(labels) - 1)]
        print_custom(template.format(label.capitalize(), logits[0]), enabled)
        predicted.append(label)

    pred_b, pred_i, pred_e = calculate_scores(*predicted)
    print_custom('', enabled)
    print_custom('  Base score: {}'.format(pred_b), enabled)
    print_custom('  Impact score: {}'.format(pred_i), enabled)
    print_custom('  Exploitability score: {}'.format(pred_e), enabled)
    return (pred_b, pred_i, pred_e), tuple(predicted)

## Single input

In [None]:
# Predict one randomly drawn test CVE and print the ground-truth metrics and
# scores underneath the prediction report for a side-by-side comparison.
file_path = './gdrive/Shareddrives/ucsd_drive/Data/cve_test.csv'
df_test = pd.read_csv(file_path)
sample = df_test.sample(1)
truth = sample.iloc[0]
sample_text = truth['description']

# predict() prints its own report; the returned tuples are not needed here.
(*_,) = predict(sample_text)

print('')
print('Truths:')
print('')
for template, column in (
    ('  AV: {}', 'attack_vector'),
    ('  AC: {}', 'attack_complexity'),
    ('  PR: {}', 'privileges_required'),
    ('  UI: {}', 'user_interaction'),
    ('  S : {}', 'scope'),
    ('  C : {}', 'confidentiality'),
    ('  I : {}', 'integrity'),
    ('  A : {}', 'availability'),
):
    print(template.format(truth[column].capitalize()))
print('')
for template, column in (
    ('  Base score: {}', 'base_score'),
    ('  Impact score: {}', 'impact_score'),
    ('  Exploitability score: {}', 'exploitability_score'),
):
    print(template.format(truth[column]))


In [None]:
# Hand-written descriptions for ad-hoc experiments; swap the argument of
# predict() below to try a different one.
input_text_1 = 'Sudo before 1.6.6 contains an off-by-one error that can result in a heap-based buffer overflow that may allow ' \
      'local users to gain root privileges via special characters in the -p (prompt) argument, which are not properly expanded.'
input_text_2 = 'Ubiquiti Networks EdgeSwitch version 1.7.3 and prior suffer from an improperly neutralized element in an OS command ' \
      'due to lack of protection on the admin CLI, leading to code execution and privilege escalation greater than administrators themselves ' \
      'are allowed. An attacker with access to an admin account could escape the restricted CLI and execute arbitrary shell instructions.'
input_text_3 = 'A "javascript:" url loaded by a malicious page can obfuscate its location by blanking the URL displayed in the addressbar, ' \
      'allowing for an attacker to spoof an existing page without the malicious page\'s address being displayed correctly. This vulnerability affects Firefox < 52.'
input_text_4 = 'Stack over flow that caused by user inserting long URL in chrome browser address'

# The integrity label is bound to pred_ii: it used to share the name pred_i
# with the impact score, so the score was silently overwritten by the label.
(pred_b, pred_i, pred_e), (pred_av, pred_ac, pred_pr, pred_ui, pred_s, pred_c, pred_ii, pred_a) = predict(input_text_4)

## Batch input data 

In [None]:
# Load the full test split for batch prediction.
file_path = './gdrive/Shareddrives/ucsd_drive/Data/cve_test.csv'
df_test = pd.read_csv(file_path)
# Uncomment to run on a 1000-row subset for a quicker trial.
# df_test = df_test.head(1000)

In [None]:
# Per-row predictions for the whole test set, one list per score / metric.
pred_b_labels, pred_i_labels, pred_e_labels = [], [], []

pred_av_labels, pred_ac_labels, pred_pr_labels, pred_ui_labels = [], [], [], []
pred_sc_labels, pred_ci_labels, pred_ii_labels, pred_ai_labels = [], [], [], []

# Same order as the two tuples returned by predict().
score_lists = (pred_b_labels, pred_i_labels, pred_e_labels)
metric_lists = (
    pred_av_labels, pred_ac_labels, pred_pr_labels, pred_ui_labels,
    pred_sc_labels, pred_ci_labels, pred_ii_labels, pred_ai_labels,
)

for idx, row in df_test.iterrows():
    scores, metrics = predict(row['description'], False)
    for target, value in zip(score_lists, scores):
        target.append(value)
    for target, value in zip(metric_lists, metrics):
        target.append(value)

    # Progress heartbeat every 1000 rows.
    done = idx + 1
    if done % 1000 == 0:
        print('Processing index: {}'.format(done))

In [None]:
from sklearn.metrics import accuracy_score, r2_score, mean_squared_error, mean_absolute_error

# Evaluate the batch predictions against the ground truth in df_test.
# scikit-learn metrics take (y_true, y_pred) in that order. It makes no
# difference for accuracy, MSE or MAE, but r2_score is not symmetric, so the
# ground truth must come first.
print('Metrics accuracy:')
# accuracy_score returns a fraction in [0, 1]; the '%' format type scales it
# by 100 so the printed number really is a percentage.
for template, truth_column, predicted_labels in (
    ('  AV (4-cat): {:.2%}', 'attack_vector', pred_av_labels),
    ('  AC (2-cat): {:.2%}', 'attack_complexity', pred_ac_labels),
    ('  PR (3-cat): {:.2%}', 'privileges_required', pred_pr_labels),
    ('  UI (2-cat): {:.2%}', 'user_interaction', pred_ui_labels),
    ('  S  (2-cat): {:.2%}', 'scope', pred_sc_labels),
    ('  C  (3-cat): {:.2%}', 'confidentiality', pred_ci_labels),
    ('  I  (3-cat): {:.2%}', 'integrity', pred_ii_labels),
    ('  A  (3-cat): {:.2%}', 'availability', pred_ai_labels),
):
    true_labels = df_test[truth_column].apply(lambda x: x.lower())
    print(template.format(accuracy_score(true_labels, predicted_labels)))

for title, truth_column, predicted_scores in (
    ('Base score:', 'base_score', pred_b_labels),
    ('Impact score:', 'impact_score', pred_i_labels),
    ('Exploitability score:', 'exploitability_score', pred_e_labels),
):
    true_scores = df_test[truth_column]
    print(title)
    print('  MSE: {:.4f}'.format(mean_squared_error(true_scores, predicted_scores)))
    print('  MAE: {:.4f}'.format(mean_absolute_error(true_scores, predicted_scores)))
    print('  R2 : {:.4f}'.format(r2_score(true_scores, predicted_scores)))

In [None]:
# Reload the test split (file_path still points at the test CSV set in an
# earlier cell) so the column inserts in the next cell start from the on-disk
# column layout. Presumably this also makes that cell safe to re-run, since
# inserting an already-present column name would fail - verify.
df_test = pd.read_csv(file_path)
# Must mirror the subset used for batch prediction, otherwise the prediction
# lists and the frame will have different lengths.
# df_test = df_test.head(1000)

In [None]:
# Place each prediction column right after the ground-truth column it belongs
# to. The positions are applied in order, so each one already accounts for the
# columns inserted before it.
# NOTE(review): the positions assume the column layout of cve_test.csv.
for position, column_name, labels in (
    (3, 'attack_vector_pred', pred_av_labels),
    (5, 'attack_complexity_pred', pred_ac_labels),
    (7, 'privileges_required_pred', pred_pr_labels),
    (9, 'user_interaction_pred', pred_ui_labels),
    (11, 'scope_pred', pred_sc_labels),
    (13, 'confidentiality_pred', pred_ci_labels),
    (15, 'integrity_pred', pred_ii_labels),
    (17, 'availability_pred', pred_ai_labels),
):
    # Predicted labels are lower case; they are stored upper-cased.
    df_test.insert(loc=position, column=column_name, value=[label.upper() for label in labels])

for position, column_name, scores in (
    (22, 'base_score_pred', pred_b_labels),
    (24, 'exploitability_score_pred', pred_e_labels),
    (26, 'impact_score_pred', pred_i_labels),
):
    df_test.insert(loc=position, column=column_name, value=scores)

In [None]:
# Persist the test set together with the predicted columns.
# to_csv is called with its defaults, so the DataFrame index is written as
# the first (unnamed) column of the output file.
test_results = './gdrive/Shareddrives/ucsd_drive/Data/cve_test_prediction_results_80_20.csv'
df_test.to_csv(test_results)

In [None]:
# Split the test rows by how the predicted base score compares with the
# actual one, then chart the size of each group.
actual_base = df_test.base_score
predicted_base = df_test.base_score_pred

df_same = df_test[actual_base == predicted_base]
df_high = df_test[actual_base < predicted_base]
df_low = df_test[actual_base > predicted_base]

df = pd.DataFrame({
    'lab': ['Exact', 'Pred > Actual', 'Actual > Pred'],
    'val': [len(df_same), len(df_high), len(df_low)],
})
ax = df.plot.bar(x='lab', y='val', rot=0, figsize=(10,5))

In [None]:
# Histogram of the signed base-score error (actual minus predicted): positive
# values mean the prediction was too low.
df_hist = df_test.base_score - df_test.base_score_pred
# The trailing semicolon stops the notebook from echoing the returned object.
df_hist.hist(bins=20, figsize=(10,5));