In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, trim
from pyspark.sql.window import Window

sales_details

In [0]:
# Stage sales_details: copy the Bronze Parquet data into Silver as Delta files
# and register that location as a table.
sales_bronze_path = "abfss://bronze@fordatabricks111.dfs.core.windows.net/sales_details/"
sales_silver_path = "abfss://silver@fordatabricks111.dfs.core.windows.net/sales_details/"

sales_details = spark.read.parquet(sales_bronze_path)

# "overwrite" replaces whatever is already at the Silver path (first load);
# overwriteSchema lets the stored schema be replaced together with the data.
sales_writer = (
    sales_details.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
sales_writer.save(sales_silver_path)

# Register the Silver path as a Delta table; does nothing if it already exists.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.sales_details
    USING DELTA
    LOCATION '{sales_silver_path}'
""")


In [0]:
# Load the staged sales table and replace the source-system column names
# with descriptive ones before publishing it.
Sales_trans = spark.read.format("delta").table("silver.sales_details")

# Source column name -> published column name.
RENAME_MAP = dict(
    sls_ord_num="order_number",
    sls_prd_key="product_number",
    sls_cust_id="customer_id",
    sls_order_dt="order_date",
    sls_ship_dt="ship_date",
    sls_due_dt="due_date",
    sls_sales="sales_amount",
    sls_quantity="quantity",
    sls_price="price",
)

# withColumnRenamed is a no-op for columns that are absent, so a missing
# source column does not raise here.
for old_name in RENAME_MAP:
    new_name = RENAME_MAP[old_name]
    Sales_trans = Sales_trans.withColumnRenamed(old_name, new_name)

# Publish the renamed frame, replacing both data and schema of the target table.
(
    Sales_trans.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("databrickswp.silver.sales_details")
)


prd_info

In [0]:
# Stage prd_info: copy the Bronze Parquet data into Silver as Delta files
# and register that location as a table.
prd_bronze_path = "abfss://bronze@fordatabricks111.dfs.core.windows.net/prd_info/"
prd_silver_path = "abfss://silver@fordatabricks111.dfs.core.windows.net/prd_info/"

prd_info = spark.read.parquet(prd_bronze_path)

# "overwrite" replaces whatever is already at the Silver path (first load);
# overwriteSchema lets the stored schema be replaced together with the data.
prd_writer = (
    prd_info.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
prd_writer.save(prd_silver_path)

# Register the Silver path as a Delta table; does nothing if it already exists.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.prd_info
    USING DELTA
    LOCATION '{prd_silver_path}'
""")


In [0]:

# Clean the staged product table: trim strings, split the raw product key into
# a category id and a product number, default missing costs to 0, expand the
# product-line codes, rename the columns, and publish the result.
prod_trans = spark.read.format("delta").table("silver.prd_info")

# Trim every string column in a single projection. Non-string columns pass
# through untouched and the column order is preserved. One select keeps the
# query plan flat, unlike calling withColumn once per column.
prod_trans = prod_trans.select(
    [
        trim(col(field.name)).alias(field.name)
        if isinstance(field.dataType, StringType)
        else col(field.name)
        for field in prod_trans.schema.fields
    ]
)

raw_key = col("prd_key")
line_code = F.upper(col("prd_line"))

prod_trans = (
    prod_trans
    # Category id: first 5 characters of the raw key with "-" replaced by "_".
    # This must be derived before prd_key itself is overwritten below.
    .withColumn("cat_id", F.regexp_replace(F.substring(raw_key, 1, 5), "-", "_"))
    # Product number: the raw key from its 7th character onward.
    # Column.substr accepts Column arguments for start and length on every
    # PySpark version, whereas F.substring only accepts a Column length on
    # newer releases and raises on older ones.
    .withColumn("prd_key", raw_key.substr(F.lit(7), F.length(raw_key)))
    # Missing costs become 0.
    .withColumn("prd_cost", F.coalesce(col("prd_cost"), F.lit(0)))
    # Expand single-letter product-line codes (case-insensitive); anything
    # else, including NULL, becomes "n/a".
    .withColumn(
        "prd_line",
        F.when(line_code == "M", "Mountain")
         .when(line_code == "R", "Road")
         .when(line_code == "S", "Other Sales")
         .when(line_code == "T", "Touring")
         .otherwise("n/a")
    )
)

# Source column name -> published column name.
RENAME_MAP = {
    "prd_id": "product_id",
    "cat_id": "category_id",
    "prd_key": "product_number",
    "prd_nm": "product_name",
    "prd_cost": "product_cost",
    "prd_line": "product_line",
    "prd_start_dt": "start_date",
    "prd_end_dt": "end_date"
}
for old_name, new_name in RENAME_MAP.items():
    prod_trans = prod_trans.withColumnRenamed(old_name, new_name)

