In [None]:
import gzip
from io import BytesIO
from time import sleep

import numpy as np
import pandas
import pandas as pd
import os
import requests
import json
import csv
import shutil
from datetime import date, timedelta

from pandas import concat
from sklearn.model_selection import train_test_split
from sklearn.svm import OneClassSVM

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

sns.set_theme(style="whitegrid")

In [None]:
from preprocessing_utils import preprocess_NVD_data

In [None]:
data_path = 'data'
# exist_ok=True makes the creation idempotent and avoids the check-then-create race;
# it is the same form used for the EPSS history folder later in this notebook.
os.makedirs(data_path, exist_ok=True)

# EPSS data

Download the EPSS data from https://www.first.org/epss/data_stats into `data` folder

In [None]:
# Download the EPSS snapshot dated yesterday and store it decompressed in the data folder.
base_url = "https://epss.empiricalsecurity.com/epss_scores-"
date_current = str(date.today() - timedelta(days=1))
epss_url = base_url + date_current + ".csv.gz"
epss_filename = "epss_scores-latest.csv"

# A timeout prevents the cell from hanging forever when the server never answers
# (the NVD requests in this notebook already pass one).
response = requests.get(epss_url, timeout=60)
if response.status_code != 200:
    print("Error:", response.status_code)
else:
    # The payload is gzip-compressed; it is written to disk as a plain CSV.
    with open(os.path.join(data_path, epss_filename), "wb") as f:
        f.write(gzip.decompress(response.content))

In [None]:
# header=1 uses the second line of the file as column names and skips the first line
# (presumably a metadata/comment line in the EPSS dump — TODO confirm).
epss_current = pd.read_csv(os.path.join(data_path, epss_filename), header=1)
epss_current  # a Python statement with a variable name at the end of a cell will display its contents below


# NVD data

In [None]:
base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
date_start_NVD = '2025-09-01T00:00:00.000Z'  # Do NOT change these dates
date_end_NVD = '2025-10-01T00:00:00.000Z'  # Do NOT change these dates
results_per_page = 1000

# Page through the NVD API. total_results starts at 1 so the first request is always
# issued; it is then replaced by the count reported in each response.
candidate_cves = []
start_index, total_results = 0, 1
while start_index < total_results:
    params = dict(
        pubStartDate=date_start_NVD,
        pubEndDate=date_end_NVD,
        resultsPerPage=results_per_page,
        startIndex=start_index,
        noRejected="",
    )
    response = requests.get(base_url, params=params, timeout=6)
    if response.status_code != 200:
        # Any non-200 answer stops the paging; records fetched so far are kept.
        print("Error:", response.status_code)
        break
    data = response.json()
    total_results = data.get("totalResults", 0)
    candidate_cves += data.get("vulnerabilities", [])
    start_index += results_per_page
    print(start_index)

In [None]:
# Flatten the nested NVD records into dotted columns, then run the project preprocessing
candidate_cves_df = preprocess_NVD_data(
    pd.json_normalize(candidate_cves, record_path=None, sep='.', max_level=None)
)

# Keep only records whose status is neither "Reserved" nor "Reject"
excluded_statuses = ['Reserved', 'Reject']
candidate_cves_df = candidate_cves_df[~candidate_cves_df['cve.vulnStatus'].isin(excluded_statuses)]

# Left-join the EPSS scores on the CVE identifier (CVEs without a score keep NaN)
candidate_cves_df = candidate_cves_df.merge(epss_current, left_on="cve.id", right_on="cve", how="left")

# Exploratory Data Analysis

- display some examples (e.g., the first two CVE records)

In [None]:
candidate_cves_df.head(2).T

- show a bar plot with the daily volume of published CVEs

In [None]:
# Number of CVEs per publication day, in chronological order
published_counts = candidate_cves_df["cve.published"].dt.date.value_counts().sort_index()

fig, ax = plt.subplots(figsize=(12, 5))
sns.barplot(x=published_counts.index, y=published_counts.values, color="k", ax=ax)
ax.tick_params(axis="x", labelrotation=90)
ax.set_xlabel("Date")
ax.set_ylabel("Number of CVEs Published")
ax.set_title("CVE Publications per Day")
fig.tight_layout()
plt.show()

- print the description of the last ten published vulnerabilities

In [None]:
# Ten most recently published CVEs, newest first
latest_cves = candidate_cves_df.sort_values('cve.published', ascending=False).head(10)
for _, row in latest_cves.iterrows():
    print('-' * 100)
    print(row['cve.id'], row['cve.published'])
    print(row.description)


### <font color='blue'><b><i>TODO</i></b>: produce plots or tables to address the following points</font>
- <b>be creative</b>!
    - How many vulnerabilities are published on CISA KEV? 
    - What are the 20 most frequent vendors? (vendor name can be extracted from the `vulnerable_cpes` field).
    - What are the 20 most frequent CWEs?
    - Anything else you see fit!

