In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Obtain the Spark session for this pipeline: getOrCreate() returns the
# already-active session if there is one, otherwise it builds a new one.
spark = (
    SparkSession.builder
    .appName("EcommerceDataPipeline")
    .getOrCreate()
)

In [0]:
# Load the bronze-layer users table (Delta format) from
# /mnt/delta/tables/bronze/users.
userDF = spark.read.load('/mnt/delta/tables/bronze/users', format='delta')


In [0]:
# Peek at the first two raw user rows.
userDF.show(n=2)

+--------------------+----+-----------+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+-----------------+---------+-----------------+----------------+----------------+---------+-------------+---------+------------------+
|      identifierHash|type|countryCode|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasProfilePicture|seniority|seniorityAsMonths|seniorityAsYears|websiteLongevity|hasAnyApp|hasAndroidApp|hasIosApp|daysSinceLastLogin|
+--------------------+----+-----------+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+-----------------+---------+-----------------+--------

In [0]:
# Upper-case the country codes so case variants of the same code compare equal.
# (Column lookup is case-insensitive, so this reads the source 'countryCode'
# column and stores the result under the name 'countrycode'.)
userDF = userDF.withColumn("countrycode", upper("countrycode"))

In [0]:
# Expand the language code into a readable name: 'EN' -> 'English',
# 'FR' -> 'French', anything else (including NULL) -> 'Other'.
# The code is trimmed and upper-cased before comparing, the same way country
# codes are normalised above, so variants such as 'en' or ' FR' are not
# silently classified as 'Other'.
_language_code = upper(trim(col('language')))
userDF = userDF.withColumn('language_full',
                           when(_language_code == 'EN', 'English')
                           .when(_language_code == 'FR', 'French')
                           .otherwise('Other'))

In [0]:
# Standardise 'gender' to 'Male' / 'Female' / 'Other' from its first letter.
# The value is trimmed and upper-cased first, so lower-case or padded inputs
# ('m', ' female') are classified instead of falling through to 'Other'.
# NULL genders become 'Other'. Re-running this cell is safe because
# 'Male' / 'Female' map to themselves.
_gender_normalised = upper(trim(col('gender')))
userDF = userDF.withColumn('gender',
                           when(_gender_normalised.startswith('M'), 'Male')
                           .when(_gender_normalised.startswith('F'), 'Female')
                           .otherwise('Other'))

In [0]:
# Collapse the 'Mme', 'Ms' and 'Mrs' titles into a single 'Ms' value, written to
# a new 'civilitytitle_clean' column (the raw 'civilitytitle' is kept).
# The pattern is unanchored and case-sensitive, so it also rewrites these
# substrings inside longer titles.
userDF = userDF.withColumn(
    'civilitytitle_clean',
    regexp_replace(col('civilitytitle'), '(Mme|Ms|Mrs)', 'Ms'),
)

In [0]:
# Derive 'years_since_last_login' from 'daysSinceLastLogin' using 365-day
# years; the value is not rounded.
userDF = userDF.withColumn(
    "years_since_last_login",
    col("daysSinceLastLogin") / lit(365),
)

In [0]:
# Account age in years (seniority / 365, rounded to 2 decimals), bucketed as:
#   < 1        -> 'New'
#   1 to < 3   -> 'Intermediate'
#   >= 3       -> 'Experienced'
# Fixes:
#   * the bucket was previously assigned to a throwaway `user` variable, so
#     'account_age_group' never reached userDF (or the silver table);
#   * rows with a NULL age now get a NULL bucket instead of falling into
#     otherwise('Experienced').
userDF = userDF.withColumn("account_age_years", round(col("seniority") / 365, 2))

_account_age = col("account_age_years")
userDF = userDF.withColumn("account_age_group",
                           when(_account_age < 1, "New")
                           .when(_account_age < 3, "Intermediate")
                           .when(_account_age >= 3, "Experienced"))

                         

In [0]:
# Add the current calendar year as a column for later comparisons.
# Fix: assign back to userDF. The result previously went to an unused
# `usersDF` variable, so the column was silently dropped.
userDF = userDF.withColumn("current_year", year(current_date()))

In [0]:
# Build a compact descriptor:
#   <gender>_<countrycode>_<first 3 chars of civilitytitle_clean>_<language_full>
# concat() yields NULL if any component is NULL.
# NOTE(review): the column name 'user_descriptior' is misspelled; it is kept
# as-is because consumers of the silver table may already reference it.
_title_prefix = substring(col('civilitytitle_clean'), 1, 3)
userDF = userDF.withColumn(
    'user_descriptior',
    concat(
        col('gender'), lit("_"),
        col('countrycode'), lit("_"),
        _title_prefix, lit("_"),
        col("language_full"),
    ),
)

