In [0]:
from pyspark.sql.functions import split
# Declare every notebook parameter as a text widget with an empty default so
# the notebook can be driven by an orchestrator or run interactively.
for _widget_name in (
    "silver_table",
    "silver_schema",
    "bronze_schema",
    "bronze_table",
    "loadID",
    "env",
    "transformation_type",
):
    dbutils.widgets.text(_widget_name, "")


def _widget_value(name):
    """Return the current value of widget `name` with surrounding whitespace removed."""
    return dbutils.widgets.get(name).strip()


def _table_part(qualified_name):
    """Return the second dot-separated component of `qualified_name`.

    For a value such as "schema.table" this is "table", as before. When the
    value contains no dot (including the empty widget default) the value itself
    is returned instead of raising IndexError.
    """
    parts = qualified_name.split(".")
    return parts[1] if len(parts) > 1 else parts[0]


silver_table = _table_part(_widget_value("silver_table"))
silver_schema = _widget_value("silver_schema")
bronze_schema = _widget_value("bronze_schema")
bronze_table = _table_part(_widget_value("bronze_table"))
loadID = _widget_value("loadID")
env = _widget_value("env")
transformation_type = _widget_value("transformation_type")

In [0]:
# Make the environment-specific bronze database the session default.
use_bronze_statement = f"use hive_metastore.{env}_bronze"
spark.sql(use_bronze_statement)

In [0]:
from pyspark.sql.functions import col

# Read only the rows of the bronze table that belong to the requested load.
# The load id arrives from a notebook widget, so it is compared as a bound
# literal through the DataFrame API instead of being spliced into SQL text;
# a value containing quotes can no longer break or alter the query.
df = spark.table(f"{bronze_schema}.{bronze_table}").where(col("loadID") == loadID)

In [0]:
from pyspark.sql.types import DecimalType, IntegerType, StringType, BooleanType, FloatType
from pyspark.sql.functions import expr, when, col, upper, regexp_replace, round, year, current_date, concat, lit, initcap, avg 

In [0]:
from pyspark.sql.types import DecimalType, IntegerType, StringType, BooleanType, FloatType
from pyspark.sql.functions import expr, when, col, upper, regexp_replace, round, year, current_date, concat, lit
def users_transformation(users_df):
    """Clean, enrich and type the users DataFrame.

    Args:
        users_df: Spark DataFrame containing the user columns referenced below
            (countryCode, country, language, gender, civilitytitle,
            dayssincelastlogin, seniority, the has* flags, the social counters
            and the seniority/pass-rate measures).

    Returns:
        A DataFrame with normalised country/gender values, the derived columns
        language_full, civilitytitle_clean, years_since_last_login,
        account_age_group, current_year and user_descriptor appended in that
        order, and the remaining columns cast to their target types.
    """
    two_decimals = DecimalType(10, 2)

    # Country identifiers: upper-case code, capitalised name.
    result = users_df.withColumn("countryCode", upper(col("countryCode")))
    result = result.withColumn("country", initcap(col("country")))

    # Expand the language code; anything that is not 'en' or 'fr' is 'Other'.
    result = result.withColumn("language_full", expr("case \
                                                     when language = 'en' then 'English' \
                                                     when language = 'fr' then 'French' \
                                                     else 'Other' end"))

    # Map M/F to labels; every other value (null included) falls to 'Other'.
    gender_code = col("gender")
    gender_label = (
        when(gender_code == 'M', 'MALE')
        .when(gender_code == 'F', 'FEMALE')
        .otherwise('Other')
    )
    result = result.withColumn("gender", gender_label)

    # Title clean-up runs in two passes: mrs/miss -> Ms, then mr -> Mr.
    ms_normalised = regexp_replace(col("civilitytitle"), "(mrs|miss)", "Ms")
    result = result.withColumn("civilitytitle_clean", regexp_replace(ms_normalised, "mr", "Mr"))

    # Days since the last login expressed in years, two decimal places.
    result = result.withColumn(
        "years_since_last_login",
        round(col("dayssincelastlogin") / 365, 2).cast(two_decimals),
    )

    # Bucket seniority/365: below 4 -> New, 4 to 8 inclusive -> Intermediate,
    # everything else -> Experienced.
    account_years = round(col("seniority") / 365, 2)
    age_group = (
        when(account_years < 4, "New")
        .when(account_years.between(4, 8), "Intermediate")
        .otherwise("Experienced")
    )
    result = result.withColumn("account_age_group", age_group)

    # Year in which the transformation runs.
    result = result.withColumn("current_year", year(current_date()))

    # Descriptor of the form <gender>_<countrycode>_<first 3 chars of title>_<language>.
    descriptor_parts = [
        col("gender"),
        lit("_"),
        col("countrycode"),
        lit("_"),
        expr("substring(civilitytitle_clean, 0, 3)"),
        lit("_"),
        col("language_full"),
    ]
    result = result.withColumn("user_descriptor", concat(*descriptor_parts))

    # Remaining type conversions, grouped by target type.
    for flag_name in ("hasanyapp", "hasandroidapp", "hasiosapp", "hasprofilepicture"):
        result = result.withColumn(flag_name, col(flag_name).cast(BooleanType()))

    for counter_name in ("socialnbfollowers", "socialnbfollows"):
        result = result.withColumn(counter_name, col(counter_name).cast(IntegerType()))

    for measure_name in ("productspassrate", "seniorityasmonths", "seniorityasyears"):
        result = result.withColumn(measure_name, col(measure_name).cast(two_decimals))

    # Integer day count; rows where the value is null become 0.
    days_offline = col("dayssincelastlogin")
    result = result.withColumn(
        "dayssincelastlogin",
        when(days_offline.isNotNull(), days_offline.cast(IntegerType())).otherwise(0),
    )
    return result
    

