In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.types import IntegerType, DecimalType
from pyspark.sql.functions import col, round, when, initcap

In [0]:
# Reuse the active Spark session if one exists, otherwise create one
# named "ECommerceDataPipeline".
spark = (
    SparkSession.builder
    .appName("ECommerceDataPipeline")
    .getOrCreate()
)

In [0]:
# Client secret of the service principal, read from the secret scope so it
# never appears in the notebook.
service_credential = dbutils.secrets.get(scope="ecomm-kv-scope", key="ecomm-secret")

# OAuth client-credentials settings for the ecommadls storage account.
# NOTE(review): the client id and the tenant id embedded in the token endpoint
# are hard-coded here - consider sourcing them from the same secret scope.
adls_host = "ecommadls.dfs.core.windows.net"
oauth_settings = {
    f"fs.azure.account.auth.type.{adls_host}": "OAuth",
    f"fs.azure.account.oauth.provider.type.{adls_host}": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    f"fs.azure.account.oauth2.client.id.{adls_host}": "3b059ba0-2363-42d4-345890-37UC3dd866ea7",
    f"fs.azure.account.oauth2.client.secret.{adls_host}": service_credential,
    f"fs.azure.account.oauth2.client.endpoint.{adls_host}": "https://login.microsoftonline.com/2535435251-7BG9-4c72-905f-39bf434340a8a84/oauth2/token",
}

# Apply the settings to the session in the order listed above.
for setting_name, setting_value in oauth_settings.items():
    spark.conf.set(setting_name, setting_value)

In [0]:
# Container root shared by every Bronze/Silver path in this notebook.
root = "abfss://zone-2@ecommadls.dfs.core.windows.net"
bronze_users = "/".join([root, "bronze", "users"])   # source (Delta)
silver_users = "/".join([root, "silver", "users"])   # target (Delta)

In [0]:
# Load the Bronze users Delta table.
usersDF = spark.read.load(bronze_users, format="delta")

In [0]:
# Silver-layer feature engineering for the users frame.
# Columns are referenced in lower case although the displayed frame carries
# camelCase names (e.g. civilityTitle), so this cell relies on
# case-insensitive column resolution.

# 1) Normalize country codes to uppercase
usersDF = usersDF.withColumn("countrycode", upper(col("countrycode")))

# 2) Language bucket using CASE WHEN.
# The code is upper-cased before comparing so 'en'/'fr' land in the same
# bucket as 'EN'/'FR' instead of silently falling through to 'Other'.
usersDF = usersDF.withColumn(
    "language_full",
    expr(
        "CASE WHEN upper(language) = 'EN' THEN 'English' "
        "WHEN upper(language) = 'FR' THEN 'French' "
        "ELSE 'Other' END"
    )
)

# 3) Fix gender values: M* -> Male, F* -> Female, everything else
#    (including null, which matches neither branch) -> Other
usersDF = usersDF.withColumn(
    "gender",
    when(col("gender").startswith("M"), "Male")
    .when(col("gender").startswith("F"), "Female")
    .otherwise("Other")
)

# 4) Clean civilitytitle -> civilitytitle_clean
#    (case-sensitive, unanchored replacement of Mme/Ms/Mrs by Ms)
usersDF = usersDF.withColumn(
    "civilitytitle_clean",
    regexp_replace(col("civilitytitle"), "(Mme|Ms|Mrs)", "Ms")
)

# 5) years_since_last_login from dayssincelastlogin (not rounded)
usersDF = usersDF.withColumn("years_since_last_login", col("dayssincelastlogin") / 365)

# 6) Account age in years + group
usersDF = usersDF.withColumn("account_age_years", round(col("seniority") / 365, 2))

