In [0]:
from pyspark.sql.functions import count,col,when,isnan,row_number,date_format,from_utc_timestamp
from pyspark.sql.types import TimestampType
from pyspark.sql.window import Window

In [0]:
input_path = '/mnt/tokyo_olympic/Bronze-Layer/'
file_names = dbutils.fs.ls(input_path)
dfs = {}

In [0]:

for file_name in file_names:
    var = file_name.name.split('.')[0]
    dfs[var] = spark.read.format('csv').option("header","true").option("InferSchema","true").load(file_name.path)

### Checking Null Values and Removing them

In [0]:
for df_name,df_value in dfs.items():
    Null_Dict = {col:df_value.filter(df_value[col].isNull()).count() for col in df_value.columns}
    for key,values in Null_Dict.items():
        if values > 0:
            if values/df_value.count() > 0.2:
                 print(f"Dropping column {key} from DataFrame {df_name}")
                 dfs[df_name] = df_value.drop(key)
            else:
                print(f"Dropping row for null value in column {key} of DataFrame {df_name}")
                dfs[df_name].na.drop(key)
        else:
            print(f"There are no null records in column {key} of DataFrame {df_name}")

There are no null records in column medal_type of DataFrame Medals_Individual
There are no null records in column medal_code of DataFrame Medals_Individual
There are no null records in column medal_date of DataFrame Medals_Individual
There are no null records in column athlete_short_name of DataFrame Medals_Individual
There are no null records in column athlete_name of DataFrame Medals_Individual
There are no null records in column athlete_sex of DataFrame Medals_Individual
There are no null records in column athlete_link of DataFrame Medals_Individual
There are no null records in column country_code of DataFrame Medals_Individual
There are no null records in column discipline_code of DataFrame Medals_Individual
There are no null records in column event of DataFrame Medals_Individual
There are no null records in column country of DataFrame Medals_Individual
There are no null records in column discipline of DataFrame Medals_Individual
There are no null records in column PersonName of Da

### Checking Duplicates and Removing them

In [0]:
for df_name,df_value in dfs.items():
    if df_value.count() == df_value.distinct().count():
        print(f"The Dataframe {df_name} donot have duplicates")
    else:
        print(f"The Dataframe {df_name} do contain duplicate values")
        print("Removing Duplicates")
        print(f"The Row Count of DataFrame {df_name} before duplicates removal is",df_value.count())
        dfs[df_name] = df_value.dropDuplicates()
        print(f"The Row Count of DataFrame {df_name} after duplicates removal is",df_value.count())

The Dataframe Medals_Individual donot have duplicates
The Dataframe athletes do contain duplicate values
Removing Duplicates
The Row Count of DataFrame athletes before duplicates removal is 11085
The Row Count of DataFrame athletes after duplicates removal is 11085
The Dataframe coaches do contain duplicate values
Removing Duplicates
The Row Count of DataFrame coaches before duplicates removal is 394
The Row Count of DataFrame coaches after duplicates removal is 394
The Dataframe entriesgender donot have duplicates
The Dataframe medals donot have duplicates
The Dataframe teams donot have duplicates
The Dataframe tokyo_olympic_dataset donot have duplicates


### Creating a spark sql view of all DataFrames

In [0]:
for df_name,df_value in dfs.items():
    dfs[df_name].createOrReplaceTempView(df_name)

### Transforming Athletes and Countries DataFrame

#### Integrating athletes data from multiple DataFrames 

In [0]:
athletes_delta = spark.sql(" Select a.athlete_name PersonName,a.Country,a.Discipline from Medals_Individual a left join athletes b\
                                  on a.Athlete_Name = b.PersonName\
                                  where b.PersonName is null")

In [0]:
dfs['athletes'] = dfs['athletes'].union(athletes_delta.distinct())

In [0]:
tokyo_olympic_dataset = spark.sql("Select Distinct Name,\
                                                   Age,\
                                                   Gender,\
                                                   Country from tokyo_olympic_dataset")
