In [1]:
import pandas as pd 
import findspark
findspark.init('/opt/spark')

import pyspark
import random
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession

In [32]:
from pyspark.sql.functions import udf 
from pyspark.sql.types import StructField,IntegerType, StructType,StringType, FloatType
from pyspark.sql.functions import desc 
from pyspark.sql.functions import asc 
import pyspark.sql.functions as F 
import numpy as np
from pyspark.sql import Window
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import DateType
from pyspark.sql.functions import *

# Data Wrangling: Pandas vs. Pyspark

This basic introduction is to compare common data wrangling methods in pyspark and pandas data frame with a concrete example. Here, I used US counties' COVID-19 dataset to show the data wrangling differences between these two types of data frames. The dataset can be downloaded from Kaggle Dataset (https://www.kaggle.com/fireballbyedimyrnmom/us-counties-covid-19-dataset). This should allow you to get started with data manipulation and analysis under both pandas and spark. Specific objectives are to show you how to:

1. Load data from local files
2. Display the schema of the DataFrame 
3.	Change data types of the DataFrame 
4.  Show the head of the DataFrame 
4.	Select columns from the DataFrame 
5.	Show the statistics of the DataFrame 
6.	Drop duplicates 
7.	Missing Values (check NA, drop NA, replace NA)
8.	Datetime manipulations
9.	Filter data based on conditions 
10.	Aggregation functions
12.	Sort values 
13.	Rename columns
14.	Create new columns 
15.	Join tables 
16.	User Defined functions 
17.	Window function 
18.	Operate SQL queries within DataFrame 
19.	Convert one type of DataFrame to another 
20.	Write out the data 

## 1.	Load data from local files

Unlike pandas, we first need to create a spark session or a spark context before we can import data. Spark Context is used as a channel to access all spark functionality. The spark driver program uses spark context to connect to the cluster through a resource manager (YARN or Mesos). SparkConf is required to create the spark context object, which stores configuration parameters like appName (to identify your spark driver), application, number of core and memory size of executor running on the worker node.

In [4]:
path = '/home/zhili/SparkSample/us_counties.csv' # defline the file path 
# Pandas 
df = pd.read_csv(path)

# Pyspark 
appName = "PySpark SQL Server Example - via JDBC"
master = "local"
conf = pyspark.SparkConf().set('spark.driver.host','127.0.0.1').setAppName(appName).setMaster(master).set("spark.driver.extraClassPath","sqljdbc_7.2/enu/mssql-jdbc-7.2.1.jre8.jar")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = SparkSession \
    .builder \
    .appName("Our first Python Spark SQL example") \
    .getOrCreate()
df_s = spark.read.csv(path,header = True)



## 2.	Display the schema of the DataFrame 

In [5]:
## pandas 
print(df.dtypes)
## pyspark 
print(df_s.printSchema())

date       object
county     object
state      object
fips      float64
cases       int64
deaths      int64
dtype: object
root
 |-- date: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: string (nullable = true)
 |-- cases: string (nullable = true)
 |-- deaths: string (nullable = true)

None


## 3. Change the Data Types when Importing the Data

When we check the data types above, we found that the cases and deaths need to be converted to numerical values instead of string format. Here is the way how we can change the data types when importing the data in pandas and spark. 

In [6]:
# pandas : aim to reduce memory size 

types = {'date': 'object', 'county': 'object', 
         'state': 'object', 'fips': 'float',
         'cases': np.uint32, 'deaths': np.uint32}

df = pd.read_csv(path, usecols=types.keys(), dtype=types)
print(df.info(memory_usage='deep'))

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 30843 entries, 0 to 30842
Data columns (total 6 columns):
date      30843 non-null object
county    30843 non-null object
state     30843 non-null object
fips      30388 non-null float64
cases     30843 non-null uint32
deaths    30843 non-null uint32
dtypes: float64(1), object(3), uint32(2)
memory usage: 6.3 MB
None


In pandas, I change the data type of cases and deaths to np.uint32.  We can use 'integer' as the data type for these two columns, but np.unit 32 can help us to save memory. If your data is too large, you can also change the object data type to category to save memory.

In [7]:
# pyspark 
newDF=[StructField('date',StringType(),True),
       StructField('county',StringType(),True),
       StructField('state',StringType(),True),
       StructField('fips',FloatType(),True),
       StructField('cases',IntegerType(),True),
       StructField('deaths',IntegerType(),True)
       ]
finalStruct=StructType(fields=newDF)
df_s = spark.read.csv(path,header = True,schema=finalStruct)
df_s.printSchema()

