In [0]:
# Configure ABFS (ADLS Gen2) access for `storage_account` using an Azure AD service
# principal (OAuth client-credentials flow).
# Relies on the Databricks-provided `spark` and `dbutils` globals: the explicit
# SparkSession.builder cell comes later in the notebook.
# Credentials are read from a Databricks secret scope rather than hard-coded here,
# so they never end up in the notebook source or its revision history.
storage_account = "ecomadlsrakesh"

# NOTE(review): placeholder secret scope / key names — create them or point these at the
# scope that already holds the service-principal client id, client secret and tenant id.
SECRET_SCOPE = "ecom-adls"

# Every ABFS setting below is scoped to this storage account by appending this suffix.
adls_account_suffix = f"{storage_account}.dfs.core.windows.net"

adls_oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope=SECRET_SCOPE, key="sp-client-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope=SECRET_SCOPE, key="sp-client-secret"),
    "fs.azure.account.oauth2.client.endpoint": (
        "https://login.microsoftonline.com/"
        + dbutils.secrets.get(scope=SECRET_SCOPE, key="sp-tenant-id")
        + "/oauth2/token"
    ),
}

for setting_key, setting_value in adls_oauth_settings.items():
    spark.conf.set(f"{setting_key}.{adls_account_suffix}", setting_value)



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Obtain the Spark session: getOrCreate() returns the active session when one already
# exists, otherwise builds a new one named "EcomDataPipeline".
spark = (
    SparkSession.builder
    .appName("EcomDataPipeline")
    .getOrCreate()
)

In [0]:
# Load the raw users extract from the landing zone.
# Parquet stores its own schema and column names, so CSV-only reader options such as
# "header"/"inferSchema" do not apply and are not passed.
# The path reuses `storage_account` so it stays in sync with the ADLS OAuth configuration.
user_df_silver = (
    spark.read.format("parquet")
    .load(f"abfss://landing-zone-2@{storage_account}.dfs.core.windows.net/users-raw-2")
)

In [0]:
# Preview the first 5 raw user rows.
user_df_silver.show(n=5)

+--------------------+----+---------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+
|      identifierHash|type|  country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|
+--------------------+----+---------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----

In [0]:
# Inspect the raw country codes before normalisation.
user_df_silver.select("countrycode").show(n=5)

+-----------+
|countrycode|
+-----------+
|         au|
|         it|
|         es|
|         ie|
|         ca|
+-----------+
only showing top 5 rows


In [0]:
# Normalise country codes to upper case (e.g. "au" -> "AU").
user_df_silver = user_df_silver.withColumn(
    "countrycode",
    upper("countrycode"),
)

In [0]:
# Confirm the upper-casing (default preview of 20 rows).
user_df_silver.select("countrycode").show(n=20)

+-----------+
|countrycode|
+-----------+
|         AU|
|         IT|
|         ES|
|         IE|
|         CA|
|         UA|
|         GB|
|         HR|
|         US|
|         FR|
|         GE|
|         AU|
|         ES|
|         GB|
|         ES|
|         FR|
|         US|
|         CA|
|         EG|
|         GB|
+-----------+
only showing top 20 rows


In [0]:
# Expand the language code into a readable name: EN -> English, FR -> French,
# anything else (including NULL) -> Other.
# The code is upper-cased before matching so lowercase raw values still match.
# NOTE(review): the raw countrycode sample above is lowercase, so language codes likely
# are too; a case-sensitive 'EN' comparison would then map every row to 'Other'.
user_df_silver = user_df_silver.withColumn(
    "language_full",
    expr(
        "CASE upper(language) "
        "WHEN 'EN' THEN 'English' "
        "WHEN 'FR' THEN 'French' "
        "ELSE 'Other' END"
    ),
)

In [0]:
# Map gender values by leading letter: "M..." -> Male, "F..." -> Female.
# Everything else, including NULL, becomes "Other".
starts_with_m = col("gender").startswith("M")
starts_with_f = col("gender").startswith("F")

user_df_silver = user_df_silver.withColumn(
    "gender",
    when(starts_with_m, "Male").when(starts_with_f, "Female").otherwise("Other"),
)

