In [0]:
# Inspect the root of the bronze mount; the listing is shown as the cell output
bronze_root_entries = dbutils.fs.ls("dbfs:/mnt/walmartbronze/")
bronze_root_entries

In [0]:
# Inspect the test-table folder under the bronze mount; the listing is the cell output
bronze_test_entries = dbutils.fs.ls("dbfs:/mnt/walmartbronze/Tables/test/")
bronze_test_entries

In [0]:
# Locations of the four raw CSV inputs under the bronze mount
train_file, test_file, stores_file, features_file = (
    "dbfs:/mnt/walmartbronze/Tables/train/train.csv",
    "dbfs:/mnt/walmartbronze/Tables/test/test.csv",
    "dbfs:/mnt/walmartbronze/Tables/stores/stores.csv",
    "dbfs:/mnt/walmartbronze/Tables/features/features.csv",
)


In [0]:
# Step 3: Load each raw CSV into its own DataFrame
def _load_raw_csv(path):
    """Read one CSV that has a header row, letting Spark infer the column types."""
    return spark.read.csv(path, header=True, inferSchema=True)


train_df = _load_raw_csv(train_file)
test_df = _load_raw_csv(test_file)
stores_df = _load_raw_csv(stores_file)
features_df = _load_raw_csv(features_file)

In [0]:
# Step 4: Basic data validation (optional but recommended) - report the row count of each dataset
print("Record Counts:")
for label, frame in (
    ("Train", train_df),
    ("Test", test_df),
    ("Stores", stores_df),
    ("Features", features_df),
):
    # count() triggers a Spark job per DataFrame
    print(f"{label} data: {frame.count()} rows")

In [0]:
# Show the inferred schema of every dataset, in load order
print("\nSchemas:")
for frame in (train_df, test_df, stores_df, features_df):
    frame.printSchema()

In [0]:
# Drop rows that are exact duplicates (all columns equal) from every dataset
train_df, test_df, stores_df, features_df = [
    frame.dropDuplicates()
    for frame in (train_df, test_df, stores_df, features_df)
]

In [0]:
# Handle missing values

# -- Features: MarkDown columns default to 0; CPI / Unemployment use the column average --
# avg() ignores nulls by itself, so both averages are computed in one aggregation pass
# instead of two separate select/dropna/collect round-trips.
feature_averages = features_df.agg({"CPI": "avg", "Unemployment": "avg"}).collect()[0]
features_fill_values = {
    "MarkDown1": 0,
    "MarkDown2": 0,
    "MarkDown3": 0,
    "MarkDown4": 0,
    "MarkDown5": 0,
    "CPI": feature_averages["avg(CPI)"],
    "Unemployment": feature_averages["avg(Unemployment)"],
}
# An average is None when its column has no non-null values. fillna() only accepts
# int/float/bool/str replacements, so such entries are skipped rather than failing the cell.
features_fill_values = {
    name: value for name, value in features_fill_values.items() if value is not None
}
features_df = features_df.fillna(features_fill_values)

# -- Stores: a missing `Size` is replaced by the median of the known sizes --
# NOTE(review): the "median" aggregate may not exist on older Spark runtimes - confirm cluster version.
size_median = stores_df.select("Size").dropna().agg({"Size": "median"}).collect()[0][0]
if size_median is not None:
    stores_df = stores_df.fillna({"Size": size_median})

# -- Train: a missing Weekly_Sales is treated as zero sales --
train_df = train_df.fillna({"Weekly_Sales": 0})

In [0]:
from pyspark.sql.functions import col, when, mean, stddev
# -- Train Dataset Cleaning --
# Remove negative sales (treated as invalid data)
train_df = train_df.filter(col("Weekly_Sales") >= 0)

# Handle outliers in Weekly_Sales using a Z-score computed over the remaining rows
sales_stats = train_df.select(mean("Weekly_Sales").alias("mean"), stddev("Weekly_Sales").alias("std")).collect()[0]
mean_sales, std_sales = sales_stats["mean"], sales_stats["std"]

# Only filter when the statistics are usable. With a missing or zero standard deviation
# the z-score is null for every row, and filtering on it would silently drop the whole
# training set instead of just the outliers.
if mean_sales is not None and std_sales:
    z_score = (col("Weekly_Sales") - mean_sales) / std_sales
    # Keep rows within three standard deviations of the mean
    train_df = train_df.filter(z_score.between(-3, 3))

# Convert Date column to a standard format
train_df = train_df.withColumn("Date", col("Date").cast("date"))