root
 |-- date: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: float (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)



## 4. Show the head of the DataFrame 

In [8]:
# pandas 
df.head()

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061.0,1,0
1,2020-01-22,Snohomish,Washington,53061.0,1,0
2,2020-01-23,Snohomish,Washington,53061.0,1,0
3,2020-01-24,Cook,Illinois,17031.0,1,0
4,2020-01-24,Snohomish,Washington,53061.0,1,0


In [9]:
# pyspark 
df_s.show(5)

+----------+---------+----------+-------+-----+------+
|      date|   county|     state|   fips|cases|deaths|
+----------+---------+----------+-------+-----+------+
|2020-01-21|Snohomish|Washington|53061.0|    1|     0|
|2020-01-22|Snohomish|Washington|53061.0|    1|     0|
|2020-01-23|Snohomish|Washington|53061.0|    1|     0|
|2020-01-24|     Cook|  Illinois|17031.0|    1|     0|
|2020-01-24|Snohomish|Washington|53061.0|    1|     0|
+----------+---------+----------+-------+-----+------+
only showing top 5 rows



In [57]:
df_s.take(5)

[Row(date='2020-01-21', county='Snohomish', state='Washington', fips=53061.0, cases=1, deaths=0, new_num='4'),
 Row(date='2020-01-22', county='Snohomish', state='Washington', fips=53061.0, cases=1, deaths=0, new_num='4'),
 Row(date='2020-01-23', county='Snohomish', state='Washington', fips=53061.0, cases=1, deaths=0, new_num='4'),
 Row(date='2020-01-24', county='Cook', state='Illinois', fips=17031.0, cases=1, deaths=0, new_num='4'),
 Row(date='2020-01-24', county='Snohomish', state='Washington', fips=53061.0, cases=1, deaths=0, new_num='4')]

In pyspark, take() and show() are different. show() prints results, take() returns a list of rows (in PySpark) and can be used to create a new dataframe. They are both actions.

## 5.	Select Columns from the DataFrame

In [58]:
## Pandas 
df[['state','cases']].head()

Unnamed: 0,state,cases
0,Washington,1
1,Washington,1
2,Washington,1
3,Illinois,1
4,Washington,1


In [59]:
## pyspark 
df_s.select('state','cases').show(5)

+----------+-----+
|     state|cases|
+----------+-----+
|Washington|    1|
|Washington|    1|
|Washington|    1|
|  Illinois|    1|
|Washington|    1|
+----------+-----+
only showing top 5 rows



## 6.	Show the Statistics of the DataFrame

In [60]:
# pandas 
df.describe() # show the stats for all numerical columns 

Unnamed: 0,fips,cases,deaths,new_num
count,30843.0,30843.0,30843.0,30843.0
mean,28817.802192,59.337937,1.151607,62.337937
std,15922.062074,778.74866,20.304794,778.74866
min,-1.0,0.0,0.0,3.0
25%,16069.0,1.0,0.0,4.0
50%,28075.0,4.0,0.0,7.0
75%,42079.0,14.0,0.0,17.0
max,56043.0,57160.0,1867.0,57163.0


In [61]:
df['cases'].describe() # show the stats for a specific column

count    30843.000000
mean        59.337937
std        778.748660
min          0.000000
25%          1.000000
50%          4.000000
75%         14.000000
max      57160.000000
Name: cases, dtype: float64

In [62]:
# pyspark 
df_s.describe().show() #pyspark can show stats for all columns but it may contain missing values 

+-------+----------+---------+-------+------------------+------------------+------------------+------------------+
|summary|      date|   county|  state|              fips|             cases|            deaths|           new_num|
+-------+----------+---------+-------+------------------+------------------+------------------+------------------+
|  count|     30843|    30843|  30843|             30388|             30843|             30843|             30843|
|   mean|      null|     null|   null|29249.306568382257|59.337937295334434|1.1516065233602437|62.337937295334434|
| stddev|      null|     null|   null|15642.441465721475|   778.74866005819|20.304793699175974|   778.74866005819|
|    min|2020-01-21|Abbeville|Alabama|            1001.0|                 0|                 0|                10|
|    max|2020-04-03|     Yuma|Wyoming|           56043.0|             57160|              1867|              9970|
+-------+----------+---------+-------+------------------+------------------+----

In [63]:
df_s.describe('cases').show() # show the stats for a specific column

+-------+------------------+
|summary|             cases|
+-------+------------------+
|  count|             30843|
|   mean|59.337937295334434|
| stddev|   778.74866005819|
|    min|                 0|
|    max|             57160|
+-------+------------------+



