In [3]:
import pyspark
from pyspark.sql import SparkSession,SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:
# Obtain the notebook's SparkSession. getOrCreate() hands back an already-running
# session when one exists (the captured WARN below shows that happened here).
spark = (
    SparkSession.builder
    .appName('Covid sample analysis')
    .getOrCreate()
)

25/02/05 14:47:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Practicing Spark operations using a COVID dataset. The tasks include:

#### 1. Reading a CSV file
#### 2. Changing column names and data types
#### 3. Applying filters and transformations
#### 4. Performing joins
#### 5. Using Spark SQL
#### 6. Implementing Spark UDFs
#### 7. Applying window functions


#### 1.Reading a CSV file


In [5]:

# Load the case-level CSV: comma separated, first row is the header, column types
# are inferred by Spark.
# NOTE(review): the captured output shows the first column named ' case_id' (leading
# space) and latitude/longitude inferred as string because some rows hold '-'.
covid_data = (
    spark.read
    .format("csv")
    .option("sep", ',')
    .option("inferschema", True)
    .option("header", True)
    .load('./source_data/Case.csv')
)
covid_data

DataFrame[ case_id: int, province: string, city: string, group: boolean, infection_case: string, confirmed: int, latitude: string, longitude: string]

In [6]:
covid_data.show()

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000007|   Seoul|from other city| true|SMR Newly Planted...|       36|        -|         -|
| 1000008|   Seoul|  Dongdaemun-gu| true|       Dongan Churc

#### 2.Changing column names and data types


In [8]:
# Give two columns more descriptive names; every other column is carried over unchanged.
covid_datas = (
    covid_data
    .withColumnRenamed('infection_case', 'continment_area')
    .withColumnRenamed('confirmed', 'confirmed_cases')
)
covid_datas.show()

+--------+--------+---------------+-----+--------------------+---------------+---------+----------+
| case_id|province|           city|group|     continment_area|confirmed_cases| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|            139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| true|             Richway|            119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|             95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|             43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| true|     Day Care Center|             43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|             41|37.481059|126.894343|
| 1000007|   Seoul|from other city| true|SMR Newly Planted...|             36|        -|         -|


In [9]:
# Ascending order by number of confirmed cases (smallest clusters first).
covid_datas.orderBy('confirmed_cases').show()

+--------+-----------------+---------------+-----+--------------------+---------------+---------+----------+
| case_id|         province|           city|group|     continment_area|confirmed_cases| latitude| longitude|
+--------+-----------------+---------------+-----+--------------------+---------------+---------+----------+
| 7000002|          Jeju-do|              -|false|contact with patient|              0|        -|         -|
| 3000007|       Gangwon-do|              -|false|contact with patient|              0|        -|         -|
| 1000030|            Seoul|     Gangseo-gu| true|SJ Investment Cal...|              0|37.559649|126.835102|
| 1100007|            Busan|from other city| true|Cheongdo Daenam H...|              1|        -|         -|
| 5000003|     Jeollabuk-do|from other city| true|  Shincheonji Church|              1|        -|         -|
| 1000028|            Seoul|from other city| true|Anyang Gunpo Past...|              1|        -|         -|
| 1000025|         

In [10]:
# Descending order by number of confirmed cases (largest clusters first).
covid_datas.orderBy(col('confirmed_cases').desc()).show()

+--------+-----------------+---------------+-----+--------------------+---------------+---------+----------+
| case_id|         province|           city|group|     continment_area|confirmed_cases| latitude| longitude|
+--------+-----------------+---------------+-----+--------------------+---------------+---------+----------+
| 1200001|            Daegu|         Nam-gu| true|  Shincheonji Church|           4511| 35.84008|  128.5667|
| 1200009|            Daegu|              -|false|contact with patient|            917|        -|         -|
| 1200010|            Daegu|              -|false|                 etc|            747|        -|         -|
| 6000001| Gyeongsangbuk-do|from other city| true|  Shincheonji Church|            566|        -|         -|
| 2000020|      Gyeonggi-do|              -|false|     overseas inflow|            305|        -|         -|
| 1000036|            Seoul|              -|false|     overseas inflow|            298|        -|         -|
| 1200002|         

 #### Change data types 

