## COVID-DW Data Import Preprocessing

In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd

### Read COVID Surveillance Data

In [2]:
in_base = "./raw/sample5kk"
in_file = in_base+".csv"
out_file = in_base+".prep.csv"

In [3]:
# set of columns to keep
cols_all = set(pd.read_csv(in_file, nrows=1))
cols_drop = {
    "case_positive_specimen_interval",
    "case_onset_interval",
    "process",
    "ethnicity",
    "symptom_status",
    "exposure_yn",
    "current_status",
    "underlying_conditions_yn"
    }
in_cols_keep = cols_all - cols_drop
print(in_cols_keep)

# set of columns that if value is missing, the record is discarded
in_cols_drop_na = {
    "res_state",
    "sex",
    "age_group",
    "case_month"
    }

# missing values
in_missing = {"Unknown","Missing","nul","Other"}

{'county_fips_code', 'state_fips_code', 'res_county', 'death_yn', 'sex', 'race', 'case_month', 'res_state', 'hosp_yn', 'age_group', 'icu_yn'}


In [4]:
# define col data types
# set all cols to keep to be categorical to improve import speed
in_dtypes = dict.fromkeys(in_cols_keep, "category")

# column to parse as date
# in_cols_date = ["case_month"]
# in_date_parser = pd.to_datetime

#### read raw csv to pandas


In [5]:
my_df = pd.read_csv(
    in_file,
    header=0,
    usecols=in_cols_keep,
    dtype=in_dtypes,
    na_values=in_missing
    )
my_df.shape

(5000000, 11)

In [6]:
# examine the df
for col in my_df.columns:
    print("\n==== " + col + " ====")
    print(my_df[col].value_counts(dropna=False))


==== case_month ====
2022-01    1108488
2021-12     471912
2020-12     434382
2021-01     367371
2020-11     327749
2021-08     274025
2021-09     258084
2021-11     186011
2021-10     171834
2020-10     153727
2021-03     142504
2021-02     142109
2021-04     120131
2020-07     119141
2022-02     110567
2021-07     105218
2020-08      91866
2020-09      88523
2020-06      79084
2021-05      66127
2020-04      62305
2020-05      56013
2021-06      32632
2020-03      29506
2020-02        347
2020-01        344
Name: case_month, dtype: int64

==== res_state ====
CA     701022
NY     385673
FL     259356
IL     239351
PA     215743
OH     208125
NC     201568
NJ     172286
GA     161698
TN     155540
AZ     152058
IN     133606
MA     131286
SC     115332
WA     111241
MN     111063
VA     110486
MO     110158
MI     104377
CO     103078
AL      99494
LA      91072
WI      79340
TX      66927
AR      63942
KS      60650
UT      58518
OR      53834
OK      52721
MD      52210
KY      4941

#### data cleaning

In [7]:
# drop records that have key values missing
my_df.dropna(subset=in_cols_drop_na, inplace=True)

In [8]:
# 1. converter for Yes/No values to boolean
conv_bool = {'Yes': True, 'No': False}
# columns to convert to boolean
cols_bool = {x for x in in_cols_keep if x.endswith("_yn")}

for col in cols_bool:
    my_df[col] = my_df[col].cat.rename_categories(conv_bool)

# 2. converter for specific cols
# conv_symp = {
#     'Symptomatic': True,
#     'Asymptomatic': False
#     }
conv_age = {
    '18 to 49 years': '18-49',
    '50 to 64 years': '50-64',
    '0 - 17 years': '0-17',
    '65+ years': '65+'
    }
conv_race = {
    'Native Hawaiian/Other Pacific Islander':'Native Hawaiian/Pacific Islander'
    }

dict_converter = {
    # 'symptom_status':conv_symp,
    'age_group':conv_age,
    'race':conv_race
    }

for col in dict_converter.keys():
    my_df[col] = my_df[col].cat.rename_categories(dict_converter.get(col))

