## 1) Import module

In [3]:
import pyspark

## 2) Starting a Spark session

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Build the notebook's SparkSession (or pick up one that is already running),
# registered under the application name "example".
spark = (
    SparkSession.builder
    .appName("example")
    .getOrCreate()
)

In [4]:
# Bare expression: the notebook displays the session object as the cell output.
spark

## 3) Read dataset


In [24]:
# Load the semicolon-delimited sports CSV: the first line is the header and
# column types are inferred from the data.
# NOTE(review): the preview shows a mangled name ("Johan GrÃ¸ttumsbr..."), which
# looks like mis-decoded UTF-8 — confirm where the garbling happens (file
# encoding vs. display) before relying on the text columns.
df_pyspark = (
    spark.read
    .options(header=True, inferSchema=True, sep=";")
    .csv("./data/data_sport.csv")
)
df_pyspark.show()

+----+--------------------+-------------------+-------------+------+----------+------+-----------------------+--------------+
|Year|               Sport|              Event|      Country|Gender|Medal Rank| Medal|Name of Athlete or Team|Age of Athlete|
+----+--------------------+-------------------+-------------+------+----------+------+-----------------------+--------------+
|1924|             Bobsled|    Men's Four/Five|  Switzerland|   Men|         1|  gold|          Switzerland-1|          NULL|
|1924|             Bobsled|    Men's Four/Five|      Britain|   Men|         2|silver|              Britain-1|          NULL|
|1924|             Bobsled|    Men's Four/Five|      Belgium|   Men|         3|bronze|              Belgium-1|          NULL|
|1924|Cross-Country Skiing|Men's 18 Kilometers|       Norway|   Men|         1|  gold|          Thorleif Haug|            29|
|1924|Cross-Country Skiing|Men's 18 Kilometers|       Norway|   Men|         2|silver|   Johan GrÃ¸ttumsbr...|        

## 4) Basic investigation

In [10]:
# Print the inferred schema as a tree: column name, type and nullability.
df_pyspark.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Medal Rank: integer (nullable = true)
 |-- Medal: string (nullable = true)
 |-- Name of Athlete or Team: string (nullable = true)
 |-- Age of Athlete: integer (nullable = true)



In [13]:
# head(3) returns the first three records as a list of Row objects.
df_pyspark.head(3)

[Row(Year=1924, Sport='Bobsled', Event="Men's Four/Five", Country='Switzerland', Gender='Men', Medal Rank=1, Medal='gold', Name of Athlete or Team='Switzerland-1', Age of Athlete=None),
 Row(Year=1924, Sport='Bobsled', Event="Men's Four/Five", Country='Britain', Gender='Men', Medal Rank=2, Medal='silver', Name of Athlete or Team='Britain-1', Age of Athlete=None),
 Row(Year=1924, Sport='Bobsled', Event="Men's Four/Five", Country='Belgium', Gender='Men', Medal Rank=3, Medal='bronze', Name of Athlete or Team='Belgium-1', Age of Athlete=None)]

In [12]:
# Project a single column; with no arguments show() prints only the top 20 rows.
df_pyspark.select("Year").show()

+----+
|Year|
+----+
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
|1924|
+----+
only showing top 20 rows



## 5) Selecting columns and indexing

In [14]:
# List of column names, in schema order.
df_pyspark.columns

['Year',
 'Sport',
 'Event',
 'Country',
 'Gender',
 'Medal Rank',
 'Medal',
 'Name of Athlete or Team',
 'Age of Athlete']

In [None]:
# Project several columns at once by passing the names as separate arguments.
df_pyspark.select("Year", "Sport").show()

## 6) Checking datatypes

In [16]:
# (column name, type string) pairs for every column.
df_pyspark.dtypes

[('Year', 'int'),
 ('Sport', 'string'),
 ('Event', 'string'),
 ('Country', 'string'),
 ('Gender', 'string'),
 ('Medal Rank', 'int'),
 ('Medal', 'string'),
 ('Name of Athlete or Team', 'string'),
 ('Age of Athlete', 'int')]

In [18]:
# describe() only builds a new summary DataFrame; as a bare expression the cell
# shows its repr (every summary column is typed string), not the statistics.
# Chain .show() onto it to print the actual summary rows.
df_pyspark.describe()

DataFrame[summary: string, Year: string, Sport: string, Event: string, Country: string, Gender: string, Medal Rank: string, Medal: string, Name of Athlete or Team: string, Age of Athlete: string]

## 7) Adding and dropping columns

In [25]:
# Derive a new "Year+1" column from "Year" and rebind the name to the widened frame.
df_pyspark = df_pyspark.withColumn("Year+1", df_pyspark.Year + 1)

In [26]:
# Confirm that the "Year+1" column was appended on the right.
df_pyspark.show()

