In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Create the Spark session for this pipeline, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Ecom-DataPipeline")
    .getOrCreate()
)

### Accessing the data from the Bronze layer

In [0]:
# Load the users table from the bronze layer (stored in Delta format).
userDf = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/users")
)

### Transformations on Users Data

In [0]:
# Users transformations.
#
# NOTE: several steps below overwrite their own input column (language, gender,
# hasAndroidApp, hasIosApp, hasProfilePicture). Re-running this cell on an
# already-transformed userDf therefore changes the result; reload userDf from
# the bronze layer before re-running.

# Approximate number of days in a month, used to convert days to months.
DAYS_PER_MONTH = 30
# A user counts as active when the last login is at most this many days ago.
ACTIVE_USER_MAX_DAYS = 30

# changing the countryCode to uppercase
userDf = userDf.withColumn("countryCode", upper(col("countryCode")))

# changing the language to English, French or Non-English
# (each fragment ends with a space so the concatenated SQL keeps its tokens apart)
userDf = userDf.withColumn("language", expr(
    "CASE WHEN language = 'en' THEN 'English' "
    "WHEN language = 'fr' THEN 'French' "
    "ELSE 'Non-English' END"))

# changing the gender from M, F to Male, Female or Other
userDf = userDf.withColumn("gender", expr(
    "CASE WHEN gender = 'F' THEN 'Female' "
    "WHEN gender = 'M' THEN 'Male' "
    "ELSE 'Other' END"))

# changing the civilityTitle to uppercase
userDf = userDf.withColumn("civilityTitle", upper(col("civilityTitle")))

# Explicit numeric view of daysSinceLastLogin, so the arithmetic and the
# comparison below do not rely on implicit string coercion.
days_since_login = col("daysSinceLastLogin").cast("double")

# deriving monthsSinceLastLogin from daysSinceLastLogin
# (days / 30; the previous divisor of 12 is the months-per-year factor and
# does not convert days to months)
userDf = userDf.withColumn("monthsSinceLastLogin", days_since_login / DAYS_PER_MONTH)

# Add flags (0/1) to indicate specific conditions for further analysis:
# Creating a flag for whether the user has any app installed.
# The comparison is case-insensitive so "TRUE", "True" and "true" all count.
userDf = userDf.withColumn(
    "hasAnyAppFlag", when(upper(col("hasAnyApp")) == "TRUE", 1).otherwise(0))

# Creating a flag for active users (last login within ACTIVE_USER_MAX_DAYS days)
userDf = userDf.withColumn(
    "isActiveUser", when(days_since_login <= ACTIVE_USER_MAX_DAYS, 1).otherwise(0))

# Create bins for user seniority to categorize users into levels.
# Half-open ranges are used because seniorityAsYears can be fractional: with
# integer BETWEEN bounds a value such as 3.5 matches neither "1 AND 3" nor
# "4 AND 6". The explicit cast keeps the result independent of how Spark
# coerces the string column. Rows with a missing seniority fall to 'Veteran'.
userDf = userDf.withColumn("seniorityCategory", expr(
    "CASE WHEN CAST(seniorityAsYears AS DOUBLE) < 1 THEN 'New User' "
    "WHEN CAST(seniorityAsYears AS DOUBLE) < 4 THEN 'Intermediate' "
    "WHEN CAST(seniorityAsYears AS DOUBLE) < 7 THEN 'Experienced' "
    "ELSE 'Veteran' END"))

# Create a new column to combine social interactions into a single score:
userDf = userDf.withColumn(
    "socialEngagementScore",
    col("socialNbFollowers") + col("socialNbFollows") + col("socialProductsLiked"))

# Convert hasAndroidApp, hasIosApp and hasProfilePicture to 0/1 for better analysis
# (case-insensitive, same rule as hasAnyAppFlag above)
for flag_column in ("hasAndroidApp", "hasIosApp", "hasProfilePicture"):
    userDf = userDf.withColumn(
        flag_column, when(upper(col(flag_column)) == "TRUE", 1).otherwise(0))


In [0]:
# Print the current schema of userDf to check column names and types.
userDf.printSchema()

