In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
import pandas as pd
from datetime import datetime
import re


### reading the files from bronze

In [0]:
# Root folders of the bronze (raw input) and silver (cleaned output) layers
bronze_path = "abfss://bronze@migrationstoragerinith.dfs.core.windows.net/second_method/"
silver_path = "abfss://silver@migrationstoragerinith.dfs.core.windows.net/second_method/"

# One bronze sub-folder per source table, keyed by table name
paths = {
    table_name: f"{bronze_path}{table_name}"
    for table_name in ("products", "customers", "dates", "orders")
}

In [0]:


def clean_colnames(df):
    """Return `df` with every column name normalised.

    Each name is stripped of surrounding whitespace, lower-cased, and every
    character that is not a word character (`\\w`) is replaced by "_".
    """
    renamed = df
    # dict.fromkeys keeps the first-seen order and visits each distinct name once
    for original_name in dict.fromkeys(df.columns):
        cleaned_name = re.sub(r"[^\w]", "_", original_name.strip().lower())
        renamed = renamed.withColumnRenamed(original_name, cleaned_name)
    return renamed



# Clean and standardize string columns
def clean_string_columns(df, exclude=None):
    """Lower-case, trim and strip special characters from string columns.

    Every column whose dtype is 'string' is lower-cased, trimmed, and has each
    character that is not a letter, digit or whitespace removed. This also
    removes characters such as "@", "." and "-", so values like e-mail
    addresses or codes are altered unless their column is excluded.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame.
    exclude : str or iterable of str, optional
        Column name(s) to leave untouched. Defaults to None, which cleans
        every string column (the original behaviour).

    Returns
    -------
    DataFrame
        `df` with the selected string columns normalised.
    """
    if isinstance(exclude, str):
        # A bare string would otherwise be split into single characters by set()
        exclude = [exclude]
    skipped = set(exclude or ())

    for col_name, dtype in df.dtypes:
        if dtype == 'string' and col_name not in skipped:
            # lower() runs first, so the A-Z range in the pattern never matters
            df = df.withColumn(col_name, regexp_replace(trim(lower(col(col_name))), r'[^a-zA-Z0-9\s]', ''))
    return df

# Replace nulls in selected columns
def fill_missing(df, fill_dict):
    """Return `df` with nulls replaced according to `fill_dict` (passed to `df.fillna`)."""
    filled = df.fillna(fill_dict)
    return filled

# Remove duplicates based on key columns
def remove_duplicates(df, subset_cols):
    """Return `df` with duplicate rows dropped, comparing only the `subset_cols` columns."""
    deduplicated = df.dropDuplicates(subset=subset_cols)
    return deduplicated


#### products table cleaning

In [0]:
# Load the raw products CSV files (header row present, column types inferred)
df_products = (
    spark.read.format("csv")
    .options(inferschema=True, header=True)
    .load(paths["products"])
)

# Normalise column names before referring to any column by name
df_products = clean_colnames(df_products)

# Rows without a primary key are dropped, then the columns are cast to their target types
df_products = (
    df_products
    .dropna(subset=["product_id"])
    .withColumn("product_id", col("product_id").cast("int"))
    .withColumn("price", col("price").cast("float"))
    .withColumn("weight_lbs", col("weight_lbs").cast("float"))
    .withColumn("in_stock", col("in_stock").cast("boolean"))
)

# A missing brand gets a default value
df_products = fill_missing(df_products, {"brand": "Unknown"})

# Normalise every remaining string column
df_products = clean_string_columns(df_products)

# Prices above 5000 are capped at 5000
df_products = df_products.withColumn(
    "price",
    when(col("price") > 5000.0, 5000.0).otherwise(col("price")),
)

# ------------------------
# Final select and save
# ------------------------

# Keep only the output columns, one row per product_id
df_products = df_products.select(
    [
        "product_id",
        "product_name",
        "category",
        "brand",
        "price",
        "sku",
        "weight_lbs",
        "dimensions",
        "in_stock",
    ]
).dropDuplicates(["product_id"])

df_products.write.format("delta").mode("append").save(f"{silver_path}dim_products")


In [0]:
# Show the column names and types of the products DataFrame
df_products.printSchema()


#### After transformation, move the bronze files to archive

In [0]:
# Processed products files are moved into an archive folder named after today's date
today = datetime.now().strftime('%Y-%m-%d')