In [0]:
# Harmonise the titles "Mme", "Ms" and "Mrs" to "Ms".
# The regex is unanchored and case-sensitive, so matching substrings inside longer titles
# are rewritten too, while e.g. lowercase variants are left untouched.
user_df_silver = user_df_silver.withColumn(
    "civilitytitle_clean",
    regexp_replace(col("civilitytitle"), "(Mme|Ms|Mrs)", "Ms"),
)

In [0]:
# Convert the login gap from days to (365-day) years; the result is unrounded.
# NOTE(review): this runs before the later cell that casts dayssincelastlogin to int and
# fills NULLs with 0, so rows with a NULL gap get NULL here rather than 0 — confirm intended.
user_df_silver = user_df_silver.withColumn(
    "years_since_last_login",
    col("dayssincelastlogin") / lit(365),
)

In [0]:
# Account age in years, rounded to 2 decimals.
# `round` is pyspark.sql.functions.round here: the wildcard import shadows the builtin.
user_df_silver = user_df_silver.withColumn(
    "account_age_years",
    round(col("seniority") / 365, 2),
)

# Bucket account age: < 1 year New, 1–3 years Intermediate, >= 3 years Experienced.
# The chain has no .otherwise(), so an unknown (NULL) age stays NULL instead of
# silently falling into "Experienced".
account_age = col("account_age_years")
user_df_silver = user_df_silver.withColumn(
    "account_age_group",
    when(account_age < 1, "New")
    .when(account_age < 3, "Intermediate")
    .when(account_age >= 3, "Experienced"),
)

In [0]:
# Build "<gender>_<countrycode>_<first 3 chars of clean title>_<language_full>".
# Plain concat() returns NULL as soon as any part is NULL (e.g. a missing country code or
# title), which would lose the whole descriptor. Each part is therefore coalesced to ""
# before joining, which keeps the positional "_"-separated layout
# (e.g. "Male_AU__English" when the title is missing).
descriptor_parts = [
    col("gender"),
    col("countrycode"),
    substring(col("civilitytitle_clean"), 1, 3),
    col("language_full"),
]
user_df_silver = user_df_silver.withColumn(
    "user_descriptor",
    concat_ws("_", *[coalesce(part, lit("")) for part in descriptor_parts]),
)

In [0]:
# Flag raw civility titles longer than 10 characters (a NULL title yields NULL).
title_length = length(col("civilitytitle"))
user_df_silver = user_df_silver.withColumn("flag_long_title", title_length > 10)

In [0]:
# Normalise user column types, applied in this order:
#   app / profile-picture flags -> boolean
#   social follower counts      -> int
#   pass rate and seniority     -> DECIMAL(10,2)
user_type_casts = {
    "hasanyapp": "boolean",
    "hasandroidapp": "boolean",
    "hasiosapp": "boolean",
    "hasprofilepicture": "boolean",
    "socialnbfollowers": IntegerType(),
    "socialnbfollows": IntegerType(),
    "productspassrate": DecimalType(10, 2),
    "seniorityasmonths": DecimalType(10, 2),
    "seniorityasyears": DecimalType(10, 2),
}

for cast_col, target_type in user_type_casts.items():
    user_df_silver = user_df_silver.withColumn(cast_col, col(cast_col).cast(target_type))


In [0]:
# Cast the login gap to int, treating a missing (NULL) value as 0 days.
# Non-NULL values that fail the int cast still end up NULL.
user_df_silver = user_df_silver.withColumn(
    "dayssincelastlogin",
    when(col("dayssincelastlogin").isNull(), 0)
    .otherwise(col("dayssincelastlogin").cast(IntegerType())),
)

In [0]:
# Preview the first 5 transformed user rows.
user_df_silver.show(n=5)

+--------------------+----+---------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+-------------+-------------------+----------------------+-----------------+-----------------+-------------------+---------------+
|      identifierHash|type|  country|language|socialnbfollowers|socialnbfollows|socialProductsLiked|productsListed|productsSold|productspassrate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasanyapp|hasandroidapp|hasiosapp|hasprofilepicture|dayssincelastlogin|seniority|seniorityasmonths|seniorityasyears|countrycode|language_full|civilitytitle_clean|years_since_last_login|account_age_years|account_age_group|    user_descriptor|flag_long_title|
+--------------------+----+---------+--------+------------

In [0]:
# Spot-check the account-age buckets.
user_df_silver.select(col("account_age_group")).show(n=5)

