In [0]:
# Databricks notebook source
# ==============================================================
# 🥈 CVE LAKEHOUSE – SILVER LAYER (Flattened Schema Compatible)
# --------------------------------------------------------------
# Works when Bronze layer columns have dots in their names,
# e.g., "cveMetadata.cveId", "containers.cna.affected"
# ==============================================================

from pyspark.sql import functions as F, types as T

# ---------------------------------------------------------------
# Paths
# ---------------------------------------------------------------
# All layers of this assignment share one root directory; each layer's
# location is derived from it so the prefix is written only once.
lakehouse_root = "/Volumes/workspace/default/assignment1"

bronze_path = f"{lakehouse_root}/bronze"            # input: raw flattened CVE records
silver_core_path = f"{lakehouse_root}/silver/core"  # output: one row per Bronze record
silver_aff_path = f"{lakehouse_root}/silver/affected"  # output: one row per affected product

# ---------------------------------------------------------------
# 1️⃣ Read Bronze Delta
# ---------------------------------------------------------------
# Load the Bronze Delta table and report its size plus a peek at its
# (dot-named, flattened) columns so later backtick-quoted references can be
# checked against what is actually present.
print("📥 Reading Bronze Delta data...")
delta_reader = spark.read.format("delta")
df_bronze = delta_reader.load(bronze_path)

bronze_row_count = df_bronze.count()  # triggers a scan of the Bronze table
print(f"✅ Bronze records loaded: {bronze_row_count:,}")

leading_columns = df_bronze.columns[:15]
print("🧩 Example columns:", leading_columns)

# ---------------------------------------------------------------
# 2️⃣ Core CVE Table
# ---------------------------------------------------------------
print("\n🧱 Creating Core CVE Table...")

# CVSS containers to probe inside each entry of the metrics array, in order of
# preference. "cvssV3_1" stays first so that every row which already produced
# a score from "$[0].cvssV3_1" keeps exactly the same value.
_CVSS_VERSION_KEYS = ("cvssV3_1", "cvssV3_0", "cvssV4_0", "cvssV2_0")

# Number of leading entries of the metrics array that are probed. A record may
# list several metric objects (e.g. one per CVSS version), so looking only at
# index 0 misses scores stored further down the array.
_MAX_METRIC_ENTRIES = 5


def _first_cvss_field(metrics_col, field):
    """Return a Column holding the first non-null CVSS ``field`` found.

    Probes ``$[idx].<version>.<field>`` in the JSON string column
    ``metrics_col`` for every version in ``_CVSS_VERSION_KEYS`` (outer loop,
    highest preference first) and every ``idx`` below ``_MAX_METRIC_ENTRIES``
    (inner loop), and coalesces the results.

    Args:
        metrics_col: Name of a string column containing a JSON array.
        field: Key to read inside the CVSS object, e.g. ``"baseScore"``.

    Returns:
        A string-typed Column; null when none of the probed paths exist.
    """
    candidates = [
        F.get_json_object(F.col(metrics_col), f"$[{idx}].{version}.{field}")
        for version in _CVSS_VERSION_KEYS
        for idx in range(_MAX_METRIC_ENTRIES)
    ]
    return F.coalesce(*candidates)


df_core = (
    df_bronze
    # Bronze column names contain literal dots, hence the backtick quoting.
    .select(
        F.col("`cveMetadata.cveId`").alias("cve_id"),
        F.col("`cveMetadata.assignerShortName`").alias("assigner"),
        F.col("`cveMetadata.state`").alias("state"),
        F.to_timestamp("`cveMetadata.datePublished`").alias("date_published"),
        F.to_timestamp("`cveMetadata.dateUpdated`").alias("date_updated"),
        F.col("dataType").alias("data_type"),
        F.col("dataVersion").alias("data_version"),
        F.col("`containers.cna.descriptions`").alias("descriptions_json"),
        F.col("`containers.cna.metrics`").alias("metrics_json")
    )
    # Extract nested JSON details (both *_json columns are treated as JSON
    # strings, as get_json_object requires).
    .withColumn(
        "description_text",
        # First description entry only; its language is not inspected.
        F.get_json_object(F.col("descriptions_json"), "$[0].value")
    )
    .withColumn(
        "base_score",
        # Previously only "$[0].cvssV3_1.baseScore" was read, which left the
        # score null for records scored with another CVSS version or whose
        # 3.1 entry was not first in the array.
        _first_cvss_field("metrics_json", "baseScore").cast("double")
    )
    .withColumn(
        "base_severity",
        # Same probing order as base_score. NOTE(review): a CVSS object that
        # carries a baseScore but no baseSeverity would let the two columns
        # come from different entries — confirm against the record schema.
        _first_cvss_field("metrics_json", "baseSeverity")
    )
    .drop("descriptions_json", "metrics_json")
)

