In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
spark = SparkSession.builder.appName("ECommerceDataPipeline").getOrCreate()

In [0]:
# /mnt/delta/tables/bronze/users

userDF = spark.read.format("delta")\
    .load("/mnt/delta/tables/bronze/users")


In [0]:
#normalise country code
# userDF.select(col("countryCode")).show(2)
userDF = userDF.withColumn("countryCode", upper(col("countryCode")))
userDF.select(col("countryCode")).show(2)


+-----------+
|countryCode|
+-----------+
|         US|
|         DE|
+-----------+
only showing top 2 rows



In [0]:
userDF = userDF.withColumn("language", upper(col("language")))

In [0]:
userDF = userDF.withColumn("language", upper(col("language")))
userDF = userDF.withColumn("language_full",
                           expr("CASE WHEN language ='EN'" +"THEN 'English' WHEN language ='FR'"+ "THEN 'French' ELSE 'Others' END"))
userDF.show(2, truncate=True)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+-------------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|language_full|
+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+-

In [0]:
userDF = userDF.withColumn("gender",
                           when(col("gender").startswith("M"), 'Male')
                           .when(col("gender").contains("F"),'Female')
                           .otherwise("other"))
userDF.show(2)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+-------------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|language_full|
+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+-

In [0]:
userDF = userDF.withColumn("account_age_years", round(col("seniority")/365,2))

userDF = userDF.withColumn("account_age_group",
                           when(col("account_age_years")<1, "Baccha")
                           .when((col("account_age_years")>=1) & (col("account_age_years")<3), "1-3")
                           .when(col("account_age_years")>3,">3")
                           .otherwise("Experienced"))
userDF = userDF.withColumn("account_age_group",
                           when(col("account_age_years")<1, "Baccha")
                           .when(   (col("account_age_years")>=1) & (col("account_age_years")<3)      , "1-3")
                           .when(col("account_age_years")>3,">3")
                           .otherwise("Experienced"))                           
userDF.show(2)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+-------------+-----------------+-----------------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|language_full|account_age_years|account_age_group|
+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+

In [0]:
userDF.select(col("civilityTitle")).show(5)

+-------------+
|civilityTitle|
+-------------+
|          mrs|
|          mrs|
|           mr|
|          mrs|
|           mr|
+-------------+
only showing top 5 rows



In [0]:
userDF = userDF.withColumn("current year", year(current_date()))

In [0]:
userDF = userDF.withColumn("civilityTitle_clean",
                           regexp_replace("civilityTitle", "(Mme|mrs|ms)", "Ms"))
userDF.select(col("civilityTitle_clean")).show(4)

+-------------------+
|civilityTitle_clean|
+-------------------+
|                 Ms|
|                 Ms|
|                 mr|
|                 Ms|
+-------------------+
only showing top 4 rows



In [0]:
userDF = userDF.withColumn("user_descriptor",
                           concat(
                               col("gender"), lit("_"), col("countryCode"), lit("_"),
                               expr("substring(civilityTitle_clean, 1, 3)"), lit("_"),
                               col("language")
                           ))
userDF.show(3, truncate=True)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+-------------+-----------------+-----------------+------------+-------------------+---------------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|language_full|account_age_years|account_age_group|current year|civilityTitle_clean|user_descriptor|
+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+

In [0]:
userDF = userDF.withColumn("socialNbFollowers", col("socialNbFollowers").cast(IntegerType()) )
userDF

Out[13]: DataFrame[identifierHash: string, type: string, country: string, language: string, socialNbFollowers: int, socialNbFollows: string, socialProductsLiked: string, productsListed: string, productsSold: string, productsPassRate: string, productsWished: string, productsBought: string, gender: string, civilityGenderId: string, civilityTitle: string, hasAnyApp: string, hasAndroidApp: string, hasIosApp: string, hasProfilePicture: string, daysSinceLastLogin: string, seniority: string, seniorityAsMonths: string, seniorityAsYears: string, countryCode: string, language_full: string, account_age_years: double, account_age_group: string, current year: int, civilityTitle_clean: string, user_descriptor: string]

In [0]:
userDF.show(1)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+-------------+-----------------+-----------------+------------+-------------------+---------------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|language_full|account_age_years|account_age_group|current year|civilityTitle_clean|user_descriptor|
+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+