# 3. convert date (YYYY-MM) to proper format (YYYY-MM-00)
my_df["case_month"] = my_df["case_month"].cat.rename_categories('{}-15'.format)

# 4. convert county fips code to keep county part only
# fill 000 for missing county fips code
my_df["county_fips_code"] = my_df["county_fips_code"].cat.add_categories("000").fillna("000")
# last 3 chars (first 1-2 digits correspond to state fips)
my_df["county_fips_code"] = my_df["county_fips_code"].map(lambda x: x[-3:]).astype('category')

In [9]:
# examine the df
for col in my_df.columns:
    print("\n==== " + col + " ====")
    print(my_df[col].value_counts(dropna=False))


==== case_month ====
2022-01-15    1040252
2021-12-15     454529
2020-12-15     419181
2021-01-15     354297
2020-11-15     316812
2021-08-15     258596
2021-09-15     245704
2021-11-15     177608
2021-10-15     163650
2020-10-15     146880
2021-03-15     136688
2021-02-15     135839
2021-04-15     114593
2020-07-15     112818
2022-02-15     102031
2021-07-15      98317
2020-08-15      86237
2020-09-15      83302
2020-06-15      74501
2021-05-15      61001
2020-04-15      58802
2020-05-15      52092
2021-06-15      28358
2020-03-15      27144
2020-02-15        110
2020-01-15         74
Name: case_month, dtype: int64

==== res_state ====
CA    685308
NY    379138
FL    253788
IL    231456
PA    211563
OH    200611
NC    193563
NJ    169599
GA    153457
AZ    150417
TN    149656
MA    128620
IN    127089
SC    110315
WA    106476
MN    104988
MO    104883
VA    104877
MI     99814
CO     97182
LA     86420
WI     75699
TX     59799
KS     58223
UT     57375
MD     50969
OK     50838
OR 

In [10]:
# rename cols properly
in_cols_rename = {
    'case_month':'month',
    'res_state':'state',
    'state_fips_code':'state_id',
    'res_county':'county',
    'county_fips_code':'county_id',
    'age_group':'age',
    'symptom_status':'symptom',
    'hosp_yn':'hosp',
    'icu_yn':'icu',
    'death_yn':'death'
}

my_df.rename(columns=in_cols_rename,inplace=True)

#### split into individual tables

In [11]:
# create demography primary key field from the related fields
# first merge related fields to get a tuple
tup = my_df["age"].str.cat(my_df["sex"],na_rep="NaN").str.cat(my_df["race"],na_rep="NaN")
# factorize the tuple to get the new primary key
my_df['demography_id'] = pd.factorize( tup )[0]

# create outcome primary key from related fields
tup = my_df["hosp"].astype(str).str.cat(my_df["icu"].astype(str),na_rep="NaN").str.cat(my_df["death"].astype(str),na_rep="NaN")
# factorize the tuple to get the new primary key
my_df['outcome_id'] = pd.factorize( tup )[0]

# create location primary key by concat state_id & county_id
my_df['location_id'] = my_df['state_id'].astype(str) + my_df['county_id'].astype(str)
my_df['location_id'] = my_df['location_id'].astype('category')

In [12]:
# split the data df to individual dfs for exporting to mysql
df_loc = my_df[['location_id','state_id','county_id','county','state']].drop_duplicates(ignore_index=True)
df_demogr = my_df[['demography_id','age','sex','race']].drop_duplicates().sort_values(['age','sex','race'],ignore_index=True)
df_outcome = my_df[['outcome_id','hosp','icu','death']].drop_duplicates().sort_values(['hosp','icu','death'],ignore_index=True)
df_case = my_df.drop(['county','county_id','state','state_id','age','sex','race','hosp','icu','death'],axis=1)

In [13]:
# convert outcome fields into aggregated score for each type of outcome
dict_outcome_score = {
    'hosp':1,
    'icu':2,
    'death':3
}

