In [319]:
import pyspark
from pyspark.sql.functions import *
import json
sc.setLogLevel("ERROR")

# Spark Application:

### Read the data file into a Spark DataFrame

In [320]:
# https://www.cms.gov/files/zip/medicare-covid-19-data-snapshot-data-file.zip
df1 = spark.read.csv('COVID-19-2021-02-20.csv', header='true')

In [321]:
print(type(df1))
df1.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- Claims_Thru_Dt: string (nullable = true)
 |-- Measure_Level: string (nullable = true)
 |-- Measure_Element: string (nullable = true)
 |-- Measure_Unit: string (nullable = true)
 |-- Value: string (nullable = true)



### Produce a count of all rows

In [322]:
total = df1.count()
print("Total Rows " + str(total))

Total Rows 962


In [323]:
df1.createOrReplaceTempView("covid")

### select the column(s) in the data which distinguishes different types of records

In [324]:

SQL = """SELECT DISTINCT Measure_Level
FROM covid 
ORDER BY Measure_Level"""
df2 = spark.sql(SQL)
df2.show(99, False)

+----------------------------------------------------------------+
|Measure_Level                                                   |
+----------------------------------------------------------------+
|COVID-19 Cases                                                  |
|COVID-19 Cases by Age Group                                     |
|COVID-19 Cases by Dual Status                                   |
|COVID-19 Cases by Dual Status and Age Group                     |
|COVID-19 Cases by Dual Status and Medicare Status               |
|COVID-19 Cases by Dual Status and Race                          |
|COVID-19 Cases by Dual Status and Sex                           |
|COVID-19 Cases by Medicare Status                               |
|COVID-19 Cases by Race                                          |
|COVID-19 Cases by Race and Age Group                            |
|COVID-19 Cases by Race and Medicare Status                      |
|COVID-19 Cases by Race and Sex                               

### Generate and output a small `DataFrame` containing all the "overall case" counts and their corresponding dates

In [325]:
SQL = """
SELECT Claims_Thru_Dt, CAST(SUM(Value) AS INT) overall_case
FROM covid 
WHERE Measure_Level = 'COVID-19 Cases'
  AND Measure_Element = 'Overall'
  AND Measure_Unit = 'Beneficiary Count'
GROUP BY Claims_Thru_Dt
"""

df2 = spark.sql(SQL)
df2.show(10, False)

+--------------+------------+
|Claims_Thru_Dt|overall_case|
+--------------+------------+
|02/20/2021    |3860957     |
+--------------+------------+



### Generate and output a small DataFrame containing all the "overall case" counts and their corresponding dates

In [326]:
SQL = """
SELECT to_date(Measure_Element, 'MM/dd/yyyy') AS date, 
        CAST(SUM(Value) AS INT) overall_case
FROM covid 
WHERE Measure_Level LIKE 'COVID-19 Weekly Cases%'
GROUP BY date
ORDER BY date DESC
LIMIT 10
"""

df2 = spark.sql(SQL)
df2.show(99, False)


+----------+------------+
|date      |overall_case|
+----------+------------+
|2021-02-20|40369       |
|2021-02-13|60966       |
|2021-02-06|91285       |
|2021-01-30|114880      |
|2021-01-23|138264      |
|2021-01-16|162906      |
|2021-01-09|191995      |
|2021-01-02|199866      |
|2020-12-26|157472      |
|2020-12-19|174050      |
+----------+------------+



## Analyze a specific date:

### Choose a single `Claims_Thru_Dt` with `Measure_Level` equal to `COVID-19 Cases by State` and `Measure_Unit` equal to `Beneficiary Count`

In [327]:

SQL = """
SELECT Claims_Thru_Dt, Measure_Level, Measure_Unit, Value
FROM covid 
WHERE Claims_Thru_Dt = '02/20/2021' 
  AND Measure_Level = 'COVID-19 Cases by State' 
  AND Measure_Unit = 'Beneficiary Count'
LIMIT 10
"""

df2 = spark.sql(SQL)
df2.show(99, False)

+--------------+-----------------------+-----------------+------+
|Claims_Thru_Dt|Measure_Level          |Measure_Unit     |Value |
+--------------+-----------------------+-----------------+------+
|02/20/2021    |COVID-19 Cases by State|Beneficiary Count|86496 |
|02/20/2021    |COVID-19 Cases by State|Beneficiary Count|3023  |
|02/20/2021    |COVID-19 Cases by State|Beneficiary Count|100260|
|02/20/2021    |COVID-19 Cases by State|Beneficiary Count|45366 |
|02/20/2021    |COVID-19 Cases by State|Beneficiary Count|338346|
|02/20/2021    |COVID-19 Cases by State|Beneficiary Count|40837 |
|02/20/2021    |COVID-19 Cases by State|Beneficiary Count|45536 |
|02/20/2021    |COVID-19 Cases by State|Beneficiary Count|11247 |
|02/20/2021    |COVID-19 Cases by State|Beneficiary Count|6191  |
|02/20/2021    |COVID-19 Cases by State|Beneficiary Count|245845|
+--------------+-----------------------+-----------------+------+



### For that date, retrieve the `Value` for the `Overall` COVID-19 Cases

