Importing Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [0]:
# Read the bronze-layer users Delta table from /mnt/delta/tables/bronze/users.
userDF = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/users")
)

In [0]:
# Render the raw users DataFrame as a table for a quick visual check.
userDF.display()

identifierhash,type,country,language,socialnbfollowers,socialnbfollows,socialproductsliked,productslisted,productssold,productspassrate,productswished,productsbought,gender,civilitygenderid,civilitytitle,hasanyapp,hasandroidapp,hasiosapp,hasprofilepicture,dayssincelastlogin,seniority,seniorityasmonths,seniorityasyears,countrycode
-7279641312655250028,user,Etats-Unis,en,3,8,0,0,0,0.0,0,0,F,2,mrs,False,False,False,True,709,3205,106.83,8.9,us
-1456013578740053406,user,Allemagne,de,3,8,0,0,0,0.0,0,0,F,2,mrs,False,False,False,True,709,3205,106.83,8.9,de
9006282053848196165,user,Suède,en,3,8,0,0,0,0.0,0,0,M,1,mr,True,False,True,True,689,3205,106.83,8.9,se
-7154634866120535654,user,Turquie,en,3,8,0,0,0,0.0,0,0,F,2,mrs,False,False,False,True,709,3205,106.83,8.9,tr
2858299215060733023,user,France,en,3,8,0,0,0,0.0,0,0,M,1,mr,True,False,True,True,709,3205,106.83,8.9,fr
-8370972521561479983,user,Royaume-Uni,en,3,8,0,0,0,0.0,0,0,F,2,mrs,False,False,False,True,709,3205,106.83,8.9,gb
-7877915015908472168,user,Royaume-Uni,en,3,8,4,0,0,0.0,0,0,F,2,mrs,False,False,False,True,591,3205,106.83,8.9,gb
7455841332634807036,user,Italie,fr,3,8,0,0,0,0.0,0,0,F,2,mrs,True,True,False,True,709,3205,106.83,8.9,it
4607255007288453096,user,Italie,fr,3,8,0,0,0,0.0,0,0,F,2,mrs,True,True,False,True,701,3205,106.83,8.9,it
-7302797141205914253,user,France,en,3,8,0,0,0,0.0,0,0,M,1,mr,True,False,True,True,703,3205,106.83,8.9,fr


Transformation on User Data

In [0]:
# Store country codes in upper case so they are uniform across rows.
userDF = userDF.withColumn(
    "countrycode",
    upper(userDF["countrycode"]),
)

In [0]:
# Map the language code to a readable name:
#   'en' -> 'English', 'fr' -> 'French', anything else (including null) -> 'Other'.
language_code = col("language")
userDF = userDF.withColumn(
    "language_full",
    when(language_code == "en", "English")
    .when(language_code == "fr", "French")
    .otherwise("Other"),
)

In [0]:
# Collapse gender values to full labels based on their first letter:
#   starts with "M" -> "Male", starts with "F" -> "Female", everything else -> "Other".
gender_code = col("gender")
gender_label = (
    when(gender_code.startswith("M"), "Male")
    .when(gender_code.startswith("F"), "Female")
    .otherwise("Other")
)
userDF = userDF.withColumn("gender", gender_label)

In [0]:
# Use `regexp_replace` to unify the female civility titles (Mme / Ms / Mrs) to "Ms".
# The inline (?i) flag makes the match case-insensitive: the bronze data stores
# titles in lower case (e.g. "mrs"), which a case-sensitive pattern never matches.
userDF = userDF.withColumn(
    "civilitytitle_clean",
    regexp_replace("civilitytitle", "(?i)(Mme|Ms|Mrs)", "Ms"),
)

In [0]:
# Express the time since the last login in years (days / 365, not rounded).
userDF = userDF.withColumn(
    "years_since_last_login",
    userDF["dayssincelastlogin"] / 365,
)

In [0]:
# Account age in years: seniority / 365, rounded to 2 decimals
# (presumably seniority is expressed in days — confirm against the source schema).
userDF = userDF.withColumn("account_age_years", round(col("seniority") / 365, 2))

# Bucket the account age:
#   < 1 year -> "Beginner", 1 to < 3 years -> "Intermediate", everything else -> "Experienced".
account_age = col("account_age_years")
userDF = userDF.withColumn(
    "account_age_group",
    when(account_age < 1, "Beginner")
    .when((account_age >= 1) & (account_age < 3), "Intermediate")
    .otherwise("Experienced"),
)