<font color='blue'>Use text cells to discuss the outcome after each point</font>

We keep track of some information to help us later on.

In [None]:
dropped_columns = []

- What is the percentage of CVEs which received a CVSS score?

In [None]:
# Share of CVEs with a non-null CVSS base score. Single quotes inside the f-string keep
# the cell valid on Python versions older than 3.12, which reject nested same-type quotes.
print(f"{(candidate_cves_df['cvss_baseScore'].count() / len(candidate_cves_df)) * 100:.02f}%")

- Report descriptive statistics of the CVSS base score and/or show its distribution

In [None]:
candidate_cves_df.info()

We see that features 6, 7, 8, and 9 have very few non-null values. Therefore, we drop those columns to reduce dimensionality. We also remove all CVEs without CVSS data.

In [None]:
# CISA-related columns removed from the candidate frame (described as mostly null in the notebook text)
dropped_columns = ["cve.cisaExploitAdd", "cve.cisaActionDue", "cve.cisaRequiredAction", "cve.cisaVulnerabilityName"]
# NOTE(review): dropna() without a subset removes every row with a null in ANY remaining
# column, not only rows lacking CVSS data — confirm this is the intended filter.
candidate_cves_df = candidate_cves_df.drop(columns=dropped_columns).dropna()

Here we print some statistics about CVSS base score and we show its distribution related to publication date.

In [None]:
candidate_cves_df["cvss_baseScore"].describe()

In [None]:
# sns.displot is figure-level and opens its own figure, which left the 12x5 figure
# created here empty. The axes-level histplot draws the same histogram into this
# figure, so the size, labels and title below apply to the plot that is shown.
plt.figure(figsize=(12, 5))
sns.histplot(x=candidate_cves_df["cvss_baseScore"], color="k")
plt.xticks(rotation=90)
plt.xlabel("CVSS")
plt.ylabel("Count")
plt.title("September 2025")
plt.tight_layout()
plt.show()

It would seem that a relatively high number of CVEs published in september 2025 have a very high CVSS.

In [None]:
# CVSS base score of each CVE against its publication timestamp
fig, ax = plt.subplots(figsize=(12, 5))
sns.scatterplot(data=candidate_cves_df, x="cve.published", y="cvss_baseScore", color="k", ax=ax)
ax.tick_params(axis="x", labelrotation=90)
ax.set_xlabel("Date")
ax.set_ylabel("CVSS")
fig.tight_layout()
plt.show()


- #### Report descriptive statistics of EPSS and/or show its distribution

Here we print some statistics about EPSS base score and we show its distribution related to publication date.

In [None]:
candidate_cves_df["epss"].describe()

In [None]:
# EPSS score of each CVE against its publication timestamp
fig, ax = plt.subplots(figsize=(12, 5))
sns.scatterplot(data=candidate_cves_df, x="cve.published", y="epss", color="k", ax=ax)
ax.tick_params(axis="x", labelrotation=90)
ax.set_xlabel("Date")
ax.set_ylabel("EPSS")
fig.tight_layout()
plt.show()


It is evident that, except for a couple of outliers, on average the EPSS is extremely low.

- #### Produce a scatter plot showing CVSS vs EPSS


In [None]:
# One point per CVE: CVSS base score on x, EPSS score on y
fig, ax = plt.subplots(figsize=(12, 5))
sns.scatterplot(data=candidate_cves_df, x="cvss_baseScore", y="epss", color="k", ax=ax)
ax.tick_params(axis="x", labelrotation=90)
ax.set_xlabel("CVSS")
ax.set_ylabel("EPSS")
ax.set_title("September 2025")
fig.tight_layout()
plt.show()

As we can see, the CVSS and EPSS are not really related with each other, even though the only times the EPSS is high enough, it's in the presence of an equally high CVSS. We can further visualize this lack of correlation with a correlation matrix:

In [None]:
# Pairwise correlation between the CVSS base score and the two EPSS columns
corr_columns = ["cvss_baseScore", "epss", "percentile"]
corr_matrix = candidate_cves_df[corr_columns].corr()

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(corr_matrix, annot=True, cmap="coolwarm", ax=ax)
ax.set_title("September 2025")
plt.show()

- #### Extra analysis

### <font color='blue'><b><i>TODO</i></b>
- Filter the CVEs with low EPSS (<1%)
- Select candidate CVEs
    - From the resulting subset, select 10 CVEs that you think will reach high EPSS by the end of the course.
    - Clearly describe the criteria you used for selection (e.g., high CVSS, popular software, CWE, popular vendor, number of references, keyword in description, manual inspection, random sampling, security blogs).
