In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import random

In [2]:
spark = SparkSession.builder.appName("Pyspark Learning").getOrCreate()

In [3]:
df_cases = spark.read.csv('sparkdf_guide/coronavirusdataset/Case.csv', inferSchema=True, sep=",", header=True)

In [5]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [9]:
df_cases

case_id,province,city,group,infection_case,confirmed,latitude,longitude
1000001,Seoul,Yongsan-gu,True,Itaewon Clubs,72,37.538621,126.992652
1000002,Seoul,Guro-gu,True,Guro-gu Call Center,98,37.508163,126.884387
1000003,Seoul,Dongdaemun-gu,True,Dongan Church,20,37.592888,127.056766
1000004,Seoul,Guro-gu,True,Manmin Central Ch...,41,37.481059,126.894343
1000005,Seoul,Eunpyeong-gu,True,Eunpyeong St. Mar...,14,37.63369,126.9165
1000006,Seoul,Seongdong-gu,True,Seongdong-gu APT,13,37.55713,127.0403
1000007,Seoul,Jongno-gu,True,Jongno Community ...,10,37.57681,127.006
1000008,Seoul,Jung-gu,True,Jung-gu Fashion C...,7,37.562405,126.984377
1000009,Seoul,from other city,True,Shincheonji Church,8,-,-
1000010,Seoul,-,False,overseas inflow,321,-,-


In [119]:
df_cases.count()

115

In [120]:
df_cases.limit(2).toPandas()

Unnamed: 0,case_id,province,city,group,infection_case,confirmed,latitude,longitude
0,1000001,Seoul,Yongsan-gu,True,Itaewon Clubs,72,37.538621,126.992652
1,1000002,Seoul,Guro-gu,True,Guro-gu Call Center,98,37.508163,126.884387


## Change column names

In [121]:
df_cases = df_cases.withColumnRenamed("infection_case", "infection_source")

In [122]:
df_cases.show(1)

+-------+--------+----------+-----+----------------+---------+---------+----------+
|case_id|province|      city|group|infection_source|confirmed| latitude| longitude|
+-------+--------+----------+-----+----------------+---------+---------+----------+
|1000001|   Seoul|Yongsan-gu| true|   Itaewon Clubs|       72|37.538621|126.992652|
+-------+--------+----------+-----+----------------+---------+---------+----------+
only showing top 1 row



## Select

In [47]:
df_cases.select('case_id', 'city', 'confirmed').show(5)

+-------+-------------+---------+
|case_id|         city|confirmed|
+-------+-------------+---------+
|1000001|   Yongsan-gu|       72|
|1000002|      Guro-gu|       98|
|1000003|Dongdaemun-gu|       20|
|1000004|      Guro-gu|       41|
|1000005| Eunpyeong-gu|       14|
+-------+-------------+---------+
only showing top 5 rows



## Sort

In [49]:
df_cases.sort('confirmed').show() # lowed to highest

+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+
|case_id|         province|           city|group|    infection_source|confirmed| latitude| longitude|
+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+
|7000002|          Jeju-do|              -|false|contact with patient|        0|        -|         -|
|3000005|       Gangwon-do|              -|false|contact with patient|        0|        -|         -|
|1300004|          Gwangju|              -|false|                 etc|        0|        -|         -|
|1100006|            Busan|from other city| true|Cheongdo Daenam H...|        1|        -|         -|
|1300003|          Gwangju|              -|false|contact with patient|        1|        -|         -|
|7000004|          Jeju-do|from other city| true|       Itaewon Clubs|        1|        -|         -|
|1700003|           Sejong|from other city| true|  Shincheonji Church|        1|  

In [51]:
# descending sort
from pyspark.sql import functions as f

In [53]:
df_cases.sort(f.desc('confirmed')).show(20)

+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+
|case_id|         province|           city|group|    infection_source|confirmed| latitude| longitude|
+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+
|1200001|            Daegu|         Nam-gu| true|  Shincheonji Church|     4510| 35.84008|  128.5667|
|1200008|            Daegu|              -|false|contact with patient|      929|        -|         -|
|1200009|            Daegu|              -|false|                 etc|      724|        -|         -|
|6000001| Gyeongsangbuk-do|from other city| true|  Shincheonji Church|      566|        -|         -|
|1000010|            Seoul|              -|false|     overseas inflow|      321|        -|         -|
|2000007|      Gyeonggi-do|              -|false|     overseas inflow|      225|        -|         -|
|1200002|            Daegu|   Dalseong-gun| true|Second Mi-Ju Hosp...|      196|35

