In [1]:
from datetime import date
import glob
import json
import logging
from pathlib import Path

from IPython.core.magic import register_cell_magic
from IPython.display import Markdown
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns


In [2]:
columns = ['CVE']

# Both exports are read as headerless files whose single column is labelled
# 'CVE'. Repeated rows are dropped (first occurrence wins) and every row is
# tagged with the name of the tool it came from.
metasploit_df = (
    pd.read_csv('metasploit.txt', header=None, names=columns)
    .drop_duplicates(keep='first')
    .assign(Source='Metasploit')
    .loc[:, ['CVE', 'Source']]
)
nuclei_df = (
    pd.read_csv('nuclei.txt', header=None, names=columns)
    .drop_duplicates(keep='first')
    .assign(Source='Nuclei')
    .loc[:, ['CVE', 'Source']]
)

In [3]:
# Load the known-exploited-vulnerabilities CSV, align its ID column with the
# 'CVE' label used by the other sources, and keep only the ID plus a source tag.
CISA_df = (
    pd.read_csv('known_exploited_vulnerabilities.csv')
    .rename(columns={"cveID": "CVE"})
    .assign(Source='CISA')
    .loc[:, ['CVE', 'Source']]
)

In [4]:
# Full EPSS table (first line of the file is skipped before the header row);
# kept under its own name because the final merge needs every score.
epss_df_all = pd.read_csv('epss_scores-current.csv', skiprows=1).rename(columns={"cve": "CVE"})

# Only CVEs whose EPSS score is strictly above 0.95 count as an 'EPSS' source.
high_epss_rows = epss_df_all.epss > .95
epss_df = epss_df_all.loc[high_epss_rows, ['CVE']].assign(Source='EPSS')

In [5]:
# Stack the four per-source frames, then collapse to one row per CVE whose
# 'Source' is the '/'-joined list of every source that reported it.
source_frames = [metasploit_df, nuclei_df, epss_df, CISA_df]
stacked_sources = pd.concat(source_frames, ignore_index=True, sort=False)
CVE_list = stacked_sources.groupby('CVE', as_index=False).agg({'CVE' : 'first', 'Source' : '/'.join})

In [6]:
# Root logger setup: INFO and above, each record rendered as
# "<timestamp> - <level> - <message>".
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

def load_nvd_data(filename):
    """Load NVD records from ``filename``.

    The file may hold either a single JSON document (returned exactly as
    ``json.load`` would return it) or JSON Lines, i.e. one JSON value per
    non-blank line (returned as a list of those values, in file order).

    Args:
        filename: Path of the UTF-8 encoded file to read.

    Returns:
        The parsed data, or ``[]`` when the content cannot be decoded in
        either form (the decode error is logged).

    Raises:
        OSError: If the file cannot be opened; not handled here.
    """
    with open(filename, 'r', encoding='utf-8') as f:
        raw_text = f.read()

    try:
        return json.loads(raw_text)
    except json.JSONDecodeError as e:
        # A real JSON Lines file fails whole-document parsing ("Extra data"),
        # so retry one line at a time before giving up.
        try:
            records = [json.loads(line) for line in raw_text.splitlines() if line.strip()]
        except json.JSONDecodeError:
            records = []
        if not records:
            logging.error(f"Error decoding JSON from file {filename}: {e}")
        return records

