In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.types import *

In [0]:
# Reuse the active Spark session if one exists, otherwise start one for this pipeline.
spark = (
    SparkSession.builder
    .appName("EcomDataPipeline")
    .getOrCreate()
)

In [0]:
# Load the raw (bronze) users Delta table.
UserDF = spark.read.load("/mnt/delta/tables/bronze/users", format="delta")

In [0]:
# Normalize country codes to upper case so e.g. "fr" and "FR" are treated as the same code.
UserDF = UserDF.withColumn(
    "countryCode",
    upper("countryCode"),
)

In [0]:
# Expand the language code into a readable name: 'en' -> English, 'fr' -> French,
# any other value (including null) -> Other.
UserDF = UserDF.withColumn(
    "language_full",
    when(col("language") == "en", "English")
    .when(col("language") == "fr", "French")
    .otherwise("Other"),
)

In [0]:
# Normalize free-text gender values to 'Male' / 'Female' / 'Other'.
# The raw value is trimmed and upper-cased before the prefix test, so entry variants
# such as "m", " male" or "female" are recognized instead of collapsing to 'Other'.
# Anything that starts with neither letter (including null) becomes 'Other'.
UserDF = UserDF.withColumn(
    "gender",
    when(upper(trim(col("gender"))).startswith("M"), "Male")
    .when(upper(trim(col("gender"))).startswith("F"), "Female")
    .otherwise("Other"),
)

In [0]:
# Rewrite every occurrence of "Mme", "Ms" or "Mrs" in the civility title as "Ms".
# NOTE(review): the pattern is case-sensitive and unanchored — confirm it matches the
# casing actually present in the bronze data.
UserDF = UserDF.withColumn(
    "civilitytitle_clean",
    regexp_replace(col("civilitytitle"), "(Mme|Ms|Mrs)", "Ms"),
)

In [0]:
# Derive `years_since_last_login` (days / 365) from `dayssincelastlogin`.
# The day count is cast to int and missing / non-castable values are treated as 0.
# This is the same normalization the pipeline applies to `dayssincelastlogin` itself
# before the silver write, so the two columns cannot disagree (e.g. days = 0 but
# years = null).
UserDF = UserDF.withColumn(
    "years_since_last_login",
    coalesce(col("dayssincelastlogin").cast(IntegerType()), lit(0)) / 365,
)

In [0]:
# Account age in years = seniority / 365, rounded to 2 decimals.
# NOTE(review): assumes `seniority` is expressed in days — confirm against the source schema.
UserDF = UserDF.withColumn(
    "account_age_years",
    round(col("seniority") / lit(365), 2),
)

In [0]:
# Bucket accounts by age in years:
#   < 1      -> "New"
#   [1, 3)   -> "Intermediate"
#   >= 3     -> "Experienced"
# Rows with no account age (null) get "Unknown" rather than silently falling through
# the otherwise() branch and being labelled "Experienced".
UserDF = UserDF.withColumn(
    "account_age_group",
    when(col("account_age_years").isNull(), "Unknown")
    .when(col("account_age_years") < 1, "New")
    .when(col("account_age_years") < 3, "Intermediate")
    .otherwise("Experienced"),
)

In [0]:
# Stamp every row with the current calendar year, for comparisons downstream.
UserDF = UserDF.withColumn(
    "current_year",
    year(current_date()),
)

In [0]:
# Build a readable composite descriptor:
#   "<gender>_<countryCode>_<first 3 chars of civilitytitle_clean>_<language_full>"
# The descriptor is NOT unique; many users share the same combination.
# Each part is coalesced to "" so a single missing field (e.g. no civility title)
# no longer turns the whole descriptor into null. The segment positions stay fixed.
UserDF = UserDF.withColumn(
    "user_descriptor",
    concat_ws(
        "_",
        coalesce(col("gender"), lit("")),
        coalesce(col("countryCode"), lit("")),
        coalesce(expr("substring(civilitytitle_clean, 1, 3)"), lit("")),
        coalesce(col("language_full"), lit("")),
    ),
)

In [0]:
# Flag raw civility titles longer than 10 characters (a null title yields a null flag).
UserDF = UserDF.withColumn(
    "flag_long_title",
    length("civilitytitle") > 10,
)