In [0]:
userDF = userDF.withColumn("hasanyapp", col("hasanyapp").cast("boolean"))
userDF = userDF.withColumn("hasandroidapp", col("hasandroidapp").cast("boolean"))
userDF = userDF.withColumn("hasiosapp", col("hasiosapp").cast("boolean"))
userDF = userDF.withColumn("hasprofilepicture", col("hasprofilepicture").cast("boolean"))



userDF = userDF.withColumn("socialnbfollowers", col("socialnbfollowers").cast(IntegerType()))
userDF = userDF.withColumn("socialnbfollows", col("socialnbfollows").cast(IntegerType()))
userDF = userDF.withColumn("socialProductsLiked", col("socialProductsLiked").cast(IntegerType()))
userDF = userDF.withColumn("productsListed", col("productsListed").cast(IntegerType()))
userDF = userDF.withColumn("productsSold", col("productsSold").cast(IntegerType()))
userDF = userDF.withColumn("productsWished", col("productsWished").cast(IntegerType()))
userDF = userDF.withColumn("productsBought", col("productsBought").cast(IntegerType()))



userDF = userDF.withColumn("productspassrate", col("productspassrate").cast(DecimalType(10, 2)))
userDF = userDF.withColumn("seniorityasmonths", col("seniorityasmonths").cast(DecimalType(10, 2)))
userDF = userDF.withColumn("seniorityasyears", col("seniorityasyears").cast(DecimalType(10, 2)))
userDF

Out[15]: DataFrame[identifierHash: string, type: string, country: string, language: string, socialnbfollowers: int, socialnbfollows: int, socialProductsLiked: int, productsListed: int, productsSold: int, productspassrate: decimal(10,2), productsWished: int, productsBought: int, gender: string, civilityGenderId: string, civilityTitle: string, hasanyapp: boolean, hasandroidapp: boolean, hasiosapp: boolean, hasprofilepicture: boolean, daysSinceLastLogin: string, seniority: string, seniorityasmonths: decimal(10,2), seniorityasyears: decimal(10,2), countryCode: string, language_full: string, account_age_years: double, account_age_group: string, current year: int, civilityTitle_clean: string, user_descriptor: string]

In [0]:
sanitize_columns = [
    col.replace(" ","_")
    for col in userDF.columns
]
userDF = userDF.toDF(*sanitize_columns)

In [0]:
## write delta table

userDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/users")

In [0]:
%fs ls "/mnt/delta/tables/silver/"

path,name,size,modificationTime
dbfs:/mnt/delta/tables/silver/users/,users/,0,0


In [0]:
%fs ls "/mnt/delta/tables/bronze/"

path,name,size,modificationTime
dbfs:/mnt/delta/tables/bronze/buyers/,buyers/,0,0
dbfs:/mnt/delta/tables/bronze/countries/,countries/,0,0
dbfs:/mnt/delta/tables/bronze/sellers/,sellers/,0,0
dbfs:/mnt/delta/tables/bronze/users/,users/,0,0


In [0]:
buyerDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/buyers")
buyerDF

Out[22]: DataFrame[country: string, buyers: string, topbuyers: string, topbuyerratio: string, femalebuyers: string, malebuyers: string, topfemalebuyers: string, topmalebuyers: string, femalebuyersratio: string, topfemalebuyersratio: string, boughtperwishlistratio: string, boughtperlikeratio: string, topboughtperwishlistratio: string, topboughtperlikeratio: string, totalproductsbought: string, totalproductswished: string, totalproductsliked: string, toptotalproductsbought: string, toptotalproductswished: string, toptotalproductsliked: string, meanproductsbought: string, meanproductswished: string, meanproductsliked: string, topmeanproductsbought: string, topmeanproductswished: string, topmeanproductsliked: string, meanofflinedays: string, topmeanofflinedays: string, meanfollowers: string, meanfollowing: string, topmeanfollowers: string, topmeanfollowing: string]

In [0]:
# Casting Integer columns
integer_columns = [
    'buyers', 'topbuyers', 'femalebuyers', 'malebuyers',
    'topfemalebuyers', 'topmalebuyers', 'totalproductsbought',
    'totalproductswished', 'totalproductsliked', 'toptotalproductsbought',
    'toptotalproductswished', 'toptotalproductsliked'
]
for column in integer_columns:
    buyerDF = buyerDF.withColumn(column, col(column).cast(IntegerType()))