# A missing or non-numeric seniority yields a null account age. Without the
# explicit null branch every comparison below is null and the row would be
# labelled "Experienced" by the otherwise() fallback.
account_age = col("account_age_years")
usersDF = usersDF.withColumn(
    "account_age_group",
    when(account_age.isNull(), lit(None).cast("string"))
    .when(account_age < 1, "New")
    .when(account_age < 3, "Intermediate")   # >= 1 is implied by the branch above
    .otherwise("Experienced")
)

# 7) Current year
usersDF = usersDF.withColumn("current_year", year(current_date()))

# 8) User descriptor: gender_countrycode_civ_language
#    (concat returns null as soon as one of its parts is null)
usersDF = usersDF.withColumn(
    "user_descriptor",
    concat(
        col("gender"), lit("_"),
        col("countrycode"), lit("_"),
        expr("substring(civilitytitle_clean, 1, 3)"), lit("_"),
        col("language_full")
    )
)

# 9) Flag very long civility titles (more than 10 characters)
usersDF = usersDF.withColumn("flag_long_title", length(col("civilitytitle")) > 10)


In [0]:
# Preview the first five transformed user rows.
usersDF.show(n=5)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+-------------+-------------------+----------------------+-----------------+-----------------+------------+-------------------+---------------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countrycode|language_full|civilitytitle_clean|years_since_last_login|account_age_years|account_age_group|current_year|    user_descriptor|flag_long_title|
+--------------------+----+---

In [0]:
from pyspark.sql.functions import col

# Lower-case (and strip) every column name. When two names collide after
# normalization only the first occurrence is kept; dict insertion order
# preserves the original column order.
columns_by_lower_name = {}
for original_name in usersDF.columns:
    normalized_name = original_name.strip().lower()
    if normalized_name not in columns_by_lower_name:
        columns_by_lower_name[normalized_name] = col(original_name).alias(normalized_name)

usersDF = usersDF.select(*columns_by_lower_name.values())

# Overwrite the Silver users table, replacing its existing schema as well.
(
    usersDF.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(silver_users)
)


In [0]:
# Read the Silver users table back to verify what was written.
df_check = spark.read.load(silver_users, format="delta")
df_check.printSchema()

# After de-duplication exactly one socialnbfollowers column should resolve.
df_check.select(col("socialnbfollowers")).show(n=5)


root
 |-- identifierhash: string (nullable = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = true)
 |-- socialnbfollowers: string (nullable = true)
 |-- socialnbfollows: string (nullable = true)
 |-- socialproductsliked: string (nullable = true)
 |-- productslisted: string (nullable = true)
 |-- productssold: string (nullable = true)
 |-- productspassrate: string (nullable = true)
 |-- productswished: string (nullable = true)
 |-- productsbought: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- civilitygenderid: string (nullable = true)
 |-- civilitytitle: string (nullable = true)
 |-- hasanyapp: string (nullable = true)
 |-- hasandroidapp: string (nullable = true)
 |-- hasiosapp: string (nullable = true)
 |-- hasprofilepicture: string (nullable = true)
 |-- dayssincelastlogin: string (nullable = true)
 |-- seniority: string (nullable = true)
 |-- seniorityasmonths: string (nullable = true)
 |-- senio

In [0]:
# Overwrite the Silver users table with the current usersDF.
# NOTE(review): an earlier cell already writes usersDF to silver_users (with
# overwriteSchema); this looks like a repeat of that write - confirm it is needed.
(
    usersDF.write
    .format("delta")
    .mode("overwrite")
    .save(silver_users)
)

In [0]:
# Absolute path of the Bronze buyers Delta table.
bronze_buyers = "/".join([root, "bronze", "buyers"])

# Load it and preview the first five rows.
buyersDF = spark.read.load(bronze_buyers, format="delta")
buyersDF.show(n=5)

