## Data Transformations

Import all required pyspark packages

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType
from pyspark.sql.window import Window 

Connection to Azure Data Lake Container

In [0]:
# OAuth (service principal) settings for reading/writing the ADLS Gen2 storage account.
STORAGE_HOST = "azdatalakegen2storage.dfs.core.windows.net"
CLIENT_ID = "60222cc4-24d2-466d-b94d-f7babcd89224"
TENANT_ID = "c3ba5346-12c8-41bd-9847-5a6b18229d4b"

# SECURITY: the client secret used to be hard-coded in this cell. It is now read from a
# Databricks secret scope so it never appears in the notebook source or its history.
# TODO(review): the scope/key names below are placeholders - create them (or point these
# at the existing scope) and rotate the previously committed secret.
CLIENT_SECRET = dbutils.secrets.get(scope="adls-oauth", key="client-secret")

# Real key -> value mapping. The previous version built a set out of the return values of
# spark.conf.set(...), which could not be used as a configuration dictionary.
configs = {
    f"fs.azure.account.auth.type.{STORAGE_HOST}": "OAuth",
    f"fs.azure.account.oauth.provider.type.{STORAGE_HOST}": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    f"fs.azure.account.oauth2.client.id.{STORAGE_HOST}": CLIENT_ID,
    f"fs.azure.account.oauth2.client.secret.{STORAGE_HOST}": CLIENT_SECRET,
    f"fs.azure.account.oauth2.client.endpoint.{STORAGE_HOST}": f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token",
}

# Apply every setting to the current Spark session (same keys and values as before).
for conf_key, conf_value in configs.items():
    spark.conf.set(conf_key, conf_value)

# Mount the storage account
# dbutils.fs.mount(
#     source = "abfss://bronze@azdatalakegen2storage.dfs.core.windows.net", # container@storageacc
#     mount_point = "/mnt/bronze",
#     extra_configs = configs)

In [0]:
# List the current DBFS mount points. As the last expression of the cell, the result is
# rendered as the cell output; the reads further down load from paths under /mnt/bronze.
dbutils.fs.mounts()
# Kept for manual use: uncomment to remove the bronze mount.
# dbutils.fs.unmount('/mnt/bronze')

In [0]:
# dbutils.fs.refreshMounts()

In [0]:
%fs
ls "/mnt/bronze/"

Read Data from Bronze Layer

In [0]:
# Root folder of the bronze-layer tables on the mounted container.
BRONZE_ROOT = "/mnt/bronze/dbo"


def read_bronze_table(table_name):
    """Load one bronze-layer table stored as Parquet.

    Parquet files carry their own schema, so the CSV-only reader options
    ("header", "inferSchema") that were previously passed are not needed.

    Args:
        table_name: Folder name of the table under BRONZE_ROOT, e.g. "Athletes".

    Returns:
        A Spark DataFrame over ``{BRONZE_ROOT}/{table_name}``.
    """
    return spark.read.format("parquet").load(f"{BRONZE_ROOT}/{table_name}")


# One DataFrame per source table; the variable names are unchanged because later cells use them.
df_atheletes = read_bronze_table("Athletes")
df_coaches = read_bronze_table("Coaches")
df_entriesgender = read_bronze_table("EntriesGender")
df_medals = read_bronze_table("Medals")
df_teams = read_bronze_table("Teams")
     

Cast data types for the relevant DataFrames

In [0]:
# Casting columns from the dataset
# df_medals = df_medals.withColumn("Gold", col("Gold").cast("int"))
# df_medals = df_medals.withColumn("Rank_by_Total", col("Rank").cast("int"))
# df_medals = df_medals.withColumn("Rank", col("Rank").cast("int"))

Athletes Data

In [0]:
# Normalise the text columns of the athletes table: title-case the person name and
# discipline, upper-case the country.
df_atheletes = (
    df_atheletes
    .withColumn("PersonName", initcap(col("PersonName")))
    .withColumn("Country", upper(col("Country")))
    .withColumn("Discipline", initcap(col("Discipline")))
)

# Number of athletes for every (country, discipline) pair, largest groups first.
# `count` here is the pyspark.sql.functions aggregate brought in by the wildcard import.
atheletes_counts_df = (
    df_atheletes
    .groupBy("Country", "Discipline")
    .agg(count("PersonName").alias("AthleteCount"))
    .orderBy(desc("AthleteCount"))
)
atheletes_counts_df.display()

Broadcast Join to calculate Weighted score, Rank by Country, Athlete's contribution and Aggregate total medals. 

