In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType, ArrayType

In [0]:
spark = SparkSession.builder.appName("Paris Olympic Tranformation").getOrCreate()

# READ

In [0]:
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "'client_id'",
    "fs.azure.account.oauth2.client.secret": "'secret_key'",
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/'tenant_id'/oauth2/token"
}

mount_point = "/mnt/paris-olympic-data"
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    print(f"{mount_point} is already mounted")
else:
    dbutils.fs.mount(
        source="abfss://paris-olympic-data@parisolympic1.dfs.core.windows.net/",
        mount_point=mount_point,
        extra_configs=configs
    )

/mnt/paris-olympic-data is already mounted


In [0]:
%fs 
ls "/mnt/paris-olympic/raw-data"

path,name,size,modificationTime
dbfs:/mnt/paris-olympic/raw-data/athletes.csv,athletes.csv,7223512,1740826755000
dbfs:/mnt/paris-olympic/raw-data/coaches.csv,coaches.csv,93472,1740826770000
dbfs:/mnt/paris-olympic/raw-data/medals.csv,medals.csv,2974,1740826799000
dbfs:/mnt/paris-olympic/raw-data/teams.csv,teams.csv,444984,1740826786000


In [0]:
athleteDF = spark.read.format("csv").option("header", "true").load("/mnt/paris-olympic/raw-data/athletes.csv")
coachDF = spark.read.format("csv").option("header", "true").load("/mnt/paris-olympic/raw-data/coaches.csv")
medalsDF = spark.read.format("csv").option("header", "true").load("/mnt/paris-olympic/raw-data/medals.csv")
teamsDF = spark.read.format("csv").option("header", "true").load("/mnt/paris-olympic/raw-data/teams.csv")

# TRANSFORM

## Athletes

In [0]:
athleteDF.show(5)

+-------+-------+-----------------+-------------+-----------------+------+--------+------------+-------+------------+-----------+----------------+----------------+------+------+-------------+--------------------+----------+-----------+-------------+---------------+-----------------+----------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+
|   code|current|             name|   name_short|          name_tv|gender|function|country_code|country|country_long|nationality|nationality_long|nationality_code|height|weight|  disciplines|              events|birth_date|birth_place|birth_country|residence_place|residence_country|  nickname|             hobbies|occupation|           education|              family|                lang|               coach|              reason|                he

In [0]:
athleteDF = athleteDF.select(
    col("code").cast(IntegerType()), 
    col("current").cast(BooleanType()), 
    col("name"), col("gender"), 
    col("function"), 
    col("country_code"), 
    explode(array(regexp_replace(col("disciplines"), r'[\[\]\',]', ''))).alias("disciplines"),
    col("birth_date").cast(DateType())
)

In [0]:
athleteDF.show(5)

+-------+-------+-----------------+------+--------+------------+-----------+----------+
|   code|current|             name|gender|function|country_code|disciplines|birth_date|
+-------+-------+-----------------+------+--------+------------+-----------+----------+
|1532872|   true| ALEKSANYAN Artur|  Male| Athlete|         ARM|  Wrestling|1991-10-21|
|1532873|   true|   AMOYAN Malkhas|  Male| Athlete|         ARM|  Wrestling|1999-01-22|
|1532874|   true|  GALSTYAN Slavik|  Male| Athlete|         ARM|  Wrestling|1996-12-21|
|1532944|   true|HARUTYUNYAN Arsen|  Male| Athlete|         ARM|  Wrestling|1999-11-22|
|1532945|   true|  TEVANYAN Vazgen|  Male| Athlete|         ARM|  Wrestling|1999-10-27|
+-------+-------+-----------------+------+--------+------------+-----------+----------+
only showing top 5 rows



In [0]:
athleteDF.printSchema()

root
 |-- code: integer (nullable = true)
 |-- current: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- function: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- disciplines: string (nullable = true)
 |-- birth_date: date (nullable = true)



## Coaches

In [0]:
coachDF.show(5)

+-------+-------+-------------------+------+----------+--------+------------+-------+--------------------+-----------------+------+----------+
|   code|current|               name|gender|  function|category|country_code|country|        country_long|      disciplines|events|birth_date|
+-------+-------+-------------------+------+----------+--------+------------+-------+--------------------+-----------------+------+----------+
|1533246|   True|     PEDRERO Ofelia|Female|     Coach|       C|         MEX| Mexico|              Mexico|Artistic Swimming|  Team|1988-03-28|
|1535775|   True|   RADHI SHENAISHIL|  Male|Head Coach|       C|         IRQ|   Iraq|                Iraq|         Football|   Men|1965-07-01|
|1536055|   True|AFLAKIKHAMSEH Majid|  Male|     Coach|       C|         IRI|IR Iran|Islamic Republic ...|        Taekwondo|  null|1973-08-26|
|1536059|   True|    YOUSEFY Mehrdad|  Male|     Coach|       C|         IRI|IR Iran|Islamic Republic ...|        Taekwondo|  null|1972-06-12|