In [0]:
from pyspark.sql.types import DecimalType, IntegerType, StringType, BooleanType, FloatType
from pyspark.sql.functions import expr, when, col, upper, regexp_replace, round, year, current_date, concat, lit
def buyers_transformation(buyers_df):
    """Cast, null-fill and enrich the per-country buyers DataFrame.

    Args:
        buyers_df: Spark DataFrame containing the buyer count and ratio columns
            listed in `integer_columns` and `decimal_columns`, plus `country`.

    Returns:
        A DataFrame in which the listed columns are cast to IntegerType or
        DecimalType(10, 2) with nulls replaced by 0, `country` is capitalised,
        and the columns female_to_male_ratio, wishlist_to_purchase_ratio,
        high_engagement and growing_female_market are appended in that order.
    """
    integer_columns = [
        'buyers', 'topbuyers', 'femalebuyers', 'malebuyers',
        'topfemalebuyers', 'topmalebuyers', 'totalproductsbought',
        'totalproductswished', 'totalproductsliked', 'toptotalproductsbought',
        'toptotalproductswished', 'toptotalproductsliked'
    ]
    decimal_columns = [
        'topbuyerratio', 'femalebuyersratio', 'topfemalebuyersratio',
        'boughtperwishlistratio', 'boughtperlikeratio', 'topboughtperwishlistratio',
        'topboughtperlikeratio', 'meanproductsbought', 'meanproductswished',
        'meanproductsliked', 'topmeanproductsbought', 'topmeanproductswished',
        'topmeanproductsliked', 'meanofflinedays', 'topmeanofflinedays',
        'meanfollowers', 'meanfollowing', 'topmeanfollowers', 'topmeanfollowing'
    ]

    for column_name in integer_columns:
        buyers_df = buyers_df.withColumn(column_name, col(column_name).cast(IntegerType()))

    for column_name in decimal_columns:
        buyers_df = buyers_df.withColumn(column_name, col(column_name).cast(DecimalType(10, 2)))

    # Replace nulls (missing values and failed casts) with 0 in one pass over
    # all cast columns rather than issuing a separate fillna per column.
    buyers_df = buyers_df.fillna(
        {column_name: 0 for column_name in integer_columns + decimal_columns}
    )

    # Normalize country column values for all tables
    buyers_df = buyers_df.withColumn("country", initcap(col("country")))

    # Female-to-male buyer ratio; the +1 in the denominator avoids division by zero.
    buyers_df = buyers_df.withColumn(
        "female_to_male_ratio",
        round(col("femalebuyers") / (col("malebuyers") + 1), 2),
    ).withColumn(
        "female_to_male_ratio",
        col("female_to_male_ratio").cast(DecimalType(10, 2)),
    )

    # Wished-to-bought ratio, again with +1 in the denominator.
    buyers_df = buyers_df.withColumn(
        "wishlist_to_purchase_ratio",
        round(col("totalproductswished") / (col("totalproductsbought") + 1), 2),
    ).withColumn(
        "wishlist_to_purchase_ratio",
        col("wishlist_to_purchase_ratio").cast(DecimalType(10, 2)),
    )

    # Tag countries whose bought-per-wishlist ratio exceeds the threshold.
    high_engagement_threshold = 0.5
    buyers_df = buyers_df.withColumn(
        "high_engagement",
        when(col("boughtperwishlistratio") > high_engagement_threshold, True).otherwise(False),
    )

    # Flag rows where the overall female buyer ratio exceeds the top-buyer female ratio.
    buyers_df = buyers_df.withColumn(
        "growing_female_market",
        when(col("femalebuyersratio") > col("topfemalebuyersratio"), True).otherwise(False),
    )

    return buyers_df

