In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import pyspark.sql.functions as F

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
cases = spark.read.csv("Case.csv",header=True,sep=",",inferSchema=True)

In [4]:
cases.show()

+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|       72|37.538621|126.992652|
|1000002|   Seoul|        Guro-gu| true| Guro-gu Call Center|       98|37.508163|126.884387|
|1000003|   Seoul|  Dongdaemun-gu| true|       Dongan Church|       20|37.592888|127.056766|
|1000004|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
|1000005|   Seoul|   Eunpyeong-gu| true|Eunpyeong St. Mar...|       14| 37.63369|  126.9165|
|1000006|   Seoul|   Seongdong-gu| true|    Seongdong-gu APT|       13| 37.55713|  127.0403|
|1000007|   Seoul|      Jongno-gu| true|Jongno Community ...|       10| 37.57681|   127.006|
|1000008|   Seoul|        Jung-gu| true|Jung-gu Fashion C...|        7

## Change Column Names

In [5]:
cases = cases.withColumnRenamed("infection_case","infection_source")

In [7]:
cases.rdd.top(5)

[Row(case_id=7000004, province='Jeju-do', city='from other city', group=True, infection_source='Itaewon Clubs', confirmed=1, latitude='-', longitude='-'),
 Row(case_id=7000003, province='Jeju-do', city='-', group=False, infection_source='etc', confirmed=4, latitude='-', longitude='-'),
 Row(case_id=7000002, province='Jeju-do', city='-', group=False, infection_source='contact with patient', confirmed=0, latitude='-', longitude='-'),
 Row(case_id=7000001, province='Jeju-do', city='-', group=False, infection_source='overseas inflow', confirmed=9, latitude='-', longitude='-'),
 Row(case_id=6100010, province='Gyeongsangnam-do', city='-', group=False, infection_source='etc', confirmed=18, latitude='-', longitude='-')]

In [12]:
cases = cases.toDF('case_id','province','city','group','infection_case','confirmed','latitude','longitude')

In [13]:
cases.show()

+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|       72|37.538621|126.992652|
|1000002|   Seoul|        Guro-gu| true| Guro-gu Call Center|       98|37.508163|126.884387|
|1000003|   Seoul|  Dongdaemun-gu| true|       Dongan Church|       20|37.592888|127.056766|
|1000004|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
|1000005|   Seoul|   Eunpyeong-gu| true|Eunpyeong St. Mar...|       14| 37.63369|  126.9165|
|1000006|   Seoul|   Seongdong-gu| true|    Seongdong-gu APT|       13| 37.55713|  127.0403|
|1000007|   Seoul|      Jongno-gu| true|Jongno Community ...|       10| 37.57681|   127.006|
|1000008|   Seoul|        Jung-gu| true|Jung-gu Fashion C...|        7

## Select Columns

In [14]:
cases = cases.select('province','city','infection_case','confirmed')
cases.show()

+--------+---------------+--------------------+---------+
|province|           city|      infection_case|confirmed|
+--------+---------------+--------------------+---------+
|   Seoul|     Yongsan-gu|       Itaewon Clubs|       72|
|   Seoul|        Guro-gu| Guro-gu Call Center|       98|
|   Seoul|  Dongdaemun-gu|       Dongan Church|       20|
|   Seoul|        Guro-gu|Manmin Central Ch...|       41|
|   Seoul|   Eunpyeong-gu|Eunpyeong St. Mar...|       14|
|   Seoul|   Seongdong-gu|    Seongdong-gu APT|       13|
|   Seoul|      Jongno-gu|Jongno Community ...|       10|
|   Seoul|        Jung-gu|Jung-gu Fashion C...|        7|
|   Seoul|from other city|  Shincheonji Church|        8|
|   Seoul|              -|     overseas inflow|      321|
|   Seoul|              -|contact with patient|       18|
|   Seoul|              -|                 etc|       24|
|   Busan|     Dongnae-gu|       Onchun Church|       39|
|   Busan|from other city|  Shincheonji Church|       12|
|   Busan|    

In [None]:
## Sort by confirmed Cases

In [16]:
cases.sort('confirmed').show()

