In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("OlympicsExtraction") \
    .master("local[*]") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .config("spark.hadoop.dfs.client.use.datanode.hostname", "true") \
    .config("spark.executor.memory", "1g") \
    .config("spark.driver.memory", "1g") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "100s") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .config("spark.sql.repl.eagerEval.maxNumRows", 5) \
    .getOrCreate()

athletes = spark.read.csv("/data/raw/athletes.csv", header=True, inferSchema=True)
results = spark.read.csv("/data/raw/results.csv", header=True, inferSchema=True)

athletes


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/20 20:14:19 WARN Utils: Your hostname, Jonathans-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.0.0.238 instead (on interface en0)
26/02/20 20:14:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/20 20:14:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Roles,Sex,Full name,Used name,Born,Died,NOC,athlete_id,Measurements,Affiliations,Nick/petnames,Title(s),Other names,Nationality,Original name,Name order
Competed in Olymp...,Male,"""François Joseph ...",Jean-François•Bla...,12 December 1886 ...,2 October 1960 in...,France,1,,,,,,,,
Competed in Olymp...,Male,Arnaud Benjamin•B...,Arnaud•Boetsch,1 April 1969 in M...,,France,2,183 cm / 76 kg,Racing Club de Fr...,,,,,,
Competed in Olymp...,Male,Jean Laurent Robe...,Jean•Borotra,13 August 1898 in...,17 July 1994 in A...,France,3,183 cm / 76 kg,"TCP, Paris (FRA)",Le Basque Bondiss...,,,,,
Competed in Olymp...,Male,Jacques Marie Sta...,Jacques•Brugnon,11 May 1895 in Pa...,20 March 1978 in ...,France,4,168 cm / 64 kg,Sporting club de ...,Toto,,,,,
Competed in Olymp...,Male,Henry Albert•Canet,Albert•Canet,17 April 1878 in ...,25 July 1930 in P...,France,5,,"TCP, Paris (FRA)",,,,,,


## Now use pyspark

- Name, height, weight

In [19]:
df_athletes = athletes

df_athletes = (
    df_athletes
    .withColumn('Name', F.regexp_replace('Used name', '•', ' '))
    .withColumn('Height_cm', F.regexp_extract('Measurements', r'(\d+)\scm', 1).try_cast('int'))
    .withColumn('Weight_kg', F.regexp_extract('Measurements', r'(\d+)\skg', 1).try_cast('int'))
    
    # Extracting years as integers
    .withColumn('Born_year', F.regexp_extract('Born', r'(\d{4})', 1).try_cast('int'))
    .withColumn('Death_year', F.regexp_extract('Died', r'(\d{4})', 1).try_cast('int'))
    
    # Fixed date pattern for "12 July 1995"
    .withColumn('Born_date', F.regexp_extract('Born', r'(\d+\s\w+\s\d{4})', 1))
    .withColumn('Death_date', F.regexp_extract('Died', r'(\d+\s\w+\s\d{4})', 1))
    # Fixed typo: Birth_location
    .withColumn('Birth_location', F.regexp_extract('Born', r'in\s(.*)', 1))
)

location_map = {
    'City': r'^([\w\s]+),',
    'Region': r',\s([\w\s]+)\s\(',
    'Country': r'\((\w+)\)',
}

for col, pattern in location_map.items():
    df_athletes = df_athletes.withColumn(
        col, F.nullif(F.regexp_extract('Birth_location', pattern, 1), F.lit(""))
    )
    
df_athletes

Roles,Sex,Full name,Used name,Born,Died,NOC,athlete_id,Measurements,Affiliations,Nick/petnames,Title(s),Other names,Nationality,Original name,Name order,Name,Height_cm,Weight_kg,Born_year,Death_year,Born_date,Death_date,Birth_location,City,Region,Country
Competed in Olymp...,Male,"""François Joseph ...",Jean-François•Bla...,12 December 1886 ...,2 October 1960 in...,France,1,,,,,,,,,Jean-François Bla...,,,1886,1960.0,12 December 1886,2 October 1960,"Bordeaux, Gironde...",Bordeaux,Gironde,FRA
Competed in Olymp...,Male,Arnaud Benjamin•B...,Arnaud•Boetsch,1 April 1969 in M...,,France,2,183 cm / 76 kg,Racing Club de Fr...,,,,,,,Arnaud Boetsch,183.0,76.0,1969,,1 April 1969,,"Meulan, Yvelines ...",Meulan,Yvelines,FRA
Competed in Olymp...,Male,Jean Laurent Robe...,Jean•Borotra,13 August 1898 in...,17 July 1994 in A...,France,3,183 cm / 76 kg,"TCP, Paris (FRA)",Le Basque Bondiss...,,,,,,Jean Borotra,183.0,76.0,1898,1994.0,13 August 1898,17 July 1994,"Biarritz, Pyrénée...",Biarritz,,FRA
Competed in Olymp...,Male,Jacques Marie Sta...,Jacques•Brugnon,11 May 1895 in Pa...,20 March 1978 in ...,France,4,168 cm / 64 kg,Sporting club de ...,Toto,,,,,,Jacques Brugnon,168.0,64.0,1895,1978.0,11 May 1895,20 March 1978,"Paris VIIIe, Pari...",Paris VIIIe,Paris,FRA
Competed in Olymp...,Male,Henry Albert•Canet,Albert•Canet,17 April 1878 in ...,25 July 1930 in P...,France,5,,"TCP, Paris (FRA)",,,,,,,Albert Canet,,,1878,1930.0,17 April 1878,25 July 1930,"Wandsworth, Engla...",Wandsworth,England,GBR