# Cast

In [58]:
from pyspark.sql.types import IntegerType, StringType, DoubleType

In [61]:
df_cases = df_cases.withColumn('confirmed', f.col('confirmed').cast(IntegerType()))
df_cases = df_cases.withColumn('city', f.col('city').cast(StringType()))

In [62]:
df_cases.show(5, False)

+-------+--------+-------------+-----+-----------------------------+---------+---------+----------+
|case_id|province|city         |group|infection_source             |confirmed|latitude |longitude |
+-------+--------+-------------+-----+-----------------------------+---------+---------+----------+
|1000001|Seoul   |Yongsan-gu   |true |Itaewon Clubs                |72       |37.538621|126.992652|
|1000002|Seoul   |Guro-gu      |true |Guro-gu Call Center          |98       |37.508163|126.884387|
|1000003|Seoul   |Dongdaemun-gu|true |Dongan Church                |20       |37.592888|127.056766|
|1000004|Seoul   |Guro-gu      |true |Manmin Central Church        |41       |37.481059|126.894343|
|1000005|Seoul   |Eunpyeong-gu |true |Eunpyeong St. Mary's Hospital|14       |37.63369 |126.9165  |
+-------+--------+-------------+-----+-----------------------------+---------+---------+----------+
only showing top 5 rows



## Filter

In [65]:
df_cases.filter((df_cases.confirmed==72) & (df_cases.province=='Seoul')).show(5)

+-------+--------+----------+-----+----------------+---------+---------+----------+
|case_id|province|      city|group|infection_source|confirmed| latitude| longitude|
+-------+--------+----------+-----+----------------+---------+---------+----------+
|1000001|   Seoul|Yongsan-gu| true|   Itaewon Clubs|       72|37.538621|126.992652|
+-------+--------+----------+-----+----------------+---------+---------+----------+



## GroupBy

In [66]:
df_cases.show()

+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|case_id|province|           city|group|    infection_source|confirmed| latitude| longitude|
+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|       72|37.538621|126.992652|
|1000002|   Seoul|        Guro-gu| true| Guro-gu Call Center|       98|37.508163|126.884387|
|1000003|   Seoul|  Dongdaemun-gu| true|       Dongan Church|       20|37.592888|127.056766|
|1000004|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
|1000005|   Seoul|   Eunpyeong-gu| true|Eunpyeong St. Mar...|       14| 37.63369|  126.9165|
|1000006|   Seoul|   Seongdong-gu| true|    Seongdong-gu APT|       13| 37.55713|  127.0403|
|1000007|   Seoul|      Jongno-gu| true|Jongno Community ...|       10| 37.57681|   127.006|
|1000008|   Seoul|        Jung-gu| true|Jung-gu Fashion C...|        7

In [123]:
df_cases.groupBy('province', 'city').agg(\
    f.sum('confirmed').alias('Total Cases'), \
    f.max('confirmed').alias('Highest_case') \
    ).show(5)

+----------------+---------------+-----------+------------+
|        province|           city|Total Cases|Highest_case|
+----------------+---------------+-----------+------------+
|Gyeongsangnam-do|       Jinju-si|         10|          10|
|           Seoul|        Guro-gu|        139|          98|
|         Daejeon|              -|         27|          10|
|    Jeollabuk-do|from other city|          1|           1|
|Gyeongsangnam-do|Changnyeong-gun|          7|           7|
+----------------+---------------+-----------+------------+
only showing top 5 rows



## Joins

In [76]:
df_region = spark.read.csv('sparkdf_guide/coronavirusdataset/Region.csv', inferSchema=True, header=True, sep=",")

In [83]:
df_region.toPandas().head()

Unnamed: 0,code,province,city,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count
0,10000,Seoul,Seoul,37.566953,126.977977,607,830,48,1.44,15.38,5.8,22739
1,10010,Seoul,Gangnam-gu,37.518421,127.047222,33,38,0,4.18,13.17,4.3,3088
2,10020,Seoul,Gangdong-gu,37.530492,127.123837,27,32,0,1.54,14.55,5.4,1023
3,10030,Seoul,Gangbuk-gu,37.639938,127.025508,14,21,0,0.67,19.49,8.5,628
4,10040,Seoul,Gangseo-gu,37.551166,126.849506,36,56,1,1.17,14.39,5.7,1080


