# Silver Layer - Olympics and Country GDP
The silver layer produces several datasets saved in Parquet format.

In [1]:
from utils import load_dataframes, save_parquet

# Load the bronze-layer datasets; `df` is indexed by dataset name ("olympics", "gdp")
df = load_dataframes(datasets=['olympics', 'gdp'])

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/30 13:08:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

## Olympics

### Olympics Cleaned
The `olympics_cleaned.parquet` aggregate filters the data by year so it covers the same range as the GDP data.

### Medalists
The `medalists.parquet` aggregate keeps only the athlete entries that won a medal, restricted to Summer editions and entries with a known age.

### Medal Rank
The `medal_rank.parquet` aggregate ranks countries by the number of medals they've won (ordered by gold, then silver, then bronze).

In [2]:
# Inspect the column names and types of the raw Olympics data
df["olympics"].printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Team: string (nullable = true)
 |-- NOC: string (nullable = true)
 |-- Games: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)



In [3]:
# Preview the first rows of the raw Olympics data
df["olympics"].show(5)

                                                                                

+---+--------------------+---+----+------+------+--------------+---+-----------+----+------+---------+-------------+--------------------+-----+
| ID|                Name|Sex| Age|Height|Weight|          Team|NOC|      Games|Year|Season|     City|        Sport|               Event|Medal|
+---+--------------------+---+----+------+------+--------------+---+-----------+----+------+---------+-------------+--------------------+-----+
|  1|           A Dijiang|  M|24.0| 180.0|  80.0|         China|CHN|1992 Summer|1992|Summer|Barcelona|   Basketball|Basketball Men's ...| NULL|
|  2|            A Lamusi|  M|23.0| 170.0|  60.0|         China|CHN|2012 Summer|2012|Summer|   London|         Judo|Judo Men's Extra-...| NULL|
|  3| Gunnar Nielsen Aaby|  M|24.0|  NULL|  NULL|       Denmark|DEN|1920 Summer|1920|Summer|Antwerpen|     Football|Football Men's Fo...| NULL|
|  4|Edgar Lindenau Aabye|  M|34.0|  NULL|  NULL|Denmark/Sweden|DEN|1900 Summer|1900|Summer|    Paris|   Tug-Of-War|Tug-Of-War Men's ...

In [4]:
def get_year_range(df):
    """Return the ``(min, max)`` of the ``Year`` column of ``df``.

    Both bounds are computed in a single aggregation (one Spark job) instead
    of two separate ``agg`` + ``collect`` round trips.

    Args:
        df: A Spark DataFrame with a ``Year`` column.

    Returns:
        A ``(min_year, max_year)`` tuple; both are ``None`` when ``df`` is empty.
    """
    bounds = df.selectExpr("min(Year)", "max(Year)").first()
    return bounds[0], bounds[1]

# Year span of the raw Olympics data, before any filtering
get_year_range(df["olympics"])

(1896, 2016)

### Filter data by year range

In [18]:
# Inclusive (min_year, max_year) window kept in the silver layer, chosen to match the GDP data's year range
year_range = 1960, 2016

def filter_by_year_in(years):
    """Build a transform that keeps only rows whose ``Year`` lies in ``years``.

    Args:
        years: An inclusive ``(lower, upper)`` pair of years.

    Returns:
        A function taking a DataFrame and returning it filtered to that range.
    """
    lower, upper = years

    def _keep_years(frame):
        return frame.filter(frame["Year"].between(lower, upper))

    return _keep_years

# Restrict the Olympics data to `year_range`
# NOTE(review): the intro describes an olympics_cleaned.parquet output, but in the
# cells shown df_olympics is never written with save_parquet — confirm intent.
df_olympics = df["olympics"].transform(filter_by_year_in(year_range))

df_olympics.show(10)

+---+--------------------+---+----+------+------+-------------+---+-----------+----+------+-----------+--------------------+--------------------+-----+
| ID|                Name|Sex| Age|Height|Weight|         Team|NOC|      Games|Year|Season|       City|               Sport|               Event|Medal|
+---+--------------------+---+----+------+------+-------------+---+-----------+----+------+-----------+--------------------+--------------------+-----+
|  1|           A Dijiang|  M|24.0| 180.0|  80.0|        China|CHN|1992 Summer|1992|Summer|  Barcelona|          Basketball|Basketball Men's ...| NULL|
|  2|            A Lamusi|  M|23.0| 170.0|  60.0|        China|CHN|2012 Summer|2012|Summer|     London|                Judo|Judo Men's Extra-...| NULL|
|  5|Christine Jacoba ...|  F|21.0| 185.0|  82.0|  Netherlands|NED|1988 Winter|1988|Winter|    Calgary|       Speed Skating|Speed Skating Wom...| NULL|
|  5|Christine Jacoba ...|  F|21.0| 185.0|  82.0|  Netherlands|NED|1988 Winter|1988|Wint

