Environment

In [None]:
# ——————————————————————————————
#  LIBRARIES
# ——————————————————————————————   
from pyspark.sql.functions import *
from pyspark.sql.types import *
import dlt

In [0]:
# ——————————————————————————————
#  ENVIRONMENT CONFIGURATION
# ——————————————————————————————

# Resolve the deployment environment from the pipeline configuration.
# The lookup is best-effort: any ordinary failure (e.g. the key is not set)
# falls back to "dev". `except Exception` is used instead of a bare `except`
# so that KeyboardInterrupt / SystemExit are not silently swallowed.
try:
    env = spark.conf.get("pipeline.env")
except Exception:
    env = "dev"
catalog = "book_rec_catalog"

# Schema names are derived from the environment, e.g. "dev_bronze" / "dev_silver".
bronze_schema = f"{env}_bronze"
silver_schema = f"{env}_silver"

Silver layer

In [0]:
# ------------------------------------------------------ BOOKS ------------------------------------------------------
# Silver table for cleaned Books data
@dlt.table(name=f"{silver_schema}.books_silver", comment="Silver table for cleaned Books data")
@dlt.expect_or_drop("valid_isbn", "ISBN IS NOT NULL AND ISBN != ''")  # Missing or empty ISBN
@dlt.expect_or_drop("valid_book_title", "`Book-Title` IS NOT NULL AND `Book-Title` != ''")  # Missing Title
@dlt.expect_or_drop("valid_book_author", "`Book-Author` IS NOT NULL AND `Book-Author` != ''")  # Missing Author
def books_silver():
    """Return the cleaned Books dataset read from ``{bronze_schema}.books_bronze``.

    Transformations applied:
      * ``Year-Of-Publication`` is cast to int.
      * ``Publisher`` is trimmed; NULL, empty or whitespace-only values
        become "Unknown".
      * ``Book-Title``, ``Book-Author`` and ``ISBN`` are trimmed.
      * ``cleaned_ts`` records the processing timestamp.

    The expectations above drop rows whose ISBN, title or author is NULL or
    empty in the returned (already trimmed) columns.
    """
    # Trim before testing for emptiness so that a whitespace-only publisher is
    # mapped to "Unknown" instead of ending up as an empty string.
    publisher = trim(col("Publisher"))

    return (
        spark.table(f"{bronze_schema}.books_bronze")
        # A failed cast already produces NULL (ANSI mode off), so no explicit
        # when/otherwise wrapper is needed around it.
        .withColumn("Year-Of-Publication", col("Year-Of-Publication").cast("int"))
        # NULL publisher: the comparison evaluates to NULL and falls through
        # to the "Unknown" branch.
        .withColumn("Publisher", when(publisher != "", publisher).otherwise("Unknown"))
        .withColumn("Book-Title", trim(col("Book-Title")))
        .withColumn("Book-Author", trim(col("Book-Author")))
        .withColumn("ISBN", trim(col("ISBN")))
        .withColumn("cleaned_ts", current_timestamp())
    )
# Quarantine table for problematic Books data
@dlt.table(name=f"{silver_schema}.books_quarantine", comment="Quarantine table for problematic Books data")
def books_quarantine():
    """Return the Books rows that ``books_silver`` rejects, with a reason.

    A row is quarantined when its ISBN, title or author is NULL, empty or
    whitespace-only. Blank checks are done on trimmed values because
    ``books_silver`` trims these columns before its expectations run; testing
    the raw values would let whitespace-only rows vanish from both tables.

    Added columns:
      * ``quarantine_reason`` - "Missing ISBN", "Missing Title",
        "Missing Author", or "Multiple Issues" when more than one field fails.
      * ``quarantine_ts`` - timestamp at which the row was quarantined.
    """

    def _is_blank(column_name):
        # True when the column is NULL or contains nothing but spaces.
        field = col(column_name)
        return field.isNull() | (trim(field) == "")

    missing_isbn = _is_blank("ISBN")
    missing_title = _is_blank("Book-Title")
    missing_author = _is_blank("Book-Author")

    # The flags are never NULL, so casting to int gives 0/1 and the total is
    # the number of failed checks for the row.
    failure_count = (
        missing_isbn.cast("int")
        + missing_title.cast("int")
        + missing_author.cast("int")
    )

    # "Multiple Issues" must be tested first; otherwise the first matching
    # single-field reason always wins and that label can never be produced.
    reason = (
        when(failure_count > 1, "Multiple Issues")
        .when(missing_isbn, "Missing ISBN")
        .when(missing_title, "Missing Title")
        .when(missing_author, "Missing Author")
    )

    return (
        spark.table(f"{bronze_schema}.books_bronze")
        .filter(missing_isbn | missing_title | missing_author)
        .withColumn("quarantine_reason", reason)
        .withColumn("quarantine_ts", current_timestamp())
    )