+-----------------+---------------+--------------------+---------+
|         province|           city|      infection_case|confirmed|
+-----------------+---------------+--------------------+---------+
|          Jeju-do|              -|contact with patient|        0|
|       Gangwon-do|              -|contact with patient|        0|
|          Gwangju|              -|                 etc|        0|
|            Busan|from other city|Cheongdo Daenam H...|        1|
|          Gwangju|              -|contact with patient|        1|
|          Jeju-do|from other city|       Itaewon Clubs|        1|
|           Sejong|from other city|  Shincheonji Church|        1|
|           Sejong|              -|                 etc|        1|
|Chungcheongnam-do|              -|contact with patient|        1|
|     Jeollabuk-do|from other city|  Shincheonji Church|        1|
|     Jeollanam-do|from other city|  Shincheonji Church|        1|
|          Incheon|from other city|  Shincheonji Church|      

NoneType

In [17]:
cases.sort(cases.confirmed.desc()).show()

+-----------------+---------------+--------------------+---------+
|         province|           city|      infection_case|confirmed|
+-----------------+---------------+--------------------+---------+
|            Daegu|         Nam-gu|  Shincheonji Church|     4510|
|            Daegu|              -|contact with patient|      929|
|            Daegu|              -|                 etc|      724|
| Gyeongsangbuk-do|from other city|  Shincheonji Church|      566|
|            Seoul|              -|     overseas inflow|      321|
|      Gyeonggi-do|              -|     overseas inflow|      225|
|            Daegu|   Dalseong-gun|Second Mi-Ju Hosp...|      196|
| Gyeongsangbuk-do|              -|contact with patient|      192|
| Gyeongsangbuk-do|              -|                 etc|      134|
|            Daegu|         Seo-gu|Hansarang Convale...|      128|
| Gyeongsangbuk-do|   Cheongdo-gun|Cheongdo Daenam H...|      120|
|Chungcheongnam-do|     Cheonan-si|gym facility in C...|      

In [18]:
cases.sort(F.desc("confirmed")).show()

+-----------------+---------------+--------------------+---------+
|         province|           city|      infection_case|confirmed|
+-----------------+---------------+--------------------+---------+
|            Daegu|         Nam-gu|  Shincheonji Church|     4510|
|            Daegu|              -|contact with patient|      929|
|            Daegu|              -|                 etc|      724|
| Gyeongsangbuk-do|from other city|  Shincheonji Church|      566|
|            Seoul|              -|     overseas inflow|      321|
|      Gyeonggi-do|              -|     overseas inflow|      225|
|            Daegu|   Dalseong-gun|Second Mi-Ju Hosp...|      196|
| Gyeongsangbuk-do|              -|contact with patient|      192|
| Gyeongsangbuk-do|              -|                 etc|      134|
|            Daegu|         Seo-gu|Hansarang Convale...|      128|
| Gyeongsangbuk-do|   Cheongdo-gun|Cheongdo Daenam H...|      120|
|Chungcheongnam-do|     Cheonan-si|gym facility in C...|      

## Casting

In [19]:
from pyspark.sql.types import DoubleType, IntegerType, StringType

In [20]:
cases = cases.withColumn('confirmed',F.col('confirmed').cast(IntegerType()))

In [21]:
cases = cases.withColumn('city', F.col('city').cast(StringType()))

## Filtering

In [22]:
cases.filter((cases.confirmed > 10) & (cases.province == 'Daegu')).show()

+--------+------------+--------------------+---------+
|province|        city|      infection_case|confirmed|
+--------+------------+--------------------+---------+
|   Daegu|      Nam-gu|  Shincheonji Church|     4510|
|   Daegu|Dalseong-gun|Second Mi-Ju Hosp...|      196|
|   Daegu|      Seo-gu|Hansarang Convale...|      128|
|   Daegu|Dalseong-gun|Daesil Convalesce...|      100|
|   Daegu|     Dong-gu|     Fatima Hospital|       37|
|   Daegu|           -|     overseas inflow|       24|
|   Daegu|           -|contact with patient|      929|
|   Daegu|           -|                 etc|      724|
+--------+------------+--------------------+---------+



## GroupBy