In [0]:
# Stamp every row with the calendar year at run time, for later comparisons.
run_year = year(current_date())
userDF = userDF.withColumn("current_year", run_year)

In [0]:
# Build a descriptor string of the form
#   <gender>_<countrycode>_<first 3 chars of civilitytitle_clean>_<language_full>
descriptor_separator = lit("_")
title_prefix = expr("substring(civilitytitle_clean,1,3)")
userDF = userDF.withColumn(
    "user_descriptor",
    concat(
        col("gender"), descriptor_separator,
        col("countrycode"), descriptor_separator,
        title_prefix, descriptor_separator,
        col("language_full"),
    ),
)

In [0]:
# Boolean flag: True when the raw civilitytitle is longer than 10 characters.
title_length = length(col("civilitytitle"))
userDF = userDF.withColumn("flag_long_title", title_length > 10)

In [0]:
# Cast columns to their intended data types, driven by a column -> type table.
user_column_types = {
    # app / profile flags
    "hasanyapp": "boolean",
    "hasandroidapp": "boolean",
    "hasiosapp": "boolean",
    "hasprofilepicture": "boolean",
    # social counters
    "socialnbfollowers": IntegerType(),
    "socialnbfollows": IntegerType(),
    # rates and seniority measures, kept with 2 decimal places
    "productspassrate": DecimalType(10, 2),
    "seniorityasmonths": DecimalType(10, 2),
    "seniorityasyears": DecimalType(10, 2),
}

for user_column, target_type in user_column_types.items():
    userDF = userDF.withColumn(user_column, col(user_column).cast(target_type))

In [0]:
# Cast dayssincelastlogin to integer, substituting 0 when the value is missing.
# The result must be assigned back to `userDF` (exact spelling): that is the
# DataFrame the later cells inspect and write to the silver layer.
userDF = userDF.withColumn(
    "dayssincelastlogin",
    when(col("dayssincelastlogin").isNotNull(), col("dayssincelastlogin").cast(IntegerType()))
    .otherwise(0),
)

In [0]:
# Spot-check the normalized gender labels on the first 5 rows.
userDF.select("gender").show(5)

+------+
|gender|
+------+
|Female|
|Female|
|  Male|
|Female|
|  Male|
+------+
only showing top 5 rows



Writing the Transformed User Data to Silver layer

In [0]:
# Persist the transformed users as a Delta table in the silver layer,
# overwriting whatever is already stored at that path.
(
    userDF.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/tables/silver/users")
)

Transforming Buyer Data

In [0]:
# Read the bronze-layer buyers Delta table from /mnt/delta/tables/bronze/buyers.
buyersDF = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/buyers")
)

In [0]:
# Buyer columns that are cast to int below (the list is reused by later cells).
integer_columns = [
    'buyers',
    'topbuyers',
    'femalebuyers',
    'malebuyers',
    'topfemalebuyers',
    'topmalebuyers',
    'totalproductsbought',
    'totalproductswished',
    'totalproductsliked',
    'toptotalproductsbought',
    'toptotalproductswished',
    'toptotalproductsliked',
]

# Cast each listed column to int, one column at a time.
for int_column in integer_columns:
    buyersDF = buyersDF.withColumn(int_column, buyersDF[int_column].cast('int'))

In [0]:

# Buyer columns (ratios and means) that are cast to decimal(10,2) below.
decimal_columns = [
    # ratios
    'topbuyerratio',
    'femalebuyersratio',
    'topfemalebuyersratio',
    'boughtperwishlistratio',
    'boughtperlikeratio',
    'topboughtperwishlistratio',
    'topboughtperlikeratio',
    # means
    'meanproductsbought',
    'meanproductswished',
    'meanproductsliked',
    'topmeanproductsbought',
    'topmeanproductswished',
    'topmeanproductsliked',
    'meanofflinedays',
    'topmeanofflinedays',
    'meanfollowers',
    'meanfollowing',
    'topmeanfollowers',
    'topmeanfollowing',
]

# Cast each listed column to a decimal with 2 fractional digits.
for decimal_column in decimal_columns:
    buyersDF = buyersDF.withColumn(decimal_column, buyersDF[decimal_column].cast('decimal(10,2)'))

Transformation on Buyers Data