# ------------------------------------------------------ RATINGS ------------------------------------------------------
# Silver table for cleaned Ratings data
@dlt.table(
    name=f"{silver_schema}.ratings_silver",
    comment="Silver table for cleaned Ratings data",
)
@dlt.expect_or_drop("valid_user_id", "`User-ID` IS NOT NULL")  # User ID must be present
@dlt.expect_or_drop("valid_isbn", "ISBN IS NOT NULL AND ISBN != ''")  # ISBN must be present and non-empty
@dlt.expect_or_drop("valid_book_rating", "`Book-Rating` IS NOT NULL AND `Book-Rating` >= 0 AND `Book-Rating` <= 10")  # Rating must be within 0..10
def ratings_silver():
    """Return the cleaned Ratings dataset read from ``{bronze_schema}.ratings_bronze``.

    ``Book-Rating`` is converted to int (NULL when the conversion fails),
    ``ISBN`` is trimmed and ``cleaned_ts`` is stamped with the current
    timestamp. Rows violating any expectation above are dropped.
    """
    rating_as_int = col("Book-Rating").cast("int")
    # Keep the integer value when the cast succeeds, NULL otherwise.
    normalized_rating = when(rating_as_int.isNotNull(), rating_as_int).otherwise(None)

    ratings = spark.table(f"{bronze_schema}.ratings_bronze")
    ratings = ratings.withColumn("Book-Rating", normalized_rating)
    ratings = ratings.withColumn("ISBN", trim(col("ISBN")))
    return ratings.withColumn("cleaned_ts", current_timestamp())
# Quarantine table for problematic Ratings data
@dlt.table(name=f"{silver_schema}.ratings_quarantine", comment="Quarantine table for problematic Ratings data")
def ratings_quarantine():
    """Return the Ratings rows that ``ratings_silver`` rejects, with a reason.

    The checks mirror the transformations ``ratings_silver`` applies before
    its expectations run, so that every dropped row is captured here:
      * ``User-ID`` is NULL.
      * ``ISBN`` is NULL, empty or whitespace-only (silver trims it first).
      * ``Book-Rating`` is NULL, cannot be cast to int, or falls outside
        0..10 (silver validates the int-cast value, not the raw one).

    Added columns:
      * ``quarantine_reason`` - "Missing User ID", "Missing ISBN",
        "Missing Rating", "Invalid Rating", or "Multiple Issues" when more
        than one field fails.
      * ``quarantine_ts`` - timestamp at which the row was quarantined.
    """
    isbn = col("ISBN")
    raw_rating = col("Book-Rating")
    rating = raw_rating.cast("int")

    missing_user = col("User-ID").isNull()
    missing_isbn = isbn.isNull() | (trim(isbn) == "")
    missing_rating = raw_rating.isNull()
    # A present rating is invalid when it fails the int cast or is out of
    # range. `rating.isNull()` comes first so the expression is never NULL.
    invalid_rating = raw_rating.isNotNull() & (
        rating.isNull() | (rating < 0) | (rating > 10)
    )

    # missing_rating and invalid_rating are mutually exclusive, so this counts
    # the number of distinct fields that failed validation.
    failure_count = (
        missing_user.cast("int")
        + missing_isbn.cast("int")
        + missing_rating.cast("int")
        + invalid_rating.cast("int")
    )

    # "Multiple Issues" must be tested first; otherwise the first matching
    # single-field reason always wins and that label can never be produced.
    reason = (
        when(failure_count > 1, "Multiple Issues")
        .when(missing_user, "Missing User ID")
        .when(missing_isbn, "Missing ISBN")
        .when(missing_rating, "Missing Rating")
        .when(invalid_rating, "Invalid Rating")
    )

    return (
        spark.table(f"{bronze_schema}.ratings_bronze")
        .filter(missing_user | missing_isbn | missing_rating | invalid_rating)
        .withColumn("quarantine_reason", reason)
        .withColumn("quarantine_ts", current_timestamp())
    )


