In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Obtain the Spark session for this pipeline; getOrCreate() returns the already
# active session when one exists (as in a Databricks notebook) instead of a new one.
spark = (
    SparkSession.builder
    .appName("EcomDataPipeline")
    .getOrCreate()
)

In [0]:
%fs ls "/mnt/delta/tables/bronze"

path,name,size,modificationTime
dbfs:/mnt/delta/tables/bronze/buyers/,buyers/,0,1747744991000
dbfs:/mnt/delta/tables/bronze/countries/,countries/,0,1747745088000
dbfs:/mnt/delta/tables/bronze/sellers/,sellers/,0,1747745091000
dbfs:/mnt/delta/tables/bronze/users/,users/,0,1747744899000


In [0]:
# Bronze-layer user records (Delta table).
userDF = spark.read.load("/mnt/delta/tables/bronze/users", format="delta")

In [0]:
# Upper-case country codes so equal codes compare equal regardless of input casing.
userDF = userDF.withColumn("countrycode", upper("countrycode"))

In [0]:
# Expand the language code into a readable name: 'en' -> English, 'fr' -> French,
# anything else (including NULL) -> Other.
# Built with the Column API instead of a concatenated SQL string: the string
# fragments were joined without whitespace ("'English'WHEN"), which only parsed
# because the SQL lexer happens to tolerate it.
userDF = userDF.withColumn(
    "language_full",
    when(col("language") == "en", "English")
    .when(col("language") == "fr", "French")
    .otherwise("Other"),
)

In [0]:
# Map raw gender values to Male / Female / Other by their first letter.
# NULL values fail both startswith() tests and therefore become "Other".
gender_src = col("gender")
gender_label = (
    when(gender_src.startswith("M"), "Male")
    .when(gender_src.startswith("F"), "Female")
    .otherwise("Other")
)
userDF = userDF.withColumn("gender", gender_label)

In [0]:
# Rewrite the honorific variants Mme / Ms / Mrs to "Ms" in a new column, leaving
# civilitytitle untouched. The pattern is unanchored, so it matches anywhere in the value.
userDF = userDF.withColumn(
    "civilitytitle_clean",
    regexp_replace(col("civilitytitle"), "(Mme|Ms|Mrs)", "Ms"),
)

In [0]:
# Days since last login expressed in 365-day years. dayssincelastlogin is still a
# string at this point (it is cast to int further down); Spark coerces it for the division.
userDF = userDF.withColumn(
    "years_since_last_login",
    col("dayssincelastlogin") / lit(365),
)

In [0]:
# Account age in years, rounded to 2 decimals (seniority presumably counts days — TODO confirm).
# `round` here is pyspark.sql.functions.round via the wildcard import, not the builtin.
userDF = userDF.withColumn("account_year_age", round(col("seniority") / 365, 2))

# Bucket account age: < 1 year "New", 1 to < 3 years "Intermediate", otherwise "Experienced".
age_years = col("account_year_age")
userDF = userDF.withColumn(
    "account_age_group",
    # Unknown age stays NULL instead of falling through to otherwise() as "Experienced".
    when(age_years.isNull(), lit(None).cast("string"))
    .when(age_years < 1, "New")
    # Lower bound is implied by the branch above, so exactly 1.0 lands here.
    .when(age_years < 3, "Intermediate")
    .otherwise("Experienced"),
)

In [0]:
# Year of the date the query runs; the same value on every row.
userDF = userDF.withColumn("current_year", expr("year(current_date())"))


In [0]:
# Composite label "<gender>_<countrycode>_<first 3 chars of civilitytitle_clean>_<language_full>".
# concat() returns NULL when any part is NULL.
descriptor_parts = [
    col("gender"), lit("_"),
    col("countrycode"), lit("_"),
    substring(col("civilitytitle_clean"), 1, 3), lit("_"),
    col("language_full"),
]
userDF = userDF.withColumn("user_descriptor", concat(*descriptor_parts))

In [0]:
# True when the raw civility title exceeds 10 characters; a NULL title gives a NULL flag.
title_length = length(col("civilitytitle"))
userDF = userDF.withColumn("flag_long_title", title_length > 10)


In [0]:
# Inspect the user schema after the derived columns above and before the type casts below.
userDF.printSchema()

