In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark=SparkSession.builder.appName('COVID').getOrCreate()
spark

In [5]:
# Load the Cases Over Time data file (which is a comma separated file) into a dataframe
df=spark.read.csv('us_cases_over_time.csv')

In [6]:
# Show sample records and check if all columns are displayed correctly
df.show(5)

+---------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+
|            _c0|  _c1|      _c2|       _c3|       _c4|     _c5|      _c6|      _c7|       _c8|       _c9|     _c10|      _c11|                _c12|         _c13|          _c14|
+---------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+
|submission_date|state|tot_cases|conf_cases|prob_cases|new_case|pnew_case|tot_death|conf_death|prob_death|new_death|pnew_death|          created_at|consent_cases|consent_deaths|
|     01/25/2020|   OR|        0|      null|      null|       0|     null|        0|      null|      null|        0|      null|03/26/2020 04:22:...|        Agree|         Agree|
|     07/23/2020|   KY|    25147|     23882|      1265|     607|       61|      684|       680|         4|    

In [7]:
# Take a look at the dataframe schema and check if data types of all columns are correctly inferred
df=spark.read.option('header','true').csv('us_cases_over_time.csv')

In [8]:
# Take a look at the dataframe description and check descriptions or stats of all numeric columns are appripriate
# Note - We can use <df>.describe.show() function for this.
df.describe()
df.printSchema()

root
 |-- submission_date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- tot_cases: string (nullable = true)
 |-- conf_cases: string (nullable = true)
 |-- prob_cases: string (nullable = true)
 |-- new_case: string (nullable = true)
 |-- pnew_case: string (nullable = true)
 |-- tot_death: string (nullable = true)
 |-- conf_death: string (nullable = true)
 |-- prob_death: string (nullable = true)
 |-- new_death: string (nullable = true)
 |-- pnew_death: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- consent_cases: string (nullable = true)
 |-- consent_deaths: string (nullable = true)



In [43]:
# The dates given in the first data set are not in the default format of PySpark SQL/Hive.
# So we need to format the dates to make use of the timestamp/date calculations.
# We can use to_date and to_timestamp functions respectively for submission date and created at columns
#  based on how the dates are rendered in the columns.
from pyspark.sql.types import StringType, DateType, FloatType, IntegerType
df2 = df.withColumn("submission_date" ,df["submission_date"].cast(DateType()))
df2 = df.withColumn("tot_cases", df["tot_cases"].cast(IntegerType()))   
df2 = df.withColumn("conf_cases", df["conf_cases"].cast(FloatType()))    
df2 = df.withColumn("prob_cases", df["prob_cases"].cast(FloatType()))     
df2 = df.withColumn("new_case", df["new_case"].cast(FloatType()))
df2 = df.withColumn("pnew_case", df["pnew_case"].cast(FloatType()))
df2 = df.withColumn("tot_death", df["tot_death"].cast(FloatType()))
df2 = df.withColumn("conf_death", df["conf_death"].cast(FloatType()))
df2 = df.withColumn("prob_death", df["prob_death"].cast(FloatType()))
df2 = df.withColumn("new_death", df["new_death"].cast(FloatType()))
df2 = df.withColumn("pnew_death", df["pnew_death"].cast(FloatType()))


df2.printSchema()
df2.show(5)

root
 |-- submission_date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- tot_cases: string (nullable = true)
 |-- conf_cases: string (nullable = true)
 |-- prob_cases: string (nullable = true)
 |-- new_case: string (nullable = true)
 |-- pnew_case: string (nullable = true)
 |-- tot_death: string (nullable = true)
 |-- conf_death: string (nullable = true)
 |-- prob_death: string (nullable = true)
 |-- new_death: string (nullable = true)
 |-- pnew_death: float (nullable = true)
 |-- created_at: string (nullable = true)
 |-- consent_cases: string (nullable = true)
 |-- consent_deaths: string (nullable = true)

+---------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+
|submission_date|state|tot_cases|conf_cases|prob_cases|new_case|pnew_case|tot_death|conf_death|prob_death|new_death|pnew_death|          created_at|consent_cases|consent_deaths|
+-------

In [44]:
from pyspark.sql.functions import to_date
from pyspark.sql.functions import col
from pyspark.sql.functions import to_timestamp

df.select(to_timestamp(df.submission_date, 'MM-dd-yyyy HH:mm:ss').alias('dt')).collect()
df.show(5)
df.printSchema()

