In [0]:
#We initiated a SparkSession using the .builder method.
#We used .appName to tell Spark to name our Application HelloWorldApp.
#We used .getOrCreate() to tell Spark to create the Application if it does not exist yet, or reconnect to the existing app with the given name should it exist already.
#Finally, the reference to this Spark application is stored in an object we named spark

In [0]:
#in order to start working with pyspark you need to first set up a SparkSession.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("hello").getOrCreate()

In [0]:
df = spark.sql('SELECT "hello" as C1')
df.show()

In [0]:
#If you did everything right, you should see a table with your Hello World message inside. Congratulations! 
#You've just built your first Spark application that says hello to the world!!

In [0]:
# To kill the Spark application, use the `stop()` method
spark.stop()

In [0]:

#I will be working with the Data Science for COVID-19 in South Korea, which is one of the most detailed datasets on the internet for COVID.

In [0]:
cases = spark.read.load("/FileStore/Case.csv",format="csv", sep=",", inferSchema="true", header="true")
cases.toPandas()

Unnamed: 0,case_id,province,city,group,infection_case,confirmed,latitude,longitude
0,1000001,Seoul,Yongsan-gu,True,Itaewon Clubs,139,37.538621,126.992652
1,1000002,Seoul,Gwanak-gu,True,Richway,119,37.48208,126.901384
2,1000003,Seoul,Guro-gu,True,Guro-gu Call Center,95,37.508163,126.884387
3,1000004,Seoul,Yangcheon-gu,True,Yangcheon Table Tennis Club,43,37.546061,126.874209
4,1000005,Seoul,Dobong-gu,True,Day Care Center,43,37.679422,127.044374
...,...,...,...,...,...,...,...,...
169,6100012,Gyeongsangnam-do,-,False,etc,20,-,-
170,7000001,Jeju-do,-,False,overseas inflow,14,-,-
171,7000002,Jeju-do,-,False,contact with patient,0,-,-
172,7000003,Jeju-do,-,False,etc,4,-,-


In [0]:
cases.limit(10).toPandas()
cases.toPandas()

Unnamed: 0,province,city,confirmed,code,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count,code.1,latitude.1,longitude.1,elementary_school_count.1,kindergarten_count.1,university_count.1,academy_ratio.1,elderly_population_ratio.1,elderly_alone_ratio.1,nursing_home_count.1
0,Seoul,Yongsan-gu,139,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0
1,Seoul,Gwanak-gu,119,10050.0,37.478290,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0,10050.0,37.478290,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0
2,Seoul,Guro-gu,95,10070.0,37.495632,126.887650,26.0,34.0,3.0,1.00,16.21,5.7,741.0,10070.0,37.495632,126.887650,26.0,34.0,3.0,1.00,16.21,5.7,741.0
3,Seoul,Yangcheon-gu,43,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0
4,Seoul,Dobong-gu,43,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
169,Gyeongsangnam-do,-,20,,,,,,,,,,,,,,,,,,,,
170,Jeju-do,-,14,,,,,,,,,,,,,,,,,,,,
171,Jeju-do,-,0,,,,,,,,,,,,,,,,,,,,
172,Jeju-do,-,4,,,,,,,,,,,,,,,,,,,,


In [0]:
cases = cases.withColumnRenamed("infection_case","infection_source")

In [0]:
cases = cases.select('province','city','confirmed')
cases.toPandas()

Unnamed: 0,province,city,confirmed
0,Seoul,Yongsan-gu,139
1,Seoul,Gwanak-gu,119
2,Seoul,Guro-gu,95
3,Seoul,Yangcheon-gu,43
4,Seoul,Dobong-gu,43
...,...,...,...
169,Gyeongsangnam-do,-,20
170,Jeju-do,-,14
171,Jeju-do,-,0
172,Jeju-do,-,4


In [0]:

from pyspark.sql import functions as F
cases.sort("confirmed").show()
# descending Sort
cases.sort(F.desc("confirmed")).toPandas()

Unnamed: 0,province,city,confirmed
0,Daegu,Nam-gu,4511
1,Daegu,-,917
2,Daegu,-,747
3,Gyeongsangbuk-do,from other city,566
4,Gyeonggi-do,-,305
...,...,...,...
169,Jeollanam-do,from other city,1
170,Jeju-do,from other city,1
171,Seoul,Gangseo-gu,0
172,Gangwon-do,-,0


In [0]:
#We can filter a data frame using multiple conditions using AND(&), OR(|) and NOT(~) conditions
cases.filter((cases.confirmed>10) & (cases.province=='Daegu')).toPandas()

