Step 1: Initialize Silver Architecture (Load and Re-register the Bronze Table)

In [0]:
from pyspark.sql.functions import col, from_json, coalesce, to_timestamp, explode_outer
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType

# --- IMPORTANT: PASTE YOUR ACTUAL DELTA FILE PATH HERE ---
# This path was likely printed in the output of your original Bronze notebook (Cell 5, Output 3/3).
# Example format: "/Volumes/workspace/default/cve_demo/bronze/2024_Nov13"
# You MUST replace the placeholder below with your actual path.
ACTUAL_DELTA_FILE_PATH = "/Volumes/workspace/default/cve_demo/bronze/2024_Nov13"
# --------------------------------------------------------

print(f"1. Loading Bronze Delta files directly from: {ACTUAL_DELTA_FILE_PATH}")

try:
    # Define the read of the Delta files on disk. The count() below is an action,
    # so it forces Spark to actually touch the files; any failure from either the
    # load or the count is reported here.
    bronze_df = spark.read.format("delta").load(ACTUAL_DELTA_FILE_PATH)
    print(f"✅ Successfully loaded Delta files: {bronze_df.count():,} records.")
except Exception:
    print("❌ ERROR: Could not read Delta files. Please verify the file path.")
    # Bare `raise` re-raises the original exception with its traceback unchanged.
    raise

# --- Register the Bronze data as a catalog table ---
# saveAsTable (with no explicit path) writes a managed Delta table named
# 'cve_bronze.records' that holds a copy of the files above. The table lives in the
# catalog, not just in this Spark session, and later steps read it by name.
spark.sql("CREATE SCHEMA IF NOT EXISTS cve_bronze")

# `bronze_df` is a lazy read of the files; this write materializes it into the table.
# overwriteSchema lets the overwrite replace the table schema if the source columns changed.
(bronze_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("cve_bronze.records")
)
print("✅ Bronze table re-registered successfully.")

1. Loading Bronze Delta files directly from: /Volumes/workspace/default/cve_demo/bronze/2024_Nov13
✅ Successfully loaded Delta files: 38,753 records.
✅ Bronze table re-registered successfully.


Step 2: Create the cve_silver.core and cve_silver.affected Tables (Normalization)

In [0]:
from pyspark.sql.functions import (
    aggregate,
    coalesce,
    col,
    explode_outer,
    from_json,
    lit,
    to_timestamp,
    transform,
)
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType

# Build the Silver layer from the Bronze table registered in Step 1.
#
# NOTE: this cell intentionally does NOT reload the raw Delta files or rewrite
# 'cve_bronze.records'. Step 1 owns ACTUAL_DELTA_FILE_PATH and registers the table.
# A second copy of the path here could drift from Step 1's. This cell would then
# silently overwrite Bronze with data from a different location.
BRONZE_TABLE = "cve_bronze.records"

# --- Schemas for JSON parsing ---
# `cveMetadata` and `containers` are parsed from JSON strings with from_json below.

# 1. Schema for the cveMetadata column (only the fields used below).
cve_metadata_schema = StructType([
    StructField("cveId", StringType(), True),
    StructField("datePublished", StringType(), True),
    StructField("dateReserved", StringType(), True),
    StructField("state", StringType(), True),
])

# 2. Schema for the top-level containers object. Only the outer layer is parsed, to
#    reach 'cna'. `metrics` entries and `affected` are typed as strings so their raw
#    JSON text is kept. Each is parsed separately below, so a type mismatch inside
#    them is confined to that value rather than the whole containers struct.
containers_schema = StructType([
    StructField("cna", StructType([
        StructField("descriptions", ArrayType(StructType([StructField("value", StringType(), True)])), True),
        StructField("metrics", ArrayType(StringType()), True),
        StructField("affected", StringType(), True),
    ]), True),
])

# 3. Schemas for the nested JSON held in those string fields.
affected_array_schema = ArrayType(StructType([
    StructField("vendor", StringType(), True),
    StructField("product", StringType(), True),
    StructField("versions", ArrayType(StructType([
        StructField("version", StringType(), True),
        StructField("status", StringType(), True),
    ])), True),
]))
CVSS_DDL = "struct<baseScore:double, baseSeverity:string>"
metrics_schema = f"struct<cvssV3_1:{CVSS_DDL}, cvssV3_0:{CVSS_DDL}>"


def _first_cvss(metric_entries, version):
    """Return the first non-null `version` CVSS struct across all metric entries.

    `metric_entries` is an array<struct> column parsed with `metrics_schema`.
    `version` is one of its field names, e.g. "cvssV3_1". The fold starts from a
    typed null and keeps the first non-null value it meets. The result is
    therefore null when the array is null, empty, or has no entry for `version`.
    """
    return aggregate(
        metric_entries,
        lit(None).cast(CVSS_DDL),
        lambda found, entry: coalesce(found, entry.getField(version)),
    )