+-----------+------+---------+-------------+------------+----------+---------------+-------------+-----------------+--------------------+----------------------+------------------+-------------------------+---------------------+-------------------+-------------------+------------------+----------------------+----------------------+---------------------+------------------+------------------+-----------------+---------------------+---------------------+--------------------+---------------+------------------+-------------+-------------+----------------+----------------+
|    country|buyers|topbuyers|topbuyerratio|femalebuyers|malebuyers|topfemalebuyers|topmalebuyers|femalebuyersratio|topfemalebuyersratio|boughtperwishlistratio|boughtperlikeratio|topboughtperwishlistratio|topboughtperlikeratio|totalproductsbought|totalproductswished|totalproductsliked|toptotalproductsbought|toptotalproductswished|toptotalproductsliked|meanproductsbought|meanproductswished|meanproductsliked|topmeanproductsbo

In [0]:
# List the files stored under the Bronze buyers path.
bronze_buyers_listing = dbutils.fs.ls(bronze_buyers)
display(bronze_buyers_listing)

path,name,size,modificationTime
abfss://zone-2@ecommadls.dfs.core.windows.net/bronze/buyers/_delta_log/,_delta_log/,0,1756261113000
abfss://zone-2@ecommadls.dfs.core.windows.net/bronze/buyers/part-00000-13677a7e-d71d-444e-b015-b60083afb741.c000.snappy.parquet,part-00000-13677a7e-d71d-444e-b015-b60083afb741.c000.snappy.parquet,15182,1756261114000


In [0]:
# Display the column names of the Bronze buyers frame.
buyersDF.columns

['country',
 'buyers',
 'topbuyers',
 'topbuyerratio',
 'femalebuyers',
 'malebuyers',
 'topfemalebuyers',
 'topmalebuyers',
 'femalebuyersratio',
 'topfemalebuyersratio',
 'boughtperwishlistratio',
 'boughtperlikeratio',
 'topboughtperwishlistratio',
 'topboughtperlikeratio',
 'totalproductsbought',
 'totalproductswished',
 'totalproductsliked',
 'toptotalproductsbought',
 'toptotalproductswished',
 'toptotalproductsliked',
 'meanproductsbought',
 'meanproductswished',
 'meanproductsliked',
 'topmeanproductsbought',
 'topmeanproductswished',
 'topmeanproductsliked',
 'meanofflinedays',
 'topmeanofflinedays',
 'meanfollowers',
 'meanfollowing',
 'topmeanfollowers',
 'topmeanfollowing']

In [0]:
# Ratio and mean columns of the buyers frame that are cast to DECIMAL(10, 2).
decimal_columns = [
    'topbuyerratio', 'femalebuyersratio', 'topfemalebuyersratio',
    'boughtperwishlistratio', 'boughtperlikeratio', 'topboughtperwishlistratio',
    'topboughtperlikeratio', 'meanproductsbought', 'meanproductswished',
    'meanproductsliked', 'topmeanproductsbought', 'topmeanproductswished',
    'topmeanproductsliked', 'meanofflinedays', 'topmeanofflinedays',
    'meanfollowers', 'meanfollowing', 'topmeanfollowers', 'topmeanfollowing'
]

# Fail fast when an expected column is absent, as the per-column withColumn
# calls did, instead of silently skipping it in the projection below.
missing_decimal_columns = [name for name in decimal_columns if name not in buyersDF.columns]
if missing_decimal_columns:
    raise ValueError(f"buyersDF is missing expected columns: {missing_decimal_columns}")

# Cast every listed column in ONE projection. Calling withColumn in a loop
# stacks one projection per column and bloats the query plan; a single select
# keeps column order and names unchanged.
buyersDF = buyersDF.select(*[
    col(name).cast(DecimalType(10, 2)).alias(name) if name in decimal_columns else col(name)
    for name in buyersDF.columns
])

In [0]:
# Count columns of the buyers frame that are cast to INT.
# (This list is reused by the next cell to fill nulls with 0.)
integer_columns = [
    'buyers', 'topbuyers', 'femalebuyers', 'malebuyers',
    'topfemalebuyers', 'topmalebuyers', 'totalproductsbought',
    'totalproductswished', 'totalproductsliked', 'toptotalproductsbought',
    'toptotalproductswished', 'toptotalproductsliked'
]