Convert to dates to to_date dt

In [21]:
df_athletes = (
    df_athletes
    .withColumn('Born_date', F.try_to_date(F.col('Born_date'), 'd-MMMM-yyyy'))
    .withColumn('Death_date', F.try_to_date(F.col('Death_date'), 'd-MMMM-yyyy'))
)

Age

In [22]:
df_athletes = df_athletes.withColumn(
    'Age',
    F.col('Death_year') - F.col('Born_year')
)
df_athletes

Roles,Sex,Full name,Used name,Born,Died,NOC,athlete_id,Measurements,Affiliations,Nick/petnames,Title(s),Other names,Nationality,Original name,Name order,Name,Height_cm,Weight_kg,Born_year,Death_year,Born_date,Death_date,Birth_location,City,Region,Country,Age
Competed in Olymp...,Male,"""François Joseph ...",Jean-François•Bla...,12 December 1886 ...,2 October 1960 in...,France,1,,,,,,,,,Jean-François Bla...,,,1886,1960.0,,,"Bordeaux, Gironde...",Bordeaux,Gironde,FRA,74.0
Competed in Olymp...,Male,Arnaud Benjamin•B...,Arnaud•Boetsch,1 April 1969 in M...,,France,2,183 cm / 76 kg,Racing Club de Fr...,,,,,,,Arnaud Boetsch,183.0,76.0,1969,,,,"Meulan, Yvelines ...",Meulan,Yvelines,FRA,
Competed in Olymp...,Male,Jean Laurent Robe...,Jean•Borotra,13 August 1898 in...,17 July 1994 in A...,France,3,183 cm / 76 kg,"TCP, Paris (FRA)",Le Basque Bondiss...,,,,,,Jean Borotra,183.0,76.0,1898,1994.0,,,"Biarritz, Pyrénée...",Biarritz,,FRA,96.0
Competed in Olymp...,Male,Jacques Marie Sta...,Jacques•Brugnon,11 May 1895 in Pa...,20 March 1978 in ...,France,4,168 cm / 64 kg,Sporting club de ...,Toto,,,,,,Jacques Brugnon,168.0,64.0,1895,1978.0,,,"Paris VIIIe, Pari...",Paris VIIIe,Paris,FRA,83.0
Competed in Olymp...,Male,Henry Albert•Canet,Albert•Canet,17 April 1878 in ...,25 July 1930 in P...,France,5,,"TCP, Paris (FRA)",,,,,,,Albert Canet,,,1878,1930.0,,,"Wandsworth, Engla...",Wandsworth,England,GBR,52.0


Drop columns

In [23]:
df_athletes = df_athletes.drop(
    'Roles', 
    'Full name', 
    'Used name', 
    'Born', 
    'Died', 
    'Measurements', 
    'Affiliations', 
    'Nick/petnames', 
    'Title(s)', 
    'Other names', 
    'Nationality', 
    'Original name', 
    'Name order',
    'Birth_location'
)

df_athletes

Sex,NOC,athlete_id,Name,Height_cm,Weight_kg,Born_year,Death_year,Born_date,Death_date,City,Region,Country,Age
Male,France,1,Jean-François Bla...,,,1886,1960.0,,,Bordeaux,Gironde,FRA,74.0
Male,France,2,Arnaud Boetsch,183.0,76.0,1969,,,,Meulan,Yvelines,FRA,
Male,France,3,Jean Borotra,183.0,76.0,1898,1994.0,,,Biarritz,,FRA,96.0
Male,France,4,Jacques Brugnon,168.0,64.0,1895,1978.0,,,Paris VIIIe,Paris,FRA,83.0
Male,France,5,Albert Canet,,,1878,1930.0,,,Wandsworth,England,GBR,52.0


Uplaod to hadoop

In [None]:
df_athletes.write.mode('overwrite').parquet("hdfs:///data/clean/athletes")


                                                                                

In [25]:
df_athletes_clean = spark.read.parquet("hdfs:///data/clean/athletes")
df_athletes_clean

Sex,NOC,athlete_id,Name,Height_cm,Weight_kg,Born_year,Death_year,Born_date,Death_date,City,Region,Country,Age
Male,Switzerland,87293,Henry Höhnes,,,1889,,,,,,,
Male,Czechoslovakia,87294,Rudolf Höhnl,158.0,71.0,1946,,,,Lomazice,,CZE,
Male,Austria,87295,Gregor Höll,165.0,,1911,1999.0,,,,Salzburg,AUT,88.0
Male,Austria,87296,Rudolf Höll,,,1911,1984.0,,,Kraubath an der Mur,Steiermark,AUT,73.0
Male,West Germany,87297,Stefan Hölzlwimmer,174.0,86.0,1951,,,,Salzberg,Bayern,GER,


## Results

In [8]:
df_results = results

df_results = (
    df_results
    .withColumn('Position', F.trim(F.regexp_replace('Pos', '=', ' ')).try_cast('int'))
    .withColumn('Games_year', F.regexp_extract('Games', r'(\d{4})', 1).try_cast('int'))
    .withColumn('Season', F.nullif(F.regexp_extract('Games', r'\b(Summer|Winter|Fall|Spring)\b', 1), F.lit('')))
    .withColumn('Gender', F.nullif(F.regexp_extract('Event', r'\b(Men|Women)\b', 1), F.lit("")))
    .withColumn('Discipline_clean', F.regexp_replace('Discipline', r'\s\(.*\)', 1))
    .withColumn('Name', F.regexp_replace('As', '-', ' '))
    .withColumn('Event_clean', F.regexp_extract('Event', r'(.*), ', 1))
)

cols = ['Season', 'Discipline_clean', 'Event_clean']

for c in cols:
    df_results = df_results.withColumn(
        c,
        F.when(F.col(c) == "", None).otherwise(F.col(c))
    ) 

In [9]:
df_results = df_results.withColumn('Medal', F.lower(F.trim(F.col('Medal'))))

df_results = df_results.withColumn(
    'Points',
    F.when(F.col('Medal') == 'gold', 3)
     .when(F.col('Medal') == 'silver', 2)
     .when(F.col('Medal') == 'bronze', 1)
     .otherwise(0).cast('bigint') 
).withColumn(
    'Preformance_result',
    F.when(F.col('Points') > 0, 'Medalist').otherwise('non-medalist')
)

df_results

Games,Event,Team,Pos,Medal,As,athlete_id,NOC,Discipline,Nationality,Unnamed: 7,Position,Games_year,Season,Gender,Discipline_clean,Name,Event_clean,Points,Preformance_result
1912 Summer Olympics,"Singles, Men (Oly...",,=17,,Jean-François Bla...,1,FRA,Tennis,,,17.0,1912,Summer,Men,Tennis,Jean François Bla...,Singles,0,non-medalist
1912 Summer Olympics,"Doubles, Men (Oly...",Jean Montariol,DNS,,Jean-François Bla...,1,FRA,Tennis,,,,1912,Summer,Men,Tennis,Jean François Bla...,Doubles,0,non-medalist
1920 Summer Olympics,"Singles, Men (Oly...",,=32,,Jean-François Bla...,1,FRA,Tennis,,,32.0,1920,Summer,Men,Tennis,Jean François Bla...,Singles,0,non-medalist
1920 Summer Olympics,"Doubles, Mixed (O...",Jeanne Vaussard,=8,,Jean-François Bla...,1,FRA,Tennis,,,8.0,1920,Summer,,Tennis,Jean François Bla...,Doubles,0,non-medalist
1920 Summer Olympics,"Doubles, Men (Oly...",Jacques Brugnon,4,,Jean-François Bla...,1,FRA,Tennis,,,4.0,1920,Summer,Men,Tennis,Jean François Bla...,Doubles,0,non-medalist


In [10]:
df_results = df_results.select(
    'athlete_id', 'Name', 'Gender', 'Discipline_clean', 'Event_clean', 'Medal', 'Points', 'Preformance_Result', 'Position', 'Games_Year', 'Season'
)

In [None]:
df_results.write.mode('overwrite').parquet("hdfs:///data/clean/results")

26/02/20 20:15:07 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [None]:
df_results_clean = spark.read.parquet("hdfs:///data/clean/results")

## Merge

- When trying to merge, df_athletes id had strings as a data type.
- Remove from dataframe and save them to analyze later.

In [45]:
df_athletes.filter(F.col('athlete_id').try_cast('int').isNull())

Sex,NOC,athlete_id,Name,Height_cm,Weight_kg,Born_year,Death_year,Born_date,Death_date,City,Region,Country,Age
Male,13 February 2015 ...,Canada,"III""",,,,1934.0,,,,,,
Male,5 March 1969 in T...,Netherlands,"Jr.""",,,,1887.0,,,,,,
Male,,United States,"Jr.""",,,,1962.0,,,,,,
Female,21 December 1954 ...,,-Mill,,,,,,,,,,
Female,,United States,"-Lazenby)""",,,,1962.0,,,,,,