In [0]:
# Boolean flag: raw civility title longer than 10 characters (NULL title -> NULL).
userDF = userDF.withColumn(
    'flag_long_title',
    length('civilitytitle') > lit(10),
)

In [0]:

# Cast the app / profile-picture indicator columns to boolean.
# Column lookup is case-insensitive (Spark's default), so e.g. 'hasanyapp'
# reads the source 'hasAnyApp' column; the result is stored under the
# lower-case name.
for _flag_column in ('hasanyapp', 'hasandroidapp', 'hasiosapp', 'hasprofilepicture'):
    userDF = userDF.withColumn(_flag_column, col(_flag_column).cast('boolean'))

In [0]:
# Cast the social follower / follow counts to IntegerType.
for _count_column in ('socialnbfollowers', 'socialnbfollows'):
    userDF = userDF.withColumn(_count_column, col(_count_column).cast(IntegerType()))

In [0]:
# Cast the pass-rate and seniority measures to DecimalType(10, 2).
_decimal_10_2 = DecimalType(10, 2)
for _measure_column in ('productspassrate', 'seniorityasmonths', 'seniorityasyears'):
    userDF = userDF.withColumn(_measure_column, col(_measure_column).cast(_decimal_10_2))

In [0]:
# Persist the cleaned users to the silver layer, replacing any previous run.
# overwriteSchema='true' lets the table schema follow this notebook's output
# when derived columns are added or retyped. Without it, Delta's schema
# enforcement rejects an overwrite whose schema differs from the existing
# table's, so adding a cleaning step would break re-runs.
(
    userDF.write
    .format('delta')
    .mode('overwrite')
    .option('overwriteSchema', 'true')
    .save('/mnt/delta/tables/silver/users')
)

In [0]:
# Load the bronze-layer buyers table (Delta format).
buyersDF = spark.read.load('/mnt/delta/tables/bronze/buyers', format='delta')

In [0]:
# Cast the buyer count columns to IntegerType. `integer_columns` is also used
# further down to zero-fill missing counts.
integer_columns = [
    'buyers', 'topbuyers', 'femalebuyers', 'malebuyers',
    'topfemalebuyers', 'topmalebuyers',
    'totalproductsbought', 'totalproductswished', 'totalproductsliked',
    'toptotalproductsbought', 'toptotalproductswished', 'toptotalproductsliked',
]

for _buyer_int_col in integer_columns:
    buyersDF = buyersDF.withColumn(_buyer_int_col, col(_buyer_int_col).cast(IntegerType()))

In [0]:
# Cast the buyer ratio / mean columns to DecimalType(10, 2).
decimal_columns = [
    'topbuyerratio', 'femalebuyersratio', 'topfemalebuyersratio',
    'boughtperwishlistratio', 'boughtperlikeratio',
    'topboughtperwishlistratio', 'topboughtperlikeratio',
    'meanproductsbought', 'meanproductswished', 'meanproductsliked',
    'topmeanproductsbought', 'topmeanproductswished', 'topmeanproductsliked',
    'meanofflinedays', 'topmeanofflinedays',
    'meanfollowers', 'meanfollowing', 'topmeanfollowers', 'topmeanfollowing',
]

_buyer_decimal_type = DecimalType(10, 2)
for _buyer_dec_col in decimal_columns:
    buyersDF = buyersDF.withColumn(_buyer_dec_col, col(_buyer_dec_col).cast(_buyer_decimal_type))

In [0]:
# Normalise country names to title case (e.g. 'france' -> 'France').
buyersDF = buyersDF.withColumn('country', initcap(col('country')))

# Treat missing buyer counts as zero: one fillna over all integer count columns.
buyersDF = buyersDF.fillna(0, subset=integer_columns)

# Ratio of female to male buyers. The +1 in the denominator (the same smoothing
# used for the wishlist ratio below) avoids division by zero when a market has
# no male buyers.
# Fix: the +1 previously sat outside the division (femalebuyers / malebuyers + 1).
# That inflated every ratio by 1 and still divided by zero.
buyersDF = buyersDF.withColumn('female_to_male_ratio',
                               round(col('femalebuyers') / (col('malebuyers') + 1), 2))

# Market potential: products wished per product bought (+1 avoids division by zero).
buyersDF = buyersDF.withColumn("wishlist_to_purchase_ratio",
                               round(col("totalproductswished") / (col("totalproductsbought") + 1), 2))

# Tag countries whose bought-per-wishlist ratio exceeds the threshold.
# A NULL ratio makes the comparison NULL, which falls through to False.
high_engagement_treshhold = 0.5
buyersDF = buyersDF.withColumn("high_engagement",
                               when(col("boughtperwishlistratio") > high_engagement_treshhold, True)
                               .otherwise(False))