+---------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+
|submission_date|state|tot_cases|conf_cases|prob_cases|new_case|pnew_case|tot_death|conf_death|prob_death|new_death|pnew_death|          created_at|consent_cases|consent_deaths|
+---------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+
|     01/25/2020|   OR|        0|      null|      null|       0|     null|        0|      null|      null|        0|      null|03/26/2020 04:22:...|        Agree|         Agree|
|     07/23/2020|   KY|    25147|     23882|      1265|     607|       61|      684|       680|         4|        7|         0|07/24/2020 02:18:...|        Agree|         Agree|
|     02/01/2021|   DC|    37008|      null|      null|     136|        0|      916|      null|      null|    

In [11]:
# To clean up inalid records we can check whether the sum of confirmed cases and probable cases is equal to the total cases or not.
# Filter for only those records where the sum of confirmed cases and probable cases is equal to the total cases.
df2=df.withColumn("Sum_of_conf_cases_and_Prob_cases",df.conf_cases+df.prob_cases)
df2=df2.filter(df2.tot_cases == df2.Sum_of_conf_cases_and_Prob_cases)
df2.show(5)
df2.printSchema()

+---------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+--------------------------------+
|submission_date|state|tot_cases|conf_cases|prob_cases|new_case|pnew_case|tot_death|conf_death|prob_death|new_death|pnew_death|          created_at|consent_cases|consent_deaths|Sum_of_conf_cases_and_Prob_cases|
+---------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+--------------------------------+
|     07/23/2020|   KY|    25147|     23882|      1265|     607|       61|      684|       680|         4|        7|         0|07/24/2020 02:18:...|        Agree|         Agree|                         25147.0|
|     04/11/2021|   NJ|   953490|    837052|    116438|    3387|      517|    24870|     22297|      2573|       11|         0|04/12/2021 01:39:...|        

In [12]:
df2 = df.selectExpr("cast(submission_date as date) submission_date", "cast(tot_cases as int) tot_cases", "cast(conf_cases as float) conf_cases", "cast(prob_cases as float) prob_cases", "cast(conf_cases as float) conf_cases", "cast(new_case as float) new_case", "cast(pnew_case as float) pnew_case","cast(tot_death as float) tot_death","cast(conf_death as float) conf_death","cast(prob_death as float) prob_death", "cast(new_death as float) new_death", "cast(pnew_death as float) pnew_death" )
df2.printSchema()


root
 |-- submission_date: date (nullable = true)
 |-- tot_cases: integer (nullable = true)
 |-- conf_cases: float (nullable = true)
 |-- prob_cases: float (nullable = true)
 |-- conf_cases: float (nullable = true)
 |-- new_case: float (nullable = true)
 |-- pnew_case: float (nullable = true)
 |-- tot_death: float (nullable = true)
 |-- conf_death: float (nullable = true)
 |-- prob_death: float (nullable = true)
 |-- new_death: float (nullable = true)
 |-- pnew_death: float (nullable = true)



In [13]:
df2 = df.selectExpr("cast(submission_date as date) submission_date", "cast(tot_cases as int) tot_cases", "cast(conf_cases as float) conf_cases", "cast(prob_cases as float) prob_cases", "cast(conf_cases as float) conf_cases", "cast(new_case as float) new_case", "cast(pnew_case as float) pnew_case","cast(tot_death as float) tot_death","cast(conf_death as float) conf_death","cast(prob_death as float) prob_death", "cast(new_death as float) new_death", "cast(pnew_death as float) pnew_death" )
df2.printSchema()


root
 |-- submission_date: date (nullable = true)
 |-- tot_cases: integer (nullable = true)
 |-- conf_cases: float (nullable = true)
 |-- prob_cases: float (nullable = true)
 |-- conf_cases: float (nullable = true)
 |-- new_case: float (nullable = true)
 |-- pnew_case: float (nullable = true)
 |-- tot_death: float (nullable = true)
 |-- conf_death: float (nullable = true)
 |-- prob_death: float (nullable = true)
 |-- new_death: float (nullable = true)
 |-- pnew_death: float (nullable = true)



In [14]:
# Show sample records and check if all columns are displayed correctly
census=spark.read.option('header','true').csv('us-census-data.csv')

In [15]:
# Take a look at the dataframe schema and check if data types of all columns are correctly inferred
census.printSchema()