root
 |-- identifierhash: string (nullable = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = true)
 |-- socialnbfollowers: integer (nullable = true)
 |-- socialnbfollows: integer (nullable = true)
 |-- socialproductsliked: string (nullable = true)
 |-- productslisted: string (nullable = true)
 |-- productssold: string (nullable = true)
 |-- productspassrate: string (nullable = true)
 |-- productswished: string (nullable = true)
 |-- productsbought: string (nullable = true)
 |-- gender: string (nullable = false)
 |-- civilitygenderid: string (nullable = true)
 |-- civilitytitle: string (nullable = true)
 |-- hasanyapp: boolean (nullable = true)
 |-- hasandroidapp: boolean (nullable = true)
 |-- hasiosapp: boolean (nullable = true)
 |-- hasprofilepicture: boolean (nullable = true)
 |-- dayssincelastlogin: string (nullable = true)
 |-- seniority: string (nullable = true)
 |-- seniorityasmonths: integer (nullable = true)
 |

In [0]:
# Enforce silver-layer types on the user flags, counters and pass rate.
# Casting a column to the type it already has is a no-op.
for flag_col in ("hasanyapp", "hasandroidapp", "hasiosapp", "hasprofilepicture"):
    userDF = userDF.withColumn(flag_col, col(flag_col).cast("boolean"))

for count_col in ("socialnbfollowers", "socialnbfollows", "seniorityasmonths", "seniorityasyears"):
    userDF = userDF.withColumn(count_col, col(count_col).cast(IntegerType()))

# Cast the pass rate in place with 2 decimal places, like the other decimal casts in
# this pipeline. DecimalType() defaults to precision 10, scale 0, which would round
# away the fractional part of the rate.
# NOTE(review): the previous code wrote to a misspelled extra column "productpassrate";
# downstream readers of that column must switch to "productspassrate". The type of
# productspassrate changes from string to decimal(10,2), so an existing silver users
# Delta table needs one write with .option("overwriteSchema", "true").
userDF = userDF.withColumn("productspassrate", col("productspassrate").cast(DecimalType(10, 2)))


In [0]:
# Integer days since last login, with NULL replaced by 0.
# NOTE(review): a present but non-numeric value is not replaced by 0 (the cast yields
# NULL, or fails under ANSI mode).
last_login = col("dayssincelastlogin")
userDF = userDF.withColumn(
    "dayssincelastlogin",
    when(last_login.isNull(), 0).otherwise(last_login.cast(IntegerType())),
)

In [0]:
# Persist cleaned users as the silver Delta table, replacing its previous contents.
userDF.write.save("/mnt/delta/tables/silver/users/", format="delta", mode="overwrite")

In [0]:
# Bronze-layer buyer aggregates (Delta table).
buyersDF = spark.read.load("/mnt/delta/tables/bronze/buyers", format="delta")

In [0]:
# Count-like buyer columns; cast to int here and reused later for the NULL -> 0 fill.
integer_columns = [
    "buyers", "topbuyers",
    "femalebuyers", "malebuyers",
    "topfemalebuyers", "topmalebuyers",
    "totalproductsbought", "totalproductswished", "totalproductsliked",
    "toptotalproductsbought", "toptotalproductswished", "toptotalproductsliked",
]

for int_col in integer_columns:
    buyersDF = buyersDF.withColumn(int_col, col(int_col).cast(IntegerType()))

In [0]:
# Ratio / mean buyer columns, cast to decimal with 2 decimal places.
decimal_columns = [
    "topbuyerratio", "femalebuyersratio", "topfemalebuyersratio",
    "boughtperwishlistratio", "boughtperlikeratio",
    "topboughtperwishlistratio", "topboughtperlikeratio",
    "meanproductsbought", "meanproductswished", "meanproductsliked",
    "topmeanproductsbought", "topmeanproductswished", "topmeanproductsliked",
    "meanofflinedays", "topmeanofflinedays",
    "meanfollowers", "meanfollowing", "topmeanfollowers", "topmeanfollowing",
]

decimal_10_2 = DecimalType(10, 2)
for dec_col in decimal_columns:
    buyersDF = buyersDF.withColumn(dec_col, col(dec_col).cast(decimal_10_2))