def extract_entry_data(entry):
    """Flatten one NVD record into a flat dict of report fields.

    Args:
        entry: Mapping with a ``'cve'`` key holding the record; the layout
            read here matches the NVD CVE API 2.0 response format.

    Returns:
        dict with the keys ``assigner``, ``published_date``, the CVSS v3.1
        fields (``attack_vector`` ... ``impact_score``), ``cwe``,
        ``description`` and ``cve``. Absent values keep their placeholder:
        ``'Missing_Data'``, ``'0.0'`` for ``base_score`` and ``''`` for
        ``description``.

    Raises:
        KeyError: If ``entry`` has no ``'cve'`` key or the record has no ``'id'``.
    """
    fields = {
        'assigner': 'Missing_Data',
        'published_date': 'Missing_Data',
        'attack_vector': 'Missing_Data',
        'attack_complexity': 'Missing_Data',
        'privileges_required': 'Missing_Data',
        'user_interaction': 'Missing_Data',
        'scope': 'Missing_Data',
        'confidentiality_impact': 'Missing_Data',
        'integrity_impact': 'Missing_Data',
        'availability_impact': 'Missing_Data',
        'base_score': '0.0',
        'base_severity': 'Missing_Data',
        'exploitability_score': 'Missing_Data',
        'impact_score': 'Missing_Data',
        'cwe': 'Missing_Data',
        'description': ''
    }

    cve = entry['cve']
    fields['cve'] = cve['id']
    fields['assigner'] = cve.get('sourceIdentifier', fields['assigner'])
    fields['published_date'] = cve.get('published', fields['published_date'])

    # Only the first CVSS v3.1 metric entry is used. `or [{}]` also covers a
    # key that is present but holds an empty list, which would otherwise make
    # `[0]` raise IndexError.
    metric = ((cve.get('metrics') or {}).get('cvssMetricV31') or [{}])[0]
    cvss_data = metric.get('cvssData') or {}

    def metric_level(key, default):
        # exploitabilityScore / impactScore sit next to 'cvssData', not inside
        # it; the cvssData lookup is kept as a fallback for records that nest
        # them there.
        return metric.get(key, cvss_data.get(key, default))

    fields.update({
        'attack_vector': cvss_data.get('attackVector', fields['attack_vector']),
        'attack_complexity': cvss_data.get('attackComplexity', fields['attack_complexity']),
        'privileges_required': cvss_data.get('privilegesRequired', fields['privileges_required']),
        'user_interaction': cvss_data.get('userInteraction', fields['user_interaction']),
        'scope': cvss_data.get('scope', fields['scope']),
        'confidentiality_impact': cvss_data.get('confidentialityImpact', fields['confidentiality_impact']),
        'integrity_impact': cvss_data.get('integrityImpact', fields['integrity_impact']),
        'availability_impact': cvss_data.get('availabilityImpact', fields['availability_impact']),
        'base_score': cvss_data.get('baseScore', fields['base_score']),
        'base_severity': cvss_data.get('baseSeverity', fields['base_severity']),
        'exploitability_score': metric_level('exploitabilityScore', fields['exploitability_score']),
        'impact_score': metric_level('impactScore', fields['impact_score']),
    })

    # CWE: first description of the first weakness entry, when there is one.
    first_weakness = (cve.get('weaknesses') or [{}])[0]
    cwe_descriptions = first_weakness.get('description') or []
    if cwe_descriptions:
        fields['cwe'] = cwe_descriptions[0].get('value', fields['cwe'])

    # Description: prefer the entry tagged lang == 'en'; otherwise fall back to
    # the first entry so records without a language tag behave as before.
    descriptions = cve.get('descriptions') or []
    if descriptions:
        chosen = next((d for d in descriptions if d.get('lang') == 'en'), descriptions[0])
        fields['description'] = chosen.get('value', fields['description'])

    return fields

def process_nvd_files():
    """Build the NVD DataFrame from every file matching ``'nvd.jsonl'``.

    Each loaded entry is flattened with ``extract_entry_data``; entries whose
    description starts with ``'** REJECT **'`` are left out.

    Returns:
        pandas.DataFrame with one row per kept entry, the ``published_date``
        column renamed to ``Published`` and parsed to datetimes (unparseable
        values become NaT), sorted by ``Published`` with a fresh 0..n-1 index.
    """
    kept_rows = [
        record
        for path in glob.glob('nvd.jsonl')
        for record in map(extract_entry_data, load_nvd_data(path))
        if not record['description'].startswith('** REJECT **')
    ]

    frame = pd.DataFrame(kept_rows).rename(columns={'published_date': 'Published'})
    # errors='coerce' turns values that cannot be parsed into NaT instead of raising.
    frame['Published'] = pd.to_datetime(frame['Published'], errors='coerce')
    return frame.sort_values(by=['Published']).reset_index(drop=True)

# Build the NVD frame and switch three columns to their report-facing labels.
nvd = process_nvd_files().rename(
    columns={'cve': 'CVE', 'description' : 'Description', 'base_score' : 'CVSS Score'}
)

In [7]:
# Attach NVD details and the EPSS score to every CVE in the combined list.
# NOTE(review): both joins are inner, so a CVE that is absent from `nvd` or
# from `epss_df_all` is dropped from the output — confirm that is intended.
patchthisapp_df = pd.merge(CVE_list, nvd, how='inner', on='CVE')
patchthisapp_df = pd.merge(patchthisapp_df, epss_df_all, how='inner', on='CVE')

# Fix the exported column set and order, then give the score column its display name.
patchthisapp_df = patchthisapp_df[['CVE', 'CVSS Score', 'epss', 'Description', 'Published', 'Source']]
patchthisapp_df = patchthisapp_df.rename(columns={"epss": "EPSS"})

# Create the output directory first: to_csv does not create missing parent
# directories and would otherwise fail at the very last step on a fresh checkout.
output_csv = Path('data/data.csv')
output_csv.parent.mkdir(parents=True, exist_ok=True)
patchthisapp_df.to_csv(output_csv, index=False)