In [0]:
from pyspark.sql.types import DecimalType, IntegerType, StringType, BooleanType, FloatType
from pyspark.sql.functions import expr, when, col, upper, regexp_replace, round, year, current_date, concat, lit, initcap, avg 

def sellers_transformation(sellers_df):
    """Cast, normalise and enrich the sellers DataFrame.

    Args:
        sellers_df: Spark DataFrame containing the seller count and mean
            columns cast below, plus `country` and `sex`.

    Returns:
        A DataFrame with typed numeric columns, capitalised `country`,
        upper-cased `sex`, the appended columns seller_size_category,
        mean_products_listed_per_seller and high_seller_pass_rate, and null
        `meansellerpassrate` values replaced by the rounded column average.
    """
    two_decimals = DecimalType(10, 2)
    integer_fields = (
        "nbsellers", "totalproductssold", "totalproductslisted",
        "totalbought", "totalwished", "totalproductsliked",
    )
    decimal_fields = (
        "meanproductssold", "meanproductslisted", "meansellerpassrate",
        "meanproductsbought", "meanproductswished", "meanproductsliked",
        "meanfollowers", "meanfollows", "percentofappusers",
        "percentofiosusers", "meanseniority",
    )

    # Apply the target data types.
    result = sellers_df
    for field_name in integer_fields:
        result = result.withColumn(field_name, col(field_name).cast(IntegerType()))
    for field_name in decimal_fields:
        result = result.withColumn(field_name, col(field_name).cast(two_decimals))

    # Consistent casing for country names and the sex code.
    result = result.withColumn("country", initcap(col("country")))
    result = result.withColumn("sex", upper(col("sex")))

    # Size bucket: < 500 Small, 500 up to (not including) 2000 Medium, otherwise Large.
    seller_count = col("nbsellers")
    size_category = (
        when(seller_count < 500, "Small")
        .when((seller_count >= 500) & (seller_count < 2000), "Medium")
        .otherwise("Large")
    )
    result = result.withColumn("seller_size_category", size_category)

    # Listed products per seller, used as an indicator of seller activity.
    listed_per_seller = round(col("totalproductslisted") / seller_count, 2).cast(two_decimals)
    result = result.withColumn("mean_products_listed_per_seller", listed_per_seller)

    # Label rows whose pass rate is above 0.75; this is evaluated before the
    # null replacement below, so rows with a null pass rate are "Normal".
    pass_rate = col("meansellerpassrate")
    result = result.withColumn(
        "high_seller_pass_rate",
        when(pass_rate > 0.75, "High").otherwise("Normal"),
    )

    # Average pass rate (rounded to 2 decimals), fetched to the driver and
    # used as the replacement for null pass rates.
    average_row = result.select(round(avg(pass_rate), 2)).collect()[0]
    mean_pass_rate = average_row[0]

    result = result.withColumn(
        "meansellerpassrate",
        when(pass_rate.isNull(), mean_pass_rate).otherwise(pass_rate),
    )

    return result

In [0]:
from pyspark.sql.types import DecimalType, IntegerType, StringType, BooleanType, FloatType
from pyspark.sql.functions import expr, when, col, upper, regexp_replace, round, year, current_date, concat, lit, initcap, avg 