In [0]:

# Target types for the user columns:
#   app / profile-picture indicators -> boolean
#   social counts                    -> int
#   rates and seniority measures     -> decimal(10,2)
user_cast_types = {
    "hasanyapp": "boolean",
    "hasandroidapp": "boolean",
    "hasiosapp": "boolean",
    "hasprofilepicture": "boolean",
    "socialnbfollowers": IntegerType(),
    "socialnbfollows": IntegerType(),
    "productspassrate": DecimalType(10, 2),
    "seniorityasmonths": DecimalType(10, 2),
    "seniorityasyears": DecimalType(10, 2),
}

for column_name, target_type in user_cast_types.items():
    UserDF = UserDF.withColumn(column_name, col(column_name).cast(target_type))

In [0]:
# Guarantee `dayssincelastlogin` is a non-null integer: cast to int and default to 0.
# coalesce() also covers values that are present but fail the cast (they become null
# in non-ANSI mode). The previous isNotNull() guard let those nulls through.
UserDF = UserDF.withColumn(
    "dayssincelastlogin",
    coalesce(col("dayssincelastlogin").cast(IntegerType()), lit(0)),
)

In [0]:
# Persist the cleaned users table to the silver layer, replacing any previous contents.
UserDF.write.save("/mnt/delta/tables/silver/users", format="delta", mode="overwrite")

In [0]:
# Load the raw (bronze) buyers Delta table.
buyersDF = spark.read.load("/mnt/delta/tables/bronze/buyers", format="delta")

In [0]:
# Buyer count columns, cast to IntegerType.
# The list is also reused later to null-fill these counts.
integer_columns = [
    "buyers",
    "topbuyers",
    "femalebuyers",
    "malebuyers",
    "topfemalebuyers",
    "topmalebuyers",
    "totalproductsbought",
    "totalproductswished",
    "totalproductsliked",
    "toptotalproductsbought",
    "toptotalproductswished",
    "toptotalproductsliked",
]

for column_name in integer_columns:
    buyersDF = buyersDF.withColumn(column_name, col(column_name).cast("int"))

In [0]:

# Buyer ratio / mean columns, cast to decimal(10,2).
decimal_columns = [
    "topbuyerratio",
    "femalebuyersratio",
    "topfemalebuyersratio",
    "boughtperwishlistratio",
    "boughtperlikeratio",
    "topboughtperwishlistratio",
    "topboughtperlikeratio",
    "meanproductsbought",
    "meanproductswished",
    "meanproductsliked",
    "topmeanproductsbought",
    "topmeanproductswished",
    "topmeanproductsliked",
    "meanofflinedays",
    "topmeanofflinedays",
    "meanfollowers",
    "meanfollowing",
    "topmeanfollowers",
    "topmeanfollowing",
]

for column_name in decimal_columns:
    buyersDF = buyersDF.withColumn(column_name, col(column_name).cast("decimal(10,2)"))

In [0]:
# Title-case country names (e.g. "france" -> "France").
buyersDF = buyersDF.withColumn(
    "country",
    initcap("country"),
)

In [0]:
# Replace missing buyer counts with 0 before deriving ratios.
# A single fillna over the subset replaces the previous one-call-per-column loop.
buyersDF = buyersDF.fillna(0, subset=integer_columns)

# Female-to-male buyer ratio. The +1 in the denominator avoids division by zero for
# markets with no male buyers, at the cost of a slight downward bias.
# NOTE(review): "femal_to_male_ratio" is misspelled but kept as-is because it is part
# of the silver schema that downstream readers may already depend on.
buyersDF = buyersDF.withColumn(
    "femal_to_male_ratio",
    round(col("femalebuyers") / (col("malebuyers") + 1), 2),
)

# Wishlist-to-purchase ratio, with the same +1 guard on the denominator.
buyersDF = buyersDF.withColumn(
    "wishlist_to_purchase_ratio",
    round(col("totalproductswished") / (col("totalproductsbought") + 1), 2),
)