+-----------------+
|account_age_group|
+-----------------+
|      Experienced|
|      Experienced|
|      Experienced|
|      Experienced|
|      Experienced|
+-----------------+
only showing top 5 rows


In [0]:
# Load the raw buyers extract from the landing zone.
# Parquet stores its own schema, so CSV-only options ("header"/"inferSchema") are not passed.
# The path reuses `storage_account` so it stays in sync with the ADLS OAuth configuration.
buyer_df_silver = (
    spark.read.format("parquet")
    .load(f"abfss://landing-zone-2@{storage_account}.dfs.core.windows.net/buyers-raw-2")
)

In [0]:
# Preview the first 5 raw buyer rows.
buyer_df_silver.show(n=5)


+-----------+------+---------+-------------+------------+----------+---------------+-------------+-----------------+--------------------+----------------------+------------------+-------------------------+---------------------+-------------------+-------------------+------------------+----------------------+----------------------+---------------------+------------------+------------------+-----------------+---------------------+---------------------+--------------------+---------------+------------------+-------------+-------------+----------------+----------------+
|    country|buyers|topbuyers|topbuyerratio|femalebuyers|malebuyers|topfemalebuyers|topmalebuyers|femalebuyersratio|topfemalebuyersratio|boughtperwishlistratio|boughtperlikeratio|topboughtperwishlistratio|topboughtperlikeratio|totalproductsbought|totalproductswished|totalproductsliked|toptotalproductsbought|toptotalproductswished|toptotalproductsliked|meanproductsbought|meanproductswished|meanproductsliked|topmeanproductsbo

In [0]:

# Count-like buyer metrics stored as 32-bit integers.
# `integer_columns` is reused later to fill NULL counts with 0.
integer_columns = [
    "buyers",
    "topbuyers",
    "femalebuyers",
    "malebuyers",
    "topfemalebuyers",
    "topmalebuyers",
    "totalproductsbought",
    "totalproductswished",
    "totalproductsliked",
    "toptotalproductsbought",
    "toptotalproductswished",
    "toptotalproductsliked",
]

for int_col in integer_columns:
    buyer_df_silver = buyer_df_silver.withColumn(int_col, col(int_col).cast(IntegerType()))



In [0]:
# Preview buyers after the integer casts.
buyer_df_silver.show(n=5)


+-----------+------+---------+-------------+------------+----------+---------------+-------------+-----------------+--------------------+----------------------+------------------+-------------------------+---------------------+-------------------+-------------------+------------------+----------------------+----------------------+---------------------+------------------+------------------+-----------------+---------------------+---------------------+--------------------+---------------+------------------+-------------+-------------+----------------+----------------+
|    country|buyers|topbuyers|topbuyerratio|femalebuyers|malebuyers|topfemalebuyers|topmalebuyers|femalebuyersratio|topfemalebuyersratio|boughtperwishlistratio|boughtperlikeratio|topboughtperwishlistratio|topboughtperlikeratio|totalproductsbought|totalproductswished|totalproductsliked|toptotalproductsbought|toptotalproductswished|toptotalproductsliked|meanproductsbought|meanproductswished|meanproductsliked|topmeanproductsbo

In [0]:

# Ratio / mean buyer metrics stored as DECIMAL(10,2) (values are rounded to 2 decimals by the cast).
decimal_columns = [
    # ratios
    "topbuyerratio",
    "femalebuyersratio",
    "topfemalebuyersratio",
    "boughtperwishlistratio",
    "boughtperlikeratio",
    "topboughtperwishlistratio",
    "topboughtperlikeratio",
    # per-buyer means
    "meanproductsbought",
    "meanproductswished",
    "meanproductsliked",
    "topmeanproductsbought",
    "topmeanproductswished",
    "topmeanproductsliked",
    "meanofflinedays",
    "topmeanofflinedays",
    "meanfollowers",
    "meanfollowing",
    "topmeanfollowers",
    "topmeanfollowing",
]

decimal_10_2 = DecimalType(10, 2)
for dec_col in decimal_columns:
    buyer_df_silver = buyer_df_silver.withColumn(dec_col, col(dec_col).cast(decimal_10_2))


In [0]:
# Preview buyers after the decimal casts.
buyer_df_silver.show(n=5)


