# PySpark module with COVID-19 Dataset

In [2]:
import pandas as pd
import numpy as np
from datetime import date, timedelta, datetime
import time

import pyspark # only run this after findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

In [3]:
# Initiate the Spark Session
spark = SparkSession.builder.appName('covid-example').getOrCreate()

In [4]:
spark

## Data
We will be working with the Data Science for COVID-19 in South Korea dataset, one of the most detailed COVID-19 datasets on the internet.

The data is available on Kaggle: [Link](https://www.kaggle.com/kimjihoo/coronavirusdataset)

### 1. Basic Functions

#### [1] Load (Read) the data

In [5]:
cases = spark.read.load("./data/Case.csv",
                        format="csv",
                        sep=",",
                        header="true")

In [6]:
# First few rows in the file
cases.show()

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| TRUE|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| TRUE|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| TRUE| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| TRUE|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| TRUE|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| TRUE|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000007|   Seoul|from other city| TRUE|SMR Newly Planted...|       36|        -|         -|
| 1000008|   Seoul|  Dongdaemun-gu| TRUE|       Dongan Churc

The output looks fine here, but as the number of columns increases, the formatting becomes hard to read. A trick that helps in a Jupyter Notebook is to display the data in pandas format instead.

The **.toPandas()** function converts a **Spark Dataframe** into a **Pandas Dataframe**, which is much easier to play with.

In [7]:
cases.limit(10).toPandas()

|  | case_id | province | city | group | infection_case | confirmed | latitude | longitude |
|---|---|---|---|---|---|---|---|---|
| 0 | 1000001 | Seoul | Yongsan-gu | True | Itaewon Clubs | 139 | 37.538621 | 126.992652 |
| 1 | 1000002 | Seoul | Gwanak-gu | True | Richway | 119 | 37.48208 | 126.901384 |
| 2 | 1000003 | Seoul | Guro-gu | True | Guro-gu Call Center | 95 | 37.508163 | 126.884387 |
| 3 | 1000004 | Seoul | Yangcheon-gu | True | Yangcheon Table Tennis Club | 43 | 37.546061 | 126.874209 |
| 4 | 1000005 | Seoul | Dobong-gu | True | Day Care Center | 43 | 37.679422 | 127.044374 |
| 5 | 1000006 | Seoul | Guro-gu | True | Manmin Central Church | 41 | 37.481059 | 126.894343 |
| 6 | 1000007 | Seoul | from other city | True | SMR Newly Planted Churches Group | 36 | - | - |
| 7 | 1000008 | Seoul | Dongdaemun-gu | True | Dongan Church | 17 | 37.592888 | 127.056766 |
| 8 | 1000009 | Seoul | from other city | True | Coupang Logistics Center | 25 | - | - |
| 9 | 1000010 | Seoul | Gwanak-gu | True | Wangsung Church | 30 | 37.481735 | 126.930121 |


#### [2] Change Column Names

To change a single column,

In [8]:
cases = cases.withColumnRenamed("infection_case","infection_source")

To change all columns,

In [9]:
cases = cases.toDF(*['case_id', 'province', 'city', 'group', 'infection_case', 'confirmed',
       'latitude', 'longitude'])

In [10]:
cases.show()

+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|1000001|   Seoul|     Yongsan-gu| TRUE|       Itaewon Clubs|      139|37.538621|126.992652|
|1000002|   Seoul|      Gwanak-gu| TRUE|             Richway|      119| 37.48208|126.901384|
|1000003|   Seoul|        Guro-gu| TRUE| Guro-gu Call Center|       95|37.508163|126.884387|
|1000004|   Seoul|   Yangcheon-gu| TRUE|Yangcheon Table T...|       43|37.546061|126.874209|
|1000005|   Seoul|      Dobong-gu| TRUE|     Day Care Center|       43|37.679422|127.044374|
|1000006|   Seoul|        Guro-gu| TRUE|Manmin Central Ch...|       41|37.481059|126.894343|
|1000007|   Seoul|from other city| TRUE|SMR Newly Planted...|       36|        -|         -|
|1000008|   Seoul|  Dongdaemun-gu| TRUE|       Dongan Church|       17

#### [3] Select Columns

We can select a subset of columns using the **select** method.

In [11]:
cases = cases.select('province','city','infection_case','confirmed')
cases.show()

+--------+---------------+--------------------+---------+
|province|           city|      infection_case|confirmed|
+--------+---------------+--------------------+---------+
|   Seoul|     Yongsan-gu|       Itaewon Clubs|      139|
|   Seoul|      Gwanak-gu|             Richway|      119|
|   Seoul|        Guro-gu| Guro-gu Call Center|       95|
|   Seoul|   Yangcheon-gu|Yangcheon Table T...|       43|
|   Seoul|      Dobong-gu|     Day Care Center|       43|
|   Seoul|        Guro-gu|Manmin Central Ch...|       41|
|   Seoul|from other city|SMR Newly Planted...|       36|
|   Seoul|  Dongdaemun-gu|       Dongan Church|       17|
|   Seoul|from other city|Coupang Logistics...|       25|
|   Seoul|      Gwanak-gu|     Wangsung Church|       30|
|   Seoul|   Eunpyeong-gu|Eunpyeong St. Mar...|       14|
|   Seoul|   Seongdong-gu|    Seongdong-gu APT|       13|
|   Seoul|      Jongno-gu|Jongno Community ...|       10|
|   Seoul|     Gangnam-gu|Samsung Medical C...|        7|
|   Seoul|    

#### [4] Sort by Column

In [15]:
# To convert str to int
# cases = cases.withColumn("confirmed", cases.confirmed.cast('int'))

In [16]:
# Simple sort
cases.sort("confirmed", ascending=True).show()

+-----------------+---------------+--------------------+---------+
|         province|           city|      infection_case|confirmed|
+-----------------+---------------+--------------------+---------+
|          Jeju-do|              -|contact with patient|        0|
|       Gangwon-do|              -|contact with patient|        0|
|            Seoul|     Gangseo-gu|SJ Investment Cal...|        0|
|           Sejong|              -|                 etc|        1|
|           Sejong|from other city|  Shincheonji Church|        1|
|            Seoul|              -|         Orange Life|        1|
|            Seoul|from other city|Anyang Gunpo Past...|        1|
|            Seoul|from other city|Daejeon door-to-d...|        1|
|     Jeollabuk-do|from other city|  Shincheonji Church|        1|
|     Jeollanam-do|from other city|  Shincheonji Church|        1|
|            Seoul|     Gangnam-gu|Gangnam Dongin Ch...|        1|
|            Busan|from other city|Cheongdo Daenam H...|      

In [14]:
# Descending Sort
from pyspark.sql import functions as F

cases.sort(F.desc("confirmed")).show()

+-----------------+---------------+--------------------+---------+
|         province|           city|      infection_case|confirmed|
+-----------------+---------------+--------------------+---------+
|            Seoul|        Guro-gu| Guro-gu Call Center|       95|
|            Daegu|              -|contact with patient|      917|
|          Gwangju|from other city|  Shincheonji Church|        9|
| Gyeongsangnam-do|       Jinju-si|         Wings Tower|        9|
|Chungcheongbuk-do|from other city|       Itaewon Clubs|        9|
|Chungcheongnam-do|      Seosan-si|Seosan-si Laboratory|        9|
|      Gyeonggi-do|              -|                 etc|       84|
|            Seoul|from other city|  Shincheonji Church|        8|
|           Sejong|         Sejong|gym facility in S...|        8|
| Gyeongsangnam-do|   Geochang-gun|Geochang-gun Woon...|        8|
|Chungcheongbuk-do|              -|contact with patient|        8|
|            Daegu|              -|                 etc|      

#### [5] Change Column Type

In [14]:
from pyspark.sql.types import DoubleType, IntegerType, StringType

cases = cases.withColumn('confirmed', F.col('confirmed').cast(IntegerType()))
cases = cases.withColumn('city', F.col('city').cast(StringType()))

cases.show()

+--------+---------------+--------------------+---------+
|province|           city|      infection_case|confirmed|
+--------+---------------+--------------------+---------+
|   Seoul|     Yongsan-gu|       Itaewon Clubs|      139|
|   Seoul|      Gwanak-gu|             Richway|      119|
|   Seoul|        Guro-gu| Guro-gu Call Center|       95|
|   Seoul|   Yangcheon-gu|Yangcheon Table T...|       43|
|   Seoul|      Dobong-gu|     Day Care Center|       43|
|   Seoul|        Guro-gu|Manmin Central Ch...|       41|
|   Seoul|from other city|SMR Newly Planted...|       36|
|   Seoul|  Dongdaemun-gu|       Dongan Church|       17|
|   Seoul|from other city|Coupang Logistics...|       25|
|   Seoul|      Gwanak-gu|     Wangsung Church|       30|
|   Seoul|   Eunpyeong-gu|Eunpyeong St. Mar...|       14|
|   Seoul|   Seongdong-gu|    Seongdong-gu APT|       13|
|   Seoul|      Jongno-gu|Jongno Community ...|       10|
|   Seoul|     Gangnam-gu|Samsung Medical C...|        7|
|   Seoul|    

#### [6] Filter 

We can filter a DataFrame on several conditions at once by combining them with AND(&), OR(|) and NOT(~). Each condition must be wrapped in its own parentheses. For example, we may want to find all the different infection_case values in Daegu with more than 10 confirmed cases.

In [15]:
cases.filter((cases.confirmed>10) & (cases.province=='Daegu')).show()

+--------+------------+--------------------+---------+
|province|        city|      infection_case|confirmed|
+--------+------------+--------------------+---------+
|   Daegu|      Nam-gu|  Shincheonji Church|     4511|
|   Daegu|Dalseong-gun|Second Mi-Ju Hosp...|      196|
|   Daegu|      Seo-gu|Hansarang Convale...|      124|
|   Daegu|Dalseong-gun|Daesil Convalesce...|      101|
|   Daegu|     Dong-gu|     Fatima Hospital|       39|
|   Daegu|           -|     overseas inflow|       41|
|   Daegu|           -|contact with patient|      917|
|   Daegu|           -|                 etc|      747|
+--------+------------+--------------------+---------+



#### [7] GroupBy

In [16]:
from pyspark.sql import functions as F

cases.groupBy(["province","city"]).agg(F.sum("confirmed") ,F.max("confirmed")).show()

+----------------+---------------+--------------+--------------+
|        province|           city|sum(confirmed)|max(confirmed)|
+----------------+---------------+--------------+--------------+
|Gyeongsangnam-do|       Jinju-si|             9|             9|
|           Seoul|        Guro-gu|           139|            95|
|           Seoul|     Gangnam-gu|            18|             7|
|         Daejeon|              -|           100|            55|
|    Jeollabuk-do|from other city|             6|             3|
|Gyeongsangnam-do|Changnyeong-gun|             7|             7|
|           Seoul|              -|           561|           298|
|         Jeju-do|from other city|             1|             1|
|Gyeongsangbuk-do|              -|           345|           190|
|Gyeongsangnam-do|   Geochang-gun|            18|            10|
|Gyeongsangbuk-do|        Gumi-si|            10|            10|
|         Incheon|from other city|           117|            53|
|           Busan|       

Or, if we don't like the generated column names, we can use the **alias** method to rename the columns inside the agg call itself.

In [17]:
cases.groupBy(["province","city"]).agg(
    F.sum("confirmed").alias("TotalConfirmed"),
    F.max("confirmed").alias("MaxFromOneConfirmedCase")
    ).show()

+----------------+---------------+--------------+-----------------------+
|        province|           city|TotalConfirmed|MaxFromOneConfirmedCase|
+----------------+---------------+--------------+-----------------------+
|Gyeongsangnam-do|       Jinju-si|             9|                      9|
|           Seoul|        Guro-gu|           139|                     95|
|           Seoul|     Gangnam-gu|            18|                      7|
|         Daejeon|              -|           100|                     55|
|    Jeollabuk-do|from other city|             6|                      3|
|Gyeongsangnam-do|Changnyeong-gun|             7|                      7|
|           Seoul|              -|           561|                    298|
|         Jeju-do|from other city|             1|                      1|
|Gyeongsangbuk-do|              -|           345|                    190|
|Gyeongsangnam-do|   Geochang-gun|            18|                     10|
|Gyeongsangbuk-do|        Gumi-si|    

#### [8] Joins

Here, we will use the region file, which contains regional information such as elementary_school_count and elderly_population_ratio.

In [18]:
regions = spark.read.load("./data/Region.csv",
                          format="csv", 
                          sep=",", 
                          inferSchema="true", 
                          header="true")

regions.limit(10).toPandas()

|  | code | province | city | latitude | longitude | elementary_school_count | kindergarten_count | university_count | academy_ratio | elderly_population_ratio | elderly_alone_ratio | nursing_home_count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 10000 | Seoul | Seoul | 37.566953 | 126.977977 | 607 | 830 | 48 | 1.44 | 15.38 | 5.8 | 22739 |
| 1 | 10010 | Seoul | Gangnam-gu | 37.518421 | 127.047222 | 33 | 38 | 0 | 4.18 | 13.17 | 4.3 | 3088 |
| 2 | 10020 | Seoul | Gangdong-gu | 37.530492 | 127.123837 | 27 | 32 | 0 | 1.54 | 14.55 | 5.4 | 1023 |
| 3 | 10030 | Seoul | Gangbuk-gu | 37.639938 | 127.025508 | 14 | 21 | 0 | 0.67 | 19.49 | 8.5 | 628 |
| 4 | 10040 | Seoul | Gangseo-gu | 37.551166 | 126.849506 | 36 | 56 | 1 | 1.17 | 14.39 | 5.7 | 1080 |
| 5 | 10050 | Seoul | Gwanak-gu | 37.47829 | 126.951502 | 22 | 33 | 1 | 0.89 | 15.12 | 4.9 | 909 |
| 6 | 10060 | Seoul | Gwangjin-gu | 37.538712 | 127.082366 | 22 | 33 | 3 | 1.16 | 13.75 | 4.8 | 723 |
| 7 | 10070 | Seoul | Guro-gu | 37.495632 | 126.88765 | 26 | 34 | 3 | 1.0 | 16.21 | 5.7 | 741 |
| 8 | 10080 | Seoul | Geumcheon-gu | 37.456852 | 126.895229 | 18 | 19 | 0 | 0.96 | 16.15 | 6.7 | 475 |
| 9 | 10090 | Seoul | Nowon-gu | 37.654259 | 127.056294 | 42 | 66 | 6 | 1.39 | 15.4 | 7.4 | 952 |


In [19]:
# Left Join 'Case' with 'Region' on Province and City column
cases = cases.join(regions, ['province','city'],how='left')
cases.limit(10).toPandas()

|  | province | city | infection_case | confirmed | code | latitude | longitude | elementary_school_count | kindergarten_count | university_count | academy_ratio | elderly_population_ratio | elderly_alone_ratio | nursing_home_count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Seoul | Yongsan-gu | Itaewon Clubs | 139 | 10210.0 | 37.532768 | 126.990021 | 15.0 | 13.0 | 1.0 | 0.68 | 16.87 | 6.5 | 435.0 |
| 1 | Seoul | Gwanak-gu | Richway | 119 | 10050.0 | 37.47829 | 126.951502 | 22.0 | 33.0 | 1.0 | 0.89 | 15.12 | 4.9 | 909.0 |
| 2 | Seoul | Guro-gu | Guro-gu Call Center | 95 | 10070.0 | 37.495632 | 126.88765 | 26.0 | 34.0 | 3.0 | 1.0 | 16.21 | 5.7 | 741.0 |
| 3 | Seoul | Yangcheon-gu | Yangcheon Table Tennis Club | 43 | 10190.0 | 37.517189 | 126.866618 | 30.0 | 43.0 | 0.0 | 2.26 | 13.55 | 5.5 | 816.0 |
| 4 | Seoul | Dobong-gu | Day Care Center | 43 | 10100.0 | 37.668952 | 127.047082 | 23.0 | 26.0 | 1.0 | 0.95 | 17.89 | 7.2 | 485.0 |
| 5 | Seoul | Guro-gu | Manmin Central Church | 41 | 10070.0 | 37.495632 | 126.88765 | 26.0 | 34.0 | 3.0 | 1.0 | 16.21 | 5.7 | 741.0 |
| 6 | Seoul | from other city | SMR Newly Planted Churches Group | 36 |  |  |  |  |  |  |  |  |  |  |
| 7 | Seoul | Dongdaemun-gu | Dongan Church | 17 | 10110.0 | 37.574552 | 127.039721 | 21.0 | 31.0 | 4.0 | 1.06 | 17.26 | 6.7 | 832.0 |
| 8 | Seoul | from other city | Coupang Logistics Center | 25 |  |  |  |  |  |  |  |  |  |  |
| 9 | Seoul | Gwanak-gu | Wangsung Church | 30 | 10050.0 | 37.47829 | 126.951502 | 22.0 | 33.0 | 1.0 | 0.89 | 15.12 | 4.9 | 909.0 |


### 2. Use SQL with DataFrames

We first register the cases Dataframe as a temporary table, cases_table, on which we can run SQL operations. As you can see, the result of the SQL select statement is again a Spark Dataframe.

Complex SQL queries, including GROUP BY, HAVING, and ORDER BY clauses, can be passed to the **spark.sql** function.

In [20]:
# registerTempTable is deprecated; createOrReplaceTempView is its replacement
cases.createOrReplaceTempView('cases_table')
newDF = spark.sql('select * from cases_table where confirmed > 100')
newDF.show()



+-----------------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|         province|           city|      infection_case|confirmed| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+-----------------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|            Seoul|     Yongsan-gu|       Itaewon Clubs|      139|10210|37.532768|126.990021|                     15|                13|               1|         0.68|                   16.87|                6.5|               435|
|            Seoul|      Gwanak-gu|             Richway|      119|10050|

### 3. Create New Columns

There are many ways that you can use to create a column in a PySpark Dataframe.

#### [1] Using Spark Native Functions

We can use **.withColumn** along with PySpark SQL functions to create a new column. String, date, and math functions are already implemented as Spark functions. Our first function, F.col, gives us access to a column. So if we wanted to add 100 to a column, we could use F.col as:

In [21]:
import pyspark.sql.functions as F

casesWithNewConfirmed = cases.withColumn("NewConfirmed", 100 + F.col("confirmed"))
casesWithNewConfirmed.show()

+--------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+------------+
|province|           city|      infection_case|confirmed| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|NewConfirmed|
+--------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+------------+
|   Seoul|     Yongsan-gu|       Itaewon Clubs|      139|10210|37.532768|126.990021|                     15|                13|               1|         0.68|                   16.87|                6.5|               435|         239|
|   Seoul|      Gwanak-gu|             Richway|      119

Sometimes you might also want to repartition by a known scheme, because a later join or aggregation can make use of it. The **repartition** method accepts multiple columns for this.

### Close Spark Instance

In [18]:
spark.stop()