def countries_transformation(countries_df):
    """Cast, normalise and enrich the per-country seller statistics.

    Args:
        countries_df: Spark DataFrame containing the seller count, ratio and
            mean columns cast below, plus `country`.

    Returns:
        A DataFrame with typed numeric columns, capitalised `country`, and the
        columns top_seller_ratio, high_female_seller_ratio,
        performance_indicator, high_performance and activity_level appended in
        that order.
    """
    two_decimals = DecimalType(10, 2)
    integer_fields = (
        "sellers", "topsellers", "femalesellers", "malesellers",
        "topfemalesellers", "topmalesellers", "toptotalproductssold",
        "totalproductssold", "toptotalproductslisted", "totalproductslisted",
    )
    decimal_fields = (
        "topsellerratio", "femalesellersratio", "topfemalesellersratio",
        "countrysoldratio", "bestsoldratio", "topmeanproductssold",
        "topmeanproductslisted", "meanproductssold", "meanproductslisted",
        "meanofflinedays", "topmeanofflinedays", "meanfollowers",
        "meanfollowing", "topmeanfollowers", "topmeanfollowing",
    )

    # Apply the target data types.
    result = countries_df
    for field_name in integer_fields:
        result = result.withColumn(field_name, col(field_name).cast(IntegerType()))
    for field_name in decimal_fields:
        result = result.withColumn(field_name, col(field_name).cast(two_decimals))

    # Capitalise country names.
    result = result.withColumn("country", initcap(col("country")))

    # Share of top sellers among all sellers.
    top_share = round(col("topsellers") / col("sellers"), 2).cast(two_decimals)
    result = result.withColumn("top_seller_ratio", top_share)

    # Flag countries where more than half of the sellers are female.
    result = result.withColumn(
        "high_female_seller_ratio",
        when(col("femalesellersratio") > 0.5, True).otherwise(False),
    )

    # Sold/listed ratio for top sellers; +1 in the denominator avoids division by zero.
    sold_per_listed = round(
        col("toptotalproductssold") / (col("toptotalproductslisted") + 1), 2
    ).cast(two_decimals)
    result = result.withColumn("performance_indicator", sold_per_listed)

    # Flag countries whose performance indicator exceeds the threshold.
    performance_threshold = 0.8
    result = result.withColumn(
        "high_performance",
        when(col("performance_indicator") > performance_threshold, True).otherwise(False),
    )

    # Activity bucket from mean offline days: < 30, 30 up to (not including) 60, otherwise low.
    offline_days = col("meanofflinedays")
    activity = (
        when(offline_days < 30, "Highly Active")
        .when((offline_days >= 30) & (offline_days < 60), "Moderately Active")
        .otherwise("Low Activity")
    )
    result = result.withColumn("activity_level", activity)

    return result

In [0]:
def select_transformation(transformation_type, df):
    """Apply the transformation matching `transformation_type` and save the result.

    Args:
        transformation_type: One of "Ecommerce.users", "Ecommerce.buyers",
            "Ecommerce.sellers" or "Ecommerce.countries".
        df: Spark DataFrame to transform.

    The transformed DataFrame overwrites the table
    `{silver_schema}.{silver_table}`; both names are read from the
    notebook-level variables. For any other `transformation_type` the notebook
    is exited with an error message and nothing is written.
    """
    transformations = {
        "Ecommerce.users": users_transformation,
        "Ecommerce.buyers": buyers_transformation,
        "Ecommerce.sellers": sellers_transformation,
        "Ecommerce.countries": countries_transformation,
    }

    transform = transformations.get(transformation_type)
    if transform is None:
        dbutils.notebook.exit("Error: Invalid Transformation Type not mentioned")
        # The exit call is expected to stop the run here; the explicit return
        # guarantees that, should it ever return, we never reach the write
        # below with no transformed DataFrame.
        return

    transformed_df = transform(df)

    # Save the transformed DataFrame to the table
    transformed_df.write.mode("overwrite").saveAsTable(f"{silver_schema}.{silver_table}")

In [0]:
# Transform the bronze rows for this load and write them to the silver table.
select_transformation(transformation_type=transformation_type, df=df)

In [0]:
from pyspark.sql.functions import count
# Hand the number of bronze rows read for this load back to the caller.
processed_row_count = df.count()
dbutils.notebook.exit([processed_row_count])