In [0]:
import os
import zipfile
import urllib.request
import shutil
from pyspark.sql.functions import input_file_name, col, to_timestamp, year

# --- Configuration -----------------------------------------------------------
# USE_OFFLINE=True skips the GitHub download and reads a pre-uploaded archive
# from DBFS instead (see the else-branch below).
USE_OFFLINE = False
TARGET_YEAR = 2024
zip_destination = "/tmp/cve/cvelistV5.zip"
extraction_folder = "/tmp/cve/cvelistV5-main"

os.makedirs("/tmp/cve", exist_ok=True)

# --- Obtain the archive ------------------------------------------------------
if not USE_OFFLINE:
    github_url = "https://github.com/CVEProject/cvelistV5/archive/refs/heads/main.zip"
    # Stream the response to disk in chunks instead of holding the whole
    # archive in driver memory, and write to a temporary name first so an
    # interrupted download never leaves a truncated zip at zip_destination.
    partial_download = zip_destination + ".part"
    with urllib.request.urlopen(github_url, timeout=120) as response, \
            open(partial_download, "wb") as file:
        shutil.copyfileobj(response, file)
    os.replace(partial_download, zip_destination)
else:
    zip_destination = "/dbfs/FileStore/cvelistV5.zip"
    assert os.path.exists(zip_destination), f"Missing file: {zip_destination}"

# --- Extract -----------------------------------------------------------------
# Remove any previous extraction (best effort) so the tree is rebuilt from the
# archive that was just obtained.
if os.path.exists(extraction_folder):
    shutil.rmtree(extraction_folder, ignore_errors=True)

with zipfile.ZipFile(zip_destination) as archive:
    archive.extractall("/tmp/cve/")

# Root of the per-year CVE JSON folders inside the extracted repository.
cve_json_directory = os.path.join(extraction_folder, "cves")

In [0]:
import json
import os
import pandas as pd
from pyspark.sql.functions import from_json, schema_of_json, col

TARGET_YEAR = 2024
year_folder = f"/tmp/cve/cvelistV5-main/cves/{TARGET_YEAR}"

# os.walk() silently yields nothing for a missing directory, so fail early
# with a message that points at the real cause.
if not os.path.isdir(year_folder):
    raise FileNotFoundError(f"CVE year folder not found: {year_folder}")

# Read every *.json file under the year folder into driver memory as raw text.
json_data = []

for root, directories, files in os.walk(year_folder):
    for filename in files:
        if filename.endswith('.json'):
            full_path = os.path.join(root, filename)
            # Decode explicitly as UTF-8 rather than relying on the
            # locale-dependent default encoding of open().
            with open(full_path, 'r', encoding='utf-8') as json_file:
                json_data.append(json_file.read())

if not json_data:
    raise ValueError(f"No .json files found under {year_folder}")

# One row per file, with the raw JSON document in a single string column.
pandas_df = pd.DataFrame({'json_string': json_data})
spark_temp = spark.createDataFrame(pandas_df)

# NOTE(review): the schema is inferred from ONE record (the first file that
# os.walk happened to return). from_json() only materialises fields present in
# the schema it is given, so anything absent from this sample record is not
# parsed for any row, and the sample can differ between runs/filesystems.
# Consider inferring the schema over the full data set — confirm what the
# target cluster supports before changing this.
sample_record = json_data[0]
inferred_schema = schema_of_json(sample_record)

# Parse each document with the inferred schema and flatten the top-level struct.
df_raw = spark_temp.select(
    from_json(col("json_string"), inferred_schema).alias("data")
).select("data.*")

In [0]:
from pyspark.sql import functions as F

bronze_table_name = "cve_bronze.records"
# Sanity floor for the number of CVEs published in the target year.
MIN_EXPECTED_RECORDS = 30000
spark.conf.set("spark.sql.shuffle.partitions", "8")

# Keep only records whose publication timestamp falls in the target year.
# TARGET_YEAR is the same constant that selected the input folder in the
# loading cell, so the filter and the loaded data cannot drift apart.
# (The *_2024 variable names are kept for compatibility with existing cells.)
publication_date = F.col("cveMetadata.datePublished").cast("string")
df_with_timestamp = df_raw.withColumn("_datePublished_ts", F.to_timestamp(publication_date))
df_2024 = df_with_timestamp.filter(F.year(F.col("_datePublished_ts")) == TARGET_YEAR)

# --- Data-quality metrics ----------------------------------------------------
total_records = df_raw.count()
records_2024 = df_2024.count()
null_cve_ids = df_2024.filter(F.col("cveMetadata.cveId").isNull()).count()
unique_cve_ids = df_2024.select("cveMetadata.cveId").distinct().count()

