# CVE Data Quailty

In [None]:
from IPython.display import display
import json
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import re

## Load and Parse CVE Data

In [11]:
all_rows = []
base_dir = "../Data/CVE/cves"

# Loop through each year directory
for year_dir in os.listdir(base_dir):
    if year_dir.isdigit():
        year_path = os.path.join(base_dir, year_dir)
        if os.path.isdir(year_path):
            for root, dirs, files in os.walk(year_path):
                for filename in files:
                    if filename.endswith(".json"):
                        filepath = os.path.join(root, filename)
                        try:
                            with open(filepath, "r") as file:
                                cve_data = json.load(file)

                                meta = cve_data.get("cveMetadata", {})
                                containers = cve_data.get("containers", {})
                                cna = containers.get("cna", {})

                                cve_id = meta.get("cveId", None)
                                state = meta.get("state", None)
                                assigner_short = meta.get("assignerShortName", None)
                                date_reserved = meta.get("dateReserved", None)
                                date_published = meta.get("datePublished", None)
                                date_updated = meta.get("dateUpdated", None)
                                
                                desc_en = next(
                                    (d.get("value") for d in cna.get("descriptions", []) if d.get("lang") == "en"),
                                    None
                                )

                                # Collect any CVSS base scores by version
                                cvss_scores_v3 = [
                                    metric["cvssV3_1"]["baseScore"]
                                    for metric in cna.get("metrics", [])
                                    if "cvssV3_1" in metric
                                ]
                                cvss_scores_v4 = [
                                    metric["cvssV4_0"]["baseScore"]
                                    for metric in cna.get("metrics", [])
                                    if "cvssV4_0" in metric
                                ]
                                cvss_scores_v2 = [
                                    metric["cvssV2_0"]["baseScore"]
                                    for metric in cna.get("metrics", [])
                                    if "cvssV2_0" in metric
                                ]

                                # Only take the first score if available
                                cvss_score_v3_1 = cvss_scores_v3[0] if cvss_scores_v3 else None
                                cvss_score_v4_0 = cvss_scores_v4[0] if cvss_scores_v4 else None
                                cvss_score_v2_0 = cvss_scores_v2[0] if cvss_scores_v2 else None

                                # Only take the first CWE if present
                                cwe_ids = []
                                for problem_type in cna.get("problemTypes", []):
                                    for desc in problem_type.get("descriptions", []):
                                        if "cweId" in desc:
                                            cwe_ids.append(desc["cweId"])
                                cwe_id = cwe_ids[0] if cwe_ids else None
                                
                                provider_meta = cna.get("providerMetadata", {})
                                cna_short_name = provider_meta.get("shortName", None)

                                # Collect affected products and versions
                                affected_products = [
                                    f"{aff.get('vendor', 'n/a')} {aff.get('product', 'n/a')} {ver.get('version', 'n/a')}"
                                    for aff in cna.get("affected", [])
                                    for ver in aff.get("versions", [])
                                ]

                                # Collect credits
                                credits = [
                                    f"{credit['value']} ({credit.get('type', 'unknown')})"
                                    for credit in cna.get("credits", [])
                                ]

                                # Collect impacts
                                impacts = [
                                    impact.get("capecId", "unknown")
                                    for impact in cna.get("impacts", [])
                                ]

                                # Collect references
                                references = [
                                    ref.get("url", "unknown")
                                    for ref in cna.get("references", [])
                                ]

                                all_rows.append({
                                    "CVE ID": cve_id,
                                    "State": state,
                                    "Assigner Org": assigner_short,
                                    "Date Reserved": date_reserved,
                                    "Date Published": date_published,
                                    "Date Updated": date_updated,
                                    "CVE Description": desc_en,
                                    "CVSS Score (v3.1)": cvss_score_v3_1,
                                    "CVSS Score (v4.0)": cvss_score_v4_0,
                                    "CVSS Score (v2.0)": cvss_score_v2_0,
                                    "CWE ID": cwe_id,
                                    "CNA Short Name": cna_short_name,
                                    "Affected Products": affected_products,
                                    "Credits": credits,
                                    "Impacts": impacts,
                                    "References": references
                                })
                        except Exception as e:
                            print(f"Error processing file {filepath}: {e}")

cve_df = pd.DataFrame(all_rows)


In [None]:
# Generate a data quality report
def data_quality_report(df):
    report = pd.DataFrame({
        'Column': cve_df.columns,
        'Non-Null Count': cve_df.notnull().sum(),
        'Null Count': cve_df.isnull().sum(),
        'Unique Count': cve_df.apply(lambda x: x.dropna().apply(str).nunique()),
        'Data Type': cve_df.dtypes
    })
    
    # Add basic statistics for numeric columns, excluding list-type columns
    numeric_columns = cve_df.select_dtypes(include=[np.number]).columns
    numeric_stats = cve_df[numeric_columns].describe().transpose()
    numeric_stats = numeric_stats[['mean', 'std', 'min', '25%', '50%', '75%', 'max']]
    numeric_stats.columns = ['Mean', 'Std Dev', 'Min', '25%', '50%', '75%', 'Max']
    
    # Merge the numeric stats with the report
    report = report.merge(numeric_stats, left_on='Column', right_index=True, how='left')
    
    return report

# Generate the report
report = data_quality_report(cve_df)

# Display the report without the index
display(report.style.hide(axis="index"))

Column,Non-Null Count,Null Count,Unique Count,Data Type,Mean,Std Dev,Min,25%,50%,75%,Max
CVE ID,7038,0,7038,object,,,,,,,
State,7038,0,2,object,,,,,,,
Assigner Org,7038,0,176,object,,,,,,,
Date Reserved,7038,0,3322,object,,,,,,,
Date Published,6980,58,6417,object,,,,,,,
Date Updated,7038,0,7038,object,,,,,,,
CVE Description,6580,458,6458,object,,,,,,,
CVSS Score (v3.1),4981,2057,78,float64,6.438928,1.557028,1.8,5.4,6.5,7.3,10.0
CVSS Score (v4.0),1517,5521,58,float64,6.221753,1.725125,0.0,5.3,5.5,6.9,10.0
CVSS Score (v2.0),901,6137,34,float64,5.576471,1.741098,1.0,4.0,6.5,6.5,10.0