In [12]:
# Inspect the column names and inferred types of the renamed frame before casting.
covid_datas.printSchema()

root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- continment_area: string (nullable = true)
 |-- confirmed_cases: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [15]:
# DataFrames are immutable: withColumn() returns a NEW DataFrame and leaves
# covid_datas untouched. Print the schema of the returned frame so the cell actually
# shows the result of the cast instead of discarding it.
# (confirmed_cases was already inferred as integer, so the printed schema is unchanged.)
covid_datas.withColumn(
    "confirmed_cases", col("confirmed_cases").cast(IntegerType())
).printSchema()

root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- continment_area: string (nullable = true)
 |-- confirmed_cases: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [16]:
# Build the working frame `data` with explicit types for the two columns used most
# below. Both casts match the already-inferred types, so the schema is unchanged.
data = (
    covid_datas
    .withColumn("confirmed_cases", col("confirmed_cases").cast(IntegerType()))
    .withColumn("city", col("city").cast(StringType()))
)
data.printSchema()


root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- continment_area: string (nullable = true)
 |-- confirmed_cases: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [54]:
# Derive a new column that is confirmed_cases shifted up by 100, then show the
# original and derived columns side by side.
data_new = data.withColumn('NewconfirmaedCases', col('confirmed_cases') + 100)
data_new.select('confirmed_cases', 'NewconfirmaedCases').show()

+---------------+------------------+
|confirmed_cases|NewconfirmaedCases|
+---------------+------------------+
|            139|               239|
|            119|               219|
|             95|               195|
|             43|               143|
|             43|               143|
|             41|               141|
|             36|               136|
|             17|               117|
|             25|               125|
|             30|               130|
|             14|               114|
|             13|               113|
|             10|               110|
|              7|               107|
|              7|               107|
|              5|               105|
|              7|               107|
|              6|               106|
|              1|               101|
|              6|               106|
+---------------+------------------+
only showing top 20 rows



#### 3.Applying filters and transformations


In [23]:
# Keep only rows with more than 30 confirmed cases.
data.where(col('confirmed_cases') > 30).show()

+--------+--------+---------------+-----+--------------------+---------------+---------+----------+
| case_id|province|           city|group|     continment_area|confirmed_cases| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|            139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| true|             Richway|            119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|             95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|             43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| true|     Day Care Center|             43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|             41|37.481059|126.894343|
| 1000007|   Seoul|from other city| true|SMR Newly Planted...|             36|        -|         -|


In [26]:
# One row per distinct city value.
# NOTE(review): which row survives for each city is presumably not guaranteed —
# confirm before relying on the other columns.
data.drop_duplicates(subset=['city']).show()

+--------+-----------------+---------------+-----+--------------------+---------------+---------+----------+
| case_id|         province|           city|group|     continment_area|confirmed_cases| latitude| longitude|
+--------+-----------------+---------------+-----+--------------------+---------------+---------+----------+
| 1000034|            Seoul|              -| true|         Orange Life|              1|        -|         -|
| 2000011|      Gyeonggi-do|      Anyang-si| true|Anyang Gunpo Past...|             22|37.381784| 126.93615|
| 6000003| Gyeongsangbuk-do|    Bonghwa-gun| true|Bonghwa Pureun Nu...|             68| 36.92757|  128.9099|
| 2000002|      Gyeonggi-do|     Bucheon-si| true|Coupang Logistics...|             67|37.530579|126.775254|
| 6100006| Gyeongsangnam-do|Changnyeong-gun| true|Changnyeong Coin ...|              7| 35.54127|  128.5008|
| 6100005| Gyeongsangnam-do|    Changwon-si| true|Hanmaeum Changwon...|              7| 35.22115|  128.6866|
| 4100001|Chungcheo

In [27]:
# Rows with more than 30 confirmed cases that are located in Bucheon-si.
# Two chained filters are combined with AND, same as a single `&` predicate.
(
    data
    .filter(col('confirmed_cases') > 30)
    .filter(col('city') == 'Bucheon-si')
    .show()
)