Unnamed: 0,province,city,confirmed
0,Daegu,Nam-gu,4511
1,Daegu,Dalseong-gun,196
2,Daegu,Seo-gu,124
3,Daegu,Dalseong-gun,101
4,Daegu,Dong-gu,39
5,Daegu,-,41
6,Daegu,-,917
7,Daegu,-,747


In [0]:
from pyspark.sql import functions as F
cases.groupBy(["province","city"]).agg(F.sum("confirmed") ,F.max("confirmed")).toPandas()

Unnamed: 0,province,city,sum(confirmed),max(confirmed)
0,Gyeongsangnam-do,Jinju-si,9,9
1,Seoul,Guro-gu,139,95
2,Seoul,Gangnam-gu,18,7
3,Daejeon,-,100,55
4,Jeollabuk-do,from other city,6,3
...,...,...,...,...
77,Jeollanam-do,Muan-gun,2,2
78,Seoul,Eunpyeong-gu,14,14
79,Seoul,Yeongdeungpo-gu,3,3
80,Seoul,Geumcheon-gu,6,6


In [0]:
cases.groupBy(["province","city"]).agg(
    F.sum("confirmed").alias("TotalConfirmed"),\
    F.max("confirmed").alias("MaxFromOneConfirmedCase")\
    ).toPandas()

Unnamed: 0,province,city,TotalConfirmed,MaxFromOneConfirmedCase
0,Gyeongsangnam-do,Jinju-si,9,9
1,Seoul,Guro-gu,139,95
2,Seoul,Gangnam-gu,18,7
3,Daejeon,-,100,55
4,Jeollabuk-do,from other city,6,3
...,...,...,...,...
77,Jeollanam-do,Muan-gun,2,2
78,Seoul,Eunpyeong-gu,14,14
79,Seoul,Yeongdeungpo-gu,3,3
80,Seoul,Geumcheon-gu,6,6


In [0]:
# for join we need another csv file! lets add region from datasets.
regions = spark.read.load("/FileStore/Region.csv",format="csv", sep=",", inferSchema="true", header="true")
regions.toPandas()

Unnamed: 0,code,province,city,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count
0,10000,Seoul,Seoul,37.566953,126.977977,607,830,48,1.44,15.38,5.8,22739
1,10010,Seoul,Gangnam-gu,37.518421,127.047222,33,38,0,4.18,13.17,4.3,3088
2,10020,Seoul,Gangdong-gu,37.530492,127.123837,27,32,0,1.54,14.55,5.4,1023
3,10030,Seoul,Gangbuk-gu,37.639938,127.025508,14,21,0,0.67,19.49,8.5,628
4,10040,Seoul,Gangseo-gu,37.551166,126.849506,36,56,1,1.17,14.39,5.7,1080
...,...,...,...,...,...,...,...,...,...,...,...,...
239,61160,Gyeongsangnam-do,Haman-gun,35.272481,128.406540,16,20,0,1.19,23.74,14.7,94
240,61170,Gyeongsangnam-do,Hamyang-gun,35.520541,127.725177,13,12,0,1.01,32.65,20.9,83
241,61180,Gyeongsangnam-do,Hapcheon-gun,35.566702,128.165870,17,15,0,0.71,38.44,24.7,96
242,70000,Jeju-do,Jeju-do,33.488936,126.500423,113,123,4,1.53,15.10,6.4,1245


In [0]:
cases1 = cases.join(regions, ['province','city'],how='left')
cases1.limit(10).toPandas()

Unnamed: 0,province,city,confirmed,code,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count,code.1,latitude.1,longitude.1,elementary_school_count.1,kindergarten_count.1,university_count.1,academy_ratio.1,elderly_population_ratio.1,elderly_alone_ratio.1,nursing_home_count.1,code.2,latitude.2,longitude.2,elementary_school_count.2,kindergarten_count.2,university_count.2,academy_ratio.2,elderly_population_ratio.2,elderly_alone_ratio.2,nursing_home_count.2
0,Seoul,Yongsan-gu,139,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0
1,Seoul,Gwanak-gu,119,10050.0,37.47829,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0,10050.0,37.47829,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0,10050.0,37.47829,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0
2,Seoul,Guro-gu,95,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0
3,Seoul,Yangcheon-gu,43,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0
4,Seoul,Dobong-gu,43,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0
5,Seoul,Guro-gu,41,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0
6,Seoul,from other city,36,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
7,Seoul,Dongdaemun-gu,17,10110.0,37.574552,127.039721,21.0,31.0,4.0,1.06,17.26,6.7,832.0,10110.0,37.574552,127.039721,21.0,31.0,4.0,1.06,17.26,6.7,832.0,10110.0,37.574552,127.039721,21.0,31.0,4.0,1.06,17.26,6.7,832.0
8,Seoul,from other city,25,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
9,Seoul,Gwanak-gu,30,10050.0,37.47829,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0,10050.0,37.47829,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0,10050.0,37.47829,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0