In [0]:
# Title-case country names (first letter of each word upper-case).
buyersDF = buyersDF.withColumn("country", initcap("country"))

# Replace missing counts with 0 across all integer buyer columns in a single pass.
buyersDF = buyersDF.fillna(0, subset=integer_columns)


# Smoothed ratios rounded to 2 decimals: 1 is added to the DENOMINATOR so a zero
# count cannot cause division by zero. Without the parentheses, `a / b + 1` would
# instead add 1 to the ratio and leave the zero case unguarded.
buyersDF = buyersDF.withColumn(
    "female_to_male_ratio",
    round(col("femalebuyers") / (col("malebuyers") + 1), 2),
)

buyersDF = buyersDF.withColumn(
    "wishlist_to_purchase_ratio",
    round(col("totalproductsbought") / (col("totalproductswished") + 1), 2),
)

# Flag rows whose bought-per-wishlist ratio is above the threshold; a NULL ratio counts as False.
high_engagement_threshold = 0.5
buyersDF = buyersDF.withColumn(
    "high_engagement",
    coalesce(col("boughtperwishlistratio") > high_engagement_threshold, lit(False)),
)

# True when femalebuyersratio exceeds topfemalebuyersratio; a NULL on either side gives False.
female_ahead = col("femalebuyersratio") > col("topfemalebuyersratio")
buyersDF = buyersDF.withColumn("growing_female_market", coalesce(female_ahead, lit(False)))


In [0]:
# Persist cleaned buyers as the silver Delta table, replacing its previous contents.
buyersDF.write.save("/mnt/delta/tables/silver/buyers", format="delta", mode="overwrite")

In [0]:
# Bronze-layer seller aggregates (Delta table).
sellersDF = spark.read.load("/mnt/delta/tables/bronze/sellers", format="delta")

In [0]:
# Target type per seller column: counts/totals -> int, means/percentages -> decimal(10,2).
seller_column_types = {
    "nbsellers": IntegerType(),
    "meanproductssold": DecimalType(10, 2),
    "meanproductslisted": DecimalType(10, 2),
    "meansellerpassrate": DecimalType(10, 2),
    "totalproductssold": IntegerType(),
    "totalproductslisted": IntegerType(),
    "meanproductsbought": DecimalType(10, 2),
    "meanproductswished": DecimalType(10, 2),
    "meanproductsliked": DecimalType(10, 2),
    "totalbought": IntegerType(),
    "totalwished": IntegerType(),
    "totalproductsliked": IntegerType(),
    "meanfollowers": DecimalType(10, 2),
    "meanfollows": DecimalType(10, 2),
    "percentofappusers": DecimalType(10, 2),
    "percentofiosusers": DecimalType(10, 2),
    "meanseniority": DecimalType(10, 2),
}

for seller_col, target_type in seller_column_types.items():
    sellersDF = sellersDF.withColumn(seller_col, col(seller_col).cast(target_type))

In [0]:
# Title-case country names and upper-case the sex code.
sellersDF = sellersDF.withColumn("country", initcap("country"))
sellersDF = sellersDF.withColumn("sex", upper("sex"))

# Size bucket by seller count: <= 500 "Small", 501-1999 "Medium", >= 2000 "Large".
seller_count = col("nbsellers")
sellersDF = sellersDF.withColumn(
    "seller_size_category",
    # Unknown counts stay NULL instead of falling through to otherwise() as "Large".
    when(seller_count.isNull(), lit(None).cast("string"))
    .when(seller_count <= 500, "Small")
    # Lower bound is implied by the branch above (the old ">= 500" overlapped it).
    .when(seller_count < 2000, "Medium")
    .otherwise("Large"),
)

In [0]:
# Average listings per seller, rounded to 2 decimals.
# NOTE(review): nbsellers = 0 yields NULL (or an error when ANSI mode is on).
listed_per_seller = col("totalproductslisted") / col("nbsellers")
sellersDF = sellersDF.withColumn("mean_products_listed_per_seller", round(listed_per_seller, 2))