# Print the metrics BEFORE asserting so a failed check still shows the numbers.
print(f"✓ Total raw records: {total_records:,}")
print(f"✓ {TARGET_YEAR} records: {records_2024:,}")
print(f"✓ Null CVE IDs: {null_cve_ids}")
print(f"✓ Unique CVE IDs: {unique_cve_ids}")

assert records_2024 >= MIN_EXPECTED_RECORDS, (
    f"Expected at least {MIN_EXPECTED_RECORDS:,} records, got {records_2024:,}"
)
assert null_cve_ids == 0, f"Found {null_cve_ids} null CVE IDs"
assert unique_cve_ids == records_2024, f"Duplicate CVE IDs detected: {records_2024 - unique_cve_ids}"
print(f"✓ All assertions passed!")

# --- Write the bronze Delta table -------------------------------------------
# Make sure the target schema exists so the notebook runs on a fresh workspace.
bronze_schema_name = bronze_table_name.rpartition(".")[0]
if bronze_schema_name:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {bronze_schema_name}")

spark.sql(f"DROP TABLE IF EXISTS {bronze_table_name}")

df_2024.write \
    .format("delta") \
    .mode("overwrite") \
    .option("delta.columnMapping.mode", "name") \
    .option("overwriteSchema", "true") \
    .saveAsTable(bronze_table_name)

# Read the table back and confirm nothing was lost in the write.
verified_count = spark.table(bronze_table_name).count()
assert verified_count == records_2024, f"Record count mismatch: {verified_count} != {records_2024}"

print(f"\n✓ Table verified: {verified_count:,} records in {bronze_table_name}")

# Show table details:
spark.sql(f"DESCRIBE DETAIL {bronze_table_name}").show(truncate=False)

✓ Total raw records: 38,753
✓ 2024 records: 32,924
✓ Null CVE IDs: 0
✓ Unique CVE IDs: 32924
✓ All assertions passed!

✓ Table verified: 32,924 records in cve_bronze.records
+------+------------------------------------+----------------------------+-----------+--------+----------------------+-------------------+----------------+-----------------+--------+-----------+-------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+----------------+--------------------------------------------------------+---------------------------------------------------------------+-------------+
|format|id                                  |name                        |description|location|createdAt             |lastModified       |partitionColumns|clusteringColumns|numFiles|sizeInBytes|properties                                                                                                          

In [0]:
from pyspark.sql import functions as F

bronze_table_name = "cve_bronze.records"
bronze_data = spark.table(bronze_table_name)

core_table_name = "cve_silver.core"
affected_table_name = "cve_silver.affected_products"

# Make sure the target schema exists so the notebook runs on a fresh workspace.
silver_schema_name = core_table_name.rpartition(".")[0]
if silver_schema_name:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {silver_schema_name}")

# --- Shared column expressions ----------------------------------------------
adp_containers = F.col("containers.adp")
has_adp = F.size(adp_containers) > 0
# CVSS v3.1 struct taken from the FIRST metrics entry of the FIRST ADP container.
# NOTE(review): records whose score lives elsewhere (another metrics entry,
# another ADP container, or the CNA container) yield NULL here — confirm this
# is the intended source. The inner element_at() on "metrics" has no size
# guard; verify behaviour for empty arrays under the cluster's ANSI setting.
first_adp_cvss = F.element_at(F.element_at(adp_containers, 1)["metrics"], 1)["cvssV3_1"]
cna_descriptions = F.col("containers.cna.descriptions")

# --- Core table: one row per CVE --------------------------------------------
core_cve_table = bronze_data.select(
    F.col("cveMetadata.cveId").alias("cve_id"),
    F.to_timestamp(F.col("cveMetadata.datePublished")).alias("date_published"),
    F.to_timestamp(F.col("cveMetadata.dateReserved")).alias("date_reserved"),
    F.to_timestamp(F.col("cveMetadata.dateUpdated")).alias("date_updated"),
    F.col("cveMetadata.state").alias("state"),
    # CVSS fields are NULL when the record has no ADP container.
    F.when(has_adp, first_adp_cvss["baseScore"]).alias("cvss_base_score"),
    F.when(has_adp, first_adp_cvss["baseSeverity"]).alias("cvss_severity"),
    F.when(has_adp, first_adp_cvss["vectorString"]).alias("cvss_vector"),
    # First CNA description entry, whatever its language.
    F.when(
        F.size(cna_descriptions) > 0,
        F.element_at(cna_descriptions, 1)["value"]
    ).alias("description")
)

spark.sql(f"DROP TABLE IF EXISTS {core_table_name}")