tokyo_olympic_dataset.createOrReplaceTempView('tokyo_olympic_dataset')
dfs['athletes'].createOrReplaceTempView('athletes')

In [0]:
dfs['athletes'] = spark.sql("Select a.*,b.Age,b.Gender from athletes as a\
                                          left join tokyo_olympic_dataset as b\
                                               on (a.PersonName = b.Name\
                                               and a.Country = b.Country) ")

#### Renaming column

In [0]:
dfs['athletes'] = dfs['athletes'].withColumnRenamed("PersonName","Athlete_Name")

#### Adding a Surrogate Key

In [0]:
dfs['athletes'] = dfs['athletes'].withColumn("Athlete_SID",row_number().over(Window.orderBy("Athlete_Name")))
dfs['athletes'] = dfs['athletes'].select("Athlete_SID","Athlete_Name","Country","Age","Gender","Discipline")

In [0]:
dfs['athletes'].show()

+-----------+--------------------+--------------------+----+------+-------------------+
|Athlete_SID|        Athlete_Name|             Country| Age|Gender|         Discipline|
+-----------+--------------------+--------------------+----+------+-------------------+
|          1|     AALERUD Katrine|              Norway|  26|Female|       Cycling Road|
|          2|         ABAD Nestor|               Spain|  28|  Male|Artistic Gymnastics|
|          3|   ABAGNALE Giovanni|               Italy|  26|  Male|             Rowing|
|          4|      ABALDE Alberto|               Spain|  25|  Male|         Basketball|
|          5|       ABALDE Tamara|               Spain|  32|Female|         Basketball|
|          6|           ABALO Luc|              France|  36|  Male|           Handball|
|          7|        ABAROA Cesar|               Chile|  24|  Male|             Rowing|
|          8|       ABASS Abobakr|               Sudan|  22|  Male|           Swimming|
|          9|    ABBASALI Hamide

In [0]:
dfs['countries'] = dfs['athletes'].select("Country").distinct()
dfs['countries'] = dfs['countries'].withColumn("Country_SID",row_number().over(Window.orderBy("Country")))

In [0]:
dfs.keys()

dict_keys(['Medals_Individual', 'athletes', 'coaches', 'entriesgender', 'medals', 'teams', 'tokyo_olympic_dataset', 'countries'])

#### Transforming coaches DataFrame

#### Renaming Columns

In [0]:
dfs['coaches'] = dfs['coaches'].withColumnRenamed("Name","Coach_Name")

#### Adding a Surrogate Key

In [0]:
dfs['coaches'] = dfs['coaches'].withColumn("Coach_SID",row_number().over(Window.orderBy("Coach_Name")))
dfs['coaches'] = dfs['coaches'].select("Coach_SID","Coach_Name","Country","Discipline")

#### Transforming EntriesGender DataFrame

#### Adding a Surrogate Key

In [0]:
dfs['entriesgender'] = dfs['entriesgender'].withColumn("Discipline_SID",row_number().over(Window.orderBy("Discipline")))

In [0]:
dfs['discipline'] = dfs['entriesgender'].select("Discipline_SID","Discipline")
dfs['discipline_gender_fact'] = dfs['entriesgender'].select("Discipline_SID","Female","Male","Total")

In [0]:
del dfs['entriesgender']

#### Transforming Medals DataFrame

#### Renaming Columns

In [0]:
dfs['countries_medal_fact'] = dfs['medals'].withColumnRenamed("Team_Country","Country").drop("Rank").withColumnRenamed("Rank by Total","Rank")

#### Adding a Surrogate Key

In [0]:
dfs['countries_medal_fact'] = dfs['countries_medal_fact'].withColumn("Country_Medal_Fact_SID",row_number().over(Window.orderBy("Country")))
dfs['countries_medal_fact'] = dfs['countries_medal_fact'].select("Country_Medal_Fact_SID","Country","Gold","Silver","Bronze","Total","Rank")

#### Transforming Medals DataFrame

#### Renaming Columns