In [78]:
df_joined = df_cases.join(df_region, on=['province', 'city'], how='left')

In [82]:
df_joined.toPandas().head()

Unnamed: 0,province,city,case_id,group,infection_source,confirmed,latitude,longitude,code,latitude.1,longitude.1,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count
0,Seoul,Yongsan-gu,1000001,True,Itaewon Clubs,72,37.538621,126.992652,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0
1,Seoul,Guro-gu,1000002,True,Guro-gu Call Center,98,37.508163,126.884387,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0
2,Seoul,Dongdaemun-gu,1000003,True,Dongan Church,20,37.592888,127.056766,10110.0,37.574552,127.039721,21.0,31.0,4.0,1.06,17.26,6.7,832.0
3,Seoul,Guro-gu,1000004,True,Manmin Central Church,41,37.481059,126.894343,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0
4,Seoul,Eunpyeong-gu,1000005,True,Eunpyeong St. Mary's Hospital,14,37.63369,126.9165,10220.0,37.603481,126.929173,31.0,44.0,1.0,1.09,17.0,6.5,874.0


## using SQL with Spark

In [124]:
df_cases.show(10)

+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|case_id|province|           city|group|    infection_source|confirmed| latitude| longitude|
+-------+--------+---------------+-----+--------------------+---------+---------+----------+
|1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|       72|37.538621|126.992652|
|1000002|   Seoul|        Guro-gu| true| Guro-gu Call Center|       98|37.508163|126.884387|
|1000003|   Seoul|  Dongdaemun-gu| true|       Dongan Church|       20|37.592888|127.056766|
|1000004|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
|1000005|   Seoul|   Eunpyeong-gu| true|Eunpyeong St. Mar...|       14| 37.63369|  126.9165|
|1000006|   Seoul|   Seongdong-gu| true|    Seongdong-gu APT|       13| 37.55713|  127.0403|
|1000007|   Seoul|      Jongno-gu| true|Jongno Community ...|       10| 37.57681|   127.006|
|1000008|   Seoul|        Jung-gu| true|Jung-gu Fashion C...|        7

In [125]:
df_cases.registerTempTable('cases_table')

In [126]:
spark.sql("select * from cases_table where confirmed<10 and case_id like '11000%'").show()

+-------+--------+---------------+-----+--------------------+---------+--------+---------+
|case_id|province|           city|group|    infection_source|confirmed|latitude|longitude|
+-------+--------+---------------+-----+--------------------+---------+--------+---------+
|1100003|   Busan|     Suyeong-gu| true|Suyeong-gu Kinder...|        5|35.16708| 129.1124|
|1100004|   Busan|    Haeundae-gu| true|Haeundae-gu Catho...|        6|35.20599| 129.1256|
|1100005|   Busan|         Jin-gu| true|      Jin-gu Academy|        4|35.17371| 129.0633|
|1100006|   Busan|from other city| true|Cheongdo Daenam H...|        1|       -|        -|
+-------+--------+---------------+-----+--------------------+---------+--------+---------+



## New Column

In [127]:
df_cases.withColumn('new_confirmed', 100+f.col('confirmed')).show(5)

+-------+--------+-------------+-----+--------------------+---------+---------+----------+-------------+
|case_id|province|         city|group|    infection_source|confirmed| latitude| longitude|new_confirmed|
+-------+--------+-------------+-----+--------------------+---------+---------+----------+-------------+
|1000001|   Seoul|   Yongsan-gu| true|       Itaewon Clubs|       72|37.538621|126.992652|          172|
|1000002|   Seoul|      Guro-gu| true| Guro-gu Call Center|       98|37.508163|126.884387|          198|
|1000003|   Seoul|Dongdaemun-gu| true|       Dongan Church|       20|37.592888|127.056766|          120|
|1000004|   Seoul|      Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|          141|
|1000005|   Seoul| Eunpyeong-gu| true|Eunpyeong St. Mar...|       14| 37.63369|  126.9165|          114|
+-------+--------+-------------+-----+--------------------+---------+---------+----------+-------------+
only showing top 5 rows



## Spark UDF

In [128]:
def caseHighLow(confirmed):
    if confirmed < 50:
        return 'low'
    else:
        return 'high'