In [328]:
SQL = """
SELECT Claims_Thru_Dt, Measure_Level, Measure_Element, Measure_Unit, Value
FROM covid 
WHERE Claims_Thru_Dt = '02/20/2021'
  AND Measure_Level = 'COVID-19 Cases' 
  AND Measure_Element = 'Overall'
  AND Measure_Unit = 'Beneficiary Count'
"""

df2 = spark.sql(SQL)
df2.show(10, False)

+--------------+--------------+---------------+-----------------+-------+
|Claims_Thru_Dt|Measure_Level |Measure_Element|Measure_Unit     |Value  |
+--------------+--------------+---------------+-----------------+-------+
|02/20/2021    |COVID-19 Cases|Overall        |Beneficiary Count|3860957|
+--------------+--------------+---------------+-----------------+-------+



### Sum the counts for every `COVID-19 Cases by State` `Measure_Element` that is an actual US state or territory

In [329]:
SQL = """
SELECT Measure_Element as State, 
       CAST(SUM(Value) AS INT) overall_cases_by_state
FROM covid 
WHERE Measure_Level = 'COVID-19 Cases by State'
  AND Measure_Unit = 'Beneficiary Count'
GROUP BY State
ORDER BY overall_cases_by_state DESC
"""

df2 = spark.sql(SQL)
df2.show(99, False)

+--------------------+----------------------+
|State               |overall_cases_by_state|
+--------------------+----------------------+
|California          |338346                |
|Texas               |332177                |
|New York            |304299                |
|Florida             |245845                |
|Pennsylvania        |167201                |
|Illinois            |156213                |
|New Jersey          |152911                |
|Ohio                |150021                |
|Georgia             |132114                |
|North Carolina      |113988                |
|Michigan            |109929                |
|Arizona             |100260                |
|Indiana             |93796                 |
|Tennessee           |88830                 |
|Missouri            |88171                 |
|Alabama             |86496                 |
|Massachusetts       |82730                 |
|Louisiana           |82421                 |
|South Carolina      |73728       

### Verify that the Overall total case count, minus the aggregation of all state/territory counts, is equal to the Missing Data count

In [330]:
#-----------------------------------------------------#
SQL1 = """
SELECT CAST(SUM(Value) AS INT) overall_cases
FROM covid 
WHERE Measure_Level = 'COVID-19 Cases'
  AND Measure_Element = 'Overall'
  AND Measure_Unit = 'Beneficiary Count'
"""

df2 = spark.sql(SQL1)
res1 = df2.select('overall_cases').collect()[0]
total_overall = int(res1[0])
print("Total Overall Cases:    " + str(total_overall))

#-----------------------------------------------------#
SQL2 = """
SELECT CAST(SUM(Value) AS INT) overall_cases_by_state
FROM covid 
WHERE Measure_Level = 'COVID-19 Cases by State'
  AND Measure_Unit = 'Beneficiary Count'
"""

df2 = spark.sql(SQL2)
res2 = df2.select('overall_cases_by_state').collect()[0]
total_overall_by_state = int(res2[0])
print("Total Overall By State: " + str(total_overall_by_state))
#-----------------------------------------------------#

missing = total_overall - total_overall_by_state
print("Missing Cases:          " + str(missing))



Total Overall Cases:    3860957
Total Overall By State: 3860957
Missing Cases:          0


### Create a summary over all available dates

In [365]:
# Again, choose the `Measure_Level` equal to `COVID-19 Cases by State`
# Create a data structure which produces the case data for each state, for each available date
# Design and generate a nested JSON data structure which allows retrieval of a time-series of case data per state


SQL = """
SELECT Measure_Element as State, 
       CAST(SUM(Value) AS INT) count
FROM covid 
WHERE Measure_Level = 'COVID-19 Cases by State'
  AND Measure_Unit = 'Beneficiary Count'
GROUP BY State
"""

df = spark.sql(SQL)
df_json = df.toJSON()

for row in df_json.collect():
    line = json.loads(row) 
    print(line) 
    

{'State': 'Utah', 'count': 21085}
{'State': 'Hawaii', 'count': 3108}
{'State': 'Minnesota', 'count': 56431}
{'State': 'Ohio', 'count': 150021}
{'State': 'Arkansas', 'count': 45366}
{'State': 'Oregon', 'count': 17381}
{'State': 'District Of Columbia', 'count': 6191}
{'State': 'Texas', 'count': 332177}
{'State': 'North Dakota', 'count': 9347}
{'State': 'Pennsylvania', 'count': 167201}
{'State': 'Connecticut', 'count': 45536}
{'State': 'Nebraska', 'count': 21773}
{'State': 'Vermont', 'count': 2096}
{'State': 'Nevada', 'count': 29395}
{'State': 'Puerto Rico', 'count': 39984}
{'State': 'Washington', 'count': 36583}
{'State': 'Illinois', 'count': 156213}
{'State': 'Oklahoma', 'count': 58800}
{'State': 'Missing Data', 'count': 1003}
{'State': 'Virgin Islands', 'count': 423}
{'State': 'Delaware', 'count': 11247}
{'State': 'Alaska', 'count': 3023}
{'State': 'New Mexico', 'count': 19974}
{'State': 'West Virginia', 'count': 20587}
{'State': 'Missouri', 'count': 88171}
{'State': 'Rhode Island', 'c