In [0]:
dfs['Medals_Individual'] = dfs['Medals_Individual'].select("medal_type","medal_date","athlete_name","event","country","discipline","medal_code")

old_col = dfs['Medals_Individual'].columns
for cols in old_col:
    if "date" in cols:
        dfs['Medals_Individual'] = dfs['Medals_Individual'].withColumn(cols,date_format(from_utc_timestamp(dfs['Medals_Individual'][cols].\
            cast(TimestampType()),"UTC"),"yyyy-MM-dd"))
    new_col_name = ''.join("_" + word.capitalize() for word in cols.split('_')).lstrip("_")
    dfs['Medals_Individual']  = dfs['Medals_Individual'] .withColumnRenamed(cols,new_col_name)

#### Adding a Surrogate Key

In [0]:
dfs['Medals_Individual'] = dfs['Medals_Individual'].withColumn("Medals_Fact_SID",row_number().over(Window.orderBy("Country")))
dfs['Medals_Individual'].createOrReplaceTempView("medal")

#### Adding columns

In [0]:
dfs['Medals_Individual'] = spark.sql("Select *, case when Medal_Type = 'Gold Medal' then 1 else 0 end Gold_Medals,\
                                    case when Medal_Type = 'Silver Medal' then 1 else 0 end Silver_Medals,\
                                    case when Medal_Type = 'Bronze Medal' then 1 else 0 end Bronze_Medals from medal")

In [0]:
dfs['medals_fact'] = dfs['Medals_Individual'].select("Medals_Fact_SID","Medal_Date","Athlete_Name","Event","Country","Discipline",\
                            "Gold_Medals","Silver_Medals","Bronze_Medals")

#### Transforming Teams DataFrame

#### Renaming Columns

In [0]:
dfs['teams'] = dfs['teams'] .withColumnRenamed("TeamName","Team_Name")

#### Adding a Surrogate Key

In [0]:
dfs['teams'] = dfs['teams'].withColumn("Team_SID",row_number().over(Window.orderBy("Team_Name")))
dfs['teams'] = dfs['teams'].select("Team_SID","Team_Name","Discipline","Country","Event")

#### Transforming TokyoOlympic DataFrame

In [0]:
dfs['event'] = dfs['tokyo_olympic_dataset'].select("Event").distinct()

In [0]:
dfs['event'].createOrReplaceTempView("events")

In [0]:
event_delta = spark.sql("Select a.Event from medal a left join events b on a.Event = b.Event where b.Event is Null")
event_delta = event_delta.distinct()

In [0]:
dfs['athlete_event'] =  dfs['tokyo_olympic_dataset'].select("Event","Name").distinct()

In [0]:
dfs['event'] = dfs['event'].union(event_delta)

#### Adding a Surrogate Key

In [0]:
dfs['event'] = dfs['event'].withColumn("Event_SID",row_number().over(Window.orderBy("Event")))
dfs['event'] = dfs['event'].select("Event_SID","Event")

In [0]:
dfs['athlete_event'] = dfs['athlete_event'].withColumn("Athlete_Event_SID",row_number().over(Window.orderBy("Name")))

#### Renaming Columns

In [0]:
dfs['athlete_event'] = dfs['athlete_event'].withColumnRenamed("Name","Athlete_Name").select("Athlete_Event_SID","Athlete_Name","Event")

In [0]:
dfs.keys()

dict_keys(['Medals_Individual', 'athletes', 'coaches', 'medals', 'teams', 'tokyo_olympic_dataset', 'countries', 'discipline', 'discipline_gender_fact', 'countries_medal_fact', 'medals_fact', 'event', 'athlete_event'])

In [0]:
del dfs['Medals_Individual']
del dfs['tokyo_olympic_dataset']
del dfs['medals']

#### Loading Transformed Data into Silver Layer

In [0]:
for df_name,df_value in dfs.items():
    output_path = '/mnt/tokyo_olympic/Silver-Layer/' + df_name + '_silver/' + df_name + '.csv'
    df_value.repartition(1).write.mode("overwrite").option("header","true").csv(output_path)