In [0]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# --- Pipeline configuration -------------------------------------------------
# Label interpolated into the status messages printed by this notebook.
year = 2024

# Delta table that the bronze-to-silver step reads as its input.
bronze_table_path = "workspace.cve_bronze.records"

# Schema that receives the silver tables, followed by the table names used
# for the parsed records, the core CVE table and the affected-products table.
silver_path = "cve_silver"
silver_table, silver_core, silver_affected = "records", "core", "affected"

# Storage location where the silver records are also saved as Delta files.
DELTA_SILVER_PATH = "/Volumes/workspace/default/assignment1/cve/silver"


In [0]:
# Partial schema used to parse the JSON held in the `containers` column.
# Only the fields declared here are extracted; every field is nullable.
#
# Layout:
#   containers
#     |- cna   (struct)  providerMetadata, title, problemTypes, affected,
#     |                  descriptions, metrics, timeline
#     `- adp   (array of struct)  title, providerMetadata
#
# NOTE: `timeline` is declared INSIDE `cna`. It used to be declared as a
# sibling of `cna`, but the CVE record format defines it as a property of the
# CNA container, so at the top level it could never match any data.
containers_schema = StructType([
  StructField("cna", StructType([
    StructField("providerMetadata", StructType([
      StructField("orgId", StringType(), True),
      StructField("shortName", StringType(), True),
      StructField("dateUpdated", TimestampType(), True)
    ]), True),
    StructField("title", StringType(), True),
    StructField("problemTypes", ArrayType(StructType([
      StructField("descriptions", ArrayType(StructType([
        StructField("type", StringType(), True),
        StructField("cweId", StringType(), True),
        StructField("lang", StringType(), True),
        StructField("description", StringType(), True)
      ])), True)
    ])), True),
    StructField("affected", ArrayType(StructType([
      StructField("vendor", StringType(), True),
      StructField("product", StringType(), True),
      StructField("versions", ArrayType(StructType([
        StructField("version", StringType(), True),
        StructField("status", StringType(), True)
      ])), True)
    ])), True),
    StructField("descriptions", ArrayType(StructType([
      StructField("lang", StringType(), True),
      StructField("value", StringType(), True)
    ])), True),
    # Each metrics entry may carry a score under any of these CVSS versions.
    StructField("metrics", ArrayType(StructType([
      StructField("cvssV4_0", StructType([
        StructField("version", StringType(), True),
        StructField("baseScore", DoubleType(), True),
        StructField("vectorString", StringType(), True),
        StructField("baseSeverity", StringType(), True)
      ]), True),
      StructField("cvssV3_1", StructType([
        StructField("version", StringType(), True),
        StructField("baseScore", DoubleType(), True),
        StructField("vectorString", StringType(), True),
        StructField("baseSeverity", StringType(), True)
      ]), True),
      StructField("cvssV3_0", StructType([
        StructField("version", StringType(), True),
        StructField("baseScore", DoubleType(), True),
        StructField("vectorString", StringType(), True),
        StructField("baseSeverity", StringType(), True)
      ]), True),
      # The v2.0 struct declares no baseSeverity field.
      StructField("cvssV2_0", StructType([
        StructField("version", StringType(), True),
        StructField("baseScore", DoubleType(), True),
        StructField("vectorString", StringType(), True)
      ]), True)
    ])), True),
    StructField("timeline", ArrayType(StructType([
      StructField("time", TimestampType(), True),
      StructField("lang", StringType(), True),
      StructField("value", StringType(), True)
    ])), True)
  ]), True),
  StructField("adp", ArrayType(StructType([
    StructField("title", StringType(), True),
    StructField("providerMetadata", StructType([
      StructField("orgId", StringType(), True),
      StructField("shortName", StringType(), True),
      StructField("dateUpdated", TimestampType(), True)
    ]), True)
  ])), True)
])

In [0]:
# Bronze to silver
#
# Steps performed by this cell:
#   1. read the bronze table and parse its `containers` column,
#   2. persist the parsed records (Delta files + registered table),
#   3. derive the core CVE table and the affected-products table from the
#      registered records table and register both.


def _overwrite_delta(df):
    """Return a Delta writer for `df` that overwrites existing data and schema.

    Every silver output uses the same write options, so they are defined once
    here. The caller completes the write with `.save(path)` or
    `.saveAsTable(name)`.
    """
    return (
        df.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .option("overwriteSchema", "true")
        .option("delta.columnMapping.mode", "name")
    )