In [24]:
def test_year_should_be_in_range(df, expected):
    """Check that the ``Year`` column of ``df`` spans exactly ``expected``.

    Args:
        df: A Spark DataFrame with a ``Year`` column.
        expected: The ``(min_year, max_year)`` tuple the data should cover.

    Raises:
        AssertionError: If the observed range differs from ``expected``.
    """
    observed = get_year_range(df)
    assert observed == expected, f"Year range should be {expected} but was {observed}"
    print("Test Passed!")

# Verify the filtered Olympics data spans exactly `year_range`
test_year_should_be_in_range(df_olympics, year_range)

Test Passed!


### Filter by medalists

In [None]:
def filter_medalists(df):
    """Keep only rows that carry a medal, ordered by the ``Medal`` value.

    Args:
        df: A Spark DataFrame with a ``Medal`` column (null for non-medalists).

    Returns:
        The filtered DataFrame ordered by ``Medal``.
    """
    has_medal = df["Medal"].isNotNull()
    medal_rows = df.filter(has_medal)
    return medal_rows.orderBy("Medal")

def filter_by_age_not_null(df):
    """Drop rows whose ``Age`` is null."""
    age_known = df["Age"].isNotNull()
    return df.filter(age_known)

def filter_by_summer_editions(df):
    """Keep only rows whose ``Season`` is ``"Summer"``."""
    is_summer = df["Season"] == "Summer"
    return df.filter(is_summer)

# Keep medal-winning Summer Games entries that have a known age
df_medalists = (
    df_olympics.transform(filter_medalists)
    .transform(filter_by_age_not_null)
    .transform(filter_by_summer_editions)
)

# Display the resulting DataFrame
df_medalists.show(10)

### Write medalists to parquet

In [None]:
# Persist the medalists subset as the `medalists` silver dataset
save_parquet(df_medalists, "medalists")

### Getting relevant data to answer the question: Is there a correlation between age and winning a medal?

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def count_medals_by_age_and_sex(df):
    """Count rows per (Age, Sex, Medal) combination.

    Returns:
        A DataFrame with columns ``Age``, ``Sex``, ``Medal`` and ``count``.
    """
    grouping_keys = ("Age", "Sex", "Medal")
    return df.groupBy(*grouping_keys).count()

def pivot_medals_count(df, medals=("Gold", "Silver", "Bronze")):
    """Pivot per-medal counts into one column per medal type.

    Args:
        df: Output of ``count_medals_by_age_and_sex`` (``Age``, ``Sex``,
            ``Medal``, ``count``).
        medals: Medal values to turn into columns. Listing them explicitly
            saves Spark a separate job to discover the distinct ``Medal``
            values. It also guarantees every listed column exists (the
            downstream ``with_total_medals`` needs Gold, Silver and Bronze),
            even when a medal type is absent from ``df``.

    Returns:
        A DataFrame with ``Age``, ``Sex`` and one column per entry in
        ``medals`` holding summed counts; missing combinations are filled with 0.
    """
    return df.groupBy("Age", "Sex").pivot("Medal", list(medals)).sum("count").fillna(0)

def with_total_medals(df):
    """Add a ``Total_Medals`` column equal to ``Gold + Silver + Bronze``."""
    gold, silver, bronze = (F.col(name) for name in ("Gold", "Silver", "Bronze"))
    return df.withColumn("Total_Medals", gold + silver + bronze)

# Rank ages within each Sex by total medals won (most medals first).
# row_number() breaks ties arbitrarily, so Age (youngest first) is used as a
# secondary sort key to make the ranking reproducible across runs.
aggregation_window_age_sex = Window.partitionBy("Sex").orderBy(
    F.desc("Total_Medals"), F.asc("Age")
)