+--------+-----------+----------+-----+--------------------+---------------+---------+----------+
| case_id|   province|      city|group|     continment_area|confirmed_cases| latitude| longitude|
+--------+-----------+----------+-----+--------------------+---------------+---------+----------+
| 2000002|Gyeonggi-do|Bucheon-si| true|Coupang Logistics...|             67|37.530579|126.775254|
+--------+-----------+----------+-----+--------------------+---------------+---------+----------+



In [31]:
# List the distinct values of the city column.
data.select(data.city).distinct().show()

+---------------+
|           city|
+---------------+
|     Gangnam-gu|
|     Cheonan-si|
|from other city|
|      Anyang-si|
|      Gwanak-gu|
|     Yongsan-gu|
|        Dong-gu|
|         Sejong|
|     Gangseo-gu|
|       Wonju-si|
|     Suyeong-gu|
|   Geochang-gun|
|  Dongdaemun-gu|
|     Dongnae-gu|
|         Jin-gu|
|     Yangsan-si|
|    Changwon-si|
|         Nam-gu|
|   Gyeongsan-si|
|      Jongno-gu|
+---------------+
only showing top 20 rows



In [32]:
# Number of rows recorded for each city.
data.groupBy(data.city).count().show()

+---------------+-----+
|           city|count|
+---------------+-----+
|     Gangnam-gu|    4|
|     Cheonan-si|    1|
|from other city|   51|
|      Anyang-si|    2|
|      Gwanak-gu|    2|
|     Yongsan-gu|    1|
|        Dong-gu|    2|
|         Sejong|    2|
|     Gangseo-gu|    1|
|       Wonju-si|    1|
|     Suyeong-gu|    1|
|   Geochang-gun|    2|
|  Dongdaemun-gu|    1|
|     Dongnae-gu|    1|
|         Jin-gu|    1|
|     Yangsan-si|    1|
|    Changwon-si|    1|
|         Nam-gu|    1|
|   Gyeongsan-si|    3|
|      Jongno-gu|    2|
+---------------+-----+
only showing top 20 rows



In [35]:
# Number of rows per city, most frequent city values first.
(
    data
    .groupBy('city')
    .count()
    .sort(col("count").desc())
    .show()
)

+---------------+-----+
|           city|count|
+---------------+-----+
|              -|   53|
|from other city|   51|
|         Seo-gu|    5|
|     Gangnam-gu|    4|
|   Gyeongsan-si|    3|
|        Jung-gu|    3|
|        Guro-gu|    3|
|    Seongnam-si|    3|
|         Sejong|    2|
|      Anyang-si|    2|
|   Geochang-gun|    2|
|      Jongno-gu|    2|
|       Suwon-si|    2|
|      Gwanak-gu|    2|
|   Yangcheon-gu|    2|
|   Dalseong-gun|    2|
|        Dong-gu|    2|
|     Cheonan-si|    1|
|     Yongsan-gu|    1|
|     Gangseo-gu|    1|
+---------------+-----+
only showing top 20 rows



In [37]:
# Total and largest confirmed_cases per (province, city) pair.
# `sum` and `max` are the ones brought in by the star import from
# pyspark.sql.functions at the top of the notebook, not the Python built-ins.
(
    data
    .groupBy('province', 'city')
    .agg(sum('confirmed_cases'), max('confirmed_cases'))
    .show()
)

+----------------+---------------+--------------------+--------------------+
|        province|           city|sum(confirmed_cases)|max(confirmed_cases)|
+----------------+---------------+--------------------+--------------------+
|Gyeongsangnam-do|       Jinju-si|                   9|                   9|
|           Seoul|        Guro-gu|                 139|                  95|
|           Seoul|     Gangnam-gu|                  18|                   7|
|         Daejeon|              -|                 100|                  55|
|    Jeollabuk-do|from other city|                   6|                   3|
|Gyeongsangnam-do|Changnyeong-gun|                   7|                   7|
|           Seoul|              -|                 561|                 298|
|         Jeju-do|from other city|                   1|                   1|
|Gyeongsangbuk-do|              -|                 345|                 190|
|Gyeongsangnam-do|   Geochang-gun|                  18|                  10|