# Fail fast when an expected column is absent, as the per-column withColumn
# calls did, instead of silently skipping it in the projection below.
missing_integer_columns = [name for name in integer_columns if name not in buyersDF.columns]
if missing_integer_columns:
    raise ValueError(f"buyersDF is missing expected columns: {missing_integer_columns}")

# Cast every listed column in ONE projection rather than one withColumn
# (one extra projection) per column; column order and names are unchanged.
buyersDF = buyersDF.select(*[
    col(name).cast(IntegerType()).alias(name) if name in integer_columns else col(name)
    for name in buyersDF.columns
])

In [0]:
# Normalize country names (capitalize the first letter of each word)
buyersDF = buyersDF.withColumn("country", initcap(col("country")))

# Fill NULL integers with 0 - one fillna call for all columns instead of one
# call (and one projection) per column; the per-column dict form is kept so
# the fill semantics are exactly those of the original loop.
buyersDF = buyersDF.fillna({column_name: 0 for column_name in integer_columns})

# Female-to-male ratio
# NOTE(review): the +1 in the denominator presumably guards against division
# by zero, but it also lowers the ratio for every row - confirm this is intended.
buyersDF = buyersDF.withColumn(
    "female_to_male_ratio",
    round(col("femalebuyers") / (col("malebuyers") + 1), 2)
)

# Wishlist-to-purchase ratio (same +1 denominator as above)
buyersDF = buyersDF.withColumn(
    "wishlist_to_purchase_ratio",
    round(col("totalproductswished") / (col("totalproductsbought") + 1), 2)
)

# High engagement flag: True when bought-per-wishlist ratio exceeds the
# threshold; a null ratio ends up False through otherwise().
high_engagement_threshold = 0.5
buyersDF = buyersDF.withColumn(
    "high_engagement",
    when(col("boughtperwishlistratio") > high_engagement_threshold, True).otherwise(False)
)

# Growing female market flag: overall female buyer ratio is higher than the
# female ratio among top buyers (null on either side -> False).
buyersDF = buyersDF.withColumn(
    "growing_female_market",
    when(col("femalebuyersratio") > col("topfemalebuyersratio"), True).otherwise(False)
)

In [0]:
# Target path of the Silver buyers Delta table.
silver_buyers = "/".join([root, "silver", "buyers"])

# Replace the table contents with the transformed buyers frame.
buyersDF.write.save(silver_buyers, format="delta", mode="overwrite")

In [0]:
# Sellers paths: Bronze is the source, Silver the target.
bronze_sellers = "/".join([root, "bronze", "sellers"])     # source
silver_sellers = "/".join([root, "silver", "sellers"])     # target

# Load the Bronze sellers Delta table.
sellersDF = spark.read.load(bronze_sellers, format="delta")

In [0]:
# Display the column names of the Bronze sellers frame.
sellersDF.columns

['country',
 'sex',
 'nbsellers',
 'meanproductssold',
 'meanproductslisted',
 'meansellerpassrate',
 'totalproductssold',
 'totalproductslisted',
 'meanproductsbought',
 'meanproductswished',
 'meanproductsliked',
 'totalbought',
 'totalwished',
 'totalproductsliked',
 'meanfollowers',
 'meanfollows',
 'percentofappusers',
 'percentofiosusers',
 'meanseniority']

In [0]:
# Target types for the sellers frame: counts/totals -> INT, means and
# percentages -> DECIMAL(10, 2). Columns not listed pass through unchanged.
# NOTE(review): meanfollows is the only mean* column that is not cast here -
# confirm whether that is intentional. Adding it changes the column type and
# therefore the schema of the Silver table written later.
sellers_integer_columns = [
    "nbsellers", "totalproductssold", "totalproductslisted",
    "totalbought", "totalwished", "totalproductsliked",
]
sellers_decimal_columns = [
    "meanproductssold", "meanproductslisted", "meansellerpassrate",
    "meanproductsbought", "meanproductswished", "meanproductsliked",
    "meanfollowers", "percentofappusers", "percentofiosusers", "meanseniority",
]