# Casting Decimal columns
decimal_columns = [
    'topbuyerratio', 'femalebuyersratio', 'topfemalebuyersratio',
    'boughtperwishlistratio', 'boughtperlikeratio', 'topboughtperwishlistratio',
    'topboughtperlikeratio', 'meanproductsbought', 'meanproductswished',
    'meanproductsliked', 'topmeanproductsbought', 'topmeanproductswished',
    'topmeanproductsliked', 'meanofflinedays', 'topmeanofflinedays',
    'meanfollowers', 'meanfollowing', 'topmeanfollowers', 'topmeanfollowing'
]
for column in decimal_columns:
    buyerDF = buyerDF.withColumn(column, col(column).cast(DecimalType()))

buyerDF

Out[24]: DataFrame[country: string, buyers: int, topbuyers: int, topbuyerratio: decimal(10,0), femalebuyers: int, malebuyers: int, topfemalebuyers: int, topmalebuyers: int, femalebuyersratio: decimal(10,0), topfemalebuyersratio: decimal(10,0), boughtperwishlistratio: decimal(10,0), boughtperlikeratio: decimal(10,0), topboughtperwishlistratio: decimal(10,0), topboughtperlikeratio: decimal(10,0), totalproductsbought: int, totalproductswished: int, totalproductsliked: int, toptotalproductsbought: int, toptotalproductswished: int, toptotalproductsliked: int, meanproductsbought: decimal(10,0), meanproductswished: decimal(10,0), meanproductsliked: decimal(10,0), topmeanproductsbought: decimal(10,0), topmeanproductswished: decimal(10,0), topmeanproductsliked: decimal(10,0), meanofflinedays: decimal(10,0), topmeanofflinedays: decimal(10,0), meanfollowers: decimal(10,0), meanfollowing: decimal(10,0), topmeanfollowers: decimal(10,0), topmeanfollowing: decimal(10,0)]

In [0]:
 buyerDF.show(2, truncate=True)

+-----------+------+---------+-------------+------------+----------+---------------+-------------+-----------------+--------------------+----------------------+------------------+-------------------------+---------------------+-------------------+-------------------+------------------+----------------------+----------------------+---------------------+------------------+------------------+-----------------+---------------------+---------------------+--------------------+---------------+------------------+-------------+-------------+----------------+----------------+
|    country|buyers|topbuyers|topbuyerratio|femalebuyers|malebuyers|topfemalebuyers|topmalebuyers|femalebuyersratio|topfemalebuyersratio|boughtperwishlistratio|boughtperlikeratio|topboughtperwishlistratio|topboughtperlikeratio|totalproductsbought|totalproductswished|totalproductsliked|toptotalproductsbought|toptotalproductswished|toptotalproductsliked|meanproductsbought|meanproductswished|meanproductsliked|topmeanproductsbo

In [0]:
buyerDF.select("topboughtperwishlistratio").show(10)

+-------------------------+
|topboughtperwishlistratio|
+-------------------------+
|                       36|
|                       13|
|                       23|
|                       10|
|                       11|
|                       11|
|                       27|
|                       12|
|                       42|
|                        6|
+-------------------------+
only showing top 10 rows



In [0]:
# normalize country names
buyerDF = buyerDF.withColumn("country", initcap(col("country")))

# male to female ratio
buyerDF = buyerDF.withColumn("male_female_ratio", round(col("malebuyers")/(col("femalebuyers")+1),2))

#wish_purchase
buyerDF = buyerDF.withColumn("wish_purchase", round(col("toptotalproductswished")/(col("totalproductsbought")+1),2))

# Tag countries with a high engagement ratio
high_engagement_thresh = 20
buyerDF = buyerDF.withColumn("high_engagement",
                             when(col("topboughtperwishlistratio")> high_engagement_thresh , True)
                             .otherwise(False))

# Flag markets with increasing female buyer participation
buyerDF = buyerDF.withColumn("growing_female_market",
                             when(col("femalebuyersratio")>col("topfemalebuyersratio"), True)
                             .otherwise(False))
                        
buyerDF.show(2)


