In [0]:
# Configuration
BRONZE_TABLE = "bronze_spooky_authors"
SILVER_TABLE = "silver_spooky_authors"

In [0]:
# Valid author codes
VALID_AUTHORS = ['EAP', 'HPL', 'MWS']

In [0]:
# import libraries
from pyspark.sql.functions import col, trim, length, current_timestamp, lit, when

In [0]:
# Read Bronze table
df_bronze = spark.table(BRONZE_TABLE)


In [0]:
print(f"Records in Bronze layer: {df_bronze.count():,}")


Records in Bronze layer: 19,579


In [0]:
df_bronze.show(5, truncate=False)

+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+--------------------------+------+
|id     |text                                                                                                                                                                                                                                   |author|_ingestion_timestamp      |_layer|
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+--------------------------+------+
|id26305|This process, however, afforded me no means of ascertaining the dimensions of my dungeon; as I might make its circuit, and return to the point

In [0]:
# Track quality metrics
total_records = df_bronze.count()

In [0]:
# Check 1: Null IDs
null_id_count = df_bronze.filter(col("id").isNull()).count()
print(f"Records with NULL id: {null_id_count:,}")

Records with NULL id: 0


In [0]:
# Check 2: Null or empty text
null_text_count = df_bronze.filter(
    (col("text").isNull()) | (trim(col("text")) == "")
).count()
print(f"Records with NULL/empty text: {null_text_count:,}")

Records with NULL/empty text: 0


In [0]:
# Check 3: Invalid authors
invalid_author_count = df_bronze.filter(
    ~col("author").isin(VALID_AUTHORS)
).count()
print(f"Records with invalid author: {invalid_author_count:,}")

Records with invalid author: 1,532


In [0]:
# Check 4: Duplicates
duplicate_count = total_records - df_bronze.dropDuplicates(["id"]).count()
print(f"Duplicate records (by id): {duplicate_count:,}")

Duplicate records (by id): 0


In [0]:
# Apply quality filters and transformations
df_silver = (
    df_bronze
    # Filter: Remove null IDs
    .filter(col("id").isNotNull())
    # Filter: Remove null/empty text
    .filter((col("text").isNotNull()) & (trim(col("text")) != ""))
    # Filter: Keep only valid authors
    .filter(col("author").isin(VALID_AUTHORS))
    # Clean: Trim whitespace from text columns
    .withColumn("id", trim(col("id")))
    .withColumn("text", trim(col("text")))
    .withColumn("author", trim(col("author")))
    # Add text length for analysis
    .withColumn("text_length", length(col("text")))
    # Deduplicate by id (keep first occurrence)
    .dropDuplicates(["id"])
    # Select final columns
    .select(
        "id",
        "text",
        "author",
        "text_length",
        "_ingestion_timestamp"
    )
    # Add silver layer metadata
    .withColumn("_silver_timestamp", current_timestamp())
    .withColumn("_layer", lit("silver"))
)

In [0]:
print(f"Records after cleaning: {df_silver.count():,}")

Records after cleaning: 18,047


In [0]:
# Calculate quality metrics
silver_count = df_silver.count()
records_removed = total_records - silver_count
removal_pct = (records_removed / total_records * 100) if total_records > 0 else 0

In [0]:
print("=" * 50)
print("DATA QUALITY SUMMARY")
print("=" * 50)
print(f"Bronze layer records:    {total_records:,}")
print(f"Silver layer records:    {silver_count:,}")
print(f"Records removed:         {records_removed:,} ({removal_pct:.2f}%)")
print("=" * 50)

DATA QUALITY SUMMARY
Bronze layer records:    19,579
Silver layer records:    18,047
Records removed:         1,532 (7.82%)


In [0]:
# Show distribution by author
print("\nRecords by Author:")
df_silver.groupBy("author").count().orderBy("author").show()


Records by Author:
+------+-----+
|author|count|
+------+-----+
|   EAP| 7044|
|   HPL| 5451|
|   MWS| 5552|
+------+-----+



In [0]:
# Write to Delta table
(
    df_silver
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(SILVER_TABLE)
)

print(f"✓ Silver table '{SILVER_TABLE}' created successfully!")

✓ Silver table 'silver_spooky_authors' created successfully!


In [0]:
# Verify the table
silver_final_count = spark.table(SILVER_TABLE).count()
print(f"✓ Silver table contains {silver_final_count:,} records")

✓ Silver table contains 18,047 records


In [0]:
# Show sample
print("\nSample from Silver table:")
spark.sql(f"SELECT * FROM {SILVER_TABLE} LIMIT 5").show(truncate=False)


Sample from Silver table:
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----------+--------------------------+--------------------------+------+
|id     |text                                                                                                                                                  |author|text_length|_ingestion_timestamp      |_silver_timestamp         |_layer|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----------+--------------------------+--------------------------+------+
|id11321|As the Comte and his associates turned away from the lowly abode of the alchemists, the form of Charles Le Sorcier appeared through the trees.        |HPL   |142        |2026-01-03 23:09:39.969833|2026-01-03 23:14:55.303888|silver|
|id20705|

In [0]:
# Show schema
print("\nSilver table schema:")
spark.table(SILVER_TABLE).printSchema()


Silver table schema:
root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- author: string (nullable = true)
 |-- text_length: integer (nullable = true)
 |-- _ingestion_timestamp: timestamp (nullable = true)
 |-- _silver_timestamp: timestamp (nullable = true)
 |-- _layer: string (nullable = true)



In [0]:
# Return success for job orchestration
dbutils.notebook.exit(f"SUCCESS: Silver layer created with {silver_final_count} records")