# ------------------------------------------------------ USERS ------------------------------------------------------
# Silver table for cleaned Users data
@dlt.table(name=f"{silver_schema}.users_silver", comment="Silver table for cleaned Users data")
@dlt.expect_or_drop("valid_user_id", "`User-ID` IS NOT NULL")
def users_silver():
    """Return the cleaned Users dataset read from ``{bronze_schema}.users_bronze``.

    Transformations applied:
      * ``Age`` is cast to int.
      * ``Location`` is trimmed; NULL, empty or whitespace-only values
        become "Unknown".
      * ``city``, ``region`` and ``state`` are taken from the 1st, 2nd and
        3rd comma-separated parts of the location (trimmed, title-cased),
        each defaulting to "Unknown" when the part is absent or blank.
      * ``cleaned_ts`` records the processing timestamp.

    Rows with a NULL ``User-ID`` are dropped by the expectation above.
    """
    # Trim before testing for emptiness so that a whitespace-only location is
    # mapped to "Unknown" instead of ending up as an empty string.
    raw_location = trim(col("Location"))
    parts = col("location_parts")

    def _part_or_unknown(position):
        # Title-cased, trimmed entry at 1-based `position` of location_parts,
        # or "Unknown" when the entry is missing or blank.
        entry = trim(element_at(parts, position))
        return (
            when((size(parts) >= position) & (entry != ""), initcap(entry))
            .otherwise("Unknown")
        )

    return (
        spark.table(f"{bronze_schema}.users_bronze")
        # A failed cast already produces NULL (ANSI mode off), so no explicit
        # when/otherwise wrapper is needed around it.
        .withColumn("Age", col("Age").cast("int"))
        # NULL location: the comparison evaluates to NULL and falls through
        # to the "Unknown" branch.
        .withColumn("Location", when(raw_location != "", raw_location).otherwise("Unknown"))
        # Turn "/" separators into ", " so they split like commas.
        # NOTE(review): the pattern skips every "/" that is preceded by "n" OR
        # followed by "a" (not only a literal "n/a"), so e.g. "berlin/germany"
        # is left unsplit - confirm this is intended.
        .withColumn("location_cleaned",
                    when(col("Location") != "Unknown",
                         regexp_replace(col("Location"), r'(?<!n)/(?!a)', ", "))
                    .otherwise("Unknown"))
        .withColumn("location_parts",
                    when(col("location_cleaned") != "Unknown",
                         split(col("location_cleaned"), ","))
                    .otherwise(array(lit("Unknown"))))
        # NOTE(review): "region" is the 2nd part and "state" the 3rd; verify
        # the column names match what the source data holds in each position.
        .withColumn("city", _part_or_unknown(1))
        .withColumn("region", _part_or_unknown(2))
        .withColumn("state", _part_or_unknown(3))
        .drop("location_parts", "location_cleaned")  # Temporary helper columns
        .withColumn("cleaned_ts", current_timestamp())
    )

# Quarantine table for problematic Users data
@dlt.table(
    name=f"{silver_schema}.users_quarantine",
    comment="Quarantine table for problematic Users data",
)
def users_quarantine():
    """Return the Users rows from ``{bronze_schema}.users_bronze`` with a NULL ``User-ID``.

    Each row is tagged with ``quarantine_reason`` ("Missing User ID") and
    ``quarantine_ts`` (the current timestamp).
    """
    users = spark.table(f"{bronze_schema}.users_bronze")
    # The only rejection rule for users is a missing identifier.
    rejected = users.where(col("User-ID").isNull())
    rejected = rejected.withColumn("quarantine_reason", lit("Missing User ID"))
    return rejected.withColumn("quarantine_ts", current_timestamp())