def row_number_over(window):
    """Return a transform that adds a sequential ``Rank`` column over ``window``.

    ``row_number`` assigns distinct consecutive numbers within each partition,
    so tied rows receive different ranks.
    """
    def _add_rank(frame):
        return frame.withColumn("Rank", F.row_number().over(window))

    return _add_rank

# Build the per-Sex ranking of ages by total medals won
df_medalists_age_ranked = (
    df_medalists.transform(count_medals_by_age_and_sex)
    .transform(pivot_medals_count)
    .transform(with_total_medals)
    .transform(row_number_over(aggregation_window_age_sex))
    .select("Rank", "Sex", "Age", "Total_Medals")
)

# Display the resulting DataFrame
df_medalists_age_ranked.show(10)

In [None]:
# Persist the age/sex medal ranking as the `medalists_age` silver dataset
save_parquet(df_medalists_age_ranked, "medalists_age")

### Medals by Country

In [None]:
def count_medals_by(selection):
    """Return a transform that counts rows per ``selection`` group.

    Args:
        selection: Column names to group by; the result is also sorted by them.

    Returns:
        A function mapping a DataFrame to one with the ``selection`` columns
        plus ``count``.
    """
    def _count(frame):
        counted = frame.groupBy(*selection).count()
        return counted.orderBy(*selection)

    return _count


# Row count per (NOC, Medal) pair, i.e. how many Gold/Silver/Bronze entries each NOC has
df_medal_count_by_country = df_medalists.transform(count_medals_by(["NOC", "Medal"]))

df_medal_count_by_country.show(10)

### Rank by Medal count

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def rank_over(window_spec):
    """Return a transform that adds a ``Rank`` column using ``rank()`` over ``window_spec``.

    Tied rows share the same rank, and the following rank is skipped
    (1, 1, 3, ...).
    """
    def _with_rank(frame):
        return frame.withColumn("Rank", F.rank().over(window_spec))

    return _with_rank

def group_medals_by(selection):
    """Return a transform that folds per-medal ``count`` rows into medal totals.

    Args:
        selection: Column names to group by (e.g. ``["NOC"]``).

    Returns:
        A function mapping a DataFrame with ``Medal`` and ``count`` columns to
        one with the ``selection`` columns plus ``Gold``, ``Silver`` and ``Bronze``.
    """
    def _aggregate(frame):
        medal_totals = [
            F.sum(F.when(F.col("Medal") == medal, F.col("count")).otherwise(0)).alias(medal)
            for medal in ("Gold", "Silver", "Bronze")
        ]
        return frame.groupBy(*selection).agg(*medal_totals)

    return _aggregate

In [None]:
# Global ranking window (no partitions): order countries by gold, then silver,
# then bronze. Spark evaluates it in a single partition, which is acceptable
# here because the input has one row per NOC.
window_ordered_by_medal_score = Window.orderBy(
    F.desc("Gold"), F.desc("Silver"), F.desc("Bronze")
)

# Overall medal table: one row per NOC with Gold/Silver/Bronze totals, ranked
# NOTE(review): df_medalists has one row per athlete entry, so a team event
# contributes one medal per team member — confirm this is the intended tally.
df_medal_rank = (
    df_medal_count_by_country.transform(group_medals_by(["NOC"]))
    .transform(rank_over(window_ordered_by_medal_score))
    .select("Rank", "NOC", "Gold", "Silver", "Bronze")
)

df_medal_rank.show()

### Write medal_rank to parquet

In [None]:
# Persist the overall medal table as the `medal_rank` silver dataset
save_parquet(df_medal_rank, "medal_rank")

### Rank Medals by year

In [None]:
# Window that ranks countries within each Year by gold, then silver, then
# bronze count (highest first)
window_by_year_ordered_by_medal_score = Window.partitionBy("Year").orderBy(
    F.desc("Gold"), F.desc("Silver"), F.desc("Bronze")
)

# Per-year medal table: count medals per (NOC, Year, Medal), fold into
# Gold/Silver/Bronze columns, then rank NOCs within each Year
df_medals_ranked_by_year = (
    df_medalists.transform(count_medals_by(["NOC", "Year", "Medal"]))
    .transform(group_medals_by(["NOC", "Year"]))
    .transform(rank_over(window_by_year_ordered_by_medal_score))
    .orderBy("Year", "Rank")
    .select("Year", "Rank", "NOC", "Gold", "Silver", "Bronze")
)

# Display the resulting DataFrame
df_medals_ranked_by_year.show(10)