In [0]:
# Step 1: Broadcast join (ships the smaller teams table to every executor).
# The athletes table has already had Country upper-cased and Discipline title-cased, so
# the teams-side join keys must receive the same normalisation. Without it, keys such as
# "JAPAN" / "Cycling Bmx Freestyle" never equal the raw teams values and the left join
# returns only nulls for the teams columns.
# `broadcast` comes from the wildcard pyspark.sql.functions import at the top of the notebook.
teams_join_keys_df = (
    df_teams
    .withColumn("Country", upper(col("Country")))
    .withColumn("Discipline", initcap(col("Discipline")))
)
broadcast_athelete_teams = df_atheletes.join(
    broadcast(teams_join_keys_df), ["Country", "Discipline"], "left"
)

# Step 2: Calculate Weighted Score (Gold = 5 points, Silver = 3, Bronze = 1)
medals_score_df = df_medals.withColumn(
    "Weighted_Score",
    (col("Gold") * 5) + (col("Silver") * 3) + (col("Bronze") * 1),
)

# Step 3: Rank countries by Weighted Score (highest score gets rank 1; ties share a rank).
# The window has no partitionBy, so Spark evaluates it over the whole table in a single
# partition.
rank_window = Window.orderBy(col("Weighted_Score").desc())
medals_df = medals_score_df.withColumn("Rank_by_Weighted", rank().over(rank_window))

# Step 4: Aggregate total medal counts per country, best weighted score first.
# `sum` is the pyspark.sql.functions aggregate (wildcard import), not the Python builtin.
Total_Medal_Counts_df = (
    medals_df
    .groupBy("TeamCountry")
    .agg(
        sum("Gold").alias("Total_Gold"),
        sum("Silver").alias("Total_Silver"),
        sum("Bronze").alias("Total_Bronze"),
        sum("Weighted_Score").alias("Total_Weighted_Score"),
    )
    .orderBy(col("Total_Weighted_Score").desc())
)

# Show the transformed data
Total_Medal_Counts_df.display()

In [0]:
# Persist the per-country medal totals to the silver container as Parquet.
# NOTE(review): append mode adds a new set of files on every run, so re-running the
# notebook duplicates rows under this path - confirm append (not overwrite) is intended.
# NOTE(review): the target folder is named "Athletes" although the DataFrame holds medal
# totals per country - confirm the destination.
(
    Total_Medal_Counts_df.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@azdatalakegen2storage.dfs.core.windows.net/Athletes")
)

Medals Data

In [0]:
# Countries ordered by their gold-medal count, highest first.
top_gold_countries_df = (
    df_medals
    .select("TeamCountry", "Gold")
    .orderBy(desc("Gold"))
)

top_gold_countries_df.display()

Coaches Data

In [0]:
# Transformation logic for the coaches table:
#   - Name:       title-cased, then reduced to its first space-separated token
#   - Country:    upper-cased
#   - Discipline: title-cased
#   - Event:      missing or blank values become "Unknown"
# The previous Event rule, regexp_replace(Event, " ", "Unknown"), replaced every space
# inside real event names with "Unknown" and left null events untouched.
coaches_transformed_df = (
    df_coaches
    .withColumn("Name", initcap(col("Name")))
    .withColumn("Name", split(col("Name"), " ")[0])
    .withColumn("Country", upper(col("Country")))
    .withColumn("Discipline", initcap(col("Discipline")))
    .withColumn(
        "Event",
        when(col("Event").isNull() | (trim(col("Event")) == ""), "Unknown")
        .otherwise(col("Event")),
    )
)

# Show Transformed Data
coaches_transformed_df.select("Name", "Country", "Discipline", "Event").display()

In [0]:
# Persist the transformed coaches table to the silver container as Parquet.
# NOTE(review): append mode adds a new set of files on every run, so re-running the
# notebook duplicates rows under this path - confirm append (not overwrite) is intended.
(
    coaches_transformed_df.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@azdatalakegen2storage.dfs.core.windows.net/Coaches")
)

Gender Data

In [0]:
# Gender-balance metrics per discipline:
#   - Discipline is upper-cased
#   - Female_Percentage / Male_Percentage are the share of Total, rounded to 2 decimals
#   - Gender_Category is "Female-Dominated" or "Male-Dominated" when the respective share
#     is strictly above 60, otherwise "Balanced"
# `round` is the pyspark.sql.functions column function (wildcard import), not the builtin.
df_entriesgender_percent_df = (
    df_entriesgender
    .withColumn("Discipline", upper(col("Discipline")))
    .withColumn("Female_Percentage", round(col("Female") / col("Total") * 100, 2))
    .withColumn("Male_Percentage", round(col("Male") / col("Total") * 100, 2))
    .withColumn(
        "Gender_Category",
        when(col("Female_Percentage") > 60, "Female-Dominated")
        .when(col("Male_Percentage") > 60, "Male-Dominated")
        .otherwise("Balanced"),
    )
)