root
 |-- identifierHash: string (nullable = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = false)
 |-- socialNbFollowers: string (nullable = true)
 |-- socialNbFollows: string (nullable = true)
 |-- socialProductsLiked: string (nullable = true)
 |-- productsListed: string (nullable = true)
 |-- productsSold: string (nullable = true)
 |-- productsPassRate: string (nullable = true)
 |-- productsWished: string (nullable = true)
 |-- productsBought: string (nullable = true)
 |-- gender: string (nullable = false)
 |-- civilityGenderId: string (nullable = true)
 |-- civilityTitle: string (nullable = true)
 |-- hasAnyApp: string (nullable = true)
 |-- hasAndroidApp: integer (nullable = false)
 |-- hasIosApp: integer (nullable = false)
 |-- hasProfilePicture: integer (nullable = false)
 |-- daysSinceLastLogin: string (nullable = true)
 |-- seniority: string (nullable = true)
 |-- seniorityAsMonths: string (nullable = true)
 |

In [0]:
# Changing data types for better analytics.

# Target type for each column, applied in the order listed.
user_column_types = {
    "identifierHash": "long",
    "socialNbFollowers": "int",
    "socialNbFollows": "int",
    "socialProductsLiked": "int",
    "productsListed": "int",
    "productsSold": "int",
    "productsPassRate": "double",
    "productsWished": "int",
    "productsBought": "int",
    "daysSinceLastLogin": "int",
    "seniority": "int",
    "seniorityAsMonths": "double",
    "seniorityAsYears": "double",
    "hasAnyApp": "boolean",
    "civilityGenderId": "int",
}

# Replace each column in place with its cast version.
for column_name, target_type in user_column_types.items():
    userDf = userDf.withColumn(column_name, col(column_name).cast(target_type))

# Verify updated schema
userDf.printSchema()


root
 |-- identifierHash: long (nullable = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = false)
 |-- socialNbFollowers: integer (nullable = true)
 |-- socialNbFollows: integer (nullable = true)
 |-- socialProductsLiked: integer (nullable = true)
 |-- productsListed: integer (nullable = true)
 |-- productsSold: integer (nullable = true)
 |-- productsPassRate: double (nullable = true)
 |-- productsWished: integer (nullable = true)
 |-- productsBought: integer (nullable = true)
 |-- gender: string (nullable = false)
 |-- civilityGenderId: integer (nullable = true)
 |-- civilityTitle: string (nullable = true)
 |-- hasAnyApp: boolean (nullable = true)
 |-- hasAndroidApp: integer (nullable = false)
 |-- hasIosApp: integer (nullable = false)
 |-- hasProfilePicture: integer (nullable = false)
 |-- daysSinceLastLogin: integer (nullable = true)
 |-- seniority: integer (nullable = true)
 |-- seniorityAsMonths: double (nullable =

### Transformations on Countries data

In [0]:
# Load the countries table from the bronze layer (stored in Delta format).
countriesDf = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/countries")
)

In [0]:
# Print the schema of countriesDf as loaded, before any type casts.
countriesDf.printSchema()