root
 |-- SUMLEV: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- DIVISION: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTY: string (nullable = true)
 |-- STNAME: string (nullable = true)
 |-- CTYNAME: string (nullable = true)
 |-- CENSUS2010POP: string (nullable = true)
 |-- ESTIMATESBASE2010: string (nullable = true)
 |-- POPESTIMATE2010: string (nullable = true)
 |-- POPESTIMATE2011: string (nullable = true)
 |-- POPESTIMATE2012: string (nullable = true)
 |-- POPESTIMATE2013: string (nullable = true)
 |-- POPESTIMATE2014: string (nullable = true)
 |-- POPESTIMATE2015: string (nullable = true)
 |-- POPESTIMATE2016: string (nullable = true)
 |-- POPESTIMATE2017: string (nullable = true)
 |-- POPESTIMATE2018: string (nullable = true)
 |-- POPESTIMATE2019: string (nullable = true)
 |-- NPOPCHG_2010: string (nullable = true)
 |-- NPOPCHG_2011: string (nullable = true)
 |-- NPOPCHG_2012: string (nullable = true)
 |-- NPOPCHG_2013: string (null

In [16]:
# Take a look at the dataframe description and check descriptions or stats of all numeric columns are appripriate
# Note - We can use <df>.describe.show() function for this.
from pyspark.sql.functions import col
columns_to_cast = ["ESTIMATESBASE2010", "POPESTIMATE2010", "POPESTIMATE2011","POPESTIMATE2012","POPESTIMATE2013","POPESTIMATE2014","POPESTIMATE2015","POPESTIMATE2016","POPESTIMATE2017","POPESTIMATE2018","POPESTIMATE2019","NPOPCHG_2010","NPOPCHG_2011","NPOPCHG_2012","NPOPCHG_2013","NPOPCHG_2014","NPOPCHG_2015","NPOPCHG_2016","NPOPCHG_2017","NPOPCHG_2018","NPOPCHG_2019"]
df_temp = (
   census
   .select(
     *(c for c in census.columns if c not in columns_to_cast),
     *(col(c).cast("int").alias(c) for c in columns_to_cast)
   )
)

In [17]:

df_temp.show(5)

+------+------+--------+-----+------+-------+--------------+-------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+------------+------------+------------+------------+------------+-----------

In [18]:
state_est = df_temp.select(col("POPESTIMATE2019"),col("STATE"),col("STNAME"))
state_est=state_est.groupby('STNAME').sum('POPESTIMATE2019')
state_est=state_est.withColumnRenamed("sum(POPESTIMATE2019)","POPESTIMATE2019")
state_est.show()
state_est.printSchema()

+--------------------+---------------+
|              STNAME|POPESTIMATE2019|
+--------------------+---------------+
|                Utah|        6411916|
|              Hawaii|        2831744|
|           Minnesota|       11279264|
|                Ohio|       23378200|
|            Arkansas|        6035608|
|              Oregon|        8435474|
|               Texas|       57991762|
|        North Dakota|        1524124|
|        Pennsylvania|       25603978|
|         Connecticut|        7130574|
|            Nebraska|        3868816|
|             Vermont|        1247978|
|              Nevada|        6160312|
|          Washington|       15229786|
|            Illinois|       25343642|
|            Oklahoma|        7913942|
|District of Columbia|        1411498|
|            Delaware|        1947528|
|              Alaska|        1463090|
|          New Mexico|        4193658|
+--------------------+---------------+
only showing top 20 rows

root
 |-- STNAME: string (nullable = t

In [19]:
# Take the census data and extract the state-wise estimated population of 2019.
# If you check the records manually you will notice that for the whole state population estimate of 2019
#   the county will be mentioned as 0
# Or you can use the other criteria like to get this data

c = df_temp.select(col("POPESTIMATE2019"),col("STATE"),col("STNAME"))
state_est=state_est.groupby('STNAME').sum('POPESTIMATE2019')

state_est=state_est.withColumnRenamed("sum(POPESTIMATE2019)","POPESTIMATE2019")
import pyspark.sql.functions as func
state_est = state_est.withColumn("STNAME",func.upper(func.col("STNAME")))

state_est.show()
state_est.printSchema()

+--------------------+---------------+
|              STNAME|POPESTIMATE2019|
+--------------------+---------------+
|                UTAH|        6411916|
|              HAWAII|        2831744|
|           MINNESOTA|       11279264|
|                OHIO|       23378200|
|            ARKANSAS|        6035608|
|              OREGON|        8435474|
|               TEXAS|       57991762|
|        NORTH DAKOTA|        1524124|
|        PENNSYLVANIA|       25603978|
|         CONNECTICUT|        7130574|
|            NEBRASKA|        3868816|
|             VERMONT|        1247978|
|              NEVADA|        6160312|
|          WASHINGTON|       15229786|
|            ILLINOIS|       25343642|
|            OKLAHOMA|        7913942|
|DISTRICT OF COLUMBIA|        1411498|
|            DELAWARE|        1947528|
|              ALASKA|        1463090|
|          NEW MEXICO|        4193658|
+--------------------+---------------+
only showing top 20 rows

root
 |-- STNAME: string (nullable = t

In [20]:
# In census data we need to use the state name instead of the two-letter state code with the census data
# For this we can use the US State Names and Codes data file
# We can look up the standard state names and state code table and get the state name and the population data into a DataFrame.
# We can use a join of the two dataframes or we can convert the state names and codes into a dictionary and use it for look up
statecodes=spark.read.option('header','true').csv('usstatenamescodes.csv')

statecodes.show()

+--------------------+------+
|              STNAME|STCODE|
+--------------------+------+
|             ALABAMA|    AL|
|              ALASKA|    AK|
|      AMERICAN SAMOA|    AS|
|             ARIZONA|    AZ|
|            ARKANSAS|    AR|
|          CALIFORNIA|    CA|
|            COLORADO|    CO|
|         CONNECTICUT|    CT|
|            DELAWARE|    DE|
|DISTRICT OF COLUMBIA|    DC|
|             FLORIDA|    FL|
|             GEORGIA|    GA|
|                GUAM|    GU|
|              HAWAII|    HI|
|               IDAHO|    ID|
|            ILLINOIS|    IL|
|             INDIANA|    IN|
|                IOWA|    IA|
|              KANSAS|    KS|
|            KENTUCKY|    KY|
+--------------------+------+
only showing top 20 rows



In [21]:
clean=state_est.join(statecodes,state_est.STNAME ==  statecodes.STNAME,"inner")
clean.show(5)
clean.count()

+---------+---------------+---------+------+
|   STNAME|POPESTIMATE2019|   STNAME|STCODE|
+---------+---------------+---------+------+
|     UTAH|        6411916|     UTAH|    UT|
|   HAWAII|        2831744|   HAWAII|    HI|
|MINNESOTA|       11279264|MINNESOTA|    MN|
|     OHIO|       23378200|     OHIO|    OH|
| ARKANSAS|        6035608| ARKANSAS|    AR|
+---------+---------------+---------+------+
only showing top 5 rows



50

In [22]:
df.createOrReplaceTempView("df")
 
# use sql function to convert string to integer data type of cost column


df5=spark.sql("SELECT state,INT(tot_cases),INT(tot_death) from df")
df5=df5.groupby('state').sum('tot_cases','tot_death')
df5=df5.withColumnRenamed("sum(tot_cases)","tot_cases")
df5=df5.withColumnRenamed("sum(tot_death)","tot_death")
df5.printSchema()
df5.show(5)
df5.count()

root
 |-- state: string (nullable = true)
 |-- tot_cases: long (nullable = true)
 |-- tot_death: long (nullable = true)

+-----+---------+---------+
|state|tot_cases|tot_death|
+-----+---------+---------+
|   AZ|153218568|  3076458|
|   SC| 98460132|  1788444|
|   LA| 91629962|  2458995|
|   MN| 97118069|  1420024|
|  FSM|      127|        0|
+-----+---------+---------+
only showing top 5 rows



60

In [23]:
clean=state_est.join(statecodes,state_est.STNAME ==  statecodes.STNAME,"inner").drop(state_est.STNAME)
clean.show(5)
clean.count()

+---------------+---------+------+
|POPESTIMATE2019|   STNAME|STCODE|
+---------------+---------+------+
|        6411916|     UTAH|    UT|
|        2831744|   HAWAII|    HI|
|       11279264|MINNESOTA|    MN|
|       23378200|     OHIO|    OH|
|        6035608| ARKANSAS|    AR|
+---------------+---------+------+
only showing top 5 rows



50

In [24]:
a=df5.join(clean,df5.state ==  clean.STCODE,"inner").drop(df5.state)
a.show(5)
a.count()

+---------+---------+---------------+---------+------+
|tot_cases|tot_death|POPESTIMATE2019|   STNAME|STCODE|
+---------+---------+---------------+---------+------+
| 69106594|   366801|        6411916|     UTAH|    UT|
|  5773614|    84060|        2831744|   HAWAII|    HI|
| 97118069|  1420024|       11279264|MINNESOTA|    MN|
|173640607|  3186269|       23378200|     OHIO|    OH|
| 60223666|   985281|        6035608| ARKANSAS|    AR|
+---------+---------+---------------+---------+------+
only showing top 5 rows



50

In [25]:
# Case Rate per 100000 = (Total cases/Total Population of 2019)*100000
from pyspark.sql.functions import sum as sum



b=a.select(sum('POPESTIMATE2019'), sum('tot_cases'))

b=b.withColumnRenamed("sum(POPESTIMATE2019)","POPESTIMATE2019")
b=b.withColumnRenamed("sum(tot_cases)","tot_cases")
b=b.withColumn("Case Rate per 100000",b.tot_cases/b.POPESTIMATE2019)
b.show()

+---------------+----------+--------------------+
|POPESTIMATE2019| tot_cases|Case Rate per 100000|
+---------------+----------+--------------------+
|      654360324|5320788314|   8.131281984633286|
+---------------+----------+--------------------+



In [26]:
# Death Rate per 100000 = (Total deaths/Total Population of 2019)*100000
b=a.select(sum('POPESTIMATE2019'), sum('tot_death'))

b=b.withColumnRenamed("sum(POPESTIMATE2019)","POPESTIMATE2019")
b=b.withColumnRenamed("sum(tot_death)","tot_death")
b=b.withColumn("Death Rate per 100000",b.tot_death/b.POPESTIMATE2019)
b.show()

+---------------+---------+---------------------+
|POPESTIMATE2019|tot_death|Death Rate per 100000|
+---------------+---------+---------------------+
|      654360324|104102765|  0.15909088797382526|
+---------------+---------+---------------------+



In [27]:
# Load the vaccination allocation data into a dataframe for each brand
janssen=spark.read.option('header','true').csv('vaccines_janssen.csv')
moderna=spark.read.option('header','true').csv('vaccines_moderna.csv')
pfizer=spark.read.option('header','true').csv('vaccines_pfizer.csv')

In [28]:
# Show sample records and check if all columns are displayed correctly
janssen.show(4)
moderna.show(4)
pfizer.show(4)

+-------------+-------------------+--------------------+
| Jurisdiction|Week of Allocations|1st Dose Allocations|
+-------------+-------------------+--------------------+
|  Connecticut|         05/10/2021|                6400|
|        Maine|         05/10/2021|                2500|
|Massachusetts|         05/10/2021|               12300|
|New Hampshire|         05/10/2021|                2500|
+-------------+-------------------+--------------------+
only showing top 4 rows

+-------------+-------------------+--------------------+--------------------+
| Jurisdiction|Week of Allocations|1st Dose Allocations|2nd Dose Allocations|
+-------------+-------------------+--------------------+--------------------+
|  Connecticut|         05/17/2021|               41220|               41220|
|        Maine|         05/17/2021|               15800|               15800|
|Massachusetts|         05/17/2021|               79500|               79500|
|New Hampshire|         05/17/2021|               1

In [29]:
# Take a look at the dataframe schema and check if data types of all columns are correctly inferred
janssen.printSchema()
moderna.printSchema()
pfizer.printSchema()

root
 |-- Jurisdiction: string (nullable = true)
 |-- Week of Allocations: string (nullable = true)
 |-- 1st Dose Allocations: string (nullable = true)

root
 |-- Jurisdiction: string (nullable = true)
 |-- Week of Allocations: string (nullable = true)
 |-- 1st Dose Allocations: string (nullable = true)
 |-- 2nd Dose Allocations: string (nullable = true)

root
 |-- Jurisdiction: string (nullable = true)
 |-- Week of Allocations: string (nullable = true)
 |-- 1st Dose Allocations: string (nullable = true)
 |-- 2nd Dose Allocations: string (nullable = true)



In [30]:
# Take a look at the dataframe description and check descriptions or stats of all numeric columns are appripriate
# Note - We can use <df>.describe.show() function for this.
janssen.describe()

from pyspark.sql import functions as F

jan = janssen.select([F.col(col).alias(col.replace(' ', '_')) for col in janssen.columns])
jan=jan.withColumnRenamed("1st_Dose_Allocations","1st_Dose_Allocations_jan")

jan.createOrReplaceTempView("jan")
jan=spark.sql("SELECT Jurisdiction,Week_of_Allocations,INT(1st_Dose_Allocations_jan) from jan")
jan.show(5)
jan.printSchema()

+-------------+-------------------+------------------------+
| Jurisdiction|Week_of_Allocations|1st_Dose_Allocations_jan|
+-------------+-------------------+------------------------+
|  Connecticut|         05/10/2021|                    6400|
|        Maine|         05/10/2021|                    2500|
|Massachusetts|         05/10/2021|                   12300|
|New Hampshire|         05/10/2021|                    2500|
| Rhode Island|         05/10/2021|                    2000|
+-------------+-------------------+------------------------+
only showing top 5 rows

root
 |-- Jurisdiction: string (nullable = true)
 |-- Week_of_Allocations: string (nullable = true)
 |-- 1st_Dose_Allocations_jan: integer (nullable = true)



In [31]:
moderna.describe()

from pyspark.sql import functions as F

mod = moderna.select([F.col(col).alias(col.replace(' ', '_')) for col in moderna.columns])
mod=mod.withColumnRenamed("1st_Dose_Allocations","1st_Dose_Allocations_mod")
mod=mod.withColumnRenamed("2nd_Dose_Allocations","2nd_Dose_Allocations_mod")

mod.createOrReplaceTempView("mod")
mod=spark.sql("SELECT Jurisdiction,Week_of_Allocations,INT(1st_Dose_Allocations_mod),INT(2nd_Dose_Allocations_mod) from mod")
mod.show(5)
mod.printSchema()

+-------------+-------------------+------------------------+------------------------+
| Jurisdiction|Week_of_Allocations|1st_Dose_Allocations_mod|2nd_Dose_Allocations_mod|
+-------------+-------------------+------------------------+------------------------+
|  Connecticut|         05/17/2021|                   41220|                   41220|
|        Maine|         05/17/2021|                   15800|                   15800|
|Massachusetts|         05/17/2021|                   79500|                   79500|
|New Hampshire|         05/17/2021|                   15800|                   15800|
| Rhode Island|         05/17/2021|                   12480|                   12480|
+-------------+-------------------+------------------------+------------------------+
only showing top 5 rows

root
 |-- Jurisdiction: string (nullable = true)
 |-- Week_of_Allocations: string (nullable = true)
 |-- 1st_Dose_Allocations_mod: integer (nullable = true)
 |-- 2nd_Dose_Allocations_mod: integer (null

In [32]:
pfizer.describe()

from pyspark.sql import functions as F

pfi = pfizer.select([F.col(col).alias(col.replace(' ', '_')) for col in pfizer.columns])
pfi=pfi.withColumnRenamed("1st_Dose_Allocations","1st_Dose_Allocations_pfi")
pfi=pfi.withColumnRenamed("2nd_Dose_Allocations","2nd_Dose_Allocations_pfi")

pfi.createOrReplaceTempView("pfi")
pfi=spark.sql("SELECT Jurisdiction,Week_of_Allocations,INT(1st_Dose_Allocations_pfi),INT(2nd_Dose_Allocations_pfi) from pfi")
pfi.show(5)
pfi.printSchema()


+-------------+-------------------+------------------------+------------------------+
| Jurisdiction|Week_of_Allocations|1st_Dose_Allocations_pfi|2nd_Dose_Allocations_pfi|
+-------------+-------------------+------------------------+------------------------+
|  Connecticut|         05/17/2021|                   54990|                   54990|
|        Maine|         05/17/2021|                   21060|                   21060|
|Massachusetts|         05/17/2021|                  105300|                  105300|
|New Hampshire|         05/17/2021|                   21060|                   21060|
| Rhode Island|         05/17/2021|                   16380|                   16380|
+-------------+-------------------+------------------------+------------------------+
only showing top 5 rows

root
 |-- Jurisdiction: string (nullable = true)
 |-- Week_of_Allocations: string (nullable = true)
 |-- 1st_Dose_Allocations_pfi: integer (nullable = true)
 |-- 2nd_Dose_Allocations_pfi: integer (null

In [33]:
# Get the total allocation state-wise from the given week-wise break up of data 
# Repeat these steps for the other two brands of vaccine
print("Janssen vaccine statewise allocation")
jan.groupBy("Jurisdiction").sum("1st_Dose_Allocations_jan").show(truncate=False)

Janssen vaccine statewise allocation
+-------------------+-----------------------------+
|Jurisdiction       |sum(1st_Dose_Allocations_jan)|
+-------------------+-----------------------------+
|Utah               |102100                       |
|Mariana Islands    |2600                         |
|Hawaii             |53500                        |
|U.S. Virgin Islands|4300                         |
|Minnesota          |202000                       |
|Ohio               |428800                       |
|Arkansas           |109000                       |
|Oregon             |153700                       |
|Philadelphia       |58600                        |
|Palau              |3800                         |
|Texas              |978300                       |
|North Dakota       |28500                        |
|Pennsylvania       |422200                       |
|Connecticut        |134800                       |
|Vermont            |24500                        |
|Nebraska           |69300 

In [34]:
print("Moderna vaccine statewise allocation")
mod.groupBy("Jurisdiction").sum("1st_Dose_Allocations_mod","2nd_Dose_Allocations_mod").show(truncate=False)

Moderna vaccine statewise allocation
+-------------------+-----------------------------+-----------------------------+
|Jurisdiction       |sum(1st_Dose_Allocations_mod)|sum(2nd_Dose_Allocations_mod)|
+-------------------+-----------------------------+-----------------------------+
|Utah               |580520                       |580520                       |
|Mariana Islands    |17800                        |0                            |
|Hawaii             |303660                       |303660                       |
|U.S. Virgin Islands|23460                        |23460                        |
|Minnesota          |1152420                      |1152420                      |
|Ohio               |2445560                      |2445560                      |
|Arkansas           |619920                       |619920                       |
|Oregon             |874960                       |874960                       |
|Philadelphia       |334340                       |334340    

In [35]:
print("Pfizer vaccine statewise allocation")
pfi.groupBy("Jurisdiction").sum("1st_Dose_Allocations_pfi","2nd_Dose_Allocations_pfi").show(truncate=False)

Pfizer vaccine statewise allocation
+-------------------+-----------------------------+-----------------------------+
|Jurisdiction       |sum(1st_Dose_Allocations_pfi)|sum(2nd_Dose_Allocations_pfi)|
+-------------------+-----------------------------+-----------------------------+
|Utah               |679575                       |679575                       |
|Mariana Islands    |42510                        |0                            |
|Hawaii             |365040                       |365040                       |
|U.S. Virgin Islands|39000                        |39000                        |
|Minnesota          |1344915                      |1344915                      |
|Ohio               |2848365                      |2848365                      |
|Arkansas           |729300                       |729300                       |
|Oregon             |1026090                      |1026090                      |
|Philadelphia       |402870                       |402870     

In [36]:
# We can use union and Get the data of all the three brands into one dataframe
d=jan.join(mod,jan.Jurisdiction ==  mod.Jurisdiction,"inner").drop(mod.Jurisdiction)
d.show(5)

+------------+-------------------+------------------------+-------------------+------------------------+------------------------+
|Jurisdiction|Week_of_Allocations|1st_Dose_Allocations_jan|Week_of_Allocations|1st_Dose_Allocations_mod|2nd_Dose_Allocations_mod|
+------------+-------------------+------------------------+-------------------+------------------------+------------------------+
| Connecticut|         03/01/2021|                   30200|         05/17/2021|                   41220|                   41220|
| Connecticut|         03/15/2021|                    4200|         05/17/2021|                   41220|                   41220|
| Connecticut|         03/22/2021|                    4200|         05/17/2021|                   41220|                   41220|
| Connecticut|         03/29/2021|                   21200|         05/17/2021|                   41220|                   41220|
| Connecticut|         04/05/2021|                   53900|         05/17/2021|           

In [37]:
e=d.join(pfi,d.Jurisdiction ==  pfi.Jurisdiction,"inner").drop(pfi.Jurisdiction)
e.show(5)

+------------+-------------------+------------------------+-------------------+------------------------+------------------------+-------------------+------------------------+------------------------+
|Jurisdiction|Week_of_Allocations|1st_Dose_Allocations_jan|Week_of_Allocations|1st_Dose_Allocations_mod|2nd_Dose_Allocations_mod|Week_of_Allocations|1st_Dose_Allocations_pfi|2nd_Dose_Allocations_pfi|
+------------+-------------------+------------------------+-------------------+------------------------+------------------------+-------------------+------------------------+------------------------+
| Connecticut|         03/01/2021|                   30200|         05/17/2021|                   41220|                   41220|         12/14/2020|                   31200|                   31200|
| Connecticut|         03/01/2021|                   30200|         05/17/2021|                   41220|                   41220|         12/21/2020|                   22425|                   22425|


In [38]:
#Get the total allocation state-wise using group by aggregation
f=e.groupBy("Jurisdiction").sum("1st_Dose_Allocations_jan","1st_Dose_Allocations_mod","2nd_Dose_Allocations_mod","1st_Dose_Allocations_pfi","2nd_Dose_Allocations_pfi")
f=f.withColumnRenamed("sum(1st_Dose_Allocations_jan)","dose1_janssen")
f=f.withColumnRenamed("sum(1st_Dose_Allocations_mod)","dose1_moderna")
f=f.withColumnRenamed("sum(2nd_Dose_Allocations_mod)","dose2_moderna")
f=f.withColumnRenamed("sum(1st_Dose_Allocations_pfi)","dose1_pfizer")
f=f.withColumnRenamed("sum(2nd_Dose_Allocations_pfi)","dose2_pfizer")
f.show(5)

+-------------------+-------------+-------------+-------------+------------+------------+
|       Jurisdiction|dose1_janssen|dose1_moderna|dose2_moderna|dose1_pfizer|dose2_pfizer|
+-------------------+-------------+-------------+-------------+------------+------------+
|               Utah|     51662600|    106815680|    106815680|   119605200|   119605200|
|    Mariana Islands|      1315600|      3275200|            0|     7481760|           0|
|             Hawaii|     27071000|     55873440|     55873440|    64247040|    64247040|
|U.S. Virgin Islands|      2175800|      4316640|      4316640|     6864000|     6864000|
|          Minnesota|    102212000|    212045280|    212045280|   236705040|   236705040|
+-------------------+-------------+-------------+-------------+------------+------------+
only showing top 5 rows



In [39]:
g=f.withColumn("total_vacc", col("dose1_janssen")+col("dose1_moderna")+col("dose2_moderna")+col("dose1_pfizer")+col("dose2_pfizer"))
g.show()

+-------------------+-------------+-------------+-------------+------------+------------+----------+
|       Jurisdiction|dose1_janssen|dose1_moderna|dose2_moderna|dose1_pfizer|dose2_pfizer|total_vacc|
+-------------------+-------------+-------------+-------------+------------+------------+----------+
|               Utah|     51662600|    106815680|    106815680|   119605200|   119605200| 504504360|
|    Mariana Islands|      1315600|      3275200|            0|     7481760|           0|  12072560|
|             Hawaii|     27071000|     55873440|     55873440|    64247040|    64247040| 267311960|
|U.S. Virgin Islands|      2175800|      4316640|      4316640|     6864000|     6864000|  24537080|
|          Minnesota|    102212000|    212045280|    212045280|   236705040|   236705040| 999712640|
|               Ohio|    216972800|    449983040|    449983040|   501312240|   501312240|2119563360|
|           Arkansas|     55154000|    114065280|    114065280|   128356800|   128356800| 5

In [40]:
# Get the Population ratio covered by vaccination in each state by joining the census data
g = g.withColumn("Jurisdiction",func.upper(func.col("Jurisdiction")))
df7=g.join(state_est,g.Jurisdiction ==  state_est.STNAME,"inner").drop(pfi.Jurisdiction).drop(state_est.STNAME)
df7.show(5)


+------------+-------------+-------------+-------------+------------+------------+----------+---------------+
|Jurisdiction|dose1_janssen|dose1_moderna|dose2_moderna|dose1_pfizer|dose2_pfizer|total_vacc|POPESTIMATE2019|
+------------+-------------+-------------+-------------+------------+------------+----------+---------------+
|        UTAH|     51662600|    106815680|    106815680|   119605200|   119605200| 504504360|        6411916|
|      HAWAII|     27071000|     55873440|     55873440|    64247040|    64247040| 267311960|        2831744|
|   MINNESOTA|    102212000|    212045280|    212045280|   236705040|   236705040| 999712640|       11279264|
|        OHIO|    216972800|    449983040|    449983040|   501312240|   501312240|2119563360|       23378200|
|    ARKANSAS|     55154000|    114065280|    114065280|   128356800|   128356800| 539998160|        6035608|
+------------+-------------+-------------+-------------+------------+------------+----------+---------------+
only showi

In [41]:
df7=df7.withColumn("Population_ratio_vaccinated",df7.POPESTIMATE2019/df7.total_vacc)
df7.show()

+--------------------+-------------+-------------+-------------+------------+------------+----------+---------------+---------------------------+
|        Jurisdiction|dose1_janssen|dose1_moderna|dose2_moderna|dose1_pfizer|dose2_pfizer|total_vacc|POPESTIMATE2019|Population_ratio_vaccinated|
+--------------------+-------------+-------------+-------------+------------+------------+----------+---------------+---------------------------+
|                UTAH|     51662600|    106815680|    106815680|   119605200|   119605200| 504504360|        6411916|       0.012709337140317281|
|              HAWAII|     27071000|     55873440|     55873440|    64247040|    64247040| 267311960|        2831744|       0.010593405547585675|
|           MINNESOTA|    102212000|    212045280|    212045280|   236705040|   236705040| 999712640|       11279264|       0.011282506140964668|
|                OHIO|    216972800|    449983040|    449983040|   501312240|   501312240|2119563360|       23378200|       

In [42]:
# Similarly Get the Population ratio not yet covered by vaccination in each state
df7=df7.withColumn("Population_ratio_not_vaccinated",df7.total_vacc/df7.POPESTIMATE2019)
df7.show()

+--------------------+-------------+-------------+-------------+------------+------------+----------+---------------+---------------------------+-------------------------------+
|        Jurisdiction|dose1_janssen|dose1_moderna|dose2_moderna|dose1_pfizer|dose2_pfizer|total_vacc|POPESTIMATE2019|Population_ratio_vaccinated|Population_ratio_not_vaccinated|
+--------------------+-------------+-------------+-------------+------------+------------+----------+---------------+---------------------------+-------------------------------+
|                UTAH|     51662600|    106815680|    106815680|   119605200|   119605200| 504504360|        6411916|       0.012709337140317281|              78.68230962476738|
|              HAWAII|     27071000|     55873440|     55873440|    64247040|    64247040| 267311960|        2831744|       0.010593405547585675|              94.39834956832256|
|           MINNESOTA|    102212000|    212045280|    212045280|   236705040|   236705040| 999712640|       11