+-----------+------+---------+-------------+------------+----------+---------------+-------------+-----------------+--------------------+----------------------+------------------+-------------------------+---------------------+-------------------+-------------------+------------------+----------------------+----------------------+---------------------+------------------+------------------+-----------------+---------------------+---------------------+--------------------+---------------+------------------+-------------+-------------+----------------+----------------+
|    country|buyers|topbuyers|topbuyerratio|femalebuyers|malebuyers|topfemalebuyers|topmalebuyers|femalebuyersratio|topfemalebuyersratio|boughtperwishlistratio|boughtperlikeratio|topboughtperwishlistratio|topboughtperlikeratio|totalproductsbought|totalproductswished|totalproductsliked|toptotalproductsbought|toptotalproductswished|toptotalproductsliked|meanproductsbought|meanproductswished|meanproductsliked|topmeanproductsbo

In [0]:

# Title-case country names.
buyer_df_silver = buyer_df_silver.withColumn("country", initcap(col("country")))

# Replace NULL counts with 0 across all integer columns in a single fillna call.
buyer_df_silver = buyer_df_silver.fillna({count_col: 0 for count_col in integer_columns})

# Female-to-male buyer ratio (2 decimals). The +1 in the denominator avoids division by
# zero but biases the ratio downward when male counts are small.
# `round` is pyspark.sql.functions.round here (wildcard import shadows the builtin).
buyer_df_silver = buyer_df_silver.withColumn(
    "female_to_male_ratio",
    round(col("femalebuyers") / (col("malebuyers") + 1), 2),
)

# Wishlisted vs. purchased products, with the same +1 smoothing on the denominator.
buyer_df_silver = buyer_df_silver.withColumn(
    "wishlist_to_purchase_ratio",
    round(col("totalproductswished") / (col("totalproductsbought") + 1), 2),
)

# High engagement: boughtperwishlistratio above the threshold; a NULL ratio counts as False.
high_engagement_threshold = 0.5
buyer_df_silver = buyer_df_silver.withColumn(
    "high_engagement",
    coalesce(col("boughtperwishlistratio") > high_engagement_threshold, lit(False)),
)

# True when the overall female buyer ratio exceeds the female ratio among top buyers;
# NULL comparisons count as False.
# NOTE(review): this compares two ratios at one point in time, not a trend over time.
buyer_df_silver = buyer_df_silver.withColumn(
    "growing_female_market",
    coalesce(col("femalebuyersratio") > col("topfemalebuyersratio"), lit(False)),
)


In [0]:
# Preview buyers with the derived metrics.
buyer_df_silver.show(n=5)

+-----------+------+---------+-------------+------------+----------+---------------+-------------+-----------------+--------------------+----------------------+------------------+-------------------------+---------------------+-------------------+-------------------+------------------+----------------------+----------------------+---------------------+------------------+------------------+-----------------+---------------------+---------------------+--------------------+---------------+------------------+-------------+-------------+----------------+----------------+--------------------+--------------------------+---------------+---------------------+
|    country|buyers|topbuyers|topbuyerratio|femalebuyers|malebuyers|topfemalebuyers|topmalebuyers|femalebuyersratio|topfemalebuyersratio|boughtperwishlistratio|boughtperlikeratio|topboughtperwishlistratio|topboughtperlikeratio|totalproductsbought|totalproductswished|totalproductsliked|toptotalproductsbought|toptotalproductswished|toptotalp

In [0]:
# Load the raw sellers extract from the landing zone.
# Parquet stores its own schema, so CSV-only options ("header"/"inferSchema") are not passed.
# The path reuses `storage_account` so it stays in sync with the ADLS OAuth configuration.
seller_df_silver = (
    spark.read.format("parquet")
    .load(f"abfss://landing-zone-2@{storage_account}.dfs.core.windows.net/sellers-raw-2")
)