In [0]:
# Normalize country names to initial-capital form.
buyersDF = buyersDF.withColumn('country', initcap(col('country')))

# Replace missing values with 0 in every column listed in integer_columns.
buyersDF = buyersDF.fillna({int_column: 0 for int_column in integer_columns})

# Ratio of female to male buyers, rounded to 2 decimals.
female_buyers = col("femalebuyers")
male_buyers = col("malebuyers")
buyersDF = buyersDF.withColumn('female_to_male_ratio', round(female_buyers / male_buyers, 2))

# Wished-to-bought ratio as a market-potential signal, rounded to 2 decimals.
# The +1 in the denominator presumably guards against dividing by zero purchases.
buyersDF = buyersDF.withColumn(
    'wishlist_to_purchase_ratio',
    round(col("totalproductswished") / (col("totalproductsbought") + 1), 2),
)

# Flag rows whose boughtperwishlistratio exceeds the engagement threshold.
# NOTE(review): the scale of boughtperwishlistratio (fraction vs. percent) is not
# visible in this notebook — confirm that 0.5 is expressed on the same scale.
high_engagement_threshold = 0.5
is_high_engagement = col("boughtperwishlistratio") > high_engagement_threshold
buyersDF = buyersDF.withColumn("High_Engagement", when(is_high_engagement, True).otherwise(False))

# Flag rows where femalebuyersratio is greater than topfemalebuyersratio.
female_ratio_above_top = col("femalebuyersratio") > col("topfemalebuyersratio")
buyersDF = buyersDF.withColumn("growing_female_market", when(female_ratio_above_top, True).otherwise(False))



Writing the transformed buyers data into silver layer

In [0]:
# Persist the transformed buyers as a Delta table in the silver layer,
# overwriting whatever is already stored at that path.
(
    buyersDF.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/tables/silver/buyers")
)

Transforming Seller Data

In [0]:
# Read the bronze-layer sellers Delta table.
sellersDF = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/sellers")
)

In [0]:
# Seller columns holding counts -> integer.
seller_integer_columns = [
    "nbsellers",
    "totalproductssold",
    "totalproductslisted",
    "totalbought",
    "totalwished",
    "totalproductsliked",
]

# Seller columns holding means / percentages -> decimal(10, 2).
seller_decimal_columns = [
    "meanproductssold",
    "meanproductslisted",
    "meansellerpassrate",
    "meanproductsbought",
    "meanproductswished",
    "meanproductsliked",
    "meanfollowers",
    "meanfollows",
    "percentofappusers",
    "percentofiosusers",
    "meanseniority",
]

for seller_column in seller_integer_columns:
    sellersDF = sellersDF.withColumn(seller_column, col(seller_column).cast(IntegerType()))

for seller_column in seller_decimal_columns:
    sellersDF = sellersDF.withColumn(seller_column, col(seller_column).cast(DecimalType(10, 2)))

In [0]:
# Render the type-cast sellers DataFrame as a table for a quick visual check.
sellersDF.display()

country,sex,nbsellers,meanproductssold,meanproductslisted,meansellerpassrate,totalproductssold,totalproductslisted,meanproductsbought,meanproductswished,meanproductsliked,totalbought,totalwished,totalproductsliked,meanfollowers,meanfollows,percentofappusers,percentofiosusers,meanseniority
Allemagne,Female,116,4.03,2.72,27.33,468,315,3.05,34.66,35.28,354,4021,4092,9.5,8.9,54.0,49.0,3060.34
Allemagne,Male,34,2.0,1.0,19.15,68,34,1.62,3.38,31.79,55,115,1081,7.8,8.4,79.0,64.0,3089.06
Arménie,Female,1,0.0,25.0,0.0,0,25,0.0,0.0,1.0,0,0,1,4.0,8.0,,,3201.0
Australie,Female,18,0.94,1.33,10.44,17,24,6.11,17.72,209.28,110,319,3767,7.5,9.3,55.0,55.0,3103.67
Australie,Male,3,6.0,4.0,33.33,18,12,8.0,24.0,38.33,24,72,115,12.7,8.3,66.0,66.0,3085.67
Autriche,Female,18,3.28,2.5,38.67,59,45,3.39,31.94,33.0,61,575,594,6.9,8.4,61.0,50.0,3048.22
Autriche,Male,5,1.6,0.2,35.0,8,1,0.8,5.0,0.8,4,25,4,6.2,8.0,60.0,60.0,3133.0
Bahamas,Female,1,1.0,0.0,0.0,1,0,0.0,0.0,0.0,0,0,0,4.0,8.0,,,2857.0
Belgique,Female,37,2.41,1.89,29.59,89,70,12.03,57.24,20.11,445,2118,744,8.3,8.5,62.0,48.0,3041.08
Belgique,Male,8,3.13,1.75,26.13,25,14,0.63,1.13,1.25,5,9,10,8.4,9.3,62.0,37.0,3069.63