# Flag markets where femalebuyersratio exceeds topfemalebuyersratio
# (NULL comparisons -> False).
buyersDF = buyersDF.withColumn("growing_female_market",
                               when(col("femalebuyersratio") > col("topfemalebuyersratio"), True)
                               .otherwise(False))

In [0]:
# Persist the cleaned buyers to the silver layer, replacing any previous contents.
buyersDF.write.save('/mnt/delta/tables/silver/buyers', format='delta', mode='overwrite')

In [0]:
# Load the bronze-layer sellers table (Delta format).
sellersDF = spark.read.load("/mnt/delta/tables/bronze/sellers", format="delta")

In [0]:
# Cast the seller count / total columns to IntegerType (SQL 'int').
seller_Integer_columns = [
    'nbsellers', 'totalproductssold', 'totalproductslisted',
    'totalbought', 'totalwished', 'totalproductsliked',
]

for _seller_int_col in seller_Integer_columns:
    sellersDF = sellersDF.withColumn(_seller_int_col, col(_seller_int_col).cast(IntegerType()))


In [0]:
# Cast the seller mean / percentage columns to DecimalType(10, 2) (SQL 'decimal(10,2)').
seller_decimal_columns = [
    'meanproductssold', 'meanproductslisted', 'meansellerpassrate',
    'meanproductsbought', 'meanproductswished', 'meanproductsliked',
    'meanfollowers', 'meanfollows',
    'percentofappusers', 'percentofiosusers', 'meanseniority',
]

_seller_decimal_type = DecimalType(10, 2)
for _seller_dec_col in seller_decimal_columns:
    sellersDF = sellersDF.withColumn(_seller_dec_col, col(_seller_dec_col).cast(_seller_decimal_type))

In [0]:
# Check that the integer / decimal casts applied in the preceding cells took effect.
sellersDF.printSchema()

root
 |-- country: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- nbsellers: integer (nullable = true)
 |-- meanproductssold: decimal(10,2) (nullable = true)
 |-- meanproductslisted: decimal(10,2) (nullable = true)
 |-- meansellerpassrate: decimal(10,2) (nullable = true)
 |-- totalproductssold: integer (nullable = true)
 |-- totalproductslisted: integer (nullable = true)
 |-- meanproductsbought: decimal(10,2) (nullable = true)
 |-- meanproductswished: decimal(10,2) (nullable = true)
 |-- meanproductsliked: decimal(10,2) (nullable = true)
 |-- totalbought: integer (nullable = true)
 |-- totalwished: integer (nullable = true)
 |-- totalproductsliked: integer (nullable = true)
 |-- meanfollowers: decimal(10,2) (nullable = true)
 |-- meanfollows: decimal(10,2) (nullable = true)
 |-- percentofappusers: decimal(10,2) (nullable = true)
 |-- percentofiosusers: decimal(10,2) (nullable = true)
 |-- meanseniority: decimal(10,2) (nullable = true)



In [0]:
# Normalise country names to title case and the 'sex' code to upper case.
sellersDF = (
    sellersDF
    .withColumn('country', initcap(col('country')))
    .withColumn('sex', upper(col('sex')))
)

# Bucket markets by seller count: < 500 'Small', 500-1999 'Medium', >= 2000 'Large'.
# NULL counts stay NULL instead of falling into a catch-all 'Large'.
_nb_sellers = col('nbsellers')
sellersDF = sellersDF.withColumn('seller_size_category',
                                 when(_nb_sellers < 500, 'Small')
                                 .when(_nb_sellers < 2000, 'Medium')
                                 .when(_nb_sellers >= 2000, 'Large'))

# Products listed per seller, an indicator of seller activity.
# Guarded so a zero (or NULL) seller count yields NULL. Unguarded division by
# zero raises an error when spark.sql.ansi.enabled is on.
sellersDF = sellersDF.withColumn('mean_products_listed_per_seller',
                                 when(_nb_sellers != 0,
                                      round(col('totalproductslisted') / _nb_sellers, 2)))

# Impute missing pass rates with the overall mean (rounded to 2 decimals).
# This now runs BEFORE the 'high_pass_rate' flag is derived, so imputed rows
# are classified from the value actually stored. Previously they were always
# 'Normal'. If every rate is NULL the mean is None and the NULLs remain.
mean_pass_rate = (
    sellersDF
    .select(round(avg('meansellerpassrate'), 2).alias('avg_pass_rate'))
    .collect()[0]['avg_pass_rate']
)
sellersDF = sellersDF.withColumn('meansellerpassrate',
                                 coalesce(col('meansellerpassrate'), lit(mean_pass_rate)))

# Identify markets with a high seller pass rate.
sellersDF = sellersDF.withColumn('high_pass_rate',
                                 when(col('meansellerpassrate') > 0.75, 'High')
                                 .otherwise('Normal'))

                

In [0]:
# Re-inspect the schema after the derived seller columns were added above.
sellersDF.printSchema()

