# Assignment 2 - Selin Karabulut

## Due date/time: February 13, 2023 at 11:59 PM
## Submit Jupyter notebook to class Gradescope

The variable `data` holds the path where the data is located. Modify it as needed.

In [1]:
data = "gs://pstat235-sk/notebooks/jupyter/data/"

## Data

This is a historical dataset on the modern Olympic Games, including all the Games from Athens 1896 to Rio 2016. The data was taken from Kaggle. The `athlete_events` dataset contains $271,116$ rows and $15$ columns, and the NOC regions dataset contains $230$ rows and $3$ columns. They will be merged on the National Olympic Committee (NOC) code. Both files are comma-separated.

**Source:**

Griffin, R, H (2018) 120 years of Olympic history: athletes and results, athlete_events, Found at: https://www.kaggle.com/heesoo37/120-years-of-olympic-history-athletes-and-results#athlete_events.csv

Griffin, R, H (2018) 120 years of Olympic history: athletes and results, noc_regions, Found at: https://www.kaggle.com/heesoo37/120-years-of-olympic-history-athletes-and-results#noc_regions.csv

**ATTRIBUTES:**

**athlete_events.csv**

| Column Name | Data Type | Description/Notes |
|:----:|:----:|:----|
| ID |  integer | Unique number for each athlete |
| Name | string | Athlete’s name |
| Sex | string | M or F |
| Age | integer |  |
| Height | integer | In centimeters |
| Weight | integer | In kilograms |
| Team | string | Team name |
| NOC | string | National Olympic Committee, 3 letter code (Matches with `NOC` from noc_regions.csv) |
| Games | string | Year and season |
| Year | integer |  |
| Season | string | Summer or Winter |
| City | string | Host city |
| Sport | string |  |
| Event | string |  |
| Medal | string | Gold, Silver, Bronze, or NA |

**noc_regions.csv**

| Column Name | Data Type | Description/Notes |
|:--|--|:--|
| NOC | string | National Olympic Committee, 3 letter code (Matches with `NOC` from athlete_events.csv) |
| Region | string |  |
| notes | string |  |

## Upload the data into Google Cloud Storage

Use the paths above to download our two files and upload them to your Google bucket. For consistency use the following path:

`gs://<BUCKET-NAME>/notebooks/jupyter/data/olympics-analysis`

and upload the files into *olympics-analysis* directory.

Confirm that files were uploaded successfully and are accessible via the notebook by the following gsutil command:

In [2]:
!gsutil ls {data + "olympics-analysis"}

gs://pstat235-sk/notebooks/jupyter/data/olympics-analysis/
gs://pstat235-sk/notebooks/jupyter/data/olympics-analysis/Assignment-2.ipynb
gs://pstat235-sk/notebooks/jupyter/data/olympics-analysis/athlete_events.csv
gs://pstat235-sk/notebooks/jupyter/data/olympics-analysis/noc_regions.csv


## Load the data into Spark

We can either ask Spark to infer the schema or specify it explicitly ourselves. For this example we need to specify the schema explicitly, since the default inference would not convert every column the way we want.

As a reminder, here is how we can define a schema consisting of two columns, one integer and one string:

```python
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("ID", LongType(), True),
  StructField("name", StringType(), True)
])

df = spark.read.format("csv")\
  .schema(myManualSchema)\
  .option("header", "true")\
  .option("nullValue", "NA")\
  .load("gs/path/to/file")
```

Modify this code to load athlete_events.csv. Call this DataFrame `athlete_events`:

**Note:** We have "NA" values in our data. This could cause issues when loading the data. To overcome this, we need to tell Spark which string represents `null` in the data. We can use the option/parameter `nullValue` (as in the sample code above) and set it to "NA".
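To make the role of `nullValue` more concrete, here is a plain-Python sketch (standard `csv` module, not Spark) of what treating "NA" as missing means when parsing typed columns. The sample rows and the helper `parse_int` are made up for illustration; Spark's CSV reader does the equivalent internally once `nullValue` is set, and this sketch does not claim to mirror its implementation.

```python
import csv
import io

# Made-up sample in the same shape as a few athlete_events.csv columns.
SAMPLE = """Name,Age,Height,Medal
Athlete A,24,180,NA
Athlete B,34,NA,Gold
"""

def parse_int(text, null_value="NA"):
    """Return None for the null marker, otherwise the integer value."""
    return None if text == null_value else int(text)

rows = []
for record in csv.DictReader(io.StringIO(SAMPLE)):
    rows.append({
        "Name": record["Name"],
        "Age": parse_int(record["Age"]),
        "Height": parse_int(record["Height"]),
        # String columns also map the marker to None.
        "Medal": None if record["Medal"] == "NA" else record["Medal"],
    })

for row in rows:
    print(row)
```

Without the special case for "NA", `int("NA")` would raise a `ValueError`, which is the plain-Python analogue of an integer column failing to parse.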

In [3]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("ID", LongType(), True),
  StructField("Name", StringType(), True),
  StructField("Sex", StringType(), True),
  StructField("Age", LongType(), True),
  StructField("Height", LongType(), True),
  StructField("Weight", LongType(), True),
  StructField("Team", StringType(), True),
  StructField("NOC", StringType(), True),
  StructField("Games", StringType(), True),
  StructField("Year", LongType(), True),
  StructField("Season", StringType(), True),
  StructField("City", StringType(), True),
  StructField("Sport", StringType(), True),
  StructField("Event", StringType(), True),
  StructField("Medal", StringType(), True)
])

athlete_events = spark.read.format("csv")\
  .schema(myManualSchema)\
  .option("header", "true")\
  .option("nullValue", "NA")\
  .load("gs://pstat235-sk/notebooks/jupyter/data/olympics-analysis/athlete_events.csv")

Print the schema of this DataFrame:

In [4]:
athlete_events.printSchema()

root
 |-- ID: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Height: long (nullable = true)
 |-- Weight: long (nullable = true)
 |-- Team: string (nullable = true)
 |-- NOC: string (nullable = true)
 |-- Games: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)



Print the first 5 rows:

In [5]:
athlete_events.show(5)

+---+--------------------+---+---+------+------+--------------+---+-----------+----+------+---------+-------------+--------------------+-----+
| ID|                Name|Sex|Age|Height|Weight|          Team|NOC|      Games|Year|Season|     City|        Sport|               Event|Medal|
+---+--------------------+---+---+------+------+--------------+---+-----------+----+------+---------+-------------+--------------------+-----+
|  1|           A Dijiang|  M| 24|   180|    80|         China|CHN|1992 Summer|1992|Summer|Barcelona|   Basketball|Basketball Men's ...| null|
|  2|            A Lamusi|  M| 23|   170|    60|         China|CHN|2012 Summer|2012|Summer|   London|         Judo|Judo Men's Extra-...| null|
|  3| Gunnar Nielsen Aaby|  M| 24|  null|  null|       Denmark|DEN|1920 Summer|1920|Summer|Antwerpen|     Football|Football Men's Fo...| null|
|  4|Edgar Lindenau Aabye|  M| 34|  null|  null|Denmark/Sweden|DEN|1900 Summer|1900|Summer|    Paris|   Tug-Of-War|Tug-Of-War Men's ...| Gold|

                                                                                

We won't use the following columns, so let's drop them from the DataFrame in a persistent way:

* ID
* Games
* Event

In [6]:
cols_to_drop = ['ID', 'Games', 'Event']
athlete_events = athlete_events.drop(*cols_to_drop)

In [7]:
athlete_events.show(5)

+--------------------+---+---+------+------+--------------+---+----+------+---------+-------------+-----+
|                Name|Sex|Age|Height|Weight|          Team|NOC|Year|Season|     City|        Sport|Medal|
+--------------------+---+---+------+------+--------------+---+----+------+---------+-------------+-----+
|           A Dijiang|  M| 24|   180|    80|         China|CHN|1992|Summer|Barcelona|   Basketball| null|
|            A Lamusi|  M| 23|   170|    60|         China|CHN|2012|Summer|   London|         Judo| null|
| Gunnar Nielsen Aaby|  M| 24|  null|  null|       Denmark|DEN|1920|Summer|Antwerpen|     Football| null|
|Edgar Lindenau Aabye|  M| 34|  null|  null|Denmark/Sweden|DEN|1900|Summer|    Paris|   Tug-Of-War| Gold|
|Christine Jacoba ...|  F| 21|   185|    82|   Netherlands|NED|1988|Winter|  Calgary|Speed Skating| null|
+--------------------+---+---+------+------+--------------+---+----+------+---------+-------------+-----+
only showing top 5 rows



Now load noc_regions.csv. Call this DataFrame `noc_regions`:

In [8]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("NOC", StringType(), True),
  StructField("region", StringType(), True),
  StructField("notes", StringType(), True)
])

noc_regions = spark.read.format("csv")\
  .schema(myManualSchema)\
  .option("header", "true")\
  .option("nullValue", "NA")\
  .load("gs://pstat235-sk/notebooks/jupyter/data/olympics-analysis/noc_regions.csv")

In [9]:
noc_regions.show(5)

+---+-----------+--------------------+
|NOC|     region|               notes|
+---+-----------+--------------------+
|AFG|Afghanistan|                null|
|AHO|    Curacao|Netherlands Antilles|
|ALB|    Albania|                null|
|ALG|    Algeria|                null|
|AND|    Andorra|                null|
+---+-----------+--------------------+
only showing top 5 rows



### Caching

Since we will use these two DataFrames a lot in this notebook, let's `cache()` them to speed up execution. Caching keeps a DataFrame in memory once it has been loaded. Without it, every action we execute reloads the DataFrame from Cloud Storage, which adds to our execution time.

**Note:** Caching is lazy. The DataFrame is cached the first time you execute an action against it, not when you call `cache()`.

In [10]:
athlete_events.cache()

DataFrame[Name: string, Sex: string, Age: bigint, Height: bigint, Weight: bigint, Team: string, NOC: string, Year: bigint, Season: string, City: string, Sport: string, Medal: string]

In [11]:
noc_regions.cache()

DataFrame[NOC: string, region: string, notes: string]
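The lazy behaviour of caching can be pictured with a toy model in plain Python. The class `LazyTable` and its attributes are hypothetical and exist only for this illustration; this is not how Spark is implemented, just a sketch of the idea that `cache()` marks the data for reuse while the first action does the actual loading.

```python
class LazyTable:
    """Toy stand-in for a DataFrame: loading is deferred until an action runs."""

    def __init__(self, loader):
        self._loader = loader      # function that reads the data from storage
        self._use_cache = False    # set by cache(), but nothing is loaded yet
        self._cached_rows = None
        self.load_count = 0        # how many times storage was actually read

    def cache(self):
        # Only marks the table as cacheable; no data is read here.
        self._use_cache = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows
        self.load_count += 1
        rows = self._loader()
        if self._use_cache:
            self._cached_rows = rows
        return rows

    def count(self):
        # An "action": forces the data to be read (or served from the cache).
        return len(self._rows())


table = LazyTable(lambda: [1896, 1900, 2016]).cache()
print(table.load_count)   # still 0: cache() alone loaded nothing
table.count()
table.count()
print(table.load_count)   # 1: the second action reused the cached rows
```

In this toy model, a table that never calls `cache()` would read from storage on every `count()`, which mirrors the repeated reloads from Cloud Storage described above.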

## Question 1

What are the minimum and maximum values of `Year`?

**PSTAT 235**: use `agg` to show both minimum and maximum values in a single output.

In [12]:
# show() prints the result and returns None, so there is nothing to assign
athlete_events.agg({'Year': 'max'}).show()
athlete_events.agg({'Year': 'min'}).show()


                                                                                

+---------+
|max(Year)|
+---------+
|     2016|
+---------+

+---------+
|min(Year)|
+---------+
|     1896|
+---------+



In [13]:
# PSTAT235
from pyspark.sql import functions as F
# show() returns None, so the result is displayed rather than assigned
athlete_events.agg(
    F.min("Year").alias("min_year"),
    F.max("Year").alias("max_year")
).show()

+--------+--------+
|min_year|max_year|
+--------+--------+
|    1896|    2016|
+--------+--------+



## Question 2

Is the following statement True or False?

> The average age of female athletes who attended the Olympic Games after 1990 has risen compared to the era before then.

In [14]:
from pyspark.sql import functions as F

# Filter the DataFrame to only include female athletes
female_athletes = athlete_events.filter(athlete_events["Sex"] == "F")

# Calculate the average age of female athletes before 1990
before_1990 = female_athletes.filter(female_athletes["Year"] < 1990).agg(F.avg(female_athletes["Age"])).collect()[0][0]
before_1990


22.034368070953438

In [15]:
# Calculate the average age of female athletes from 1990 onward
after_1990 = female_athletes.filter(female_athletes["Year"] >= 1990).agg(F.avg(female_athletes["Age"])).collect()[0][0]
after_1990

24.619499568593614

In [16]:
# Compare the two values
if after_1990 > before_1990:
    print("True")
else:
    print("False")

True


## Question 3

How many Gold medals were given to men from 1970 to 2000 (including both years)?

In [17]:
# Filter the DataFrame to only include male athletes and gold medals
male_gold_medals = athlete_events.filter((athlete_events["Sex"] == "M") & (athlete_events["Medal"] == "Gold"))

# Filter the DataFrame to only include years between 1970 and 2000 (inclusive)
male_gold_medals_1970_to_2000 = male_gold_medals.filter((male_gold_medals["Year"] >= 1970) & (male_gold_medals["Year"] <= 2000))



In [18]:
# Count the number of gold medals
count = male_gold_medals_1970_to_2000.agg(F.count("*")).collect()[0][0]

print("Number of gold medals awarded to men from 1970 to 2000:", count)

Number of gold medals awarded to men from 1970 to 2000: 3186


## Question 4

How many NOCs attended Summer Olympics 2016 in Rio de Janeiro?

NOC stands for National Olympic Committee, which is almost equivalent to a country.

In [19]:
# Filter the DataFrame to only include Summer Olympics in 2016
summer_olympics_2016 = athlete_events.filter((athlete_events["Year"] == 2016) & (athlete_events["Season"] == "Summer"))

# Count the number of unique NOCs that attended the Summer Olympics in 2016
count = summer_olympics_2016.agg(F.countDistinct(summer_olympics_2016["NOC"])).collect()[0][0]

print("Number of NOCs that attended Summer Olympics 2016 in Rio de Janeiro:", count)

Number of NOCs that attended Summer Olympics 2016 in Rio de Janeiro: 207


## Question 5

Create two DataFrames, one for the Winter games and one for the Summer games. Each DataFrame should list all NOCs that have won gold medals in the Olympics, along with their gold medal count. Sort these DataFrames by the count in descending order. Call them `winter_gold_count` and `summer_gold_count` respectively. Using these two, answer the following questions:

Which country has the highest gold medal count in the Winter Olympics? How about the Summer Olympics?

In [20]:
from pyspark.sql import functions as F
# Filter the DataFrame to only include Winter Olympics gold medals
winter_gold = athlete_events.filter((athlete_events["Season"] == "Winter") & (athlete_events["Medal"] == "Gold"))

# Group the Winter Olympics gold medals by NOC and count the number of medals per NOC
winter_gold_grouped = winter_gold.groupBy("NOC").agg(F.count("Medal").alias("count"))

# Sort the grouped Winter Olympics gold medals by count in descending order
winter_gold_count = winter_gold_grouped.sort(F.desc("count"))




In [21]:
winter_gold_count.show(5)

+---+-----+
|NOC|count|
+---+-----+
|CAN|  305|
|URS|  249|
|USA|  159|
|GER|  153|
|NOR|  151|
+---+-----+
only showing top 5 rows



In [22]:
# Filter the DataFrame to only include Summer Olympics gold medals
summer_gold = athlete_events.filter((athlete_events["Season"] == "Summer") & (athlete_events["Medal"] == "Gold"))

# Group the Summer Olympics gold medals by NOC and count the number of medals per NOC
summer_gold_grouped = summer_gold.groupBy("NOC").agg(F.count("Medal").alias("count"))

# Sort the grouped Summer Olympics gold medals by count in descending order
summer_gold_count = summer_gold_grouped.sort(F.desc("count"))



In [23]:
summer_gold_count.show(5)

+---+-----+
|NOC|count|
+---+-----+
|USA| 2376|
|URS|  832|
|GBR|  635|
|GER|  591|
|ITA|  518|
+---+-----+
only showing top 5 rows



In [24]:
# Get the country with the highest gold medal count in the Winter Olympics
highest_winter_gold_NOC = winter_gold_count.first()["NOC"]
print("Country with the highest gold medal count in the Winter Olympics:", highest_winter_gold_NOC)



Country with the highest gold medal count in the Winter Olympics: CAN


In [25]:
# Get the country with the highest gold medal count in the Summer Olympics
highest_summer_gold_NOC = summer_gold_count.first()["NOC"]
print("Country with the highest gold medal count in the Summer Olympics:", highest_summer_gold_NOC)

Country with the highest gold medal count in the Summer Olympics: USA


## Question 6

Using the common field `NOC`, merge `summer_gold_count` and `noc_regions` DataFrames.

Which region takes the 10th place? This is based on the number of gold medals in all of the Summer Olympics in our dataset.

**PSTAT 235**: repeat the same procedure using SQL.

In [25]:
# Merge the summer_gold_count and noc_regions DataFrames using the NOC field
merged_df = summer_gold_count.join(noc_regions, on="NOC", how="left")

# Get the 10th place region based on the number of gold medals in all of the Summer Olympics in the dataset
tenth_place_region = merged_df.sort(F.desc("count")).take(10)[9]["region"]
print("Region in the 10th place based on the number of gold medals in all of the Summer Olympics:", tenth_place_region)

Region in the 10th place based on the number of gold medals in all of the Summer Olympics: Germany


In [56]:
# PSTAT235
# Register the summer_gold_count DataFrame as a temporary table
summer_gold_count.createOrReplaceTempView("summer_gold_count")

# Register the noc_regions DataFrame as a temporary table
noc_regions.createOrReplaceTempView("noc_regions")

# Merge the summer_gold_count and noc_regions DataFrames using SQL and the NOC field
tenth_place_region1 = spark.sql("""
    SELECT region, total_gold_count
    FROM (
        SELECT noc_regions.region, SUM(summer_gold_count.count) AS total_gold_count
        FROM summer_gold_count
        JOIN noc_regions
        ON summer_gold_count.NOC = noc_regions.NOC
        GROUP BY noc_regions.region
        ORDER BY total_gold_count DESC
        LIMIT 10
    ) AS top_10_regions
    ORDER BY total_gold_count
    LIMIT 1
""")



In [57]:
tenth_place_region1.show()

+------+----------------+
|region|total_gold_count|
+------+----------------+
| China|             335|
+------+----------------+



When we repeat the procedure in SQL, the region in 10th place by Summer Olympics gold medals is China, whereas the DataFrame approach gave Germany. The two results differ because noc_regions.csv sometimes maps more than one NOC to the same region; for example, Yemen is a region with three different NOCs. The DataFrame approach ranks individual NOCs and reports the region of the 10th NOC, while the SQL query first sums the gold medals of all NOCs in a region and then ranks the regions.
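A toy illustration of why ranking per NOC and ranking per region can disagree, written against an in-memory SQLite database rather than Spark so that it runs anywhere. The table names, NOC codes, and medal counts below are all made up; only the shape of the two queries follows the approach used in this question.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE gold_count (noc TEXT, gold INTEGER);
CREATE TABLE regions (noc TEXT, region TEXT);
-- Made-up numbers: region X competes under two NOC codes.
INSERT INTO gold_count VALUES ('XA', 5), ('XB', 4), ('YY', 7), ('ZZ', 6);
INSERT INTO regions VALUES ('XA', 'X'), ('XB', 'X'), ('YY', 'Y'), ('ZZ', 'Z');
""")

# Ranking per NOC: each code is ranked on its own, then mapped to a region.
per_noc = conn.execute("""
    SELECT r.region, g.gold
    FROM gold_count AS g JOIN regions AS r ON g.noc = r.noc
    ORDER BY g.gold DESC
""").fetchall()

# Ranking per region: counts of all NOCs in a region are summed first.
per_region = conn.execute("""
    SELECT r.region, SUM(g.gold) AS total_gold
    FROM gold_count AS g JOIN regions AS r ON g.noc = r.noc
    GROUP BY r.region
    ORDER BY total_gold DESC
""").fetchall()

print(per_noc)
print(per_region)
conn.close()
```

In this toy data, region Y leads the per-NOC ranking with 7 golds, but region X leads the per-region ranking because its two NOCs together hold 9. The same mechanism can shift which region lands in any given place, including the 10th.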