In [None]:
import logging

from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit, regexp_replace, to_timestamp, trim, upper
from pyspark.sql.types import DoubleType, IntegerType, StructType, StructField, StringType

# Log line format used for this notebook's messages.
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"

# Configure the root logger: INFO level, records written by a StreamHandler (stderr by default).
# NOTE: basicConfig is a silent no-op when the root logger already has handlers. A notebook
# runtime may install those before this cell runs. That also makes re-running this cell harmless:
# the second call does nothing instead of stacking duplicate handlers.
logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
    handlers=[logging.StreamHandler()],
)

# Logger for this notebook's own messages.
logger = logging.getLogger(__name__)
# Set the level explicitly instead of inheriting it from the root logger. Otherwise, when
# basicConfig was a no-op, the root's default WARNING level would silently drop every
# logger.info(...) call. INFO records still propagate to whatever handlers the root has.
logger.setLevel(logging.INFO)

In [None]:
# Bronze -> silver: clean the raw invoice lines and persist them as the silver Delta table.

# Locations of the source (bronze) and target (silver) Delta tables.
BRONZE_INVOICES_PATH = "/mnt/bronze/retail/invoices"
SILVER_INVOICES_PATH = "/mnt/silver/retail/invoices"

# Single-letter month/day/hour fields accept both zero-padded ("08/01/2010 08:26") and
# unpadded ("8/1/2010 8:26") input. Spark 3's parser treats "MM/dd/yyyy HH:mm" as fixed-width
# and rejects unpadded values. Depending on spark.sql.legacy.timeParserPolicy, the result is
# either a SparkUpgradeException or a null timestamp.
INVOICE_DATE_FORMAT = "M/d/yyyy H:mm"

# Columns that identify one invoice line for deduplication.
INVOICE_LINE_KEY = ["InvoiceNo", "StockCode", "CustomerID"]

bronze_df = spark.read.format("delta").load(BRONZE_INVOICES_PATH)

silver_df = (
    bronze_df
    # Cast types. With ANSI mode off, values that cannot be cast become null. Rows with a null
    # Quantity or UnitPrice are then dropped by the filter below, because null > 0 is not true.
    .withColumn("Quantity", col("Quantity").cast("int"))
    .withColumn("UnitPrice", col("UnitPrice").cast("double"))
    # NOTE(review): rows whose InvoiceDate does not parse are not filtered out; they may be
    # kept with a null timestamp. Confirm that is intended.
    .withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"), INVOICE_DATE_FORMAT))
    # Clean text fields
    .withColumn("Description", trim(col("Description")))
    .withColumn("Country", upper(trim(col("Country"))))
    # Filter invalid data BEFORE deduplicating. dropDuplicates keeps an arbitrary row per key.
    # Deduplicating first could keep an invalid row that this filter then removes, losing the
    # valid row for the same key. Filtering first also reduces the data shuffled by the dedup.
    .filter(
        col("InvoiceNo").isNotNull()
        & col("CustomerID").isNotNull()
        & (col("Quantity") > 0)
        & (col("UnitPrice") > 0)
    )
    # NOTE(review): which row survives per key is not deterministic. Confirm that lines sharing
    # this key but differing in Quantity/UnitPrice really are duplicates.
    .dropDuplicates(INVOICE_LINE_KEY)
)

logger.info("Writing cleaned invoices to %s", SILVER_INVOICES_PATH)
silver_df.write.format("delta").mode("overwrite").save(SILVER_INVOICES_PATH)