core_cve_table.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(core_table_name)

# --- Affected products: one row per (CVE, affected entry) --------------------
affected_products = bronze_data.select(
    F.col("cveMetadata.cveId").alias("cve_id"),
    F.explode_outer(F.col("containers.cna.affected")).alias("affected_item")
).select(
    F.col("cve_id"),
    F.col("affected_item.vendor").alias("vendor"),
    F.col("affected_item.product").alias("product")
)

# Drop entries lacking a vendor or a product (this also removes the NULL rows
# that explode_outer emits for CVEs without an "affected" array).
affected_products_clean = affected_products.filter(
    F.col("vendor").isNotNull() & F.col("product").isNotNull()
)

spark.sql(f"DROP TABLE IF EXISTS {affected_table_name}")

affected_products_clean.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(affected_table_name)

# Expose both tables under flat temp-view names for the SQL analysis cell.
spark.table(core_table_name).createOrReplaceTempView("cve_silver_core")
spark.table(affected_table_name).createOrReplaceTempView("cve_silver_affected_products")

# --- Summary -----------------------------------------------------------------
core_count = spark.table("cve_silver_core").count()
print(f"✓ Created cve_silver_core: {core_count:,} records")

products_count = spark.table("cve_silver_affected_products").count()
print(f"✓ Created cve_silver_affected_products: {products_count:,} records")

# Show sample data:
print("\nSample from cve_silver_core:")
spark.table("cve_silver_core").show(5, truncate=False)

print("\nSample from cve_silver_affected_products:")
spark.table("cve_silver_affected_products").show(5, truncate=False)

✓ Created cve_silver_core: 32,924 records
✓ Created cve_silver_affected_products: 61,238 records

Sample from cve_silver_core:
+--------------+-----------------------+-----------------------+-----------------------+---------+---------------+-------------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cve_id        |date_published         |date_reserved          |date_updated           |state    |cvss_base_score|cvss_severity|cvss_vector|description                                                                                                                                                                                                               |
+--------------+-----------------------+-----------------------+-----------------------+---------+---------------+-------------+-----------+-------------

In [0]:
# 1. Monthly publication volume and mean CVSS base score.
spark.sql("""
    SELECT 
        MONTH(date_published) as month,
        COUNT(*) as vulnerability_count,
        ROUND(AVG(cvss_base_score), 2) as avg_cvss_score
    FROM cve_silver_core
    WHERE date_published IS NOT NULL
    GROUP BY MONTH(date_published)
    ORDER BY month
""").show()

# 2. Days between CVE reservation and publication.
spark.sql("""
    SELECT 
        ROUND(AVG(DATEDIFF(date_published, date_reserved)), 1) as avg_days_to_publish,
        MIN(DATEDIFF(date_published, date_reserved)) as min_days,
        MAX(DATEDIFF(date_published, date_reserved)) as max_days,
        PERCENTILE_APPROX(DATEDIFF(date_published, date_reserved), 0.5) as median_days
    FROM cve_silver_core
    WHERE date_reserved IS NOT NULL AND date_published IS NOT NULL
""").show()

# 3. Severity distribution derived from the base score.
#    Rows with a NULL (or 0.0) score fall into 'UNKNOWN'.
spark.sql("""
    SELECT 
        CASE 
            WHEN cvss_base_score >= 9.0 THEN 'CRITICAL'
            WHEN cvss_base_score >= 7.0 THEN 'HIGH'
            WHEN cvss_base_score >= 4.0 THEN 'MEDIUM'
            WHEN cvss_base_score > 0.0 THEN 'LOW'
            ELSE 'UNKNOWN'
        END as severity_level,
        COUNT(*) as count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM cve_silver_core
    GROUP BY 
        CASE 
            WHEN cvss_base_score >= 9.0 THEN 'CRITICAL'
            WHEN cvss_base_score >= 7.0 THEN 'HIGH'
            WHEN cvss_base_score >= 4.0 THEN 'MEDIUM'
            WHEN cvss_base_score > 0.0 THEN 'LOW'
            ELSE 'UNKNOWN'
        END
    ORDER BY 
        CASE severity_level
            WHEN 'CRITICAL' THEN 1
            WHEN 'HIGH' THEN 2
            WHEN 'MEDIUM' THEN 3
            WHEN 'LOW' THEN 4
            ELSE 5
        END
""").show()

# 4. Monthly severity trend: mean score plus critical / high counts.
spark.sql("""
    SELECT 
        MONTH(date_published) as month,
        ROUND(AVG(cvss_base_score), 2) as avg_severity,
        COUNT(CASE WHEN cvss_base_score >= 9.0 THEN 1 END) as critical_count,
        COUNT(CASE WHEN cvss_base_score >= 7.0 AND cvss_base_score < 9.0 THEN 1 END) as high_count
    FROM cve_silver_core
    WHERE date_published IS NOT NULL
    GROUP BY MONTH(date_published)
    ORDER BY month
""").show()