In [25]:
cases.groupBy(['province','city']).agg(F.sum('confirmed').alias('TotalConfirmed'),F.max('confirmed').alias('MaxConfimred')).show()

+----------------+---------------+--------------+------------+
|        province|           city|TotalConfirmed|MaxConfimred|
+----------------+---------------+--------------+------------+
|Gyeongsangnam-do|       Jinju-si|            10|          10|
|           Seoul|        Guro-gu|           139|          98|
|         Daejeon|              -|            27|          10|
|    Jeollabuk-do|from other city|             1|           1|
|Gyeongsangnam-do|Changnyeong-gun|             7|           7|
|           Seoul|              -|           363|         321|
|         Jeju-do|from other city|             1|           1|
|Gyeongsangbuk-do|              -|           336|         192|
|Gyeongsangnam-do|   Geochang-gun|            18|          10|
|         Incheon|from other city|            22|          20|
|           Busan|              -|            72|          29|
|           Daegu|         Seo-gu|           128|         128|
|           Busan|     Suyeong-gu|             5|      

In [27]:
regions = spark.read.csv("Region.csv", sep=",", inferSchema="true", header="true")
regions.show()

+-----+--------+-------------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
| code|province|         city| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+-----+--------+-------------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|10000|   Seoul|        Seoul|37.566953|126.977977|                    607|               830|              48|         1.44|                   15.38|                5.8|             22739|
|10010|   Seoul|   Gangnam-gu|37.518421|127.047222|                     33|                38|               0|         4.18|                   13.17|                4.3|              3088|
|10020|   Seoul|  Gangdong-gu|37.530492|127.123837

In [28]:
cases = cases.join(regions, on=['province','city'],how='left')

In [29]:
cases.show()

+--------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|province|           city|      infection_case|confirmed| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+--------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|   Seoul|     Yongsan-gu|       Itaewon Clubs|       72|10210|37.532768|126.990021|                     15|                13|               1|         0.68|                   16.87|                6.5|               435|
|   Seoul|        Guro-gu| Guro-gu Call Center|       98|10070|37.495632| 126.88765|                     26|

In [30]:
from pyspark.sql.functions import broadcast
cases = cases.join(broadcast(regions), ['province','city'],how='left')

In [31]:
cases.show()

+--------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|province|           city|      infection_case|confirmed| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+--------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+--

In [36]:
cases = spark.read.csv("Case.csv",header=True,sep=",",inferSchema=True)

In [37]:
cases.registerTempTable('cases_table')

In [38]:
newDF = spark.sql('select * from cases_table where confirmed>100')

In [39]:
newDF.show()

+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+
|case_id|         province|           city|group|      infection_case|confirmed| latitude| longitude|
+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+
|1000010|            Seoul|              -|false|     overseas inflow|      321|        -|         -|
|1200001|            Daegu|         Nam-gu| true|  Shincheonji Church|     4510| 35.84008|  128.5667|
|1200002|            Daegu|   Dalseong-gun| true|Second Mi-Ju Hosp...|      196|35.857375|128.466651|
|1200003|            Daegu|         Seo-gu| true|Hansarang Convale...|      128|35.885592|128.556649|
|1200008|            Daegu|              -|false|contact with patient|      929|        -|         -|
|1200009|            Daegu|              -|false|                 etc|      724|        -|         -|
|2000007|      Gyeonggi-do|              -|false|     overseas inflow|      225|  

## Usinf Spark Native Functions

In [41]:
casesWithNewConfirmed = cases.withColumn("NewConfirmed", 100 + F.col("confirmed"))
casesWithNewConfirmed.show()

+-------+--------+---------------+-----+--------------------+---------+---------+----------+------------+
|case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|NewConfirmed|
+-------+--------+---------------+-----+--------------------+---------+---------+----------+------------+
|1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|       72|37.538621|126.992652|         172|
|1000002|   Seoul|        Guro-gu| true| Guro-gu Call Center|       98|37.508163|126.884387|         198|
|1000003|   Seoul|  Dongdaemun-gu| true|       Dongan Church|       20|37.592888|127.056766|         120|
|1000004|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|         141|
|1000005|   Seoul|   Eunpyeong-gu| true|Eunpyeong St. Mar...|       14| 37.63369|  126.9165|         114|
|1000006|   Seoul|   Seongdong-gu| true|    Seongdong-gu APT|       13| 37.55713|  127.0403|         113|
|1000007|   Seoul|      Jongno-gu| true|Jongno