core_count = df_core.count()
print(f"✅ Core CVE table ready: {core_count:,} rows")

# Save to Delta (replaces any previous contents at the target path)
df_core.write.format("delta").mode("overwrite").save(silver_core_path)
print(f"✅ Core CVE table written → {silver_core_path}")

# ---------------------------------------------------------------
# 3️⃣ Affected Products Table
# ---------------------------------------------------------------
print("\n🧩 Creating Affected Products Table...")

# Schema used to parse the JSON text of the "containers.cna.affected" column:
# an array of {vendor, product, versions[]} objects, where each version entry
# is a {version, status} pair. Keys not listed here are dropped by from_json.
version_entry_type = (
    T.StructType()
    .add("version", T.StringType())
    .add("status", T.StringType())
)
affected_entry_type = (
    T.StructType()
    .add("vendor", T.StringType())
    .add("product", T.StringType())
    .add("versions", T.ArrayType(version_entry_type))
)
parsed_affected = F.from_json(
    F.col("`containers.cna.affected`"),
    T.ArrayType(affected_entry_type),
)

df_aff = (
    df_bronze
    .withColumn("affected_json", parsed_affected)
    # explode_outer (rather than explode) keeps a row — with null product
    # fields — for records whose affected array is null or empty.
    .withColumn("affected", F.explode_outer(F.col("affected_json")))
    .select(
        F.col("`cveMetadata.cveId`").alias("cve_id"),
        F.col("affected").getField("vendor").alias("vendor"),
        F.col("affected").getField("product").alias("product"),
        F.col("affected").getField("versions").alias("versions"),
    )
)

aff_count = df_aff.count()
print(f"✅ Affected Products table ready: {aff_count:,} rows")

# Persist as Delta, replacing any previous contents at the target path.
affected_writer = df_aff.write.format("delta").mode("overwrite")
affected_writer.save(silver_aff_path)
print(f"✅ Affected Products table written → {silver_aff_path}")

# ---------------------------------------------------------------
# 4️⃣ Verification
# ---------------------------------------------------------------
# Show a 10-row sample of each in-memory Silver DataFrame.
for sample_heading, silver_df in (
    ("\n📊 Core CVE sample (for screenshot):", df_core),
    ("\n📊 Affected Products sample (for screenshot):", df_aff),
):
    print(sample_heading)
    display(silver_df.limit(10))

# List what was physically written under each Silver output path.
for listing_heading, silver_path in (
    ("\n📁 Files in Silver/Core path:", silver_core_path),
    ("\n📁 Files in Silver/Affected path:", silver_aff_path),
):
    print(listing_heading)
    display(dbutils.fs.ls(silver_path))

# Checklist of the evidence to capture from the outputs above.
screenshot_checklist = [
    "\n📸 REQUIRED SCREENSHOTS:",
    "   • df_core.count() and df_aff.count()",
    "   • display(df_core.limit(10)) and display(df_aff.limit(10))",
    "   • dbutils.fs.ls(...) showing _delta_log + parquet files",
]
for checklist_line in screenshot_checklist:
    print(checklist_line)


📥 Reading Bronze Delta data...
✅ Bronze records loaded: 32,924
🧩 Example columns: ['containers.adp', 'containers.cna.affected', 'containers.cna.configurations', 'containers.cna.cpeApplicability', 'containers.cna.credits', 'containers.cna.dateAssigned', 'containers.cna.datePublic', 'containers.cna.descriptions', 'containers.cna.exploits', 'containers.cna.impacts', 'containers.cna.metrics', 'containers.cna.problemTypes', 'containers.cna.providerMetadata.dateUpdated', 'containers.cna.providerMetadata.orgId', 'containers.cna.providerMetadata.shortName']

🧱 Creating Core CVE Table...
✅ Core CVE table ready: 32,924 rows
✅ Core CVE table written → /Volumes/workspace/default/assignment1/silver/core

🧩 Creating Affected Products Table...
✅ Affected Products table ready: 61,825 rows
✅ Affected Products table written → /Volumes/workspace/default/assignment1/silver/affected

📊 Core CVE sample (for screenshot):


