
# Silver Notebook - Transformation


## Read Bronze Data
- customer
- product
- transaction
- currency

In [0]:
# Storage root under which the Delta table files are written.
s3_bucket = "s3://r2de-bucket"

# Qualified schema names for the source (bronze) and target (silver) layers.
bronze_db_path, silver_db_path = "r2de_project.bronze", "r2de_project.silver"

In [0]:
# Load every bronze table as a DataFrame.
# spark.table() resolves the qualified table name directly, so no SQL text
# (and no trailing statement terminator) has to be assembled just to read a
# whole table.
bronze_customer = spark.table(f"{bronze_db_path}.customer")
bronze_product = spark.table(f"{bronze_db_path}.product")
bronze_transaction = spark.table(f"{bronze_db_path}.transaction")
bronze_currency = spark.table(f"{bronze_db_path}.currency")


## Data Transformation

In [0]:
# Import Spark SQL functions module as F
from pyspark.sql import functions as F


### Clean Customer Bronze Data

In [0]:
# Build the silver customer frame:
#   1. rename the bronze columns to snake_case,
#   2. cast customer_id to an integer,
#   3. drop rows that have no usable customer_id.
silver_customer = (
    bronze_customer
    # 1. Rename columns
    .withColumnRenamed("CustomerNo", "customer_id")
    .withColumnRenamed("Country", "country")
    .withColumnRenamed("Name", "name")
    # 2. Cast columns
    # The cast goes through double before int; presumably bronze stores the ID
    # as decimal-looking text (e.g. "17490.0") -- TODO confirm against bronze.
    .withColumn("customer_id", F.col("customer_id").cast("double").cast("int"))
    # 3. Remove null values
    # This runs AFTER the cast on purpose: with ANSI mode off, a value that
    # cannot be parsed (empty string, stray text) becomes NULL during the cast,
    # and dropping beforehand would let those rows through with a NULL key.
    .na.drop(subset=["customer_id"])
)


### Clean Product Bronze Data

In [0]:
# Rename the bronze product columns to snake_case; no other change is applied.
product_with_id = bronze_product.withColumnRenamed("ProductNo", "product_id")
silver_product = product_with_id.withColumnRenamed("ProductName", "product_name")


### Clean Transaction Bronze Data

In [0]:
# 1. Rename columns: bronze name -> silver (snake_case) name, applied in order.
transaction_renames = {
    "TransactionNo": "transaction_id",
    "Date": "date",
    "ProductNo": "product_id",
    "Price": "price",
    "Quantity": "quantity",
    "CustomerNo": "customer_id",
}
silver_transaction = bronze_transaction
for bronze_name, silver_name in transaction_renames.items():
    silver_transaction = silver_transaction.withColumnRenamed(bronze_name, silver_name)

# 2. Cast columns: each expression reads the already-renamed column and
#    replaces it under the same name.
transaction_date = F.to_date(F.col("date"), "yyyy-MM-dd")
transaction_price = F.col("price").cast("double")
transaction_quantity = F.col("quantity").cast("int")
# customer_id goes through double before int, matching the customer table.
transaction_customer_id = F.col("customer_id").cast("double").cast("int")

silver_transaction = (
    silver_transaction
    .withColumn("date", transaction_date)
    .withColumn("price", transaction_price)
    .withColumn("quantity", transaction_quantity)
    .withColumn("customer_id", transaction_customer_id)
)


### Clean Currency Bronze Data

In [0]:
# Cast columns: parse the date with the yyyy-MM-dd pattern and store the
# GBP->THB rate as a double rounded to two decimal places.
currency_date = F.to_date(F.col("date"), "yyyy-MM-dd")
gbp_thb_rate = F.round(F.col("gbp_thb").cast("double"), 2)

silver_currency = bronze_currency.withColumn("date", currency_date)
silver_currency = silver_currency.withColumn("gbp_thb", gbp_thb_rate)


## Ingest Silver Data into Silver Delta Tables

In [0]:
# Persist each silver DataFrame as a Delta table, overwriting any previous
# contents. Files go under <s3_bucket>/silver/<table>, and the table is
# registered as <silver_db_path>.<table>.
silver_frames = {
    "customer": silver_customer,
    "product": silver_product,
    "transaction": silver_transaction,
    "currency": silver_currency,
}

for table_name, silver_df in silver_frames.items():
    (
        silver_df.write
        .format("delta")
        .mode("overwrite")
        .option("path", f"{s3_bucket}/silver/{table_name}")
        .saveAsTable(f"{silver_db_path}.{table_name}")
    )

In [0]:
# Re-read the silver tables by name so the following cells work with the
# registered tables rather than the in-memory transformation results.
# spark.table() loads a whole table directly, without assembling SQL text
# (and without a trailing statement terminator).
silver_customer = spark.table(f"{silver_db_path}.customer")
silver_product = spark.table(f"{silver_db_path}.product")
silver_transaction = spark.table(f"{silver_db_path}.transaction")
silver_currency = spark.table(f"{silver_db_path}.currency")

In [0]:
# Show each silver table under a printed heading, in the same order the
# tables were written.
labelled_frames = (
    ("Customer", silver_customer),
    ("Product", silver_product),
    ("Transaction", silver_transaction),
    ("Currency", silver_currency),
)

for label, silver_df in labelled_frames:
    print(f"{label} Silver Data:")
    display(silver_df)