sellers_cast_types = {name: IntegerType() for name in sellers_integer_columns}
sellers_cast_types.update({name: DecimalType(10, 2) for name in sellers_decimal_columns})

# Fail fast when an expected column is absent, as the chained withColumn
# calls did, instead of silently skipping it in the projection below.
missing_sellers_columns = [name for name in sellers_cast_types if name not in sellersDF.columns]
if missing_sellers_columns:
    raise ValueError(f"sellersDF is missing expected columns: {missing_sellers_columns}")

# One projection for all casts instead of sixteen chained withColumn
# projections; column order and names are unchanged.
sellersDF = sellersDF.select(*[
    col(name).cast(sellers_cast_types[name]).alias(name) if name in sellers_cast_types else col(name)
    for name in sellersDF.columns
])

In [0]:
# ---------- normalization ----------
sellersDF = sellersDF.withColumn("country", initcap(col("country"))) \
                     .withColumn("sex", upper(col("sex")))

# ---------- add seller size category ----------
# A null seller count stays null. Without the explicit null branch both
# comparisons evaluate to null and the row would be labelled "Large".
nb_sellers = col("nbsellers")
sellersDF = sellersDF.withColumn(
    "seller_size_category",
    when(nb_sellers.isNull(), lit(None).cast("string"))
    .when(nb_sellers < 500, "Small")
    .when(nb_sellers < 2000, "Medium")   # >= 500 is implied by the branch above
    .otherwise("Large")
)

# ---------- mean products listed per seller ----------
sellersDF = sellersDF.withColumn(
    "mean_products_listed_per_seller",
    round(col("totalproductslisted") / col("nbsellers"), 2)
)

# ---------- handle null pass rate with avg ----------
# Done BEFORE deriving the flag below, so the flag always agrees with the
# pass rate that is actually stored in the row.
mean_pass_rate = sellersDF.select(
    round(avg("meansellerpassrate"), 2).alias("avg_pass_rate")
).first()["avg_pass_rate"]   # aggregate without grouping -> exactly one row

sellersDF = sellersDF.withColumn(
    "meansellerpassrate",
    when(col("meansellerpassrate").isNull(), mean_pass_rate)
    .otherwise(col("meansellerpassrate"))
)

# ---------- high seller pass rate ----------
# Appended after mean_products_listed_per_seller, so the output column order
# is the same as before the reordering.
sellersDF = sellersDF.withColumn(
    "high_seller_pass_rate",
    when(col("meansellerpassrate") > 0.75, "High").otherwise("Normal")
)


In [0]:
# Replace the contents of the Silver sellers Delta table.
sellersDF.write.save(silver_sellers, format="delta", mode="overwrite")

In [0]:
# Countries paths: Bronze is read from, Silver is written to.
bronze_countries = "/".join([root, "bronze", "countries"])
silver_countries = "/".join([root, "silver", "countries"])

In [0]:
# ---------- load the Bronze COUNTRIES Delta table ----------
countriesDF = spark.read.load(bronze_countries, format="delta")

In [0]:
# Print the number of rows in the Bronze countries frame.
countries_row_count = countriesDF.count()
print(countries_row_count)

19