root
 |-- country: string (nullable = true)
 |-- sellers: string (nullable = true)
 |-- topsellers: string (nullable = true)
 |-- topsellerratio: string (nullable = true)
 |-- femalesellersratio: string (nullable = true)
 |-- topfemalesellersratio: string (nullable = true)
 |-- femalesellers: string (nullable = true)
 |-- malesellers: string (nullable = true)
 |-- topfemalesellers: string (nullable = true)
 |-- topmalesellers: string (nullable = true)
 |-- countrysoldratio: string (nullable = true)
 |-- bestsoldratio: string (nullable = true)
 |-- toptotalproductssold: string (nullable = true)
 |-- totalproductssold: string (nullable = true)
 |-- toptotalproductslisted: string (nullable = true)
 |-- totalproductslisted: string (nullable = true)
 |-- topmeanproductssold: string (nullable = true)
 |-- topmeanproductslisted: string (nullable = true)
 |-- meanproductssold: string (nullable = true)
 |-- meanproductslisted: string (nullable = true)
 |-- meanofflinedays: string (nullable = tr

In [0]:
# Changing data types of the countries table.

# Columns to cast to integer
int_columns = [
    "sellers",
    "topsellers",
    "femalesellers",
    "malesellers",
    "topfemalesellers",
    "topmalesellers",
]

# Columns to cast to double
double_columns = [
    "topsellerratio",
    "femalesellersratio",
    "topfemalesellersratio",
    "countrysoldratio",
    "bestsoldratio",
    "toptotalproductssold",
    "totalproductssold",
    "toptotalproductslisted",
    "totalproductslisted",
    "topmeanproductssold",
    "topmeanproductslisted",
    "meanproductssold",
    "meanproductslisted",
    "meanofflinedays",
    "topmeanofflinedays",
    "meanfollowers",
    "meanfollowing",
    "topmeanfollowers",
    "topmeanfollowing",
]

# Apply the integer casts first, then the double casts, one column at a time.
cast_plan = [(name, "int") for name in int_columns]
cast_plan += [(name, "double") for name in double_columns]

for col_name, target_type in cast_plan:
    countriesDf = countriesDf.withColumn(col_name, col(col_name).cast(target_type))

# Verify updated schema
countriesDf.printSchema()


root
 |-- country: string (nullable = true)
 |-- sellers: integer (nullable = true)
 |-- topsellers: integer (nullable = true)
 |-- topsellerratio: double (nullable = true)
 |-- femalesellersratio: double (nullable = true)
 |-- topfemalesellersratio: double (nullable = true)
 |-- femalesellers: integer (nullable = true)
 |-- malesellers: integer (nullable = true)
 |-- topfemalesellers: integer (nullable = true)
 |-- topmalesellers: integer (nullable = true)
 |-- countrysoldratio: double (nullable = true)
 |-- bestsoldratio: double (nullable = true)
 |-- toptotalproductssold: double (nullable = true)
 |-- totalproductssold: double (nullable = true)
 |-- toptotalproductslisted: double (nullable = true)
 |-- totalproductslisted: double (nullable = true)
 |-- topmeanproductssold: double (nullable = true)
 |-- topmeanproductslisted: double (nullable = true)
 |-- meanproductssold: double (nullable = true)
 |-- meanproductslisted: double (nullable = true)
 |-- meanofflinedays: double (nullabl

In [0]:
from pyspark.sql.functions import col, when, avg

# Mean of totalproductssold over all rows; used as the cut-off for the
# "above average" flag below.
mean_products_sold = (
    countriesDf
    .agg(avg("totalproductssold").alias("meanTotalProductsSold"))
    .first()["meanTotalProductsSold"]
)

# Column expressions reused by the derived metrics.
seller_count = col("sellers")
products_listed = col("totalproductslisted")
products_sold = col("totalproductssold")

countriesDf = (
    countriesDf
    # Share of female sellers; 0 when the seller count is not positive.
    .withColumn(
        "femaleSellerRatio",
        when(seller_count > 0, col("femalesellers") / seller_count).otherwise(0),
    )
    # Share of male sellers; 0 when the seller count is not positive.
    .withColumn(
        "maleSellerRatio",
        when(seller_count > 0, col("malesellers") / seller_count).otherwise(0),
    )
    # Products sold per product listed; 0 when nothing is listed.
    .withColumn(
        "productSoldListRatio",
        when(products_listed > 0, products_sold / products_listed).otherwise(0),
    )
    # 1 when the row's totalproductssold exceeds the mean computed above, else 0.
    .withColumn(
        "isAboveAverageSeller",
        when(products_sold > mean_products_sold, 1).otherwise(0),
    )
)

# Show the updated DataFrame
countriesDf.show()


+-----------+-------+----------+--------------+------------------+---------------------+-------------+-----------+----------------+--------------+----------------+-------------+--------------------+-----------------+----------------------+-------------------+-------------------+---------------------+------------------+------------------+---------------+------------------+-------------+-------------+----------------+----------------+------------------+-------------------+--------------------+--------------------+
|    country|sellers|topsellers|topsellerratio|femalesellersratio|topfemalesellersratio|femalesellers|malesellers|topfemalesellers|topmalesellers|countrysoldratio|bestsoldratio|toptotalproductssold|totalproductssold|toptotalproductslisted|totalproductslisted|topmeanproductssold|topmeanproductslisted|  meanproductssold|meanproductslisted|meanofflinedays|topmeanofflinedays|meanfollowers|meanfollowing|topmeanfollowers|topmeanfollowing| femaleSellerRatio|    maleSellerRatio|product

### Transformations on Buyers data

In [0]:
# Load the buyers table from the bronze layer (stored in Delta format).
buyersDf = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/buyers")
)

In [0]:
# Print the schema of buyersDf as loaded, before any type casts.
buyersDf.printSchema()

root
 |-- country: string (nullable = true)
 |-- buyers: string (nullable = true)
 |-- topbuyers: string (nullable = true)
 |-- topbuyerratio: string (nullable = true)
 |-- femalebuyers: string (nullable = true)
 |-- malebuyers: string (nullable = true)
 |-- topfemalebuyers: string (nullable = true)
 |-- topmalebuyers: string (nullable = true)
 |-- femalebuyersratio: string (nullable = true)
 |-- topfemalebuyersratio: string (nullable = true)
 |-- boughtperwishlistratio: string (nullable = true)
 |-- boughtperlikeratio: string (nullable = true)
 |-- topboughtperwishlistratio: string (nullable = true)
 |-- topboughtperlikeratio: string (nullable = true)
 |-- totalproductsbought: string (nullable = true)
 |-- totalproductswished: string (nullable = true)
 |-- totalproductsliked: string (nullable = true)
 |-- toptotalproductsbought: string (nullable = true)
 |-- toptotalproductswished: string (nullable = true)
 |-- toptotalproductsliked: string (nullable = true)
 |-- meanproductsbought: s

In [0]:
# Changing data types of the buyers table.

# Columns to cast as Integer
int_columns = [
    "buyers",
    "topbuyers",
    "femalebuyers",
    "malebuyers",
    "topfemalebuyers",
    "topmalebuyers",
    "totalproductsbought",
    "totalproductswished",
    "totalproductsliked",
    "toptotalproductsbought",
    "toptotalproductswished",
    "toptotalproductsliked",
]

# Columns to cast as Double
double_columns = [
    "topbuyerratio",
    "femalebuyersratio",
    "topfemalebuyersratio",
    "boughtperwishlistratio",
    "boughtperlikeratio",
    "topboughtperwishlistratio",
    "topboughtperlikeratio",
    "meanproductsbought",
    "meanproductswished",
    "meanproductsliked",
    "topmeanproductsbought",
    "topmeanproductswished",
    "topmeanproductsliked",
    "meanofflinedays",
    "topmeanofflinedays",
    "meanfollowers",
    "meanfollowing",
    "topmeanfollowers",
    "topmeanfollowing",
]

# Integer casts run first, then double casts, each replacing the column in place.
for target_type, column_group in (("int", int_columns), ("double", double_columns)):
    for col_name in column_group:
        buyersDf = buyersDf.withColumn(col_name, col(col_name).cast(target_type))



In [0]:

# Derived buyer metrics. Every ratio falls back to 0 when its denominator is
# not greater than 0 (this includes a missing denominator).

# Column expressions reused below.
buyer_count = col("buyers")
products_bought = col("totalproductsbought")
products_wished = col("totalproductswished")

# Buyer Activity Ratio: totalproductsbought / buyers
buyersDf = buyersDf.withColumn(
    "buyerActivityRatio",
    when(buyer_count > 0, products_bought / buyer_count).otherwise(0))

# Female Buyer Ratio: femalebuyers / buyers
# Male Buyer Ratio:   malebuyers / buyers
buyersDf = buyersDf.withColumn(
    "femaleBuyerRatio",
    when(buyer_count > 0, col("femalebuyers") / buyer_count).otherwise(0)) \
    .withColumn(
    "maleBuyerRatio",
    when(buyer_count > 0, col("malebuyers") / buyer_count).otherwise(0))

# Wishlist Conversion Rate: totalproductsbought / totalproductswished
buyersDf = buyersDf.withColumn(
    "wishlistConversionRate",
    when(products_wished > 0, products_bought / products_wished).otherwise(0))

# High Engagement flag: 1 when meanproductsbought > 5 AND meanproductsliked > 20,
# otherwise 0.
buyersDf = buyersDf.withColumn(
    "isHighEngagementCountry",
    when((col("meanproductsbought") > 5) & (col("meanproductsliked") > 20), 1).otherwise(0))


### Transformations on Sellers data

In [0]:
# Load the sellers table from the bronze layer (stored in Delta format).
sellersDf = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/sellers")
)