- Share the selected CVE ids with the instructor (by two weeks). Use the code cell below to produce the csv file to submit.
- Track the EPSS of your CVEs over time


As per specification, we start by filtering the CVEs with low EPSS (<1%), removing all features we will not need, and turning the remaining ones into categorical.

In [None]:
# Keep rows whose EPSS *percentile* is at most 0.01, then drop the score, date, reference
# and CPE columns that are not used as model features.
# NOTE(review): the task text asks for "low EPSS (<1%)" but this filters on the
# `percentile` column rather than `epss` — confirm which one is intended.
candidate_cves_df = candidate_cves_df[candidate_cves_df['percentile'] <= 0.01].drop(columns=["epss", "percentile", "cve", "cve.published", "cve.lastModified", "cvss_version", "cve.references", "num_references", "vulnerable_cpes"])

In [None]:
# Label-like columns that are stored as pandas categoricals
cols_to_cat = [
    "cve.sourceIdentifier",
    "cve.vulnStatus",
    "cvss_vectorString",
    "cvss_baseSeverity",
    "cvss_attackVector",
    "cvss_attackComplexity",
    "cvss_privilegesRequired",
    "cvss_userInteraction",
    "cvss_scope",
    "cvss_confidentialityImpact",
    "cvss_integrityImpact",
    "cvss_availabilityImpact",
]
candidate_cves_df = candidate_cves_df.astype(dict.fromkeys(cols_to_cat, 'category'))
candidate_cves_df.info()
# persist the filtered candidate frame
candidate_cves_df.to_csv(os.path.join(data_path, "candidate_cves_df.csv"))

## NVD 2022-2025 database
We start by downloading all the CVEs that have been published between January 2022 and June 2025 (the range requested from the NVD API below). This is because EPSS data is only available from 2021, and since we want to analyze CVEs' behavior from when they were published, we need CVEs from after 2021.

In [None]:
base_url = "https://nvd.nist.gov/feeds/json/cve/2.0/nvdcve-2.0-"
ext = ".json.gz"

# If the 2024 yearly feed is not cached yet, download it and store it as indented JSON.
feed_2024_path = os.path.join(data_path, "2024_cves.json")
if not os.path.exists(feed_2024_path):
    csv_url = base_url + "2024" + ext
    response = requests.get(csv_url, timeout=60)
    if response.status_code != 200:
        print("Error:", response.status_code)
    else:
        # Decompress and parse BEFORE opening the target file: a bad payload must not
        # leave an empty/partial cache file that later runs would mistake for valid data.
        feed_2024 = json.loads(gzip.decompress(response.content))
        with open(feed_2024_path, "wt") as f:
            json.dump(feed_2024, f, indent=2)

cves = []
if os.path.exists(feed_2024_path):
    # The context manager closes the handle that a bare json.load(open(...)) would leak.
    with open(feed_2024_path) as f:
        cves = json.load(f).get("vulnerabilities", [])

base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
cve_date_start = date(2022, 1, 1)
cve_date_end = date(2025, 6, 1)
cve_window_start = cve_date_start
# NOTE: the feed records loaded above are discarded here; only the records returned by
# the API loop below end up in `cves`.
cves = []

# Query the API in consecutive 30-day publication windows; a window is requested only
# if it ends on or before cve_date_end.
while cve_window_start + timedelta(days=30) <= cve_date_end:
    start_index = 0
    results_per_page = 1000
    total_results = 1  # forces the first request; replaced by the server-reported count
    date_start_NVD = f'{cve_window_start}T00:00:00.001Z'
    date_end_NVD = f'{cve_window_start + timedelta(days=30)}T00:00:00.000Z'
    print(f"Downloading CVEs between {date_start_NVD} and {date_end_NVD}...")
    while start_index < total_results:
        params = {
            "pubStartDate": date_start_NVD,
            "pubEndDate": date_end_NVD,
            "resultsPerPage": results_per_page,
            "startIndex": start_index,
            "noRejected": ""
        }
        response = requests.get(base_url, params=params, timeout=6)
        if response.status_code != 200:
            # Stops paging this window only; the outer loop moves on to the next window.
            print("Error:", response.status_code)
            break
        data = response.json()
        total_results = data.get("totalResults", 0)
        cves.extend(data.get("vulnerabilities", []))
        start_index += results_per_page
        print(start_index)
        sleep(1)  # pause between pages
    sleep(5)  # longer pause between windows
    cve_window_start += timedelta(days=30)

In [None]:
# normalize and preprocess data
cves_df = pd.json_normalize(cves, record_path=None, sep='.', max_level=None)
cves_df = preprocess_NVD_data(cves_df)

In [None]:
# keep only rows whose status is neither "Reserved" nor "Reject"
cves_df = cves_df[~cves_df['cve.vulnStatus'].isin(['Reserved', 'Reject'])]