In [42]:
casesWithExpConfirmed = cases.withColumn("ExpConfirmed",F.exp("confirmed"))
casesWithExpConfirmed.show()

+-------+--------+---------------+-----+--------------------+---------+---------+----------+--------------------+
|case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|        ExpConfirmed|
+-------+--------+---------------+-----+--------------------+---------+---------+----------+--------------------+
|1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|       72|37.538621|126.992652|1.858671745284127...|
|1000002|   Seoul|        Guro-gu| true| Guro-gu Call Center|       98|37.508163|126.884387|3.637970947608805E42|
|1000003|   Seoul|  Dongdaemun-gu| true|       Dongan Church|       20|37.592888|127.056766| 4.851651954097903E8|
|1000004|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|6.398434935300549...|
|1000005|   Seoul|   Eunpyeong-gu| true|Eunpyeong St. Mar...|       14| 37.63369|  126.9165|  1202604.2841647768|
|1000006|   Seoul|   Seongdong-gu| true|    Seongdong-gu APT|       13| 37.55713|  127.0

## Spark UDF

In [10]:
from pyspark.sql.types import *

def casesHighLow(confirmed):
    if confirmed < 50:
        return 'low'
    else:
        return 'high'
    
casesHighLowUDF=F.udf(casesHighLow,StringType())
casesWithHighLow = cases.withColumn("HighLow", casesHighLowUDF("confirmed"))
casesWithHighLow.show()

+-------+--------+---------------+-----+--------------------+---------+---------+----------+-------+
|case_id|province|           city|group|    infection_source|confirmed| latitude| longitude|HighLow|
+-------+--------+---------------+-----+--------------------+---------+---------+----------+-------+
|1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|       72|37.538621|126.992652|   high|
|1000002|   Seoul|        Guro-gu| true| Guro-gu Call Center|       98|37.508163|126.884387|   high|
|1000003|   Seoul|  Dongdaemun-gu| true|       Dongan Church|       20|37.592888|127.056766|    low|
|1000004|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|    low|
|1000005|   Seoul|   Eunpyeong-gu| true|Eunpyeong St. Mar...|       14| 37.63369|  126.9165|    low|
|1000006|   Seoul|   Seongdong-gu| true|    Seongdong-gu APT|       13| 37.55713|  127.0403|    low|
|1000007|   Seoul|      Jongno-gu| true|Jongno Community ...|       10| 37.57681|   127.006

In [44]:
cases.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



## Windowing, TimeLag(), Rank(), Rolling function

In [18]:
timeprovince = spark.read.csv("TimeProvince.csv",header=True,sep=",",inferSchema=True)
timeprovince.show()

+-------------------+----+-----------------+---------+--------+--------+
|               date|time|         province|confirmed|released|deceased|
+-------------------+----+-----------------+---------+--------+--------+
|2020-01-20 00:00:00|  16|            Seoul|        0|       0|       0|
|2020-01-20 00:00:00|  16|            Busan|        0|       0|       0|
|2020-01-20 00:00:00|  16|            Daegu|        0|       0|       0|
|2020-01-20 00:00:00|  16|          Incheon|        1|       0|       0|
|2020-01-20 00:00:00|  16|          Gwangju|        0|       0|       0|
|2020-01-20 00:00:00|  16|          Daejeon|        0|       0|       0|
|2020-01-20 00:00:00|  16|            Ulsan|        0|       0|       0|
|2020-01-20 00:00:00|  16|           Sejong|        0|       0|       0|
|2020-01-20 00:00:00|  16|      Gyeonggi-do|        0|       0|       0|
|2020-01-20 00:00:00|  16|       Gangwon-do|        0|       0|       0|
|2020-01-20 00:00:00|  16|Chungcheongbuk-do|       

In [12]:
from pyspark.sql.window import Window