In [0]:
# Print the schema of sellersDf as loaded, before any type casts.
sellersDf.printSchema()

root
 |-- country: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- nbsellers: string (nullable = true)
 |-- meanproductssold: string (nullable = true)
 |-- meanproductslisted: string (nullable = true)
 |-- meansellerpassrate: string (nullable = true)
 |-- totalproductssold: string (nullable = true)
 |-- totalproductslisted: string (nullable = true)
 |-- meanproductsbought: string (nullable = true)
 |-- meanproductswished: string (nullable = true)
 |-- meanproductsliked: string (nullable = true)
 |-- totalbought: string (nullable = true)
 |-- totalwished: string (nullable = true)
 |-- totalproductsliked: string (nullable = true)
 |-- meanfollowers: string (nullable = true)
 |-- meanfollows: string (nullable = true)
 |-- percentofappusers: string (nullable = true)
 |-- percentofiosusers: string (nullable = true)
 |-- meanseniority: string (nullable = true)



In [0]:
# Changing data types of the sellers table.

# Columns to cast as Integer
int_columns = [
    "nbsellers",
    "totalproductssold",
    "totalproductslisted",
    "totalbought",
    "totalwished",
    "totalproductsliked",
]

# Columns to cast as Double
double_columns = [
    "meanproductssold",
    "meanproductslisted",
    "meansellerpassrate",
    "meanproductsbought",
    "meanproductswished",
    "meanproductsliked",
    "meanfollowers",
    "meanfollows",
    "percentofappusers",
    "percentofiosusers",
    "meanseniority",
]