In [0]:
#Spark can “broadcast” a small DataFrame by sending all the data in that small DataFrame to all nodes in the cluster. After the small DataFrame is broadcasted, Spark can perform a join without shuffling any of the data in the large DataFrame.

from pyspark.sql.functions import broadcast
cases2 = cases.join(broadcast(regions), ['province','city'],how='left')
cases2.toPandas()


Unnamed: 0,province,city,confirmed,code,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count,code.1,latitude.1,longitude.1,elementary_school_count.1,kindergarten_count.1,university_count.1,academy_ratio.1,elderly_population_ratio.1,elderly_alone_ratio.1,nursing_home_count.1,code.2,latitude.2,longitude.2,elementary_school_count.2,kindergarten_count.2,university_count.2,academy_ratio.2,elderly_population_ratio.2,elderly_alone_ratio.2,nursing_home_count.2
0,Seoul,Yongsan-gu,139,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0
1,Seoul,Gwanak-gu,119,10050.0,37.478290,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0,10050.0,37.478290,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0,10050.0,37.478290,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0
2,Seoul,Guro-gu,95,10070.0,37.495632,126.887650,26.0,34.0,3.0,1.00,16.21,5.7,741.0,10070.0,37.495632,126.887650,26.0,34.0,3.0,1.00,16.21,5.7,741.0,10070.0,37.495632,126.887650,26.0,34.0,3.0,1.00,16.21,5.7,741.0
3,Seoul,Yangcheon-gu,43,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0
4,Seoul,Dobong-gu,43,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
169,Gyeongsangnam-do,-,20,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
170,Jeju-do,-,14,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
171,Jeju-do,-,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
172,Jeju-do,-,4,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [0]:
from pyspark.sql.functions import *
df_as1 = regions.alias("df_as1")
df_as2 = cases.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.city") == col("df_as2.city"), 'inner')
joined_df.toPandas()



Unnamed: 0,code,province,city,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count,province.1,city.1,confirmed,code.1,latitude.1,longitude.1,elementary_school_count.1,kindergarten_count.1,university_count.1,academy_ratio.1,elderly_population_ratio.1,elderly_alone_ratio.1,nursing_home_count.1,code.2,latitude.2,longitude.2,elementary_school_count.2,kindergarten_count.2,university_count.2,academy_ratio.2,elderly_population_ratio.2,elderly_alone_ratio.2,nursing_home_count.2
0,10210,Seoul,Yongsan-gu,37.532768,126.990021,15,13,1,0.68,16.87,6.5,435,Seoul,Yongsan-gu,139,10210,37.532768,126.990021,15,13,1,0.68,16.87,6.5,435,10210,37.532768,126.990021,15,13,1,0.68,16.87,6.5,435
1,10050,Seoul,Gwanak-gu,37.478290,126.951502,22,33,1,0.89,15.12,4.9,909,Seoul,Gwanak-gu,119,10050,37.478290,126.951502,22,33,1,0.89,15.12,4.9,909,10050,37.478290,126.951502,22,33,1,0.89,15.12,4.9,909
2,10070,Seoul,Guro-gu,37.495632,126.887650,26,34,3,1.00,16.21,5.7,741,Seoul,Guro-gu,95,10070,37.495632,126.887650,26,34,3,1.00,16.21,5.7,741,10070,37.495632,126.887650,26,34,3,1.00,16.21,5.7,741
3,10190,Seoul,Yangcheon-gu,37.517189,126.866618,30,43,0,2.26,13.55,5.5,816,Seoul,Yangcheon-gu,43,10190,37.517189,126.866618,30,43,0,2.26,13.55,5.5,816,10190,37.517189,126.866618,30,43,0,2.26,13.55,5.5,816
4,10100,Seoul,Dobong-gu,37.668952,127.047082,23,26,1,0.95,17.89,7.2,485,Seoul,Dobong-gu,43,10100,37.668952,127.047082,23,26,1,0.95,17.89,7.2,485,10100,37.668952,127.047082,23,26,1,0.95,17.89,7.2,485
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
112,61110,Gyeongsangnam-do,Jinju-si,35.180313,128.108750,45,53,6,2.49,16.27,8.6,597,Gyeongsangnam-do,Jinju-si,9,61110,35.180313,128.108750,45,53,6,2.49,16.27,8.6,597,61110,35.180313,128.108750,45,53,6,2.49,16.27,8.6,597
113,61020,Gyeongsangnam-do,Geochang-gun,35.686526,127.910021,17,16,2,1.25,27.01,17.4,127,Gyeongsangnam-do,Geochang-gun,8,61020,35.686526,127.910021,17,16,2,1.25,27.01,17.4,127,61020,35.686526,127.910021,17,16,2,1.25,27.01,17.4,127
114,61130,Gyeongsangnam-do,Changwon-si,35.227992,128.681815,110,195,5,1.84,13.64,6.5,1701,Gyeongsangnam-do,Changwon-si,7,61130,35.227992,128.681815,110,195,5,1.84,13.64,6.5,1701,61130,35.227992,128.681815,110,195,5,1.84,13.64,6.5,1701
115,61120,Gyeongsangnam-do,Changnyeong-gun,35.544603,128.492330,17,20,0,0.80,29.80,18.4,129,Gyeongsangnam-do,Changnyeong-gun,7,61120,35.544603,128.492330,17,20,0,0.80,29.80,18.4,129,61120,35.544603,128.492330,17,20,0,0.80,29.80,18.4,129


