In [1]:
import json
# Load the pipeline settings from the shared metadata file.
# The path is relative, so it is resolved against the notebook's working directory.
# The file is decoded as UTF-8 explicitly: without `encoding`, open() falls back to the
# platform's locale encoding, which can mis-decode non-ASCII values in the JSON.
with open("../config/metadata.json", "r", encoding="utf-8") as f:
    config = json.load(f)

In [0]:
from pyspark.sql.functions import trim, col, regexp_extract

In [None]:
# Pipeline settings
# Anchored pattern (^...$): the whole value must look like local-part@domain.tld
# with a top-level domain of at least two letters.
email_regex = r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'

# Catalog and bronze/silver schema names come from the loaded config dict;
# a missing key raises KeyError here rather than later during a table read/write.
catalog, bronze_schema, silver_schema = (
    config[key] for key in ("catalog", "bronze_schema", "silver_schema")
)

In [0]:
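# Drop schema (destructive: CASCADE also removes every table inside the silver schema).
# Kept commented out so a normal run does not wipe existing silver tables;
# uncomment only when a full rebuild is intended.
# spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{silver_schema} CASCADE")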

In [0]:
# Make sure the silver schema exists before any table is written into it.
# IF NOT EXISTS keeps this cell safe to re-run.
create_schema_stmt = f"CREATE SCHEMA IF NOT EXISTS {catalog}.{silver_schema}"
spark.sql(create_schema_stmt)

In [0]:
# cpg_consumer: bronze -> silver
# Steps: trim the text fields, keep only e-mail values that match email_regex,
# require consumer_id / registration_date and age >= 18, drop phone and address,
# then overwrite the silver Delta table.
consumer_source = f"{catalog}.{bronze_schema}.cpg_consumer"
consumer_target = f"{catalog}.{silver_schema}.cpg_consumer"

consumer_trimmed = spark.table(consumer_source)
for text_col in ("name", "city", "state", "country", "gender", "email"):
    consumer_trimmed = consumer_trimmed.withColumn(text_col, trim(col(text_col)))

consumer_silver = (
    consumer_trimmed
    # Group 0 is the whole match; regexp_extract yields '' when the pattern does
    # not match, so malformed addresses are removed by the email != '' predicate.
    .withColumn("email", regexp_extract(col("email"), email_regex, 0))
    .filter(
        "consumer_id IS NOT NULL AND registration_date IS NOT NULL AND age >= 18 AND email IS NOT NULL AND email != ''"
    )
    .drop("phone", "address")
)
consumer_silver.write.format("delta").mode("overwrite").saveAsTable(consumer_target)

In [0]:
# cpg_consumer_invoice: bronze -> silver
# Trim the status / payment-method text and keep rows that have both ids and
# non-negative gross and net amounts (NULL amounts fail the comparison and are dropped too).
consumer_invoice_trimmed = spark.table(f"{catalog}.{bronze_schema}.cpg_consumer_invoice")
for text_col in ("invoice_status", "payment_method"):
    consumer_invoice_trimmed = consumer_invoice_trimmed.withColumn(text_col, trim(col(text_col)))

consumer_invoice_silver = consumer_invoice_trimmed.filter(
    "invoice_id IS NOT NULL AND consumer_id IS NOT NULL AND gross_amount >= 0 AND net_amount >= 0"
)
consumer_invoice_silver.write.format("delta").mode("overwrite").saveAsTable(
    f"{catalog}.{silver_schema}.cpg_consumer_invoice"
)

In [0]:
# cpg_consumer_order: bronze -> silver
# Trim the categorical text fields, require order/consumer ids, an order date and
# a non-negative total, and leave the billing/shipping addresses out of silver.
consumer_order_trimmed = spark.table(f"{catalog}.{bronze_schema}.cpg_consumer_order")
for text_col in ("order_status", "currency", "payment_method", "channel"):
    consumer_order_trimmed = consumer_order_trimmed.withColumn(text_col, trim(col(text_col)))

consumer_order_silver = (
    consumer_order_trimmed
    .filter("order_id IS NOT NULL AND consumer_id IS NOT NULL AND total_amount >= 0 AND order_date IS NOT NULL")
    .drop("billing_address", "shipping_address")
)
consumer_order_silver.write.format("delta").mode("overwrite").saveAsTable(
    f"{catalog}.{silver_schema}.cpg_consumer_order"
)

In [0]:
# cpg_consumer_order_items: bronze -> silver
# No text cleanup here; only rows with all three ids and strictly positive
# quantity, unit price and total price are kept.
consumer_order_items_silver = spark.table(
    f"{catalog}.{bronze_schema}.cpg_consumer_order_items"
).filter(
    "order_item_id IS NOT NULL AND order_id IS NOT NULL AND product_id IS NOT NULL AND quantity > 0 AND unit_price > 0 AND total_price > 0"
)
consumer_order_items_silver.write.format("delta").mode("overwrite").saveAsTable(
    f"{catalog}.{silver_schema}.cpg_consumer_order_items"
)

In [0]:
# cpg_distributor: bronze -> silver
# Steps: trim the text fields, keep only e-mail values that match email_regex,
# require distributor_id / registration_date, drop the deal aggregates plus
# phone and address, then overwrite the silver Delta table.
distributor_source = f"{catalog}.{bronze_schema}.cpg_distributor"
distributor_target = f"{catalog}.{silver_schema}.cpg_distributor"

distributor_trimmed = spark.table(distributor_source)
for text_col in ("distributor_name", "company_name", "city", "state", "country", "email"):
    distributor_trimmed = distributor_trimmed.withColumn(text_col, trim(col(text_col)))

distributor_silver = (
    distributor_trimmed
    # Group 0 is the whole match; regexp_extract yields '' when the pattern does
    # not match, so malformed addresses are removed by the email != '' predicate.
    .withColumn("email", regexp_extract(col("email"), email_regex, 0))
    .filter(
        "distributor_id IS NOT NULL AND registration_date IS NOT NULL AND email IS NOT NULL AND email != ''"
    )
    .drop("no_of_associated_deals", "total_open_deal_value", "phone", "address")
)
distributor_silver.write.format("delta").mode("overwrite").saveAsTable(distributor_target)

In [0]:
# cpg_distributor_invoice: bronze -> silver
# Trim payment_status and keep rows that have invoice and purchase ids and
# non-negative amount_due / total_payable.
distributor_invoice_trimmed = spark.table(f"{catalog}.{bronze_schema}.cpg_distributor_invoice")
for text_col in ("payment_status",):
    distributor_invoice_trimmed = distributor_invoice_trimmed.withColumn(text_col, trim(col(text_col)))

distributor_invoice_silver = distributor_invoice_trimmed.filter(
    "invoice_id IS NOT NULL AND purchase_id IS NOT NULL AND amount_due >= 0 AND total_payable >= 0"
)
distributor_invoice_silver.write.format("delta").mode("overwrite").saveAsTable(
    f"{catalog}.{silver_schema}.cpg_distributor_invoice"
)

In [0]:
# cpg_distributor_purchase_items: bronze -> silver
# No text cleanup here; only rows with all three ids and strictly positive
# ordered quantity, unit cost and total price are kept.
distributor_purchase_items_silver = spark.table(
    f"{catalog}.{bronze_schema}.cpg_distributor_purchase_items"
).filter(
    "purchase_item_id IS NOT NULL AND purchase_id IS NOT NULL AND product_id IS NOT NULL AND quantity_ordered > 0 AND unit_cost > 0 AND total_price > 0"
)
distributor_purchase_items_silver.write.format("delta").mode("overwrite").saveAsTable(
    f"{catalog}.{silver_schema}.cpg_distributor_purchase_items"
)

In [0]:
# cpg_distributor_purchases: bronze -> silver
# Trim order_status / currency, require purchase and distributor ids, an order
# date and a non-negative total, and leave expected_delivery_date out of silver.
distributor_purchases_trimmed = spark.table(f"{catalog}.{bronze_schema}.cpg_distributor_purchases")
for text_col in ("order_status", "currency"):
    distributor_purchases_trimmed = distributor_purchases_trimmed.withColumn(text_col, trim(col(text_col)))

distributor_purchases_silver = (
    distributor_purchases_trimmed
    .filter("purchase_id IS NOT NULL AND distributor_id IS NOT NULL AND total_amount >= 0 AND order_date IS NOT NULL")
    .drop("expected_delivery_date")
)
distributor_purchases_silver.write.format("delta").mode("overwrite").saveAsTable(
    f"{catalog}.{silver_schema}.cpg_distributor_purchases"
)

In [0]:
# cpg_inventory: bronze -> silver
# Trim the location / status text fields, require inventory and product ids with
# non-negative stock and reorder levels, and drop the contact columns.
# NOTE(review): last_updated is trimmed together with the text fields; if it is a
# timestamp in bronze this would turn it into a string — confirm the bronze type.
inventory_text_cols = (
    "location_type",
    "location_name",
    "location_code",
    "city",
    "state",
    "country",
    "inventory_status",
    "last_updated",
)

inventory_trimmed = spark.table(f"{catalog}.{bronze_schema}.cpg_inventory")
for text_col in inventory_text_cols:
    inventory_trimmed = inventory_trimmed.withColumn(text_col, trim(col(text_col)))

inventory_silver = (
    inventory_trimmed
    .filter("inventory_id IS NOT NULL AND product_id IS NOT NULL AND quantity_on_hand >= 0 AND reorder_level >= 0")
    .drop("address", "phone", "email")
)
inventory_silver.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{silver_schema}.cpg_inventory")

In [0]:
# cpg_product: bronze -> silver
# Trim the descriptive text fields, require a product id with strictly positive
# unit and retail prices, and leave upc / gtin / expiration_days out of silver.
product_text_cols = (
    "product_name",
    "brand",
    "manufacturer",
    "category",
    "department",
    "description",
    "sku_id",
    "unit_of_measurement",
    "product_status",
)

product_trimmed = spark.table(f"{catalog}.{bronze_schema}.cpg_product")
for text_col in product_text_cols:
    product_trimmed = product_trimmed.withColumn(text_col, trim(col(text_col)))

product_silver = (
    product_trimmed
    .filter("product_id IS NOT NULL AND unit_price > 0 AND retail_price > 0")
    .drop("upc", "gtin", "expiration_days")
)
product_silver.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{silver_schema}.cpg_product")