# Pair every column with its target type: integers first, then doubles.
typed_columns = [(name, "int") for name in int_columns]
typed_columns.extend((name, "double") for name in double_columns)

# Replace each column in place with its cast version.
for col_name, target_type in typed_columns:
    sellersDf = sellersDf.withColumn(col_name, col(col_name).cast(target_type))




In [0]:



# Flag top-performing rows: 1 when meanproductssold is above the threshold, else 0.
threshold = 3.0
is_top_performer = when(col("meanproductssold") > threshold, 1).otherwise(0)

# Categorise app usage:
#   "High Usage"     - percentofappusers > 70 and percentofiosusers > 50
#   "Moderate Usage" - percentofappusers > 50 (and not already "High Usage")
#   "Low Usage"      - everything else
app_user_share = col("percentofappusers")
ios_user_share = col("percentofiosusers")
app_usage_category = (
    when((app_user_share > 70) & (ios_user_share > 50), "High Usage")
    .when((app_user_share > 50), "Moderate Usage")
    .otherwise("Low Usage")
)

sellersDf = (
    sellersDf
    .withColumn("isTopPerformer", is_top_performer)
    .withColumn("appUsageCategory", app_usage_category)
)



### Saving the DataFrames to Delta Lake

In [0]:
# Write each transformed DataFrame to its silver-layer location in Delta
# format. "overwrite" mode replaces whatever data is already at the path.
silver_targets = [
    (userDf, "/mnt/delta/tables/silver/user"),
    (buyersDf, "/mnt/delta/tables/silver/buyers"),
    (countriesDf, "/mnt/delta/tables/silver/countries"),
    (sellersDf, "/mnt/delta/tables/silver/seller"),
]

for frame, target_path in silver_targets:
    frame.write.format("delta").mode("overwrite").save(target_path)