In [3]:
## CELL 1
import findspark
findspark.init()
import pyspark
findspark.find()

## CELL 2
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = pyspark.SparkConf().setAppName('CovidCases').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [181]:
from pyspark.sql.functions import desc, asc
from pyspark.sql import SQLContext

In [19]:
import pandas as pd
import numpy as np
from datetime import date, timedelta, datetime
import time

<h1>1. Basic Functions</h1>

* Check out more about working with csv files
* [Options](https://spark.apache.org/docs/latest/sql-data-sources-csv.html)

In [16]:


'''
# 
covedcases = spark.read.load(path,
                        format="csv", 
                        sep=",", 
                        inferSchema="true", 
                        header="true")

#                       
covedcases = spark.read.options(sep=",",inferSchema="true",header="true").csv(path)

# Read all files in a folder, please make sure only CSV files should present in the folder.

folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
'''




'\n# \ncovedcases = spark.read.load(path,\n                        format="csv", \n                        sep=",", \n                        inferSchema="true", \n                        header="true")\n\n# Read all files in a folder, please make sure only CSV files should present in the folder.\n\nfolderPath = "examples/src/main/resources"\ndf5 = spark.read.csv(folderPath)\ndf5.show()\n'

In [170]:
path="./dataSets/CovidCase.csv"
covedcases = spark.read.options(sep=",",inferSchema="true",header="true").csv(path)

In [15]:
covedcases.show(4)

+--------+--------+------------+-----+--------------------+---------+---------+----------+
| case_id|province|        city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|  Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|   Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|     Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
+--------+--------+------------+-----+--------------------+---------+---------+----------+
only showing top 4 rows



In [17]:
covedcases.printSchema()

root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



* sometimes as we the number of columns increases, the formatting becomes not too great. I have noticed that the following trick helps in displaying in pandas format in my Jupyter Notebook.

* The .toPandas() function converts a Spark Dataframe into a Pandas Dataframe, which is much easier to play with.

In [26]:
covedcases.limit(10).toPandas()

Unnamed: 0,case_id,province,city,group,infection_case,confirmed,latitude,longitude
0,1000001,Seoul,Yongsan-gu,True,Itaewon Clubs,139,37.538621,126.992652
1,1000002,Seoul,Gwanak-gu,True,Richway,119,37.48208,126.901384
2,1000003,Seoul,Guro-gu,True,Guro-gu Call Center,95,37.508163,126.884387
3,1000004,Seoul,Yangcheon-gu,True,Yangcheon Table Tennis Club,43,37.546061,126.874209
4,1000005,Seoul,Dobong-gu,True,Day Care Center,43,37.679422,127.044374
5,1000006,Seoul,Guro-gu,True,Manmin Central Church,41,37.481059,126.894343
6,1000007,Seoul,from other city,True,SMR Newly Planted Churches Group,36,-,-
7,1000008,Seoul,Dongdaemun-gu,True,Dongan Church,17,37.592888,127.056766
8,1000009,Seoul,from other city,True,Coupang Logistics Center,25,-,-
9,1000010,Seoul,Gwanak-gu,True,Wangsung Church,30,37.481735,126.930121


<h2>Understand various dataFrame submethods and there types </h2>

<h2>Renaming column names</h2>

In [54]:
covedcases.printSchema()

# renames an existing column, we can use show() or save it to same dataframe covidcases
covedcases.withColumnRenamed(' case_id','case_id').show(5) 

#adds a column or replaces an existing column
covedcases.withColumn('confirmed_2',covedcases.confirmed+2).show(5)

# Renames multiple columns 
# Parameters , colsMap : dict :   a dict of existing column names and corresponding desired column names.
new_col_names = {' case_id':'Case_id'
                 ,'province':'Province'
                 }
covedcases.withColumnsRenamed(new_col_names).show(4)


root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)

+-------+--------+------------+-----+--------------------+---------+---------+----------+
|case_id|province|        city|group|      infection_case|confirmed| latitude| longitude|
+-------+--------+------------+-----+--------------------+---------+---------+----------+
|1000001|   Seoul|  Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
|1000002|   Seoul|   Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
|1000003|   Seoul|     Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
|1000004|   Seoul|Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
|1000005|   Seoul|   Dobong-gu| true|   

## Sorting column

In [76]:
print("* sort  by a single column either using a asc or desc method -- desc(' case_id')")
covedcases.sort(desc(' case_id')).show(5)

print("* or use another parameter in sort  ascending which takes a boolean -- ' case_id',ascending=True")
covedcases.sort(' case_id',ascending=True).show(5)

print("* sorting multiple columns -- desc('province'),desc('infection_case')")
covedcases.sort(desc('province'),desc('infection_case')).show(10)

