## Setting up PySpark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar -xvf /content/data-analysis-pyspark/spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark

spark-3.0.1-bin-hadoop3.2/
spark-3.0.1-bin-hadoop3.2/RELEASE
spark-3.0.1-bin-hadoop3.2/examples/
spark-3.0.1-bin-hadoop3.2/examples/src/
spark-3.0.1-bin-hadoop3.2/examples/src/main/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala
spark-3.0.1-bin-hadoop3.2/example

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/data-analysis-pyspark/spark-3.0.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from __future__ import division
import matplotlib.pyplot as plt
import pandas as pd


conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master('local[*]').getOrCreate()

Use this cell to stop the current session

In [None]:
# sc.stop()

Defining the Schemea for the datast

In [None]:
schema = StructType() \
      .add("ID",IntegerType(),True) \
      .add("Name",StringType(),True) \
      .add("Sex",StringType(),True) \
      .add("Age",IntegerType(),False) \
      .add("Height",DoubleType(),True) \
      .add("Weight",DoubleType(),True) \
      .add("Team",StringType(),True) \
      .add("NOC",StringType(),True) \
      .add("Games",StringType(),True) \
      .add("Year",IntegerType(),True) \
      .add("Season",StringType(),True) \
      .add("City",StringType(),True) \
      .add("Sport",StringType(),True) \
      .add("Event",StringType(),True) \
      .add("Medal",StringType(),True)

Reading from CSV

In [None]:
df1 = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("/content/data-analysis-pyspark/athlete_events.csv")

In [None]:
df1 = df1.na.drop(subset=["Year"])

In [None]:
df1.show()

+---+--------------------+---+---+------+------+--------------+---+-----------+----+------+-----------+--------------------+--------------------+-----+
| ID|                Name|Sex|Age|Height|Weight|          Team|NOC|      Games|Year|Season|       City|               Sport|               Event|Medal|
+---+--------------------+---+---+------+------+--------------+---+-----------+----+------+-----------+--------------------+--------------------+-----+
|  1|           A Dijiang|  M| 24| 180.0|  80.0|         China|CHN|1992 Summer|1992|Summer|  Barcelona|          Basketball|Basketball Men's ...|   NA|
|  2|            A Lamusi|  M| 23| 170.0|  60.0|         China|CHN|2012 Summer|2012|Summer|     London|                Judo|Judo Men's Extra-...|   NA|
|  3| Gunnar Nielsen Aaby|  M| 24|  null|  null|       Denmark|DEN|1920 Summer|1920|Summer|  Antwerpen|            Football|Football Men's Fo...|   NA|
|  4|Edgar Lindenau Aabye|  M| 34|  null|  null|Denmark/Sweden|DEN|1900 Summer|1900|Summ

# Visualization

## General count



1.   Number of Atheletes per year
2.   Number of Nations per year
3.   Number of Events per year





In [None]:
import plotly.express as px

In [None]:
atheletes = df1.groupBy(['Year', 'Season']).agg(F.countDistinct('ID').alias('Count')).sort('Year').toPandas()

In [None]:
fig = px.line(atheletes, x="Year", y="Count", color='Season', height=600, width=1000, title='Number of atheletes every year')
fig.update_traces(mode='markers+lines')
fig.show()

In [None]:
countries = df1.groupBy(['Year', 'Season']).agg(F.countDistinct('NOC').alias('Nations')).sort('Year').toPandas()
fig = px.line(countries, x='Year', y='Nations', color='Season', height=600, width=1000, title='Number of Nations participating every year')
fig.update_traces(mode='markers+lines')
fig.show()

In [None]:
Event = df1.groupBy(['Year', 'Season']).agg(F.countDistinct('Event').alias('Event')).sort('Year').toPandas()
fig = px.line(Event, x='Year', y='Event', color='Season', height=600, width=1000, title='Number of events every year')
fig.update_traces(mode='markers+lines')
fig.show()

## Age Vs Height Vs Weight (3D plot)

Displays the three properties seperated by gender

In [None]:
d = df1.filter('Year < 2004').toPandas()

In [None]:
fig = px.scatter_3d(d, x='Age', y='Height', z='Weight', color='Sex', opacity=0.2, title='Male Vs Female', height=800)
fig.update_traces(marker=dict(size=5), selector=dict(mode='markers'))
fig.update_layout(margin=dict(l=0, r=0, b=0, t=0), xaxis = dict(showgrid=False), yaxis = dict(showgrid=False))
fig.show()

## Event based - Boxing (3D plot)

Displays Heavy /Middle /Fly /Light weight seperated by gender

In [None]:
e = df1.filter((F.col("Event")=="Boxing Women's Lightweight")|(F.col("Event")=="Boxing Women's Middleweight")|(F.col("Event")=="Boxing Women's Flyweight")|(F.col("Event")=="Boxing Men's Heavyweight")|(F.col("Event")=="Boxing Men's Flyweight")|(F.col("Event")=="Boxing Men's Middleweight")|(F.col("Event")=="Boxing Men's Featherweight")).toPandas()