In [None]:
cves_df.describe()

In [None]:
cves_df.isnull().sum()

Some features have a high number of missing values, so we drop the columns directly. Due to the sheer amount of samples, we also remove all the rows that have a missing value. Since we will aggregate data from all the CVEs' histories, we also drop date-related columns.

In [None]:
X = cves_df.drop(columns=["cve.cisaExploitAdd", "cve.evaluatorComment", "cve.cisaActionDue", "cve.cisaRequiredAction",
                          "cve.cisaVulnerabilityName", "cve.published", "cve.lastModified"]).dropna()
X.info()

In [None]:
X.head(3).T

Since all CVEs share the same CVSS version, we can remove that column from our dataset.

In [None]:
X = X.drop(columns="cvss_version")

Even though the number of references and CPEs could be very useful, they reflect the status of the CVE at the moment of download and do not contain time series data, so we cannot use them to assess how the CVE evolved during its first months after publication. For this reason, we drop those columns from our dataset too.

In [None]:
X = X.drop(columns=["cve.references", "num_references", "vulnerable_cpes"])


We now transform all remaining variables into categorical ones.

In [None]:
# Label-like columns that are stored as pandas categoricals
cols_to_cat = [
    "cve.sourceIdentifier",
    "cve.vulnStatus",
    "cvss_vectorString",
    "cvss_baseSeverity",
    "cvss_attackVector",
    "cvss_attackComplexity",
    "cvss_privilegesRequired",
    "cvss_userInteraction",
    "cvss_scope",
    "cvss_confidentialityImpact",
    "cvss_integrityImpact",
    "cvss_availabilityImpact",
]
X = X.astype(dict.fromkeys(cols_to_cat, 'category'))

In [None]:
X.to_csv(os.path.join(data_path, "cves_df.csv"))

## Feature construction from historical EPSS data

For each CVE, we download its complete EPSS history. We take all CVEs that started with a percentile value < 1% and we calculate the following metrics for their first 3 months after publication:
- $\frac{\sum_{t=1}^{T}pct_{i,t}-pct_{i,0}}{T}, T = 90$
- $\text{max}_t(pct_{i,t}-pct_{i,0}), t\in[1,...,90]$

In [None]:
window = 90

start_date = date(2022, 1, 1)
end_date = date(2025, 6, 1) + timedelta(days=window)
epss_path = os.path.join(data_path, "epss_history")
os.makedirs(epss_path, exist_ok=True)

# Walk day by day through the range and fetch each daily snapshot not already on disk.
while start_date <= end_date:
    day = f"{start_date:%Y-%m-%d}"
    url = f"https://epss.empiricalsecurity.com/epss_scores-{day}.csv.gz"
    filename = os.path.join(epss_path, f"epss_scores-{day}.csv.gz")
    start_date += timedelta(days=1)

    if os.path.exists(filename):
        print(f"Skipping {filename} (already exists)")
        continue

    print(f"Downloading {url}...")
    response = requests.get(url)
    if response.status_code != 200:
        print(f"No file for {day} (HTTP {response.status_code})")
    else:
        # The archive is stored as received; it is decompressed in a later cell.
        with open(filename, "wb") as f:
            f.write(response.content)
        print(f"Saved to {filename}")
    sleep(1)  # pause after every attempted download

print("Download complete.")

In [None]:
# Decompress every downloaded archive that has no extracted CSV next to it yet
for filename in os.listdir(epss_path):
    if not filename.endswith(".csv.gz"):
        continue
    gz_path = os.path.join(epss_path, filename)
    csv_path = gz_path[:-3]  # same path without the ".gz" suffix
    if os.path.exists(csv_path):
        continue
    print(f"Unzipping {gz_path} -> {csv_path}")
    with gzip.open(gz_path, "rb") as f_in, open(csv_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)

print("All files unzipped successfully.")

We create a dictionary containing as keys all the CVEs published between 2022 and 2024, and as values a list of their first 90 percentile values since first publication on EPSS.

In [None]:
percentiles_history = {}
r = 0  # number of matching rows seen across all files
p = 0  # number of those rows whose third column is below 0.01
tracked_prefixes = ("CVE-2022", "CVE-2023", "CVE-2024")

# Daily files in filename order, so each CVE's list follows the order of the snapshots
history_files = [name for name in sorted(os.listdir(epss_path))
                 if name.endswith(".csv") and name.startswith("epss_scores-")]
for filename in history_files:
    file_path = os.path.join(epss_path, filename)
    with open(file_path, newline='', encoding="utf-8") as csvfile:
        print(f"Reading {filename}...")
        for row in csv.reader(csvfile):
            # Rows with fewer than three fields or another ID prefix are skipped
            if len(row) < 3:
                continue
            cve_id = row[0].strip()
            if not cve_id.startswith(tracked_prefixes):
                continue
            percentile = float(row[2].strip())
            r += 1
            if percentile < 0.01:
                p += 1
            percentiles_history.setdefault(cve_id, []).append(percentile)