# Tag countries whose bought-per-wishlist ratio exceeds the threshold
# (a null ratio counts as not highly engaged).
high_engagement_threshold = 0.5
buyersDF = buyersDF.withColumn(
    "high_engagement",
    when(col("boughtperwishlistratio") > high_engagement_threshold, True).otherwise(False),
)

# Flag markets where the female share of all buyers exceeds the female share of top buyers.
# This compares two ratios at the same point in time. There is no time dimension here,
# so despite the column name it does not measure growth. A null in either ratio -> False.
buyersDF = buyersDF.withColumn(
    "growing_female_market",
    when(col("femalebuyersratio") > col("topfemalebuyersratio"), True).otherwise(False),
)




In [0]:
# Persist the cleaned buyers table to the silver layer, replacing any previous contents.
buyersDF.write.save("/mnt/delta/tables/silver/buyers", format="delta", mode="overwrite")


In [0]:
# Load the raw (bronze) sellers Delta table.
sellersDF = spark.read.load("/mnt/delta/tables/bronze/sellers", format="delta")

In [0]:
# Target types for the seller metrics: counts -> int, means / rates / percentages -> decimal(10,2).
seller_cast_types = {
    "nbsellers": "int",
    "meanproductssold": "decimal(10,2)",
    "meanproductslisted": "decimal(10,2)",
    "meansellerpassrate": "decimal(10,2)",
    "totalproductssold": "int",
    "totalproductslisted": "int",
    "meanproductsbought": "decimal(10,2)",
    "meanproductswished": "decimal(10,2)",
    "meanproductsliked": "decimal(10,2)",
    "totalbought": "int",
    "totalwished": "int",
    "totalproductsliked": "int",
    "meanfollowers": "decimal(10,2)",
    "meanfollows": "decimal(10,2)",
    "percentofappusers": "decimal(10,2)",
    "percentofiosusers": "decimal(10,2)",
    "meanseniority": "decimal(10,2)",
}

for column_name, target_type in seller_cast_types.items():
    sellersDF = sellersDF.withColumn(column_name, col(column_name).cast(target_type))

In [0]:
# Normalize country names (title case) and seller sex codes (upper case).
sellersDF = (
    sellersDF
    .withColumn("country", initcap(col("country")))
    .withColumn("sex", upper(col("sex")))
)

# Categorize markets by number of sellers:
#   < 500        -> "Small"
#   [500, 2000)  -> "Medium"
#   >= 2000      -> "Large"
# A null seller count is labelled "Unknown" instead of falling through to "Large".
sellersDF = sellersDF.withColumn(
    "seller_size_category",
    when(col("nbsellers").isNull(), "Unknown")
    .when(col("nbsellers") < 500, "Small")
    .when(col("nbsellers") < 2000, "Medium")
    .otherwise("Large"),
)

# Mean products listed per seller, as an indicator of seller activity.
# The divisor is guarded: a zero (or null) seller count yields null instead of raising
# a divide-by-zero error when ANSI mode is enabled.
sellersDF = sellersDF.withColumn(
    "mean_products_listed_per_seller",
    when(col("nbsellers") != 0, round(col("totalproductslisted") / col("nbsellers"), 2)),
)

# Impute missing pass rates with the (rounded) column mean BEFORE deriving the
# pass-rate label. Otherwise imputed rows keep the "Normal" label computed from their
# original null value, regardless of the imputed rate.
mean_pass_rate = (
    sellersDF
    .select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate"))
    .collect()[0]["avg_pass_rate"]
)
sellersDF = sellersDF.withColumn(
    "meansellerpassrate",
    when(col("meansellerpassrate").isNull(), mean_pass_rate)
    .otherwise(col("meansellerpassrate")),
)

# Identify markets with a high seller pass rate (>= 0.75).
sellersDF = sellersDF.withColumn(
    "high_seller_pass_rate",
    when(col("meansellerpassrate") >= 0.75, "High").otherwise("Normal"),
)

sellersDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/sellers")



In [0]:
# Load the raw (bronze) countries Delta table.
countriesDF = spark.read.load("/mnt/delta/tables/bronze/countries", format="delta")