## 7. Drop Duplicates 

In [64]:
# pandas 
df.state.drop_duplicates().head()

0        Washington
3          Illinois
5        California
8           Arizona
44    Massachusetts
Name: state, dtype: object

In [66]:
# pyspark 
df_s.select('state').dropDuplicates().show(5)

+-------------+
|        state|
+-------------+
|   Washington|
|     Illinois|
|   California|
|      Arizona|
|Massachusetts|
+-------------+
only showing top 5 rows



## 8. Missing Values 

### 8.1 Check NA

In [21]:
# Pandas 
df.isnull().sum() #check the number of missing value for each column 
# if we want to check the non-missing value we can use notnull() instead 

date        0
county      0
state       0
fips      455
cases       0
deaths      0
dtype: int64

In [22]:
# pyspark 
df_s.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_s.columns]).show()
# if we want to check the non-missing value we can use isNotNull()

+----+------+-----+----+-----+------+
|date|county|state|fips|cases|deaths|
+----+------+-----+----+-----+------+
|   0|     0|    0| 455|    0|     0|
+----+------+-----+----+-----+------+



pyspark.sql.functions.isnan(col): an expression that returns true iff the column is NaN.
<br>
isNull() :True if the current expression is null.

### 8.2 Drop NA

In [23]:
# pandas 
df_valid = df.dropna(subset=['fips','state'], how ='any') #all 
print(df_valid.shape[0])
print(df.shape[0])

30388
30843


In [24]:
# pyspark 
df_s_valid = df_s.dropna(how='any', subset =['fips', 'state']) #all
print(df_s_valid.count())
print(df_s.count())

30388
30843


### 8.3 Replace NA

In [25]:
# pandas 
# Replace Missing values with 0 
print(df.fillna(0).isnull().sum())
# Replace Missing values based on specific columns 
values = {'fips': -1, 'cases': 0, 'deaths': 0}
df.fillna(value=values, inplace = True)

date      0
county    0
state     0
fips      0
cases     0
deaths    0
dtype: int64


In [26]:
# pyspark 
# Replace Missing values with 0 
df_s.na.fill(0).select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_s.columns]).show()
# Replace Missing values based on specific columns 
df_s.fillna({'fips':'0','cases': 0, 'deaths': 0 })

+----+------+-----+----+-----+------+
|date|county|state|fips|cases|deaths|
+----+------+-----+----+-----+------+
|   0|     0|    0|   0|    0|     0|
+----+------+-----+----+-----+------+



DataFrame[date: string, county: string, state: string, fips: float, cases: int, deaths: int]

## 9. Datetime Manipulations

In [29]:
# pandas 
df['date'] = pd.to_datetime(df.date) # change the data type from string to datetime 
# Extract weekday from the datetime column
df.date.dt.weekday.head() # alternatively, we can use year, month, day,dayofweek ,second,hour, quarter instead 

0    1
1    2
2    3
3    4
4    4
Name: date, dtype: int64

In [33]:
# pyspark
# change the data type from string to datetime 
df_spark = df_s.withColumn("record_date",df_s['date'].cast(DateType()))
df_spark.printSchema() #show the current schema 
newdf = df_spark.select(year(df_spark.record_date).alias('dt_year'), month(df_spark.record_date).alias('dt_month'), dayofmonth(df_spark.record_date).alias('dt_day'), dayofyear(df_spark.record_date).alias('dt_dayofy'), hour(df_spark.record_date).alias('dt_hour'), minute(df_spark.record_date).alias('dt_min'), weekofyear(df_spark.record_date).alias('dt_week_no'), unix_timestamp(df_spark.record_date).alias('dt_int'))
newdf.show()