base_path = "abfss://bronze@migrationstoragerinith.dfs.core.windows.net/second_method/products"
archive_path = f"{base_path}/archive/{today}"

# Only plain files directly under the products folder are picked up
files = [
    entry.path
    for entry in dbutils.fs.ls(base_path)
    if not (entry.isDir() or "archive/" in entry.path)
]

# Move each file to archive, keeping its original file name
for file in files:
    filename = file.rsplit("/", 1)[-1]
    destination = f"{archive_path}/{filename}"
    dbutils.fs.mv(file, destination)
    print(f"Moved: {filename} → {destination}")


#### customers table cleaning

In [0]:
# Preview of the raw customers data before any cleaning
df_customers = spark.read.csv(paths["customers"], header=True)
df_customers.printSchema()
df_customers.display()


In [0]:
# Customers: raw bronze CSV -> cleaned silver dimension table
df_customers = spark.read.option("header", True).csv(paths["customers"])

df_customers = clean_colnames(df_customers)

# Keep only rows whose customer_id is purely numeric. For a null customer_id
# rlike evaluates to null, so this filter drops those rows as well and no
# separate dropna is needed.
df_customers = df_customers.filter(col("customer_id").rlike("^[0-9]+$"))

# Convert types (the CSV is read without schema inference, so every column starts as string)
# NOTE(review): casting zip_code to int drops leading zeros — confirm this is acceptable.
df_customers = df_customers \
    .withColumn("customer_id", col("customer_id").cast("int")) \
    .withColumn("zip_code", col("zip_code").cast("int")) \
    .withColumn("age", col("age").cast("int")) \
    .withColumn("annual_income", col("annual_income").cast("float")) \
    .withColumn("customer_since", to_date(col("customer_since"), "yyyy-MM-dd"))

# Ages outside 18-90 are replaced by null
df_customers = df_customers.withColumn("age", when((col("age") < 18) | (col("age") > 90), None).otherwise(col("age")))

# Clamp annual_income to the range 10000-500000.
# The upper test previously compared against 50000.0 while assigning 500000,
# which rewrote every income above 50k to 500k.
# NOTE(review): assumes 500000 is the intended cap — confirm with the data owner.
df_customers = df_customers.withColumn("annual_income", when(col("annual_income") < 10000.0, 10000)
                                                       .when(col("annual_income") > 500000.0, 500000)
                                                       .otherwise(col("annual_income")))

df_customers = remove_duplicates(df_customers, ["first_name", "last_name", "email"])

# Normalise the remaining string columns BEFORE applying labels and defaults:
# clean_string_columns lower-cases its input, so running it last turned
# "Male"/"Female" and "Not Provided" back into lower case.
# NOTE(review): this step also strips "@" and "." from the email column — confirm that is wanted.
df_customers = clean_string_columns(df_customers)

# gender is already trimmed and lower-cased by the step above
gender_map = {'m': 'Male', 'male': 'Male', 'f': 'Female', 'female': 'Female'}
df_customers = df_customers.replace(gender_map, subset=["gender"])

df_customers = fill_missing(df_customers, {"phone": "Not Provided"})

df_customers.write.format("delta").mode("append").save(silver_path + "dim_customers")




#### After the transformation, move the bronze files to archive


In [0]:
# Processed customers files are moved into an archive folder named after today's date
today = datetime.now().strftime('%Y-%m-%d')

base_path = "abfss://bronze@migrationstoragerinith.dfs.core.windows.net/second_method/customers"
archive_path = f"{base_path}/archive/{today}"

# Only plain files directly under the customers folder are picked up
files = [
    entry.path
    for entry in dbutils.fs.ls(base_path)
    if not (entry.isDir() or "archive/" in entry.path)
]

# Move each file to archive, keeping its original file name
for file in files:
    filename = file.rsplit("/", 1)[-1]
    destination = f"{archive_path}/{filename}"
    dbutils.fs.mv(file, destination)
    print(f"Moved: {filename} → {destination}")


#### dates table cleaning

In [0]:
# Load the raw dates CSV files and normalise the column names
df_dates = clean_colnames(spark.read.csv(paths["dates"], header=True))