+-----------+------+---------+-------------+------------+----------+---------------+-------------+-----------------+--------------------+----------------------+------------------+-------------------------+---------------------+-------------------+-------------------+------------------+----------------------+----------------------+---------------------+------------------+------------------+-----------------+---------------------+---------------------+--------------------+---------------+------------------+-------------+-------------+----------------+----------------+-----------------+-------------+---------------+---------------------+
|    country|buyers|topbuyers|topbuyerratio|femalebuyers|malebuyers|topfemalebuyers|topmalebuyers|femalebuyersratio|topfemalebuyersratio|boughtperwishlistratio|boughtperlikeratio|topboughtperwishlistratio|topboughtperlikeratio|totalproductsbought|totalproductswished|totalproductsliked|toptotalproductsbought|toptotalproductswished|toptotalproductsliked|mea

In [0]:
buyerDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/buyers")

In [0]:
%fs ls "/mnt/delta/tables/silver/buyers"

path,name,size,modificationTime
dbfs:/mnt/delta/tables/silver/buyers/_delta_log/,_delta_log/,0,0
dbfs:/mnt/delta/tables/silver/buyers/part-00000-2178a593-2a7f-4823-8842-6d30449818e5-c000.snappy.parquet,part-00000-2178a593-2a7f-4823-8842-6d30449818e5-c000.snappy.parquet,16924,1734290202000
dbfs:/mnt/delta/tables/silver/buyers/part-00000-f446ef07-d8a3-419e-90a7-19cbf243c426-c000.snappy.parquet,part-00000-f446ef07-d8a3-419e-90a7-19cbf243c426-c000.snappy.parquet,16924,1734290215000


In [0]:
%fs ls "/mnt/delta/tables/bronze"

path,name,size,modificationTime
dbfs:/mnt/delta/tables/bronze/buyers/,buyers/,0,0
dbfs:/mnt/delta/tables/bronze/countries/,countries/,0,0
dbfs:/mnt/delta/tables/bronze/sellers/,sellers/,0,0
dbfs:/mnt/delta/tables/bronze/users/,users/,0,0


In [0]:
sellerDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/sellers")
sellerDF.show(2)

+---------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+------------------+
|  country|   sex|nbsellers|meanproductssold|meanproductslisted|meansellerpassrate|totalproductssold|totalproductslisted|meanproductsbought|meanproductswished|meanproductsliked|totalbought|totalwished|totalproductsliked|meanfollowers|meanfollows|percentofappusers|percentofiosusers|     meanseniority|
+---------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+------------------+
|Allemagne|Female|      116|            4.03|              2.72|             27.33|           

In [0]:
sellerDF.orderBy(col("nbsellers"), ascending=False ).show(2)

+--------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+-------------+
| country|   sex|nbsellers|meanproductssold|meanproductslisted|meansellerpassrate|totalproductssold|totalproductslisted|meanproductsbought|meanproductswished|meanproductsliked|totalbought|totalwished|totalproductsliked|meanfollowers|meanfollows|percentofappusers|percentofiosusers|meanseniority|
+--------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+-------------+
|Finlande|Female|        8|             2.5|              1.25|             32.13|               20|            

In [0]:
sellerDF = sellerDF\
    .withColumn("nbsellers", col("nbsellers").cast(IntegerType())) \
    .withColumn("meanproductssold", col("meanproductssold").cast(DecimalType(10,2)))\
    .withColumn("totalproductssold", col("totalproductssold").cast(IntegerType())) \
    .withColumn("totalproductslisted", col("totalproductslisted").cast(IntegerType())) \
    .withColumn("totalbought", col("totalbought").cast(IntegerType())) \
    .withColumn("totalwished", col("totalwished").cast(IntegerType())) \
    .withColumn("totalproductsliked", col("totalproductsliked").cast(IntegerType())) \
    .withColumn("meanproductslisted", col("meanproductslisted").cast(DecimalType(10,2)))\
    .withColumn("meansellerpassrate", col("meansellerpassrate").cast(DecimalType(10,2)))\
    .withColumn("meanproductsbought", col("meanproductsbought").cast(DecimalType(10,2)))\
    .withColumn("meanproductswished", col("meanproductswished").cast(DecimalType(10,2)))\
    .withColumn("meanproductsliked", col("meanproductsliked").cast(DecimalType(10,2)))\
    .withColumn("meanfollowers", col("meanfollowers").cast(DecimalType(10,2)))\
    .withColumn("meanfollows", col("meanfollows").cast(DecimalType(10,2)))\
    .withColumn("percentofappusers", col("percentofappusers").cast(DecimalType(10,2)))\
    .withColumn("percentofiosusers", col("percentofiosusers").cast(DecimalType(10,2)))\
    .withColumn("meanseniority", col("meanseniority").cast(DecimalType(10,2)))