for col in dict_outcome_score.keys():
    df_outcome[col] = df_outcome[col].cat.codes

df_outcome['score'] = df_outcome[dict_outcome_score.keys()].mul(dict_outcome_score).max(axis=1)

In [14]:
df_outcome.head()

Unnamed: 0,outcome_id,hosp,icu,death,score
0,4,0,0,0,0
1,20,0,0,1,3
2,5,0,0,-1,0
3,19,0,1,0,2
4,23,0,1,1,3


In [15]:
df_demogr.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 56 entries, 0 to 55
Data columns (total 4 columns):
 #   Column         Non-Null Count  Dtype   
---  ------         --------------  -----   
 0   demography_id  56 non-null     int64   
 1   age            56 non-null     category
 2   sex            56 non-null     category
 3   race           48 non-null     category
dtypes: category(3), int64(1)
memory usage: 1.3 KB


In [16]:
for col in my_df.columns:
    print("\n==== " + col + " ====")
    print(my_df[col].value_counts(dropna=False))


==== month ====
2022-01-15    1040252
2021-12-15     454529
2020-12-15     419181
2021-01-15     354297
2020-11-15     316812
2021-08-15     258596
2021-09-15     245704
2021-11-15     177608
2021-10-15     163650
2020-10-15     146880
2021-03-15     136688
2021-02-15     135839
2021-04-15     114593
2020-07-15     112818
2022-02-15     102031
2021-07-15      98317
2020-08-15      86237
2020-09-15      83302
2020-06-15      74501
2021-05-15      61001
2020-04-15      58802
2020-05-15      52092
2021-06-15      28358
2020-03-15      27144
2020-02-15        110
2020-01-15         74
Name: month, dtype: int64

==== state ====
CA    685308
NY    379138
FL    253788
IL    231456
PA    211563
OH    200611
NC    193563
NJ    169599
GA    153457
AZ    150417
TN    149656
MA    128620
IN    127089
SC    110315
WA    106476
MN    104988
MO    104883
VA    104877
MI     99814
CO     97182
LA     86420
WI     75699
TX     59799
KS     58223
UT     57375
MD     50969
OK     50838
OR     50710
NV  

### Integrate Supplmeental Data
Integrate data from other sources

In [17]:
# input variables
in_population = "./raw/us-county-population-est2020.csv"
in_poverty = "./raw/us-county-poverty-est2020.csv"

#### county population data

In [18]:
cols_pop_name = {
    'STATE':'state_id',
    'COUNTY':'county_id',
    'POPESTIMATE2020':'population'
}
df_pop = pd.read_csv(
    in_population,
    header=0,
    usecols=cols_pop_name.keys(),
    dtype=str
    )
df_pop.rename(columns=cols_pop_name,inplace=True)
cols_pop_type = {
    'population':int
}
df_pop = df_pop.astype(cols_pop_type)
df_pop.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3194 entries, 0 to 3193
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   state_id    3194 non-null   object
 1   county_id   3194 non-null   object
 2   population  3194 non-null   int64 
dtypes: int64(1), object(2)
memory usage: 75.0+ KB


#### county poverty data

In [19]:
cols_pov_name = {
    'State FIPS Code':'state_id',
    'County FIPS Code':'county_id',
    'Poverty Estimate, All Ages':'poverty',
    'Median Household Income':'median_household_income'
}
cols_pov_type = {
    'State FIPS Code':str,
    'County FIPS Code':str
}
# poverty table has rows with empty values filled by '.'
df_pov = pd.read_csv(
    in_poverty,
    header=2,
    usecols=cols_pov_name.keys(),
    thousands=',',
    na_values='.',
    dtype=cols_pov_type
    )
df_pov.dropna(inplace=True)
df_pov.rename(columns=cols_pov_name,inplace=True)
df_pov = df_pov.astype({'poverty':int,'median_household_income':int})
df_pov.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 3194 entries, 0 to 3194
Data columns (total 4 columns):
 #   Column                   Non-Null Count  Dtype 