In [0]:
# Type the seller metrics: counts/totals -> int, means and percentages -> DECIMAL(10,2).
# Casts are applied in this dict's insertion order.
seller_type_casts = {
    "nbsellers": IntegerType(),
    "meanproductssold": DecimalType(10, 2),
    "meanproductslisted": DecimalType(10, 2),
    "meansellerpassrate": DecimalType(10, 2),
    "totalproductssold": IntegerType(),
    "totalproductslisted": IntegerType(),
    "meanproductsbought": DecimalType(10, 2),
    "meanproductswished": DecimalType(10, 2),
    "meanproductsliked": DecimalType(10, 2),
    "totalbought": IntegerType(),
    "totalwished": IntegerType(),
    "totalproductsliked": IntegerType(),
    "meanfollowers": DecimalType(10, 2),
    "meanfollows": DecimalType(10, 2),
    "percentofappusers": DecimalType(10, 2),
    "percentofiosusers": DecimalType(10, 2),
    "meanseniority": DecimalType(10, 2),
}

for cast_col, target_type in seller_type_casts.items():
    seller_df_silver = seller_df_silver.withColumn(cast_col, col(cast_col).cast(target_type))


In [0]:

# Normalize country names (title case) and gender codes (upper case).
seller_df_silver = (
    seller_df_silver
    .withColumn("country", initcap(col("country")))
    .withColumn("sex", upper(col("sex")))
)

# Impute missing pass rates with the overall average (2 decimals) BEFORE any flag is
# derived from them, so the persisted value and high_seller_pass_rate always agree.
# avg() ignores NULLs; if every value is NULL the average is None and the column stays NULL.
# `round` is pyspark.sql.functions.round here (wildcard import shadows the builtin).
mean_pass_rate = (
    seller_df_silver
    .select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate"))
    .collect()[0]["avg_pass_rate"]
)
seller_df_silver = seller_df_silver.withColumn(
    "meansellerpassrate",
    coalesce(col("meansellerpassrate"), lit(mean_pass_rate)),
)

# Seller count buckets: < 500 Small, 500–1999 Medium, >= 2000 Large.
# No .otherwise(): a NULL seller count stays NULL instead of being labelled "Large".
seller_df_silver = seller_df_silver.withColumn(
    "seller_size_category",
    when(col("nbsellers") < 500, "Small")
    .when(col("nbsellers") < 2000, "Medium")
    .when(col("nbsellers") >= 2000, "Large"),
)

# Mean products listed per seller (2 decimals). A zero seller count yields NULL
# explicitly, rather than relying on division-by-zero semantics, which raise under ANSI mode.
seller_df_silver = seller_df_silver.withColumn(
    "mean_products_listed_per_seller",
    when(
        col("nbsellers") != 0,
        round(col("totalproductslisted") / col("nbsellers"), 2),
    ),
)

# Flag markets whose (imputed) mean seller pass rate exceeds 0.75.
# NOTE(review): 0.75 assumes the pass rate is a 0–1 fraction — confirm the source scale.
seller_df_silver = seller_df_silver.withColumn(
    "high_seller_pass_rate",
    when(col("meansellerpassrate") > 0.75, "High").otherwise("Normal"),
)


In [0]:
# Preview sellers with the derived metrics.
seller_df_silver.show(n=5)

+---------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+-------------+--------------------+-------------------------------+---------------------+
|  country|   sex|nbsellers|meanproductssold|meanproductslisted|meansellerpassrate|totalproductssold|totalproductslisted|meanproductsbought|meanproductswished|meanproductsliked|totalbought|totalwished|totalproductsliked|meanfollowers|meanfollows|percentofappusers|percentofiosusers|meanseniority|seller_size_category|mean_products_listed_per_seller|high_seller_pass_rate|
+---------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+----------

In [0]:
# Persist sellers as the Delta table "seller_df_silver", replacing previous contents.
(
    seller_df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("seller_df_silver")
)


In [0]:
# Persist buyers as the Delta table "buyer_df_silver", replacing previous contents.
(
    buyer_df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("buyer_df_silver")
)


In [0]:
# Persist users as the Delta table "user_df_silver", replacing previous contents.
(
    user_df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("user_df_silver")
)

In [0]:
countries_df_silver.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("countries_df_silver")

In [0]:
buyer_df_silver.show()

+-----------+------+---------+-------------+------------+----------+---------------+-------------+-----------------+--------------------+----------------------+------------------+-------------------------+---------------------+-------------------+-------------------+------------------+----------------------+----------------------+---------------------+------------------+------------------+-----------------+---------------------+---------------------+--------------------+---------------+------------------+-------------+-------------+----------------+----------------+--------------------+--------------------------+---------------+---------------------+
|    country|buyers|topbuyers|topbuyerratio|femalebuyers|malebuyers|topfemalebuyers|topmalebuyers|femalebuyersratio|topfemalebuyersratio|boughtperwishlistratio|boughtperlikeratio|topboughtperwishlistratio|topboughtperlikeratio|totalproductsbought|totalproductswished|totalproductsliked|toptotalproductsbought|toptotalproductswished|toptotalp