# Keep at most the first 90 recorded values for every CVE
for cve in percentiles_history:
    percentiles_history[cve] = percentiles_history[cve][:90]

In [None]:
print(f"N of entries: {r}\n N of 1%: {p}")

In [None]:
def get_first_percentile(cveid):
    """Return the first recorded value for `cveid` in `percentiles_history`, or None if absent."""
    history = percentiles_history.get(cveid)
    return None if history is None else history[0]

In [None]:
# Keep only CVEs that have an EPSS history. .copy() detaches the result from the original
# frame so the column assignments made later do not target a slice (SettingWithCopyWarning).
X = X[X["cve.id"].isin(list(percentiles_history))].copy()

$$\text{mean\_daily\_gain} = \frac{\sum_{t=1}^{T}pct_{i,t}-pct_{i,0}}{T} = \frac{(\sum_{t=1}^{T}pct_{i,t})-(T-1)pct_{i,0}}{T},  T = 90$$

In [None]:
def mean_daily_gain(values):
    """Average gain over the initial value across a series of observations.

    Computes ``(sum(values[1:]) - (len(values) - 1) * values[0]) / len(values)``, i.e. the
    sum of ``values[t] - values[0]`` over every observation after the first, divided by
    the total number of observations.

    Args:
        values: non-empty sequence of numbers; ``values[0]`` is the baseline.

    Returns:
        The mean gain as a float. A constant series yields 0.0.

    Raises:
        IndexError: if ``values`` is empty.
    """
    baseline = values[0]
    # values[1:] (not values[1:-1]): the last observation must be included, otherwise its
    # baseline is subtracted without its value ever being added.
    return (sum(values[1:]) - (len(values) - 1) * baseline) / len(values)

$$\text{total\_gain}=\text{max}_t(pct_{i,t}-pct_{i,0}), t\in[1,...,90]$$

In [None]:
def total_gain(values):
    """Largest gain over the initial value reached by any later observation.

    Args:
        values: non-empty sequence of numbers; ``values[0]`` is the baseline.

    Returns:
        ``max(values[t] - values[0])`` over all observations after the first, floored
        at 0 (0 is returned when no later observation exceeds the baseline, or when
        there is only one observation).

    Raises:
        IndexError: if ``values`` is empty.
    """
    baseline = values[0]
    # values[1:] (not values[1:-1]) so the last observation is also considered.
    return max([0, *(value - baseline for value in values[1:])])

In [None]:
# Attach the EPSS-history features to every CVE row
cve_ids = X["cve.id"]
X['initial_percentile'] = cve_ids.map(get_first_percentile)
X['mean_daily_gain'] = cve_ids.map(lambda cve_id: mean_daily_gain(percentiles_history[cve_id]))
X['total_gain'] = cve_ids.map(lambda cve_id: total_gain(percentiles_history[cve_id]))
# write the enriched frame back to disk
X.to_csv(os.path.join(data_path, "cves_df.csv"))

As we can see from the following plots, CVEs tend to start with an EPSS score sitting in the 10%. It is also evident how the mean daily gain and total gain over the first 90 days after publishing sit close to zero, meaning the vast majority of CVEs do not increase their threat level.

In [None]:
# (column, bins, x-axis label, rotate ticks + tight layout) for each density histogram
histogram_specs = [
    ("initial_percentile", "auto", "EPSS percentile on 1st day", False),
    ("mean_daily_gain", 50, "Mean daily gain in first 90 days", False),
    ("total_gain", 50, "Max gain in the first 90 days", True),
]
for column, bins, xlabel, rotate_and_tighten in histogram_specs:
    plt.figure(figsize=(12, 5))
    plt.ylabel("contribution%")
    plt.hist(X[column], bins=bins, density=True)
    plt.xlabel(xlabel)
    if rotate_and_tighten:
        plt.xticks(rotation=90)
        plt.tight_layout()
    plt.show()


In [None]:
X[X["initial_percentile"] < 0.01].info()

There are only 80 CVEs from 2022 to the middle of 2025 that have started with an EPSS below the 1% percentile. We will try three different ML-based approaches to select our final candidates to submit.

In [None]:
# These columns were already removed when the candidates were filtered earlier in the
# notebook, so a plain drop raises KeyError on Run All. errors="ignore" makes this step
# idempotent: it removes whichever of the columns are still present.
candidate_cves_df = candidate_cves_df.drop(
    columns=["cve", "cve.published", "cve.lastModified", "cvss_version", "cve.references", "num_references",
             "vulnerable_cpes"],
    errors="ignore",
)
candidate_cves_df.info()
candidate_cves_df.to_csv(os.path.join(data_path, "candidate_cves_df.csv"))