---  ------                   --------------  ----- 
 0   state_id                 3194 non-null   object
 1   county_id                3194 non-null   object
 2   poverty                  3194 non-null   int64 
 3   median_household_income  3194 non-null   int64 
dtypes: int64(2), object(2)
memory usage: 124.8+ KB


#### merge into location df

In [20]:
df_pop_pov = pd.merge(df_pop,df_pov,how="inner",on=['state_id','county_id'])
df_pop_pov.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 3193 entries, 0 to 3192
Data columns (total 5 columns):
 #   Column                   Non-Null Count  Dtype 
---  ------                   --------------  ----- 
 0   state_id                 3193 non-null   object
 1   county_id                3193 non-null   object
 2   population               3193 non-null   int64 
 3   poverty                  3193 non-null   int64 
 4   median_household_income  3193 non-null   int64 
dtypes: int64(3), object(2)
memory usage: 149.7+ KB


In [21]:
df_loc_final = pd.merge(df_loc,df_pop_pov,how='left',on=['state_id','county_id']).sort_values(by=['state_id','county_id'])
# convert data types
cols_loc_final = {
    'state_id':'category',
    'county_id':'category',
}
df_loc_final = df_loc_final.astype(cols_loc_final)
df_loc_final.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1758 entries, 746 to 297
Data columns (total 8 columns):
 #   Column                   Non-Null Count  Dtype   
---  ------                   --------------  -----   
 0   location_id              1758 non-null   category
 1   state_id                 1758 non-null   category
 2   county_id                1758 non-null   category
 3   county                   1704 non-null   category
 4   state                    1758 non-null   category
 5   population               1755 non-null   float64 
 6   poverty                  1755 non-null   float64 
 7   median_household_income  1755 non-null   float64 
dtypes: category(5), float64(3)
memory usage: 202.9 KB


#### check data discrepency b/w the data sets

In [None]:
print("df_pop_pov")
for col in df_pop_pov.columns:
    print('%s: %d' % (col,df_pop_pov[col].nunique()))

print("\ndf_loc")
for col in df_loc.columns:
    print('%s: %d' % (col,df_loc[col].nunique()))

# some states do not exist in the population and poverty dataset
set(df_loc.state_id.unique()) - set(df_pop_pov.state_id.unique())

### Data Output

In [None]:
# export preprocessed data to csv
#my_df.to_csv(out_file,index=False)
#df_loc_final.to_csv("tbl/location.csv",index=False)
#df_outcome.to_csv("tbl/outcome.csv",index=False)
#df_demogr.to_csv("tbl/demography.csv",index=False)
#df_case.to_csv("tbl/case.csv",index=False)

### pandas to mysql

In [None]:
import sqlalchemy as sql
import pymysql

# db connection profile
hostname="localhost"
port="3307" # default 3306, using 3307 for the db hosted in vm
dbname="covid-dw"
uname="root"
pwd="testpassmysql"

# create mysqlalchemy engine to connect to mysql db
engine = sql.create_engine(
    "mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}".format(
        host=hostname,
        db=dbname,
        port=port,
        user=uname,
        pwd=pwd
    )
)

In [None]:
# load the dimension tables
dbConn = engine.connect()
dbConn.execute("CALL sp_reset_dim_tbls")
df_loc_final.to_sql("location",dbConn,index=False,if_exists='append')
df_outcome.to_sql("outcome",dbConn,index=False,if_exists='append')
df_demogr.to_sql("demography",dbConn,index=False,if_exists='append')
dbConn.close()

In [None]:
# load / update the case table
dbConn = engine.connect()
dbConn.execute("CALL sp_reset_case_tbl")
df_case.to_sql("case",dbConn,index=False, if_exists='append')
dbConn.close()