print("* or we can use  orderby method also 'city',ascending=False")
print("we could use multiple columns in orderBy in the same way as sort()")
covedcases.orderBy('city',ascending=False).show(5)

* sort  by a single column either using a asc or desc method -- desc(' case_id')
+--------+----------------+---------------+-----+--------------------+---------+--------+---------+
| case_id|        province|           city|group|      infection_case|confirmed|latitude|longitude|
+--------+----------------+---------------+-----+--------------------+---------+--------+---------+
| 7000004|         Jeju-do|from other city| true|       Itaewon Clubs|        1|       -|        -|
| 7000003|         Jeju-do|              -|false|                 etc|        4|       -|        -|
| 7000002|         Jeju-do|              -|false|contact with patient|        0|       -|        -|
| 7000001|         Jeju-do|              -|false|     overseas inflow|       14|       -|        -|
| 6100012|Gyeongsangnam-do|              -|false|                 etc|       20|       -|        -|
+--------+----------------+---------------+-----+--------------------+---------+--------+---------+
only showing top 5 

## Changing column type

In [80]:
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql import functions as F

In [88]:
covedcases.printSchema()
covedcases.show(3)


covedcases.withColumn('confirmed',F.col('confirmed').cast(StringType())).printSchema()
covedcases.withColumn('confirmed',F.col('confirmed').cast(StringType())).show(3)

root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)

+--------+--------+----------+-----+-------------------+---------+---------+----------+
| case_id|province|      city|group|     infection_case|confirmed| latitude| longitude|
+--------+--------+----------+-----+-------------------+---------+---------+----------+
| 1000001|   Seoul|Yongsan-gu| true|      Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul| Gwanak-gu| true|            Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|   Guro-gu| true|Guro-gu Call Center|       95|37.508163|126.884387|
+--------+--------+----------+-----+-------------------+---------+---------+----------+
only showing top 3 rows

