In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
spark = SparkSession.builder.appName("Silver Layer").getOrCreate()

In [0]:
%fs ls "/mnt"

path,name,size,modificationTime
dbfs:/mnt/delta/,delta/,0,1741845212000
dbfs:/mnt/ecommerce/,ecommerce/,0,0


## 1. Users

In [0]:
userDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/users")

In [0]:
# Bool
userDF = userDF.withColumn("hasanyapp", col("hasanyapp").cast("boolean"))
userDF = userDF.withColumn("hasandroidapp", col("hasandroidapp").cast("boolean"))
userDF = userDF.withColumn("hasiosapp", col("hasiosapp").cast("boolean"))
userDF = userDF.withColumn("hasprofilepicture", col("hasprofilepicture").cast("boolean"))

# Interger
intcol = ["socialnbfollowers", "socialnbfollows", "productslisted", "productsSold", "productsWished", "productsBought", "daysSinceLastLogin"]
for column in intcol:
  userDF = userDF.withColumn(column, col(column).cast("integer"))

# Decimal
userDF = userDF.withColumn("productspassrate", col("productspassrate").cast(DecimalType(10, 2)))
userDF = userDF.withColumn("seniorityasmonths", col("seniorityasmonths").cast(DecimalType(10, 2)))
userDF = userDF.withColumn("seniorityasyears", col("seniorityasyears").cast(DecimalType(10, 2)))

In [0]:
userDF = userDF.withColumn("dayssincelastlogin",
                            when(col("dayssincelastlogin").isNotNull(),
                                 col("dayssincelastlogin").cast(IntegerType()))
                            .otherwise(0))

In [0]:
userDF.printSchema()