### Write medals_ranked_by_year to parquet

In [None]:
# Persist the per-year medal ranking as the `medals_ranked_by_year` silver dataset
save_parquet(df_medals_ranked_by_year, "medals_ranked_by_year")

## GDP

### GDP Ranked
Produces the `gdp_ranked.parquet` aggregate.  
Some country codes in the GDP dataset don't correspond to country codes in the Olympics dataset.
We've filtered the GDP dataset to keep only the country codes that appear in the Olympics dataset.
Then, we've partitioned it by year and ranked the countries by GDP value.

### GDP average by Country
Produces the `gdp_avg_by_country.parquet` aggregate.  
It ranks countries by their average GDP.

### Read from bronze layer

In [None]:
# GDP data as loaded from the bronze layer
df_gdp = df["gdp"]
df_gdp.show(10)

In [None]:
# Assert df_gdp spans the same year range (`year_range`) used to filter the
# Olympics data; min_year/max_year exist only as function locals, so they
# cannot be referenced at module level.
gdp_year_range = get_year_range(df_gdp)
assert gdp_year_range == year_range, (
    f"GDP year range should be {year_range} but was {gdp_year_range}"
)

### Rank value and filter country codes 

In [None]:
# Collect the distinct NOC codes from the Olympics DataFrame into a Python list.
# Reading the Row field directly avoids the RDD API round trip (a Python
# lambda executed in Spark workers; RDDs are also unavailable on Spark Connect).
olympics_country_codes_list = [
    row["NOC"] for row in df_olympics.select("NOC").distinct().collect()
]

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

def filter_by_olympics_countries(df):
    """Keep only rows whose ``Country Code`` appears among the Olympics NOC codes.

    Reads the module-level ``olympics_country_codes_list`` at call time.
    """
    in_olympics = F.col("Country Code").isin(olympics_country_codes_list)
    return df.filter(in_olympics)

# Rank GDP values within each Year, highest value first (rank 1)
window_by_year_ordered_by_value = Window.partitionBy("Year").orderBy(F.desc("Value"))

# Rank countries by GDP value within each year, keeping only Olympics countries
df_gdp_ranked = df_gdp.transform(filter_by_olympics_countries).transform(
    rank_over(window_by_year_ordered_by_value)
)

# Use .show() like the other cells: display() on a Spark DataFrame in a plain
# Jupyter kernel renders only its schema repr unless eager evaluation is enabled.
df_gdp_ranked.show(10)

### Write gdp_ranked to parquet

In [None]:
# Persist the per-year GDP ranking as the `gdp_ranked` silver dataset
save_parquet(df_gdp_ranked, "gdp_ranked")

### Average value by country and Rank

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def avg_value_by_country(df):
    """Compute the average ``Value`` per country.

    Returns:
        A DataFrame with ``Country Name``, ``Country Code`` and ``Average Value``.
    """
    average_value = F.avg("Value").alias("Average Value")
    per_country = df.groupBy("Country Name", "Country Code")
    return per_country.agg(average_value)


def format_money(from_col, to_col):
    """Return a transform that writes ``from_col`` as a ``$1,234.56``-style string into ``to_col``.

    Args:
        from_col: Name of the numeric source column.
        to_col: Name of the string column to create or replace.
    """
    def _format(frame):
        formatted = F.format_string("$%,.2f", F.col(from_col))
        return frame.withColumn(to_col, formatted)

    return _format


# Global window ordered by Average Value, highest first. It has no
# partitionBy, so Spark computes the ranking in a single partition; this is
# acceptable here because there is one row per country after averaging.
windows_ordered_by_avg = Window.orderBy(F.desc("Average Value"))


# Average GDP per Olympics country, formatted as money and ranked (highest first)
df_gdp_avg_by_country = (
    df_gdp.transform(filter_by_olympics_countries)
    .transform(avg_value_by_country)
    .transform(format_money(from_col="Average Value", to_col="Formatted Value"))
    .transform(rank_over(windows_ordered_by_avg))
).select("Rank", "Country Name", "Formatted Value", "Country Code", "Average Value")

df_gdp_avg_by_country.show(10)

### Write gdp_avg_by_country to parquet

In [None]:
# Persist the average-GDP ranking as the `gdp_avg_by_country` silver dataset
save_parquet(df_gdp_avg_by_country, "gdp_avg_by_country")