# Build every table identifier from the configuration constants so that the
# schema that is created, the tables that are written and the table that is
# queried below are guaranteed to be the same objects.
silver_records_table = f"{silver_path}.{silver_table}"
silver_core_table = f"{silver_path}.{silver_core}"
silver_affected_table = f"{silver_path}.{silver_affected}"

df_bronze = spark.read.format("delta").table(bronze_table_path)

# Parse the nested structure of the containers column.
# NOTE(review): from_json implies `containers` is a JSON string in bronze -
# confirm against the bronze notebook.
df_silver = (
    df_bronze
    .withColumn("containers_parsed", from_json(col("containers"), containers_schema))
    .drop("containers")
    .withColumnRenamed("containers_parsed", "containers")
)

# Write to Delta Lake
_overwrite_delta(df_silver).save(DELTA_SILVER_PATH)

print(f"\n{year} Silver layer created: {DELTA_SILVER_PATH}")

# Create schema if it does not exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {silver_path}")

# Register table for SQL access
# silver records
_overwrite_delta(df_silver).saveAsTable(silver_records_table)

print(f"\n{year} Silver records table registered: {silver_records_table}")

# 1) Core CVE table: CVE IDs, publication dates, CVSS scores, descriptions.
#    - cvss_score: first non-null base score found while scanning the metrics
#      array; within one entry the preference is v3.1, v3.0, v4.0, then v2.0.
#    - description: value of the first description whose lang is exactly 'en',
#      NULL when there is none.
df_silver_core = spark.sql(f"""
SELECT
  cveMetadata.cveId AS cve_id,
  cveMetadata.dateReserved AS date_reserved,
  cveMetadata.datePublished AS date_published,
  CAST(
    aggregate(
      containers.cna.metrics,
      CAST(NULL AS DOUBLE),
      (acc, m) -> COALESCE(
        acc,
        m.cvssV3_1.baseScore,
        m.cvssV3_0.baseScore,
        m.cvssV4_0.baseScore,
        m.cvssV2_0.baseScore
      )
    ) AS DOUBLE
  ) AS cvss_score,
  CASE
    WHEN ARRAY_POSITION(
      TRANSFORM(containers.cna.descriptions, x -> x.lang), 'en'
    ) > 0 THEN
      element_at(
        containers.cna.descriptions,
        CAST(
          ARRAY_POSITION(
            TRANSFORM(containers.cna.descriptions, x -> x.lang), 'en'
          ) AS INT
        )
      ).value
    ELSE NULL
  END AS description
FROM {silver_records_table}
WHERE cveMetadata.cveId IS NOT NULL
""")

# 2) Affected-products table: one row per entry of the `affected` array.
df_affected_nested = spark.sql(f"""
SELECT
  cveMetadata.cveId AS cve_id,
  containers.cna.affected AS affected
FROM {silver_records_table}
WHERE cveMetadata.cveId IS NOT NULL
""")

# 3) Data quality improvements
#
# TODO: timestamp standardisation of `date_published` is NOT applied. A
# previous attempt (coalescing try_to_timestamp over the patterns
# "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "yyyy-MM-dd'T'HH:mm:ss'Z'" and "yyyy-MM-dd")
# was disabled; re-enable it once the column's source type is confirmed.
#
# No rows are dropped here: CVEs without a description or a CVSS score are
# kept in the core table.
vendor_product_cols = ["vendor", "product"]

df_silver_affected = (
    df_affected_nested
    # explode_outer keeps CVEs whose `affected` array is null or empty.
    .withColumn("affected_entry", explode_outer(col("affected")))
    .select(
        col("cve_id"),
        col("affected_entry.vendor").alias("vendor"),
        col("affected_entry.product").alias("product"),
    )
    # Normalize unknown vendor/product naming: both the literal "n/a" and
    # missing values end up as "Unknown".
    .replace("n/a", "Unknown", subset=vendor_product_cols)
    .na.fill("Unknown", subset=vendor_product_cols)
)

# Register tables for SQL access
# silver core
_overwrite_delta(df_silver_core).saveAsTable(silver_core_table)

print(f"\n{year} Silver core table registered: {silver_core_table}")

# silver affected
_overwrite_delta(df_silver_affected).saveAsTable(silver_affected_table)

print(f"\n{year} Silver affected products table registered: {silver_affected_table}")



2024 Silver layer created: /Volumes/workspace/default/assignment1/cve/silver

2024 Silver records table registered: cve_silver.records

2024 Silver core table registered: cve_silver.core

2024 Silver affected products table registered: cve_silver.affected