In [26]:
df_athletes_clean = df_athletes_clean.filter(F.col('athlete_id').try_cast('int').isNotNull())
df_quarantine = df_athletes_clean.filter(F.col('athlete_id').try_cast('int').isNull())

In [None]:
# Write to a differnet path avoiding circular reference
df_athletes_clean.write.mode('overwrite').parquet("hdfs:///data/clean/athletes_v2")
df_quarantine.write.mode('overwrite').parquet('hdfs:///data/quarantine')

                                                                                

In [29]:
df_athletes_clean = spark.read.parquet('hdfs:///data/quarantine')

In [None]:
columns = ['athlete_id','height_cm', 'weight_kg', 'Born_year', 'Death_year', 'Country']

df_merge = df_results_clean.join(df_athletes_clean.select(columns), on='athlete_id', how='left')

athlete_id,Name,Gender,Discipline_clean,Event_clean,Medal,Points,Preformance_Result,Position,Games_Year,Season,height_cm,weight_kg,Born_year,Death_year,Country
120662,BJ Lawrence,Men,Athletics,100 metres,,0,non-medalist,,2016,Summer,,,,,
120663,Robert Lindstedt,,Tennis,Doubles,,0,non-medalist,9.0,2012,Summer,,,,,
120663,Robert Lindstedt,Men,Tennis,Doubles,,0,non-medalist,9.0,2012,Summer,,,,,
120664,Mariya Baklakova,Women,Swimming1,4 × 200 metres Fr...,,0,non-medalist,,2012,Summer,,,,,
120665,Mariya Gromova,Women,Swimming1,4 × 100 metres Me...,,0,non-medalist,,2012,Summer,,,,,