# Show Transformed Data
df_entriesgender_percent_df.display()

In [0]:
# Persist the gender-balance table to the silver container as Parquet.
# NOTE(review): append mode adds a new set of files on every run, so re-running the
# notebook duplicates rows under this path - confirm append (not overwrite) is intended.
(
    df_entriesgender_percent_df.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@azdatalakegen2storage.dfs.core.windows.net/EntriesGender")
)

Medal Rankings By Country

In [0]:
# Recompute the weighted medal score (Gold = 5, Silver = 3, Bronze = 1).
# This starts from `medals_df`, which is created by the earlier broadcast-join cell, so
# that cell has to run first.
medals_df = medals_df.withColumn(
    "Weighted_Score",
    col("Gold") * 5 + col("Silver") * 3 + col("Bronze") * 1,
)

# Global ranking window: highest weighted score first.
rank_window = Window.orderBy(desc("Weighted_Score"))

# Attach the rank by weighted score.
medals_score_df = medals_df.withColumn("Rank_by_Weighted", rank().over(rank_window))

# Bucket teams by rank: 1-5 "Top Performer", 6-15 "Moderate Performer", the rest
# "Low Performer". The second branch is only evaluated for rows that failed the first,
# so an explicit "> 5" guard is not needed.
medals_performance_df = medals_score_df.withColumn(
    "Performance_Category",
    when(col("Rank_by_Weighted") <= 5, "Top Performer")
    .when(col("Rank_by_Weighted") <= 15, "Moderate Performer")
    .otherwise("Low Performer"),
)

# Show Transformed Data
medals_performance_df.select(
    "Rank",
    "TeamCountry",
    "Gold",
    "Silver",
    "Bronze",
    "Total",
    "Performance_Category",
).display()

Teams Data

In [0]:
# Teams table clean-up:
#   - Discipline and Country are upper-cased
#   - every space in Event is replaced with an underscore
#   - Team_ID is added via monotonically_increasing_id(): unique per row of this
#     DataFrame, but not consecutive
Teams_case_df = (
    df_teams
    .withColumn("Discipline", upper(col("Discipline")))
    .withColumn("Country", upper(col("Country")))
    .withColumn("Event", regexp_replace(col("Event"), " ", "_"))
    .withColumn("Team_ID", monotonically_increasing_id())
)

# Show Transformed Data
Teams_case_df.display(truncate=False)

In [0]:
# Persist the cleaned teams table to the silver container as Parquet.
# NOTE(review): append mode adds a new set of files on every run, so re-running the
# notebook duplicates rows under this path - confirm append (not overwrite) is intended.
(
    Teams_case_df.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@azdatalakegen2storage.dfs.core.windows.net/Teams")
)

Gold/Silver/Bronze Medalists

In [0]:
# Left-join every team row to its country's medal row (teams.Country = medals.TeamCountry),
# then derive two labels:
#   - Medal_Performance: the best medal type the country has at least one of
#   - Team_Category:     bucket of Rank_by_Total (<= 5, <= 15, everything else)
# Teams without a matching medals row have null medal columns and fall into the
# "No Medals" / "Other Teams" branches.
joined_df = (
    df_teams
    .join(df_medals, df_teams.Country == df_medals.TeamCountry, "left")
    .withColumn(
        "Medal_Performance",
        when(col("Gold") > 0, "Gold Medalist")
        .when(col("Silver") > 0, "Silver Medalist")
        .when(col("Bronze") > 0, "Bronze Medalist")
        .otherwise("No Medals"),
    )
    .withColumn(
        "Team_Category",
        when(col("Rank_by_Total") <= 5, "Top 5 Teams")
        .when(col("Rank_by_Total") <= 15, "Top 15 Teams")
        .otherwise("Other Teams"),
    )
)

# Show Final Result: only rows whose Gold count is non-zero, best total rank first.
(
    joined_df
    .select(
        "Discipline",
        "Country",
        "Event",
        "Gold",
        "Silver",
        "Bronze",
        "Total",
        "Rank_by_Total",
        "Medal_Performance",
        "Team_Category",
    )
    .where(col("Gold") != 0)
    .orderBy(col("Rank_by_Total").asc())
    .display(truncate=False)
)

In [0]:
# Persist the joined teams/medals table to the silver container as Parquet.
# NOTE(review): append mode adds a new set of files on every run, so re-running the
# notebook duplicates rows under this path - confirm append (not overwrite) is intended.
(
    joined_df.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@azdatalakegen2storage.dfs.core.windows.net/Medals")
)