+----+--------------------+-------------------+-------------+------+----------+------+-----------------------+--------------+------+
|Year|               Sport|              Event|      Country|Gender|Medal Rank| Medal|Name of Athlete or Team|Age of Athlete|Year+1|
+----+--------------------+-------------------+-------------+------+----------+------+-----------------------+--------------+------+
|1924|             Bobsled|    Men's Four/Five|  Switzerland|   Men|         1|  gold|          Switzerland-1|          NULL|  1925|
|1924|             Bobsled|    Men's Four/Five|      Britain|   Men|         2|silver|              Britain-1|          NULL|  1925|
|1924|             Bobsled|    Men's Four/Five|      Belgium|   Men|         3|bronze|              Belgium-1|          NULL|  1925|
|1924|Cross-Country Skiing|Men's 18 Kilometers|       Norway|   Men|         1|  gold|          Thorleif Haug|            29|  1925|
|1924|Cross-Country Skiing|Men's 18 Kilometers|       Norway|   Men| 

In [29]:
# Remove the demo column again, rebinding the name to the result of drop().
df_pyspark = df_pyspark.drop("Year+1")

In [30]:
# Confirm that "Year+1" is gone and the original nine columns remain.
df_pyspark.show()

+----+--------------------+-------------------+-------------+------+----------+------+-----------------------+--------------+
|Year|               Sport|              Event|      Country|Gender|Medal Rank| Medal|Name of Athlete or Team|Age of Athlete|
+----+--------------------+-------------------+-------------+------+----------+------+-----------------------+--------------+
|1924|             Bobsled|    Men's Four/Five|  Switzerland|   Men|         1|  gold|          Switzerland-1|          NULL|
|1924|             Bobsled|    Men's Four/Five|      Britain|   Men|         2|silver|              Britain-1|          NULL|
|1924|             Bobsled|    Men's Four/Five|      Belgium|   Men|         3|bronze|              Belgium-1|          NULL|
|1924|Cross-Country Skiing|Men's 18 Kilometers|       Norway|   Men|         1|  gold|          Thorleif Haug|            29|
|1924|Cross-Country Skiing|Men's 18 Kilometers|       Norway|   Men|         2|silver|   Johan GrÃ¸ttumsbr...|        

## 8) Rename column

In [31]:
# Rename "Year" to "NewYear" and rebind df_pyspark to the result.
# NOTE(review): earlier cells that select "Year" would no longer work against
# this frame if re-run out of order — consider a separate variable name.
df_pyspark = df_pyspark.withColumnRenamed("Year", "NewYear")
df_pyspark.show()

+-------+--------------------+-------------------+-------------+------+----------+------+-----------------------+--------------+
|NewYear|               Sport|              Event|      Country|Gender|Medal Rank| Medal|Name of Athlete or Team|Age of Athlete|
+-------+--------------------+-------------------+-------------+------+----------+------+-----------------------+--------------+
|   1924|             Bobsled|    Men's Four/Five|  Switzerland|   Men|         1|  gold|          Switzerland-1|          NULL|
|   1924|             Bobsled|    Men's Four/Five|      Britain|   Men|         2|silver|              Britain-1|          NULL|
|   1924|             Bobsled|    Men's Four/Five|      Belgium|   Men|         3|bronze|              Belgium-1|          NULL|
|   1924|Cross-Country Skiing|Men's 18 Kilometers|       Norway|   Men|         1|  gold|          Thorleif Haug|            29|
|   1924|Cross-Country Skiing|Men's 18 Kilometers|       Norway|   Men|         2|silver|   Johan

## 9) Handling missing values

In [6]:
from pyspark.sql import SparkSession

In [7]:
# Obtain a session for the missing-values examples and display it.
# NOTE(review): if the "example" session created earlier is still active,
# getOrCreate() presumably hands back that same session, so appName("testing")
# may have no effect — confirm, or reuse the existing `spark` instead.
spark = SparkSession.builder.appName("testing").getOrCreate()
spark

In [9]:
# Small semicolon-delimited sample (header row, inferred types) that contains
# nulls in every column; used for the null-handling examples.
dataframe = (
    spark.read
    .options(header=True, inferSchema=True, sep=";")
    .csv("./data/test_simple.csv")
)
dataframe.show()

+--------+----+----------+------+
|    name| age|experience|salary|
+--------+----+----------+------+
|   Lukas|  30|        10| 45000|
|Katerina|  26|         8| 34000|
|   Petra|  40|         4| 24000|
|   Pavel|  45|         3| 20000|
|    Kuba|  21|         1| 15000|
|   Julie|NULL|      NULL| 40000|
|   Tonda|  34|         2| 40500|
|    NULL|  26|        10| 38000|
|    NULL|  19|      NULL|  NULL|
+--------+----+----------+------+



In [15]:
# how="any": discard every row that contains at least one null.
dataframe.na.drop(how="any").show()

+--------+---+----------+------+
|    name|age|experience|salary|
+--------+---+----------+------+
|   Lukas| 30|        10| 45000|
|Katerina| 26|         8| 34000|
|   Petra| 40|         4| 24000|
|   Pavel| 45|         3| 20000|
|    Kuba| 21|         1| 15000|
|   Tonda| 34|         2| 40500|
+--------+---+----------+------+



