In [4]:
# Installing the PySpark library
# Note: While PySpark is the Python API for Apache Spark, if you're using Azure Databricks, 
# you don't need to install Spark as it's already integrated. However, if you're setting up 
# Spark in a different environment, you'll need this step.
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[K     |████████████████████████████████| 316.9 MB 54 kB/s  eta 0:00:0161
[?25hCollecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[K     |████████████████████████████████| 200 kB 70.5 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425365 sha256=f08c14a4441853e8e4376c06e4a9172d6889962f0590fed34a9994eac669283b
  Stored in directory: /Users/thanhpham/Library/Caches/pip/wheels/57/bd/14/ce9e21f2649298678d011fb8f71ed38ee70b42b94fef0be142
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.0
Note: you may need to restart the kernel to use updated packages.


In [None]:
# Importing necessary functions from PySpark's functions module
from pyspark.sql.functions import col  # Importing column function for referencing DataFrame columns

# Importing specific data types from PySpark's types module
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

# Importing PySpark's round function for rounding numeric column values (note: this shadows Python's built-in round in this notebook)
from pyspark.sql.functions import round

# Using an alias 'F' for the functions module to make it more concise in the code
from pyspark.sql import functions as F

# Importing the when function from PySpark's functions module for conditional column operations
from pyspark.sql.functions import when


In [None]:
# OAuth (service principal) settings used to mount Azure Data Lake Storage Gen2 (ADLS Gen2).
# The "***" values are placeholders. Do not paste real credentials into the notebook;
# read them from a Databricks secret scope (e.g. backed by Azure Key Vault) instead.
configs = {
    # Defining the authentication type for ADLS Gen2
    "fs.azure.account.auth.type": "OAuth",

    # Specifying the provider type for OAuth2 authentication with ADLS Gen2
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",

    # Azure application (client) ID of the app registration (find it in the Azure portal)
    "fs.azure.account.oauth2.client.id": "***",  # Replace with your client ID

    # Azure client secret associated with the app registration (should be stored securely, e.g., in Azure Key Vault)
    "fs.azure.account.oauth2.client.secret": "***",  # Replace with your client secret

    # OAuth 2.0 token endpoint to fetch the access token for Azure AD authentication
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/***/oauth2/token"  # Replace with your endpoint
}

# The mount point in Databricks File System (DBFS) where the ADLS Gen2 filesystem will be mounted
MOUNT_POINT = "/mnt/tokyoolympic"

# dbutils.fs.mount fails if the mount point is already in use, which would break
# re-running this cell (or Run All). Only mount when the mount point is absent.
already_mounted = any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts())

if not already_mounted:
    # Mounting an Azure Data Lake Storage Gen2 filesystem to Databricks
    dbutils.fs.mount(
        # The full path to the ADLS Gen2 filesystem. Replace with your specific path.
        source = "abfss://***@***.dfs.core.windows.net",  # Follow the format: "abfss://datacontainer@storageaccount.dfs.core.windows.net"

        mount_point = MOUNT_POINT,

        # Passing the above-defined configuration settings for the mount operation
        extra_configs = configs
    )


In [None]:
# Extract the raw CSV files from the mounted data lake into DataFrames
def _load_raw_csv(path):
    """Read one CSV file at `path`, using the first row as header and inferring column types."""
    reader = spark.read.format("csv")
    reader = reader.option("header","true").option("Inferschema","True")
    return reader.load(path)


athletes = _load_raw_csv("/mnt/tokyoolympic/raw-data/athletes.csv")
coaches = _load_raw_csv("/mnt/tokyoolympic/raw-data/coaches.csv")
medals = _load_raw_csv("/mnt/tokyoolympic/raw-data/medals.csv")
teams = _load_raw_csv("/mnt/tokyoolympic/raw-data/teams.csv")
entriesgender = _load_raw_csv("/mnt/tokyoolympic/raw-data/entriesgender.csv")

In [None]:
# Give the 'medals' DataFrame a 'Country' column (renamed from 'Team_Country');
# the later join with the athlete counts is keyed on 'Country'.
medal_rename = medals.withColumnRenamed(existing="Team_Country", new="Country")

In [None]:
# Some values in the raw data are garbled: accented characters show up as the
# replacement character "�" (e.g. Côte d'Ivoire appears as "C�te d'Ivoire").
# The cells below swap those exact garbled strings for clean values.
def _replace_exact(frame, column, garbled, corrected):
    """Return `frame` with `column` set to `corrected` where it equals `garbled`; other values are kept."""
    current = col(column)
    return frame.withColumn(column, when(current == garbled, corrected).otherwise(current))


# athletes: garbled country name -> "Ivory Coast"
athletes_change = _replace_exact(athletes, 'Country', "C�te d'Ivoire", "Ivory Coast")

# teams: fix the country name, the team name, and the two garbled fencing event names
teams_change = _replace_exact(teams, 'Country', "C�te d'Ivoire", "Ivory Coast")
teams_change = _replace_exact(teams_change, 'TeamName', "C�te d�Ivoire", "Ivory Coast")
teams_change = _replace_exact(teams_change, 'Event', "Women's �p�e Team", "Women's Épée Team")
teams_change = _replace_exact(teams_change, 'Event', "Men's �p�e Team", "Men's Épée Team")

# coaches: garbled country name -> "Ivory Coast"
coaches_change = _replace_exact(coaches, 'Country', "C�te d'Ivoire", "Ivory Coast")

# medals (already renamed to use 'Country'): garbled country name -> "Ivory Coast"
medals_change = _replace_exact(medal_rename, 'Country', "C�te d'Ivoire", "Ivory Coast")

In [None]:
# Cast the three count columns of 'entriesgender' to Integer type, one column at a time.
# Re-running this cell is harmless: casting an integer column to integer leaves it unchanged.
for count_column in ("Female", "Male", "Total"):
    entriesgender = entriesgender.withColumn(
        count_column,
        col(count_column).cast(IntegerType()),
    )

In [None]:
# Athlete representation per country.
# F.count counts the non-null 'PersonName' rows in each group (repeated names are
# counted each time they appear), and the result is sorted largest-first.
athletes_total = (
    athletes_change
    .groupBy("Country")
    .agg(F.count("PersonName").alias("Athletes_Rep"))
    .orderBy(F.col("Athletes_Rep").desc())
)

# Attach the medal columns: keep only countries present in both DataFrames (inner join on 'Country')
athletes_medals = athletes_total.join(medals_change, on='Country', how='inner')

# Store the athlete count as an Integer column in the final dataset
athletes_medals_dataset = athletes_medals.withColumn(
    "Athletes_Rep",
    F.col("Athletes_Rep").cast(IntegerType()),
)

In [None]:
# Render the joined athletes/medals DataFrame in the notebook for a visual check before writing it out
display(athletes_medals_dataset)

In [None]:
# Load the transformed DataFrames back into the data lake as CSV.
# Each DataFrame is repartitioned to a single partition before writing, existing
# output at the target path is overwritten, and a header row is included.
transformed_outputs = [
    (coaches_change, '/mnt/tokyoolympic/transformed-data/coaches'),
    (athletes_change, '/mnt/tokyoolympic/transformed-data/athletes'),
    (entriesgender, '/mnt/tokyoolympic/transformed-data/entriesgender'),
    (medals_change, '/mnt/tokyoolympic/transformed-data/medals'),
    (teams_change, '/mnt/tokyoolympic/transformed-data/teams'),
    (athletes_medals_dataset, '/mnt/tokyoolympic/transformed-data/athletesmedal'),
]

for output_frame, output_path in transformed_outputs:
    writer = output_frame.repartition(1).write.mode('overwrite').option('header','true')
    writer.csv(output_path)