root
 |-- identifierHash: string (nullable = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = true)
 |-- socialnbfollowers: integer (nullable = true)
 |-- socialnbfollows: integer (nullable = true)
 |-- socialProductsLiked: string (nullable = true)
 |-- productslisted: integer (nullable = true)
 |-- productsSold: integer (nullable = true)
 |-- productspassrate: decimal(10,2) (nullable = true)
 |-- productsWished: integer (nullable = true)
 |-- productsBought: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- civilityGenderId: string (nullable = true)
 |-- civilityTitle: string (nullable = true)
 |-- hasanyapp: boolean (nullable = true)
 |-- hasandroidapp: boolean (nullable = true)
 |-- hasiosapp: boolean (nullable = true)
 |-- hasprofilepicture: boolean (nullable = true)
 |-- dayssincelastlogin: integer (nullable = true)
 |-- seniority: string (nullable = true)
 |-- seniorityasmonths: decimal(10,2) (nu

### Country Code

In [0]:
userDF = userDF.withColumn("countryCode", upper(col("countryCode")))
userDF.select("countryCode").show(5)

+-----------+
|countryCode|
+-----------+
|         US|
|         DE|
|         SE|
|         TR|
|         FR|
+-----------+
only showing top 5 rows



### Language

In [0]:
userDF.groupBy("language").count().orderBy(count("language").desc()).show()

+--------+-----+
|language|count|
+--------+-----+
|      en|10555|
|      fr| 5025|
|      it| 1601|
|      de| 1554|
|      es| 1048|
+--------+-----+



In [0]:
userDF = userDF.withColumn("language", 
                             expr("CASE WHEN language = 'en' THEN 'English' " +
                                  "WHEN language = 'fr' THEN 'French' " +
                                  "ELSE 'Other' END"))

### Gender

In [0]:
userDF.select("gender").distinct().show()

+------+
|gender|
+------+
|     F|
|     M|
+------+



In [0]:
userDF = userDF.withColumn("gender", 
                             when(col("gender").startswith("M"), "Male")
                             .when(col("gender").startswith("F"), "Female")
                             .otherwise("Other"))

### Civility title

In [0]:
userDF.select("civilitytitle").distinct().show()

+-------------+
|civilitytitle|
+-------------+
|         miss|
|           mr|
|          mrs|
+-------------+



In [0]:
userDF = userDF.withColumn("civilitytitle", 
                            regexp_replace("civilitytitle", "(miss|mme|ms|mrs)", "Ms"))

Calculate age of account in years and categorize into `account_age_group`

In [0]:

userDF = userDF.withColumn("account_age_group",when(col("seniorityasyears") < 1, "New")
                                                .when(col("seniorityasyears") < 5, "Intermediate")
                                                .otherwise("Senior"))

In [0]:
userDF.select("seniority").distinct().show()

+---------+
|seniority|
+---------+
|     3200|
|     3202|
|     3204|
|     3201|
|     3205|
|     3203|
+---------+



### Creatively combining strings to form a unique user descriptor

In [0]:
userDF = userDF.withColumn("user_descriptor", 
                             concat(col("gender"), lit("_"), 
                                    col("countrycode"), lit("_"), 
                                    expr("substring(civilitytitle, 1, 3)"), lit("_"), 
                                    col("language")))

### WRITE

In [0]:
userDF.printSchema()

root
 |-- identifierHash: string (nullable = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = false)
 |-- socialnbfollowers: integer (nullable = true)
 |-- socialnbfollows: integer (nullable = true)
 |-- socialProductsLiked: string (nullable = true)
 |-- productslisted: integer (nullable = true)
 |-- productsSold: integer (nullable = true)
 |-- productspassrate: decimal(10,2) (nullable = true)
 |-- productsWished: integer (nullable = true)
 |-- productsBought: integer (nullable = true)
 |-- gender: string (nullable = false)
 |-- civilityGenderId: string (nullable = true)
 |-- civilitytitle: string (nullable = true)
 |-- hasanyapp: boolean (nullable = true)
 |-- hasandroidapp: boolean (nullable = true)
 |-- hasiosapp: boolean (nullable = true)
 |-- hasprofilepicture: boolean (nullable = true)
 |-- dayssincelastlogin: integer (nullable = true)
 |-- seniority: string (nullable = true)
 |-- seniorityasmonths: decimal(10,2) (

In [0]:
userDF.write.format("delta").option("mergeSchema", "true").mode("append").save("/mnt/delta/tables/silver/users")

## 2. Countries

In [0]:
countriesDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/countries")

### Format

In [0]:
countriesDF = countriesDF \
    .withColumn("sellers", col("sellers").cast(IntegerType())) \
    .withColumn("topsellers", col("topsellers").cast(IntegerType())) \
    .withColumn("topsellerratio", col("topsellerratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellersratio", col("femalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("topfemalesellersratio", col("topfemalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellers", col("femalesellers").cast(IntegerType())) \
    .withColumn("malesellers", col("malesellers").cast(IntegerType())) \
    .withColumn("topfemalesellers", col("topfemalesellers").cast(IntegerType())) \
    .withColumn("topmalesellers", col("topmalesellers").cast(IntegerType())) \
    .withColumn("countrysoldratio", col("countrysoldratio").cast(DecimalType(10, 2))) \
    .withColumn("bestsoldratio", col("bestsoldratio").cast(DecimalType(10, 2))) \
    .withColumn("toptotalproductssold", col("toptotalproductssold").cast(IntegerType())) \
    .withColumn("totalproductssold", col("totalproductssold").cast(IntegerType())) \
    .withColumn("toptotalproductslisted", col("toptotalproductslisted").cast(IntegerType())) \
    .withColumn("totalproductslisted", col("totalproductslisted").cast(IntegerType())) \
    .withColumn("topmeanproductssold", col("topmeanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("topmeanproductslisted", col("topmeanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanproductssold", col("meanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("meanproductslisted", col("meanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanofflinedays", col("meanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("topmeanofflinedays", col("topmeanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowers", col("meanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowing", col("meanfollowing").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowers", col("topmeanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowing", col("topmeanfollowing").cast(DecimalType(10, 2)))

### Calculating the ratio of top sellers to total sellers

In [0]:
countriesDF = countriesDF.withColumn("country", initcap(col("country")))

### Countries with a high ratio of female sellers

In [0]:
countriesDF = countriesDF.withColumn("high_female_seller_ratio", 
                                        when(col("femalesellersratio") > 0.5, True).otherwise(False))

### Adding a performance indicator based on the sold/listed ratio

In [0]:
countriesDF = countriesDF.withColumn("performance_indicator", 
                                        round(col("toptotalproductssold") / (col("toptotalproductslisted") + 1), 2))


### Flag countries with exceptionally high performance

In [0]:
performance_threshold = 0.8
countriesDF = countriesDF.withColumn("high_performance", 
                                        when(col("performance_indicator") > performance_threshold, True).otherwise(False))

countriesDF = countriesDF.withColumn("activity_level",
                                       when(col("meanofflinedays") < 30, "Highly Active")
                                       .when((col("meanofflinedays") >= 30) & (col("meanofflinedays") < 60), "Moderately Active")
                                       .otherwise("Low Activity"))

### WRITE

In [0]:
countriesDF.show(2)
countriesDF.printSchema()

+---------+-------+----------+--------------+------------------+---------------------+-------------+-----------+----------------+--------------+----------------+-------------+--------------------+-----------------+----------------------+-------------------+-------------------+---------------------+----------------+------------------+---------------+------------------+-------------+-------------+----------------+----------------+------------------------+---------------------+----------------+--------------+
|  country|sellers|topsellers|topsellerratio|femalesellersratio|topfemalesellersratio|femalesellers|malesellers|topfemalesellers|topmalesellers|countrysoldratio|bestsoldratio|toptotalproductssold|totalproductssold|toptotalproductslisted|totalproductslisted|topmeanproductssold|topmeanproductslisted|meanproductssold|meanproductslisted|meanofflinedays|topmeanofflinedays|meanfollowers|meanfollowing|topmeanfollowers|topmeanfollowing|high_female_seller_ratio|performance_indicator|high_perf

In [0]:
countriesDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/countries")

## 3. Sellers

In [0]:
sellersDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/sellers")

In [0]:
sellersDF = sellersDF \
    .withColumn("nbsellers", col("nbsellers").cast(IntegerType())) \
    .withColumn("meanproductssold", col("meanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("meanproductslisted", col("meanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meansellerpassrate", col("meansellerpassrate").cast(DecimalType(10, 2))) \
    .withColumn("totalproductssold", col("totalproductssold").cast(IntegerType())) \
    .withColumn("totalproductslisted", col("totalproductslisted").cast(IntegerType())) \
    .withColumn("meanproductsbought", col("meanproductsbought").cast(DecimalType(10, 2))) \
    .withColumn("meanproductswished", col("meanproductswished").cast(DecimalType(10, 2))) \
    .withColumn("meanproductsliked", col("meanproductsliked").cast(DecimalType(10, 2))) \
    .withColumn("totalbought", col("totalbought").cast(IntegerType())) \
    .withColumn("totalwished", col("totalwished").cast(IntegerType())) \
    .withColumn("totalproductsliked", col("totalproductsliked").cast(IntegerType())) \
    .withColumn("meanfollowers", col("meanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("meanfollows", col("meanfollows").cast(DecimalType(10, 2))) \
    .withColumn("percentofappusers", col("percentofappusers").cast(DecimalType(10, 2))) \
    .withColumn("percentofiosusers", col("percentofiosusers").cast(DecimalType(10, 2))) \
    .withColumn("meanseniority", col("meanseniority").cast(DecimalType(10, 2)))

### Normalize country names and gender values

In [0]:
sellersDF = sellersDF.withColumn("country", initcap(col("country"))) \
                                                .withColumn("sex", upper(col("sex")))

### Add a column to categorize the number of sellers

In [0]:
sellersDF = sellersDF.withColumn("seller_size_category", 
                               when(col("nbsellers") < 500, "Small") \
                               .when((col("nbsellers") >= 500) & (col("nbsellers") < 2000), "Medium") \
                               .otherwise("Large"))

### Calculate the mean products listed per seller as an indicator of seller activity

In [0]:
sellersDF = sellersDF.withColumn("mean_products_listed_per_seller", 
                               round(col("totalproductslisted") / col("nbsellers"), 2))

### Identify markets with high seller pass rate

In [0]:
sellersDF = sellersDF.withColumn("high_seller_pass_rate", 
                               when(col("meansellerpassrate") > 0.75, "High") \
                               .otherwise("Normal"))

mean_pass_rate = sellersDF.select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate")).collect()[0]["avg_pass_rate"]

sellersDF = sellersDF.withColumn("meansellerpassrate",
                                 when(col("meansellerpassrate").isNull(), mean_pass_rate)
                                 .otherwise(col("meansellerpassrate")))

### WRITE

In [0]:
sellersDF.show(2)
sellersDF.printSchema()

+---------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+-------------+--------------------+-------------------------------+---------------------+
|  country|   sex|nbsellers|meanproductssold|meanproductslisted|meansellerpassrate|totalproductssold|totalproductslisted|meanproductsbought|meanproductswished|meanproductsliked|totalbought|totalwished|totalproductsliked|meanfollowers|meanfollows|percentofappusers|percentofiosusers|meanseniority|seller_size_category|mean_products_listed_per_seller|high_seller_pass_rate|
+---------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+----------

In [0]:
sellersDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/sellers")

## Buyers

In [0]:
buyersDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/buyers")

### Format

In [0]:
integer_columns = [
    'buyers', 'topbuyers', 'femalebuyers', 'malebuyers',
    'topfemalebuyers', 'topmalebuyers', 'totalproductsbought',
    'totalproductswished', 'totalproductsliked', 'toptotalproductsbought',
    'toptotalproductswished', 'toptotalproductsliked'
]

for column_name in integer_columns:
    buyersDF = buyersDF.fillna({column_name: 0})
    buyersDF = buyersDF.withColumn(column_name, col(column_name).cast(IntegerType()))

In [0]:
decimal_columns = [
    'topbuyerratio', 'femalebuyersratio', 'topfemalebuyersratio',
    'boughtperwishlistratio', 'boughtperlikeratio', 'topboughtperwishlistratio',
    'topboughtperlikeratio', 'meanproductsbought', 'meanproductswished',
    'meanproductsliked', 'topmeanproductsbought', 'topmeanproductswished',
    'topmeanproductsliked', 'meanofflinedays', 'topmeanofflinedays',
    'meanfollowers', 'meanfollowing', 'topmeanfollowers', 'topmeanfollowing'
]

for column_name in decimal_columns:
    buyersDF = buyersDF.withColumn(column_name, col(column_name).cast(DecimalType(10, 2)))

### Normalize country names

In [0]:
buyersDF = buyersDF.withColumn("country", initcap(col("country")))

### Calculate the ratio of female to male buyers

In [0]:
buyersDF = buyersDF.withColumn("female_to_male_ratio", 
                               round(col("femalebuyers") / (col("malebuyers") + 1), 2))

### Determine the market potential by comparing wishlist and purchases

In [0]:
buyersDF = buyersDF.withColumn("wishlist_to_purchase_ratio", 
                               round(col("totalproductswished") / (col("totalproductsbought") + 1), 2))

### Tag countries with a high engagement ratio

In [0]:
high_engagement_threshold = 0.5
buyersDF = buyersDF.withColumn("high_engagement",
                               when(col("boughtperwishlistratio") > high_engagement_threshold, True)
                               .otherwise(False))

### Flag markets with increasing female buyer participation

In [0]:
buyersDF = buyersDF.withColumn("growing_female_market",
                               when(col("femalebuyersratio") > col("topfemalebuyersratio"), True)
                               .otherwise(False))

### WRITE

In [0]:
buyersDF.show(1)
buyersDF.printSchema()

+-------+------+---------+-------------+------------+----------+---------------+-------------+-----------------+--------------------+----------------------+------------------+-------------------------+---------------------+-------------------+-------------------+------------------+----------------------+----------------------+---------------------+------------------+------------------+-----------------+---------------------+---------------------+--------------------+---------------+------------------+-------------+-------------+----------------+----------------+--------------------+--------------------------+---------------+---------------------+
|country|buyers|topbuyers|topbuyerratio|femalebuyers|malebuyers|topfemalebuyers|topmalebuyers|femalebuyersratio|topfemalebuyersratio|boughtperwishlistratio|boughtperlikeratio|topboughtperwishlistratio|topboughtperlikeratio|totalproductsbought|totalproductswished|totalproductsliked|toptotalproductsbought|toptotalproductswished|toptotalproductsl

In [0]:
buyersDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/buyers")