First, I am going to import the libraries and install the software needed for this lab.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
%%bash
apt-get install openjdk-8-jdk-headless -qq > /dev/null
[ ! -e "$(basename spark-3.0.1-bin-hadoop2.7.tgz)" ] && wget  http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz  
tar xf spark-3.0.1-bin-hadoop2.7.tgz
pip install -q findspark

--2020-12-07 02:40:01--  http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
Resolving apache.osuosl.org (apache.osuosl.org)... 64.50.236.52, 140.211.166.134, 64.50.233.100, ...
Connecting to apache.osuosl.org (apache.osuosl.org)|64.50.236.52|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 219929956 (210M) [application/x-gzip]
Saving to: ‘spark-3.0.1-bin-hadoop2.7.tgz’


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

Next, I am going to import the dataset that I will be working on for this lab, 'Covid-19_Tests_by_County_2020-09-17_702630_7.' This data set can be found at https://storage.googleapis.com/files.mobibootcamp.com/2020-datafiles/Covid-19_Tests_by_County_2020-09-17_702630_7.csv

In [None]:
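# Download the CSV only if it is not already present (the check needs the full file name, including .csv)
! [ ! -e "$(basename Covid-19_Tests_by_County_2020-09-17_702630_7.csv)" ] && wget  https://storage.googleapis.com/files.mobibootcamp.com/2020-datafiles/Covid-19_Tests_by_County_2020-09-17_702630_7.csv
# Read the file with a header row and let Spark infer the column types
df = spark.read.csv('Covid-19_Tests_by_County_2020-09-17_702630_7.csv',
                    header=True,
                    inferSchema=True)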


print(df.columns)

--2020-12-07 02:40:39--  https://storage.googleapis.com/files.mobibootcamp.com/2020-datafiles/Covid-19_Tests_by_County_2020-09-17_702630_7.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 74.125.23.128, 74.125.204.128, 64.233.189.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|74.125.23.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12780 (12K) [application/octet-stream]
Saving to: ‘Covid-19_Tests_by_County_2020-09-17_702630_7.csv’


2020-12-07 02:40:39 (95.0 MB/s) - ‘Covid-19_Tests_by_County_2020-09-17_702630_7.csv’ saved [12780/12780]

['COUNTY', 'TestType', 'Count', 'RatePerMillion', 'Updated']


Next, I am going to do a quick check of the columns to make sure that they were read in properly. This also shows me what type of data is stored in each column.

In [None]:
df.dtypes

[('COUNTY', 'string'),
 ('TestType', 'string'),
 ('Count', 'int'),
 ('RatePerMillion', 'int'),
 ('Updated', 'string')]
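
Note that `Updated` came through as a string rather than a timestamp. Here is a minimal sketch of parsing one of its values with the standard library, assuming every value follows the `YYYY/MM/DD HH:MM:SS` pattern seen in this file. The helper name `parse_updated` is made up for illustration and is not part of the lab code.

```python
from datetime import datetime

# Assumed layout of the 'Updated' column, based on the values shown in this notebook.
UPDATED_FORMAT = "%Y/%m/%d %H:%M:%S"

def parse_updated(value: str) -> datetime:
    """Parse one 'Updated' string into a datetime (hypothetical helper)."""
    return datetime.strptime(value, UPDATED_FORMAT)

parsed = parse_updated("2020/09/17 13:31:36")
print(parsed.isoformat())
```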

Next, I want to rename these columns to make them more uniform. To do this, I am separating words with an underscore and putting everything in lower case.

In [None]:
df = df.toDF('county', 'test_type','count','rate_per_million','updated')
df.show()

+-------+----------+-----+----------------+-------------------+
| county| test_type|count|rate_per_million|            updated|
+-------+----------+-----+----------------+-------------------+
| Alcona|Diagnostic| 1974|          189716|2020/09/17 13:31:36|
| Alcona|  Serology|   51|            4901|2020/09/17 13:31:36|
| Alcona|     Total| 2025|          194618|2020/09/17 13:31:36|
|  Alger|  Serology|   66|            7246|2020/09/17 13:31:36|
|  Alger|Diagnostic| 1156|          126921|2020/09/17 13:31:36|
|  Alger|     Total| 1222|          134168|2020/09/17 13:31:36|
|Allegan|Diagnostic|29764|          252064|2020/09/17 13:31:36|
|Allegan|  Serology|  927|            7851|2020/09/17 13:31:36|
|Allegan|     Total|30691|          259915|2020/09/17 13:31:36|
| Alpena|  Serology|  157|            5527|2020/09/17 13:31:36|
| Alpena|Diagnostic| 2701|           95089|2020/09/17 13:31:36|
| Alpena|     Total| 2858|          100616|2020/09/17 13:31:36|
| Antrim|Diagnostic| 5075|          2175
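
I typed the new names by hand, but the same naming rule could probably be applied automatically. This is a minimal sketch using a regular expression; the helper `to_snake_case` is a made-up name, and it only handles the simple lower-to-upper boundaries that appear in these five column names.

```python
import re

def to_snake_case(name: str) -> str:
    """Insert '_' at each lower-to-upper boundary, then lowercase (hypothetical helper)."""
    return re.sub(r"([a-z])([A-Z])", r"\1_\2", name).lower()

# Column names exactly as Spark read them from the CSV header.
original = ["COUNTY", "TestType", "Count", "RatePerMillion", "Updated"]
renamed = [to_snake_case(c) for c in original]
print(renamed)
```

The resulting list could then be unpacked into the rename call, for example `df.toDF(*renamed)`.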

Now, I want to see how many rows of data I have to work with in this data set.

In [None]:
df.count()

258

Next, I want to do a quick check to see if there are any null values in my data set. I start by counting the rows for each test type.

In [None]:
df.groupBy('test_type').count().show()

+----------+-----+
| test_type|count|
+----------+-----+
|     Total|   86|
|  Serology|   86|
|Diagnostic|   86|
+----------+-----+



As you can see above, the rows are split evenly across the test types (86 each, 258 in total), so no row is missing its test type. This grouping only looks at one column, however, so it cannot reveal nulls elsewhere. Since I only want complete records, I am going to drop every row that contains a null value. The count below falls to 252, which tells me that 6 rows had at least one missing value.

In [None]:
df = df.dropna()
df.count()

252
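
To see which columns actually hold the missing values, a per-column null count is more direct than grouping by one column. This is a minimal sketch in pandas on a made-up sample. The rows below are hypothetical, because the incomplete rows from the real file are not shown in this notebook.

```python
import pandas as pd

# Hypothetical sample: the real rows with missing values are not shown in this notebook.
sample = pd.DataFrame({
    "county": ["Alcona", "Alcona", None, "Alger"],
    "test_type": ["Diagnostic", "Serology", "Total", "Total"],
    "count": [1974, 51, None, 1222],
})

nulls_per_column = sample.isna().sum()   # number of missing values in each column
clean = sample.dropna()                  # keep only fully populated rows
rows_dropped = len(sample) - len(clean)

print(nulls_per_column)
print(rows_dropped)
```

In this sample `test_type` has no nulls even though one row is incomplete, which is why a count grouped by test type alone can look perfectly uniform.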

In [None]:
df.show()

+-------+----------+-----+----------------+-------------------+
| county| test_type|count|rate_per_million|            updated|
+-------+----------+-----+----------------+-------------------+
| Alcona|Diagnostic| 1974|          189716|2020/09/17 13:31:36|
| Alcona|  Serology|   51|            4901|2020/09/17 13:31:36|
| Alcona|     Total| 2025|          194618|2020/09/17 13:31:36|
|  Alger|  Serology|   66|            7246|2020/09/17 13:31:36|
|  Alger|Diagnostic| 1156|          126921|2020/09/17 13:31:36|
|  Alger|     Total| 1222|          134168|2020/09/17 13:31:36|
|Allegan|Diagnostic|29764|          252064|2020/09/17 13:31:36|
|Allegan|  Serology|  927|            7851|2020/09/17 13:31:36|
|Allegan|     Total|30691|          259915|2020/09/17 13:31:36|
| Alpena|  Serology|  157|            5527|2020/09/17 13:31:36|
| Alpena|Diagnostic| 2701|           95089|2020/09/17 13:31:36|
| Alpena|     Total| 2858|          100616|2020/09/17 13:31:36|
| Antrim|Diagnostic| 5075|          2175

Next, I want to create three separate data sets (one for each test type) so I can run descriptive analysis on each. One data set will have only the Diagnostic testing data, one will have the Serology testing data, and the last will have the Total testing data.

In [None]:
DiagnosticDF = df.filter(df["test_type"] == "Diagnostic")
SerologyDF = df.filter(df["test_type"] == "Serology")
TotalDF = df.filter(df["test_type"] == "Total")

So next I want to run some statistics on each data set to see what the average, minimum, and maximum values are.

In [None]:
result = DiagnosticDF.select(["count",'rate_per_million']).describe()
result.show()

+-------+------------------+------------------+
|summary|             count|  rate_per_million|
+-------+------------------+------------------+
|  count|                84|                84|
|   mean|35463.869047619046|258052.44047619047|
| stddev|  70497.5165651253| 86840.15102423576|
|    min|               475|             95089|
|    max|            375403|            508856|
+-------+------------------+------------------+



In [None]:
result = SerologyDF.select(["count",'rate_per_million']).describe()
result.show()

+-------+------------------+-----------------+
|summary|             count| rate_per_million|
+-------+------------------+-----------------+
|  count|                84|               84|
|   mean|2966.0833333333335|12141.97619047619|
| stddev| 8807.370147818401|9501.034524491048|
|    min|                15|             1536|
|    max|             61704|            49066|
+-------+------------------+-----------------+



In [None]:
result = TotalDF.select(["count",'rate_per_million']).describe()
result.show()

+-------+-----------------+------------------+
|summary|            count|  rate_per_million|
+-------+-----------------+------------------+
|  count|               84|                84|
|   mean|38429.95238095238|270194.40476190473|
| stddev|78633.93458005185| 89292.34247610674|
|    min|              490|            100616|
|    max|           416039|            510392|
+-------+-----------------+------------------+



The next thing I want to do is run some SQL queries on my data set. To do this, I am going to register each DataFrame as a temporary view, which lets me query it with Spark SQL.


In [None]:
DiagnosticDF.createOrReplaceTempView('mytable')  # registerTempTable is deprecated
GreaterThanMean = spark.sql(""" 
    SELECT * 
    FROM mytable 
    WHERE count > 35464
    ORDER BY count ASC """)
GreaterThanMean.show(GreaterThanMean.count())
GreaterThanMean.count()

+------------+----------+------+----------------+-------------------+
|      county| test_type| count|rate_per_million|            updated|
+------------+----------+------+----------------+-------------------+
|     Calhoun|Diagnostic| 48676|          362823|2020/09/17 13:31:36|
|  Livingston|Diagnostic| 51542|          268455|2020/09/17 13:31:36|
|     Jackson|Diagnostic| 57861|          365031|2020/09/17 13:31:36|
|    Muskegon|Diagnostic| 60143|          346514|2020/09/17 13:31:36|
|     Berrien|Diagnostic| 62734|          408954|2020/09/17 13:31:36|
|     Saginaw|Diagnostic| 72735|          381733|2020/09/17 13:31:36|
|   Kalamazoo|Diagnostic| 74258|          280149|2020/09/17 13:31:36|
|     Genesee|Diagnostic| 89810|          221309|2020/09/17 13:31:36|
|      Ottawa|Diagnostic| 94172|          322695|2020/09/17 13:31:36|
|      Ingham|Diagnostic|100272|          342920|2020/09/17 13:31:36|
|   Washtenaw|Diagnostic|128943|          350769|2020/09/17 13:31:36|
|      Macomb|Diagno

16

In [None]:
SerologyDF.createOrReplaceTempView('mytable')  # registerTempTable is deprecated
GreaterThanMean = spark.sql(""" 
    SELECT * 
    FROM mytable 
    WHERE count > 2966
    ORDER BY count ASC """)
GreaterThanMean.show(GreaterThanMean.count())
GreaterThanMean.count()

+------------+---------+-----+----------------+-------------------+
|      county|test_type|count|rate_per_million|            updated|
+------------+---------+-----+----------------+-------------------+
|       Eaton| Serology| 3806|           34516|2020/09/17 13:31:36|
|    St Clair| Serology| 4024|           25288|2020/09/17 13:31:36|
|  Livingston| Serology| 4409|           22964|2020/09/17 13:31:36|
|      Ottawa| Serology| 4556|           15612|2020/09/17 13:31:36|
|   Washtenaw| Serology|10064|           27378|2020/09/17 13:31:36|
|     Genesee| Serology|10648|           26239|2020/09/17 13:31:36|
|      Ingham| Serology|12920|           44185|2020/09/17 13:31:36|
|Detroit City| Serology|13975|           21506|2020/09/17 13:31:36|
|        Kent| Serology|15090|           22970|2020/09/17 13:31:36|
|      Macomb| Serology|28130|           32186|2020/09/17 13:31:36|
|       Wayne| Serology|40636|           36814|2020/09/17 13:31:36|
|     Oakland| Serology|61704|           49066|2

12

In [None]:
TotalDF.createOrReplaceTempView('mytable')  # registerTempTable is deprecated
GreaterThanMean = spark.sql(""" 
    SELECT * 
    FROM mytable 
    WHERE count > 38430
    ORDER BY count ASC """)
GreaterThanMean.show(GreaterThanMean.count())
GreaterThanMean.count()

+------------+---------+------+----------------+-------------------+
|      county|test_type| count|rate_per_million|            updated|
+------------+---------+------+----------------+-------------------+
|       Eaton|    Total| 39108|          354663|2020/09/17 13:31:36|
|     Calhoun|    Total| 49420|          368369|2020/09/17 13:31:36|
|  Livingston|    Total| 55951|          291419|2020/09/17 13:31:36|
|     Jackson|    Total| 59243|          373749|2020/09/17 13:31:36|
|    Muskegon|    Total| 61445|          354015|2020/09/17 13:31:36|
|     Berrien|    Total| 64716|          421875|2020/09/17 13:31:36|
|     Saginaw|    Total| 74643|          391747|2020/09/17 13:31:36|
|   Kalamazoo|    Total| 77040|          290645|2020/09/17 13:31:36|
|      Ottawa|    Total| 98728|          338307|2020/09/17 13:31:36|
|     Genesee|    Total|100458|          247548|2020/09/17 13:31:36|
|      Ingham|    Total|113192|          387106|2020/09/17 13:31:36|
|   Washtenaw|    Total|139007|   

17
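
The thresholds in the three queries above were typed in by hand from the `describe()` output. As an alternative, the mean could be computed per test type and compared row by row. This is a minimal sketch in pandas rather than Spark SQL, using made-up rows in the same column layout; none of the numbers come from the real data set.

```python
import pandas as pd

# Hypothetical rows in the notebook's column layout, not the real data set.
sample = pd.DataFrame({
    "county": ["A", "B", "C", "A", "B", "C"],
    "test_type": ["Diagnostic"] * 3 + ["Serology"] * 3,
    "count": [100, 200, 900, 10, 20, 60],
})

# Mean of 'count' within each test type, aligned row by row with the original frame.
group_mean = sample.groupby("test_type")["count"].transform("mean")

# Keep rows whose count exceeds the mean of their own test type, smallest first.
above_mean = sample[sample["count"] > group_mean].sort_values("count")
print(above_mean)
```

Computing the threshold this way avoids rounding the mean by hand and keeps the filter correct if the data changes.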

With these temporary views, I can run any SQL query I want. For example, if I am contemplating whether to visit family up in Traverse City (located in Grand Traverse County) during the holidays, I might be interested in the testing results for that county.

In [None]:
df.createOrReplaceTempView('mytable')  # registerTempTable is deprecated
CountyLookUp = spark.sql(""" 
    SELECT * 
    FROM mytable 
    WHERE county = 'Grand Traverse'
    """)
CountyLookUp.show()

+--------------+----------+-----+----------------+-------------------+
|        county| test_type|count|rate_per_million|            updated|
+--------------+----------+-----+----------------+-------------------+
|Grand Traverse|  Serology|  974|           10463|2020/09/17 13:31:36|
|Grand Traverse|Diagnostic|21935|          235637|2020/09/17 13:31:36|
|Grand Traverse|     Total|22909|          246100|2020/09/17 13:31:36|
+--------------+----------+-----+----------------+-------------------+



If I wanted to convert the PySpark DataFrame to pandas, here is how to do it:

In [None]:
pandasDF = df.toPandas()
print(pandasDF)

      county   test_type   count  rate_per_million              updated
0     Alcona  Diagnostic    1974            189716  2020/09/17 13:31:36
1     Alcona    Serology      51              4901  2020/09/17 13:31:36
2     Alcona       Total    2025            194618  2020/09/17 13:31:36
3      Alger    Serology      66              7246  2020/09/17 13:31:36
4      Alger  Diagnostic    1156            126921  2020/09/17 13:31:36
..       ...         ...     ...               ...                  ...
247    Wayne    Serology   40636             36814  2020/09/17 13:31:36
248    Wayne       Total  416039            376911  2020/09/17 13:31:36
249  Wexford  Diagnostic    7712            229312  2020/09/17 13:31:36
250  Wexford    Serology     230              6839  2020/09/17 13:31:36
251  Wexford       Total    7942            236151  2020/09/17 13:31:36

[252 rows x 5 columns]


From here, I can run all of the Python code I want:

In [None]:
pandasDF.info()
pandasDF.describe()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 252 entries, 0 to 251
Data columns (total 5 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   county            252 non-null    object
 1   test_type         252 non-null    object
 2   count             252 non-null    int32 
 3   rate_per_million  252 non-null    int32 
 4   updated           252 non-null    object
dtypes: int32(2), object(3)
memory usage: 8.0+ KB


              count  rate_per_million
count         252.0             252.0
mean   25619.968254     180129.607143
std    63030.569661     139107.491391
min            15.0            1536.0
25%         1130.25          15437.75
50%          4803.0          194775.5
75%        17328.25         281410.75
max        416039.0          510392.0
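
As one example of what pandas allows, the county lookup I ran earlier in SQL can be written as a boolean mask. This is a minimal sketch on a handful of rows copied from the outputs in this notebook. The small frame `sample` is built by hand so the snippet runs on its own, and it is not the full data set.

```python
import pandas as pd

# A few rows copied from the output shown earlier in this notebook.
sample = pd.DataFrame({
    "county": ["Alcona", "Grand Traverse", "Grand Traverse", "Grand Traverse"],
    "test_type": ["Total", "Serology", "Diagnostic", "Total"],
    "count": [2025, 974, 21935, 22909],
})

# Boolean-mask equivalent of: SELECT * FROM mytable WHERE county = 'Grand Traverse'
lookup = sample[sample["county"] == "Grand Traverse"]
print(lookup)
```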


In [None]:
# Map each county (plus Detroit City) to a region of Michigan
Counties_By_Region = {
    # Southeast
    'Midland': 'Southeast', 'Bay': 'Southeast', 'Saginaw': 'Southeast',
    'Detroit City': 'Southeast', 'Shiawassee': 'Southeast', 'Ingham': 'Southeast',
    'Jackson': 'Southeast', 'Hillsdale': 'Southeast', 'Lenawee': 'Southeast',
    'Monroe': 'Southeast', 'Wayne': 'Southeast', 'Macomb': 'Southeast',
    'St Clair': 'Southeast', 'Sanilac': 'Southeast', 'Huron': 'Southeast',
    'Tuscola': 'Southeast', 'Genesee': 'Southeast', 'Livingston': 'Southeast',
    'Washtenaw': 'Southeast', 'Oakland': 'Southeast', 'Lapeer': 'Southeast',
    # Southwest
    'Isabella': 'Southwest', 'Mecosta': 'Southwest', 'Newaygo': 'Southwest',
    'Oceana': 'Southwest', 'Muskegon': 'Southwest', 'Ottawa': 'Southwest',
    'Allegan': 'Southwest', 'Van Buren': 'Southwest', 'Berrien': 'Southwest',
    'Cass': 'Southwest', 'St Joseph': 'Southwest', 'Branch': 'Southwest',
    'Calhoun': 'Southwest', 'Eaton': 'Southwest', 'Clinton': 'Southwest',
    'Gratiot': 'Southwest', 'Montcalm': 'Southwest', 'Kent': 'Southwest',
    'Ionia': 'Southwest', 'Barry': 'Southwest', 'Kalamazoo': 'Southwest',
    # Northeast
    'Arenac': 'Northeast', 'Gladwin': 'Northeast', 'Roscommon': 'Northeast',
    'Crawford': 'Northeast', 'Otsego': 'Northeast', 'Cheboygan': 'Northeast',
    'Presque Isle': 'Northeast', 'Alpena': 'Northeast', 'Alcona': 'Northeast',
    'Iosco': 'Northeast', 'Ogemaw': 'Northeast', 'Oscoda': 'Northeast',
    'Montmorency': 'Northeast',
    # Northwest
    'Clare': 'Northwest', 'Osceola': 'Northwest', 'Lake': 'Northwest',
    'Mason': 'Northwest', 'Manistee': 'Northwest', 'Benzie': 'Northwest',
    'Leelanau': 'Northwest', 'Emmet': 'Northwest', 'Charlevoix': 'Northwest',
    'Antrim': 'Northwest', 'Kalkaska': 'Northwest', 'Missaukee': 'Northwest',
    'Wexford': 'Northwest', 'Grand Traverse': 'Northwest',
    # Upper Peninsula
    'Chippewa': 'Upper Peninsula', 'Mackinac': 'Upper Peninsula', 'Luce': 'Upper Peninsula',
    'Schoolcraft': 'Upper Peninsula', 'Alger': 'Upper Peninsula', 'Delta': 'Upper Peninsula',
    'Menominee': 'Upper Peninsula', 'Dickinson': 'Upper Peninsula', 'Marquette': 'Upper Peninsula',
    'Iron': 'Upper Peninsula', 'Baraga': 'Upper Peninsula', 'Houghton': 'Upper Peninsula',
    'Keweenaw': 'Upper Peninsula', 'Ontonagon': 'Upper Peninsula', 'Gogebic': 'Upper Peninsula',
}
pandasDF["region"] = pandasDF['county'].map(Counties_By_Region)
pandasDF

      county   test_type   count  rate_per_million              updated           region
0     Alcona  Diagnostic    1974            189716  2020/09/17 13:31:36        Northeast
1     Alcona    Serology      51              4901  2020/09/17 13:31:36        Northeast
2     Alcona       Total    2025            194618  2020/09/17 13:31:36        Northeast
3      Alger    Serology      66              7246  2020/09/17 13:31:36  Upper Peninsula
4      Alger  Diagnostic    1156            126921  2020/09/17 13:31:36  Upper Peninsula
..       ...         ...     ...               ...                  ...              ...
247    Wayne    Serology   40636             36814  2020/09/17 13:31:36        Southeast
248    Wayne       Total  416039            376911  2020/09/17 13:31:36        Southeast
249  Wexford  Diagnostic    7712            229312  2020/09/17 13:31:36        Northwest
250  Wexford    Serology     230              6839  2020/09/17 13:31:36        Northwest


From here, I can keep using pandas to build the kinds of data visualizations we made in Lab 1, or I can convert the pandas table back to Spark. To demonstrate the round trip, I used a Python dictionary to create a new column that groups counties by region. Once the formatting is in order, I can pass this pandas data set back to Spark and use it there.
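
One thing to watch with a dictionary lookup like this is that `Series.map` does not raise an error for a county that is missing from the dictionary; it fills in `NaN` instead. This is a minimal sketch of checking for unmapped counties, using a deliberately incomplete and hypothetical mapping (`region_by_county` is not the dictionary from this lab).

```python
import pandas as pd

# Deliberately incomplete, hypothetical mapping to show what a missing key does.
region_by_county = {"Alcona": "Northeast", "Alger": "Upper Peninsula"}

sample = pd.DataFrame({"county": ["Alcona", "Alger", "Allegan", "Allegan"]})
sample["region"] = sample["county"].map(region_by_county)

# Counties absent from the dictionary come back as NaN rather than raising an error.
unmapped = sorted(sample.loc[sample["region"].isna(), "county"].unique())
print(unmapped)
```

An empty `unmapped` list would mean every county received a region.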

In [None]:
pandasDF = pandasDF.dropna()  # dropna() returns a new DataFrame, so assign the result back
pandasDF.info()
# It is really important that every column has the same non-null count before
# the conversion; otherwise the code below will not work properly.

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 252 entries, 0 to 251
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   county            252 non-null    object
 1   test_type         252 non-null    object
 2   count             252 non-null    int32 
 3   rate_per_million  252 non-null    int32 
 4   updated           252 non-null    object
 5   region            252 non-null    object
dtypes: int32(2), object(4)
memory usage: 10.0+ KB
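
A detail worth keeping in mind here: in pandas, `dropna()` returns a new DataFrame and leaves the original untouched unless the result is assigned back. This is a minimal sketch on a made-up two-row frame (the name `frame` and its values are hypothetical).

```python
import pandas as pd

# Hypothetical frame with one incomplete row.
frame = pd.DataFrame({"county": ["Alcona", "Alger"], "region": ["Northeast", None]})

frame.dropna()            # result is discarded; 'frame' itself is unchanged
rows_without_assignment = len(frame)

frame = frame.dropna()    # reassigning keeps the cleaned copy
rows_with_assignment = len(frame)

print(rows_without_assignment, rows_with_assignment)
```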


In [None]:
# Ask Spark to use Arrow for the pandas-to-Spark conversion where it is available
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Spark automatically infers the schema from the pandas dtypes
sparkpdf = spark.createDataFrame(pandasDF)

  PyArrow >= 0.15.1 must be installed; however, your version was 0.14.1.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


In [None]:
sparkpdf.show()

+-------+----------+-----+----------------+-------------------+---------------+
| county| test_type|count|rate_per_million|            updated|         region|
+-------+----------+-----+----------------+-------------------+---------------+
| Alcona|Diagnostic| 1974|          189716|2020/09/17 13:31:36|      Northeast|
| Alcona|  Serology|   51|            4901|2020/09/17 13:31:36|      Northeast|
| Alcona|     Total| 2025|          194618|2020/09/17 13:31:36|      Northeast|
|  Alger|  Serology|   66|            7246|2020/09/17 13:31:36|Upper Peninsula|
|  Alger|Diagnostic| 1156|          126921|2020/09/17 13:31:36|Upper Peninsula|
|  Alger|     Total| 1222|          134168|2020/09/17 13:31:36|Upper Peninsula|
|Allegan|Diagnostic|29764|          252064|2020/09/17 13:31:36|      Southwest|
|Allegan|  Serology|  927|            7851|2020/09/17 13:31:36|      Southwest|
|Allegan|     Total|30691|          259915|2020/09/17 13:31:36|      Southwest|
| Alpena|  Serology|  157|            55

So from here, I can run SQL queries using the additional 'region' column like so:

In [None]:
sparkpdf.createOrReplaceTempView('mytable')  # registerTempTable is deprecated
RegionLookUp = spark.sql(""" 
    SELECT * 
    FROM mytable 
    WHERE region = 'Southwest' AND test_type = 'Total'
    ORDER BY count DESC
    """)
RegionLookUp.show(RegionLookUp.count())
RegionLookUp.count()

+---------+---------+------+----------------+-------------------+---------+
|   county|test_type| count|rate_per_million|            updated|   region|
+---------+---------+------+----------------+-------------------+---------+
|     Kent|    Total|246899|          375823|2020/09/17 13:31:36|Southwest|
|   Ottawa|    Total| 98728|          338307|2020/09/17 13:31:36|Southwest|
|Kalamazoo|    Total| 77040|          290645|2020/09/17 13:31:36|Southwest|
|  Berrien|    Total| 64716|          421875|2020/09/17 13:31:36|Southwest|
| Muskegon|    Total| 61445|          354015|2020/09/17 13:31:36|Southwest|
|  Calhoun|    Total| 49420|          368369|2020/09/17 13:31:36|Southwest|
|    Eaton|    Total| 39108|          354663|2020/09/17 13:31:36|Southwest|
|  Allegan|    Total| 30691|          259915|2020/09/17 13:31:36|Southwest|
|Van Buren|    Total| 26378|          348560|2020/09/17 13:31:36|Southwest|
| Montcalm|    Total| 20339|          318354|2020/09/17 13:31:36|Southwest|
|    Ionia| 

21
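
The new column also makes aggregation by region possible. This is a minimal sketch of the pandas counterpart of a `GROUP BY region` query, using four 'Total' rows copied from the outputs in this notebook. Because the sample is only a handful of counties, the sums below are not real regional totals.

```python
import pandas as pd

# 'Total' rows for a few counties, copied from the output shown in this notebook.
sample = pd.DataFrame({
    "county": ["Kent", "Ottawa", "Alcona", "Wexford"],
    "region": ["Southwest", "Southwest", "Northeast", "Northwest"],
    "count": [246899, 98728, 2025, 7942],
})

# pandas counterpart of: SELECT region, SUM(count) FROM mytable GROUP BY region
tests_by_region = sample.groupby("region")["count"].sum().sort_values(ascending=False)
print(tests_by_region)
```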

Finally, here are some of the advantages I see with PySpark:

*   Great for querying data
*   More versatile than pandas
*   Can switch from PySpark to pandas and back again

Here are some of the advantages I see with pandas:

*   Great for visualizations
*   Great for data transformations