root
 |-- date: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: float (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- record_date: date (nullable = true)

+-------+--------+------+---------+-------+------+----------+----------+
|dt_year|dt_month|dt_day|dt_dayofy|dt_hour|dt_min|dt_week_no|    dt_int|
+-------+--------+------+---------+-------+------+----------+----------+
|   2020|       1|    21|       21|      0|     0|         4|1579593600|
|   2020|       1|    22|       22|      0|     0|         4|1579680000|
|   2020|       1|    23|       23|      0|     0|         4|1579766400|
|   2020|       1|    24|       24|      0|     0|         4|1579852800|
|   2020|       1|    24|       24|      0|     0|         4|1579852800|
|   2020|       1|    25|       25|      0|     0|         4|1579939200|
|   2020|       1|    25|       25|      0|     0|         4|1579939200|


You can use functions in pyspark.sql.functions: functions like year, month, etc. refer to here: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

## 10. 	Filter Data Based on Conditions 

In [67]:
# Pandas 
print(df[df.state =='California'].head()) # Filter data based on the state is CA
print(df[df.state =='California'] [['cases','deaths']].head())  # only show specific columns based on the filtering condistions 

         date       county       state    fips  cases  deaths  new_num
5  2020-01-25       Orange  California  6059.0      1       0        4
9  2020-01-26  Los Angeles  California  6037.0      1       0        4
10 2020-01-26       Orange  California  6059.0      1       0        4
14 2020-01-27  Los Angeles  California  6037.0      1       0        4
15 2020-01-27       Orange  California  6059.0      1       0        4
    cases  deaths
5       1       0
9       1       0
10      1       0
14      1       0
15      1       0


In [68]:
# pyspark 
df_s.where(df_s.state == 'California').show(5)
# Alternatively, we can write it in this way
df_s[df_s.state.isin("California")].show(5)
##only show specific columns based on the filtering condistions 
df_s.select('deaths', 'cases').filter(df_s.state == 'California').show(5)

+----------+-----------+----------+------+-----+------+-------+
|      date|     county|     state|  fips|cases|deaths|new_num|
+----------+-----------+----------+------+-----+------+-------+
|2020-01-25|     Orange|California|6059.0|    1|     0|      4|
|2020-01-26|Los Angeles|California|6037.0|    1|     0|      4|
|2020-01-26|     Orange|California|6059.0|    1|     0|      4|
|2020-01-27|Los Angeles|California|6037.0|    1|     0|      4|
|2020-01-27|     Orange|California|6059.0|    1|     0|      4|
+----------+-----------+----------+------+-----+------+-------+
only showing top 5 rows

+----------+-----------+----------+------+-----+------+-------+
|      date|     county|     state|  fips|cases|deaths|new_num|
+----------+-----------+----------+------+-----+------+-------+
|2020-01-25|     Orange|California|6059.0|    1|     0|      4|
|2020-01-26|Los Angeles|California|6037.0|    1|     0|      4|
|2020-01-26|     Orange|California|6059.0|    1|     0|      4|
|2020-01-27|Los

## 11.	Group By with Aggregation Functions

Common aggreagation functions for both pandas and pyspark include: sum(), count(),mean(), min(),max()

In [28]:
# pandas 
df.groupby('state').cases.sum() # the total cases for each state 
# can change to min(),max(), mean()

state
Alabama                      10042
Alaska                        1296
Arizona                      11647
Arkansas                      6527
California                   89145
Colorado                     29895
Connecticut                  26902
Delaware                      3098
District of Columbia          5314
Florida                      62627
Georgia                      38318
Guam                          1047
Hawaii                        2330
Idaho                         4934
Illinois                     57310
Indiana                      19069
Iowa                          4655
Kansas                        4076
Kentucky                      5653
Louisiana                    54795
Maine                         3278
Maryland                     15871
Massachusetts                61632
Michigan                     71954
Minnesota                     6706
Mississippi                   9412
Missouri                     11968
Montana                       1922
Nebraska      

In [29]:
# pyspark
df_s.groupby(df_s.state).agg(F.sum('cases')).show()
#F.mean(), F.max(), F.countDistinct(), F.min(), F.count()

+--------------+----------+
|         state|sum(cases)|
+--------------+----------+
|    Washington|     63368|
|      Illinois|     57310|
|    California|     89145|
|       Arizona|     11647|
| Massachusetts|     61632|
|     Wisconsin|     14576|
|         Texas|     35446|
|      Nebraska|      2633|
|          Utah|      8778|
|        Oregon|      7086|
|       Florida|     62627|
|      New York|    743198|
|  Rhode Island|      4875|
|       Georgia|     38318|
| New Hampshire|      3666|
|North Carolina|     14176|
|    New Jersey|    170274|
|      Colorado|     29895|
|      Maryland|     15871|
|        Nevada|     11214|
+--------------+----------+
only showing top 20 rows



In [38]:
# pandas 
# apply different aggregation functions for different columns 
df.groupby(['state','date']).agg({'cases': 'sum', 'deaths': 'max'}).head()

Unnamed: 0_level_0,Unnamed: 1_level_0,cases,deaths
state,date,Unnamed: 2_level_1,Unnamed: 3_level_1
Alabama,2020-03-13,6,0
Alabama,2020-03-14,12,0
Alabama,2020-03-15,23,0
Alabama,2020-03-16,29,0
Alabama,2020-03-17,39,0


In [39]:
#spark 
# apply different aggregation functions for different columns 
df_s.groupBy(['state','date']).agg({'cases': 'sum', 'deaths': 'max'}).show()

+----------+----------+----------+-----------+
|     state|      date|sum(cases)|max(deaths)|
+----------+----------+----------+-----------+
|Washington|2020-01-21|         1|          0|
|Washington|2020-01-22|         1|          0|
|Washington|2020-01-23|         1|          0|
|  Illinois|2020-01-24|         1|          0|
|Washington|2020-01-24|         1|          0|
|California|2020-01-25|         1|          0|
|  Illinois|2020-01-25|         1|          0|
|Washington|2020-01-25|         1|          0|
|   Arizona|2020-01-26|         1|          0|
|California|2020-01-26|         2|          0|
|  Illinois|2020-01-26|         1|          0|
|Washington|2020-01-26|         1|          0|
|   Arizona|2020-01-27|         1|          0|
|California|2020-01-27|         2|          0|
|  Illinois|2020-01-27|         1|          0|
|Washington|2020-01-27|         1|          0|
|   Arizona|2020-01-28|         1|          0|
|California|2020-01-28|         2|          0|
|  Illinois|2

It's hard to compare the aggregation results, since the pandas dataframe and pyspark dataframe are in different orders. The following shows how can we sort the data frame based on specific columns. 

## 12. Sort Data

In pandas, we use sort_values(), while we use sort() in pyspark to sort the data frame based on specific columns. The default sorting order is ascending.  

In [42]:
# pandas 
df_agg = df.groupby(['state','date']).agg({'deaths': 'sum',
                                 'cases': 'sum'}).reset_index().sort_values(by =['state', 'date'],
                                                                              ascending = True)
df_agg.head(10)

Unnamed: 0,state,date,cases,deaths
0,Alabama,2020-03-13,6,0
1,Alabama,2020-03-14,12,0
2,Alabama,2020-03-15,23,0
3,Alabama,2020-03-16,29,0
4,Alabama,2020-03-17,39,0
5,Alabama,2020-03-18,51,0
6,Alabama,2020-03-19,78,0
7,Alabama,2020-03-20,106,0
8,Alabama,2020-03-21,131,0
9,Alabama,2020-03-22,157,0


In [43]:
# spark
df_s_agg = df_s.groupBy(['state','date']).agg({'deaths': 'sum', 
                                    'cases': 'sum'}).sort(['state','date'], ascending =True)
df_s_agg.show()

+-------+----------+----------+-----------+
|  state|      date|sum(cases)|sum(deaths)|
+-------+----------+----------+-----------+
|Alabama|2020-03-13|         6|          0|
|Alabama|2020-03-14|        12|          0|
|Alabama|2020-03-15|        23|          0|
|Alabama|2020-03-16|        29|          0|
|Alabama|2020-03-17|        39|          0|
|Alabama|2020-03-18|        51|          0|
|Alabama|2020-03-19|        78|          0|
|Alabama|2020-03-20|       106|          0|
|Alabama|2020-03-21|       131|          0|
|Alabama|2020-03-22|       157|          0|
|Alabama|2020-03-23|       196|          0|
|Alabama|2020-03-24|       242|          0|
|Alabama|2020-03-25|       386|          1|
|Alabama|2020-03-26|       538|          3|
|Alabama|2020-03-27|       639|          4|
|Alabama|2020-03-28|       720|          4|
|Alabama|2020-03-29|       830|          5|
|Alabama|2020-03-30|       947|         11|
|Alabama|2020-03-31|       999|         14|
|Alabama|2020-04-01|      1108| 

## 13. Rename Columns 

After the aggregation functions, the names of some columns are not reasonable. We need to rename these column names to avoid confusion. The following shows how can we rename columns in pandas and pyspark dataframe. 

In [69]:
# pandas 
df_agg.rename(columns={"deaths": "total_death", "cases": "total_cases"}, inplace = True)
df_agg.head()

Unnamed: 0,state,date,total_cases,total_death
0,Alabama,2020-03-13,6,0
1,Alabama,2020-03-14,12,0
2,Alabama,2020-03-15,23,0
3,Alabama,2020-03-16,29,0
4,Alabama,2020-03-17,39,0


In [46]:
# pyspark
df_s_agg = df_s_agg.withColumnRenamed("sum(cases)","total_cases").withColumnRenamed("sum(deaths)", "total_death")
df_s_agg.show(5)   

+-------+----------+-----------+-----------+
|  state|      date|total_cases|total_death|
+-------+----------+-----------+-----------+
|Alabama|2020-03-13|          6|          0|
|Alabama|2020-03-14|         12|          0|
|Alabama|2020-03-15|         23|          0|
|Alabama|2020-03-16|         29|          0|
|Alabama|2020-03-17|         39|          0|
+-------+----------+-----------+-----------+
only showing top 5 rows



## 14. Create a New Column 

In [36]:
# pandas
# create a new column by dividing two columns 
df_agg['fata_rate'] = df_agg['total_death']/df_agg['total_cases'] # show the fatality rate 
df_agg.sort_values(by='fata_rate',ascending = False).head(10)

Unnamed: 0,state,date,total_death,total_cases,fata_rate
1641,Washington,2020-03-03,10,32,0.3125
322,Florida,2020-03-06,2,7,0.285714
1640,Washington,2020-03-02,6,23,0.26087
1642,Washington,2020-03-04,11,47,0.234043
1376,South Dakota,2020-03-10,1,5,0.2
587,Kansas,2020-03-12,1,5,0.2
1639,Washington,2020-03-01,3,17,0.176471
588,Kansas,2020-03-13,1,6,0.166667
1168,Northern Mariana Islands,2020-04-01,1,6,0.166667
323,Florida,2020-03-07,2,12,0.166667


In [47]:
#pyspark
# use withColumn to create a new column in pyspark 
df_s_agg = df_s_agg.withColumn('fata_rate', 
                    F.col('total_death')/F.col('total_cases')).sort('fata_rate',ascending =False)
df_s_agg.show(10)

+--------------------+----------+-----------+-----------+-------------------+
|               state|      date|total_cases|total_death|          fata_rate|
+--------------------+----------+-----------+-----------+-------------------+
|          Washington|2020-03-03|         32|         10|             0.3125|
|             Florida|2020-03-06|          7|          2| 0.2857142857142857|
|          Washington|2020-03-02|         23|          6| 0.2608695652173913|
|          Washington|2020-03-04|         47|         11|0.23404255319148937|
|        South Dakota|2020-03-10|          5|          1|                0.2|
|              Kansas|2020-03-12|          5|          1|                0.2|
|          Washington|2020-03-01|         17|          3|0.17647058823529413|
|             Florida|2020-03-07|         12|          2|0.16666666666666666|
|              Kansas|2020-03-13|          6|          1|0.16666666666666666|
|Northern Mariana ...|2020-04-01|          6|          1|0.16666

## 15. Join Tables

In pandas, we can either use merge() or join to join two dataframe. My example shows how to use merge to join two tables. For pyspark, we use join() to join two dataframe. The default join for both data frame is inner join. We can change it to left join, right join or outer join by changing the parameter in how{‘left’, ‘right’, ‘outer’, ‘inner’}. For more details, you can check in https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.merge.html &
http://www.learnbymarketing.com/1100/pyspark-joins-by-example/


In [49]:
# pandas 
# create a new dataframe by calculating the total case for each fip
table_a = df.groupby('fips').cases.sum().reset_index() 
# rename the columns for the new dataframe 
table_a.columns = ['fips', 'total_cases']
# create another dataframe by only extracting some columns from the original data frame 
table_b = df[['county','state','fips']]
# Left Join two tables 
table_b.merge(table_a , on ='fips', how ='left').head()

Unnamed: 0,county,state,fips,total_cases
0,Snohomish,Washington,53061.0,15246
1,Snohomish,Washington,53061.0,15246
2,Snohomish,Washington,53061.0,15246
3,Cook,Illinois,17031.0,41916
4,Snohomish,Washington,53061.0,15246


In [48]:
# pyspark 
TableA =  df_s.groupBy('fips').agg(F.sum('cases').alias('total_cases'))
TableB = df_s.select(['county','state','fips'])
ta = TableA.alias('ta') 
#The alias provides a short name for referencing fields and for referencing the fields after creation of the joined table.
tb = TableB.alias('tb')
tb.join(ta, ta.fips==tb.fips, how ='left' ).show(5) # Could also use 'left_outer'
# right, right_outer, full, default ‘inner’

+---------+----------+-------+-------+-----------+
|   county|     state|   fips|   fips|total_cases|
+---------+----------+-------+-------+-----------+
|Snohomish|Washington|53061.0|53061.0|      15246|
|Snohomish|Washington|53061.0|53061.0|      15246|
|Snohomish|Washington|53061.0|53061.0|      15246|
|     Cook|  Illinois|17031.0|17031.0|      41916|
|Snohomish|Washington|53061.0|53061.0|      15246|
+---------+----------+-------+-------+-----------+
only showing top 5 rows



## 16. User Defined Functions 

User Defined Functions (aka UDF) is a feature of Spark SQL to define new Column-based functions that extend the vocabulary of Spark SQL's DSL for transforming Dataset

In [52]:
# pyspark 
add_number = udf(lambda x: x+3)
df_s = df_s.withColumn("new_num", add_number(df_s.cases))
df_s.show(5)

+----------+---------+----------+-------+-----+------+-------+
|      date|   county|     state|   fips|cases|deaths|new_num|
+----------+---------+----------+-------+-----+------+-------+
|2020-01-21|Snohomish|Washington|53061.0|    1|     0|      4|
|2020-01-22|Snohomish|Washington|53061.0|    1|     0|      4|
|2020-01-23|Snohomish|Washington|53061.0|    1|     0|      4|
|2020-01-24|     Cook|  Illinois|17031.0|    1|     0|      4|
|2020-01-24|Snohomish|Washington|53061.0|    1|     0|      4|
+----------+---------+----------+-------+-----+------+-------+
only showing top 5 rows



In [70]:
# pandas 
df['new_num']= df.cases.apply(lambda x: x+3)
df.head()

Unnamed: 0,date,county,state,fips,cases,deaths,new_num
0,2020-01-21,Snohomish,Washington,53061.0,1,0,4
1,2020-01-22,Snohomish,Washington,53061.0,1,0,4
2,2020-01-23,Snohomish,Washington,53061.0,1,0,4
3,2020-01-24,Cook,Illinois,17031.0,1,0,4
4,2020-01-24,Snohomish,Washington,53061.0,1,0,4


## 17. Window Function to calculate 

Window function can be one of any of the standard aggregate functions (sum, count, max, min, avg) as well as a number of functions that can only be used as analytic functions. Window function can help us to write a query with less complexity. It's mainly composed of four main parts:
1. The over() caluse: This tells the databaseto expect a window function, rather than a standard aggregate function.
2. The partitionBy() clause: This clause tells the database how to break up the data. In other words, it is similar to a Group By in that it tells the database that row with the same values should be treated as a single group or parition.
3. The orderBy() clause: It tells the database how to sort the data within each partition.
4. The rangeBetween() clause: It defines the regions over which the function is calculated. 
      - unboundedPreceding: from the start of the partition 
      - unboundedFollowing: to the end of the partition 
      - 0: current row 

The following exmaple shows how can we caluclate the cumulative cases for each state. Since we want to get the value for each state, so the data should be partitioned by the state. We want to get the cumulative number, so we frist need to sort the data within each state by date in and ascending order. Then,we need to define the window frame from the start of partition to the current row. Finally, we could apply the aggregation function 'sum' over this window function. 

In [70]:
# pyspark 
w = Window \
    .partitionBy('state') \
    .orderBy(asc('date')) \
    .rangeBetween(Window.unboundedPreceding, 0)

df2 = df_s_agg.withColumn('cumulative_cases', 
                      F.sum(df_s_agg.total_cases).over(w))\
                        .withColumn('cumulative_deaths',F.sum(df_s_agg.total_death).over(w))\
                        .select(['state','date','total_cases','cumulative_cases', 'cumulative_deaths'])\
                        .orderBy('state')
df2.show()

+-------+----------+-----------+----------------+-----------------+
|  state|      date|total_cases|cumulative_cases|cumulative_deaths|
+-------+----------+-----------+----------------+-----------------+
|Alabama|2020-03-13|          6|               6|                0|
|Alabama|2020-03-14|         12|              18|                0|
|Alabama|2020-03-15|         23|              41|                0|
|Alabama|2020-03-16|         29|              70|                0|
|Alabama|2020-03-17|         39|             109|                0|
|Alabama|2020-03-18|         51|             160|                0|
|Alabama|2020-03-19|         78|             238|                0|
|Alabama|2020-03-20|        106|             344|                0|
|Alabama|2020-03-21|        131|             475|                0|
|Alabama|2020-03-22|        157|             632|                0|
|Alabama|2020-03-23|        196|             828|                0|
|Alabama|2020-03-24|        242|            1070

In [68]:
## pandas 
df_agg.groupby(['state', 'date']).sum().groupby(level=0).cumsum().reset_index()

Unnamed: 0,state,date,total_death,total_cases,fata_rate
0,Alabama,2020-03-13,0,6,0.000000
1,Alabama,2020-03-14,0,18,0.000000
2,Alabama,2020-03-15,0,41,0.000000
3,Alabama,2020-03-16,0,70,0.000000
4,Alabama,2020-03-17,0,109,0.000000
5,Alabama,2020-03-18,0,160,0.000000
6,Alabama,2020-03-19,0,238,0.000000
7,Alabama,2020-03-20,0,344,0.000000
8,Alabama,2020-03-21,0,475,0.000000
9,Alabama,2020-03-22,0,632,0.000000


In [66]:
df_agg.total_cases.rolling(window =3).mean()

0               NaN
1               NaN
2         13.666667
3         21.333333
4         30.333333
5         39.666667
6         56.000000
7         78.333333
8        105.000000
9        131.333333
10       161.333333
11       198.333333
12       274.666667
13       388.666667
14       521.000000
15       632.333333
16       729.666667
17       832.333333
18       925.333333
19      1018.000000
20      1125.666667
21      1304.333333
22       935.333333
23       512.333333
24         1.000000
25         1.000000
26         1.666667
27         3.333333
28         6.000000
29         9.000000
           ...     
1744    1030.666667
1745    1143.666667
1746    1246.666667
1747    1390.666667
1748    1544.333333
1749    1732.666667
1750    1215.666667
1751     639.333333
1752       1.333333
1753       2.000000
1754       2.666667
1755       5.333333
1756       9.333333
1757      14.000000
1758      16.666667
1759      19.000000
1760      21.333333
1761      24.000000
1762      26.000000


## 18.	Operate SQL Queries within DataFrame 

Spark comes with a sql library that lets us query Dataframe using the SQL syntax. At first, we have to create a temporary view of the DataFrame using createOrReplaceTempView(), which is a temporary SQL table. This will be enable us to use the same SQL syntax as we were using a database like Mysql. 

In [54]:
# pyspark 
df_s.createOrReplaceTempView("log_table")
spark.sql("select count(1) from log_table where state ='California'").show()

+--------+
|count(1)|
+--------+
|    1302|
+--------+



Pandas cannot let us directly write sql queries within DataFrame, but we still can use query() to write some sql like syntax to manipulate the data. 

In [53]:
#pandas 
# query is to query the columns of a frame with a boolean expression.
df.query("state=='California'").count()

date       1302
county     1302
state      1302
fips       1302
cases      1302
deaths     1302
new_num    1302
dtype: int64

## 19. Convert One Type of DataFrame to Another 

### 19.1 Convert pandas to pyspark 

In [55]:
# pandas to pyspark 
df_s_agg_2 = spark.createDataFrame(df_agg)
df_s_agg_2.show(5)

+-------+-------------------+-----------+-----------+
|  state|               date|total_cases|total_death|
+-------+-------------------+-----------+-----------+
|Alabama|2020-03-13 00:00:00|          6|          0|
|Alabama|2020-03-14 00:00:00|         12|          0|
|Alabama|2020-03-15 00:00:00|         23|          0|
|Alabama|2020-03-16 00:00:00|         29|          0|
|Alabama|2020-03-17 00:00:00|         39|          0|
+-------+-------------------+-----------+-----------+
only showing top 5 rows



It is also possible to use Pandas dataframes when using Spark, by calling toPandas() on a Spark dataframe, which returns a pandas object. However, this function should generally be avoided except when working with small dataframes, because it pulls the entire object into memory on a single node.

### 19.2 Convert pyspark to pandas

In [56]:
df_s_agg.toPandas().head()

Unnamed: 0,state,date,total_cases,total_death,fata_rate
0,Washington,2020-03-03,32,10,0.3125
1,Florida,2020-03-06,7,2,0.285714
2,Washington,2020-03-02,23,6,0.26087
3,Washington,2020-03-04,47,11,0.234043
4,South Dakota,2020-03-10,5,1,0.2


## 20.	Export the Data 

In [None]:
# Save the data as csv file 
# pandas 
df_agg.to_csv('pd.csv')
# pyspark 
df_s_agg.write.format('com.databricks.spark.csv').save("pyspark.csv",header = 'true')
# alternatively we can save pyspark to csv in this way if Spark 2.0+ 
df_s_agg.write.csv('pyspark2.csv',header ='true')

You must stop() the active SparkContext before creating a new one. When we are done with our data manipulation, we need to run sc.stop() to end the active SparkContext. 

In [2]:
sc.stop()