def _overwrite_table(df, table_name):
    """Replace Delta table `table_name` (data and schema) with `df`, then report it."""
    (df.write.format("delta")
        .mode("overwrite")
        # overwriteSchema (not mergeSchema), so the table schema matches `df` exactly
        # instead of accumulating stale columns; consistent with the Bronze write.
        .option("overwriteSchema", "true")
        .saveAsTable(table_name))
    print(f"✅ Table '{table_name}' registered.")


# --- 1. Parse JSON columns from the registered Bronze table ---

parsed_df = (
    spark.table(BRONZE_TABLE)
    .withColumn("cveMetadata_parsed", from_json(col("cveMetadata"), cve_metadata_schema))
    .withColumn("containers_parsed", from_json(col("containers"), containers_schema))
    .drop("cveMetadata", "containers")  # drop the raw JSON string columns
)

spark.sql("CREATE SCHEMA IF NOT EXISTS cve_silver")


# --- 2. Create the cve_silver.core Table (Normalization) ---
# `metrics` is an array and may hold several entries, each with a different CVSS
# version. Scan every entry rather than only metrics[0], so a v3.x score is not lost
# when it is not listed first. Prefer v3.1 over v3.0. Score and severity come from
# the same struct so they always describe the same CVSS version.

core_df = (
    parsed_df
    .select(
        col("cveMetadata_parsed.cveId").alias("cve_id"),
        to_timestamp(col("cveMetadata_parsed.datePublished")).alias("date_published"),
        to_timestamp(col("cveMetadata_parsed.dateReserved")).alias("date_reserved"),
        col("cveMetadata_parsed.state").alias("status"),
        col("containers_parsed.cna.descriptions")[0]["value"].alias("description"),
        # Parse each raw metric entry independently.
        transform(
            col("containers_parsed.cna.metrics"),
            lambda raw_entry: from_json(raw_entry, metrics_schema),
        ).alias("parsed_metrics"),
    )
    .withColumn(
        "cvss",
        coalesce(
            _first_cvss(col("parsed_metrics"), "cvssV3_1"),
            _first_cvss(col("parsed_metrics"), "cvssV3_0"),
        ),
    )
    .withColumn("cvss_score", col("cvss.baseScore"))
    .withColumn("cvss_severity", col("cvss.baseSeverity"))
    .drop("parsed_metrics", "cvss")
)

_overwrite_table(core_df, "cve_silver.core")


# --- 3. Create the cve_silver.affected Table (Explode Operation) ---
# One output row per affected entry that names a vendor.

affected_df = (
    parsed_df
    .select(
        col("cveMetadata_parsed.cveId").alias("cve_id"),
        # Parse the raw `affected` JSON string, then explode it below.
        from_json(col("containers_parsed.cna.affected"), affected_array_schema).alias("affected_list"),
    )
    .select(col("cve_id"), explode_outer(col("affected_list")).alias("affected_item"))
    .select(
        col("cve_id"),
        col("affected_item.vendor").alias("vendor"),
        col("affected_item.product").alias("product"),
    )
    .filter(col("vendor").isNotNull())
)

_overwrite_table(affected_df, "cve_silver.affected")


# --- 4. Final Verification (Required for Deliverables) ---

core_count = spark.table("cve_silver.core").count()
affected_count = spark.table("cve_silver.affected").count()

# cve_silver.core is a row-for-row projection of Bronze, so the counts must match.
assert core_count == parsed_df.count(), "cve_silver.core row count differs from Bronze"

print("\n--- Silver Layer Verification ---")
print(f"📊 Silver Core Records: {core_count:,}")
print(f"📊 Silver Affected Records: {affected_count:,}")

print("\n--- DESCRIBE DETAIL: cve_silver.core ---")
spark.sql("DESCRIBE DETAIL cve_silver.core").display()

1. Loading Delta files directly from: /Volumes/workspace/default/cve_demo/bronze/2024_Nov13
✅ Table 'cve_silver.core' registered.
✅ Table 'cve_silver.affected' registered.

--- Silver Layer Verification ---
📊 Silver Core Records: 38,753
📊 Silver Affected Records: 73,998

--- DESCRIBE DETAIL: cve_silver.core ---


format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics,clusterByAuto
delta,3abe94f6-eb0a-474e-9f48-a4f210149a36,workspace.cve_silver.core,,,2025-11-17T16:00:42.367Z,2025-11-17T16:00:46.000Z,List(),List(),1,4451977,"Map(delta.parquet.compression.codec -> zstd, delta.enableDeletionVectors -> true)",3,7,"List(appendOnly, deletionVectors, invariants)","Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)",False