In [0]:
# "High" when meansellerpassrate > 0.75, otherwise "Normal" (a NULL rate also gives "Normal").
# NOTE(review): this runs before NULL pass rates are imputed in the next cell; also confirm
# the rate is on a 0-1 scale, since a 0-100 scale would make the 0.75 threshold meaningless.
pass_rate_is_high = col("meansellerpassrate") > 0.75
sellersDF = sellersDF.withColumn(
    "high_seller_pass_rate",
    when(pass_rate_is_high, "High").otherwise("Normal"),
)

In [0]:
# Average seller pass rate over all rows, rounded to 2 decimals (None if every rate is NULL).
avg_row = sellersDF.select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate")).first()
mean_pass_rate = avg_row["avg_pass_rate"]

# New column holding meansellerpassrate with NULLs replaced by that average;
# meansellerpassrate itself is left unchanged.
sellersDF = sellersDF.withColumn(
    "mean_pass_rate",
    when(col("meansellerpassrate").isNotNull(), col("meansellerpassrate")).otherwise(mean_pass_rate),
)


In [0]:
# Persist cleaned sellers as the silver Delta table, replacing its previous contents.
sellersDF.write.save("/mnt/delta/tables/silver/sellers", format="delta", mode="overwrite")

In [0]:
# Bronze-layer per-country aggregates (Delta table).
countriesDF = spark.read.load("/mnt/delta/tables/bronze/countries", format="delta")

In [0]:
# Per-country counts and totals become int.
country_int_columns = (
    "sellers", "topsellers",
    "femalesellers", "malesellers", "topfemalesellers", "topmalesellers",
    "toptotalproductssold", "totalproductssold",
    "toptotalproductslisted", "totalproductslisted",
)

# Per-country ratios and means become decimal(10,2).
country_decimal_columns = (
    "topsellerratio", "femalesellersratio", "topfemalesellersratio",
    "countrysoldratio", "bestsoldratio",
    "topmeanproductssold", "topmeanproductslisted",
    "meanproductssold", "meanproductslisted",
    "meanofflinedays", "topmeanofflinedays",
    "meanfollowers", "meanfollowing", "topmeanfollowers", "topmeanfollowing",
)

for int_col in country_int_columns:
    countriesDF = countriesDF.withColumn(int_col, col(int_col).cast(IntegerType()))

for dec_col in country_decimal_columns:
    countriesDF = countriesDF.withColumn(dec_col, col(dec_col).cast(DecimalType(10, 2)))

In [0]:
# Title-case country names (first letter of each word upper-case).
countriesDF = countriesDF.withColumn("country", initcap("country"))

In [0]:
# topsellers / sellers, rounded to 2 decimals.
top_share = col("topsellers") / col("sellers")
countriesDF = countriesDF.withColumn("top_seller_ratio", round(top_share, 2))

In [0]:
# True when femalesellersratio is above 0.5; a NULL ratio gives False.
countriesDF = countriesDF.withColumn(
    "high_female_seller_ratio",
    coalesce(col("femalesellersratio") > 0.5, lit(False)),
)

# Performance indicator: toptotalproductssold / (toptotalproductslisted + 1), rounded to
# 2 decimals. The +1 keeps the denominator non-zero when nothing was listed.
top_sold = col("toptotalproductssold")
top_listed_smoothed = col("toptotalproductslisted") + 1
countriesDF = countriesDF.withColumn(
    "performance_indicator",
    round(top_sold / top_listed_smoothed, 2),
)

# Flag countries whose performance indicator exceeds the threshold; NULL gives False.
performance_threshold = 0.8
countriesDF = countriesDF.withColumn(
    "high_performance",
    coalesce(col("performance_indicator") > performance_threshold, lit(False)),
)

# Activity level from mean offline days: < 30 "Highly Active", 30 to < 60
# "Moderately Active", >= 60 "Low Activity".
offline_days = col("meanofflinedays")
countriesDF = countriesDF.withColumn(
    "activity_level",
    # Unknown offline days stay NULL instead of falling through to otherwise() as "Low Activity".
    when(offline_days.isNull(), lit(None).cast("string"))
    .when(offline_days < 30, "Highly Active")
    # Lower bound is implied by the branch above.
    .when(offline_days < 60, "Moderately Active")
    .otherwise("Low Activity"),
)

In [0]:
# Persist cleaned countries as the silver Delta table, replacing its previous contents.
countriesDF.write.save("/mnt/delta/tables/silver/countries", format="delta", mode="overwrite")