sellerDF

Out[54]: DataFrame[country: string, sex: string, nbsellers: int, meanproductssold: decimal(10,2), meanproductslisted: decimal(10,2), meansellerpassrate: decimal(10,2), totalproductssold: int, totalproductslisted: int, meanproductsbought: decimal(10,2), meanproductswished: decimal(10,2), meanproductsliked: decimal(10,2), totalbought: int, totalwished: int, totalproductsliked: int, meanfollowers: decimal(10,2), meanfollows: decimal(10,2), percentofappusers: decimal(10,2), percentofiosusers: decimal(10,2), meanseniority: decimal(10,2)]

In [0]:
# Normalize country names and gender values
sellerDF = sellerDF.withColumn("country",initcap(col("country")))\
            .withColumn("sex", upper(col("sex")))
#Add a column to categorize the number of sellers
sellerDF = sellerDF.withColumn("seller_size_category",
                               when(col("nbsellers")<500, "Small")\
                                   .when(col("nbsellers")<1500, "Medium")\
                                       .otherwise("BIG")
                               )



In [0]:
mean_pass_rate = sellerDF.select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate")).collect()[0]["avg_pass_rate"]

sellerDF = sellerDF.withColumn("meansellerpassrate",
                                 when(col("meansellerpassrate").isNull(), mean_pass_rate)
                                 .otherwise(col("meansellerpassrate")))


In [0]:
sellerDF.show(2)

+---------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+-------------+--------------------+
|  country|   sex|nbsellers|meanproductssold|meanproductslisted|meansellerpassrate|totalproductssold|totalproductslisted|meanproductsbought|meanproductswished|meanproductsliked|totalbought|totalwished|totalproductsliked|meanfollowers|meanfollows|percentofappusers|percentofiosusers|meanseniority|seller_size_category|
+---------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+-------------+--------------------+
|Allemagne|FEMALE|      116|            4.03| 

In [0]:
most_gender = sellerDF.groupBy(col("sex")).count().orderBy("count", ascending=False).collect()[0]["sex"]

sellerDF = sellerDF.withColumn("sex",
                               when(col("sex").isNull(), most_gender)\
                                   .otherwise(col("sex")))
sellerDF.groupBy(col("sex")).count().show()

+------+-----+
|   sex|count|
+------+-----+
|  MALE|   25|
|FEMALE|   48|
+------+-----+



In [0]:
sellerDF.filter(col("country").isNull()).show()

+-------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+-------------+--------------------+
|country|   sex|nbsellers|meanproductssold|meanproductslisted|meansellerpassrate|totalproductssold|totalproductslisted|meanproductsbought|meanproductswished|meanproductsliked|totalbought|totalwished|totalproductsliked|meanfollowers|meanfollows|percentofappusers|percentofiosusers|meanseniority|seller_size_category|
+-------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+-------------+--------------------+
|   null|FEMALE|     2127|            4.40|         

In [0]:
sellerDF.groupBy(col("country")).count().orderBy("count", ascending=False).show()

+-----------+-----+
|    country|count|
+-----------+-----+
|       null|    3|
|   Autriche|    2|
|    Islande|    2|
|   Bulgarie|    2|
|   Pays-bas|    2|
|Royaume-uni|    2|
| Etats-unis|    2|
|   Roumanie|    2|
|     France|    2|
|      Grèce|    2|
|    Espagne|    2|
|     Chypre|    2|
|     Suisse|    2|
|  Allemagne|    2|
|    Pologne|    2|
|     Italie|    2|
|  Hong Kong|    2|
|  Australie|    2|
|      Suède|    2|
|   Belgique|    2|
+-----------+-----+
only showing top 20 rows



In [0]:
sellerDF = sellerDF.filter(col("country").isNotNull())
sellerDF.groupBy("country").count().orderBy("count", ascending=False).show()