cve_id,assigner,state,date_published,date_updated,data_type,data_version,description_text,base_score,base_severity
CVE-2024-42149,Linux,PUBLISHED,2024-07-30T07:46:42.133Z,2025-05-04T09:24:10.572Z,CVE_RECORD,5.1,"In the Linux kernel, the following vulnerability has been resolved: fs: don't misleadingly warn during thaw operations The block device may have been frozen before it was claimed by a filesystem. Concurrently another process might try to mount that frozen block device and has temporarily claimed the block device for that purpose causing a concurrent fs_bdev_thaw() to end up here. The mounter is already about to abort mounting because they still saw an elevanted bdev->bd_fsfreeze_count so get_bdev_super() will return NULL in that case. For example, P1 calls dm_suspend() which calls into bdev_freeze() before the block device has been claimed by the filesystem. This brings bdev->bd_fsfreeze_count to 1 and no call into fs_bdev_freeze() is required. Now P2 tries to mount that frozen block device. It claims it and checks bdev->bd_fsfreeze_count. As it's elevated it aborts mounting. In the meantime P3 called dm_resume(). P3 sees that the block device is already claimed by a filesystem and calls into fs_bdev_thaw(). P3 takes a passive reference and realizes that the filesystem isn't ready yet. P3 puts itself to sleep to wait for the filesystem to become ready. P2 now puts the last active reference to the filesystem and marks it as dying. P3 gets woken, sees that the filesystem is dying and get_bdev_super() fails.",,
CVE-2024-42150,Linux,PUBLISHED,2024-07-30T07:46:43.031Z,2025-05-04T12:57:47.297Z,CVE_RECORD,5.1,"In the Linux kernel, the following vulnerability has been resolved: net: txgbe: remove separate irq request for MSI and INTx When using MSI or INTx interrupts, request_irq() for pdev->irq will conflict with request_threaded_irq() for txgbe->misc.irq, to cause system crash. So remove txgbe_request_irq() for MSI/INTx case, and rename txgbe_request_msix_irqs() since it only request for queue irqs. Add wx->misc_irq_domain to determine whether the driver creates an IRQ domain and threaded request the IRQs.",,
CVE-2024-42151,Linux,PUBLISHED,2024-07-30T07:46:43.969Z,2025-05-04T09:24:13.803Z,CVE_RECORD,5.1,"In the Linux kernel, the following vulnerability has been resolved: bpf: mark bpf_dummy_struct_ops.test_1 parameter as nullable Test case dummy_st_ops/dummy_init_ret_value passes NULL as the first parameter of the test_1() function. Mark this parameter as nullable to make verifier aware of such possibility. Otherwise, NULL check in the test_1() code:  SEC(""struct_ops/test_1"")  int BPF_PROG(test_1, struct bpf_dummy_ops_state *state)  {  if (!state)  return ...;  ... access state ...  } Might be removed by verifier, thus triggering NULL pointer dereference under certain conditions.",,
CVE-2024-42152,Linux,PUBLISHED,2024-07-30T07:46:44.795Z,2025-11-03T22:02:15.586Z,CVE_RECORD,5.2,"In the Linux kernel, the following vulnerability has been resolved: nvmet: fix a possible leak when destroy a ctrl during qp establishment In nvmet_sq_destroy we capture sq->ctrl early and if it is non-NULL we know that a ctrl was allocated (in the admin connect request handler) and we need to release pending AERs, clear ctrl->sqs and sq->ctrl (for nvme-loop primarily), and drop the final reference on the ctrl. However, a small window is possible where nvmet_sq_destroy starts (as a result of the client giving up and disconnecting) concurrently with the nvme admin connect cmd (which may be in an early stage). But *before* kill_and_confirm of sq->ref (i.e. the admin connect managed to get an sq live reference). In this case, sq->ctrl was allocated however after it was captured in a local variable in nvmet_sq_destroy. This prevented the final reference drop on the ctrl. Solve this by re-capturing the sq->ctrl after all inflight request has completed, where for sure sq->ctrl reference is final, and move forward based on that. This issue was observed in an environment with many hosts connecting multiple ctrls simoutanuosly, creating a delay in allocating a ctrl leading up to this race window.",,
CVE-2024-42153,Linux,PUBLISHED,2024-07-30T07:46:45.724Z,2025-11-03T22:02:17.048Z,CVE_RECORD,5.2,"In the Linux kernel, the following vulnerability has been resolved: i2c: pnx: Fix potential deadlock warning from del_timer_sync() call in isr When del_timer_sync() is called in an interrupt context it throws a warning because of potential deadlock. The timer is used only to exit from wait_for_completion() after a timeout so replacing the call with wait_for_completion_timeout() allows to remove the problematic timer and its related functions altogether.",,
CVE-2024-42154,Linux,PUBLISHED,2024-07-30T07:46:51.456Z,2025-11-03T22:02:18.507Z,CVE_RECORD,5.2,"In the Linux kernel, the following vulnerability has been resolved: tcp_metrics: validate source addr length I don't see anything checking that TCP_METRICS_ATTR_SADDR_IPV4 is at least 4 bytes long, and the policy doesn't have an entry for this attribute at all (neither does it for IPv6 but v6 is manually validated).",,
CVE-2024-42155,Linux,PUBLISHED,2024-07-30T07:46:57.729Z,2025-05-04T09:24:19.111Z,CVE_RECORD,5.1,"In the Linux kernel, the following vulnerability has been resolved: s390/pkey: Wipe copies of protected- and secure-keys Although the clear-key of neither protected- nor secure-keys is accessible, this key material should only be visible to the calling process. So wipe all copies of protected- or secure-keys from stack, even in case of an error.",,
CVE-2024-42156,Linux,PUBLISHED,2024-07-30T07:46:58.513Z,2025-05-20T14:27:36.396Z,CVE_RECORD,5.1,"In the Linux kernel, the following vulnerability has been resolved: s390/pkey: Wipe copies of clear-key structures on failure Wipe all sensitive data from stack for all IOCTLs, which convert a clear-key into a protected- or secure-key.",,
CVE-2024-42157,Linux,PUBLISHED,2024-07-30T07:46:59.362Z,2025-11-03T22:02:19.958Z,CVE_RECORD,5.2,"In the Linux kernel, the following vulnerability has been resolved: s390/pkey: Wipe sensitive data on failure Wipe sensitive data from stack also if the copy_to_user() fails.",,
CVE-2024-42158,Linux,PUBLISHED,2024-07-30T07:47:00.343Z,2025-05-04T09:24:22.974Z,CVE_RECORD,5.1,"In the Linux kernel, the following vulnerability has been resolved: s390/pkey: Use kfree_sensitive() to fix Coccinelle warnings Replace memzero_explicit() and kfree() with kfree_sensitive() to fix warnings reported by Coccinelle: WARNING opportunity for kfree_sensitive/kvfree_sensitive (line 1506) WARNING opportunity for kfree_sensitive/kvfree_sensitive (line 1643) WARNING opportunity for kfree_sensitive/kvfree_sensitive (line 1770)",,