# Publish, replacing both data and schema of the target table.
(
    prod_trans.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("databrickswp.silver.prd_info")
)


cust_info

In [0]:
# Stage cust_info: copy the Bronze Parquet data into Silver as Delta files
# and register that location as a table.
cust_bronze_path = "abfss://bronze@fordatabricks111.dfs.core.windows.net/cust_info/"
cust_silver_path = "abfss://silver@fordatabricks111.dfs.core.windows.net/cust_info/"

cust_info = spark.read.parquet(cust_bronze_path)

# "overwrite" replaces whatever is already at the Silver path (first load);
# overwriteSchema lets the stored schema be replaced together with the data.
cust_writer = (
    cust_info.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
cust_writer.save(cust_silver_path)

# Register the Silver path as a Delta table; does nothing if it already exists.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.cust_info
    USING DELTA
    LOCATION '{cust_silver_path}'
""")


In [0]:
# Load the staged customer table, expand the marital-status and gender codes
# into readable labels, rename the columns, and publish the result.
cust_trans = spark.read.format("delta").table("silver.cust_info")

# Upper-cased views of the code columns so the matches are case-insensitive.
marital_code = F.upper(F.col("cst_marital_status"))
gender_code = F.upper(F.col("cst_gndr"))

# Unrecognised codes (and NULLs) fall through to "n/a".
marital_label = (
    F.when(marital_code == "S", "Single")
     .when(marital_code == "M", "Married")
     .otherwise("n/a")
)
gender_label = (
    F.when(gender_code == "F", "Female")
     .when(gender_code == "M", "Male")
     .otherwise("n/a")
)

cust_trans = cust_trans.withColumn("cst_marital_status", marital_label)
cust_trans = cust_trans.withColumn("cst_gndr", gender_label)

# Source column name -> published column name.
RENAME_MAP = dict(
    cst_id="customer_id",
    cst_key="customer_number",
    cst_firstname="first_name",
    cst_lastname="last_name",
    cst_marital_status="marital_status",
    cst_gndr="gender",
    cst_create_date="created_date",
)
for old_name in RENAME_MAP:
    new_name = RENAME_MAP[old_name]
    cust_trans = cust_trans.withColumnRenamed(old_name, new_name)

# Publish, replacing both data and schema of the target table.
(
    cust_trans.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("databrickswp.silver.cust_info")
)



PX_CAT_G1V2

In [0]:
# Stage PX_CAT_G1V2: copy the Bronze Parquet data into Silver as Delta files
# and register that location as a table.
px_bronze_path = "abfss://bronze@fordatabricks111.dfs.core.windows.net/PX_CAT_G1V2/"
px_silver_path = "abfss://silver@fordatabricks111.dfs.core.windows.net/PX_CAT_G1V2/"

PX_CAT_G1V2 = spark.read.parquet(px_bronze_path)

# "overwrite" replaces whatever is already at the Silver path (first load);
# overwriteSchema lets the stored schema be replaced together with the data.
px_writer = (
    PX_CAT_G1V2.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
px_writer.save(px_silver_path)

# Register the Silver path as a Delta table; does nothing if it already exists.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.PX_CAT_G1V2
    USING DELTA
    LOCATION '{px_silver_path}'
""")


In [0]:
# Load the staged category table, turn the Yes/No maintenance text into a
# boolean, rename the columns, and publish the result.
#
# The read uses the unqualified name `silver.px_cat_g1v2`, matching the table
# registered by the staging step (`silver.PX_CAT_G1V2`) and the pattern used by
# the other transform cells. Reading the fully qualified output table instead
# would pick up this cell's own previous output (where `maintenance` has
# already been renamed) whenever the session catalog is not `databrickswp`.
PX_trans = spark.read.format("delta").table("silver.px_cat_g1v2")

# Case-insensitive match on the raw text; any other value (or NULL) yields NULL.
maintenance_text = F.upper(col("maintenance"))
PX_trans = PX_trans.withColumn(
    "maintenance",
    F.when(maintenance_text == "YES", F.lit(True))
     .when(maintenance_text == "NO", F.lit(False))
     .otherwise(None)
)

# Source column name -> published column name.
RENAME_MAP = {
    "id": "category_id",
    "cat": "category",
    "subcat": "subcategory",
    "maintenance": "maintenance_flag"
}
for old_name, new_name in RENAME_MAP.items():
    PX_trans = PX_trans.withColumnRenamed(old_name, new_name)