In [0]:
user_df_silver.show(5)

+--------------------+----+---------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+-------------+-------------------+----------------------+-----------------+-----------------+-------------------+---------------+
|      identifierHash|type|  country|language|socialnbfollowers|socialnbfollows|socialProductsLiked|productsListed|productsSold|productspassrate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasanyapp|hasandroidapp|hasiosapp|hasprofilepicture|dayssincelastlogin|seniority|seniorityasmonths|seniorityasyears|countrycode|language_full|civilitytitle_clean|years_since_last_login|account_age_years|account_age_group|    user_descriptor|flag_long_title|
+--------------------+----+---------+--------+------------

In [0]:
countries_df_silver = spark.read.format("parquet") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("abfss://landing-zone-2@ecomadlsrakesh.dfs.core.windows.net/countries-raw-2")


In [0]:

countries_df_silver = countries_df_silver \
    .withColumn("sellers", col("sellers").cast(IntegerType())) \
    .withColumn("topsellers", col("topsellers").cast(IntegerType())) \
    .withColumn("topsellerratio", col("topsellerratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellersratio", col("femalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("topfemalesellersratio", col("topfemalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellers", col("femalesellers").cast(IntegerType())) \
    .withColumn("malesellers", col("malesellers").cast(IntegerType())) \
    .withColumn("topfemalesellers", col("topfemalesellers").cast(IntegerType())) \
    .withColumn("topmalesellers", col("topmalesellers").cast(IntegerType())) \
    .withColumn("countrysoldratio", col("countrysoldratio").cast(DecimalType(10, 2))) \
    .withColumn("bestsoldratio", col("bestsoldratio").cast(DecimalType(10, 2))) \
    .withColumn("toptotalproductssold", col("toptotalproductssold").cast(IntegerType())) \
    .withColumn("totalproductssold", col("totalproductssold").cast(IntegerType())) \
    .withColumn("toptotalproductslisted", col("toptotalproductslisted").cast(IntegerType())) \
    .withColumn("totalproductslisted", col("totalproductslisted").cast(IntegerType())) \
    .withColumn("topmeanproductssold", col("topmeanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("topmeanproductslisted", col("topmeanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanproductssold", col("meanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("meanproductslisted", col("meanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanofflinedays", col("meanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("topmeanofflinedays", col("topmeanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowers", col("meanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowing", col("meanfollowing").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowers", col("topmeanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowing", col("topmeanfollowing").cast(DecimalType(10, 2)))


In [0]:
# Preview the typed countries data (default 20 rows).
countries_df_silver.show(n=20)

+-----------+-------+----------+--------------+------------------+---------------------+-------------+-----------+----------------+--------------+----------------+-------------+--------------------+-----------------+----------------------+-------------------+-------------------+---------------------+----------------+------------------+---------------+------------------+-------------+-------------+----------------+----------------+
|    country|sellers|topsellers|topsellerratio|femalesellersratio|topfemalesellersratio|femalesellers|malesellers|topfemalesellers|topmalesellers|countrysoldratio|bestsoldratio|toptotalproductssold|totalproductssold|toptotalproductslisted|totalproductslisted|topmeanproductssold|topmeanproductslisted|meanproductssold|meanproductslisted|meanofflinedays|topmeanofflinedays|meanfollowers|meanfollowing|topmeanfollowers|topmeanfollowing|
+-----------+-------+----------+--------------+------------------+---------------------+-------------+-----------+----------------

In [0]:
# Export the seller silver data as CSV with a header row.
# repartition(1) produces a single part file, written by one task.
# The path reuses `storage_account` so it stays in sync with the ADLS OAuth configuration.
# NOTE(review): the folder name "all-data-processed" suggests a combined export, but only
# seller_df_silver is written here — confirm this is intended.
seller_df_silver.repartition(1).write.mode("overwrite").option("header", "true").csv(
    f"abfss://landing-zone-2@{storage_account}.dfs.core.windows.net/all-data-processed"
)