root
 |-- country: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- nbsellers: integer (nullable = true)
 |-- meanproductssold: decimal(10,2) (nullable = true)
 |-- meanproductslisted: decimal(10,2) (nullable = true)
 |-- meansellerpassrate: decimal(10,2) (nullable = true)
 |-- totalproductssold: integer (nullable = true)
 |-- totalproductslisted: integer (nullable = true)
 |-- meanproductsbought: decimal(10,2) (nullable = true)
 |-- meanproductswished: decimal(10,2) (nullable = true)
 |-- meanproductsliked: decimal(10,2) (nullable = true)
 |-- totalbought: integer (nullable = true)
 |-- totalwished: integer (nullable = true)
 |-- totalproductsliked: integer (nullable = true)
 |-- meanfollowers: decimal(10,2) (nullable = true)
 |-- meanfollows: decimal(10,2) (nullable = true)
 |-- percentofappusers: decimal(10,2) (nullable = true)
 |-- percentofiosusers: decimal(10,2) (nullable = true)
 |-- meanseniority: decimal(10,2) (nullable = true)
 |-- seller_size_category: string

In [0]:
# Persist the cleaned sellers to the silver layer, replacing any previous contents.
sellersDF.write.save('/mnt/delta/tables/silver/sellers', format='delta', mode='overwrite')

In [0]:
# Load the bronze-layer countries table (Delta format).
countriesDF = spark.read.load("/mnt/delta/tables/bronze/countries", format="delta")

In [0]:
# Cast the countries columns: counts to IntegerType, ratios / means to
# DecimalType(10, 2). The casts are applied in the listed order.
_country_int = IntegerType()
_country_dec = DecimalType(10, 2)
_country_casts = [
    ("sellers", _country_int),
    ("topsellers", _country_int),
    ("topsellerratio", _country_dec),
    ("femalesellersratio", _country_dec),
    ("topfemalesellersratio", _country_dec),
    ("femalesellers", _country_int),
    ("malesellers", _country_int),
    ("topfemalesellers", _country_int),
    ("topmalesellers", _country_int),
    ("countrysoldratio", _country_dec),
    ("bestsoldratio", _country_dec),
    ("toptotalproductssold", _country_int),
    ("totalproductssold", _country_int),
    ("toptotalproductslisted", _country_int),
    ("totalproductslisted", _country_int),
    ("topmeanproductssold", _country_dec),
    ("topmeanproductslisted", _country_dec),
    ("meanproductssold", _country_dec),
    ("meanproductslisted", _country_dec),
    ("meanofflinedays", _country_dec),
    ("topmeanofflinedays", _country_dec),
    ("meanfollowers", _country_dec),
    ("meanfollowing", _country_dec),
    ("topmeanfollowers", _country_dec),
    ("topmeanfollowing", _country_dec),
]

for _country_col, _country_type in _country_casts:
    countriesDF = countriesDF.withColumn(_country_col, col(_country_col).cast(_country_type))

In [0]:
# Normalise country names to title case.
countriesDF = countriesDF.withColumn("country", initcap(col("country")))

# Ratio of top sellers to total sellers. Guarded so a country with zero (or
# NULL) sellers gets NULL. Unguarded division by zero raises an error when
# spark.sql.ansi.enabled is on.
countriesDF = countriesDF.withColumn("top_seller_ratio",
                                     when(col("sellers") != 0,
                                          round(col("topsellers") / col("sellers"), 2)))

# Flag countries with a high ratio of female sellers (NULL ratio -> False).
countriesDF = countriesDF.withColumn("high_female_seller_ratio",
                                     when(col("femalesellersratio") > 0.5, True).otherwise(False))

# Performance indicator: top products sold per top product listed
# (+1 in the denominator avoids division by zero).
countriesDF = countriesDF.withColumn("performance_indicator",
                                     round(col("toptotalproductssold") / (col("toptotalproductslisted") + 1), 2))

# Flag countries with exceptionally high performance.
performance_threshold = 0.8
countriesDF = countriesDF.withColumn("high_performance",
                                     when(col("performance_indicator") > performance_threshold, True).otherwise(False))

# Activity level from mean offline days:
#   < 30 'Highly Active', 30 to < 60 'Moderately Active', >= 60 'Low Activity'.
# A NULL mean stays NULL instead of falling into a catch-all 'Low Activity'.
_offline_days = col("meanofflinedays")
countriesDF = countriesDF.withColumn("activity_level",
                                     when(_offline_days < 30, "Highly Active")
                                     .when(_offline_days < 60, "Moderately Active")
                                     .when(_offline_days >= 60, "Low Activity"))



In [0]:
# Persist the cleaned countries to the silver layer, replacing any previous contents.
countriesDF.write.save("/mnt/delta/tables/silver/countries", format="delta", mode="overwrite")