In [0]:
# Normalize country names (initial capitals) and upper-case the sex values.
sellersDF = (
    sellersDF
    .withColumn("country", initcap(col("country")))
    .withColumn("sex", upper(col("sex")))
)

# Categorize the number of sellers in ONE when-chain so every branch survives:
#   <= 500 -> "Small", 501..999 -> "Medium", everything else (incl. null) -> "Large".
# Writing the column with two separate withColumn calls would let the second call
# overwrite the first, so "Small" would never be produced.
sellersDF = sellersDF.withColumn(
    "nbsellers_category",
    when(col("nbsellers") <= 500, "Small")
    .when(col("nbsellers") < 1000, "Medium")
    .otherwise("Large"),
)

# Mean products listed per seller as an indicator of seller activity (2 decimals).
sellersDF = sellersDF.withColumn(
    "mean_products_listed_per_seller",
    round(col("totalproductslisted") / col("nbsellers"), 2),
)

# Identify markets with a high seller pass rate.
# NOTE(review): the sample output in this notebook shows meansellerpassrate values
# such as 27.33 and 38.67, i.e. not on a 0-1 scale, so a 0.75 cutoff marks almost
# every non-zero market as "High" — confirm whether 75 was intended.
# Rows with a null pass rate fall through to "Low" because this runs before the
# null imputation below.
sellersDF = sellersDF.withColumn(
    "seller_pass_rate_cat",
    when(col("meansellerpassrate") > 0.75, "High").otherwise("Low"),
)

# Overall average pass rate (rounded to 2 decimals), collected to the driver.
mean_pass_rate = (
    sellersDF
    .select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate"))
    .collect()[0]["avg_pass_rate"]
)

# Impute missing pass rates with the overall average.
sellersDF = sellersDF.withColumn(
    "meansellerpassrate",
    when(col("meansellerpassrate").isNull(), mean_pass_rate)
    .otherwise(col("meansellerpassrate")),
)

Writing the transformed Seller data into Silver Layer

In [0]:
# Persist the transformed sellers as a Delta table in the silver layer,
# overwriting whatever is already stored at that path.
(
    sellersDF.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/tables/silver/sellers")
)

Transforming Countries Data

In [0]:
# Read the bronze-layer countries Delta table.
countriesDF = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/countries")
)

In [0]:
# Country columns holding counts -> integer.
country_integer_columns = [
    "sellers",
    "topsellers",
    "femalesellers",
    "malesellers",
    "topfemalesellers",
    "topmalesellers",
    "toptotalproductssold",
    "totalproductssold",
    "toptotalproductslisted",
    "totalproductslisted",
]

# Country columns holding ratios / means -> decimal(10, 2).
country_decimal_columns = [
    "topsellerratio",
    "femalesellersratio",
    "topfemalesellersratio",
    "countrysoldratio",
    "bestsoldratio",
    "topmeanproductssold",
    "topmeanproductslisted",
    "meanproductssold",
    "meanproductslisted",
    "meanofflinedays",
    "topmeanofflinedays",
    "meanfollowers",
    "meanfollowing",
    "topmeanfollowers",
    "topmeanfollowing",
]

for country_column in country_integer_columns:
    countriesDF = countriesDF.withColumn(country_column, col(country_column).cast(IntegerType()))

for country_column in country_decimal_columns:
    countriesDF = countriesDF.withColumn(country_column, col(country_column).cast(DecimalType(10, 2)))

In [0]:
# Render the type-cast countries DataFrame as a table for a quick visual check.
countriesDF.display()