In [38]:
# Same aggregation as the previous cell, with readable names for the result columns.
# `sum` and `max` are the ones brought in by the star import from
# pyspark.sql.functions at the top of the notebook, not the Python built-ins.
(
    data
    .groupBy('province', 'city')
    .agg(
        sum('confirmed_cases').alias('Total_Confirmed_Cases'),
        max('confirmed_cases').alias('Max_Confirmed_Cases'),
    )
    .show()
)

+----------------+---------------+---------------------+-------------------+
|        province|           city|Total_Confirmed_Cases|Max_Confirmed_Cases|
+----------------+---------------+---------------------+-------------------+
|Gyeongsangnam-do|       Jinju-si|                    9|                  9|
|           Seoul|        Guro-gu|                  139|                 95|
|           Seoul|     Gangnam-gu|                   18|                  7|
|         Daejeon|              -|                  100|                 55|
|    Jeollabuk-do|from other city|                    6|                  3|
|Gyeongsangnam-do|Changnyeong-gun|                    7|                  7|
|           Seoul|              -|                  561|                298|
|         Jeju-do|from other city|                    1|                  1|
|Gyeongsangbuk-do|              -|                  345|                190|
|Gyeongsangnam-do|   Geochang-gun|                   18|                 10|

#### 4.Performing joins

In [42]:
# Load the region reference CSV: comma separated, header row, inferred types.
region = (
    spark.read
    .format('csv')
    .option("sep", ',')
    .option("inferschema", True)
    .option("header", True)
    .load("./source_data/Region.csv")
)
region.show()

+-----+--------+-------------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
| code|province|         city| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+-----+--------+-------------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|10000|   Seoul|        Seoul|37.566953|126.977977|                    607|               830|              48|         1.44|                   15.38|                5.8|             22739|
|10010|   Seoul|   Gangnam-gu|37.518421|127.047222|                     33|                38|               0|         4.18|                   13.17|                4.3|              3088|
|10020|   Seoul|  Gangdong-gu|37.530492|127.123837

In [44]:
# Left join: keep every case row and attach region attributes where (province, city)
# matches.
# NOTE(review): both inputs carry latitude/longitude, so the result shows those
# column names twice (see the captured output) — rename or drop one side before
# selecting them by name.
join_keys = ['province', 'city']
data_region = data.join(region, on=join_keys, how='left')
data_region.show()

+--------+---------------+--------+-----+--------------------+---------------+---------+----------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|province|           city| case_id|group|     continment_area|confirmed_cases| latitude| longitude| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+--------+---------------+--------+-----+--------------------+---------------+---------+----------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|   Seoul|     Yongsan-gu| 1000001| true|       Itaewon Clubs|            139|37.538621|126.992652|10210|37.532768|126.990021|                     15|                13|               1|         0.68|     

In [45]:
# Left anti join: case rows whose (province, city) has NO match in region.
# Only columns from `data` appear in the result.
data_region_anti_left = data.join(
    region,
    on=['province', 'city'],
    how='left_anti',
)
data_region_anti_left.show()

+--------+---------------+--------+-----+--------------------+---------------+--------+---------+
|province|           city| case_id|group|     continment_area|confirmed_cases|latitude|longitude|
+--------+---------------+--------+-----+--------------------+---------------+--------+---------+
|   Seoul|from other city| 1000007| true|SMR Newly Planted...|             36|       -|        -|
|   Seoul|from other city| 1000009| true|Coupang Logistics...|             25|       -|        -|
|   Seoul|from other city| 1000019| true|Daejeon door-to-d...|              1|       -|        -|
|   Seoul|from other city| 1000021| true|  Shincheonji Church|              8|       -|        -|
|   Seoul|from other city| 1000022| true|Guri Collective I...|              5|       -|        -|
|   Seoul|from other city| 1000028| true|Anyang Gunpo Past...|              1|       -|        -|
|   Seoul|from other city| 1000031| true|     Yongin Brothers|              4|       -|        -|
|   Seoul|from other