In [13]:
# how="all" drops a row only when every column is null; no row here qualifies,
# so all nine rows are kept.
dataframe.na.drop(how="all").show()

+--------+----+----------+------+
|    name| age|experience|salary|
+--------+----+----------+------+
|   Lukas|  30|        10| 45000|
|Katerina|  26|         8| 34000|
|   Petra|  40|         4| 24000|
|   Pavel|  45|         3| 20000|
|    Kuba|  21|         1| 15000|
|   Julie|NULL|      NULL| 40000|
|   Tonda|  34|         2| 40500|
|    NULL|  26|        10| 38000|
|    NULL|  19|      NULL|  NULL|
+--------+----+----------+------+



In [19]:
# thresh=2 keeps rows that have at least two non-null values; only the last row
# (just `age` populated) is removed.
dataframe.na.drop(thresh=2).show()

+--------+----+----------+------+
|    name| age|experience|salary|
+--------+----+----------+------+
|   Lukas|  30|        10| 45000|
|Katerina|  26|         8| 34000|
|   Petra|  40|         4| 24000|
|   Pavel|  45|         3| 20000|
|    Kuba|  21|         1| 15000|
|   Julie|NULL|      NULL| 40000|
|   Tonda|  34|         2| 40500|
|    NULL|  26|        10| 38000|
+--------+----+----------+------+



In [23]:
# Only `age` is considered when deciding what to drop: rows with a null age are
# removed, nulls in the other columns are left in place.
dataframe.na.drop(subset=["age"]).show()

+--------+---+----------+------+
|    name|age|experience|salary|
+--------+---+----------+------+
|   Lukas| 30|        10| 45000|
|Katerina| 26|         8| 34000|
|   Petra| 40|         4| 24000|
|   Pavel| 45|         3| 20000|
|    Kuba| 21|         1| 15000|
|   Tonda| 34|         2| 40500|
|    NULL| 26|        10| 38000|
|    NULL| 19|      NULL|  NULL|
+--------+---+----------+------+



In [34]:
# The na.drop() calls above were never assigned back, so `dataframe` is unchanged.
dataframe.show()

+--------+----+----------+------+
|    name| age|experience|salary|
+--------+----+----------+------+
|   Lukas|  30|        10| 45000|
|Katerina|  26|         8| 34000|
|   Petra|  40|         4| 24000|
|   Pavel|  45|         3| 20000|
|    Kuba|  21|         1| 15000|
|   Julie|NULL|      NULL| 40000|
|   Tonda|  34|         2| 40500|
|    NULL|  26|        10| 38000|
|    NULL|  19|      NULL|  NULL|
+--------+----+----------+------+



In [38]:
# A string fill value is applied to string columns only: `name` is filled, while
# the nulls in the numeric columns (age, experience, salary) remain.
dataframe.na.fill("Missing Values").show()

+--------------+----+----------+------+
|          name| age|experience|salary|
+--------------+----+----------+------+
|         Lukas|  30|        10| 45000|
|      Katerina|  26|         8| 34000|
|         Petra|  40|         4| 24000|
|         Pavel|  45|         3| 20000|
|          Kuba|  21|         1| 15000|
|         Julie|NULL|      NULL| 40000|
|         Tonda|  34|         2| 40500|
|Missing Values|  26|        10| 38000|
|Missing Values|  19|      NULL|  NULL|
+--------------+----+----------+------+



In [45]:
from pyspark.ml.feature import Imputer

# Numeric columns to impute; each one gets a sibling "<name>_imputed" output
# column, with missing entries replaced by the column mean.
numeric_cols = ["age", "experience", "salary"]
imputer = Imputer(
    strategy="mean",
    inputCols=numeric_cols,
    outputCols=[f"{col}_imputed" for col in numeric_cols],
)

In [46]:
# Fit the imputer on the data, then append the *_imputed columns. In the output
# the filled values appear as whole numbers (e.g. 32062 for salary), matching
# the integer input columns.
imputer.fit(dataframe).transform(dataframe).show()

+--------+----+----------+------+-----------+------------------+--------------+
|    name| age|experience|salary|age_imputed|experience_imputed|salary_imputed|
+--------+----+----------+------+-----------+------------------+--------------+
|   Lukas|  30|        10| 45000|         30|                10|         45000|
|Katerina|  26|         8| 34000|         26|                 8|         34000|
|   Petra|  40|         4| 24000|         40|                 4|         24000|
|   Pavel|  45|         3| 20000|         45|                 3|         20000|
|    Kuba|  21|         1| 15000|         21|                 1|         15000|
|   Julie|NULL|      NULL| 40000|         30|                 5|         40000|
|   Tonda|  34|         2| 40500|         34|                 2|         40500|
|    NULL|  26|        10| 38000|         26|                10|         38000|
|    NULL|  19|      NULL|  NULL|         19|                 5|         32062|
+--------+----+----------+------+-------