### Notebook Primer

Configuration:
- Run on a 15.25 GB, DBR 10.4 LTS ML Cluster with Spark 3.2.1 and Scala 2.12
- Default interpreter: Python

Contents:
- Data Pre-processing, and summary.
- Data analysis and visualizations are done in the "Covid Health Equity" notebook.

Side Notes:
- Data is pulled from an API, and some queries can be time-consuming.
- **It's best not to overwrite the data in the S3 bucket, because the dataset is updated monthly, and overwriting may lead to inconsistencies with the results reported in the "Covid Health Equity" notebook.**

First, install all the libraries this notebook requires.
<br>It's best to put the installs at the top, because an installation restarts the Python interpreter, which clears any state built up so far.

In [0]:
pip install sodapy

Python interpreter will be restarted.
Collecting sodapy
  Downloading sodapy-2.1.1-py2.py3-none-any.whl (14 kB)
Installing collected packages: sodapy
Successfully installed sodapy-2.1.1
Python interpreter will be restarted.


Basic Imports

In [0]:
import pandas as pd
import numpy as np

Uncomment and run the mounting code only if you haven't already mounted the S3 bucket from the "Covid Health Equity" notebook.
<br>An attempt to mount again will throw an error.
<br>Refer to the other notebook for more details on this.

In [0]:
# # The S3 bucket we'll be connecting to.
# ACCESS_KEY = "<AWS_ACCESS_KEY_ID>"
# SECRET_KEY = "<AWS_SECRET_ACCESS_KEY>"
# ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
# AWS_BUCKET_NAME = "sparkprojectcovid"
MOUNT_NAME = "s3_source"

# dbutils.fs.mount(f"s3a://{ACCESS_KEY}:{ENCODED_SECRET_KEY}@{AWS_BUCKET_NAME}", f"/mnt/{MOUNT_NAME}")
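Since a second mount attempt throws an error, one option is to check the existing mount points before mounting. The sketch below is only an illustration: it assumes the entries returned by `dbutils.fs.mounts()` expose a `mountPoint` attribute, and the helper name `is_mounted` and the stand-in `MountInfo` tuple are hypothetical, used so the logic can run outside a cluster.

```python
from collections import namedtuple

# Stand-in for the entries returned by dbutils.fs.mounts() (assumption:
# each entry exposes a `mountPoint` attribute such as "/mnt/s3_source").
MountInfo = namedtuple("MountInfo", ["mountPoint", "source"])


def is_mounted(mounts, mount_name):
    """Return True if /mnt/<mount_name> already appears in the mount list."""
    target = "/mnt/{}".format(mount_name)
    return any(m.mountPoint.rstrip("/") == target for m in mounts)


# Fake mount entries for illustration; the bucket name is a placeholder.
existing = [MountInfo("/mnt/s3_source", "s3a://example-bucket")]
already_there = is_mounted(existing, "s3_source")
missing = is_mounted(existing, "other_mount")
print(already_there, missing)
```

On the cluster, the guard could read `if not is_mounted(dbutils.fs.mounts(), MOUNT_NAME): dbutils.fs.mount(...)`, which makes the cell safe to re-run.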

The CDC dataset we're using here is a Socrata open dataset that can be queried using the SODA API.
<br>I've registered for an account and created an App Token, which allows more requests and looser throttling limits.

For more information see: https://dev.socrata.com/foundry/data.cdc.gov/vbim-akqf

In [0]:
from sodapy import Socrata

# configure the client with App Token, and User Credentials
client = Socrata("data.cdc.gov",
                 "<SOCRATA_APP_TOKEN>",
                 username="<SOCRATA_USERNAME>",
                 password="<SOCRATA_PASSWORD>")

# First 2000 results, returned as a list of dicts (one dict per row)
results = client.get("n8mc-b4w4", limit=2000)

# Convert to pandas DataFrame
results_df = pd.DataFrame.from_records(results)

Every dataset is associated with a unique ID (*n8mc-b4w4* in this case), which can be used with the same client object to fetch data.
<br>This makes life much easier: we don't have to download and handle the bulky raw data, on which even a basic select operation might take hours to run.

Note that hard-coding user credentials and an app token in a notebook is not good practice. In the Standard edition, one could set up secrets and read them at run time instead.
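As a rough sketch of keeping credentials out of the notebook, the helper below reads them from environment-style variables. The function name `load_socrata_credentials` and the variable names (`SOCRATA_APP_TOKEN` and so on) are hypothetical, and the values in the demonstration are placeholders.

```python
import os


def load_socrata_credentials(env=None):
    """Collect Socrata credentials from environment-style variables.

    The variable names below are hypothetical; use whatever your
    workspace defines. Raises KeyError listing everything missing.
    """
    env = os.environ if env is None else env
    names = {
        "app_token": "SOCRATA_APP_TOKEN",
        "username": "SOCRATA_USERNAME",
        "password": "SOCRATA_PASSWORD",
    }
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise KeyError("Missing credentials: {}".format(", ".join(missing)))
    return {key: env[var] for key, var in names.items()}


# Demonstration with a fake environment (placeholder values only).
fake_env = {
    "SOCRATA_APP_TOKEN": "token-placeholder",
    "SOCRATA_USERNAME": "user@example.com",
    "SOCRATA_PASSWORD": "password-placeholder",
}
creds = load_socrata_credentials(fake_env)
print(sorted(creds))
```

The result could then be passed on as `Socrata("data.cdc.gov", creds["app_token"], username=creds["username"], password=creds["password"])`. Where Databricks secrets are available, `dbutils.secrets.get(scope=..., key=...)` can play the same role as the environment lookup.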

###### Going forward, we'll query this API and use other small data sources to prepare our dataframes for analysis.

Most of our data comes from the following dataset:
#### COVID-19 Case Surveillance Public Use Data with Geography
"This case surveillance public use dataset has 19 elements for all COVID-19 cases shared with CDC and includes demographics, geography (county and state of residence), any exposure history, disease severity indicators and outcomes, and presence of any underlying medical conditions and risk behaviors."

<br>**69.7 Million rows | 19 columns | Each row is a Deidentified Patient Case**

<a href="https://data.cdc.gov/Case-Surveillance/COVID-19-Case-Surveillance-Public-Use-Data-with-Ge/n8mc-b4w4/">Source</a>

Schema:
Column Name | Description | Type
-- | -- | --
case_month | The month of the earlier of the Clinical Date (date related to the illness or specimen collection) or the Date Received by CDC | Plain Text
res_state | State of residence | Plain Text
state_fips_code | State FIPS code | Plain Text
res_county | County of residence | Plain Text
county_fips_code | County FIPS code | Plain Text
age_group | Age group (0 - 17 years; 18 - 49 years; 50 - 64 years; 65 + years; Unknown; Missing; NA, if value suppressed for privacy protection.) | Plain Text
sex | Sex (Female; Male; Other; Unknown; Missing; NA, if value suppressed for privacy protection.) | Plain Text
race | Race (American Indian/Alaska Native; Asian; Black; Multiple/Other; Native Hawaiian/Other Pacific Islander; White; Unknown; Missing; NA, if value suppressed for privacy protection.) | Plain Text
ethnicity | Ethnicity (Hispanic; Non-Hispanic; Unknown; Missing; NA, if value suppressed for privacy protection.) | Plain Text
case_positive_specimen_interval | Weeks between earliest date and date of first positive specimen collection | Number
case_onset_interval | Weeks between earliest date and date of symptom onset. | Number
process | Under what process was the case first identified? (Clinical evaluation; Routine surveillance; Contact tracing of case patient; Multiple; Other; Unknown; Missing) | Plain Text
exposure_yn | In the 14 days prior to illness onset, did the patient have any of the following known exposures: domestic travel, international travel, cruise ship or vessel travel as a passenger or crew member, workplace, airport/airplane, adult congregate living facility (nursing, assisted living, or long-term care facility), school/university/childcare center, correctional facility, community event/mass gathering, animal with confirmed or suspected COVID-19, other exposure, contact with a known COVID-19 case? (Yes, Unknown, Missing) | Plain Text
current_status | What is the current status of this person? (Laboratory-confirmed case, Probable case) | Plain Text
symptom_status | What is the symptom status of this person? (Asymptomatic, Symptomatic, Unknown, Missing) | Plain Text
hosp_yn | Was the patient hospitalized? (Yes, No, Unknown, Missing) | Plain Text
icu_yn | Was the patient admitted to an intensive care unit (ICU)? (Yes, No, Unknown, Missing) | Plain Text
death_yn | Did the patient die as a result of this illness? (Yes; No; Unknown; Missing; NA, if value suppressed for privacy protection.) | Plain Text
underlying_conditions_yn | Did the patient have one or more of the underlying medical conditions and risk behaviors: diabetes mellitus, hypertension, severe obesity (BMI>40), cardiovascular disease, chronic renal disease, chronic liver disease, chronic lung disease, other chronic diseases, immunosuppressive condition, autoimmune condition, current smoker, former smoker, substance abuse or misuse, disability, psychological/psychiatric, pregnancy, other. (Yes, No, blank) | Plain Text

In [0]:
results_df.head(3)

Unnamed: 0,case_month,res_state,state_fips_code,res_county,county_fips_code,age_group,sex,race,ethnicity,process,exposure_yn,current_status,symptom_status,hosp_yn,icu_yn,death_yn,case_onset_interval,case_positive_specimen,underlying_conditions_yn
0,2021-12,TX,48,BEXAR,48029,18 to 49 years,Female,Unknown,Unknown,Missing,Missing,Laboratory-confirmed case,Missing,Missing,Missing,Missing,,,
1,2021-12,CA,6,SACRAMENTO,6067,0 - 17 years,Female,White,Unknown,Missing,Missing,Laboratory-confirmed case,Symptomatic,No,Missing,Missing,0.0,,
2,2022-01,TX,48,HARRIS,48201,65+ years,Male,White,Non-Hispanic/Latino,Missing,Missing,Laboratory-confirmed case,Missing,Missing,Missing,Missing,,,


We'll continue to work with this dataset, querying it on different fields to create the dataframes of interest.

###### Data Frame of Case counts by Race

We group the above dataframe by state, county and race information, and then count the number of cases in each group.
<br>Note that we keep some extra information in the dataset (county information in this case) for later use.

In [0]:
client.timeout = 1000
case = client.get("n8mc-b4w4", select='res_state, state_fips_code, res_county, county_fips_code, race, count(*)', group='res_state, state_fips_code, res_county, county_fips_code, race', limit=100000)

# Convert to pandas DataFrame
case_df = pd.DataFrame.from_records(case)

# Convert counts to a numeric field and fill any missing values with 0
case_df['count'] = case_df['count'].apply(pd.to_numeric).fillna(0)
case_df.head()

Unnamed: 0,res_state,state_fips_code,res_county,county_fips_code,race,count
0,AK,2,ANCHORAGE,2020,American Indian/Alaska Native,11656
1,AK,2,ANCHORAGE,2020,Asian,3326
2,AK,2,ANCHORAGE,2020,Black,2197
3,AK,2,ANCHORAGE,2020,Missing,1453
4,AK,2,ANCHORAGE,2020,Multiple/Other,6887


Some of these queries can take a long time, so it's essential to set a timeout long enough for the query to finish and return results.
<br>It's also important to set the record 'limit' parameter high enough to fit all our summary results.

In [0]:
case_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10009 entries, 0 to 10008
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   res_state         10009 non-null  object
 1   state_fips_code   10009 non-null  object
 2   res_county        10009 non-null  object
 3   county_fips_code  10009 non-null  object
 4   race              10009 non-null  object
 5   count             10009 non-null  int64 
dtypes: int64(1), object(5)
memory usage: 469.3+ KB


In [0]:
case_df['res_state'].unique()

Out[7]: array(['AK', 'AL', 'AR', 'AZ', 'CA', 'CO', 'CT', 'DC', 'DE', 'FL', 'GA',
       'GU', 'HI', 'IA', 'ID', 'IL', 'IN', 'KS', 'KY', 'LA', 'MA', 'MD',
       'ME', 'MI', 'MN', 'MO', 'MS', 'MT', 'NA', 'NC', 'ND', 'NE', 'NH',
       'NJ', 'NM', 'NV', 'NY', 'OH', 'OK', 'OR', 'PA', 'PR', 'RI', 'SC',
       'SD', 'TN', 'TX', 'UT', 'VA', 'VI', 'VT', 'WA', 'WI', 'WV', 'WY'],
      dtype=object)

In [0]:
case_df['race'].unique()

Out[8]: array(['American Indian/Alaska Native', 'Asian', 'Black', 'Missing',
       'Multiple/Other', 'NA', 'Native Hawaiian/Other Pacific Islander',
       'Unknown', 'White'], dtype=object)

The data looks pretty good. There are no null values anymore, and the count column is numeric.
<br>In general, we don't have to worry about fields marked as 'Missing', 'NA', etc., because we plot the data with the Plotly library, which only uses rows with valid states and FIPS codes.
<br>We also keep only the rows for our races of interest, so we don't need to make sense of the 'Missing'/'Unknown' races.

<br>Note: A lot of inconsistencies exist in the dataset, mostly because it is curated from patient forms.
<br>Assuming the data is mostly missing at random, the best strategy is to use only the meaningful records and drop the rest.
<br>Because the dataset is large, we expect to retain enough information to draw meaningful insights.
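A minimal sketch of "use only meaningful records", assuming the placeholder labels are exactly the ones seen in the unique race values above. The helper name `keep_meaningful` and the toy counts are made up for illustration.

```python
import pandas as pd

# Placeholder labels that carry no usable race information (taken from the
# unique values listed for the race column).
PLACEHOLDER_LABELS = {"Missing", "Unknown", "NA"}


def keep_meaningful(df, column="race"):
    """Drop rows whose value in `column` is a placeholder label or null."""
    mask = df[column].notna() & ~df[column].isin(PLACEHOLDER_LABELS)
    return df[mask].reset_index(drop=True)


# Toy data, not real counts.
sample = pd.DataFrame({
    "res_state": ["AK", "AK", "AK", "AK"],
    "race": ["Asian", "Missing", "NA", "White"],
    "count": [10, 5, 2, 20],
})
clean = keep_meaningful(sample)
print(clean)
```

The same mask works for other columns (for example `column="ethnicity"`), as long as their placeholder labels match the set above.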

###### Data Frame of Hispanic/Latino case counts

Ethnicity (Hispanic/Latino or not) is recorded in a separate field from race, so we have to run another query for it.

In [0]:
client.timeout = 1000
hisp_case = client.get("n8mc-b4w4",
                       select='res_state, state_fips_code, res_county, county_fips_code, ethnicity, count(*)',
                       group='res_state, state_fips_code, res_county, county_fips_code, ethnicity',
                       where="ethnicity = 'Hispanic/Latino'",
                       limit=100000)

# Convert to pandas DataFrame
hisp_case_df = pd.DataFrame.from_records(hisp_case)
hisp_case_df['count'] = hisp_case_df['count'].apply(pd.to_numeric).fillna(0)
hisp_case_df.head()

Unnamed: 0,res_state,state_fips_code,res_county,county_fips_code,ethnicity,count
0,AK,2,ANCHORAGE,2020.0,Hispanic/Latino,1843
1,AK,2,FAIRBANKS NORTH STAR,2090.0,Hispanic/Latino,264
2,AK,2,KENAI PENINSULA,2122.0,Hispanic/Latino,23
3,AK,2,MATANUSKA-SUSITNA,2170.0,Hispanic/Latino,87
4,AK,2,,,Hispanic/Latino,2269


In [0]:
hisp_case_df['ethnicity'].unique()

Out[10]: array(['Hispanic/Latino'], dtype=object)

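Concatenate the previous two DataFrames into one unified DataFrame of case counts. The 'ethnicity' column takes the name 'race' so that the Hispanic/Latino rows line up with the race rows: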

In [0]:
hisp_case_df.columns = case_df.columns
case_df = pd.concat([case_df, hisp_case_df], ignore_index=True)
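Assigning column names positionally works here because both queries return their columns in the same order. A slightly more defensive variant, sketched below on toy frames (the values are made up), renames the one differing column by name before concatenating.

```python
import pandas as pd

# Toy frames standing in for the race and ethnicity query results.
race_df = pd.DataFrame({
    "res_state": ["AK"], "race": ["Asian"], "count": [10],
})
ethnicity_df = pd.DataFrame({
    "res_state": ["AK"], "ethnicity": ["Hispanic/Latino"], "count": [4],
})

# Rename by name so the result does not depend on column order.
aligned = ethnicity_df.rename(columns={"ethnicity": "race"})
combined = pd.concat([race_df, aligned], ignore_index=True)
print(combined)
```

Note that the combined frame mixes two classifications: a Hispanic/Latino person is also counted under a race, so the rows should not be summed across the 'race' column.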

In [0]:
case_df.head()

Unnamed: 0,res_state,state_fips_code,res_county,county_fips_code,race,count
0,AK,2,ANCHORAGE,2020,American Indian/Alaska Native,11656
1,AK,2,ANCHORAGE,2020,Asian,3326
2,AK,2,ANCHORAGE,2020,Black,2197
3,AK,2,ANCHORAGE,2020,Missing,1453
4,AK,2,ANCHORAGE,2020,Multiple/Other,6887


We'll now write this result into our S3 bucket.
<br>I had issues writing a pandas DataFrame directly into S3, so I convert it into a PySpark DataFrame first.
<br>I set the write mode to 'overwrite' so that the notebook can be re-run. In Spark, 'overwrite' replaces existing output and creates it when nothing exists at the path yet, so it effectively works as a 'create or replace'.

In [0]:
spark_case_df = spark.createDataFrame(case_df) 
spark_case_df.write.mode("overwrite").options(header="true", index="False").csv("/mnt/{}/cases_df".format(MOUNT_NAME))
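One thing to watch when round-tripping identifiers through CSV: FIPS codes are labels rather than numbers, and reading them back with schema inference can turn a code like '02020' into 2020. Whether that happens in this pipeline is an assumption. The sketch below shows one way to restore zero-padded strings (2 digits for states, 5 for counties); `normalize_fips` is a hypothetical helper and the sample values are toy data.

```python
import pandas as pd


def normalize_fips(series, width):
    """Render FIPS codes as zero-padded strings; missing codes become None."""
    numeric = pd.to_numeric(series, errors="coerce")

    def fmt(value):
        if pd.isna(value):
            return None
        return str(int(value)).zfill(width)

    return numeric.map(fmt)


# Mixed representations of the kind a CSV round trip can produce.
toy = pd.DataFrame({"county_fips_code": ["2020", 2090.0, None, "06067"]})
toy["county_fips_code"] = normalize_fips(toy["county_fips_code"], width=5)
fips_values = toy["county_fips_code"].tolist()
print(fips_values)
```

Applying this just before plotting (rather than before writing) keeps the stored files untouched.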

###### Data Frame of Death counts by Race

We use the 'death_yn' field for this. It is not strictly boolean: besides Yes and No, it can also be Unknown, Missing or NA. We keep only the first two.

In [0]:
client.timeout = 1000
death = client.get("n8mc-b4w4",
                   select='res_state, state_fips_code, res_county, county_fips_code, race, death_yn, count(*)',
                   group='res_state, state_fips_code, res_county, county_fips_code, race, death_yn',
                   where="death_yn = 'Yes' or death_yn = 'No'",
                   limit=100000)

# Convert to pandas DataFrame
death_df = pd.DataFrame.from_records(death)
death_df[['count']] = death_df[['count']].apply(pd.to_numeric).fillna(0)
death_df.head()

Unnamed: 0,res_state,state_fips_code,res_county,county_fips_code,race,death_yn,count
0,AK,2,ANCHORAGE,2020,American Indian/Alaska Native,No,175
1,AK,2,ANCHORAGE,2020,Asian,No,14
2,AK,2,ANCHORAGE,2020,Black,No,42
3,AK,2,ANCHORAGE,2020,Missing,No,7
4,AK,2,ANCHORAGE,2020,Multiple/Other,No,23


In [0]:
death_df['death_yn'].unique()

Out[14]: array(['No', 'Yes'], dtype=object)

In [0]:
death_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8204 entries, 0 to 8203
Data columns (total 7 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   res_state         8204 non-null   object
 1   state_fips_code   8204 non-null   object
 2   res_county        8204 non-null   object
 3   county_fips_code  8204 non-null   object
 4   race              8204 non-null   object
 5   death_yn          8204 non-null   object
 6   count             8204 non-null   int64 
dtypes: int64(1), object(6)
memory usage: 448.8+ KB


###### Data Frame of Hispanic/Latino death counts:

In [0]:
client.timeout = 1000
hisp_death = client.get("n8mc-b4w4",
                        select='res_state, state_fips_code, res_county, county_fips_code, ethnicity, death_yn, count(*)',
                        group='res_state, state_fips_code, res_county, county_fips_code, ethnicity, death_yn',
                        where="(death_yn = 'Yes' or death_yn = 'No') and ethnicity='Hispanic/Latino'",
                        limit=100000)

# Convert to pandas DataFrame
hisp_death_df = pd.DataFrame.from_records(hisp_death)
hisp_death_df[['count']] = hisp_death_df[['count']].apply(pd.to_numeric).fillna(0)
hisp_death_df.head()

Unnamed: 0,res_state,state_fips_code,res_county,county_fips_code,ethnicity,death_yn,count
0,AK,2,ANCHORAGE,2020,Hispanic/Latino,No,11
1,AL,1,COFFEE,1031,Hispanic/Latino,No,11
2,AL,1,FRANKLIN,1059,Hispanic/Latino,No,90
3,AL,1,JEFFERSON,1073,Hispanic/Latino,No,122
4,AL,1,MADISON,1089,Hispanic/Latino,No,11


In [0]:
hisp_death_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 873 entries, 0 to 872
Data columns (total 7 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   res_state         873 non-null    object
 1   state_fips_code   873 non-null    object
 2   res_county        873 non-null    object
 3   county_fips_code  873 non-null    object
 4   ethnicity         873 non-null    object
 5   death_yn          873 non-null    object
 6   count             873 non-null    int64 
dtypes: int64(1), object(6)
memory usage: 47.9+ KB


Concatenate the previous two DataFrames:

In [0]:
hisp_death_df.columns = death_df.columns
death_df = pd.concat([death_df, hisp_death_df], ignore_index=True)

In [0]:
death_df.head()

Unnamed: 0,res_state,state_fips_code,res_county,county_fips_code,race,death_yn,count
0,AK,2,ANCHORAGE,2020,American Indian/Alaska Native,No,175
1,AK,2,ANCHORAGE,2020,Asian,No,14
2,AK,2,ANCHORAGE,2020,Black,No,42
3,AK,2,ANCHORAGE,2020,Missing,No,7
4,AK,2,ANCHORAGE,2020,Multiple/Other,No,23


Write the combined dataset on death counts into the S3 bucket

In [0]:
spark_death_df = spark.createDataFrame(death_df) 
spark_death_df.write.mode('overwrite').options(header="true", index="False").csv("/mnt/{}/deaths_df".format(MOUNT_NAME))

The data seems pretty clean, and we won't need to change much for our analysis.
<br>Since the same steps worked for both cases and deaths (and can easily be validated for the remaining fields), we'll automate the process with helper functions.

Check out the function and description below:

In [0]:
def gen_state_df(field_name, out_name):
    '''
    Groups the CDC dataframe by state, county, the given field, and race/ethnicity.
    Then joins them and dumps the result into S3.
    
    field_name: Given boolean field
    out_name: Output name for S3 storage
    '''
    
    # Wait long enough to fetch all the data
    client.timeout = 1000
    
    # Fetch all races data
    orig = client.get("n8mc-b4w4",
                      select='res_state, state_fips_code, res_county, county_fips_code, race, {}, count(*)'.format(field_name),
                      group='res_state, state_fips_code, res_county, county_fips_code, race, {}'.format(field_name),
                      where="{} = 'Yes' or {} = 'No'".format(field_name, field_name),
                      limit=100000)

    # Convert to pandas DataFrame
    orig_df = pd.DataFrame.from_records(orig)
    orig_df[['count']] = orig_df[['count']].apply(pd.to_numeric).fillna(0)
    
    # Fetch Hispanic/Latino's data
    hisp = client.get("n8mc-b4w4",
                      select='res_state, state_fips_code, res_county, county_fips_code, ethnicity, {}, count(*)'.format(field_name),
                      group='res_state, state_fips_code, res_county, county_fips_code, ethnicity, {}'.format(field_name),
                      where="({} = 'Yes' or {} = 'No') and ethnicity='Hispanic/Latino'".format(field_name, field_name),
                      limit=100000)

    # Convert to pandas DataFrame
    hisp_df = pd.DataFrame.from_records(hisp)
    hisp_df[['count']] = hisp_df[['count']].apply(pd.to_numeric).fillna(0)

    # Join the two dataframes
    hisp_df.columns = orig_df.columns
    orig_df = pd.concat([orig_df, hisp_df], ignore_index=True)
    
    # Dump into S3
    spark_df = spark.createDataFrame(orig_df)
    spark_df.write.mode('overwrite').options(header="true", index="False").csv("/mnt/{}/{}".format(MOUNT_NAME, out_name))

###### Data Frame for Underlying Conditions


It gives the total counts of underlying health conditions by race, for every state and county.

In [0]:
gen_state_df('underlying_conditions_yn', 'underlying_df')

###### Data Frame for Hospitalizations


It gives the total counts of COVID-related hospitalizations by race, for every state and county.

In [0]:
gen_state_df('hosp_yn', 'hosp_state_df')

###### Data Frame for ICU Admits


It gives the total counts of ICU Admits due to COVID by race, for every state and county.

In [0]:
gen_state_df('icu_yn', 'icu_state_df')

Now let's change things a bit and fetch the trend data for each of these variables of interest.


###### Cases trend

We now group data by the 'case_month' column and race, to fetch counts.

In [0]:
client.timeout = 1000
case_time = client.get("n8mc-b4w4", select='case_month, race, count(*)', group='case_month, race', limit=100000)

# Convert to pandas DataFrame
case_time_df = pd.DataFrame.from_records(case_time)
case_time_df[['count']] = case_time_df[['count']].apply(pd.to_numeric).fillna(0)
case_time_df.head()

Unnamed: 0,case_month,race,count
0,2020-01,Black,27
1,2020-01,Missing,561
2,2020-01,Multiple/Other,25
3,2020-01,,4254
4,2020-01,White,436


The CDC has already verified and cleaned erroneous dates, so we don't have to worry about them.
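The 'case_month' values arrive as 'YYYY-MM' strings. For time-series plots it can help to parse them into timestamps and sort them, as in this small sketch on toy data (the counts are made up; the notebook itself keeps the strings when writing to S3).

```python
import pandas as pd

# Toy trend data, deliberately out of order.
toy = pd.DataFrame({
    "case_month": ["2020-03", "2020-01", "2020-02"],
    "race": ["White", "White", "White"],
    "count": [30, 10, 20],
})

# Parse "YYYY-MM" into timestamps (first day of each month) and sort by time.
toy["case_month"] = pd.to_datetime(toy["case_month"], format="%Y-%m")
toy = toy.sort_values("case_month").reset_index(drop=True)
print(toy)
```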

Let's also calculate this data for Hispanics/Latinos as usual.

In [0]:
client.timeout = 1000
hisp_case_time = client.get("n8mc-b4w4", select='case_month, ethnicity, count(*)', group='case_month, ethnicity', where="ethnicity = 'Hispanic/Latino'", limit=100000)

# Convert to pandas DataFrame
hisp_case_time_df = pd.DataFrame.from_records(hisp_case_time)
hisp_case_time_df[['count']] = hisp_case_time_df[['count']].apply(pd.to_numeric).fillna(0)
hisp_case_time_df.head()

Unnamed: 0,case_month,ethnicity,count
0,2020-01,Hispanic/Latino,147
1,2020-03,Hispanic/Latino,32065
2,2020-04,Hispanic/Latino,95171
3,2020-05,Hispanic/Latino,102277
4,2020-06,Hispanic/Latino,181328


We concatenate them.

In [0]:
hisp_case_time_df.columns = case_time_df.columns
case_time_df = pd.concat([case_time_df, hisp_case_time_df], ignore_index=True)

And dump it into S3

In [0]:
spark_case_time_df = spark.createDataFrame(case_time_df) 
spark_case_time_df.write.mode("overwrite").options(header="true", index="False").csv("/mnt/{}/case_time_df".format(MOUNT_NAME))

We can automate this process as well:

In [0]:
def gen_time_df(field_name, out_name):
    '''
    Groups the CDC dataframe by month, the given field, and race/ethnicity to generate time-series data.
    Then joins them and dumps the result into S3.
    
    field_name: Given boolean field
    out_name: Output name for S3 storage
    '''
    
    # Wait long enough to fetch all the data
    client.timeout = 1000
    
    # Fetch all races data
    orig = client.get("n8mc-b4w4",
                      select='case_month, race, {}, count(*)'.format(field_name),
                      group='case_month, race, {}'.format(field_name),
                      where="{} = 'Yes' or {} = 'No'".format(field_name, field_name),
                      limit=100000)

    # Convert to pandas DataFrame
    orig_df = pd.DataFrame.from_records(orig)
    orig_df[['count']] = orig_df[['count']].apply(pd.to_numeric).fillna(0)


    # Fetch Hispanic/Latinos data
    hisp = client.get("n8mc-b4w4",
                      select='case_month, ethnicity, {}, count(*)'.format(field_name),
                      group='case_month, ethnicity, {}'.format(field_name),
                      where="({} = 'Yes' or {} = 'No') and ethnicity='Hispanic/Latino'".format(field_name, field_name),
                      limit=100000)

    # Convert to pandas DataFrame
    hisp_df = pd.DataFrame.from_records(hisp)
    hisp_df[['count']] = hisp_df[['count']].apply(pd.to_numeric).fillna(0)

    # Join the two dataframes
    hisp_df.columns = orig_df.columns
    orig_df = pd.concat([orig_df, hisp_df], ignore_index=True)
    
    # Dump into S3
    spark_df = spark.createDataFrame(orig_df) 
    spark_df.write.mode('overwrite').options(header="true", index="False").csv("/mnt/{}/{}".format(MOUNT_NAME, out_name))
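The state-level and time-level helpers differ only in their grouping columns, so the query arguments could be built in one place. The sketch below is one possible refactoring, not the notebook's own code: `build_query` and `YES_NO_FIELDS` are hypothetical names, and the allow-list is an added guard because the field name is formatted straight into the query text.

```python
# Fields from the schema that take 'Yes'/'No' values and are used here.
YES_NO_FIELDS = {"hosp_yn", "icu_yn", "death_yn", "underlying_conditions_yn"}


def build_query(group_cols, field_name, hispanic_only=False):
    """Return the select/group/where arguments for one grouped count query."""
    if field_name not in YES_NO_FIELDS:
        raise ValueError("Unexpected field: {}".format(field_name))
    demographic = "ethnicity" if hispanic_only else "race"
    columns = list(group_cols) + [demographic, field_name]
    where = "({0} = 'Yes' or {0} = 'No')".format(field_name)
    if hispanic_only:
        where += " and ethnicity = 'Hispanic/Latino'"
    return {
        "select": ", ".join(columns + ["count(*)"]),
        "group": ", ".join(columns),
        "where": where,
    }


time_query = build_query(["case_month"], "death_yn", hispanic_only=True)
state_query = build_query(["res_state", "state_fips_code"], "hosp_yn")
print(time_query["select"])
print(state_query["where"])
```

The returned dictionary can be unpacked into the existing call, for example `client.get("n8mc-b4w4", limit=100000, **time_query)`.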

###### Deaths Trend


It gives the death counts for each race over time.

In [0]:
gen_time_df('death_yn', 'death_time_df')

###### Hospitalizations Trend


It gives the hospitalization counts for each race, across time

In [0]:
gen_time_df('hosp_yn', 'hosp_df')

###### ICU Admits Trend


It gives the ICU admit counts for each race, across time

In [0]:
gen_time_df('icu_yn', 'icu_df')

We'd also like to understand what the vaccination rates were for each race. For this we pick another CDC dataset:


#### COVID-19 Vaccination Demographics in the United States, National
"Overall Demographic Characteristics of People Receiving COVID-19 Vaccinations in the United States at national level."

<br>**15.2K rows | 16 columns | Each row is vaccination information for a demographic category**

<a href="https://data.cdc.gov/Vaccinations/COVID-19-Vaccination-Demographics-in-the-United-St/km4m-vcsb/">Source</a>

Schema:
Column Name | Description | Type
-- | -- | --
Date | Date data are reported on CDC COVID Data Tracker | Date & Time
Demographic_category | Age, sex or race/ethnicity of person receiving vaccination | Plain Text
Administered_Dose1 | Total count of people with at least one dose in demographic category | Number
Administered_Dose1_pct_known | Percent among persons with at least one dose who are Hispanic/Latino | Number
Administered_Dose1_pct_US | Percent among persons with at least one dose, who have demographic information available on age, race/ethnicity or sex | Number
Series_Complete_Yes | Total count of fully vaccinated people in demographic category | Number
Administered_Dose1_pct_agegroup | Percent among persons with at least one dose in demographic category | Number
Series_Complete_Pop_pct | Percent among fully vaccinated persons in demographic category. | Number
Series_Complete_Pop_Pct_known | Percent among fully vaccinated persons who are Hispanic/Latino | Number
Series_Complete_Pop_Pct_US | Percent among fully vaccinated persons, who have demographic information available on age, race/ethnicity or sex | Number
Booster_Doses_Vax_pct_agegroup | Percent of people 12+ in a demographic category with a booster dose | Number
Booster_Doses_Pop_Pct_known | Percent of people 12+ with a booster dose where selected demographic category is known | Number
Booster_Doses_Vax_Pct_US | Percent of people 12+ with a booster dose who have known demographic information | Number
Booster_Doses_Pop_Pct_known_Last14Days | Percent of people 12+ with a booster dose in the last 14 days where selected demographic is known | Number
Booster_Doses_Yes | People 12+ with a booster dose | Number
Booster_Doses_Yes_Last14Days | People 12+ with a booster dose in the last 14 days | Number

In [0]:
client.timeout = 1000
vaccine = client.get("km4m-vcsb", limit=100000)

# Convert to pandas DataFrame
vaccines_df = pd.DataFrame.from_records(vaccine)
vaccines_df[['series_complete_pop_pct']] = vaccines_df[['series_complete_pop_pct']].apply(pd.to_numeric).fillna(0)
vaccines_df.head()

Unnamed: 0,date,demographic_category,administered_dose1,administered_dose1_pct_known,administered_dose1_pct_us,series_complete_yes,administered_dose1_pct,series_complete_pop_pct,series_complete_pop_pct_known,series_complete_pop_pct_us,booster_doses_vax_pct_agegroup,booster_doses_pop_pct_known,booster_doses_vax_pct_us,booster_doses_pop_pct_known_last14days,booster_doses_yes,booster_doses_yes_last14days
0,2022-05-04T00:00:00.000,Race_eth_NHAIAN,1762089,0.9,0.7,1467213,72.4,60.2,0.9,0.7,45.7,0.7,0.8,1.1,632826,5768
1,2022-05-04T00:00:00.000,Race_eth_NHWhite,105499005,55.0,40.9,95838798,53.5,48.6,56.0,43.6,59.4,61.2,56.6,52.0,55128639,284470
2,2022-05-04T00:00:00.000,Age_known,257872788,99.9,99.9,219900248,,0.0,99.9,99.9,47.7,99.9,99.9,99.9,100872182,618419
3,2022-05-04T00:00:00.000,Sex_known,255593523,99.9,99.1,218364160,,0.0,99.9,99.3,47.9,99.9,99.9,99.9,100685815,617560
4,2022-05-04T00:00:00.000,Sex_Male,122214756,47.8,47.4,103927525,74.8,63.6,47.6,47.3,45.6,45.2,47.5,44.4,45527796,274240


In [0]:
vaccines_df['demographic_category'].unique()

Out[32]: array(['Race_eth_NHAIAN', 'Race_eth_NHWhite', 'Age_known', 'Sex_known',
       'Sex_Male', 'Ages_25-39_yrs', 'Race_eth_NHNHOPI', 'Ages_5-11_yrs',
       'Ages_65-74_yrs', 'Race_eth_NHMultiracial', 'Age_unknown',
       'Ages_18-24_yrs', 'Ages_<5yrs', 'Race_eth_unknown',
       'Ages_12-15_yrs', 'Race_eth_NHAsian', 'Race_eth_NHMult_Oth', 'US',
       'Sex_Female', 'Ages_12-17_yrs', 'Ages_50-64_yrs', 'Race_eth_known',
       'Ages_<12yrs', 'Ages_40-49_yrs', 'Race_eth_NHOther',
       'Ages_16-17_yrs', 'Race_eth_NHBlack', 'Sex_unknown',
       'Ages_75+_yrs', 'Race_eth_Hispanic'], dtype=object)

We have different demographic categories, but in our analysis we're mostly interested in a few races.
<br>So we select them and rename the columns to keep things consistent across datasets.

In [0]:
races = ['Race_eth_NHAsian', 'Race_eth_NHWhite', 'Race_eth_NHBlack', 'Race_eth_Hispanic', 'Race_eth_NHAIAN', 'US']
vaccines_df = vaccines_df[vaccines_df['demographic_category'].isin(races)]

vaccines_df = pd.pivot_table(vaccines_df, index='date', columns='demographic_category', values='series_complete_pop_pct').reset_index()
vaccines_df.columns = ['Date', 'Hispanic/Latino', 'American Indian/Alaska Native', 'Asian', 'Black', 'White', 'US']
vaccines_df.head()

Unnamed: 0,Date,Hispanic/Latino,American Indian/Alaska Native,Asian,Black,White,US
0,2020-12-13T00:00:00.000,0.0,0.0,0.0,0.0,0.0,0.0
1,2020-12-14T00:00:00.000,0.0,0.0,0.0,0.0,0.0,0.0
2,2020-12-15T00:00:00.000,0.0,0.0,0.0,0.0,0.0,0.0
3,2020-12-16T00:00:00.000,0.0,0.0,0.0,0.0,0.0,0.0
4,2020-12-17T00:00:00.000,0.0,0.0,0.0,0.0,0.0,0.0
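The positional rename above relies on `pivot_table` returning the category columns in sorted order. A variant that renames by name is sketched below; `CATEGORY_LABELS` is a hypothetical mapping built from the category codes and labels used in this notebook, and the toy percentages are made up.

```python
import pandas as pd

# Mapping from CDC category codes to the labels used in this notebook.
CATEGORY_LABELS = {
    "Race_eth_Hispanic": "Hispanic/Latino",
    "Race_eth_NHAIAN": "American Indian/Alaska Native",
    "Race_eth_NHAsian": "Asian",
    "Race_eth_NHBlack": "Black",
    "Race_eth_NHWhite": "White",
    "US": "US",
}

# Toy rows for a single date, covering only three categories.
toy = pd.DataFrame({
    "date": ["2022-05-04"] * 3,
    "demographic_category": ["US", "Race_eth_NHAsian", "Race_eth_Hispanic"],
    "series_complete_pop_pct": [66.0, 60.0, 50.0],
})

wide = pd.pivot_table(toy, index="date", columns="demographic_category",
                      values="series_complete_pop_pct").reset_index()
wide.columns.name = None
# Rename by name: a missing category no longer shifts the other labels.
wide = wide.rename(columns={"date": "Date", **CATEGORY_LABELS})
print(wide.columns.tolist())
```

With a positional rename, a category missing from the data would raise a length mismatch; renaming by name simply leaves that column out.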


We can now dump it into S3

In [0]:
spark_vaccines_df = spark.createDataFrame(vaccines_df) 
spark_vaccines_df.write.mode("overwrite").options(header="true", index="False").csv("/mnt/{}/vaccines_df".format(MOUNT_NAME))

---
We used a few other dataframes for our analysis. Let's list and process them as well.


#### Population Distribution by Race/Ethnicity (CPS)
"Kaiser Family Foundation (KFF) estimates based on the Census Bureau's March Current Population Survey (CPS: Annual Social and Economic Supplements), 2017-2021."

<br>**52 rows | 9 columns | Each row is population information of a state for each race**

<a href="https://www.kff.org/other/state-indicator/percent-of-adults-reporting-not-seeing-a-doctor-in-the-past-12-months-because-of-cost-by-raceethnicity/?currentTimeframe=0">Source</a>

Schema:
Column Name | Description | Type
-- | -- | --
Location | State name / Region | Text
{rest of the columns} | Race/ethnicity | Number

In [0]:
population_df = spark.read.format("csv").options(header="true", inferSchema="true").load("/mnt/{}/population_race_state.csv".format(MOUNT_NAME)).toPandas()
population_df.head()

Unnamed: 0,Location,White,Black,Hispanic,Asian,American Indian/Alaska Native,Native Hawaiian/Other Pacific Islander,Multiple Races,Total
0,United States,194000000.0,40439900,61159700,19421200.0,2606600.0,989200.0,6790700,325637800
1,Alabama,3201500.0,1308800,189200,,,,65700,4885700
2,Alaska,399000.0,22600,43300,45200.0,133400.0,,47300,706100
3,Arizona,3977400.0,373300,2642100,228000.0,105300.0,,139000,7467800
4,Arkansas,2126500.0,440200,245900,40900.0,,,70500,2977500


The locations don't match the previous data, which uses state abbreviations.
<br>So let's create a map from each state's name to its abbreviation.

In [0]:
state_to_code = {
    'alabama': 'AL',
     'alaska': 'AK',
     'arizona': 'AZ',
     'arkansas': 'AR',
     'california': 'CA',
     'colorado': 'CO',
     'connecticut': 'CT',
     'delaware': 'DE',
     'district of columbia': 'DC',
     'florida': 'FL',
     'georgia': 'GA',
     'hawaii': 'HI',
     'idaho': 'ID',
     'illinois': 'IL',
     'indiana': 'IN',
     'iowa': 'IA',
     'kansas': 'KS',
     'kentucky': 'KY',
     'louisiana': 'LA',
     'maine': 'ME',
     'maryland': 'MD',
     'massachusetts': 'MA',
     'michigan': 'MI',
     'minnesota': 'MN',
     'mississippi': 'MS',
     'missouri': 'MO',
     'montana': 'MT',
     'nebraska': 'NE',
     'nevada': 'NV',
     'new hampshire': 'NH',
     'new jersey': 'NJ',
     'new mexico': 'NM',
     'new york': 'NY',
     'north carolina': 'NC',
     'north dakota': 'ND',
     'ohio': 'OH',
     'oklahoma': 'OK',
     'oregon': 'OR',
     'pennsylvania': 'PA',
     'rhode island': 'RI',
     'south carolina': 'SC',
     'south dakota': 'SD',
     'tennessee': 'TN',
     'texas': 'TX',
     'utah': 'UT',
     'vermont': 'VT',
     'virginia': 'VA',
     'washington': 'WA',
     'west virginia': 'WV',
     'wisconsin': 'WI',
     'wyoming': 'WY',
     'united states': 'US',
     'puerto rico': 'PR',
     'guam': 'GU',
     'u.s. virgin islands': 'VI',
     'new york city': 'NYC'
}
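A plain dictionary lookup raises a KeyError for any name that isn't in the map. As a sketch of a more forgiving lookup, the helper below returns None for unknown names so they can be listed and fixed. `to_state_code` is a hypothetical helper, and the dictionary is a three-entry excerpt for illustration only.

```python
# A small excerpt of a name-to-abbreviation map, for illustration only.
state_to_code_excerpt = {"alabama": "AL", "alaska": "AK", "guam": "GU"}


def to_state_code(name, lookup):
    """Return the abbreviation for `name`, or None when it is not in `lookup`."""
    return lookup.get(name.strip().lower())


names = ["Alabama", " Alaska ", "Guam", "Atlantis"]
codes = [to_state_code(n, state_to_code_excerpt) for n in names]
unmapped = [n for n, c in zip(names, codes) if c is None]
print(codes, unmapped)
```

Printing the unmapped names before applying the map makes it easy to spot spelling variants in a new data file.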

Let's apply it to the Locations column.
<br>We also fill N/A values with 0 and convert the columns to a numeric format.

In [0]:
# Map full state names to their abbreviations.
population_df['Location'] = population_df['Location'].apply(lambda x: state_to_code[x.lower()])
# Replace the 'N/A' placeholder with 0 so the race columns can be cast to integers.
population_df = population_df.where(population_df != 'N/A', 0)
race_cols = ['White', 'Black', 'American Indian/Alaska Native', 'Native Hawaiian/Other Pacific Islander', 'Asian', 'Hispanic', 'Multiple Races']
population_df[race_cols] = population_df[race_cols].apply(np.int64)
population_df.head()

Unnamed: 0,Location,White,Black,Hispanic,Asian,American Indian/Alaska Native,Native Hawaiian/Other Pacific Islander,Multiple Races,Total
0,US,194000000,40439900,61159700,19421200,2606600,989200,6790700,325637800
1,AL,3201500,1308800,189200,0,0,0,65700,4885700
2,AK,399000,22600,43300,45200,133400,0,47300,706100
3,AZ,3977400,373300,2642100,228000,105300,0,139000,7467800
4,AR,2126500,440200,245900,40900,0,0,70500,2977500
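
The cell above raises a `KeyError` as soon as it meets a location that is missing from `state_to_code`. As a minimal sketch of an alternative, `Series.map` leaves unknown names as `NaN` so they can be counted, and `pd.to_numeric(..., errors='coerce')` turns any non-numeric placeholder into `NaN` before it is filled with 0. The frame `demo_df`, the dict `demo_codes`, and their values are made up for illustration.

```python
import pandas as pd

# Hypothetical miniature of the population table; values are illustrative only.
demo_df = pd.DataFrame({
    'Location': ['Alabama', 'Alaska'],
    'White': ['3201500', '399000'],
    'Asian': ['N/A', '45200'],
})
demo_codes = {'alabama': 'AL', 'alaska': 'AK'}

# Series.map yields NaN for names missing from the dict, so gaps can be counted.
demo_df['Location'] = demo_df['Location'].str.lower().map(demo_codes)
unmapped = demo_df['Location'].isna().sum()

# errors='coerce' turns non-numeric placeholders into NaN, which is then filled with 0.
value_cols = ['White', 'Asian']
demo_df[value_cols] = (
    demo_df[value_cols]
    .apply(pd.to_numeric, errors='coerce')
    .fillna(0)
    .astype('int64')
)

print(unmapped)
print(demo_df)
```

Note that `errors='coerce'` is broader than matching `'N/A'` exactly: any value that cannot be parsed as a number ends up as 0.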


Dump into S3

In [0]:
spark_population_df = spark.createDataFrame(population_df)
spark_population_df.write.mode("overwrite").options(header="true", index="False").csv("/mnt/{}/population_df".format(MOUNT_NAME))

#### Adults Who Report Currently Having Asthma by Race/Ethnicity
"KFF analysis of the Centers for Disease Control and Prevention (CDC)'s 2013-2020 Behavioral Risk Factor Surveillance System (BRFSS). Data represent adults who reported that they currently have asthma and have been told by a doctor that they have asthma. Percentages are weighted to reflect population characteristics."

<br>**55 rows | 9 columns | Each row is the % of adults in a state who report having asthma, for each race**

<a href="https://www.kff.org/other/state-indicator/adults-who-report-having-asthma-by-race-ethnicity/?currentTimeframe=0">Source</a>

Schema:
Column Name | Description | Type
-- | -- | --
Location | State name | Text
All Adults | % who report having asthma among adults of all races | Number
{All other columns} | % who report having asthma among people of that race | Number
Footnotes | Column for collection notes (dropped) | Text

In [0]:
asthma_df = spark.read.format("csv").options(header="true", inferSchema="true").load("/mnt/{}/asthma_race_state.csv".format(MOUNT_NAME)).toPandas()
asthma_df.head()

Unnamed: 0,Location,All Adults,White,Black,Hispanic,Asian/ Native Hawaiian or Pacific Islander,American Indian/ Alaska Native,Other,Footnotes
0,United States,0.093,0.093,0.114,0.079,0.051,0.135,0.122,1.0
1,Alabama,0.089,0.081,0.102,NSD,NSD,NSD,0.181,
2,Alaska,0.091,0.093,NSD,NSD,NSD,0.074,0.085,
3,Arizona,0.101,0.107,0.085,0.083,NSD,0.138,0.14,
4,Arkansas,0.094,0.091,0.128,NSD,NSD,NSD,0.149,


In [0]:
asthma_df['Location'] = asthma_df['Location'].apply(lambda x: state_to_code[x.lower()])
asthma_df.drop(columns=['Footnotes'], inplace=True)
asthma_df = asthma_df.where(asthma_df != 'NSD', 0)
asthma_df = asthma_df.where(asthma_df != 'N/A', 0)
asthma_df[asthma_df.columns[1:]] = asthma_df[asthma_df.columns[1:]].apply(pd.to_numeric)
asthma_df.head()

Unnamed: 0,Location,All Adults,White,Black,Hispanic,Asian/ Native Hawaiian or Pacific Islander,American Indian/ Alaska Native,Other
0,US,0.093,0.093,0.114,0.079,0.051,0.135,0.122
1,AL,0.089,0.081,0.102,0.0,0.0,0.0,0.181
2,AK,0.091,0.093,0.0,0.0,0.0,0.074,0.085
3,AZ,0.101,0.107,0.085,0.083,0.0,0.138,0.14
4,AR,0.094,0.091,0.128,0.0,0.0,0.0,0.149


In [0]:
spark_asthma_df = spark.createDataFrame(asthma_df)
spark_asthma_df.write.mode("overwrite").options(header="true", index="False").csv("/mnt/{}/asthma_df".format(MOUNT_NAME))
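
The same cleaning steps (map `Location`, drop `Footnotes`, replace placeholders, convert to numeric) are applied to the asthma table here and again to the KFF tables that follow. One way to factor them out is sketched below. `clean_kff_table` and its parameters are hypothetical and not part of the original notebook, and the demo frame is a tiny illustrative stand-in.

```python
import pandas as pd


def clean_kff_table(df, state_codes, placeholders=('NSD', 'N/A'), fill_value=0.0):
    """Hypothetical helper: map state names to codes and make value columns numeric."""
    # errors='ignore' keeps this usable for tables without a Footnotes column.
    out = df.drop(columns=['Footnotes'], errors='ignore').copy()
    out['Location'] = out['Location'].str.lower().map(state_codes)
    value_cols = [c for c in out.columns if c != 'Location']
    for col in value_cols:
        # Blank out placeholder strings, parse the rest, then fill the gaps.
        cleaned = out[col].mask(out[col].isin(placeholders))
        out[col] = pd.to_numeric(cleaned).fillna(fill_value)
    return out


# Illustrative two-row frame shaped like the asthma table.
demo = pd.DataFrame({
    'Location': ['Alabama', 'Alaska'],
    'All Adults': [0.089, 0.091],
    'Black': ['0.102', 'NSD'],
    'Footnotes': [None, None],
})
cleaned = clean_kff_table(demo, {'alabama': 'AL', 'alaska': 'AK'})
print(cleaned)
```

The helper assumes every column other than `Location` is meant to be numeric, which would not hold for a table with extra text columns.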

#### Adults Who Report Not Seeing a Doctor in the Past 12 Months Because of Cost by Race/Ethnicity
"KFF analysis of the Centers for Disease Control and Prevention (CDC)'s 2013-2020 Behavioral Risk Factor Surveillance System (BRFSS). Data represent adults who reported that there was a time in the past 12 months when they needed to see a doctor but could not because of cost. Percentages are weighted to reflect population characteristics."

<br>**54 rows | 9 columns | Each row is % of people in a state who report not seeing a doctor, for each race**

<a href="https://www.kff.org/other/state-indicator/percent-of-adults-reporting-not-seeing-a-doctor-in-the-past-12-months-because-of-cost-by-raceethnicity/?currentTimeframe=0">Source</a>

Schema:
Column Name | Description | Type
-- | -- | --
Location | State name | Text
All Adults | % among all adults who report not having seen a doctor because of cost | Number
{All other columns} | % among people of that race who report not having seen a doctor because of cost | Number
Footnotes | Column for collection notes (dropped) | Text

In [0]:
health_access_df = spark.read.format("csv").options(header="true", inferSchema="true").load("/mnt/{}/health_access_race_state.csv".format(MOUNT_NAME)).toPandas()
health_access_df.head()

Unnamed: 0,Location,All Adults,White,Black,Hispanic,Asian/ Native Hawaiian or Pacific Islander,American Indian/ Alaska Native,Other,Footnotes
0,United States,0.097,0.077,0.127,0.167,0.079,0.158,0.136,1.0
1,Alabama,0.112,0.094,0.153,0.135,NSD,NSD,0.183,
2,Alaska,0.102,0.096,NSD,NSD,NSD,0.108,NSD,
3,Arizona,0.106,0.081,0.125,0.176,0.076,0.121,0.116,
4,Arkansas,0.108,0.091,0.162,0.282,NSD,NSD,0.174,


In [0]:
health_access_df['Location'] = health_access_df['Location'].apply(lambda x: state_to_code[x.lower()])
health_access_df.drop(columns=['Footnotes'], inplace=True)
health_access_df = health_access_df.where(health_access_df != 'NSD', 0)
health_access_df = health_access_df.where(health_access_df != 'N/A', 0)
health_access_df[health_access_df.columns[1:]] = health_access_df[health_access_df.columns[1:]].apply(pd.to_numeric)
health_access_df.head()

Unnamed: 0,Location,All Adults,White,Black,Hispanic,Asian/ Native Hawaiian or Pacific Islander,American Indian/ Alaska Native,Other
0,US,0.097,0.077,0.127,0.167,0.079,0.158,0.136
1,AL,0.112,0.094,0.153,0.135,0.0,0.0,0.183
2,AK,0.102,0.096,0.0,0.0,0.0,0.108,0.0
3,AZ,0.106,0.081,0.125,0.176,0.076,0.121,0.116
4,AR,0.108,0.091,0.162,0.282,0.0,0.0,0.174


In [0]:
spark_health_access_df = spark.createDataFrame(health_access_df)
spark_health_access_df.write.mode("overwrite").options(header="true", index="False").csv("/mnt/{}/health_access_df".format(MOUNT_NAME))

#### Percent of Total Population that has Received a COVID-19 Vaccine by Race/Ethnicity
"Percent vaccinated includes people who have received at least one vaccine dose. Vaccination data based on KFF analysis of publicly available data on state websites; total population data used to calculate rates based on KFF analysis of 2019 American Community Survey data."

<br>**52 rows | 12 columns | Each row is % of people in a state who received at least one dose of vaccine, for each race**

<a href="https://www.kff.org/other/state-indicator/percent-of-total-population-that-has-received-a-covid-19-vaccine-by-race-ethnicity/?currentTimeframe=0">Source</a>

Schema:
Column Name | Description | Type
-- | -- | --
Location | State name | Text
{All other columns} | % or ratio among people of that category who received at least one vaccine dose | Number
Footnotes | Column for collection notes (dropped) | Text

In [0]:
pct_vaccine_df = spark.read.format("csv").options(header="true", inferSchema="true").load("/mnt/{}/pct_vaccinated_state.csv".format(MOUNT_NAME)).toPandas()
pct_vaccine_df.head()

Unnamed: 0,Location,Race Categories Include Hispanic Individuals,% of Total White Population Vaccinated,% of Total Black Population Vaccinated,White to Black Ratio,Percentage Point Difference Between White and Black Rate,% of Total Hispanic Population Vaccinated,White to Hispanic Ratio,Percentage Point Difference Between White and Hispanic Rate,% of Total Asian Population Vaccinated,White to Asian Ratio,Percentage Point Difference Between White and Asian Rate,Footnotes
0,United States,,0.63,0.57,1.09,-5.34,0.65,0.97,2.24,0.85,0.74,22.15,
1,Alabama,Yes,0.49,0.51,0.96,2.27,0.58,0.84,9.27,0.82,0.6,33.21,1.0
2,Alaska,Yes,0.56,0.61,0.9,5.93,0.5,1.12,-5.75,0.81,0.69,25.14,
3,Arizona,,0.6,0.51,1.18,-9.03,0.44,1.36,-15.76,0.8,0.75,19.53,
4,Arkansas,Yes,NR,NR,NR,NR,NR,NR,NR,NR,NR,NR,


In [0]:
pct_vaccine_df['Location'] = pct_vaccine_df['Location'].apply(lambda x: state_to_code[x.lower()])
pct_vaccine_df.drop(columns=['Footnotes'], inplace=True)
pct_vaccine_df = pct_vaccine_df.where(pct_vaccine_df != 'NR', 0)
pct_vaccine_df = pct_vaccine_df.where(pct_vaccine_df != '>.99', 0)
pct_vaccine_df[pct_vaccine_df.columns[2:]] = pct_vaccine_df[pct_vaccine_df.columns[2:]].apply(pd.to_numeric)
pct_vaccine_df.head()

Unnamed: 0,Location,Race Categories Include Hispanic Individuals,% of Total White Population Vaccinated,% of Total Black Population Vaccinated,White to Black Ratio,Percentage Point Difference Between White and Black Rate,% of Total Hispanic Population Vaccinated,White to Hispanic Ratio,Percentage Point Difference Between White and Hispanic Rate,% of Total Asian Population Vaccinated,White to Asian Ratio,Percentage Point Difference Between White and Asian Rate
0,US,,0.63,0.57,1.09,-5.34,0.65,0.97,2.24,0.85,0.74,22.15
1,AL,Yes,0.49,0.51,0.96,2.27,0.58,0.84,9.27,0.82,0.6,33.21
2,AK,Yes,0.56,0.61,0.9,5.93,0.5,1.12,-5.75,0.81,0.69,25.14
3,AZ,,0.6,0.51,1.18,-9.03,0.44,1.36,-15.76,0.8,0.75,19.53
4,AR,Yes,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [0]:
spark_pct_vaccine_df = spark.createDataFrame(pct_vaccine_df)
spark_pct_vaccine_df.write.mode("overwrite").options(header="true", index="False").csv("/mnt/{}/pct_vaccine_df".format(MOUNT_NAME))
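
Replacing `'>.99'` with 0 makes a top-coded rate indistinguishable from an unreported one. As a hedged alternative, not what this notebook does, the sketch below maps a leading `>` to a ceiling value instead. `parse_rate`, `top_code`, and `missing` are hypothetical names, and treating `'>.99'` as 0.99 is an assumption rather than a KFF convention.

```python
import pandas as pd


def parse_rate(value, top_code=0.99, missing=0.0):
    """Hypothetical parser for one cell of the vaccination table."""
    if isinstance(value, str):
        text = value.strip()
        if text == 'NR':          # not reported
            return missing
        if text.startswith('>'):  # top-coded value such as '>.99'
            return top_code
        return float(text)
    # Non-string cells: keep numbers, treat NaN/None as missing.
    return missing if pd.isna(value) else float(value)


# Illustrative values only.
rates = pd.Series(['0.63', 'NR', '>.99', 0.57])
parsed = rates.map(parse_rate).tolist()
print(parsed)
```

Switching to this approach would change the values written to S3, so it would only make sense if the downstream analysis is rerun as well.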

#### Conditions contributing to deaths COVID-19
"Kaggle dataset containing information on deaths related to Pneumonia, Influenza, and COVID."

<br>**6489 rows | 13 columns | Each row is number of deaths in each category for every state, by race and age group**

<a href="https://www.kaggle.com/datasets/winternguyen/conditions-contributing-to-deaths-covid19?resource=download&select=Deaths_involving_coronavirus_disease_2019__COVID-19__by_race_and_Hispanic_origin_group_and_age__by_state.csv">Source</a>

Schema:
Column Name | Description | Type
-- | -- | --
Data as of | Last update date | Date
Start Date | Start date of data collection | Date
End Date | End date of data collection | Date
State | State name | Text
Age group | Age group under consideration | Text
Race and Hispanic Origin Group | Race/ethnicity of group under consideration | Text
{All other columns} | Deaths related to that particular disease | Number
Footnote | Column for collection notes (dropped) | Text

In [0]:
death_causes_df = spark.read.format("csv").options(header="true", inferSchema="true").load("/mnt/{}/covid_death_causes.csv".format(MOUNT_NAME)).toPandas()
death_causes_df.head()

Unnamed: 0,Data as of,Start Date,End Date,State,Age group,Race and Hispanic Origin Group,COVID-19 Deaths,Total Deaths,Pneumonia Deaths,Pneumonia and COVID-19 Deaths,Influenza Deaths,"Pneumonia, Influenza, or COVID-19 Deaths",Footnote
0,04/07/2021,01/01/2020,04/03/2021,United States,All Ages,Total Deaths,539723.0,4160118.0,466386.0,263173.0,9031.0,750721.0,
1,04/07/2021,01/01/2020,04/03/2021,United States,All Ages,Non-Hispanic White,328239.0,3047347.0,301454.0,149830.0,6038.0,485501.0,
2,04/07/2021,01/01/2020,04/03/2021,United States,Under 1 year,Non-Hispanic White,14.0,9139.0,98.0,1.0,10.0,121.0,
3,04/07/2021,01/01/2020,04/03/2021,United States,0-17 years,Non-Hispanic White,72.0,16860.0,287.0,14.0,84.0,429.0,
4,04/07/2021,01/01/2020,04/03/2021,United States,1-4 years,Non-Hispanic White,12.0,1802.0,59.0,2.0,25.0,94.0,


In [0]:
death_causes_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6489 entries, 0 to 6488
Data columns (total 13 columns):
 #   Column                                    Non-Null Count  Dtype  
---  ------                                    --------------  -----  
 0   Data as of                                6489 non-null   object 
 1   Start Date                                6489 non-null   object 
 2   End Date                                  6489 non-null   object 
 3   State                                     6489 non-null   object 
 4   Age group                                 6489 non-null   object 
 5   Race and Hispanic Origin Group            6489 non-null   object 
 6   COVID-19 Deaths                           4759 non-null   float64
 7   Total Deaths                              4805 non-null   float64
 8   Pneumonia Deaths                          4566 non-null   float64
 9   Pneumonia and COVID-19 Deaths             4984 non-null   float64
 10  Influenza Deaths                    

In [0]:
death_causes_df['Data as of'].unique()

Out[47]: array(['04/07/2021'], dtype=object)

In [0]:
death_causes_df['Start Date'].unique()

Out[48]: array(['01/01/2020'], dtype=object)

In [0]:
death_causes_df['End Date'].unique()

Out[49]: array(['04/03/2021'], dtype=object)

There are quite a few null values.
<br>The date columns each hold a single date, and the footnote column is of no use, so we drop them.
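
Both observations can also be checked in one pass rather than column by column. The sketch below counts nulls per column and lists columns that hold a single distinct value. `demo_df`, `null_counts`, and `constant_cols` are hypothetical names, and the toy frame uses illustrative values rather than the real dataset.

```python
import pandas as pd

# Toy frame with illustrative values, shaped like a slice of the deaths table.
demo_df = pd.DataFrame({
    'Data as of': ['04/07/2021', '04/07/2021', '04/07/2021'],
    'State': ['United States', 'Alabama', 'Alaska'],
    'COVID-19 Deaths': [100.0, None, 14.0],
})

# Number of missing values in each column.
null_counts = demo_df.isna().sum()

# Columns with one distinct value carry no information and can be dropped.
constant_cols = [c for c in demo_df.columns if demo_df[c].nunique(dropna=False) == 1]

print(null_counts)
print(constant_cols)
```

On the real frame, the same two lines applied to `death_causes_df` should flag the three date columns as constant.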

In [0]:
death_causes_df.drop(['Data as of', 'Start Date', 'End Date', 'Footnote'], axis=1, inplace=True)
death_causes_df.columns = ['res_state', 'age_group', 'race', 'covid', 'total', 'pneumonia', 'pneumonia_covid', 'influenza', 'all_3']

race_map = {
    'Hispanic': 'Hispanic/Latino',
    'Non-Hispanic American Indian or Alaska Native': 'American Indian/Alaska Native',
    'Non-Hispanic Asian': 'Asian',
    'Non-Hispanic Black': 'Black',
    'Non-Hispanic More than one race': 'Multiple',
    'Non-Hispanic Native Hawaiian or Other Pacific Islander': 'Native Hawaiian/Other Pacific Islander',
    'Non-Hispanic White': 'White',
    'Unknown': 'Unknown',
    'Total Deaths': 'Total'
}

death_causes_df['race'] = death_causes_df['race'].apply(lambda x: race_map[x])
death_causes_df['res_state'] = death_causes_df['res_state'].apply(lambda x: state_to_code[x.lower()])
death_causes_df = death_causes_df.fillna(0)
death_causes_df.head()

Unnamed: 0,res_state,age_group,race,covid,total,pneumonia,pneumonia_covid,influenza,all_3
0,US,All Ages,Total,539723.0,4160118.0,466386.0,263173.0,9031.0,750721.0
1,US,All Ages,White,328239.0,3047347.0,301454.0,149830.0,6038.0,485501.0
2,US,Under 1 year,White,14.0,9139.0,98.0,1.0,10.0,121.0
3,US,0-17 years,White,72.0,16860.0,287.0,14.0,84.0,429.0
4,US,1-4 years,White,12.0,1802.0,59.0,2.0,25.0,94.0


In [0]:
spark_death_causes_df = spark.createDataFrame(death_causes_df)
spark_death_causes_df.write.mode("overwrite").options(header="true", index="False").csv("/mnt/{}/death_causes_df".format(MOUNT_NAME))