# Publish, replacing both data and schema of the target table.
(
    PX_trans.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("databrickswp.silver.px_cat_g1v2")
)



LOC_A101

In [0]:
# Stage LOC_A101: copy the Bronze Parquet data into Silver as Delta files
# and register that location as a table.
loc_bronze_path = "abfss://bronze@fordatabricks111.dfs.core.windows.net/LOC_A101/"
loc_silver_path = "abfss://silver@fordatabricks111.dfs.core.windows.net/LOC_A101/"

LOC_A101 = spark.read.parquet(loc_bronze_path)

# "overwrite" replaces whatever is already at the Silver path (first load);
# overwriteSchema lets the stored schema be replaced together with the data.
loc_writer = (
    LOC_A101.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
loc_writer.save(loc_silver_path)

# Register the Silver path as a Delta table; does nothing if it already exists.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.LOC_A101
    USING DELTA
    LOCATION '{loc_silver_path}'
""")


In [0]:
# Load the staged location table, strip dashes from the customer id,
# normalise the country values, rename the columns, and publish the result.
LOC_A101 = spark.read.format("delta").table("silver.loc_a101")

country = col("cntry")

# "DE" and "US"/"USA" are expanded to full names, empty or NULL becomes "n/a",
# and every other value is kept as it is.
country_label = (
    F.when(country == "DE", "Germany")
     .when(country.isin("US", "USA"), "United States")
     .when((country == "") | country.isNull(), "n/a")
     .otherwise(country)
)

LOC_A101 = (
    LOC_A101
    # Remove every "-" from the customer id.
    .withColumn("cid", F.regexp_replace(col("cid"), "-", ""))
    .withColumn("cntry", country_label)
)

# Source column name -> published column name.
RENAME_MAP = dict(
    cid="customer_number",
    cntry="country",
)
for old_name in RENAME_MAP:
    new_name = RENAME_MAP[old_name]
    LOC_A101 = LOC_A101.withColumnRenamed(old_name, new_name)

# Publish, replacing both data and schema of the target table.
(
    LOC_A101.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("databrickswp.silver.loc_a101")
)


CUST_AZ12

In [0]:
# Stage CUST_AZ12: copy the Bronze Parquet data into Silver as Delta files
# and register that location as a table.
az12_bronze_path = "abfss://bronze@fordatabricks111.dfs.core.windows.net/CUST_AZ12/"
az12_silver_path = "abfss://silver@fordatabricks111.dfs.core.windows.net/CUST_AZ12/"

CUST_AZ12 = spark.read.parquet(az12_bronze_path)

# "overwrite" replaces whatever is already at the Silver path (first load);
# overwriteSchema lets the stored schema be replaced together with the data.
az12_writer = (
    CUST_AZ12.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
az12_writer.save(az12_silver_path)

# Register the Silver path as a Delta table; does nothing if it already exists.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.CUST_AZ12
    USING DELTA
    LOCATION '{az12_silver_path}'
""")


In [0]:
# Load the staged CUST_AZ12 table, strip the "NAS" prefix from customer ids,
# blank out birth dates that lie in the future, normalise gender values,
# rename the columns, and publish the result.
cust_az12_trans = spark.read.format("delta").table("silver.cust_az12")

customer_id = col("cid")
gender_code = F.upper(col("gen"))

cust_az12_trans = (
    cust_az12_trans
    # Drop a leading "NAS" by keeping everything from the 4th character on.
    # Column.substr accepts Column arguments for start and length on every
    # PySpark version, whereas F.substring only accepts a Column length on
    # newer releases and raises on older ones.
    .withColumn(
        "cid",
        F.when(customer_id.startswith("NAS"),
               customer_id.substr(F.lit(4), F.length(customer_id)))
         .otherwise(customer_id)
    )
    # A birth date later than today is replaced with NULL.
    .withColumn(
        "bdate",
        F.when(col("bdate") > F.current_date(), None)
         .otherwise(col("bdate"))
    )
    # Case-insensitive match on short and long forms; anything else
    # (including NULL) becomes "n/a".
    .withColumn(
        "gen",
        F.when(gender_code.isin("F", "FEMALE"), "Female")
         .when(gender_code.isin("M", "MALE"), "Male")
         .otherwise("n/a")
    )
)

# Source column name -> published column name.
RENAME_MAP = {
    "cid": "customer_number",
    "bdate": "birth_date",
    "gen": "gender"
}
for old_name, new_name in RENAME_MAP.items():
    cust_az12_trans = cust_az12_trans.withColumnRenamed(old_name, new_name)

# Publish, replacing both data and schema of the target table.
(
    cust_az12_trans.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("databrickswp.silver.cust_az12")
)