In [14]:
windowSpec = Window().partitionBy(['confirmed']).orderBy(F.desc('confirmed'))
cases.withColumn('rank',F.rank().over(windowSpec)).show()

+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+----+
|case_id|         province|           city|group|    infection_source|confirmed| latitude| longitude|rank|
+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+----+
|1000010|            Seoul|              -|false|     overseas inflow|      321|        -|         -|   1|
|6000011| Gyeongsangbuk-do|              -|false|contact with patient|      192|        -|         -|   1|
|4100001|Chungcheongnam-do|     Cheonan-si| true|gym facility in C...|      103| 36.81503|  127.1139|   1|
|1100002|            Busan|from other city| true|  Shincheonji Church|       12|        -|         -|   1|
|2000005|      Gyeonggi-do|    Seongnam-si| true|Bundang Jesaeng H...|       22| 37.38833|  127.1218|   1|
|1200003|            Daegu|         Seo-gu| true|Hansarang Convale...|      128|35.885592|128.556649|   1|
|2000007|      Gyeonggi-do|          

In [19]:
windowSpec = Window().partitionBy(['province']).orderBy('date')
timeprovinceWithLag = timeprovince.withColumn("lag_7",F.lag("confirmed",7).over(windowSpec))
timeprovinceWithLag.filter(timeprovinceWithLag.date>'2020-03-10').show()

+-------------------+----+--------+---------+--------+--------+-----+
|               date|time|province|confirmed|released|deceased|lag_7|
+-------------------+----+--------+---------+--------+--------+-----+
|2020-03-10 00:00:00|   0|  Sejong|        8|       0|       0|    1|
|2020-03-11 00:00:00|   0|  Sejong|       10|       0|       0|    1|
|2020-03-12 00:00:00|   0|  Sejong|       15|       0|       0|    1|
|2020-03-13 00:00:00|   0|  Sejong|       32|       0|       0|    1|
|2020-03-14 00:00:00|   0|  Sejong|       38|       0|       0|    2|
|2020-03-15 00:00:00|   0|  Sejong|       39|       0|       0|    3|
|2020-03-16 00:00:00|   0|  Sejong|       40|       0|       0|    6|
|2020-03-17 00:00:00|   0|  Sejong|       40|       0|       0|    8|
|2020-03-18 00:00:00|   0|  Sejong|       41|       0|       0|   10|
|2020-03-19 00:00:00|   0|  Sejong|       41|       0|       0|   15|
|2020-03-20 00:00:00|   0|  Sejong|       41|       0|       0|   32|
|2020-03-21 00:00:00

In [20]:
windowSpec = Window().partitionBy(['province']).orderBy('date').rowsBetween(-6,0)
timeprovinceWithRoll = timeprovince.withColumn("roll_7_confirmed",F.mean("confirmed").over(windowSpec))
timeprovinceWithRoll.filter(timeprovinceWithLag.date>'2020-03-10').show()

+-------------------+----+--------+---------+--------+--------+------------------+
|               date|time|province|confirmed|released|deceased|  roll_7_confirmed|
+-------------------+----+--------+---------+--------+--------+------------------+
|2020-03-10 00:00:00|   0|  Sejong|        8|       0|       0| 3.142857142857143|
|2020-03-11 00:00:00|   0|  Sejong|       10|       0|       0| 4.428571428571429|
|2020-03-12 00:00:00|   0|  Sejong|       15|       0|       0| 6.428571428571429|
|2020-03-13 00:00:00|   0|  Sejong|       32|       0|       0|10.857142857142858|
|2020-03-14 00:00:00|   0|  Sejong|       38|       0|       0|              16.0|
|2020-03-15 00:00:00|   0|  Sejong|       39|       0|       0|21.142857142857142|
|2020-03-16 00:00:00|   0|  Sejong|       40|       0|       0|              26.0|
|2020-03-17 00:00:00|   0|  Sejong|       40|       0|       0|30.571428571428573|
|2020-03-18 00:00:00|   0|  Sejong|       41|       0|       0|              35.0|
|202

In [22]:
##cases = cases.withColumn("salt_key", F.concat(F.col("infection_case"), F.lit("_"), F.monotonically_increasing_id() % 10))