In [0]:
regions.columns

In [0]:
from pyspark.ml.stat import Correlation
#r1 = Correlation.corr(regions, "features").head()

In [0]:
regions.describe(['academy_ratio']).toPandas()

Unnamed: 0,summary,academy_ratio
0,count,244.0
1,mean,1.2947540983606531
2,stddev,0.5928979025284541
3,min,0.19
4,max,4.18


In [0]:
regions.drop('university_count')

In [0]:
regions.drop('university_count').limit(1).collect()

In [0]:
#Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.

#For a static batch DataFrame, it just drops duplicate rows. For a streaming DataFrame, it will keep all data across triggers as intermediate state to drop duplicates rows. You can use withWatermark() to limit how late the duplicate data can be and system will accordingly limit the state. In addition, too late data older than watermark will be dropped to avoid any possibility of duplicates.

from pyspark.sql import Row
df = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)]).toDF()
df.dropDuplicates().toPandas()

Unnamed: 0,name,age,height
0,Alice,5,80
1,Alice,10,80


In [0]:
df.dropDuplicates(['name', 'height']).toPandas()

Unnamed: 0,name,age,height
0,Alice,5,80


In [0]:
#Parameters
#how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
#thresh – int, default None If specified, drop rows that have less than non-null values. This overwrites the parameter.
#subset – optional list of column names to consider.
cases= spark.read.load("/FileStore/Case.csv",format="csv", sep=",", inferSchema="true", header="true")
df1 = cases.toPandas()
df1


Unnamed: 0,case_id,province,city,group,infection_case,confirmed,latitude,longitude
0,1000001,Seoul,Yongsan-gu,True,Itaewon Clubs,139,37.538621,126.992652
1,1000002,Seoul,Gwanak-gu,True,Richway,119,37.48208,126.901384
2,1000003,Seoul,Guro-gu,True,Guro-gu Call Center,95,37.508163,126.884387
3,1000004,Seoul,Yangcheon-gu,True,Yangcheon Table Tennis Club,43,37.546061,126.874209
4,1000005,Seoul,Dobong-gu,True,Day Care Center,43,37.679422,127.044374
...,...,...,...,...,...,...,...,...
169,6100012,Gyeongsangnam-do,-,False,etc,20,-,-
170,7000001,Jeju-do,-,False,overseas inflow,14,-,-
171,7000002,Jeju-do,-,False,contact with patient,0,-,-
172,7000003,Jeju-do,-,False,etc,4,-,-


In [0]:
cases.dropna(how='any')
cases.toPandas()

Unnamed: 0,case_id,province,city,group,infection_case,confirmed,latitude,longitude
0,1000001,Seoul,Yongsan-gu,True,Itaewon Clubs,139,37.538621,126.992652
1,1000002,Seoul,Gwanak-gu,True,Richway,119,37.48208,126.901384
2,1000003,Seoul,Guro-gu,True,Guro-gu Call Center,95,37.508163,126.884387
3,1000004,Seoul,Yangcheon-gu,True,Yangcheon Table Tennis Club,43,37.546061,126.874209
4,1000005,Seoul,Dobong-gu,True,Day Care Center,43,37.679422,127.044374
...,...,...,...,...,...,...,...,...
169,6100012,Gyeongsangnam-do,-,False,etc,20,-,-
170,7000001,Jeju-do,-,False,overseas inflow,14,-,-
171,7000002,Jeju-do,-,False,contact with patient,0,-,-
172,7000003,Jeju-do,-,False,etc,4,-,-