In [0]:
coachDF = coachDF.select(
    col("code").cast(IntegerType()), 
    col("current").cast(BooleanType()), 
    col("name"), 
    col("gender"), 
    col("function"), 
    col("category"), 
    col("country_code"),  
    col("disciplines"), 
    col("birth_date").cast(DateType())
)

In [0]:
coachDF.printSchema()

root
 |-- code: integer (nullable = true)
 |-- current: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- function: string (nullable = true)
 |-- category: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- disciplines: string (nullable = true)
 |-- birth_date: date (nullable = true)



## Teams

In [0]:
teamsDF.show(5)

+-----------------+-------+--------------------+-----------+------------+-------------+--------------------+----------+----------------+----------+--------------------+-------+--------------------+------------+-------------+-----------+
|             code|current|                team|team_gender|country_code|      country|        country_long|discipline|disciplines_code|    events|            athletes|coaches|      athletes_codes|num_athletes|coaches_codes|num_coaches|
+-----------------+-------+--------------------+-----------+------------+-------------+--------------------+----------+----------------+----------+--------------------+-------+--------------------+------------+-------------+-----------+
|ARCMTEAM3---CHN01|   True|People's Republic...|          M|         CHN|        China|People's Republic...|   Archery|             ARC|Men's Team|['KAO Wenchao', '...|   null|['1913366', '1913...|         3.0|         null|       null|
|ARCMTEAM3---COL01|   True|            Colombia|    

In [0]:
teamsDF = teamsDF.withColumn(
    "athletes_codes",
    split(regexp_replace(col("athletes_codes"), r'[\[\]\']', ''), ",").cast(ArrayType(IntegerType()))
)

In [0]:
teamsDF = teamsDF.select(
    col("code"),
    col("current"),
    col("team_gender"),
    col("country_code"),
    col("discipline"),
    explode(col("athletes_codes")).alias("athletes_codes")
)

In [0]:
teamsDF.show(5)

+-----------------+-------+-----------+------------+----------+--------------+
|             code|current|team_gender|country_code|discipline|athletes_codes|
+-----------------+-------+-----------+------------+----------+--------------+
|ARCMTEAM3---CHN01|   True|          M|         CHN|   Archery|       1913366|
|ARCMTEAM3---CHN01|   True|          M|         CHN|   Archery|       1913367|
|ARCMTEAM3---CHN01|   True|          M|         CHN|   Archery|       1913369|
|ARCMTEAM3---COL01|   True|          M|         COL|   Archery|       1935642|
|ARCMTEAM3---COL01|   True|          M|         COL|   Archery|       1543412|
+-----------------+-------+-----------+------------+----------+--------------+
only showing top 5 rows



In [0]:
teamsDF.printSchema()

root
 |-- code: string (nullable = true)
 |-- current: string (nullable = true)
 |-- team_gender: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- discipline: string (nullable = true)
 |-- athletes_codes: integer (nullable = true)



## Medals

In [0]:
medalsDF.show(5)

+------------+-------------+--------------------+----------+------------+------------+-----+
|country_code|      country|        country_long|Gold Medal|Silver Medal|Bronze Medal|Total|
+------------+-------------+--------------------+----------+------------+------------+-----+
|         USA|United States|United States of ...|        40|          44|          42|  126|
|         CHN|        China|People's Republic...|        40|          27|          24|   91|
|         JPN|        Japan|               Japan|        20|          12|          13|   45|
|         AUS|    Australia|           Australia|        18|          19|          16|   53|
|         FRA|       France|              France|        16|          26|          22|   64|
+------------+-------------+--------------------+----------+------------+------------+-----+
only showing top 5 rows



In [0]:
medalsDF = medalsDF.select(
    col("country_code"), 
    col("country"), 
    col("country_long"), 
    col("Gold Medal").cast(IntegerType()), 
    col("Silver Medal").cast(IntegerType()), 
    col("Bronze Medal").cast(IntegerType()), 
    col("Total").cast(IntegerType())
)


In [0]:
medalsDF.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- country_long: string (nullable = true)
 |-- Gold Medal: integer (nullable = true)
 |-- Silver Medal: integer (nullable = true)
 |-- Bronze Medal: integer (nullable = true)
 |-- Total: integer (nullable = true)



# WRITE

In [0]:
%fs
ls /mnt/paris-olympic/transformed-data

In [0]:
athleteDF.write.mode("overwrite").option("header", "true").parquet(mount_point + '/transformed-data/athletes')
coachDF.write.mode("overwrite").option("header", "true").parquet(mount_point + '/transformed-data/coaches')
teamsDF.write.mode("overwrite").option("header", "true").parquet(mount_point + '/transformed-data/teams')
medalsDF.write.mode("overwrite").option("header", "true").parquet(mount_point + '/transformed-data/medals')