# -- Test Dataset Cleaning --
# Convert Date column to a standard format
test_df = test_df.withColumn("Date", col("Date").cast("date"))

In [0]:
from pyspark.sql.functions import col, when, upper, trim

# -- Stores Dataset Cleaning --

# Normalise the store type label: upper-case it, then strip surrounding whitespace
normalized_type = trim(upper(col("Type")))
stores_df = stores_df.withColumn("Type", normalized_type)

# Size statistics. mean/stddev are the functions imported by the train-cleaning cell.
# NOTE(review): size_stats is not referenced again in the visible cells.
size_stats = stores_df.select(mean("Size").alias("mean"), stddev("Size").alias("std")).collect()[0]
# Median of Size, requested with a relative error of 0
median_size = stores_df.approxQuantile("Size", [0.5], 0)[0]

# Negative sizes are replaced by the median; every other value passes through unchanged
# NOTE(review): median_size is a Python float, so Size may be widened to double - confirm.
size_is_negative = col("Size") < 0
stores_df = stores_df.withColumn(
    "Size",
    when(size_is_negative, median_size).otherwise(col("Size")),
)

# -- Features Dataset Cleaning --

# Convert Date column to a standard format
features_df = features_df.withColumn("Date", col("Date").cast("date"))

# Handle outliers in numerical columns (MarkDown1-5, CPI, Unemployment)
numerical_columns = ["MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5", "CPI", "Unemployment"]

# Collect mean and standard deviation of every column in ONE Spark job. Each column's
# statistics only depend on that column, so this matches computing them one by one.
stat_exprs = []
for col_name in numerical_columns:
    stat_exprs.append(mean(col_name).alias(f"{col_name}_mean"))
    stat_exprs.append(stddev(col_name).alias(f"{col_name}_std"))
stats = features_df.select(*stat_exprs).collect()[0]

for col_name in numerical_columns:
    mean_value, std_value = stats[f"{col_name}_mean"], stats[f"{col_name}_std"]

    # Without a mean or standard deviation no 3-sigma band can be built (and
    # `3 * None` would raise TypeError), so the column is left as it is.
    if mean_value is None or std_value is None:
        continue

    lower_bound = mean_value - 3 * std_value
    upper_bound = mean_value + 3 * std_value

    # Values inside the band are kept; everything else is replaced by the column mean.
    # A null value makes the condition null, so it also falls through to otherwise().
    features_df = features_df.withColumn(
        col_name,
        when(col(col_name).between(lower_bound, upper_bound), col(col_name)).otherwise(mean_value)
    )

# Fill any value still missing in the numerical columns with 0
features_df = features_df.fillna({col_name: 0 for col_name in numerical_columns})


**Writing all DataFrames to the Silver container**

In [0]:
# Inspect the root of the silver mount; the listing is shown as the cell output
silver_root_entries = dbutils.fs.ls("/mnt/walmartsilver/")
silver_root_entries

In [0]:
base_path = "/mnt/walmartsilver/"


def write_clean_delta(df, folder_name):
    """Write ``df`` as a Delta dataset to ``base_path + folder_name``, clearing the folder first.

    Every entry currently listed under the target folder is removed recursively, then
    the DataFrame is saved in Delta format with mode "overwrite". If the folder does
    not exist yet (first run) there is nothing to clear and the write proceeds.

    Args:
        df: DataFrame to persist.
        folder_name: Name of the folder under ``base_path`` to write into.

    Raises:
        Exception: any listing error other than a missing path is re-raised unchanged.
    """
    path = f"{base_path}{folder_name}"

    try:
        existing_entries = dbutils.fs.ls(path)
    except Exception as exc:
        # dbutils reports a missing path as a wrapped java.io.FileNotFoundException;
        # treat that as "nothing to clean" and let every other failure propagate.
        if "java.io.FileNotFoundException" not in str(exc):
            raise
        existing_entries = []

    # Remove the contents of the directory to ensure a clean overwrite.
    # NOTE(review): this removes everything under the folder, including any earlier
    # Delta log, so previous table versions are discarded - confirm that is intended.
    for entry in existing_entries:
        dbutils.fs.rm(entry.path, True)

    # Write the DataFrame in Delta format
    df.write.format("delta").mode("overwrite").save(path)

# Persist every cleaned DataFrame to its own folder, in the same order as before
for frame, silver_folder in (
    (features_df, "Features Silver"),
    (train_df, "Train Silver"),
    (test_df, "Test Silver"),
    (stores_df, "Stores Silver"),
):
    write_clean_delta(frame, silver_folder)