# START FROM HERE IF YOU ALREADY DOWNLOADED EVERYTHING

In [None]:
import gzip
from io import BytesIO
from time import sleep

import numpy as np
import pandas
import pandas as pd
import os
import requests
import json
import csv
import shutil
from datetime import date, timedelta
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import KBinsDiscretizer
from pandas import concat
from sklearn.model_selection import train_test_split
from sklearn.svm import OneClassSVM
from sklearn.feature_selection import chi2
import seaborn as sns
import matplotlib.pyplot as plt

sns.set_theme(style="whitegrid")
from preprocessing_utils import preprocess_NVD_data

data_path = 'data'
# exist_ok=True makes the creation idempotent and avoids the check-then-create race.
os.makedirs(data_path, exist_ok=True)

In [None]:
X = pd.read_csv(os.path.join(data_path, "cves_df.csv")).drop(columns="Unnamed: 0")
candidate_cves_df = pd.read_csv(os.path.join(data_path, "candidate_cves_df.csv")).drop(columns="Unnamed: 0")

In [None]:
X.info()

## 1. Novelty detection with OneClassSVM

Since we have collected a huge amount of CVEs with pretty low scores and that haven't "achieved" anything in their first 90 days, we will train a OneClassSVM on all CVEs that satisfy the following criteria:

   - percentile on first day < 0.1
   - mean daily gain in first 90 days < 0.1
   - max gain in first 90 days < 0.1

This training set will be considered the "losers", and we hope to find some CVEs among our candidates that are detected as outliers (novel) by the SVM.

In [None]:
import sklearn
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import OneHotEncoder

In [None]:
# Training rows: all three history metrics strictly below 0.1
low_activity_mask = (
    (X["initial_percentile"] < 0.1)
    & (X["mean_daily_gain"] < 0.1)
    & (X["total_gain"] < 0.1)
)
X_train_svm = X[low_activity_mask].drop(
    columns=["initial_percentile", "mean_daily_gain", "total_gain", "cve.id", "description", "cwe_list"])
X_train_svm.info()

In [None]:
# Test rows: all three history metrics at or above 0.1
high_activity_mask = (
    (X["initial_percentile"] >= 0.1)
    & (X["mean_daily_gain"] >= 0.1)
    & (X["total_gain"] >= 0.1)
)
X_test_svm = X[high_activity_mask].drop(
    columns=["initial_percentile", "mean_daily_gain", "total_gain", "cve.id", "description", "cwe_list"])
X_test_svm.info()

In [None]:
encoder = OneHotEncoder()
print("Encoding categorial features...")
# Fit on historical rows plus candidates so every category seen at predict time is known
historical_features = X.drop(
    columns=["initial_percentile", "mean_daily_gain", "total_gain", "cve.id", "description", "cwe_list"])
candidate_features = candidate_cves_df.drop(columns=["cve.id", "description", "cwe_list"])
X_svm_encode = pd.concat([historical_features, candidate_features])
encoder.fit(X_svm_encode)
X_train_svm_encoded = encoder.transform(X_train_svm)
X_test_svm_encoded = encoder.transform(X_test_svm)

In [None]:
estimator = OneClassSVM()
print("Fitting model...")
estimator.fit(X_train_svm_encoded)

In [None]:
print("Testing...")
y_svm_test = estimator.predict(X_test_svm_encoded)
# Share of test rows flagged as outliers (-1). It is multiplied by 100 so the value
# matches the "%" suffix, and an empty test set yields NaN instead of dividing by zero.
if len(y_svm_test):
    outlier_share = 100 * (y_svm_test == -1).sum() / len(y_svm_test)
else:
    outlier_share = float("nan")
print(f"Accuracy on testing data: {outlier_share:.02f}%")

In [None]:
# Encode the candidates with the fitted encoder and label each as outlier/inlier
candidate_features = candidate_cves_df.drop(columns=["cve.id", "description", "cwe_list"])
candidate_cves_df_encoded = encoder.transform(candidate_features)
print("Predicting...")
y_svm_predict = estimator.predict(candidate_cves_df_encoded)
outlier_count = (y_svm_predict == -1).sum()
print(f"Outliars in candidates: {100 * outlier_count / len(y_svm_predict):.02f}%")
y_svm_predict = ["Inlier" if label != -1 else "Outlier" for label in y_svm_predict]
print(y_svm_predict)

## 1bis. Novelty detection on the filtered dataset

We try to train another OneClassSVM, this time on the dataset filtered by percentile < 0.01. Since the size of the dataset is so small (80 samples), this is performed just as an extra step.

In [None]:
import sklearn
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import OneHotEncoder