In [47]:
# Row count of the case frame (used below to decide which side of a join is smaller).
data.count()

174

In [48]:
# Row count of the region reference frame.
region.count()

244

##### Performance Optimization in Joins
###### Use a broadcast join when one of the DataFrames is small:


In [51]:
# Inner join on (province, city) with a broadcast hint on `data`, the side with
# fewer rows according to the counts above.
df_join = region.join(
    broadcast(data),
    on=['province', 'city'],
    how='inner',
)
df_join.show()

+--------+---------------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+--------+-----+--------------------+---------------+---------+----------+
|province|           city| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count| case_id|group|     continment_area|confirmed_cases| latitude| longitude|
+--------+---------------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+--------+-----+--------------------+---------------+---------+----------+
|   Seoul|     Gangnam-gu|10010|37.518421|127.047222|                     33|                38|               0|         4.18|                   13.17|                4.3|              3088| 1000029| true

#### 5.Using Spark SQL


In [55]:
# Expose `data` to Spark SQL under the name `covid_data` so the cells below can
# query it with spark.sql(...).
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; the view lives for the lifetime of this SparkSession.
data.createOrReplaceTempView('covid_data')





In [57]:
# Query the covid_data temp table with SQL; the result is an ordinary DataFrame.
sql_df = spark.sql("select * from covid_data limit 10 ")
sql_df.show()

+--------+--------+---------------+-----+--------------------+---------------+---------+----------+
| case_id|province|           city|group|     continment_area|confirmed_cases| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|            139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| true|             Richway|            119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|             95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|             43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| true|     Day Care Center|             43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|             41|37.481059|126.894343|
| 1000007|   Seoul|from other city| true|SMR Newly Planted...|             36|        -|         -|


In [58]:
# SQL equivalent of a DataFrame filter: rows with more than 100 confirmed cases.
spark.sql("select * from covid_data where  confirmed_cases > 100").show()


+--------+-----------------+---------------+-----+--------------------+---------------+---------+----------+
| case_id|         province|           city|group|     continment_area|confirmed_cases| latitude| longitude|
+--------+-----------------+---------------+-----+--------------------+---------------+---------+----------+
| 1000001|            Seoul|     Yongsan-gu| true|       Itaewon Clubs|            139|37.538621|126.992652|
| 1000002|            Seoul|      Gwanak-gu| true|             Richway|            119| 37.48208|126.901384|
| 1000036|            Seoul|              -|false|     overseas inflow|            298|        -|         -|
| 1000037|            Seoul|              -|false|contact with patient|            162|        -|         -|
| 1200001|            Daegu|         Nam-gu| true|  Shincheonji Church|           4511| 35.84008|  128.5667|
| 1200002|            Daegu|   Dalseong-gun| true|Second Mi-Ju Hosp...|            196|35.857375|128.466651|
| 1200003|         

In [59]:
# Same filter as the previous cell, ordered so the largest clusters come first.
spark.sql("select * from covid_data where  confirmed_cases > 100 order by confirmed_cases desc").show()


+--------+-----------------+---------------+-----+--------------------+---------------+---------+----------+
| case_id|         province|           city|group|     continment_area|confirmed_cases| latitude| longitude|
+--------+-----------------+---------------+-----+--------------------+---------------+---------+----------+
| 1200001|            Daegu|         Nam-gu| true|  Shincheonji Church|           4511| 35.84008|  128.5667|
| 1200009|            Daegu|              -|false|contact with patient|            917|        -|         -|
| 1200010|            Daegu|              -|false|                 etc|            747|        -|         -|
| 6000001| Gyeongsangbuk-do|from other city| true|  Shincheonji Church|            566|        -|         -|
| 2000020|      Gyeonggi-do|              -|false|     overseas inflow|            305|        -|         -|
| 1000036|            Seoul|              -|false|     overseas inflow|            298|        -|         -|
| 1200002|         

#### 6.Implementing Spark UDFs