root
 |--  case_id: integer (

In [96]:
# Derived a new column by converting one column using select method
covedcases.select(F.col('confirmed').cast(StringType()).alias('confirmed_str'),'*').show(5)

+-------------+--------+--------+------------+-----+--------------------+---------+---------+----------+
|confirmed_str| case_id|province|        city|group|      infection_case|confirmed| latitude| longitude|
+-------------+--------+--------+------------+-----+--------------------+---------+---------+----------+
|          139| 1000001|   Seoul|  Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
|          119| 1000002|   Seoul|   Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
|           95| 1000003|   Seoul|     Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
|           43| 1000004|   Seoul|Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
|           43| 1000005|   Seoul|   Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
+-------------+--------+--------+------------+-----+--------------------+---------+---------+----------+
only showing top 5 rows



## Filter

* We can filter a data frame using multiple conditions using AND(&), OR(|) and NOT(~) conditions. For example, we may want to find out all the different infection_case in Daegu with more than 10 confirmed cases.

In [122]:
# Filter  rows using given condition
# func `where` is an alias for :func:`filter`

#     class:`Column` or str(SQL expression in a string)
#     column :   df.age > 2 , df.age == 3  etc
#     sql as string : "age" > 2, "age" ==3 
covedcases.filter((covedcases.confirmed>10) & (covedcases.province=='Daegu')).show()

covedcases.filter('confirmed > 10 and confirmed <115 and province != "Seoul"').show()

+--------+--------+------------+-----+--------------------+---------+---------+----------+
| case_id|province|        city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+------------+-----+--------------------+---------+---------+----------+
| 1200001|   Daegu|      Nam-gu| true|  Shincheonji Church|     4511| 35.84008|  128.5667|
| 1200002|   Daegu|Dalseong-gun| true|Second Mi-Ju Hosp...|      196|35.857375|128.466651|
| 1200003|   Daegu|      Seo-gu| true|Hansarang Convale...|      124|35.885592|128.556649|
| 1200004|   Daegu|Dalseong-gun| true|Daesil Convalesce...|      101|35.857393|128.466653|
| 1200005|   Daegu|     Dong-gu| true|     Fatima Hospital|       39| 35.88395|128.624059|
| 1200008|   Daegu|           -|false|     overseas inflow|       41|        -|         -|
| 1200009|   Daegu|           -|false|contact with patient|      917|        -|         -|
| 1200010|   Daegu|           -|false|                 etc|      747|        -|         -|

## GroupBy

* Groups the DataFrame using the specified columns, so we can run any of the available aggregate function on them

In [161]:
# parameter : list of columns
grp_list = ["province","city"]

# on the GroupedData object we can perform various aggregative functions
# once we perform any aggregate function it returns a dataframe again

# agg  function
# You can perform multiple aggregations in a single agg operation by specifying 
# multiple pairs of column expressions and aggregate functions.

# min,max,avg,count, etc 


In [160]:
covedcases.groupBy(grp_list)\
          .agg(F.count('confirmed').alias('group_count')
              ,F.sum("confirmed").alias('total_confirmed') 
              ,F.max("confirmed").alias('max_confirmed'))\
          .show(10)

+----------------+---------------+-----------+---------------+-------------+
|        province|           city|group_count|total_confirmed|max_confirmed|
+----------------+---------------+-----------+---------------+-------------+
|Gyeongsangnam-do|       Jinju-si|          1|              9|            9|
|           Seoul|        Guro-gu|          3|            139|           95|
|           Seoul|     Gangnam-gu|          4|             18|            7|
|         Daejeon|              -|          4|            100|           55|
|    Jeollabuk-do|from other city|          3|              6|            3|
|Gyeongsangnam-do|Changnyeong-gun|          1|              7|            7|
|           Seoul|              -|          4|            561|          298|
|         Jeju-do|from other city|          1|              1|            1|
|Gyeongsangbuk-do|              -|          3|            345|          190|
|Gyeongsangnam-do|   Geochang-gun|          2|             18|           10|

In [164]:
covedcases.groupBy(grp_list).count().show(5)

+----------------+---------------+-----+
|        province|           city|count|
+----------------+---------------+-----+
|Gyeongsangnam-do|       Jinju-si|    1|
|           Seoul|        Guro-gu|    3|
|           Seoul|     Gangnam-gu|    4|
|         Daejeon|              -|    4|
|    Jeollabuk-do|from other city|    3|
+----------------+---------------+-----+
only showing top 5 rows



## Joins

In [None]:
'''

'''

In [171]:
regionsFile_path = './dataSets/Region.csv'
regions = spark.read.options(sep=",",inferSchema="true",header="true").csv(regionsFile_path)

In [176]:
regions.printSchema()
regions.limit(6).toPandas()

root
 |-- code: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elementary_school_count: integer (nullable = true)
 |-- kindergarten_count: integer (nullable = true)
 |-- university_count: integer (nullable = true)
 |-- academy_ratio: double (nullable = true)
 |-- elderly_population_ratio: double (nullable = true)
 |-- elderly_alone_ratio: double (nullable = true)
 |-- nursing_home_count: integer (nullable = true)



Unnamed: 0,code,province,city,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count
0,10000,Seoul,Seoul,37.566953,126.977977,607,830,48,1.44,15.38,5.8,22739
1,10010,Seoul,Gangnam-gu,37.518421,127.047222,33,38,0,4.18,13.17,4.3,3088
2,10020,Seoul,Gangdong-gu,37.530492,127.123837,27,32,0,1.54,14.55,5.4,1023
3,10030,Seoul,Gangbuk-gu,37.639938,127.025508,14,21,0,0.67,19.49,8.5,628
4,10040,Seoul,Gangseo-gu,37.551166,126.849506,36,56,1,1.17,14.39,5.7,1080
5,10050,Seoul,Gwanak-gu,37.47829,126.951502,22,33,1,0.89,15.12,4.9,909


In [178]:
# Left Join 'Case' with 'Region' on Province and City column

'''
Parameters
----------
other : :class:`DataFrame`
    Right side of the join
on : str, list or :class:`Column`, optional
    a string for the join column name, a list of column names,
    a join expression (Column), or a list of Columns.
    If `on` is a string or a list of strings indicating the name of the join column(s),
    the column(s) must exist on both sides, and this performs an equi-join.
how : str, optional
    default ``inner``. Must be one of: ``inner``, ``cross``, ``outer``,
    ``full``, ``fullouter``, ``full_outer``, ``left``, ``leftouter``, ``left_outer``,
    ``right``, ``rightouter``, ``right_outer``, ``semi``, ``leftsemi``, ``left_semi``,
    ``anti``, ``leftanti`` and ``left_anti``.
'''

cases_reg = covedcases.join(regions,['province','city'],how='left')
cases_reg.limit(6).toPandas()

Unnamed: 0,province,city,case_id,group,infection_case,confirmed,latitude,longitude,code,latitude.1,longitude.1,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count
0,Seoul,Yongsan-gu,1000001,True,Itaewon Clubs,139,37.538621,126.992652,10210,37.532768,126.990021,15,13,1,0.68,16.87,6.5,435
1,Seoul,Gwanak-gu,1000002,True,Richway,119,37.48208,126.901384,10050,37.47829,126.951502,22,33,1,0.89,15.12,4.9,909
2,Seoul,Guro-gu,1000003,True,Guro-gu Call Center,95,37.508163,126.884387,10070,37.495632,126.88765,26,34,3,1.0,16.21,5.7,741
3,Seoul,Yangcheon-gu,1000004,True,Yangcheon Table Tennis Club,43,37.546061,126.874209,10190,37.517189,126.866618,30,43,0,2.26,13.55,5.5,816
4,Seoul,Dobong-gu,1000005,True,Day Care Center,43,37.679422,127.044374,10100,37.668952,127.047082,23,26,1,0.95,17.89,7.2,485
5,Seoul,Guro-gu,1000006,True,Manmin Central Church,41,37.481059,126.894343,10070,37.495632,126.88765,26,34,3,1.0,16.21,5.7,741


## Use SQL with DataFrames

* In Apache Spark, the <b>registerTempTable</b> method is used to register a DataFrame as a temporary table, making it accessible via Spark SQL

* All complex SQL queries like   GROUP BY, HAVING, AND ORDER BY   clauses can be applied in 'Sql' function
* The result of the SQL select statement is again a Spark Dataframe.


### Register a view or tempTable to use Spark SQL
<p>The <code>registerTempTable</code> method is a way to expose a Spark DataFrame as a temporary table in the Spark SQL catalog. It allows you to leverage Spark SQL to query and analyze your DataFrame using SQL-like syntax. This method was commonly used in earlier versions of Spark.</p>

<p>However, starting from Spark 2.0, the preferred method for registering a DataFrame as a temporary table is to use the <code>createOrReplaceTempView</code> method. This method is considered more idiomatic and aligns with the DataFrame API design. Here's an example:</p>

<pre><code># Assuming 'covedcases' is a DataFrame
covedcases.createOrReplaceTempView('cases_temp_table')
</code></pre>

<p>Now, you can use Spark SQL queries to reference the temporary table <code>cases_temp_table</code>:</p>

<pre><code>result = spark.sql("SELECT * FROM cases_temp_table WHERE some_column = 'some_value'")
result.show()
</code></pre>

<p>In this example, <code>some_column</code> is a placeholder for an actual column in your DataFrame.</p>

### Why to create a temp view table, can't we use SQL directly on Spark DataFrame
<p>Make sure that you register the DataFrame as a temporary view or table before referencing it in Spark SQL. This way, Spark SQL knows how to interpret the DataFrame as a SQL table</p>

<p>This error <code>[TABLE_OR_VIEW_NOT_FOUND]</code> commonly occurs when trying to reference a DataFrame directly in Spark SQL without registering it as a temporary table or view.</p>


In [196]:
covedcases.createOrReplaceTempView('cases_temp_table')
newDF = spark.sql('select * from cases_temp_table where confirmed > 100')
newDF.show()

+--------+-----------------+---------------+-----+--------------------+---------+---------+----------+
| case_id|         province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+-----------------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|            Seoul|     Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|            Seoul|      Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000036|            Seoul|              -|false|     overseas inflow|      298|        -|         -|
| 1000037|            Seoul|              -|false|contact with patient|      162|        -|         -|
| 1200001|            Daegu|         Nam-gu| true|  Shincheonji Church|     4511| 35.84008|  128.5667|
| 1200002|            Daegu|   Dalseong-gun| true|Second Mi-Ju Hosp...|      196|35.857375|128.466651|
| 1200003|            Daegu|         Seo-gu| true|Hansarang Convale...|  

## Create New Columns

## Spark Window functions

In [202]:
timeprovince = spark.read.load("./dataSets/TimeProvince.csv",
                          format="csv", 
                          sep=",", 
                          inferSchema="true", 
                          header="true")

timeprovince.show()

+----------+----+-----------------+---------+--------+--------+
|      date|time|         province|confirmed|released|deceased|
+----------+----+-----------------+---------+--------+--------+
|2020-01-20|  16|            Seoul|        0|       0|       0|
|2020-01-20|  16|            Busan|        0|       0|       0|
|2020-01-20|  16|            Daegu|        0|       0|       0|
|2020-01-20|  16|          Incheon|        1|       0|       0|
|2020-01-20|  16|          Gwangju|        0|       0|       0|
|2020-01-20|  16|          Daejeon|        0|       0|       0|
|2020-01-20|  16|            Ulsan|        0|       0|       0|
|2020-01-20|  16|           Sejong|        0|       0|       0|
|2020-01-20|  16|      Gyeonggi-do|        0|       0|       0|
|2020-01-20|  16|       Gangwon-do|        0|       0|       0|
|2020-01-20|  16|Chungcheongbuk-do|        0|       0|       0|
|2020-01-20|  16|Chungcheongnam-do|        0|       0|       0|
|2020-01-20|  16|     Jeollabuk-do|     

In [203]:
spark