In [None]:
X_train_svm = X[X["initial_percentile"] < 0.01]
X_train_svm = X_train_svm.drop(
    columns=["initial_percentile", "mean_daily_gain", "total_gain", "cve.id", "description", "cwe_list"])
X_train_svm.info()

In [None]:
X_test_svm = X[X["initial_percentile"] >= 0.01]
X_test_svm = X_test_svm.drop(
    columns=["initial_percentile", "mean_daily_gain", "total_gain", "cve.id", "description", "cwe_list"])
X_test_svm.info()

In [None]:
encoder = OneHotEncoder()
print("Encoding categorial features...")
# Fit on historical rows plus candidates so every category seen at predict time is known
historical_features = X.drop(
    columns=["initial_percentile", "mean_daily_gain", "total_gain", "cve.id", "description", "cwe_list"])
candidate_features = candidate_cves_df.drop(columns=["cve.id", "description", "cwe_list"])
X_svm_encode = pd.concat([historical_features, candidate_features])
encoder.fit(X_svm_encode)
X_train_svm_encoded = encoder.transform(X_train_svm)
X_test_svm_encoded = encoder.transform(X_test_svm)

In [None]:
estimator = OneClassSVM()
print("Fitting model...")
estimator.fit(X_train_svm_encoded)

In [None]:
print("Testing...")
y_svm_test = estimator.predict(X_test_svm_encoded)
# Share of test rows flagged as outliers (-1). It is multiplied by 100 so the value
# matches the "%" suffix, and an empty test set yields NaN instead of dividing by zero.
if len(y_svm_test):
    outlier_share = 100 * (y_svm_test == -1).sum() / len(y_svm_test)
else:
    outlier_share = float("nan")
print(f"Accuracy on testing data: {outlier_share:.02f}%")

In [None]:
# Encode the candidates with the fitted encoder and label each as outlier/inlier
candidate_features = candidate_cves_df.drop(columns=["cve.id", "description", "cwe_list"])
candidate_cves_df_encoded = encoder.transform(candidate_features)
print("Predicting...")
y_svm_predict = estimator.predict(candidate_cves_df_encoded)
outlier_count = (y_svm_predict == -1).sum()
print(f"Outliars in candidates: {100 * outlier_count / len(y_svm_predict):.02f}%")
y_svm_predict = ["Inlier" if label != -1 else "Outlier" for label in y_svm_predict]
print(y_svm_predict)

## 2. Metrics prediction with multi output RandomForest

We use RandomForestRegressor on the whole dataset and see what its best predictions are for our candidate CVEs

In [None]:
import sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.multioutput import MultiOutputRegressor
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, root_mean_squared_error, accuracy_score

In [None]:
encoder = OneHotEncoder()
print("Encoding categorial features...")
# Model inputs: everything except the targets, the initial percentile and the text/id columns
X_rr_features = X.drop(
    columns=["initial_percentile", "mean_daily_gain", "total_gain", "cve.id", "description", "cwe_list"])
candidate_features = candidate_cves_df.drop(columns=["cve.id", "description", "cwe_list"])
# Fit on historical rows plus candidates so the candidates' categories are known
X_rr_encode = pd.concat([X_rr_features, candidate_features])
encoder.fit(X_rr_encode)
X_rr_features_encoded = encoder.transform(X_rr_features)

In [None]:
print("Generating train/test split...")
targets = ["mean_daily_gain", "total_gain"]
y_rr_targets = X[targets]
X_rr_train, X_rr_test, y_rr_train, y_rr_test = train_test_split(X_rr_features_encoded, y_rr_targets, test_size=0.33, random_state=42)

In [None]:
print("Fitting model...")
rr = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)
estimator = MultiOutputRegressor(rr, n_jobs=-1)
estimator.fit(X_rr_train, y_rr_train)

In [None]:
print("Testing...")
y_rr_predict = estimator.predict(X_rr_test)
# Report each metric separately for the two targets
for label, metric in (("R^2", r2_score), ("MSE", mean_squared_error), ("RMSE", root_mean_squared_error)):
    print(f"{label} per target:", metric(y_rr_test, y_rr_predict, multioutput='raw_values'))

In [None]:
# Predict both targets for the candidate CVEs
candidate_features = candidate_cves_df.drop(columns=["cve.id", "description", "cwe_list"])
candidate_cves_df_encoded = encoder.transform(candidate_features)
print("Predicting...")
y_rr_predict = estimator.predict(candidate_cves_df_encoded)
print(y_rr_predict)

## 2.bis Metric prediction on the filtered dataset

We try to train another RandomForestRegressor, this time on the dataset filtered by percentile < 0.01. Since the size of the dataset is so small (80 samples), this is performed just as an extra step.

In [None]:
import sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.multioutput import MultiOutputRegressor
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, root_mean_squared_error, accuracy_score