+-----------+-----+
|    country|count|
+-----------+-----+
|   Autriche|    2|
|    Islande|    2|
|   Bulgarie|    2|
|   Pays-bas|    2|
|Royaume-uni|    2|
| Etats-unis|    2|
|   Roumanie|    2|
|     France|    2|
|      Grèce|    2|
|    Espagne|    2|
|     Chypre|    2|
|     Suisse|    2|
|  Allemagne|    2|
|    Pologne|    2|
|     Italie|    2|
|  Hong Kong|    2|
|  Australie|    2|
|      Suède|    2|
|   Belgique|    2|
|    Estonie|    2|
+-----------+-----+
only showing top 20 rows



In [0]:
sellerDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/sellers")

In [0]:
%fs ls "/mnt/delta/tables/silver/sellers"

path,name,size,modificationTime
dbfs:/mnt/delta/tables/silver/sellers/_delta_log/,_delta_log/,0,0
dbfs:/mnt/delta/tables/silver/sellers/part-00000-5bfdce47-f3dd-43b3-9177-1818a7a9bd94-c000.snappy.parquet,part-00000-5bfdce47-f3dd-43b3-9177-1818a7a9bd94-c000.snappy.parquet,11249,1734295563000


In [0]:
countriesDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/countries")

countriesDF = countriesDF \
    .withColumn("sellers", col("sellers").cast(IntegerType())) \
    .withColumn("topsellers", col("topsellers").cast(IntegerType())) \
    .withColumn("topsellerratio", col("topsellerratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellersratio", col("femalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("topfemalesellersratio", col("topfemalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellers", col("femalesellers").cast(IntegerType())) \
    .withColumn("malesellers", col("malesellers").cast(IntegerType())) \
    .withColumn("topfemalesellers", col("topfemalesellers").cast(IntegerType())) \
    .withColumn("topmalesellers", col("topmalesellers").cast(IntegerType())) \
    .withColumn("countrysoldratio", col("countrysoldratio").cast(DecimalType(10, 2))) \
    .withColumn("bestsoldratio", col("bestsoldratio").cast(DecimalType(10, 2))) \
    .withColumn("toptotalproductssold", col("toptotalproductssold").cast(IntegerType())) \
    .withColumn("totalproductssold", col("totalproductssold").cast(IntegerType())) \
    .withColumn("toptotalproductslisted", col("toptotalproductslisted").cast(IntegerType())) \
    .withColumn("totalproductslisted", col("totalproductslisted").cast(IntegerType())) \
    .withColumn("topmeanproductssold", col("topmeanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("topmeanproductslisted", col("topmeanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanproductssold", col("meanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("meanproductslisted", col("meanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanofflinedays", col("meanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("topmeanofflinedays", col("topmeanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowers", col("meanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowing", col("meanfollowing").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowers", col("topmeanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowing", col("topmeanfollowing").cast(DecimalType(10, 2)))

countriesDF = countriesDF.withColumn("country", initcap(col("country")))


# Calculating the ratio of top sellers to total sellers
countriesDF = countriesDF.withColumn("top_seller_ratio", 
                                        round(col("topsellers") / col("sellers"), 2))

# countriesDF countries with a high ratio of female sellers
countriesDF = countriesDF.withColumn("high_female_seller_ratio", 
                                        when(col("femalesellersratio") > 0.5, True).otherwise(False))

# Adding a performance indicator based on the sold/listed ratio
countriesDF = countriesDF.withColumn("performance_indicator", 
                                        round(col("toptotalproductssold") / (col("toptotalproductslisted") + 1), 2))

# Flag countries with exceptionally high performance
performance_threshold = 0.8
countriesDF = countriesDF.withColumn("high_performance", 
                                        when(col("performance_indicator") > performance_threshold, True).otherwise(False))

countriesDF = countriesDF.withColumn("activity_level",
                                       when(col("meanofflinedays") < 30, "Highly Active")
                                       .when((col("meanofflinedays") >= 30) & (col("meanofflinedays") < 60), "Moderately Active")
                                       .otherwise("Low Activity"))


countriesDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/countries")
     

In [0]:
%fs ls "/mnt/delta/tables/silver/countries"

path,name,size,modificationTime
dbfs:/mnt/delta/tables/silver/countries/_delta_log/,_delta_log/,0,0
dbfs:/mnt/delta/tables/silver/countries/part-00000-5389c720-5db7-44e8-8e3f-6035edd4ca55-c000.snappy.parquet,part-00000-5389c720-5db7-44e8-8e3f-6035edd4ca55-c000.snappy.parquet,11687,1734295625000
