# Covid19 Data Preprocessing

### ToDo
1. Add population per 1k to datasets
1. Set the correct data type for every column
1. Create cumulative and non-cumulative columns for the time series cases and deaths

### Overview
...

### Data Sources
* **Covid19**
    * [Johns Hopkins University - Covid19 Data](https://github.com/CSSEGISandData/COVID-19)
* **Demographics**
    * [US Census Bureau - 2019 Annual Social and Economic Supplements](https://www.census.gov/content/census/en/data/datasets/2019/demo/cps/cps-asec-2019.html)
    * [US Census Bureau - 2019 Current Population Survey](https://www.census.gov/content/census/en/data/datasets/2019/demo/cps/cps-basic-2019.html)
    * [US Census Bureau - International Demographic Overview](https://www.census.gov/data-tools/demo/idb/region.php?T=13&RT=0&A=both&Y=2020&C=&R=1)
    * [The World Bank - World Development Indicators](https://datacatalog.worldbank.org/dataset/world-development-indicators)
    * [The World Bank - Population Estimates and Projections](https://datacatalog.worldbank.org/dataset/population-estimates-and-projections)
    * [IMF - World Economic Outlook](https://www.imf.org/external/pubs/ft/weo/2020/01/weodata/download.aspx)

In [None]:
import logging
import datetime
import subprocess
import os
from io import StringIO
import re
import sys
import json
import numpy as np
import pandas as pd
import pyarrow
import requests as rq
import csv

In [None]:
class Process:
    """
    Unfinished placeholder class; no behaviour has been implemented yet.
    """

    def __init__(self):
        # The original body was an incomplete attribute statement ("self."),
        # which is a syntax error and prevented the file from being parsed.
        # No instance state is defined until the intended attributes are known.
        pass

In [None]:
def CONFIG(env):
    """
    Build the configuration dictionary of local paths and remote endpoints.

    env : environment name (e.g. 'dev'). It is accepted but not used yet, so
          every environment receives the same configuration.

    Returns a dict. All local paths are rooted at the current working
    directory at call time; '{...}' placeholders in paths and URLs are left
    unformatted for the caller to fill in.
    """
    cwd = os.getcwd()
    f_in = os.path.join(cwd,'in')
    f_out = os.path.join(cwd,'out')
    cov = os.path.join(f_in,'covid')
    jhu = os.path.join(cov,'jhu')

    # Both JHU daily feeds currently cover the same date window. The date
    # objects are immutable, but each feed gets its own dict so that changing
    # one feed's window never affects the other.
    dly_start = datetime.date(2020,4,12)
    dly_end = datetime.date(2020,9,30)

    cfg = {
        'paths': {
            'covid': cov,
            'f_in': f_in,
            'f_out': f_out,
            'eda': os.path.join(cwd,'eda'),
            'pq': f_out,  # parquet files are written under the output folder
        },
        'jhu-refresh': os.path.join(cwd,'jhu-refresh.sh'),
        # johns hopkins university - global daily
        'jhu-dly': {
            'path': os.path.join(jhu,'csse-dly','{DATE}.csv'),
            'dates': {'start': dly_start, 'end': dly_end},
        },
        # johns hopkins university - us daily
        'jhu-dly-us': {
            'path': os.path.join(jhu,'csse-dly-us','{DATE}.csv'),
            'dates': {'start': dly_start, 'end': dly_end},
        },
        # johns hopkins university - timeseries
        'jhu-ts': {
            'base': os.path.join(jhu,'csse-ts','time_series_covid19_{STATUS}_{REGION}.csv'),
            'options': {
                'status': ['confirmed','deaths','recovered'],
                'region': ['global','US'],
            },
            'errata': os.path.join(jhu,'csse-ts','Errata.csv'),
        },
        # johns hopkins university - WHO timeseries
        'jhu-who-ts': {
            'path': os.path.join(jhu,'who-ts','who_covid_19_sit_rep_time_series.csv'),
        },
        # johns hopkins university - fips lkup table
        'jhu-fips': {
            'path': os.path.join(jhu,'csse-fips-lkup.csv'),
        },
        # census bureau - american community survey
        'cb-acs': {
            'base': 'https://api.census.gov/data/{YEAR}/pep/charage?get={FIELDS}&for=state:{STATES}',
            'query': {
                'years': ['2019'],
                'fields': ','.join(['POP','NAME']),
                'states': ','.join(['*']),
            },
        },
        # census bureau - international database
        'cb-idb': {
            'base': 'https://api.census.gov/data/timeseries/idb/1year?time={YEAR}&get={FIELDS}',
            'query': {
                'years': ['2019'],
                'fields': ['AREA_KM2','NAME','AGE','POP','FIPS','SEX'],
            },
        },
        # census bureau - county pop density data
        'cb-geo': {
            'base': 'https://opendata.arcgis.com/datasets',
            'fname': '21843f238cbb46b08615fc53e19e0daf_1.geojson',
        },
        # census bureau - county poverty data
        'cb-pov': {
            # 2020-03-01 unable to feed params into get request without error,
            # so the complete query string is hard-coded in the url instead of
            # being assembled from a base url plus a params dict.
            'url': 'https://www.census.gov/cgi-bin/nbroker?_service=sas_serv1&_debug=0&_program=cedr.sasapp_main.sas&s_appName=saipe&map_geoSelector=aa_c&s_year=2019&s_measures=aa_snc&s_state=&s_county=&s_district=&menu=grid&s_output=csv&s_orderBy=id%20asc,year%20desc',
        },
        # TODO: add entries for the remaining sources that were only stubbed
        # out so far: 'wb-dev', 'wb-pop' and 'imf-econ'.
    }

    return cfg

In [None]:
def JHU_DLY(base,start,end):
    """
    Load one daily CSV per calendar day and stack them into one dataframe.

    base  : path template containing a '{DATE}' placeholder; it is filled
            with each day formatted as MM-DD-YYYY.
    start : first day to load (datetime.date), inclusive.
    end   : last day to load (datetime.date), inclusive.

    Returns (df, errs). df holds the rows of every file that could be read,
    all values as strings/objects, plus a DATA_DT column with the file's day
    as YYYY-MM-DD. errs is a list of (date, message) tuples for the days
    that could not be loaded. An empty dataframe is returned when nothing
    could be read.
    """
    logger = logging.getLogger(__name__)

    file_dt = '%m-%d-%Y'
    data_dt = '%Y-%m-%d'

    frames = []
    errs = []

    for offset in range((end-start).days+1):
        dt = start+datetime.timedelta(days=offset)
        try:
            frame = pd.read_csv(base.format(DATE=dt.strftime(file_dt)),header=0,dtype=object)
        except Exception as e:
            # Deliberately best effort: a missing or unreadable day is
            # recorded and skipped rather than aborting the whole load.
            logger.warning('Skipping %s: %s', dt, e)
            errs.append((dt,str(e)))
            continue
        # Files without rows contribute nothing to the result.
        if not frame.empty:
            frames.append(frame.assign(DATA_DT=dt.strftime(data_dt)))

    # A single concat replaces the repeated DataFrame.append calls, which
    # re-copied the accumulated frame on every iteration and no longer exist
    # in pandas 2.x.
    df = pd.concat(frames) if frames else pd.DataFrame(None)

    return df,errs

In [None]:
def CB_ACS(base,years,fields,states):
    """
    Download one table per year from a Census Bureau API endpoint.

    base   : url template with '{YEAR}', '{FIELDS}' and '{STATES}' placeholders.
    years  : iterable of year values, one request is made per year.
    fields : value substituted for '{FIELDS}'.
    states : value substituted for '{STATES}'.

    Returns (df, errs). df stacks the rows of every successful response;
    errs lists the urls whose response status was not OK. An empty dataframe
    is returned when no request succeeded.
    """
    logger = logging.getLogger(__name__)

    frames = []
    errs = []

    for yr in years:
        url = base.format(YEAR=yr,FIELDS=fields,STATES=states)
        response = rq.get(url)
        if response.status_code != rq.codes.ok:
            logger.warning('Request failed with status %s: %s', response.status_code, url)
            errs.append(url)
            continue
        # The first row of the JSON payload is used as the header row.
        data = response.json()
        frames.append(pd.DataFrame(data[1:],columns=data[0]))

    # The previous implementation discarded the result of DataFrame.append,
    # so every year after the first was silently dropped.
    df = pd.concat(frames) if frames else pd.DataFrame(None)

    return df,errs

In [None]:
def CB_IDB(base,years,fields):
    """
    Download one table per year from the Census Bureau international database.

    base   : url template with '{YEAR}' and '{FIELDS}' placeholders.
    years  : iterable of year values, one request is made per year.
    fields : list of field names; joined with commas for '{FIELDS}'.

    Returns (df, errs). df stacks the rows of every successful response;
    errs lists the urls whose response status was not OK. An empty dataframe
    is returned when no request succeeded.
    """
    logger = logging.getLogger(__name__)

    frames = []
    errs = []
    field_list = ','.join(fields)  # identical for every year

    for yr in years:
        url = base.format(YEAR=yr,FIELDS=field_list)
        response = rq.get(url)
        if response.status_code != rq.codes.ok:
            logger.warning('Request failed with status %s: %s', response.status_code, url)
            errs.append(url)
            continue
        # The first row of the JSON payload is used as the header row.
        data = response.json()
        frames.append(pd.DataFrame(data[1:],columns=data[0]))

    # The previous implementation discarded the result of DataFrame.append,
    # so every year after the first was silently dropped.
    df = pd.concat(frames) if frames else pd.DataFrame(None)

    return df,errs

In [None]:
def CB_GEO(base,fname):
    """
    Download a GeoJSON file and tabulate the properties of its features.

    base  : base url of the dataset host.
    fname : file name appended to the base url.

    Returns (df, errs). df has one row per entry of the payload's 'features'
    list, built from each feature's 'properties' mapping. When the response
    status is not OK, df is empty and errs contains the url.
    """
    logger = logging.getLogger(__name__)

    errs = []
    # Defined up front so a failed request returns an empty frame instead of
    # raising UnboundLocalError at the return statement.
    df = pd.DataFrame(None)

    # Urls always use '/', whereas os.path.join uses the OS path separator.
    url = '/'.join([base.rstrip('/'),fname.lstrip('/')])
    response = rq.get(url)
    if response.status_code == rq.codes.ok:
        content = response.json()
        df = pd.DataFrame([feature['properties'] for feature in content['features']])
    else:
        logger.warning('Request failed with status %s: %s', response.status_code, url)
        errs.append(url)

    return df,errs

In [None]:
# def CB_POV(base,params):
def CB_POV(url):
    """
    Download a CSV report and load it into a dataframe of strings.

    url : complete request url, including the query string.

    Returns (df, errs). The first CSV row is used as the header. When the
    response status is not OK, or the body contains no rows, df is empty and
    errs contains the url.
    """
    logger = logging.getLogger(__name__)

    errs = []
    # Defined up front so a failed request returns an empty frame instead of
    # raising UnboundLocalError at the return statement.
    df = pd.DataFrame(None)

    response = rq.get(url)
    if response.status_code == rq.codes.ok:
        decode = response.content.decode('utf-8')
        rows = list(csv.reader(decode.splitlines(), delimiter=','))
        if rows:
            df = pd.DataFrame(rows[1:],columns=rows[0])
        else:
            # An empty body has no header row to build the frame from.
            logger.warning('Empty response body: %s', url)
            errs.append(url)
    else:
        logger.warning('Request failed with status %s: %s', response.status_code, url)
        errs.append(url)

    return df,errs

In [None]:
def CLEANER(df):
    """
    Return a cleaned copy of a dataframe; the input is left untouched.

    Steps:
      1. Column names: drop one leading and one trailing non-alphanumeric
         character (if present) and upper-case the rest.
      2. Drop every row that contains a missing value.
      3. Columns named GEOID, FIPS or COUNTY_ID: convert to int and then to
         str. A column that cannot be converted is left unchanged.
      4. Column PROVINCE_STATE: upper-case the values.

    df : dataframe with string column names.

    Returns the cleaned dataframe.
    """
    logger = logging.getLogger(__name__)

    def COLUMN(x):
        # The emptiness checks guard against empty and single-symbol names,
        # which previously raised IndexError.
        if x and not x[0].isalnum():
            x=x[1:]
        if x and not x[-1].isalnum():
            x=x[:-1]
        return x.upper()

    # rename() returns a new frame, so the caller's column labels are no
    # longer modified in place. The explicit copy makes the result an
    # independent frame that is safe to assign columns on.
    cleaned = df.rename(columns=COLUMN).dropna().copy()

    fips = ['GEOID','FIPS','COUNTY_ID']
    state = ['PROVINCE_STATE']
    for col in cleaned.columns:
        if col in fips:
            try:
                cleaned[col] = cleaned[col].astype(int).astype(str)
            except (ValueError, TypeError, OverflowError) as e:
                # Best effort: values that are not whole numbers stay as is.
                logger.debug('Column %s not normalised: %s', col, e)
        if col in state:
            cleaned[col] = cleaned[col].str.upper()

    return cleaned

In [None]:
def UNPIVOT(df, segment):
    """
    Unpivot JHU Covid19 timeseries data

    Every column whose name starts with a date such as '1/22/20' is turned
    into rows: the date goes into RECORD_DT (converted to datetime) and the
    cell value into a column named after `segment`. All other columns are
    kept as identifier columns.

    df      : wide dataframe with one column per date.
    segment : name of the value column, e.g. 'CASES' or 'DEATHS'.

    Returns the long dataframe sorted by FIPS (when that column exists) and
    RECORD_DT, with a fresh 0..n-1 index.
    """
    # Raw string: the previous non-raw literal relied on invalid escape
    # sequences ('\d', '\/'), which newer Python versions warn about.
    is_date = re.compile(r'\d{1,2}\/\d{1,2}\/\d{2}').match

    unpivot = [i for i in df.columns if is_date(i)]
    static = [i for i in df.columns if i not in unpivot]

    df = pd.melt(df, id_vars=static, value_vars=unpivot, var_name='RECORD_DT', value_name=segment)
    df['RECORD_DT'] = pd.to_datetime(df['RECORD_DT'])

    # TODO: derive a cumulative column per FIPS (e.g. CASES_CUM) once the
    # value column has a numeric type.

    # Not every frame has a FIPS column; sorting on a missing key raises
    # KeyError, so only the keys that are present are used.
    keys = [k for k in ('FIPS','RECORD_DT') if k in df.columns]

    return df.sort_values(by=keys).reset_index(drop=True)

In [None]:
def EDA(df,f_out,n=100):
    """
    Write a small markdown profile of a dataframe to a file.

    The file contains a title derived from the output file name, one bullet
    per column (name, position, dtype, value in the first and last row) and
    an html table with the first and last `n` rows.

    df    : dataframe to describe; may be empty.
    f_out : path of the markdown file to write. Its parent directory is
            created when missing.
    n     : number of rows taken from the head and from the tail.

    Returns None.
    """
    logger = logging.getLogger(__name__)

    name = os.path.basename(f_out).split('.')[0]
    lb = '\n'
    lblb = '\n\n'

    # columns & types
    parts = [
        '# EDA - {} Files {}'.format(name.upper(),lblb),
        '#### Column Name [IDX] -  Dtype (Head / Tail) \n',
    ]
    dtypes = df.dtypes.to_dict()
    if df.empty:
        # An empty frame has no first/last row; selecting it used to raise
        # an IndexError, so placeholders are reported instead.
        head = tail = [None]*len(df.columns)
    else:
        head = df.head(1).T.iloc[:,0].to_list()
        tail = df.tail(1).T.iloc[:,0].to_list()
    for i,j in enumerate(df.columns):
        parts.append('- **{}** [{}] - {} ({} / {}) {}'.format(j, i, dtypes.get(j), head[i], tail[i], lb))

    # html (pd.concat replaces DataFrame.append, removed in pandas 2.0)
    parts.append('{}#### Head / Tail [n={}] Sample {}'.format(lb+lblb,n,lblb))
    parts.append(pd.concat([df.head(n),df.tail(n)]).to_html(None,index=True,header=True))

    out_dir = os.path.dirname(f_out)
    if out_dir:
        os.makedirs(out_dir,exist_ok=True)
    # Explicit encoding so the output does not depend on the platform default.
    with open(f_out,'w',encoding='utf-8') as f:
        f.write(''.join(parts))
    logger.debug('EDA file written: %s', f_out)

In [None]:
def PARQUET(df,f_out):
    """
    Write a dataframe to a gzip-compressed parquet file using pyarrow.

    df    : dataframe to store; its index is not written.
    f_out : path of the parquet file. Its parent directory is created when
            missing, because the write fails on a non-existent folder.

    Returns None.
    """
    logger = logging.getLogger(__name__)

    out_dir = os.path.dirname(f_out)
    if out_dir:
        os.makedirs(out_dir,exist_ok=True)

    df.to_parquet(f_out,engine='pyarrow',index=False,compression='gzip')
    logger.debug('Parquet file written: %s', f_out)

In [None]:
if __name__=='__main__':
    # Script entry point: optionally refresh the JHU data, then build the
    # EDA markdown and parquet outputs for the covid and demographic sources.
    env = 'dev'
    # NOTE(review): LOGGER is not defined in the visible part of this file.
    # It is assumed to return a logger and the stream the log is captured
    # in - confirm against its definition.
    LOG,FEED = LOGGER(env)
    LOG.info('Setting config for env {}'.format(env))
    CFG = CONFIG(env)
    LOG.info('Config set.')
    sample_size = 20
    refresh = False

    ########################
    ### JHU DATA REFRESH ###
    ########################
    LOG.info('Refresh value: {}.'.format(refresh))
    if refresh:
        LOG.info('Refreshing JHU GitHub data.')
        out = subprocess.run(['sh', CFG['jhu-refresh']]).returncode
        if out == 0:
            pass
        elif out == 1:
            raise Exception('Error while refreshing covid19 data.')
        elif out == 127:
            raise Exception('File not found.')
        else:
            raise Exception('Unknown shell out code: {}'.format(out))

    ########################
    ### COVID19 JHU DATA ###
    ########################
    LOG.info('Starting Covid data.')
    try:
        segment = 'covid'
        LOG.info('Creating Covid dataframes.')
        covid = {
            'jhu-dly': JHU_DLY(CFG['jhu-dly']['path'],**CFG['jhu-dly']['dates'])[0],
            'jhu-dly-us': JHU_DLY(CFG['jhu-dly-us']['path'],**CFG['jhu-dly-us']['dates'])[0],
            'jhu-ts-deaths': pd.read_csv(CFG['jhu-ts']['base'].format(STATUS='deaths',REGION='US'),header=0,dtype=object),
            'jhu-ts-cases': pd.read_csv(CFG['jhu-ts']['base'].format(STATUS='confirmed',REGION='US'),header=0,dtype=object),
            #'jhu-who-ts': pd.read_csv(CFG['jhu-who-ts']['path'],header=0,dtype=object), # keep getting indexing error in EDA function
            'jhu-ts-err': pd.read_csv(CFG['jhu-ts']['errata'],header=0,dtype=object),
            'jhu-fips': pd.read_csv(CFG['jhu-fips']['path'],header=0,dtype=object),
        }
        for fname,df in covid.items():
            LOG.info('Processing {}.'.format(fname))
            # clean df
            df = CLEANER(df)
            # unpivot timeseries data
            if fname in ['jhu-ts-cases','jhu-ts-deaths']:
                df = UNPIVOT(df, fname.split('-')[-1].upper())
            # md eda file
            fpath = os.path.join(CFG['paths']['eda'],'{}-{}.md'.format(segment,fname))
            EDA(df,fpath,n=sample_size)
            # parquet clean file
            fpath = os.path.join(CFG['paths']['pq'],segment,'{}-{}.parquet.gzip'.format(segment,fname))
            PARQUET(df,fpath)
    except Exception as e:
        LOG.critical(str(e))
        raise
    LOG.info('Covid data processed.')

    ########################
    ### DEMOGRAPHIC DATA ###
    ########################
    LOG.info('Starting demographic data.')
    try:
        segment = 'demo'
        for fname in ['cb-acs','cb-idb','cb-geo','cb-pov']:
            LOG.info('Processing {}.'.format(fname))
            if fname == 'cb-acs':
                df,errs = CB_ACS(CFG[fname]['base'],**CFG[fname]['query'])
            elif fname == 'cb-idb':
                df,errs = CB_IDB(CFG[fname]['base'],**CFG[fname]['query'])
            elif fname == 'cb-geo':
                df,errs = CB_GEO(**CFG[fname])
            elif fname == 'cb-pov':
                df,errs = CB_POV(**CFG[fname])
            else:
                raise Exception('Unknown file name.')
            # Failed requests were previously ignored; report each of them.
            for err in errs:
                LOG.warning('Request failed for {}: {}'.format(fname,err))
            # Stop with a clear message instead of failing later inside the
            # EDA step when nothing was downloaded.
            if df.empty:
                raise Exception('No data retrieved for {}.'.format(fname))
            # md eda file
            fpath = os.path.join(CFG['paths']['eda'],'{}-{}.md'.format(segment,fname))
            EDA(df,fpath,n=sample_size)
            # parquet clean file
            fpath = os.path.join(CFG['paths']['pq'],segment,'{}-{}.parquet.gzip'.format(segment,fname))
            PARQUET(df,fpath)
    except Exception as e:
        LOG.critical(str(e))
        # The captured log is read from FEED: a logger object has no
        # getvalue(). The lookup is defensive because FEED's type is not
        # visible here (presumably an io.StringIO - confirm).
        dump = getattr(FEED,'getvalue',None)
        if callable(dump):
            print(dump())
        raise

    # Moved after the try/except: at its old position between the try body
    # and the except clause it was a syntax error.
    LOG.info('Processing complete.')