📊 Affected Products sample (for screenshot):


cve_id,vendor,product,versions
CVE-2024-22399,Apache Software Foundation,Apache Seata,"List(List(2.0.0, affected), List(1.0.0, affected))"
CVE-2024-22400,nextcloud,security-advisories,"List(List(>= 5.0.0, < 5.1.5, affected), List(>= 5.2.0, < 5.2.5, affected), List(>= 6.0.0, < 6.0.1, affected))"
CVE-2024-22401,nextcloud,security-advisories,"List(List(>= 2.4.0, < 2.4.1, affected), List(>= 2.5.0, < 2.5.1, affected), List(>= 3.0.0, < 3.0.1, affected))"
CVE-2024-22402,nextcloud,security-advisories,"List(List(>= 2.4.0, < 2.4.1, affected), List(>= 2.5.0, < 2.5.1, affected), List(>= 3.0.0, < 3.0.1, affected))"
CVE-2024-22403,nextcloud,security-advisories,"List(List(< 28.0.0, affected))"
CVE-2024-22404,nextcloud,security-advisories,"List(List(>= 1.2.0, < 1.2.1, affected), List(>= 1.3.0, < 1.4.1, affected))"
CVE-2024-22405,MacPaw,XADMaster,"List(List(< 1.10.8, affected))"
CVE-2024-22406,shopware,shopware,"List(List(< 6.5.7.4, affected))"
CVE-2024-22407,shopware,shopware,"List(List(< 6.5.7.4, affected))"
CVE-2024-22408,shopware,shopware,"List(List(< 6.5.7.4, affected))"



📁 Files in Silver/Core path:


path,name,size,modificationTime
dbfs:/Volumes/workspace/default/assignment1/silver/core/_delta_log/,_delta_log/,0,1762836967035
dbfs:/Volumes/workspace/default/assignment1/silver/core/part-00000-34c9f66f-d0b5-48c5-8c14-d5bf15e76e81.c000.snappy.parquet,part-00000-34c9f66f-d0b5-48c5-8c14-d5bf15e76e81.c000.snappy.parquet,5986812,1762836963000



📁 Files in Silver/Affected path:


path,name,size,modificationTime
dbfs:/Volumes/workspace/default/assignment1/silver/affected/_delta_log/,_delta_log/,0,1762836967621
dbfs:/Volumes/workspace/default/assignment1/silver/affected/part-00000-84df95ee-f09a-47d2-a379-fbf1eedd1175.c000.snappy.parquet,part-00000-84df95ee-f09a-47d2-a379-fbf1eedd1175.c000.snappy.parquet,1247485,1762836965000



📸 REQUIRED SCREENSHOTS:
   • df_core.count() and df_aff.count()
   • display(df_core.limit(10)) and display(df_aff.limit(10))
   • dbutils.fs.ls(...) showing _delta_log + parquet files
