In [0]:
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.functions import regexp_replace, col, split, explode, col, trim, when, regexp_extract, lit

In [0]:
spark.conf.set(
  "fs.azure.account.key.parisolympicsdata3.dfs.core.windows.net",
  dbutils.secrets.get(scope="paris-olympics-secret-scope", key="paris-olympics-storage-key")
)

In [0]:
# paths
raw_data_path = "abfss://paris-olympics@parisolympicsdata3.dfs.core.windows.net/raw data"
athletes_csv_path = f"{raw_data_path}/Athletes.csv"
coaches_csv_path = f"{raw_data_path}/Coaches.csv"
events_csv_path = f"{raw_data_path}/Events.csv"
medallists_csv_path = f"{raw_data_path}/Medallists.csv"
teams_csv_path = f"{raw_data_path}/Teams.csv"

processed_data_path = "abfss://paris-olympics@parisolympicsdata3.dfs.core.windows.net/processed_data"

### Reading the files

In [0]:
athletes_df = spark.read \
    .option("header", True) \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .csv(athletes_csv_path)

coaches_df = spark.read.csv(coaches_csv_path, header=True)

events_df = spark.read.csv(events_csv_path, header=True)

medallists_df = spark.read.csv(medallists_csv_path, header=True)

teams_df = spark.read.csv(teams_csv_path, header=True)

In [0]:
display(athletes_df.limit(5))
display(coaches_df.limit(5))
display(events_df.limit(5))
display(medallists_df.limit(5))
display(teams_df.limit(5))

In [0]:
athletes_df = athletes_df \
    .withColumn("code", col("code").cast(IntegerType())) \
    .withColumn("height", col("height").cast(IntegerType())) \
    .withColumn("weight", col("weight").cast(IntegerType())) \
    .withColumn("birth_date", col("birth_date").cast(DateType()))

In [0]:
coaches_df = coaches_df \
    .withColumn("code", col("code").cast(IntegerType())) \
    .withColumn("birth_date", col("birth_date").cast(DateType()))

In [0]:
medallists_df = medallists_df \
    .withColumn("medal_code", col("medal_code").cast(IntegerType())) \
    .withColumn("code_athlete", col("code_athlete").cast(IntegerType())) \
    .withColumn("birth_date", col("birth_date").cast(DateType())) \
    .withColumn("medal_date", col("medal_date").cast(DateType()))

### Transforming the data

In [0]:
# Clean the 'disciplines' column by removing brackets and single quotes
athletes_df_exploded = athletes_df.withColumn(
    "disciplines", regexp_replace("disciplines", r"[\[\]']", "")
).withColumn(
    # Clean the 'events' column by removing brackets, single quotes, and double quotes
    "events", regexp_replace("events", r"""[\[\]'"]""", "")
).withColumn(
    # Split the cleaned 'disciplines' string by commas and explode into multiple rows
    "disciplines", explode(split(col("disciplines"), ",\\s*"))
).withColumn(
    # Trim leading/trailing whitespace from each discipline
    "disciplines", trim(col("disciplines"))
).withColumn(
    # Split the cleaned 'events' string by commas and explode into multiple rows
    "events", explode(split(col("events"), ",\\s*"))
).withColumn(
    # Trim leading/trailing whitespace from each event
    "events", trim(col("events"))
)

# Display the resulting DataFrame with exploded and cleaned disciplines and events
athletes_df_exploded.display()


In [0]:
# Clean and transform the 'athletes' column in the teams_df DataFrame
teams_df_final = teams_df.withColumn(
    # Remove square brackets and single quotes from the 'athletes' string
    "athletes", regexp_replace(col("athletes"), r"[\[\]']", "")
).withColumn(
    # Split the cleaned string into individual athlete names and explode into separate rows
    "athletes", explode(split(col("athletes"), ",\\s*"))
).withColumn(
    # Trim any leading/trailing whitespace from each athlete name
    "athletes", trim(col("athletes"))
)

# Display the final transformed DataFrame
teams_df_final.display()

In [0]:
athletes_df = athletes_df.withColumnRenamed("name", "athlete_name") \
                         .withColumnRenamed("disciplines", "athlete_discipline") \
                         .withColumnRenamed("country_code", "athlete_country_code") \
                         .withColumnRenamed("birth_date", "athlete_birth_date")

medallists_df = medallists_df.withColumnRenamed("discipline", "medallist_discipline") \
                             .withColumnRenamed("country_long", "medallist_country_long") 

In [0]:
athlete_medallist_df = athletes_df.join(
    medallists_df,
    (athletes_df["code"] == medallists_df["code_athlete"]) &
    (athletes_df["athlete_name"] == medallists_df["name"]) &
    (athletes_df["country_long"] == medallists_df["medallist_country_long"]) ,
    "left"
)


In [0]:
athlete_medallist_df.filter(athlete_medallist_df.is_medallist=="True").count()

In [0]:
athlete_medallist_df = (
    athlete_medallist_df
        .filter('is_medallist = "True"')
        .select(
            "athlete_name",
            "country_long",
            "athlete_birth_date",
            "athlete_discipline",
            "medallist_discipline",
            col("events").alias("athlete_event"),
            col("event").alias("medallist_event"),
            "medal_type",
            "medal_code",
            "coach",
            col("country_code").alias("athlete_country_code")
        )
)

display(athlete_medallist_df)


In [0]:
# Split the 'coach' column into multiple entries using <br> or <p> as delimiters and explode into separate rows
athlete_medallist_df = athlete_medallist_df.withColumn("coach", explode(split(col("coach"), "<br>|<p>")))

# Trim any leading or trailing whitespace from each coach name
athlete_medallist_df = athlete_medallist_df.withColumn("coach", trim(col("coach")))

# Display the resulting DataFrame
display(athlete_medallist_df)

In [0]:
# Create a new column 'coach_country_code' based on patterns found in the 'coach' column
athlete_medallist_coach = athlete_medallist_df.withColumn(
    "coach_country_code",
    
    # First condition: if a 3-letter country code is found in parentheses (e.g., "(USA)"), extract and use it
    when(
        regexp_extract(col("coach"), r"\((\w{3})\)", 1) != "",
        regexp_extract(col("coach"), r"\((\w{3})\)", 1)
    
    # Second condition: if the word 'national' (case-insensitive) is present in the coach name,
    # assume the coach shares the same country as the athlete
    ).when(
        col("coach").rlike("(?i)national"),
        col("athlete_country_code")
    
    # If neither condition is met, assign None
    ).otherwise(lit(None))
)

# Display the resulting DataFrame with the new 'coach_country_code' column
athlete_medallist_coach.display()

In [0]:
# Add a new column 'coach_national_international' to classify the coach as National, International, or None
athlete_medallist_coach_df = athlete_medallist_coach.withColumn(
    "coach_national_international",
    
    # If the coach's country code matches the athlete's, label as 'National'
    when(col("athlete_country_code") == col("coach_country_code"), "National")
    
    # If the coach's country code is missing, assign None
    .when(col("coach_country_code").isNull(), None)
    
    # Otherwise, label as 'International'
    .otherwise("International")
)

# Display the resulting DataFrame with the new classification
athlete_medallist_coach_df.display()

### Writing Processed DataFrames to Parquet Format with Overwrite Mode for Multiple DataSets

In [0]:
athletes_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(f"{processed_data_path}/athletes")

coaches_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(f"{processed_data_path}/coaches")

events_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(f"{processed_data_path}/events")

medallists_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(f"{processed_data_path}/medallists")

teams_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(f"{processed_data_path}/teams")

athlete_medallist_coach_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(f"{processed_data_path}/athlete_medallist_coach")
