In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
#MOUNTING CONTAINER
# configs = {"fs.azure.account.auth.type": "OAuth",
# "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
# "fs.azure.account.oauth2.client.id": "0f150395-7115-4690-8321-5ec532208f67",
# "fs.azure.account.oauth2.client.secret": 'ZLP8Q~4QQDQXhCgniLSK9mzxocuTcwxZ0K3JXc~k',
# "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/c3332291-99de-4e59-959a-e6d431b68bd7/oauth2/token"}

# dbutils.fs.mount(
# source = "abfss://dataset@storageaccount68x.dfs.core.windows.net/",
# mount_point = "/mnt/inputData",
# extra_configs = configs)

In [0]:
%fs
ls "/mnt/inputData/"

path,name,size,modificationTime
dbfs:/mnt/inputData/cleaned-data/,cleaned-data/,0,1731045331000
dbfs:/mnt/inputData/input/,input/,0,1730977215000
dbfs:/mnt/inputData/processed-data/,processed-data/,0,1731045341000


In [0]:
schema = StructType([
    StructField("Transaction_ID", StringType(), True),
    StructField("Customer_ID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Phone", StringType(), True),
    StructField("Address", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Zipcode", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Income", StringType(), True),
    StructField("Customer_Segment", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Month", StringType(), True),
    StructField("Time", StringType(), True),
    StructField("Total_Purchases", DoubleType(), True),
    StructField("Amount", DoubleType(), True),
    StructField("Total_Amount", DoubleType(), True),
    StructField("Product_Category", StringType(), True),
    StructField("Product_Brand", StringType(), True),
    StructField("Product_Type", StringType(), True),
    StructField("Feedback", StringType(), True),
    StructField("Shipping_Method", StringType(), True),
    StructField("Payment_Method", StringType(), True),
    StructField("Order_Status", StringType(), True),
    StructField("Ratings", IntegerType(), True),
    StructField("products", StringType(), True)
])

In [0]:
rawData_df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("/mnt/inputData/input/retail_data.csv")

In [0]:
rawData_df.printSchema()

root
 |-- Transaction_ID: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Total_Purchases: double (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Total_Amount: double (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Brand: string (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- Feedback: string (nullable = 

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def raw_to_clean(df):
    # First handle the date-related transformations
    date_handled_df = df.withColumn(
        "Date",
        F.when(
            (F.col("Date").isNull()) & 
            (F.col("Month").isNotNull()) & 
            (F.col("Year").isNotNull()),
            F.concat(
                F.lit("01/"),  # Default to 1st of the month
                F.col("Month"), 
                F.lit("/"), 
                F.col("Year")
            )
        ).otherwise(F.col("Date"))
    )
    
    # Create a window spec for city-country mapping
    window_spec = Window.partitionBy("City") # City -> Country
    
    # Handle city-country mapping and all other transformations
    cleanData_df = date_handled_df \
        .withColumn("Date", F.to_date(F.col("Date"), "M/d/yyyy")) \
        .withColumn("Quarter", 
            F.when(F.lower(F.col("Month")).isin("january", "february", "march"), 1)
            .when(F.lower(F.col("Month")).isin("april", "may", "june"), 2)
            .when(F.lower(F.col("Month")).isin("july", "august", "september"), 3)
            .when(F.lower(F.col("Month")).isin("october", "november", "december"), 4)
        ) \
        .withColumn("Month", F.upper(F.col("Month"))) \
        .withColumn("City", F.initcap(F.col("City"))) \
        .withColumn("Total_Amount", F.round(F.col("Total_Amount"), 2)) \
        .withColumn(
            "Country",
            F.when(
                (F.col("City").isNotNull()) & (F.col("Country").isNull()),
                F.first(F.col("Country"), ignorenulls=True).over(window_spec)
            ).otherwise(F.col("Country"))
        ) \
        .dropDuplicates(["Transaction_ID"]) \
        .na.fill({
            "Total_Amount": 0,
            "Total_Purchases": 0,
            "Ratings": 0
        })
    
    # Apply all the filtering conditions
    final_df = cleanData_df \
        .filter(F.col("Total_Amount") >= 0) \
        .filter(F.col("Age") > 0) \
        .filter(F.col("Transaction_ID").isNotNull()) \
        .filter(F.col("City").isNotNull()) \
        .filter(F.col("Product_Category").isNotNull()) \
        .filter(F.col("Year").isNotNull()) \
        .filter(F.col("Quarter").isNotNull())
    
    # Log the transformations
    total_rows = df.count()
    valid_rows = final_df.count()
    
    # Detailed logging of dropped rows
    print("Data Quality Report:")
    print(f"Total initial rows: {total_rows}")
    print(f"Final rows: {valid_rows}")
    print(f"Total rows dropped: {total_rows - valid_rows}")
    
    return final_df

In [0]:
cleanData_df = raw_to_clean(rawData_df)

Data Quality Report:
Total initial rows: 302010
Final rows: 293165
Total rows dropped: 8845


In [0]:
display(cleanData_df)

Transaction_ID,Customer_ID,Name,Email,Phone,Address,City,State,Zipcode,Country,Age,Gender,Income,Customer_Segment,Date,Year,Month,Time,Total_Purchases,Amount,Total_Amount,Product_Category,Product_Brand,Product_Type,Feedback,Shipping_Method,Payment_Method,Order_Status,Ratings,products,Quarter
7270055,99974.0,Frances Church,Richard25@gmail.com,6650214356.0,6758 Powers Crossroad,Adelaide,New South Wales,11814.0,Australia,20,Male,Medium,New,,2023,MAY,6:24:44,8.0,235.3204747,1882.56,Electronics,Samsung,Television,Average,Same-Day,Cash,Delivered,2,OLED TV,2
2814443,93614.0,Charles Pratt,Bianca36@gmail.com,8918576649.0,20561 Brady Lodge,Adelaide,New South Wales,26317.0,Australia,26,Male,Low,Regular,2023-10-29,2023,JANUARY,2:54:15,1.0,113.3718382,113.37,Electronics,Whirepool,Fridge,Good,Same-Day,Credit Card,Delivered,3,Compact refrigerator,1
6462798,80146.0,Mr. Timothy Smith,Kristin11@gmail.com,3474606858.0,8728 Justin Canyon,Adelaide,New South Wales,95176.0,Australia,19,Male,High,Regular,,2023,JULY,3:58:49,2.0,178.3946963,356.79,Home Decor,IKEA,Lighting,Good,Same-Day,Cash,Pending,3,Desk lamps,3
7673539,79405.0,Justin Weiss,Sarah51@gmail.com,1234142332.0,12618 Reyes Village Suite 368,Adelaide,New South Wales,47123.0,Australia,57,Female,Medium,Premium,2023-11-17,2023,NOVEMBER,5:23:12,10.0,88.54288601,885.43,Books,HarperCollins,Fiction,Good,Same-Day,PayPal,Pending,4,Thriller,4
4966667,55212.0,Daniel Hunt,Briana67@gmail.com,3199875625.0,4850 Leah Valley,Adelaide,New South Wales,40599.0,Australia,58,Female,Medium,Regular,2023-09-19,2023,SEPTEMBER,3:41:24,6.0,132.6711697,796.03,Electronics,Apple,Tablet,Average,Express,Cash,Processing,2,Sony Xperia Tablet,3
5951982,95229.0,Dr. Erin Tran,Anna20@gmail.com,5044281967.0,2264 Fisher Spur,Adelaide,New South Wales,87745.0,Australia,56,Male,Low,Regular,2023-08-20,2023,AUGUST,20:55:25,5.0,190.2253459,951.13,Home Decor,Home Depot,Tools,Excellent,Express,PayPal,Shipped,4,Utility knife,3
1349958,60595.0,Teresa Logan,Erika85@gmail.com,7967515103.0,50805 Bond Mission,Adelaide,New South Wales,4536.0,Australia,21,Male,Medium,Regular,2023-12-17,2023,MAY,15:35:05,10.0,201.5590074,2015.59,Home Decor,Home Depot,Furniture,Excellent,Same-Day,Cash,Processing,5,Wardrobe,2
3963331,25351.0,Michelle Williams,John21@gmail.com,9976351815.0,90518 Reilly Fort Suite 629,Adelaide,New South Wales,90671.0,Australia,24,Male,Low,Premium,2023-11-18,2023,NOVEMBER,10:30:54,8.0,99.67863543,797.43,Home Decor,Bed Bath & Beyond,Kitchen,Good,Express,Cash,Shipped,3,Dishwasher,4
7441750,36491.0,Edward Moran,Michael53@gmail.com,8727888478.0,2271 Brown Street,Adelaide,New South Wales,10604.0,Australia,33,Female,Medium,New,,2023,AUGUST,20:40:43,8.0,107.0706797,856.57,Electronics,Samsung,Tablet,Good,Same-Day,Credit Card,Processing,3,Asus ZenPad,3
6400936,66973.0,Amanda Griffith,Kathleen8@gmail.com,6671978823.0,64602 Elizabeth Lodge,Adelaide,New South Wales,30113.0,Australia,53,Female,Medium,Regular,2023-12-26,2023,APRIL,15:05:13,10.0,429.2460803,4292.46,Clothing,Adidas,Shoes,Good,Express,Debit Card,Pending,4,Oxfords,2


In [0]:
cleanData_df.write.format("delta") \
    .mode("overwrite") \
    .save("/mnt/inputData/cleaned-data/")