In [None]:
fig = px.scatter_3d(e, x='Age', y='Height', z='Weight', color='Event', opacity=0.5, title="Men's Boxing", height=800)
fig.update_traces(marker=dict(size=5), selector=dict(mode='markers'))
fig.update_layout(margin=dict(l=0, r=0, b=0, t=0), xaxis = dict(showgrid=False), yaxis = dict(showgrid=False))
fig.show()

## Women in Olympics (WIP)

1.   Men vs Women per year (Line graph)
2.   Number of women relative to men across countries (Scatter plot)
3.   Medal counts for women of different nations: 2016 (Bar graph)



In [None]:
Gender = df1.filter('Year<1980')
Gender = Gender.groupBy(['Year', 'Sex']).agg(F.countDistinct('ID').alias('Count')).sort('Year').toPandas()
fig = px.line(Gender, x='Year', y='Count', color='Sex', height=600, width=800, title='Number of Male Vs Female')
fig.update_traces(mode='markers+lines')

fig.show()

In [None]:
relative = df1.groupBy(['Year', 'Sex', 'NOC']).agg(F.countDistinct('ID').alias('Count')).filter("Year = 2016 OR Year = 1996 OR Year = 1976 OR Year = 1956 OR Year = 1936").sort('Year')

In [None]:
male = df1.groupBy(['Year', 'Sex', 'NOC']).agg(F.countDistinct('ID').alias('Male')).filter("Sex == 'M'").sort('Year')
female = df1.groupBy(['Year', 'Sex', 'NOC']).agg(F.countDistinct('ID').alias('Female')).filter("Sex == 'F'").sort('Year')

In [None]:
r = relative.toPandas()

In [None]:
px.scatter(r, x='Count', y='Count')

In [None]:
medals = df1.groupBy(['NOC', 'Medal']).agg(F.countDistinct('ID').alias('Count')).filter("Year = 2016 OR Year = 1996 OR Year = 1976 OR Year = 1956 OR Year = 1936").sort('Year')

In [None]:
import plotly.graph_objects as go

fig = go.Figure(go.Scattergeo())
fig.update_geos(projection_type="orthographic")
fig.update_layout(height=300, margin={"r":0,"t":0,"l":0,"b":0})
fig.show()

## Height over time

In [None]:
df2 = df1.filter('Year > 1960')
height_over_time = df2.groupBy(['Year', 'Height', 'Sex']).agg(F.countDistinct('ID').alias('H')).sort('Year').toPandas()

In [None]:
fig = px.box(height_over_time, x='Year', y="Height", color='Sex', title='Height(cm) per year')
fig.update_yaxes(range=[125,225], secondary_y=False)
fig.show()

## Weight over time

In [None]:
df3 = df1.filter('Year > 1960')
weight_over_time = df3.groupBy(['Year', 'Weight', 'Sex']).agg(F.countDistinct('ID').alias('W')).sort('Year').toPandas()
fig = px.box(weight_over_time, x='Year', y="Weight", color='Sex', title='Weight(kg) per year')
fig.update_yaxes(range=[0,200], secondary_y=False)
fig.show()

In [None]:
filter_by_year = df1.filter('Year==2016')

In [None]:
op = filter_by_year.groupBy('Team').agg(F.countDistinct('ID').alias('Count')).toPandas()

In [None]:
op.to_csv('/content/sample_data/op.csv')

In [None]:
df6 = df1.filter('Year==2016')
df6 = df6.filter((df6.Medal != 'NA'))

In [None]:
total_medals = df6.groupBy(['NOC']).agg(F.count('Medal')).sort(F.desc('count(Medal)')).toPandas()

In [None]:
fig = px.bar(total_medals, x="count(Medal)", y="NOC", orientation='h', title='Number of medal by countries in 2016')
fig.show()

In [None]:
df = spark.read.format("csv") \
      .option("header", True) \
      .load("/content/opnew.csv")

In [None]:
df6 = pd.read_csv('/content/opnew.csv')

In [None]:
new_dataframe = pd.read_csv('/content/6699.csv')

In [None]:
fig = px.scatter_geo(new_dataframe, locations="iso_alpha", hover_name="Team", size="Count", projection="orthographic", height=600, width=800, title='Athlete count by country in 2016 Olympics')
fig.update_geos(
    resolution=50,
    # showcoastlines=True, coastlinecolor="RebeccaPurple",
    showland=True, landcolor="#a7ff83",
    showcountries=True, countrycolor="#403f3f",
    showocean=True, oceancolor="#c5e3f6",
)
fig.show()

In [None]:
fig = px.scatter_geo(new_dataframe, locations="iso_alpha", hover_name="Team", size="Count", projection="natural earth", height=600, width=800, title='Athlete count by country in 2016 Olympics')
fig.update_geos(
    resolution=50,
    # showcoastlines=True, coastlinecolor="RebeccaPurple",
    showland=True, landcolor="#a7ff83",
    showcountries=True, countrycolor="#403f3f",
    showocean=True, oceancolor="#c5e3f6",
)
fig.show()