#Extracting the Data from the S3

In [0]:
dbutils.fs.unmount("/mnt/s3_mount")
aws_access_key = dbutils.widgets.text("AWS Access Key", "")
aws_secret_key = dbutils.widgets.text("AWS Secret Key", "")

# Getting the values through widgets
aws_access_key_value = dbutils.widgets.get("AWS Access Key")
aws_secret_key_value = dbutils.widgets.get("AWS Secret Key")


/mnt/s3_mount has been unmounted.


In [0]:
s3_bucket_name = "retail90"
mount_name = "s3_mount"
# Use the  access and secret key generated from the AWS account
dbutils.fs.mount(
    source=f"s3a://{s3_bucket_name}",
    mount_point=f"/mnt/{mount_name}",
    extra_configs={
        "fs.s3a.access.key": aws_access_key_value,
        "fs.s3a.secret.key": aws_secret_key_value
    }
)

Out[106]: True

In [0]:
display(dbutils.fs.ls("/mnt/%s" % mount_name))

path,name,size,modificationTime
dbfs:/mnt/s3_mount/Processed/,Processed/,0,0
dbfs:/mnt/s3_mount/Staging/,Staging/,0,0


# Transform the dataset 

In [0]:
spark

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Online Store Database Schema by manually defining the schema
online_store_schema= StructType([
    StructField("cust_id", IntegerType(), True),  
    StructField("date", DateType(), True),        
    StructField("age", IntegerType(), True),      
    StructField("gender", StringType(), True),    
    StructField("item", StringType(), True),      
    StructField("quantity", IntegerType(), True), 
    StructField("amount", FloatType(), True),     
    StructField("discount", StringType(), True),  
    StructField("rating", IntegerType(), True),   
    StructField("transaction_id", StringType(), True), 
    StructField("location", StringType(), True)   
])
# Reading the schema of csv format
online_store= spark.read.schema(online_store_schema).format("csv").option("header","true").load("dbfs:/mnt/s3_mount/Staging/online_store_schema.csv")

     

In [0]:
# Physical Store POS System Schema
pos_system_schema = StructType([
    StructField("cust_id", IntegerType(), True),  
    StructField("date", DateType(), True),        
    StructField("age", IntegerType(), True),      
    StructField("gender", StringType(), True),    
    StructField("item", StringType(), True),     
    StructField("quantity", IntegerType(), True), 
    StructField("amount", FloatType(), True),     
    StructField("discount", StringType(), True),  # Discount expecting string in percentage format
    StructField("rating", IntegerType(), True),   
    StructField("transaction_id", StringType(), True), 
    StructField("location", StringType(), True)   
])
pos_system= spark.read.schema(pos_system_schema).format("csv").option("header","true").load("dbfs:/mnt/s3_mount/Staging/pos_system_schema.csv")

In [0]:
# Loyalty Program Database Schema
loyalty_program_schema = StructType([
    StructField("cust_id", IntegerType(), True),  
    StructField("date", DateType(), True),        
    StructField("age", IntegerType(), True),      
    StructField("gender", StringType(), True),    
    StructField("item", StringType(), True),      
    StructField("quantity", IntegerType(), True),
    StructField("amount", FloatType(), True),     
    StructField("discount", StringType(), True),  
    StructField("rating", IntegerType(), True),   
    StructField("transaction_id", StringType(), True), 
    StructField("location", StringType(), True)   
])
loyalty_program= spark.read.schema(loyalty_program_schema).format("csv").option("header","true").load("dbfs:/mnt/s3_mount/Staging/loyalty_program_schema.csv")


In [0]:
loyalty_program.head(5)

