In [0]:
from pyspark.sql.functions import col, rank, countDistinct, desc, count, coalesce
from pyspark.sql.window import Window

In [0]:
configs = {  # replace with Key Vault secrets
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": "3da655a9-586c-42b5-a4a4-4547bd0fdb02",
  "fs.azure.account.oauth2.client.secret": "wr38Q~yUgYXk_yKDxS4Sm7W7hAxoRiV5iFtgwa8l",
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/9c0d4336-117e-4476-93d8-44a062f2d928/oauth2/token",
  "fs.azure.createRemoteFileSystemDuringInitialization": "true"
}

# Check if directory is already mounted
mounted_dirs = [mount.mountPoint for mount in dbutils.fs.mounts()]
mount_point = "/mnt/tokyoolympics"

if mount_point not in mounted_dirs:
  # Mount Storage Account
  dbutils.fs.mount(
    source="abfss://tokyo-olympics-data@tokyoolympicskeenu.dfs.core.windows.net",  # Azure Blob File System Secure, container@storageaccount
    mount_point=mount_point,
    extra_configs=configs
  )
else:
  print(f"Directory {mount_point} is already mounted.")

Directory /mnt/tokyoolympics is already mounted.


In [0]:
%fs
ls "/mnt/tokyoolympics"

path,name,size,modificationTime
dbfs:/mnt/tokyoolympics/raw-data/,raw-data/,0,1709222251000
dbfs:/mnt/tokyoolympics/transformed-data/,transformed-data/,0,1709222265000


In [0]:
spark

In [0]:
# Read raw datasets from mounted raw-data directory
athletes = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/Athletes.csv")
coaches = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/Coaches.csv")
entries_gender = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/EntriesGender.csv")
medals = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/Medals.csv")
teams = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolympics/raw-data/Teams.csv")

In [0]:
# Rename column to use underscores instead of spaces (for Synapse Analytics)
medals = medals.withColumnRenamed("Rank by Total", "Rank_by_Total")

medals.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Team_Country: string (nullable = true)
 |-- Gold: integer (nullable = true)
 |-- Silver: integer (nullable = true)
 |-- Bronze: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Rank_by_Total: integer (nullable = true)



In [0]:
# Find the number of events each country participated in by number of entries in teams
country_events = teams.groupBy("Country").agg(count("Country").alias("EventCount")).orderBy(desc("EventCount"))

# Find the number of athletes each country produced
country_athletes = athletes.groupBy("Country").agg(countDistinct("PersonName").alias("AthleteCount")).orderBy(desc("AthleteCount"))

# Join country_events and country_athletes on "Country"
countries = country_events.join(country_athletes, "Country")

# Join countries and medals on "Country" for countries and "Team_Country" for medals
countries = medals.join(countries, countries["Country"] == medals["Team_Country"], "outer")
countries = countries.withColumn("Country", coalesce("Country", "Team_Country")).drop("Team_Country")

countries = countries.fillna(0, subset=["Gold", "Silver", "Bronze", "Total"])

loser_rank = countries.select("Rank").dropna(subset=["Rank"]).count() + 1
countries = countries.fillna(loser_rank, subset=["Rank", "Rank_by_Total"])


countries = countries.withColumn("Rank_by_Gold", rank().over(Window.orderBy(desc("Gold"))))

countries = countries.select("Rank", "Country", "EventCount", "AthleteCount", "Gold", "Silver", "Bronze", "Total", "Rank_by_Total", "Rank_by_Gold").orderBy("Rank")


countries.show(countries.count(), truncate=False)
countries.count()

+----+--------------------------+----------+------------+----+------+------+-----+-------------+------------+
|Rank|Country                   |EventCount|AthleteCount|Gold|Silver|Bronze|Total|Rank_by_Total|Rank_by_Gold|
+----+--------------------------+----------+------------+----+------+------+-----+-------------+------------+
|1   |United States of America  |47        |614         |39  |41    |33    |113  |1            |1           |
|2   |People's Republic of China|33        |398         |38  |32    |18    |88   |2            |2           |
|3   |Japan                     |48        |585         |27  |14    |17    |58   |5            |3           |
|4   |Great Britain             |28        |366         |22  |21    |22    |65   |4            |4           |
|5   |ROC                       |34        |318         |20  |28    |23    |71   |3            |5           |
|6   |Australia                 |35        |470         |17  |7     |22    |46   |6            |6           |
|7   |Neth

103

In [0]:
disciplines = teams.groupBy("Discipline").agg(countDistinct("Country").alias("CountryCount")).orderBy(desc("CountryCount"))

# Join disciplines and entries_gender on "Discipline"
disciplines = disciplines.join(entries_gender, "Discipline", "outer").withColumnRenamed("Total", "AthleteCount")

# Find proportion of male:female for each discipline
disciplines = disciplines.withColumn('Female_Proportion', disciplines['Female'] / disciplines['AthleteCount'])

disciplines = disciplines.orderBy(desc("CountryCount"))

disciplines.show()
disciplines.printSchema()

+-------------------+------------+------+----+------------+-------------------+
|         Discipline|CountryCount|Female|Male|AthleteCount|  Female_Proportion|
+-------------------+------------+------+----+------------+-------------------+
|          Athletics|          33|   969|1072|        2041| 0.4747672709456149|
|            Archery|          31|    64|  64|         128|                0.5|
|           Swimming|          30|   361| 418|         779| 0.4634146341463415|
|       Table Tennis|          25|    86|  86|         172|                0.5|
|           Football|          24|   264| 344|         608| 0.4342105263157895|
|      Cycling Track|          23|    90|  99|         189|0.47619047619047616|
|   Beach Volleyball|          23|    48|  48|          96|                0.5|
|  Artistic Swimming|          22|   105|   0|         105|                1.0|
|           Handball|          18|   168| 168|         336|                0.5|
|          Triathlon|          18|    55

In [0]:
# Write transformed data to mounted transformed-data directory
countries.write.format('csv').mode('overwrite').option('header', 'true').save('/mnt/tokyoolympics/transformed-data/countries')
disciplines.write.format('csv').mode('overwrite').option('header', 'true').save('/mnt/tokyoolympics/transformed-data/disciplines')