# Target types for the country metrics: counts -> int, ratios / means -> decimal(10,2).
country_cast_types = {
    "sellers": "int",
    "topsellers": "int",
    "topsellerratio": "decimal(10,2)",
    "femalesellersratio": "decimal(10,2)",
    "topfemalesellersratio": "decimal(10,2)",
    "femalesellers": "int",
    "malesellers": "int",
    "topfemalesellers": "int",
    "topmalesellers": "int",
    "countrysoldratio": "decimal(10,2)",
    "bestsoldratio": "decimal(10,2)",
    "toptotalproductssold": "int",
    "totalproductssold": "int",
    "toptotalproductslisted": "int",
    "totalproductslisted": "int",
    "topmeanproductssold": "decimal(10,2)",
    "topmeanproductslisted": "decimal(10,2)",
    "meanproductssold": "decimal(10,2)",
    "meanproductslisted": "decimal(10,2)",
    "meanofflinedays": "decimal(10,2)",
    "topmeanofflinedays": "decimal(10,2)",
    "meanfollowers": "decimal(10,2)",
    "meanfollowing": "decimal(10,2)",
    "topmeanfollowers": "decimal(10,2)",
    "topmeanfollowing": "decimal(10,2)",
}

for column_name, target_type in country_cast_types.items():
    countriesDF = countriesDF.withColumn(column_name, col(column_name).cast(target_type))

In [0]:
# Title-case country names (e.g. "france" -> "France").
countriesDF = countriesDF.withColumn(
    "country",
    initcap("country"),
)


In [0]:
# Ratio of top sellers to total sellers.
# The divisor is guarded: a zero (or null) seller count yields null instead of raising
# a divide-by-zero error when ANSI mode is enabled.
countriesDF = countriesDF.withColumn(
    "top_seller_ratio",
    when(col("sellers") != 0, round(col("topsellers") / col("sellers"), 2)),
)

# Flag countries with a high ratio of female sellers (> 0.5; a null ratio -> False).
countriesDF = countriesDF.withColumn(
    "high_female_seller_ratio",
    when(col("femalesellersratio") > 0.5, True).otherwise(False),
)

# Performance indicator: sold/listed ratio computed from the top* product totals,
# with the same divisor guard as above.
countriesDF = countriesDF.withColumn(
    "performance_indicator",
    when(
        col("toptotalproductslisted") != 0,
        round(col("toptotalproductssold") / col("toptotalproductslisted"), 2),
    ),
)

# Flag countries with exceptionally high performance (null indicator -> False).
performance_threshold = 0.8
countriesDF = countriesDF.withColumn(
    "high_performance",
    when(col("performance_indicator") > performance_threshold, True).otherwise(False),
)

# Activity level from mean offline days:
#   < 30       -> "Highly Active"
#   [30, 60)   -> "Moderately Active"
#   >= 60      -> "Low Activity"
# A null value is labelled "Unknown" instead of falling through to "Low Activity".
countriesDF = countriesDF.withColumn(
    "activity_level",
    when(col("meanofflinedays").isNull(), "Unknown")
    .when(col("meanofflinedays") < 30, "Highly Active")
    .when(col("meanofflinedays") < 60, "Moderately Active")
    .otherwise("Low Activity"),
)

countriesDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/countries")



In [0]:
User_table_path = "/mnt/delta/tables/silver/users"
Buyer_table_path = "/mnt/delta/tables/silver/buyers"
Seller_table_path = "/mnt/delta/tables/silver/sellers"
Country_table_path = "/mnt/delta/tables/silver/countries"


def get_row_count(delta_table_path):
    """Return the number of rows in the Delta table stored at ``delta_table_path``."""
    return spark.read.load(delta_table_path, format="delta").count()


# Sanity check: count the rows that landed in each silver table (in the order listed).
num_rows_User, num_rows_Buyer, num_rows_Seller, num_rows_Country = (
    get_row_count(table_path)
    for table_path in (User_table_path, Buyer_table_path, Seller_table_path, Country_table_path)
)

# Report the counts.
for table_path, row_count in (
    (User_table_path, num_rows_User),
    (Buyer_table_path, num_rows_Buyer),
    (Seller_table_path, num_rows_Seller),
    (Country_table_path, num_rows_Country),
):
    print(f"The Delta table at {table_path} has {row_count} rows.")