Out[112]: [Row(cust_id=332, date=datetime.date(2023, 6, 26), age=39, gender='Female', item='Accessories', quantity=2, amount=3854.830078125, discount='90%', rating=2, transaction_id='be169d6b-f290-4831-852d-143c23a61526', location='California'),
 Row(cust_id=158, date=datetime.date(2023, 10, 13), age=120, gender='Female', item='Clothing', quantity=2, amount=10000.0, discount='-10%', rating=3, transaction_id='1a0408e7-cbab-45d7-a28c-77158f8ac4cd', location='N/A'),
 Row(cust_id=364, date=datetime.date(2023, 11, 5), age=120, gender='Female', item='Grocery', quantity=10, amount=10000.0, discount='-10%', rating=1, transaction_id='9b86d80d-10b8-4060-b580-d3a313ab42c4', location='Unknown'),
 Row(cust_id=340, date=datetime.date(2023, 5, 15), age=20, gender='Female', item='Accessories', quantity=1, amount=318.92999267578125, discount='54%', rating=5, transaction_id='33a70c3d-1da5-4563-a939-e190c6f1c3ad', location='Florida'),
 Row(cust_id=92, date=datetime.date(2023, 5, 6), age=37, gender='Male'

In [0]:


from pyspark.sql.functions import col, when, regexp_replace, count, isnan
from pyspark.sql import functions as F

# Transform DataFrame Function
def transformation(df):
    MANDATORY_COLS = ['cust_id', 'date', 'age', 'gender', 'item', 'quantity', 'amount', 'discount', 'rating', 'transaction_id', 'location']

    # Checking  if all the  mandatory columns exist
    print("Initial columns:", df.columns)
    existing_cols = [col for col in MANDATORY_COLS if col in df.columns]
    print("Columns being selected:", existing_cols)
    df = df.select(existing_cols)
    
    # Removing invalid locations
    invalid_locations = ['Unknown', 'N/A', 'Invalid Location']
    df = df.filter(~F.col('location').isin(invalid_locations))
    print("Rows after removing invalid locations:", df.count())

    # Calculate mode item (handle NoneType issues)
    mode_item_row = df.groupBy('item').count().orderBy(F.desc('count')).first()
    mode_item = mode_item_row['item'] if mode_item_row else None
    print(f"Mode item calculated: {mode_item}")

    #  Fill missing values
    default_values = {'quantity': 1, 'amount': 0, 'discount': 0, 'rating': 3}
    df = df.fillna(default_values)
    print("Rows after filling missing values:", df.count())

    #  Handle missing or invalid ages
    mean_age = df.select(F.mean('age')).first()[0] or 30  # Default to 30 if mean_age is None
    mean_age=round(mean_age)
    print(f"Mean age calculated: {mean_age}")

    df = df.withColumn('age', F.when(F.col('age').isNull(), mean_age).otherwise(F.col('age')))
    df = df.withColumn('age', F.when(F.col('age').between(18, 100), F.col('age')).otherwise(mean_age))

    # Replace missing 'item' with mode item if mode exists
    if mode_item:
        df = df.withColumn('item', F.when(F.col('item').isNull(), mode_item).otherwise(F.col('item')))
    
    # Normalize 'amount' and 'discount' fields by removing % symbol also adjusting values from 0 to 100
    df = df.withColumn('amount', when(col('amount') < 0, 0).otherwise(col('amount')))
    # In regex replacing the % symbol with blank and converting to float
    df = df.withColumn('discount', regexp_replace(col('discount'), r'%', '').cast('float'))
    df = df.withColumn('discount', when(col('discount') < 0, 0).when(col('discount') > 100, 100).otherwise(col('discount')))
    
    # Format 'date' column to correct date format
    df = df.withColumn('date', F.to_date(col('date'), 'yyyy-MM-dd'))

    print("Final row count:", df.count())
    return df

# Applying transformations on all 3 dataframe
transformed_online_store_df = transformation(online_store)
transformed_pos_system_df = transformation(pos_system)
transformed_loyalty_program_df = transformation(loyalty_program)



Initial columns: ['cust_id', 'date', 'age', 'gender', 'item', 'quantity', 'amount', 'discount', 'rating', 'transaction_id', 'location']
Columns being selected: ['cust_id', 'date', 'age', 'gender', 'item', 'quantity', 'amount', 'discount', 'rating', 'transaction_id', 'location']
Rows after removing invalid locations: 573
Mode item calculated: Clothing
Rows after filling missing values: 573
Mean age calculated: 46
Final row count: 573
Initial columns: ['cust_id', 'date', 'age', 'gender', 'item', 'quantity', 'amount', 'discount', 'rating', 'transaction_id', 'location']
Columns being selected: ['cust_id', 'date', 'age', 'gender', 'item', 'quantity', 'amount', 'discount', 'rating', 'transaction_id', 'location']
Rows after removing invalid locations: 748
Mode item calculated: Electronics
Rows after filling missing values: 748
Mean age calculated: 44
Final row count: 748
Initial columns: ['cust_id', 'date', 'age', 'gender', 'item', 'quantity', 'amount', 'discount', 'rating', 'transaction_id',

In [0]:
from pyspark.sql.functions import lit

# Adding a new column 'Source' with the respective value of the source obtained from 
transformed_online_store_df = transformed_online_store_df.withColumn("Source", lit("Online Store"))

transformed_pos_system_df = transformed_pos_system_df.withColumn("Source", lit("Physical store"))

transformed_loyalty_program_df = transformed_loyalty_program_df.withColumn("Source", lit("Loyalty Program"))


# Loading the data again to S3

In [0]:
combined_df = transformed_online_store_df.union(transformed_pos_system_df).union(transformed_loyalty_program_df)

combined_df_coalesced = combined_df.coalesce(1)

# Defining the S3 path where the combined parquet file will be stored
output_path = "dbfs:/mnt/s3_mount/Processed/"

# Write the combined DataFrame as a single Parquet file to S3
combined_df_coalesced.write.mode("overwrite").parquet(output_path)

print("Combined Parquet file successfully saved to S3.")


Combined Parquet file successfully saved to S3.
