# Scoring Removed and Non-Removed Triples Tutorial

The goal of this tutorial is to document my attempts to separate removed and non-removed triples using the Knowledge Graph embedding models in the AmpliGraph package.

## Prerequisites

This tutorial was done with Python 3.7 and AmpliGraph version 1.4.0 installed. First, we will need to import some necessary packages.

In [None]:
import numpy as np
import pandas as pd
import ampligraph

Furthermore, we will need several `.csv` files containing our Knowledge Graph triples. These files can be found on the GitHub page of the original Knowledge Graph tutorial at https://github.com/nislab/threat-knowledge-graph/tree/main. The particular ones used for this tutorial are `cpe2cve-aug2021.csv`, `cve2cwe-aug2021.csv`, `cpe2cve-nov2022.csv`, `cve2cwe-nov2022.csv`, and `kg_demo_aug2021.csv`.

Finally, we will need a trained embedding model from the AmpliGraph package. This tutorial uses the TransE embedding model. To train a model, we can run the following block of code:

In [None]:
from ampligraph.latent_features import TransE

model_transe = TransE(batches_count=50,
                epochs=300,
                k=100,
                eta=20,
                embedding_model_params={'corrupt_sides': ['s,o'], 'negative_corruption_entities': 'batch'},
                optimizer='adam',
                optimizer_params={'lr':1e-4},
                loss='multiclass_nll',
                regularizer="LP",
                regularizer_params={'p':3, 'lambda':1e-5},
                seed=0,
                verbose=True)

import tensorflow as tf
tf.logging.set_verbosity(tf.logging.ERROR)

# load knowledge graph triples
triples = []
triples_df = pd.read_csv(".../kg_demo_aug2021.csv", usecols=["subject", "predicate", "object"])

triples = triples_df.to_numpy()

#training and saving
from ampligraph.latent_features import save_model

model_transe.fit(triples)
save_model(model_transe, '.../kg_all_model_transe_aug2021.pkl')


The code in this tutorial can be applied to any of the other built-in AmpliGraph embedding models.

## Obtaining the Testing Set

We now need to separate our CPE-CVE and CVE-CWE triples into removed and non-removed triples. First, load in our triples:

In [None]:
kg_cpe2cve = []
kg_cve2cwe = []

kg_cpe2cve_df = pd.read_csv(".../cpe2cve-aug2021.csv")
for i,r in kg_cpe2cve_df.iterrows():
    kg_cpe2cve.append([r['subject'],r['predicate'],r['object']])

kg_cve2cwe_df = pd.read_csv(".../cve2cwe-aug2021.csv")
for i,r in kg_cve2cwe_df.iterrows():
    kg_cve2cwe.append([r['subject'],r['predicate'],r['object']])

kg_cpe2cve_nov2022 = []

kg_cpe2cve_nov2022_df = pd.read_csv(".../cpe2cve-nov2022.csv")
for i,r in kg_cpe2cve_nov2022_df.iterrows():
    kg_cpe2cve_nov2022.append([r['subject'],r['predicate'],r['object']])

kg_cve2cwe_nov2022 = []

kg_cve2cwe_2022_df = pd.read_csv(".../cve2cwe-nov2022.csv")
for i,r in kg_cve2cwe_2022_df.iterrows():
    kg_cve2cwe_nov2022.append([r['subject'],r['predicate'],r['object']])

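We now have lists in which each element is itself a list of the 3 strings that make up a triple. To make set operations and string comparison easier, we join the 3 strings of each triple into a single comma-separated string, which gives us flat 1D lists. (This assumes that none of the three fields contains a comma, since we split on commas again later.)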

In [None]:
li1 = []
li2 = []
l1 =[]
l2 = []
for l in kg_cpe2cve_nov2022:
    l1.append(l[0]+','+l[1]+','+l[2])
for l in kg_cve2cwe_nov2022:
    li1.append(l[0]+','+l[1]+','+l[2])
for l in kg_cpe2cve:
    l2.append(l[0]+','+l[1]+','+l[2])
for l in kg_cve2cwe:
    li2.append(l[0] + ',' + l[1] + ',' + l[2])

To obtain the removed triples, we then need to find the triples that were in the August 2021 triples but are not in the November 2022 triples. That can be done by converting the 1D lists to sets and taking the set difference:

In [None]:
new_tmp1 = list(set(li2) - set(li1))
new_tmp2 = list(set(l2)- set(l1))
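
As a minimal sketch of the same idea, the set difference can also be taken over tuples instead of comma-joined strings, which avoids having to split the strings again afterwards. The triples and the helper name `removed_between` below are made up for illustration and are not part of the original data or code.

```python
# Toy triples (hypothetical values, not taken from the real CSV files).
old_triples = [
    ("cpe:a", "MatchingCVE", "CVE-1"),
    ("cpe:b", "MatchingCVE", "CVE-1"),
    ("cpe:c", "MatchingCVE", "CVE-2"),
]
new_triples = [
    ("cpe:a", "MatchingCVE", "CVE-1"),
    ("cpe:c", "MatchingCVE", "CVE-2"),
    ("cpe:d", "MatchingCVE", "CVE-3"),
]


def removed_between(old, new):
    """Return triples present in `old` but absent from `new`, sorted."""
    # Tuples are hashable, so they can go straight into a set.
    return sorted(set(map(tuple, old)) - set(map(tuple, new)))


removed = removed_between(old_triples, new_triples)
print(removed)
```

Only the triple that disappeared between the two snapshots is returned; triples that are new in the later snapshot are ignored.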

We then convert the 1D lists back into lists of 3-string lists and save them to separate `.csv` files.

In [None]:
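#Lists that will hold the removed triples as [subject, predicate, object]
removed_cwe2cve = []
removed_cpe2cve = []

for s in new_tmp1:
    removed_cwe2cve.append(s.split(','))
for s in new_tmp2:
    removed_cpe2cve.append(s.split(','))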

removed_cwe2cve_df = pd.DataFrame(removed_cwe2cve, columns=["subject", "predicate", "object"]).sort_values(by=['subject'], ascending=True).reset_index(drop=True)
removed_cwe2cve_df.to_csv('.../removed_cwe2cve.csv', index=False)

removed_cpe2cve_df = pd.DataFrame(removed_cpe2cve, columns=["subject", "predicate", "object"]).sort_values(by=['subject'], ascending=True).reset_index(drop=True)
removed_cpe2cve_df.to_csv('.../removed_cpe2cve.csv', index=False)

The non-removed triples can be obtained with similar logic. We just need to subtract the removed triples from the August 2021 triples to get the triples that weren't removed.

In [None]:
nonremoved_cve2cwe_temp = list(set(li2) - set(new_tmp1))
nonremoved_cpe2cve_temp = list(set(l2) - set(new_tmp2))

nonremoved_cwe2cve = []
nonremoved_cpe2cve = []

for s in nonremoved_cve2cwe_temp:
    nonremoved_cwe2cve.append(s.split(','))
for s in nonremoved_cpe2cve_temp:
    nonremoved_cpe2cve.append(s.split(','))

nonremoved_cwe2cve_df = pd.DataFrame(nonremoved_cwe2cve, columns=["subject", "predicate", "object"]).sort_values(by=['subject'], ascending=True).reset_index(drop=True)
nonremoved_cwe2cve_df.to_csv('.../nonremoved_cwe2cve.csv', index=False)

nonremoved_cpe2cve_df = pd.DataFrame(nonremoved_cpe2cve, columns=["subject", "predicate", "object"]).sort_values(by=['subject'], ascending=True).reset_index(drop=True)
nonremoved_cpe2cve_df.to_csv('.../nonremoved_cpe2cve.csv', index=False)
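
Subtracting the removed triples from the August 2021 set is the same as intersecting the two snapshots. The following sketch checks that identity on made-up strings; the names `aug` and `nov` are placeholders for illustration only.

```python
# Toy stand-ins for the joined triple strings of each snapshot (hypothetical).
aug = {"a,r,x", "b,r,x", "c,r,y"}
nov = {"a,r,x", "c,r,y", "d,r,z"}

removed = aug - nov              # in August 2021 but gone by November 2022
nonremoved = aug - removed       # what the tutorial computes
also_nonremoved = aug & nov      # equivalent: present in both snapshots

print(sorted(removed), sorted(nonremoved))
```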

Since the non-removed triples significantly outnumber the removed triples, I chose to evaluate only a sample of the non-removed triples so that the plots we generate later are easier to read. I arbitrarily chose 5% as the percentage of non-removed triples to include, but for more realistic results, these evaluations can also be done with all the non-removed triples as the testing sample.

In [None]:
import random

testing_fraction = 0.05

#Randomly sample a fraction of the non-removed triples
pos_cwe2cve_triples_testing = random.sample(nonremoved_cwe2cve, int(len(nonremoved_cwe2cve)*testing_fraction))
pos_cpe2cve_triples_testing = random.sample(nonremoved_cpe2cve, int(len(nonremoved_cpe2cve)*testing_fraction))

pos_cwe2cve_triples_testing_df = pd.DataFrame(pos_cwe2cve_triples_testing, columns=["subject", "predicate", "object"])
pos_cwe2cve_triples_testing_df.to_csv('.../pos_cwe2cve_triples_testing.csv', index=False)

pos_cpe2cve_triples_testing_df = pd.DataFrame(pos_cpe2cve_triples_testing, columns=["subject", "predicate", "object"])
pos_cpe2cve_triples_testing_df.to_csv('.../pos_cpe2cve_triples_testing.csv', index=False)
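
`random.sample()` draws a different sample on every run unless the generator is seeded. If reproducible test sets matter, a sketch like the following may help; the seed value and the helper name `sample_fraction` are my own assumptions and are not part of the original notebook.

```python
import random


def sample_fraction(items, fraction, seed=0):
    """Return a reproducible random sample holding `fraction` of `items`."""
    rng = random.Random(seed)          # local generator, leaves global state alone
    k = int(len(items) * fraction)     # same rounding as the cell above
    return rng.sample(items, k)


# Toy triples (hypothetical values).
toy = [["cpe:%d" % i, "MatchingCVE", "CVE-%d" % i] for i in range(100)]
first = sample_fraction(toy, 0.05)
second = sample_fraction(toy, 0.05)
print(len(first), first == second)
```

Because the generator is local to the function, calling it twice with the same seed returns the same sample.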

## Evaluating Score

Each embedding model has a scoring function: given a triple, the model produces a score that indicates how likely that triple is to actually exist. Lower scores indicate that a triple is less likely to exist, and higher scores that it is more likely. We will use our embedding model's scoring function to see if there is any difference in behavior between removed triples and non-removed triples.
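
For TransE specifically, the score of a triple is the negative distance between the translated subject embedding and the object embedding, `-||e_s + r_p - e_o||`, so scores are never positive and values closer to 0 indicate a more plausible triple. The sketch below illustrates this with made-up 3-dimensional embeddings and an L1 norm; the vectors and the function name `transe_score` are assumptions for illustration, and a trained model would use its own learned embeddings.

```python
def transe_score(e_s, r_p, e_o):
    """Negative L1 distance between (subject + relation) and object."""
    return -sum(abs(s + r - o) for s, r, o in zip(e_s, r_p, e_o))


# Made-up embeddings, not taken from any trained model.
e_cpe = [0.5, -1.0, 2.0]
r_matching = [0.5, 1.0, -1.0]
e_cve_close = [1.0, 0.0, 1.0]    # exactly e_cpe + r_matching
e_cve_far = [-2.0, 3.0, 0.0]

good = transe_score(e_cpe, r_matching, e_cve_close)
bad = transe_score(e_cpe, r_matching, e_cve_far)
print(good, bad)
```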

First, starting with CPE-CVE triples, we load in our datasets and model in order to calculate score.

In [None]:
#Loading in triples
pos_test_cpe2cve = []
pos_test_cpe2cve_df = pd.read_csv(".../pos_cpe2cve_triples_testing.csv", usecols=["subject", "predicate", "object"])
for i,r in pos_test_cpe2cve_df.iterrows():
    pos_test_cpe2cve.append([r['subject'],r['predicate'],r['object']])
    
neg_test_cpe2cve = []
neg_test_cpe2cve_df = pd.read_csv(".../removed_cpe2cve.csv", usecols=["subject", "predicate", "object"])
for i,r in neg_test_cpe2cve_df.iterrows():
    neg_test_cpe2cve.append([r['subject'],r['predicate'],r['object']])
    
from ampligraph.latent_features import restore_model
model_transe = restore_model('.../kg_all_model_transe_aug2021.pkl')

#Calculate scores
test = np.array(pos_test_cpe2cve + neg_test_cpe2cve)
scores_cpe2cve = model_transe.predict(test)

With scores in hand, some typical performance metrics that rely on ground-truth labels are precision, recall, and F1-score (which balances precision and recall, as they are typically inversely related). We therefore need to create a list of ground-truth labels:

In [None]:
is_pos_cpe2cve = []

for i in range(len(pos_test_cpe2cve)):
    is_pos_cpe2cve.append(1)
for i in range(len(neg_test_cpe2cve)):
    is_pos_cpe2cve.append(0)

is_pos_cpe2cve = np.asarray(is_pos_cpe2cve)

The function we are using, `precision_recall_curve()` from the sklearn package, computes precision and recall at every candidate threshold, where triples scoring at or above the threshold are labeled positive and triples below it are labeled negative. From these values we can pick the threshold with the best F1-score. An issue arises if we look for the best threshold for the non-removed triples (considered positive here): because they vastly outnumber the removed triples, the best F1-score is obtained by simply labeling every triple as positive, which is not useful.
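
To see why, consider a made-up test set with 950 non-removed and 50 removed triples (these counts are illustrative, not the actual dataset sizes). Labeling every triple as positive already gives a near-perfect F1-score:

```python
def f1(precision, recall):
    """Harmonic mean of precision and recall (0 when both are 0)."""
    total = precision + recall
    return 2 * precision * recall / total if total else 0.0


n_pos, n_neg = 950, 50            # hypothetical class sizes

# Predict "positive" for everything: all positives found, all negatives wrong.
tp, fp, fn = n_pos, n_neg, 0
precision = tp / (tp + fp)        # 950 / 1000
recall = tp / (tp + fn)           # 950 / 950
print(round(precision, 3), round(recall, 3), round(f1(precision, recall), 3))
```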

Instead, I looked for the best threshold while treating our removed triples as positive. However, `precision_recall_curve()` always labels values at or above a threshold as positive and values below it as negative when testing the different thresholds. This wouldn't work directly, since we expect the removed triples (which we are now treating as positive) to tend to have lower scores than the non-removed ones.

To get around this, I simply inverted the scores by multiplying all of them by -1. As a result, the removed triples tend towards higher scores than the non-removed ones.

In [None]:
#invert the scores
inverse_scores_cpe2cve = []

for s in scores_cpe2cve:
    inverse_scores_cpe2cve.append(-s)

We can now find the best F1-score threshold, and print our results:

In [None]:
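from sklearn.metrics import precision_recall_curve
import matplotlib.pyplot as plt

#Calculate precision and recall, treating removed triples (label 0) as positive
precision_cpe2cve, recall_cpe2cve, thresholds_cpe2cve = precision_recall_curve(is_pos_cpe2cve, inverse_scores_cpe2cve, pos_label=0)

#Calculate the F1-score at each threshold, guarding against division by zero
numerator_cpe2cve = 2 * recall_cpe2cve * precision_cpe2cve
denom_cpe2cve = recall_cpe2cve + precision_cpe2cve
f1_scores_cpe2cve = np.divide(numerator_cpe2cve, denom_cpe2cve, out=np.zeros_like(denom_cpe2cve), where=(denom_cpe2cve!=0))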

print("Results:")
print('CPE to CVE, Scores Best threshold: ', -thresholds_cpe2cve[np.argmax(f1_scores_cpe2cve)])
print('CPE to CVE, Scores Rank Precision and recall: ', precision_cpe2cve[np.argmax(f1_scores_cpe2cve)], recall_cpe2cve[np.argmax(f1_scores_cpe2cve)])
print('CPE to CVE, Scores Rank Best F1-Score: ', np.max(f1_scores_cpe2cve))
print()

#Plotting
fig, ax = plt.subplots()
ax.plot(recall_cpe2cve[50:-10], precision_cpe2cve[50:-10])
plt.xlim([0,1])
plt.ylim([0,1])
plt.xticks(fontfamily = 'Times New Roman', fontsize=18)
plt.yticks(fontfamily = 'Times New Roman', fontsize=18)
plt.ylabel('Precision', fontfamily = 'Times New Roman', fontsize = 18)
plt.xlabel('Recall', fontfamily = 'Times New Roman', fontsize = 18)
plt.title('CPE to CVE')
plt.grid(b=True, which='major', color='#999999', linestyle='-')
plt.minorticks_on()
plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
plt.show()
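
For readers who want to see what the threshold search amounts to, here is a dependency-free sketch that treats removed triples (label 0) as the positive class and tries every distinct score as a candidate threshold. It is not how `precision_recall_curve()` is implemented; the function name `best_f1_threshold` and the toy scores are assumptions for illustration.

```python
def best_f1_threshold(scores, labels, pos_label=0):
    """Return (threshold, precision, recall, f1) maximizing F1.

    A triple is predicted as `pos_label` when its score is <= threshold,
    which matches "removed triples tend to have lower scores".
    """
    best = (None, 0.0, 0.0, 0.0)
    n_pos = sum(1 for y in labels if y == pos_label)
    for thresh in sorted(set(scores)):
        predicted = [s <= thresh for s in scores]
        tp = sum(1 for p, y in zip(predicted, labels) if p and y == pos_label)
        fp = sum(1 for p, y in zip(predicted, labels) if p and y != pos_label)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / n_pos if n_pos else 0.0
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        if f1 > best[3]:
            best = (thresh, precision, recall, f1)
    return best


# Toy scores: label 1 = non-removed, label 0 = removed (hypothetical values).
scores = [-1.0, -2.0, -2.5, -6.0, -7.0, -3.0]
labels = [1, 1, 1, 0, 0, 0]
thresh, precision, recall, f1 = best_f1_threshold(scores, labels)
print(thresh, precision, recall, f1)
```

Working directly with "score at or below the threshold" avoids the sign flip, at the cost of not reusing sklearn's optimized routine.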

To further understand the behavior of the triples, we can create a simple histogram of our triples' scores. I also included our best F1-score threshold as a reference.

In [None]:
scores_cpe2cve_pos = sorted(model_transe.predict(pos_test_cpe2cve))
scores_cpe2cve_neg = sorted(model_transe.predict(neg_test_cpe2cve))
bins = np.linspace(-10, 0, 50)

y1, x1, _ = plt.hist(scores_cpe2cve_pos, bins, alpha=0.5, label='Non-Removed')
y2, x2, _ = plt.hist(scores_cpe2cve_neg, bins, alpha=0.5, label='Removed')
thresh_cpe2cve = -thresholds_cpe2cve[np.argmax(f1_scores_cpe2cve)]
plt.plot([thresh_cpe2cve, thresh_cpe2cve], [0,max([y1.max(), y2.max()])], color='k', label = 'Threshold')
plt.legend(loc='upper right')
plt.title('CPE to CVE')
plt.xlabel("Score")
plt.ylabel("Frequency")
plt.show()

Another way to visualize the scores is to see what percentage of triples are below each threshold. To do that, we need to calculate the percentile at each threshold:  

In [None]:
scores_cpe2cve_pos_percentile = []
current_count = 0
current_threshold_index = 0
thresholds_cpe2cve_flipped = np.negative(np.flip(thresholds_cpe2cve))
for thresh in thresholds_cpe2cve_flipped:
    while(current_count < len(scores_cpe2cve_pos)):
        if(scores_cpe2cve_pos[current_count] >= thresh):
            current_percentile = 100 * (current_count/len(scores_cpe2cve_pos))
            scores_cpe2cve_pos_percentile.append(current_percentile)
            break
        current_count += 1

while(len(scores_cpe2cve_pos_percentile) < len(thresholds_cpe2cve_flipped)):
    scores_cpe2cve_pos_percentile.append(100)
    
        
scores_cpe2cve_neg_percentile = []
current_count = 0
current_threshold_index = 0
for thresh in thresholds_cpe2cve_flipped:
    while(current_count < len(scores_cpe2cve_neg)):
        if(scores_cpe2cve_neg[current_count] >= thresh):
            current_percentile = 100 * (current_count/len(scores_cpe2cve_neg))
            scores_cpe2cve_neg_percentile.append(current_percentile)
            break
        current_count += 1

while(len(scores_cpe2cve_neg_percentile) < len(thresholds_cpe2cve_flipped)):
    scores_cpe2cve_neg_percentile.append(100)

plt.ylim(0, 100)
plt.plot(thresholds_cpe2cve_flipped, scores_cpe2cve_pos_percentile, label = "Non-Removed") 
plt.plot(thresholds_cpe2cve_flipped, scores_cpe2cve_neg_percentile, label = "Removed") 
plt.plot([thresh_cpe2cve, thresh_cpe2cve], [0,100], color='k', alpha=0.5, label = 'Best F1 Threshold')
plt.title('CPE to CVE Percentile')
plt.xlabel("Threshold (score)")
plt.ylabel("Percentile Below Threshold")
plt.legend() 
plt.show()
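
The same percentages can be computed more compactly with a binary search over the sorted scores. This is a sketch using the standard-library `bisect` module on toy data; the helper name `percent_below` is hypothetical.

```python
from bisect import bisect_left


def percent_below(sorted_scores, thresholds):
    """For each threshold, percentage of scores strictly below it."""
    n = len(sorted_scores)
    # bisect_left returns the index of the first score >= threshold,
    # which equals the number of scores strictly below the threshold.
    return [100 * bisect_left(sorted_scores, t) / n for t in thresholds]


# Toy data (hypothetical scores, already sorted ascending).
toy_scores = [-9.0, -7.5, -4.0, -2.0]
toy_thresholds = [-10.0, -7.5, -3.0, 0.0]
print(percent_below(toy_scores, toy_thresholds))
```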

Since our ability to separate the removed and non-removed triples was not particularly impressive, I wanted to see how artificially generated negatives compare to removed triples. I generated one artificial triple for each distinct CVE in the non-removed test triples by replacing the CPE element with a random CPE that is not already linked to that CVE.

We will also need `cpelist_connected.txt` from the original GitHub to do this.

In [None]:
# load kg_cpe2cve
kg_cpe2cve = []

kg_cpe2cve_df = pd.read_csv(".../cpe2cve-aug2021.csv")
for i,r in kg_cpe2cve_df.iterrows():
    kg_cpe2cve.append([r['subject'],r['predicate'],r['object']])

# load connected_cpelist
f = open('.../cpelist_connected.txt', 'r')
connected_cpelist = f.read().splitlines()
f.close()

import random

neg_test_cpe2cve_generated = []

dict_cpe2cve = {} # cve as key, replace cpe
dict_cve2cpe = {} # cpe as key, replace cve

cpe2cve_alltime = kg_cpe2cve

for i in range(len(cpe2cve_alltime)):
    if cpe2cve_alltime[i][2] not in dict_cpe2cve.keys():
        dict_cpe2cve[cpe2cve_alltime[i][2]] = []
    dict_cpe2cve[cpe2cve_alltime[i][2]].append(cpe2cve_alltime[i][0])

    if cpe2cve_alltime[i][0] not in dict_cve2cpe.keys():
        dict_cve2cpe[cpe2cve_alltime[i][0]] = []
    dict_cve2cpe[cpe2cve_alltime[i][0]].append(cpe2cve_alltime[i][2])

test_cves = []
for i in range(len(pos_test_cpe2cve)):
    if pos_test_cpe2cve[i][2] not in test_cves:
        test_cves.append(pos_test_cpe2cve[i][2])

for v in test_cves:
    rnd_lst = []
    while len(rnd_lst) < 1:
        rnd_cpe = random.choice(connected_cpelist)
        if rnd_cpe not in dict_cpe2cve[v]:
            rnd_lst.append(rnd_cpe)
    for p in rnd_lst:
        neg_test_cpe2cve_generated.append([p,'MatchingCVE',v])
        
# save generated negative CPE-CVE triples locally
neg_test_cpe2cve_generated_df = pd.DataFrame(neg_test_cpe2cve_generated, columns=["subject", "predicate", "object"])
neg_test_cpe2cve_generated_df.to_csv('.../neg_test_cpe2cve_generated.csv', index=False)
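
A compact variant of the same idea is sketched below on toy data. It stores the known CPEs of each CVE in a set for fast membership tests and uses a seeded generator so the output is repeatable; the function name `corrupt_subject`, the seed, and the toy identifiers are assumptions for illustration.

```python
import random
from collections import defaultdict


def corrupt_subject(test_triples, known_triples, candidates, seed=0):
    """Build one negative per distinct object by swapping in a random subject."""
    rng = random.Random(seed)
    subjects_of = defaultdict(set)           # object -> subjects seen with it
    for s, _, o in known_triples:
        subjects_of[o].add(s)

    negatives = []
    seen_objects = set()
    for _, p, o in test_triples:
        if o in seen_objects:
            continue                          # one negative per distinct object
        seen_objects.add(o)
        # Only subjects never observed with this object are valid corruptions.
        pool = [c for c in candidates if c not in subjects_of[o]]
        if pool:
            negatives.append([rng.choice(pool), p, o])
    return negatives


# Toy knowledge graph (hypothetical identifiers).
known = [["cpe:a", "MatchingCVE", "CVE-1"], ["cpe:b", "MatchingCVE", "CVE-1"],
         ["cpe:c", "MatchingCVE", "CVE-2"]]
negs = corrupt_subject(known, known, ["cpe:a", "cpe:b", "cpe:c"])
print(negs)
```

Unlike the retry loop in the cell above, this variant skips an object when no valid replacement subject exists instead of looping until one is found.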

We then need to rebuild our ground-truth label list to include the labels for the artificial negatives, before scoring and plotting like before:

In [None]:
#Label list based on non-removed, removed, and artificial triples
is_pos_cpe2cve = []

for i in range(len(pos_test_cpe2cve)):
    is_pos_cpe2cve.append(1)
for i in range(len(neg_test_cpe2cve)):
    is_pos_cpe2cve.append(0)
for i in range(len(neg_test_cpe2cve_generated)):
    is_pos_cpe2cve.append(0)

is_pos_cpe2cve = np.asarray(is_pos_cpe2cve)


#Calculate Scores
test = np.array(pos_test_cpe2cve + neg_test_cpe2cve + neg_test_cpe2cve_generated)
scores_cpe2cve = model_transe.predict(test)

#invert scores
inverse_scores_cpe2cve = []
for s in scores_cpe2cve:
    inverse_scores_cpe2cve.append(-s)

#Calculate F1-scores based on label list and TransE score
precision_cpe2cve, recall_cpe2cve, thresholds_cpe2cve = precision_recall_curve(is_pos_cpe2cve, inverse_scores_cpe2cve, pos_label=0)

#Plotting
fig, ax = plt.subplots()
ax.plot(recall_cpe2cve[50:-10], precision_cpe2cve[50:-10])
plt.xlim([0,1])
plt.ylim([0,1])
plt.xticks(fontfamily = 'Times New Roman', fontsize=18)
plt.yticks(fontfamily = 'Times New Roman', fontsize=18)
plt.ylabel('Precision', fontfamily = 'Times New Roman', fontsize = 18)
plt.xlabel('Recall', fontfamily = 'Times New Roman', fontsize = 18)
plt.title('CPE to CVE')
plt.grid(b=True, which='major', color='#999999', linestyle='-')
plt.minorticks_on()
plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
plt.show()

#printing precision recall results 
numerator_cpe2cve = 2 * recall_cpe2cve * precision_cpe2cve
denom_cpe2cve = recall_cpe2cve + precision_cpe2cve
f1_scores_cpe2cve = np.divide(numerator_cpe2cve, denom_cpe2cve, out=np.zeros_like(denom_cpe2cve), where=(denom_cpe2cve!=0))

print('Results:')
print('CPE to CVE, Scores Best threshold: ', -thresholds_cpe2cve[np.argmax(f1_scores_cpe2cve)])
print('CPE to CVE, Scores Rank Precision and recall: ', precision_cpe2cve[np.argmax(f1_scores_cpe2cve)], recall_cpe2cve[np.argmax(f1_scores_cpe2cve)])
print('CPE to CVE, Scores Rank Best F1-Score: ', np.max(f1_scores_cpe2cve))
print()

#plotting histogram
scores_cpe2cve_pos = model_transe.predict(pos_test_cpe2cve)
scores_cpe2cve_neg = model_transe.predict(neg_test_cpe2cve)
scores_cpe2cve_neg_generated = model_transe.predict(neg_test_cpe2cve_generated)
bins = np.linspace(-20, 0, 50)

plt.figure()
y1, x1, _ = plt.hist(scores_cpe2cve_pos, bins, alpha=0.5, label='Non-Removed')
y2, x2, _ = plt.hist(scores_cpe2cve_neg, bins, alpha=0.5, label='Removed')
y3, x3, _ = plt.hist(scores_cpe2cve_neg_generated, bins, alpha=0.5, label='Generated negs')
thresh_cpe2cve = -thresholds_cpe2cve[np.argmax(f1_scores_cpe2cve)]
plt.plot([thresh_cpe2cve, thresh_cpe2cve], [0,max([y1.max(), y2.max(), y3.max()])], color='k', label = 'Threshold')
plt.legend(loc='upper left')
plt.title('CPE to CVE')
plt.xlabel("Score")
plt.ylabel("Frequency")
plt.show()

All this score evaluation can also be done for CVE-CWE triples with the same methods.

## Evaluating Rank

We can also run a similar evaluation using rank. The ranking function replaces either the subject or the object of a triple with other entities, generating several artificial negatives, and then ranks the original triple against those artificial triples. Since this rank is based purely on score, we don't expect any improvement, but I still provide my code for testing this.

When calculating the best F1-score threshold for rank, we expect removed triples to have a higher (numerically larger, i.e. worse) rank than non-removed triples. Thus, we don't have to invert the ranks like we did for scores when we treat the removed triples as positives. If we were to treat non-removed triples as positive, then the inversion would be necessary again.

First, we use AmpliGraph's `evaluate_performance()` function to calculate the ranks of our triples. I decided not to randomly sample the non-removed triples here to simplify things, but you certainly could.

In [None]:
#Loading in positive and negative validation triples
pos_test_cwe2cve = []
pos_test_cwe2cve_df = pd.read_csv(".../nonremoved_cwe2cve.csv", usecols=["subject", "predicate", "object"])
for i,r in pos_test_cwe2cve_df.iterrows():
    pos_test_cwe2cve.append([r['subject'],r['predicate'],r['object']])

neg_test_cwe2cve = []
neg_test_cwe2cve_df = pd.read_csv(".../removed_cwe2cve.csv", usecols=["subject", "predicate", "object"])
for i,r in neg_test_cwe2cve_df.iterrows():
    neg_test_cwe2cve.append([r['subject'],r['predicate'],r['object']])


pos_test_cpe2cve = []
pos_test_cpe2cve_df = pd.read_csv(".../nonremoved_cpe2cve.csv", usecols=["subject", "predicate", "object"])
for i,r in pos_test_cpe2cve_df.iterrows():
    pos_test_cpe2cve.append([r['subject'],r['predicate'],r['object']])

neg_test_cpe2cve = []
neg_test_cpe2cve_df = pd.read_csv(".../removed_cpe2cve.csv", usecols=["subject", "predicate", "object"])
for i,r in neg_test_cpe2cve_df.iterrows():
    neg_test_cpe2cve.append([r['subject'],r['predicate'],r['object']])

#Creating validation triple sets
X_test_cwe2cve_ordered = np.array(pos_test_cwe2cve + neg_test_cwe2cve)
X_test_cpe2cve_ordered = np.array(pos_test_cpe2cve + neg_test_cpe2cve)

#Filter triples
triples = pd.read_csv(".../kg_demo_aug2021.csv", usecols=["subject", "predicate", "object"])
filter_triples = triples.to_numpy()

#Model evaluation
from ampligraph.latent_features import restore_model
model_transe = restore_model('.../kg_all_model_transe_aug2021.pkl')

from ampligraph.evaluation import evaluate_performance

cpe2cve_ranks = evaluate_performance(X_test_cpe2cve_ordered,
                              model=model_transe,
                              filter_triples=filter_triples,
                              verbose=True)

cpe2cve_ranks_df = pd.DataFrame(cpe2cve_ranks, columns=["subject", "object"])
cpe2cve_ranks_df.to_csv('.../cpe2cve_ranks.csv', index=False)

cwe2cve_ranks = evaluate_performance(X_test_cwe2cve_ordered,
                              model=model_transe,
                              filter_triples=filter_triples,
                              verbose=True)

cwe2cve_ranks_df = pd.DataFrame(cwe2cve_ranks, columns=["subject", "object"])
cwe2cve_ranks_df.to_csv('.../cwe2cve_ranks.csv', index=False)

As described above, the ranking function corrupts either the subject or the object of each triple to generate artificial negatives, and then compares how the original triple scores relative to them. For each triple, we therefore get 2 ranks: the rank compared to negatives generated by replacing the subject, and the rank compared to negatives generated by replacing the object.
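
The rank of a triple can be illustrated with a small sketch: count how many corrupted triples score higher than the original and add one. The scores below are made up, and treating ties as not outranking the original is a simplifying assumption of this sketch; AmpliGraph's tie handling may differ.

```python
def rank_against(true_score, corruption_scores):
    """Rank 1 means the original triple outscored every corruption."""
    return 1 + sum(1 for c in corruption_scores if c > true_score)


# Hypothetical scores for one triple and its corruptions.
true_score = -2.0
subject_corruptions = [-5.0, -1.5, -3.0, -8.0]   # subject replaced
object_corruptions = [-0.5, -1.0, -4.0, -1.9]    # object replaced

subject_rank = rank_against(true_score, subject_corruptions)
object_rank = rank_against(true_score, object_corruptions)
print(subject_rank, object_rank)
```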

Starting with CPE-CVE triples, I first evaluated the performance when the subject (CPE) was replaced to generate the artificial negatives. I followed the same process as with score for finding the best F1-score threshold and plotting a histogram.

In [None]:
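from sklearn.metrics import precision_recall_curve
from ampligraph.evaluation import mr_score, mrr_score, hits_at_n_score
import matplotlib.pyplot as plt

#Loading in ranks and triples (the ranks file is the one saved in the previous cell)
cpe2cve_ranks = []
cpe2cve_ranks_df = pd.read_csv('.../cpe2cve_ranks.csv',usecols = ["subject", "object"])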
for i,r in cpe2cve_ranks_df.iterrows():
    cpe2cve_ranks.append([r['subject'],r['object']])
    
pos_test_cpe2cve = []
pos_test_cpe2cve_df = pd.read_csv(".../nonremoved_cpe2cve.csv", usecols=["subject", "predicate", "object"])
for i,r in pos_test_cpe2cve_df.iterrows():
    pos_test_cpe2cve.append([r['subject'],r['predicate'],r['object']])

    
neg_test_cpe2cve = []
neg_test_cpe2cve_df = pd.read_csv(".../removed_cpe2cve.csv", usecols=["subject", "predicate", "object"])
for i,r in neg_test_cpe2cve_df.iterrows():
    neg_test_cpe2cve.append([r['subject'],r['predicate'],r['object']])


#Label list based on 'pos_test_cpe2cve' and 'neg_test_cpe2cve'
is_pos_cpe2cve = []

for i in range(len(pos_test_cpe2cve)):
    is_pos_cpe2cve.append(1)
for i in range(len(neg_test_cpe2cve)):
    is_pos_cpe2cve.append(0)

is_pos_cpe2cve = np.asarray(is_pos_cpe2cve)

#Get the ranks of the first column
cperank_cpe2cve = np.transpose(cpe2cve_ranks)[0]

#Calculate F1-scores based on label list and ranks of first column (subject/CPE)
precision, recall, thresholds = precision_recall_curve(is_pos_cpe2cve, cperank_cpe2cve, pos_label=0)

numerator = 2 * recall * precision
denom = recall + precision
f1_scores = np.divide(numerator, denom, out=np.zeros_like(denom), where=(denom!=0))

print('Results')
print('CPE to CVE, CPE Rank Best threshold: ', thresholds[np.argmax(f1_scores)])
print('CPE to CVE, CPE Rank Precision and recall: ', precision[np.argmax(f1_scores)], recall[np.argmax(f1_scores)])
print('CPE to CVE, CPE Rank Best F1-Score: ', np.max(f1_scores))
print()

pos_cperank_cpe2cve = []
for i in range(len(pos_test_cpe2cve)):
    pos_cperank_cpe2cve.append(cperank_cpe2cve[i])

temp = len(pos_test_cpe2cve)  #offset of the first removed triple in the rank list
neg_cperank_cpe2cve = []
for i in range(len(neg_test_cpe2cve)):
    neg_cperank_cpe2cve.append(cperank_cpe2cve[i + temp])

mr = mr_score(pos_cperank_cpe2cve)
mrr = mrr_score(pos_cperank_cpe2cve)

print("Nonremoved CPE to CVE, CPE Rank MRR: %.3f" % (mrr))
print("Nonremoved CPE to CVE, CPE Rank MR: %.3f" % (mr))

hits_20 = hits_at_n_score(pos_cperank_cpe2cve, n=20)
print("Nonremoved CPE to CVE, CPE Rank Hits@20: %.3f" % (hits_20))
hits_10 = hits_at_n_score(pos_cperank_cpe2cve, n=10)
print("Nonremoved CPE to CVE, CPE Rank Hits@10: %.3f" % (hits_10))
hits_3 = hits_at_n_score(pos_cperank_cpe2cve, n=3)
print("Nonremoved CPE to CVE, CPE Rank Hits@3: %.3f" % (hits_3))
hits_1 = hits_at_n_score(pos_cperank_cpe2cve, n=1)
print("Nonremoved CPE to CVE, CPE Rank Hits@1: %.3f" % (hits_1))
print()

mr = mr_score(neg_cperank_cpe2cve)
mrr = mrr_score(neg_cperank_cpe2cve)

print("Removed CPE to CVE, CPE Rank MRR: %.3f" % (mrr))
print("Removed CPE to CVE, CPE Rank MR: %.3f" % (mr))

hits_20 = hits_at_n_score(neg_cperank_cpe2cve, n=20)
print("Removed CPE to CVE, CPE Rank Hits@20: %.3f" % (hits_20))
hits_10 = hits_at_n_score(neg_cperank_cpe2cve, n=10)
print("Removed CPE to CVE, CPE Rank Hits@10: %.3f" % (hits_10))
hits_3 = hits_at_n_score(neg_cperank_cpe2cve, n=3)
print("Removed CPE to CVE, CPE Rank Hits@3: %.3f" % (hits_3))
hits_1 = hits_at_n_score(neg_cperank_cpe2cve, n=1)
print("Removed CPE to CVE, CPE Rank Hits@1: %.3f" % (hits_1))
print()
print()

bins = np.linspace(1, 10, 10)
y1, x1, _ = plt.hist(pos_cperank_cpe2cve, bins, alpha=0.5, label='Non-Removed')
y2, x2, _ = plt.hist(neg_cperank_cpe2cve, bins, alpha=0.5, label='Removed')
thresh = thresholds[np.argmax(f1_scores)]
plt.plot([thresh, thresh], [0,max([y1.max(), y2.max()])], color='k', label = 'Threshold')
plt.legend(loc='upper right')
plt.title('CPE to CVE, CPE Rank')
plt.xlabel("Rank")
plt.ylabel("Frequency")
plt.show()

bins = np.linspace(1, 10, 10)
y1, x1, _ = plt.hist(pos_cperank_cpe2cve, bins, color='lightblue')
thresh = thresholds[np.argmax(f1_scores)]
plt.plot([thresh, thresh], [0, y1.max()], color='k', label = 'Threshold')
plt.legend(loc='upper right')
plt.title('CPE to CVE, CPE Rank Non-Removed')
plt.xlabel("Rank")
plt.ylabel("Frequency")
plt.show()

bins = np.linspace(1, 10, 10)
y1, x1, _ = plt.hist(neg_cperank_cpe2cve, bins, color='orange')
thresh = thresholds[np.argmax(f1_scores)]
plt.plot([thresh, thresh], [0, y1.max()], color='k', label = 'Threshold')
plt.legend(loc='upper right')
plt.title('CPE to CVE, CPE Rank Removed')
plt.xlabel("Rank")
plt.ylabel("Frequency")
plt.show()
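
For reference, the three rank metrics used above have simple definitions: MR is the mean rank, MRR is the mean of the reciprocal ranks, and Hits@N is the fraction of ranks that are at most N. The sketch below computes them by hand on made-up ranks; the function names are my own and are not AmpliGraph's.

```python
def mean_rank(ranks):
    """MR: average rank (lower is better)."""
    return sum(ranks) / len(ranks)


def mean_reciprocal_rank(ranks):
    """MRR: average of 1/rank (higher is better)."""
    return sum(1 / r for r in ranks) / len(ranks)


def hits_at(ranks, n):
    """Hits@N: fraction of ranks that are <= n (higher is better)."""
    return sum(1 for r in ranks if r <= n) / len(ranks)


toy_ranks = [1, 2, 4, 10]    # hypothetical ranks
print(mean_rank(toy_ranks), mean_reciprocal_rank(toy_ranks), hits_at(toy_ranks, 3))
```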

I then evaluated the performance when the object (CVE) was replaced to generate the artificial negatives.

In [None]:
###
###Calculate F1-scores based on label list and ranks of second column (Object/CVE)
###
cverank_cpe2cve = np.transpose(cpe2cve_ranks)[1]
precision, recall, thresholds = precision_recall_curve(is_pos_cpe2cve, cverank_cpe2cve, pos_label=0)

numerator = 2 * recall * precision
denom = recall + precision
f1_scores = np.divide(numerator, denom, out=np.zeros_like(denom), where=(denom!=0))

print('CPE to CVE, CVE Rank Best threshold: ', thresholds[np.argmax(f1_scores)])
print('CPE to CVE, CVE Rank Precision and recall: ', precision[np.argmax(f1_scores)], recall[np.argmax(f1_scores)])
print('CPE to CVE, CVE Rank Best F1-Score: ', np.max(f1_scores))
print()

pos_cverank_cpe2cve = []
for i in range(len(pos_test_cpe2cve)):
    pos_cverank_cpe2cve.append(cverank_cpe2cve[i])

temp = len(pos_test_cpe2cve)  #offset of the first removed triple in the rank list
neg_cverank_cpe2cve = []
for i in range(len(neg_test_cpe2cve)):
    neg_cverank_cpe2cve.append(cverank_cpe2cve[i + temp])

mr = mr_score(pos_cverank_cpe2cve)
mrr = mrr_score(pos_cverank_cpe2cve)

print("Nonremoved CPE to CVE, CVE Rank MRR: %.3f" % (mrr))
print("Nonremoved CPE to CVE, CVE Rank MR: %.3f" % (mr))

hits_20 = hits_at_n_score(pos_cverank_cpe2cve, n=20)
print("Nonremoved CPE to CVE, CVE Rank Hits@20: %.3f" % (hits_20))
hits_10 = hits_at_n_score(pos_cverank_cpe2cve, n=10)
print("Nonremoved CPE to CVE, CVE Rank Hits@10: %.3f" % (hits_10))
hits_3 = hits_at_n_score(pos_cverank_cpe2cve, n=3)
print("Nonremoved CPE to CVE, CVE Rank Hits@3: %.3f" % (hits_3))
hits_1 = hits_at_n_score(pos_cverank_cpe2cve, n=1)
print("Nonremoved CPE to CVE, CVE Rank Hits@1: %.3f" % (hits_1))
print()

mr = mr_score(neg_cverank_cpe2cve)
mrr = mrr_score(neg_cverank_cpe2cve)

print("Removed CPE to CVE, CVE Rank MRR: %.3f" % (mrr))
print("Removed CPE to CVE, CVE Rank MR: %.3f" % (mr))

hits_20 = hits_at_n_score(neg_cverank_cpe2cve, n=20)
print("Removed CPE to CVE, CVE Rank Hits@20: %.3f" % (hits_20))
hits_10 = hits_at_n_score(neg_cverank_cpe2cve, n=10)
print("Removed CPE to CVE, CVE Rank Hits@10: %.3f" % (hits_10))
hits_3 = hits_at_n_score(neg_cverank_cpe2cve, n=3)
print("Removed CPE to CVE, CVE Rank Hits@3: %.3f" % (hits_3))
hits_1 = hits_at_n_score(neg_cverank_cpe2cve, n=1)
print("Removed CPE to CVE, CVE Rank Hits@1: %.3f" % (hits_1))
print()
print()

bins = np.linspace(1, 10, 10)

y1, x1, _ = plt.hist(pos_cverank_cpe2cve, bins, alpha=0.5, label='Non-Removed')
y2, x2, _ = plt.hist(neg_cverank_cpe2cve, bins, alpha=0.5, label='Removed')
thresh = thresholds[np.argmax(f1_scores)]
plt.legend(loc='upper right')
plt.title('CPE to CVE, CVE Rank')
plt.xlabel("Rank")
plt.ylabel("Frequency")
plt.show()

bins = np.linspace(1, 10, 10)
y1, x1, _ = plt.hist(pos_cverank_cpe2cve, bins, color='lightblue')
thresh = thresholds[np.argmax(f1_scores)]
plt.legend(loc='upper right')
plt.title('CPE to CVE, CVE Rank Non-Removed')
plt.xlabel("Rank")
plt.ylabel("Frequency")
plt.show()

bins = np.linspace(1, 10, 10)
y1, x1, _ = plt.hist(neg_cverank_cpe2cve, bins, color='orange')
thresh = thresholds[np.argmax(f1_scores)]
plt.legend(loc='upper right')
plt.title('CPE to CVE, CVE Rank Removed')
plt.xlabel("Rank")
plt.ylabel("Frequency")
plt.show()

Finally, `evaluate_performance()` is capable of handling both object and subject ranks at the same time, but I wasn't able to find the details of how this works and so I wasn't able to generate a histogram to visualize our threshold.

In [None]:
pos_cpe2cve_ranks = []
for i in range(len(pos_test_cpe2cve)):
    pos_cpe2cve_ranks.append(cpe2cve_ranks[i])

temp = len(pos_test_cpe2cve)  #offset of the first removed triple in the rank list
neg_cpe2cve_ranks = []
for i in range(len(neg_test_cpe2cve)):
    neg_cpe2cve_ranks.append(cpe2cve_ranks[i + temp])

#Both metrics for the non-removed group must use the non-removed ranks
mr = mr_score(pos_cpe2cve_ranks)
mrr = mrr_score(pos_cpe2cve_ranks)

print("Nonremoved CPE to CVE, Overall Rank MRR: %.3f" % (mrr))
print("Nonremoved CPE to CVE, Overall Rank MR: %.3f" % (mr))

hits_20 = hits_at_n_score(pos_cpe2cve_ranks, n=20)
print("Nonremoved CPE to CVE, Overall Rank Hits@20: %.3f" % (hits_20))
hits_10 = hits_at_n_score(pos_cpe2cve_ranks, n=10)
print("Nonremoved CPE to CVE, Overall Rank Hits@10: %.3f" % (hits_10))
hits_3 = hits_at_n_score(pos_cpe2cve_ranks, n=3)
print("Nonremoved CPE to CVE, Overall Rank Hits@3: %.3f" % (hits_3))
hits_1 = hits_at_n_score(pos_cpe2cve_ranks, n=1)
print("Nonremoved CPE to CVE, Overall Rank Hits@1: %.3f" % (hits_1))
print()

mr = mr_score(neg_cpe2cve_ranks)
mrr = mrr_score(neg_cpe2cve_ranks)

print("Removed CPE to CVE, Overall Rank MRR: %.3f" % (mrr))
print("Removed CPE to CVE, Overall Rank MR: %.3f" % (mr))

hits_20 = hits_at_n_score(neg_cpe2cve_ranks, n=20)
print("Removed CPE to CVE, Overall Rank Hits@20: %.3f" % (hits_20))
hits_10 = hits_at_n_score(neg_cpe2cve_ranks, n=10)
print("Removed CPE to CVE, Overall Rank Hits@10: %.3f" % (hits_10))
hits_3 = hits_at_n_score(neg_cpe2cve_ranks, n=3)
print("Removed CPE to CVE, Overall Rank Hits@3: %.3f" % (hits_3))
hits_1 = hits_at_n_score(neg_cpe2cve_ranks, n=1)
print("Removed CPE to CVE, Overall Rank Hits@1: %.3f" % (hits_1))
print()
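
One plausible reading of the "overall" numbers is that the subject-side and object-side ranks are simply pooled into one list before averaging. This is an assumption that has not been verified here against AmpliGraph's source. The sketch below illustrates that assumption on made-up rank pairs; it is not a statement of how `mr_score()` behaves.

```python
# Hypothetical [subject_rank, object_rank] pairs for three triples.
rank_pairs = [[1, 3], [2, 8], [5, 1]]

# Assumption: pool both columns into one flat list of ranks.
pooled = [r for pair in rank_pairs for r in pair]

overall_mr = sum(pooled) / len(pooled)
overall_mrr = sum(1 / r for r in pooled) / len(pooled)
overall_hits_3 = sum(1 for r in pooled if r <= 3) / len(pooled)
print(overall_mr, overall_mrr, overall_hits_3)
```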

All this rank evaluation can also be done for CVE-CWE triples with the same methods.