# Cast every column to its target type (the CSV is read without schema inference)
df_dates = (
    df_dates
    .withColumn("date_id", col("date_id").cast("int"))
    .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
    .withColumn("year", col("year").cast("int"))
    .withColumn("month", col("month").cast("int"))
    .withColumn("week", col("week").cast("int"))
    .withColumn("day", col("day").cast("int"))
    .withColumn("day_of_week", col("day_of_week").cast("int"))
    .withColumn("is_weekend", col("is_weekend").cast("boolean"))
    .withColumn("is_holiday", col("is_holiday").cast("boolean"))
)

# Normalise whatever string columns are left after the casts
df_dates = clean_string_columns(df_dates)

# Append the cleaned rows to the silver dim_dates table
df_dates.write.format("delta").mode("append").save(f"{silver_path}dim_dates")





#### After the transformation, move the bronze files to archive


In [0]:
# Processed dates files are moved into an archive folder named after today's date
today = datetime.now().strftime('%Y-%m-%d')

base_path = "abfss://bronze@migrationstoragerinith.dfs.core.windows.net/second_method/dates"
archive_path = f"{base_path}/archive/{today}"

# Only plain files directly under the dates folder are picked up
files = [
    entry.path
    for entry in dbutils.fs.ls(base_path)
    if not (entry.isDir() or "archive/" in entry.path)
]

# Move each file to archive, keeping its original file name
for file in files:
    filename = file.rsplit("/", 1)[-1]
    destination = f"{archive_path}/{filename}"
    dbutils.fs.mv(file, destination)
    print(f"Moved: {filename} → {destination}")


#### orders table cleaning

In [0]:
# Orders: raw bronze CSV -> cleaned silver fact table
df_orders = spark.read.option("header", True).csv(paths["orders"])
df_orders = clean_colnames(df_orders)

# Rows missing any of the key columns are dropped
df_orders = df_orders.dropna(subset=["order_id", "customer_id", "product_id"])

# Target type for each numeric column (the CSV is read without schema inference)
conversion_types = {
    "order_id": "int", "customer_id": "int", "product_id": "int", "date_id": "int", "store_id": "int",
    "quantity": "int", "unit_price": "float", "line_total": "float", "order_subtotal": "float",
    "discount_percent": "float", "discount_amount": "float", "tax_rate": "float", "tax_amount": "float",
    "shipping_cost": "float", "total_amount": "float"
}
for col_name, dtype in conversion_types.items():
    df_orders = df_orders.withColumn(col_name, col(col_name).cast(dtype))

# Quantities outside 1-100 are replaced by null
df_orders = df_orders.withColumn("quantity", when((col("quantity") <= 0) | (col("quantity") > 100), None).otherwise(col("quantity")))

# Normalise the remaining string columns BEFORE mapping labels:
# clean_string_columns trims and lower-cases its input, so running it after the
# mapping turned every mapped label back into lower case and undid the mapping.
# It also makes a separate lower(trim(...)) on payment_method/order_status unnecessary.
# NOTE(review): every column still typed string is cleaned here, including any
# uncast columns that contain punctuation — confirm against the orders schema.
df_orders = clean_string_columns(df_orders)

# Display labels for the normalised payment methods and order statuses
payment_map = {
    "credit card": "Credit Card", "cash": "Cash", "paypal": "PayPal", "gift card": "Gift Card", "debit card": "Debit Card", "debit": "Debit"
}
status_map = {
    "completed": "Completed", "shipped": "Shipped", "cancelled": "Cancelled", "returned": "Returned", "processing": "Processing"
}
df_orders = df_orders.replace(payment_map, subset=["payment_method"])
df_orders = df_orders.replace(status_map, subset=["order_status"])

df_orders.write.format("delta").mode("append").save(silver_path + "fact_orders")



In [0]:
# Processed orders files are moved into an archive folder named after today's date
today = datetime.now().strftime('%Y-%m-%d')

base_path = "abfss://bronze@migrationstoragerinith.dfs.core.windows.net/second_method/orders"
archive_path = f"{base_path}/archive/{today}"

# Only plain files directly under the orders folder are picked up
files = [
    entry.path
    for entry in dbutils.fs.ls(base_path)
    if not (entry.isDir() or "archive/" in entry.path)
]

# Move each file to archive, keeping its original file name
for file in files:
    filename = file.rsplit("/", 1)[-1]
    destination = f"{archive_path}/{filename}"
    dbutils.fs.mv(file, destination)
    print(f"Moved: {filename} → {destination}")