In [0]:
# Target types for the countries frame: counts/totals -> INT, ratios and
# means -> DECIMAL(10, 2). Columns not listed (e.g. country) pass through.
countries_integer_columns = [
    "sellers", "topsellers", "femalesellers", "malesellers",
    "topfemalesellers", "topmalesellers",
    "toptotalproductssold", "toptotalproductslisted",
    "totalproductssold", "totalproductslisted",
]
countries_decimal_columns = [
    "topsellerratio", "femalesellersratio", "topfemalesellersratio",
    "countrysoldratio", "bestsoldratio",
    "topmeanproductssold", "topmeanproductslisted",
    "meanproductssold", "meanproductslisted",
    "meanofflinedays", "topmeanofflinedays",
    "meanfollowers", "meanfollowing", "topmeanfollowers", "topmeanfollowing",
]

countries_cast_types = {name: IntegerType() for name in countries_integer_columns}
countries_cast_types.update({name: DecimalType(10, 2) for name in countries_decimal_columns})

# Fail fast when an expected column is absent, as the chained withColumn
# calls did, instead of silently skipping it in the projection below.
missing_countries_columns = [name for name in countries_cast_types if name not in countriesDF.columns]
if missing_countries_columns:
    raise ValueError(f"countriesDF is missing expected columns: {missing_countries_columns}")

# One projection for all casts instead of twenty-five chained withColumn
# projections; column order and names are unchanged.
countriesDF = countriesDF.select(*[
    col(name).cast(countries_cast_types[name]).alias(name) if name in countries_cast_types else col(name)
    for name in countriesDF.columns
])

In [0]:
# ---------- normalize country ----------
countriesDF = countriesDF.withColumn("country", initcap(col("country")))

# ---------- derived features ----------
# NOTE(review): top_seller_ratio sits next to the existing topsellerratio
# column and uses a +1 denominator (presumably a division-by-zero guard that
# also lowers every ratio) - confirm both are intended.
countriesDF = countriesDF.withColumn(
    "top_seller_ratio",
    round(col("topsellers") / (col("sellers") + 1), 2)
)

# True when more than half of the sellers are female (null ratio -> False).
countriesDF = countriesDF.withColumn(
    "high_female_seller_ratio",
    when(col("femalesellersratio") > 0.5, True).otherwise(False)
)

# Products sold per product listed among top sellers (same +1 denominator).
countriesDF = countriesDF.withColumn(
    "performance_indicator",
    round(col("toptotalproductssold") / (col("toptotalproductslisted") + 1), 2)
)

countriesDF = countriesDF.withColumn(
    "high_performance",
    when(col("performance_indicator") > 0.8, True).otherwise(False)
)

# Activity bucket from mean offline days. A null value stays null: without
# the explicit null branch both comparisons evaluate to null and the country
# would be labelled "Low Activity".
mean_offline_days = col("meanofflinedays")
countriesDF = countriesDF.withColumn(
    "activity_level",
    when(mean_offline_days.isNull(), lit(None).cast("string"))
    .when(mean_offline_days < 30, "Highly Active")
    .when(mean_offline_days < 60, "Moderately Active")   # >= 30 is implied
    .otherwise("Low Activity")
)

In [0]:
# Preview the first five transformed country rows.
countriesDF.show(n=5)

+---------+-------+----------+--------------+------------------+---------------------+-------------+-----------+----------------+--------------+----------------+-------------+--------------------+-----------------+----------------------+-------------------+-------------------+---------------------+----------------+------------------+---------------+------------------+-------------+-------------+----------------+----------------+----------------+------------------------+---------------------+----------------+--------------+
|  country|sellers|topsellers|topsellerratio|femalesellersratio|topfemalesellersratio|femalesellers|malesellers|topfemalesellers|topmalesellers|countrysoldratio|bestsoldratio|toptotalproductssold|totalproductssold|toptotalproductslisted|totalproductslisted|topmeanproductssold|topmeanproductslisted|meanproductssold|meanproductslisted|meanofflinedays|topmeanofflinedays|meanfollowers|meanfollowing|topmeanfollowers|topmeanfollowing|top_seller_ratio|high_female_seller_rat

In [0]:
# Replace the contents of the Silver countries Delta table.
countriesDF.write.save(silver_countries, format="delta", mode="overwrite")