In [None]:
encoder = OneHotEncoder()
print("Encoding categorial features...")
# Restrict the training data to CVEs whose first recorded percentile is below 0.01.
# Features AND targets are both taken from this filtered frame; previously the filter
# result was overwritten by `X.drop(...)` and the targets came from the full `X`,
# so the "filtered" model was in fact trained on the whole dataset.
X_rr_filtered = X[X["initial_percentile"] < 0.01]
X_rr_features = X_rr_filtered.drop(
    columns=["initial_percentile", "mean_daily_gain", "total_gain", "cve.id", "description", "cwe_list"])
# Fit the encoder on the training rows plus the candidates so the candidates' categories are known
X_rr_encode = pd.concat([X_rr_features, candidate_cves_df.drop(columns=["cve.id", "description", "cwe_list"])])
encoder.fit(X_rr_encode)
X_rr_features_encoded = encoder.transform(X_rr_features)
print("Generating train/test split...")
targets = ["mean_daily_gain", "total_gain"]
y_rr_targets = X_rr_filtered[targets]
X_rr_train, X_rr_test, y_rr_train, y_rr_test = train_test_split(X_rr_features_encoded,
                                                                y_rr_targets,
                                                                test_size=0.33,
                                                                random_state=42)

In [None]:
print("Fitting model...")
rr = RandomForestRegressor(n_estimators=1000, random_state=42, n_jobs=-1)
estimator = MultiOutputRegressor(rr)
estimator.fit(X_rr_train, y_rr_train)

In [None]:
print("Testing...")
y_rr_predict = estimator.predict(X_rr_test)
# Report each metric separately for the two targets
for label, metric in (("R^2", r2_score), ("MSE", mean_squared_error), ("RMSE", root_mean_squared_error)):
    print(f"{label} per target:", metric(y_rr_test, y_rr_predict, multioutput='raw_values'))

In [None]:
# Predict both targets for the candidate CVEs
candidate_features = candidate_cves_df.drop(columns=["cve.id", "description", "cwe_list"])
candidate_cves_df_encoded = encoder.transform(candidate_features)
print("Predicting...")
y_rr_predict = estimator.predict(candidate_cves_df_encoded)
print(y_rr_predict)

# Final analysis

We now merge the predictions with the candidates:

In [None]:
# Attach the SVM label and the two regressor outputs (column 0 and column 1) to the candidates
candidate_cves_df["Status"] = y_svm_predict
predicted_mean_gain, predicted_total_gain = y_rr_predict[:, 0], y_rr_predict[:, 1]
candidate_cves_df["predicted_mean_daily_gain"] = predicted_mean_gain
candidate_cves_df["predicted_total_gain"] = predicted_total_gain
candidate_cves_df.T

We only care about outliers, so we drop the others.

In [None]:
candidate_cves_df = candidate_cves_df[candidate_cves_df["Status"] == "Outlier"].drop(columns="Status")

We now sort the remaining CVEs by their two target metrics:

In [None]:
candidate_cves_df = candidate_cves_df.sort_values(by=["predicted_mean_daily_gain", "predicted_total_gain"], ascending=False)

At a cursory glance, we can see that all of Apple's and JetBrains' CVEs have already been fixed prior to publication, thus we exclude them.

In [None]:
# Drop every CVE reported by the two excluded source identifiers
excluded_sources = ["cve@jetbrains.com", "product-security@apple.com"]
candidate_cves_df = candidate_cves_df[~candidate_cves_df["cve.sourceIdentifier"].isin(excluded_sources)]
candidate_cves_df

By manual inspection, we also exclude CVE-2025-7445 and CVE-2025-59934 since they have been fixed. This leaves us with the first remaining 10 CVEs, which we will select for the lab activity.

In [None]:
# Drop the two CVEs excluded after manual inspection
excluded_ids = ["CVE-2025-7445", "CVE-2025-59934"]
candidate_cves_df = candidate_cves_df[~candidate_cves_df["cve.id"].isin(excluded_ids)]
candidate_cves_df

In [None]:
candidate_cves_df.to_csv(os.path.join(data_path, "candidate_cves_ranked_df.csv"))

In [None]:
nickname = 'ora_et_LABora'

# The ten CVE ids chosen for submission
selected = [
    'CVE-2025-9364', 'CVE-2025-50255', 'CVE-2025-54588', 'CVE-2025-57520', 'CVE-2025-20364',
    'CVE-2025-10205', 'CVE-2025-36899', 'CVE-2025-26419', 'CVE-2025-26420', 'CVE-2025-26427',
]

# Keep only the selected rows and write them to "<nickname>.csv" in the data folder
is_selected = candidate_cves_df["cve.id"].isin(selected)
candidate_cves_df = candidate_cves_df[is_selected]
candidate_cves_df.to_csv(os.path.join(data_path, nickname + '.csv'))