# 5. Top 25 vendors by number of distinct CVEs.
spark.sql("""
    SELECT 
        vendor,
        COUNT(DISTINCT cve_id) as vulnerability_count,
        COUNT(DISTINCT product) as product_count
    FROM cve_silver_affected_products
    WHERE vendor IS NOT NULL
    GROUP BY vendor
    ORDER BY vulnerability_count DESC
    LIMIT 25
""").show(25, truncate=False)

# 6. Vendor share of CVEs with a running cumulative share.
#    The window and the final ordering use `vendor` as a tie-breaker and an
#    explicit ROWS frame: with ORDER BY vuln_count alone, vendors with equal
#    counts are window peers and would all show the same cumulative value,
#    and the LIMIT 10 cut-off would be non-deterministic.
#    A CVE can list several vendors, so shares may sum to more than 100%.
spark.sql("""
    WITH vendor_counts AS (
        SELECT 
            vendor,
            COUNT(DISTINCT cve_id) as vuln_count
        FROM cve_silver_affected_products
        GROUP BY vendor
    ),
    total_vulns AS (
        SELECT COUNT(DISTINCT cve_id) as total FROM cve_silver_affected_products
    )
    SELECT 
        vendor,
        vuln_count,
        ROUND(vuln_count * 100.0 / (SELECT total FROM total_vulns), 2) as market_share_pct,
        ROUND(
            SUM(vuln_count * 100.0 / (SELECT total FROM total_vulns)) OVER (
                ORDER BY vuln_count DESC, vendor
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            ),
            2
        ) as cumulative_pct
    FROM vendor_counts
    ORDER BY vuln_count DESC, vendor
    LIMIT 10
""").show(10, truncate=False)

# 7. Vendors (>= 10 scored CVEs) ranked by mean CVSS score.
#    The affected-products table has one row per (CVE, vendor, product), so it
#    is de-duplicated to (vendor, CVE) before joining; otherwise a CVE that
#    lists several products of one vendor is counted several times in AVG()
#    and in critical_count (which could then exceed total_cves).
spark.sql("""
    WITH vendor_cves AS (
        SELECT DISTINCT vendor, cve_id
        FROM cve_silver_affected_products
        WHERE vendor IS NOT NULL
    )
    SELECT 
        p.vendor,
        COUNT(DISTINCT p.cve_id) as total_cves,
        ROUND(AVG(c.cvss_base_score), 2) as avg_cvss_score,
        MAX(c.cvss_base_score) as max_cvss_score,
        COUNT(CASE WHEN c.cvss_base_score >= 9.0 THEN 1 END) as critical_count
    FROM vendor_cves p
    JOIN cve_silver_core c ON p.cve_id = c.cve_id
    WHERE c.cvss_base_score IS NOT NULL
    GROUP BY p.vendor
    HAVING COUNT(DISTINCT p.cve_id) >= 10
    ORDER BY avg_cvss_score DESC
    LIMIT 10
""").show(10, truncate=False)

# 8. Overall summary of the core table.
spark.sql("""
    SELECT 
        COUNT(DISTINCT cve_id) as total_cves_2024,
        COUNT(DISTINCT CASE WHEN cvss_base_score IS NOT NULL THEN cve_id END) as scored_cves,
        ROUND(AVG(cvss_base_score), 2) as avg_cvss_score,
        MAX(cvss_base_score) as max_cvss_score,
        MIN(date_published) as first_published,
        MAX(date_published) as last_published
    FROM cve_silver_core
""").show(truncate=False)

+-----+-------------------+--------------+
|month|vulnerability_count|avg_cvss_score|
+-----+-------------------+--------------+
|    1|               1134|          8.37|
|    2|               1769|          7.06|
|    3|               2616|          6.65|
|    4|               3218|          6.74|
|    5|               3348|          7.02|
|    6|               2707|           7.0|
|    7|               2877|          7.11|
|    8|               2692|          7.57|
|    9|               2408|          7.16|
|   10|               3373|          7.24|
|   11|               3760|          6.72|
|   12|               3022|          6.93|
+-----+-------------------+--------------+

+-------------------+--------+--------+-----------+
|avg_days_to_publish|min_days|max_days|median_days|
+-------------------+--------+--------+-----------+
|               38.8|       0|     396|         16|
+-------------------+--------+--------+-----------+

+--------------+-----+----------+
|severity_level|