# convert the above function to UDF, by passing function and the return type
caseHighLowUDF = f.udf(caseHighLow, StringType())

In [129]:
df_cases.show(2)

+-------+--------+----------+-----+-------------------+---------+---------+----------+
|case_id|province|      city|group|   infection_source|confirmed| latitude| longitude|
+-------+--------+----------+-----+-------------------+---------+---------+----------+
|1000001|   Seoul|Yongsan-gu| true|      Itaewon Clubs|       72|37.538621|126.992652|
|1000002|   Seoul|   Guro-gu| true|Guro-gu Call Center|       98|37.508163|126.884387|
+-------+--------+----------+-----+-------------------+---------+---------+----------+
only showing top 2 rows



In [131]:
spark.conf.set("spark.executor.heartbeatInterval","360s")

AnalysisException: Cannot modify the value of a Spark config: spark.executor.heartbeatInterval

In [132]:
# df_cases.withColumn("HighLow", caseHighLowUDF("confirmed")).show(5)

In [136]:
df_cases.show(1)

+-------+--------+----------+-----+----------------+---------+---------+----------+
|case_id|province|      city|group|infection_source|confirmed| latitude| longitude|
+-------+--------+----------+-----+----------------+---------+---------+----------+
|1000001|   Seoul|Yongsan-gu| true|   Itaewon Clubs|       72|37.538621|126.992652|
+-------+--------+----------+-----+----------------+---------+---------+----------+
only showing top 1 row



## Spark Window function

In [139]:
df_timeprovince = spark.read.load('sparkdf_guide/coronavirusdataset/TimeProvince.csv', infer_schema=True, sep=",", header=True, format='csv')

In [141]:
df_timeprovince.show(2)

+----------+----+--------+---------+--------+--------+
|      date|time|province|confirmed|released|deceased|
+----------+----+--------+---------+--------+--------+
|2020-01-20|  16|   Seoul|        0|       0|       0|
|2020-01-20|  16|   Busan|        0|       0|       0|
+----------+----+--------+---------+--------+--------+
only showing top 2 rows



## Ranking

In [142]:
from pyspark.sql.window import Window

In [158]:
window_spec = Window().partitionBy(['province']).orderBy(f.desc('confirmed'))
df_cases.withColumn('rank', f.rank().over(window_spec)).orderBy(['province', 'confirmed']).show()

+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+----+
|case_id|         province|           city|group|    infection_source|confirmed| latitude| longitude|rank|
+-------+-----------------+---------------+-----+--------------------+---------+---------+----------+----+
|1100006|            Busan|from other city| true|Cheongdo Daenam H...|        1|        -|         -|   9|
|1100005|            Busan|         Jin-gu| true|      Jin-gu Academy|        4| 35.17371|  129.0633|   8|
|1100003|            Busan|     Suyeong-gu| true|Suyeong-gu Kinder...|        5| 35.16708|  129.1124|   7|
|1100004|            Busan|    Haeundae-gu| true|Haeundae-gu Catho...|        6| 35.20599|  129.1256|   6|
|1100002|            Busan|from other city| true|  Shincheonji Church|       12|        -|         -|   5|
|1100008|            Busan|              -|false|contact with patient|       18|        -|         -|   4|
|1100007|            Busan|          

In [None]:
# df_trans = spark.read.csv('bank_transactions/bank_transactions.csv', header=True, inferSchema=True)
# df_trans.registerTempTable('bank_transactions')
# df = spark.sql("select cast(account_no as string) as account_no, date, cast(transaction_details as string) as transaction_details, cast(cast(chq_no as decimal(32,3)) as int) as chq_no, value_date, cast(cast(withdrawal_amt as decimal(32,3)) as int) as withdrawal_amt, cast(cast(deposit_amt as decimal(32,3)) as int) as deposit_amt, cast(cast(balance_amt as decimal(32,3)) as int) as balance_amt from bank_transactions").toPandas()
# df.to_csv('bank_transactions/bank_transactions.csv', index=None)
# df_trans.filter(df_trans.chq_no.isNull()).show(5)
# df_trans.na.drop(how='any', subset=['deposit_amt']).show(5)
# df_trans.select('account_no').distinct().show()
# df_trans.registerTempTable('bank_transactions')
# spark.sql("select * from bank_transactions where deposit_amt is null").count()
