# DSE6000 Lab-2

The purpose of this project is to use Spark DataFrames, SparkSQL, and an Exploratory Data Analysis (EDA) framework to:

* Perform an initial analysis to assess the quality of the data
* Remove dirty data as and when it is discovered
* Perform Exploratory Data Analysis (EDA) to determine the meaning of the data
* Perform statistical analysis on COVID-19 Cases, Status, & Deaths

Finally, compare pandas and PySpark DataFrames in terms of data processing, speed, flexibility, adaptability, advantages, and disadvantages.

# Install Java, Spark, and Findspark

In [None]:
%%bash
apt-get install openjdk-8-jdk-headless -qq > /dev/null
[ ! -e "$(basename spark-3.0.1-bin-hadoop2.7.tgz)" ] && wget  http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz  
tar xf spark-3.0.1-bin-hadoop2.7.tgz
pip install -q findspark

# Set Environment Variables

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

# Start SparkSession

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# get a spark session. 
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Create the Dataframe

In [None]:
# Download the CSV only if it is not already present in the working directory
! [ ! -e "Covid-19_Tests_by_County_2020-09-17_702630_7.csv" ] && wget  https://storage.googleapis.com/files.mobibootcamp.com/2020-datafiles/Covid-19_Tests_by_County_2020-09-17_702630_7.csv
df = spark.read.csv('Covid-19_Tests_by_County_2020-09-17_702630_7.csv',
                    header=True,
                    inferSchema=True)


print(df.columns)

--2020-11-26 02:09:33--  https://storage.googleapis.com/files.mobibootcamp.com/2020-datafiles/Covid-19_Tests_by_County_2020-09-17_702630_7.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 74.125.26.128, 172.217.193.128, 172.217.204.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|74.125.26.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12780 (12K) [application/octet-stream]
Saving to: ‘Covid-19_Tests_by_County_2020-09-17_702630_7.csv’


2020-11-26 02:09:33 (46.4 MB/s) - ‘Covid-19_Tests_by_County_2020-09-17_702630_7.csv’ saved [12780/12780]

['COUNTY', 'TestType', 'Count', 'RatePerMillion', 'Updated']


## Data Exploration

Perform an initial analysis to assess the quality of the data.

Count number of rows and columns

In [None]:
print((df.count(), len(df.columns)))

(258, 5)


Print the Data Types

In [None]:
df.dtypes

[('COUNTY', 'string'),
 ('TestType', 'string'),
 ('Count', 'int'),
 ('RatePerMillion', 'int'),
 ('Updated', 'string')]

Print the $Schema$

In [None]:
df.printSchema()

root
 |-- COUNTY: string (nullable = true)
 |-- TestType: string (nullable = true)
 |-- Count: integer (nullable = true)
 |-- RatePerMillion: integer (nullable = true)
 |-- Updated: string (nullable = true)



Check columns for $NaN$ or $NULL$ values

In [None]:
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+------+--------+-----+--------------+-------+
|COUNTY|TestType|Count|RatePerMillion|Updated|
+------+--------+-----+--------------+-------+
|     0|       0|    0|             6|      0|
+------+--------+-----+--------------+-------+
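The check above counts a value as missing when it is either NaN or NULL, which are two different things. The following is a minimal plain-Python sketch of the same per-column count, not the Spark implementation. The `Alcona` and `Unknown` values are taken from this dataset; the `Example` row and the helper `is_missing` are hypothetical and exist only to show a NaN.

```python
import math

# Toy rows using a subset of the dataset's columns.
# The "Example" row is hypothetical and only illustrates a NaN value.
rows = [
    {"COUNTY": "Alcona", "Count": 1974, "RatePerMillion": 189716},
    {"COUNTY": "Unknown", "Count": 19839, "RatePerMillion": None},
    {"COUNTY": "Example", "Count": 10, "RatePerMillion": float("nan")},
]

def is_missing(value):
    """Return True for None (the analogue of SQL NULL) or a float NaN."""
    if value is None:
        return True
    return isinstance(value, float) and math.isnan(value)

# One missing-value count per column, like the one-row summary above.
missing_counts = {
    column: sum(1 for row in rows if is_missing(row[column]))
    for column in rows[0]
}
print(missing_counts)
```

In this dataset the six missing `RatePerMillion` entries are displayed as `null`, so they are NULLs rather than NaNs.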



Print the rows with $NULL$ values

In [None]:
# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
df.createOrReplaceTempView("mytable")
df_Null = spark.sql("""SELECT * FROM mytable where RatePerMillion IS NULL""")
print("Rows with null values")
df_Null.show()

Rows with null values
+------------+----------+------+--------------+-------------------+
|      COUNTY|  TestType| Count|RatePerMillion|            Updated|
+------------+----------+------+--------------+-------------------+
|Correctional|  Serology| 10990|          null|2020/09/17 13:31:36|
|Correctional|Diagnostic| 83705|          null|2020/09/17 13:31:36|
|Correctional|     Total| 94695|          null|2020/09/17 13:31:36|
|     Unknown|  Serology| 19839|          null|2020/09/17 13:31:36|
|     Unknown|Diagnostic|223551|          null|2020/09/17 13:31:36|
|     Unknown|     Total|243390|          null|2020/09/17 13:31:36|
+------------+----------+------+--------------+-------------------+



# Data Cleaning

Drop the $Updated$ column: it holds a single static timestamp, so it adds no value to the analysis.

In [None]:
df.select('Updated').distinct().rdd.map(lambda r: r[0]).collect()

['2020/09/17 13:31:36']

In [None]:
column_to_drop = ['Updated']
df = df.drop(*column_to_drop)
df.show(3)

+------+----------+-----+--------------+
|COUNTY|  TestType|Count|RatePerMillion|
+------+----------+-----+--------------+
|Alcona|Diagnostic| 1974|        189716|
|Alcona|  Serology|   51|          4901|
|Alcona|     Total| 2025|        194618|
+------+----------+-----+--------------+
only showing top 3 rows



## Handling $missing$ values

The reasons for the missing $RatePerMillion$ values are not specified, and Michigan has no $County$ named Correctional or Unknown. It is therefore safe to drop these six rows with NULL values.

In [None]:
df = df.filter(df.RatePerMillion.isNotNull())

Number of rows after dropping the NULL values

In [None]:
print("Number of rows & columns after dropping NaN values from the RatePerMillion column:")
print((df.count(), len(df.columns)))

Number of rows & columns after dropping NaN values from the RatePerMillion column:
(252, 4)


# Data Transformation


Rename the columns to upper $camelcase$ (PascalCase) for consistency

In [None]:
df = df.toDF('County', 'TestType', 'Count', 'RatePerMillion')
df.show(3)

+------+----------+-----+--------------+
|County|  TestType|Count|RatePerMillion|
+------+----------+-----+--------------+
|Alcona|Diagnostic| 1974|        189716|
|Alcona|  Serology|   51|          4901|
|Alcona|     Total| 2025|        194618|
+------+----------+-----+--------------+
only showing top 3 rows



Expand the abbreviation St to Saint in the county names of the County column.

In [None]:
from pyspark.sql.functions import *
# Anchor the pattern so only a leading, standalone "St" is expanded
df = df.withColumn('County', regexp_replace('County', r'^St\b', 'Saint'))


In [None]:
from pyspark.sql.functions import col
df.where(col("County").isin({"Saint Joseph", "Saint Clair"})).show()

+------------+----------+-----+--------------+
|      County|  TestType|Count|RatePerMillion|
+------------+----------+-----+--------------+
| Saint Clair|  Serology| 4024|         25288|
| Saint Clair|Diagnostic|31351|        197017|
| Saint Clair|     Total|35375|        222305|
|Saint Joseph|  Serology|  594|          9743|
|Saint Joseph|Diagnostic|16586|        272062|
|Saint Joseph|     Total|17180|        281806|
+------------+----------+-----+--------------+
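The pattern passed to `regexp_replace` is a regular expression, so how it is anchored decides which names change. The sketch below uses Python's `re` module rather than Spark to compare an unanchored `St` with the anchored `^St\b`. `Stanton` and `West Branch` are hypothetical names added only to show the difference; they are not claimed to be in the dataset.

```python
import re

# "St Clair" and "St Joseph" match the dataset; the other two are hypothetical.
names = ["St Clair", "St Joseph", "Stanton", "West Branch"]

# Unanchored: every occurrence of "St" is replaced, even inside a longer word.
unanchored = [re.sub(r"St", "Saint", name) for name in names]

# Anchored: only a leading "St" that ends at a word boundary is replaced.
anchored = [re.sub(r"^St\b", "Saint", name) for name in names]

print(unanchored)
print(anchored)
```

With the county names shown above both patterns give the same result; the anchored form only matters if a name that merely starts with the letters "St" ever appears.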



Add a new column $State$ with the static value $MI$, to be used together with each county's FIPS code when visualizing the data on a map

In [None]:
from pyspark.sql.functions import lit
df = df.withColumn("State", lit('MI'))
df.show(3)

+------+----------+-----+--------------+-----+
|County|  TestType|Count|RatePerMillion|State|
+------+----------+-----+--------------+-----+
|Alcona|Diagnostic| 1974|        189716|   MI|
|Alcona|  Serology|   51|          4901|   MI|
|Alcona|     Total| 2025|        194618|   MI|
+------+----------+-----+--------------+-----+
only showing top 3 rows



Create a DataFrame with the 10 largest $Diagnostic$ counts, sorted in descending order ($Usage:$ To visualize the top 10 counties by Diagnostic count)

In [None]:
# Sort the Diagnostic rows by Count and keep the first 10,
# instead of relying on a hand-picked Count threshold
top_ten_Diagnostic = (df.where(col("TestType") == "Diagnostic")
                        .orderBy(col("Count").desc())
                        .limit(10))
top_ten_Diagnostic.show()

+------------+----------+------+--------------+-----+
|      County|  TestType| Count|RatePerMillion|State|
+------------+----------+------+--------------+-----+
|       Wayne|Diagnostic|375403|        340097|   MI|
|     Oakland|Diagnostic|351635|        279612|   MI|
|Detroit City|Diagnostic|259362|        399139|   MI|
|        Kent|Diagnostic|231809|        352854|   MI|
|      Macomb|Diagnostic|222400|        254470|   MI|
|   Washtenaw|Diagnostic|128943|        350769|   MI|
|      Ingham|Diagnostic|100272|        342920|   MI|
|      Ottawa|Diagnostic| 94172|        322695|   MI|
|     Genesee|Diagnostic| 89810|        221309|   MI|
|   Kalamazoo|Diagnostic| 74258|        280149|   MI|
+------------+----------+------+--------------+-----+
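A top-N subset can be selected by sorting and truncating rather than by a hand-picked cutoff value, which has to be re-tuned whenever the data changes. The following plain-Python sketch shows the idea on five of the rows from the table above, deliberately listed out of order; it is an illustration, not the Spark code.

```python
# (county, diagnostic test count) pairs copied from the output table above,
# deliberately shuffled.
rows = [
    ("Macomb", 222400),
    ("Wayne", 375403),
    ("Kent", 231809),
    ("Oakland", 351635),
    ("Detroit City", 259362),
]

n = 3
# Sort by count, largest first, then keep the first n rows.
top_n = sorted(rows, key=lambda row: row[1], reverse=True)[:n]

for county, count in top_n:
    print(county, count)
```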



## Create & Transform $Supplementary$ DataFrame

In [None]:
# Download the CSV only if it is not already present in the working directory
! [ ! -e "Cases_and_Deaths_by_County_2020-09-17_702626_7.csv" ] && wget  https://storage.googleapis.com/files.mobibootcamp.com/2020-datafiles/Cases_and_Deaths_by_County_2020-09-17_702626_7.csv
df_suppliment_data = spark.read.csv('Cases_and_Deaths_by_County_2020-09-17_702626_7.csv',
                                    header=True,
                                    inferSchema=True)
column_to_drop = ['Updated']
df_suppliment_data = df_suppliment_data.drop(*column_to_drop)
df_suppliment_data = df_suppliment_data.toDF('County', 'CaseStatus', 'Cases', 'Deaths')
df_suppliment_data.printSchema()

--2020-11-26 02:09:53--  https://storage.googleapis.com/files.mobibootcamp.com/2020-datafiles/Cases_and_Deaths_by_County_2020-09-17_702626_7.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.204.128, 172.217.203.128, 172.253.123.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.204.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7840 (7.7K) [application/octet-stream]
Saving to: ‘Cases_and_Deaths_by_County_2020-09-17_702626_7.csv’


2020-11-26 02:09:53 (81.2 MB/s) - ‘Cases_and_Deaths_by_County_2020-09-17_702626_7.csv’ saved [7840/7840]

root
 |-- County: string (nullable = true)
 |-- CaseStatus: string (nullable = true)
 |-- Cases: integer (nullable = true)
 |-- Deaths: integer (nullable = true)



Join the main DataFrame with the supplementary DataFrame on the $County$ column, using $where$ and $order$ $by$ clauses to filter and sort the result.

(Usage: To visualize the correlation between numerical columns using the Heat Map & Linear Regression Model)

In [None]:
# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
df.createOrReplaceTempView("DiagnosticTable")
df_suppliment_data.createOrReplaceTempView("CaseStatusTable")
df_Join =   spark.sql("""SELECT d.County, 
    d.TestType, 
    d.Count, 
    d.RatePerMillion, 
    c.CaseStatus,
    c.Cases,
    c.Deaths 
    from DiagnosticTable d LEFT JOIN CaseStatusTable c 
    ON d.County = c.County
    where d.TestType = "Diagnostic" and c.CaseStatus = "Confirmed" and c.Deaths != 0
    order by Count desc
""")
df_Join.show(5)

+------------+----------+------+--------------+----------+-----+------+
|      County|  TestType| Count|RatePerMillion|CaseStatus|Cases|Deaths|
+------------+----------+------+--------------+----------+-----+------+
|       Wayne|Diagnostic|375403|        340097| Confirmed|17568|  1265|
|     Oakland|Diagnostic|351635|        279612| Confirmed|15863|  1141|
|Detroit City|Diagnostic|259362|        399139| Confirmed|14131|  1517|
|        Kent|Diagnostic|231809|        352854| Confirmed| 8472|   170|
|      Macomb|Diagnostic|222400|        254470| Confirmed|13368|   959|
+------------+----------+------+--------------+----------+-----+------+
only showing top 5 rows
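One property of the query above is worth noting: filtering on columns of the right-hand table in the `where` clause removes every row for which the `LEFT JOIN` found no match, so the result is the same as an inner join. The sketch below shows this with Python's built-in `sqlite3` module instead of Spark. The table names `tests` and `cases`, the column `TestCount`, and the `NoMatch` row are hypothetical; the Wayne figures come from the output above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tests (County TEXT, TestCount INTEGER);
    CREATE TABLE cases (County TEXT, CaseStatus TEXT, Deaths INTEGER);
    -- 'NoMatch' is a hypothetical county with no row in the cases table
    INSERT INTO tests VALUES ('Wayne', 375403), ('NoMatch', 1);
    INSERT INTO cases VALUES ('Wayne', 'Confirmed', 1265);
""")

# Plain LEFT JOIN: the unmatched row is kept, with NULL (None) for Deaths.
left_only = conn.execute("""
    SELECT t.County, c.Deaths
    FROM tests t LEFT JOIN cases c ON t.County = c.County
    ORDER BY t.TestCount DESC
""").fetchall()

# Same join plus a WHERE filter on the right-hand table:
# the unmatched row has CaseStatus = NULL, so the filter drops it.
left_filtered = conn.execute("""
    SELECT t.County, c.Deaths
    FROM tests t LEFT JOIN cases c ON t.County = c.County
    WHERE c.CaseStatus = 'Confirmed'
    ORDER BY t.TestCount DESC
""").fetchall()

print(left_only)
print(left_filtered)
conn.close()
```

If unmatched counties should be kept, the conditions on the right-hand table would need to move into the `ON` clause.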



# Statistics on COVID-19 Cases, Status, & Deaths Rate

Top 10 counties by confirmed deaths, with their confirmed case counts (the query excludes Detroit City)

In [None]:
# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
df_Join.createOrReplaceTempView("df_Join")
top_counties = spark.sql("""
    SELECT County, Cases, Deaths
    FROM df_Join
    WHERE County != 'Detroit City'
    ORDER BY Deaths DESC
    LIMIT 10
""")
top_counties.show()

+---------+-----+------+
|   County|Cases|Deaths|
+---------+-----+------+
|    Wayne|17568|  1265|
|  Oakland|15863|  1141|
|   Macomb|13368|   959|
|  Genesee| 3441|   281|
|     Kent| 8472|   170|
|  Saginaw| 2653|   134|
|Washtenaw| 2916|   115|
|Kalamazoo| 2108|    89|
|  Berrien| 1583|    72|
| Muskegon| 1374|    69|
+---------+-----+------+



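Use the $mean$ function to calculate the mean number of deaths over the rows with at least one death. (The column alias says "rate", but the value is an average count.)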

In [None]:
from pyspark.sql.functions import mean
df_suppliment_data_filter = df_suppliment_data.where(col("Deaths") > 0)
df_suppliment_data_filter.select(mean("Deaths").alias("Mean Death Rate")).show()

+-----------------+
|  Mean Death Rate|
+-----------------+
|73.21052631578948|
+-----------------+



Use the $sum()$ and $partitionBy()$ functions to calculate each row's share of total deaths. (DeathsPercent is a fraction between 0 and 1, not a value out of 100.)

In [None]:
import pyspark.sql.functions as f
from pyspark.sql.window import Window
df_percent = df_Join.withColumn('DeathsPercent', f.col('Deaths')/f.sum('Deaths').over(Window.partitionBy()))
col_to_drop = ['TestType', 'RatePerMillion', 'Count', 'CaseStatus']
df_percent = df_percent.drop(*col_to_drop)
df_percent.orderBy('Deaths','DeathsPercent', ascending=False).show(10)

+------------+-----+------+--------------------+
|      County|Cases|Deaths|       DeathsPercent|
+------------+-----+------+--------------------+
|Detroit City|14131|  1517| 0.23367221195317314|
|       Wayne|17568|  1265| 0.19485520640788662|
|     Oakland|15863|  1141| 0.17575477510782503|
|      Macomb|13368|   959| 0.14772027110289587|
|     Genesee| 3441|   281| 0.04328404189772027|
|        Kent| 8472|   170| 0.02618607516943931|
|     Saginaw| 2653|   134|0.020640788662969808|
|   Washtenaw| 2916|   115| 0.01771410967344424|
|   Kalamazoo| 2108|    89|0.013709180529882932|
|     Berrien| 1583|    72|0.011090573012939002|
+------------+-----+------+--------------------+
only showing top 10 rows
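The window expression above divides each row's `Deaths` by the sum of `Deaths` over a single window spanning all rows. A minimal plain-Python sketch of that calculation follows; the group names and numbers are hypothetical and chosen so the arithmetic is easy to check.

```python
# Hypothetical death counts per group.
deaths = {"A": 30, "B": 50, "C": 20}

# One total over all rows, like sum(...).over(Window.partitionBy()).
total = sum(deaths.values())

# Each row's share of the total, as a fraction between 0 and 1.
share = {name: value / total for name, value in deaths.items()}

# Multiply by 100 only if a true percentage is wanted.
percent = {name: round(fraction * 100, 2) for name, fraction in share.items()}

print(share)
print(percent)
```

The shares always sum to 1, which is a quick sanity check for this kind of column.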



Find the $Max$ and $Min$ case counts among the rows with at least one death


In [None]:
from pyspark.sql.functions import max,min
df_suppliment_data_filter.select(max("Cases").alias("MaxCasesCount"), min("Cases").alias("MinCasesCount")).show()

+-------------+-------------+
|MaxCasesCount|MinCasesCount|
+-------------+-------------+
|        17568|            4|
+-------------+-------------+



Group by aggregated sum for $TestType$ categorical column

In [None]:
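# Use the Spark sum via the f alias so the call does not depend on a wildcard import shadowing Python's built-in sum
df.groupBy("TestType").agg(f.sum("Count").alias("TestTypeTotal")).show()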

+----------+-------------+
|  TestType|TestTypeTotal|
+----------+-------------+
|     Total|      3228116|
|  Serology|       249151|
|Diagnostic|      2978965|
+----------+-------------+
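The `Total` rows appear to be the sum of the `Diagnostic` and `Serology` rows, which would mean that summing `Count` over all three test types double-counts every test. A small sketch that checks this with the figures from the output above:

```python
# Aggregated counts copied from the groupBy output above.
test_type_totals = {
    "Total": 3228116,
    "Serology": 249151,
    "Diagnostic": 2978965,
}

# Sum of the two specific test types.
parts = test_type_totals["Diagnostic"] + test_type_totals["Serology"]

# True when the Total category equals the sum of the other two.
total_is_sum_of_parts = parts == test_type_totals["Total"]
print(parts, total_is_sum_of_parts)
```

Any statewide aggregate should therefore use either the `Total` rows or the two specific test types, not all three.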



# Advantages and Disadvantages of PySpark and Pandas



A pandas DataFrame is an in-memory, single-machine data structure that is fast and easy to use, but its capacity is limited to the memory and cores of that one machine. PySpark is the Python API for Apache Spark; it distributes DataFrames across a Spark cluster and is designed for big data processing.

## Advantages/Disadvantages of Pandas & PySpark
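* For data that fits in memory on a single machine, pandas is usually faster because it executes each operation eagerly with no cluster overhead; PySpark DataFrames are evaluated lazily, so transformations run only when an action such as `show()` or `count()` is called.
* PySpark DataFrames are processed in parallel across the nodes of a cluster, whereas pandas DataFrames are in-memory and single-machine.
* PySpark DataFrames are immutable (every transformation returns a new DataFrame), whereas pandas DataFrames can be modified in place.
* The pandas API supports more operations than the PySpark DataFrame API.
* Pandas is more flexible than PySpark DataFrames for complex tasks.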
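Lazy evaluation, mentioned in the list above, can be illustrated with Python generators. This is only an analogy for how Spark defers transformations until an action is called, not a description of Spark's internals; the functions `load` and `double` and the `log` list are hypothetical.

```python
log = []  # records each unit of work as it actually happens

def load(values):
    """A lazy 'source': yields values one at a time."""
    for value in values:
        log.append(f"load {value}")
        yield value

def double(values):
    """A lazy 'transformation': doubles each incoming value."""
    for value in values:
        log.append(f"double {value}")
        yield value * 2

# Building the pipeline runs nothing yet (like chaining transformations).
pipeline = double(load([1, 2, 3]))
steps_before = len(log)

# Consuming the pipeline triggers the work (like calling an action).
result = list(pipeline)
steps_after = len(log)

print(steps_before, steps_after, result)
```

An eager library such as pandas would instead do the work at each step as soon as it is written.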

In summary, pandas is a powerful and flexible data analysis and manipulation library for Python, while PySpark is a Python API that combines Python with Apache Spark to process big data. As of this writing, the pandas API is the richer of the two.