country,sellers,topsellers,topsellerratio,femalesellersratio,topfemalesellersratio,femalesellers,malesellers,topfemalesellers,topmalesellers,countrysoldratio,bestsoldratio,toptotalproductssold,totalproductssold,toptotalproductslisted,totalproductslisted,topmeanproductssold,topmeanproductslisted,meanproductssold,meanproductslisted,meanofflinedays,topmeanofflinedays,meanfollowers,meanfollowing,topmeanfollowers,topmeanfollowing
Taiwan,1,1,100.0,100.0,100.0,1,0,1,0,1.02,1.02,57,57,56,56,57.0,56.0,57.0,56.0,11.0,11.0,83.0,8.0,83.0,8.0
Slovaquie,2,1,50.0,0.0,0.0,0,2,0,1,2.0,1.93,27,28,14,14,27.0,14.0,14.0,7.0,17.0,15.0,10.5,8.5,15.0,8.0
Lettonie,4,2,50.0,100.0,100.0,4,0,2,0,2.31,2.25,81,83,36,36,40.5,18.0,20.75,9.0,120.3,11.5,21.0,52.3,38.0,98.5
Bulgarie,9,4,44.4,66.7,100.0,6,3,4,0,2.07,2.1,145,170,69,82,36.25,17.25,18.89,9.11,98.3,19.0,28.6,31.6,46.3,19.0
Chypre,4,1,25.0,100.0,100.0,4,0,1,0,0.69,0.62,41,56,66,81,41.0,66.0,14.0,20.25,17.3,11.0,21.3,10.3,39.0,17.0
Monaco,5,1,20.0,100.0,100.0,5,0,1,0,7.31,8.95,170,190,19,26,170.0,19.0,38.0,5.2,51.6,12.0,39.6,8.0,167.0,8.0
Roumanie,13,2,15.4,76.9,50.0,10,3,1,1,0.88,1.26,49,68,39,77,24.5,19.5,5.23,5.92,121.6,11.0,10.9,11.5,30.0,32.0
Luxembourg,7,1,14.3,85.7,100.0,6,1,1,0,5.38,,30,43,0,8,30.0,0.0,6.14,1.14,73.6,11.0,15.9,8.4,52.0,3.0
Espagne,119,13,10.9,81.5,76.9,97,22,10,3,1.67,2.02,607,990,301,594,46.69,23.15,8.32,4.99,202.4,30.5,16.1,14.6,53.2,14.5
Italie,347,35,10.1,71.5,65.7,248,99,23,12,1.27,1.29,1389,2820,1077,2218,39.69,30.77,8.13,6.39,141.8,26.5,16.1,54.6,63.4,429.3


In [0]:
# Cutoff above which performance_indicator counts as exceptionally high.
performance_threshold = 0.8

# Named column expressions used by the derived columns below.
female_sellers_ratio = col("femalesellersratio")
mean_offline_days = col("meanofflinedays")

countriesDF = (
    countriesDF
    # Normalize country names to initial-capital form.
    .withColumn("country", initcap(col("country")))
    # Ratio of top sellers to total sellers (a 0-1 fraction, rounded to 2 decimals).
    .withColumn("top_seller_ratio", round(col("topsellers") / col("sellers"), 2))
    # Flag countries with a high ratio of female sellers.
    # NOTE(review): the sample output in this notebook shows femalesellersratio on a
    # 0-100 scale (e.g. 66.7, 100.0), so "> 0.5" is True for nearly every country —
    # confirm whether 50 was intended.
    .withColumn("high_female_seller_ratio", when(female_sellers_ratio > 0.5, True).otherwise(False))
    # Performance indicator: top-seller products sold / top-seller products listed.
    .withColumn(
        "performance_indicator",
        round(col("toptotalproductssold") / col("toptotalproductslisted"), 2),
    )
    # Flag countries whose performance indicator exceeds the threshold.
    .withColumn(
        "high_performance",
        when(col("performance_indicator") > performance_threshold, True).otherwise(False),
    )
    # Activity level from mean offline days:
    #   < 30 -> "Highly Active", 30 to < 60 -> "Moderately Active", else -> "Low Activity".
    .withColumn(
        "activity_level",
        when(mean_offline_days < 30, "Highly Active")
        .when((mean_offline_days >= 30) & (mean_offline_days < 60), "Moderately Active")
        .otherwise("Low Activity"),
    )
)


Writing the transformed Countries Data into Silver layer

In [0]:
# Persist the transformed countries as a Delta table in the silver layer,
# overwriting whatever is already stored at that path.
(
    countriesDF.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/tables/silver/countries")
)