# DSCI591 Data Collection
## COVID-19

In [1]:
import requests
import json
from time import sleep
import os
from os import path
from datetime import datetime, timedelta
from glob import glob
import re
import csv
import math


import pandas as pd
from matplotlib import pyplot as plt
from scipy.stats import gamma
import numpy as np


### Helper Methods from the C3.ai Quickstart

In [2]:
def read_data_json(typename, api, body):
    """
    read_data_json POSTs a request spec to the C3.ai COVID-19 Data Lake using the requests library
    and returns the decoded JSON reply. For a non-OK response the response text is printed and
    an error is raised via raise_for_status().
    ------
    typename: The type you want to access, i.e. 'OutbreakLocation', 'LineListRecord', 'BiblioEntry', etc.
    api: The API you want to access, either 'fetch' or 'evalmetrics'.
    body: The spec you want to pass (sent as the JSON payload). For examples, see the API documentation.
    """
    endpoint = "/".join(["https://api.c3.ai/covid/api/1", typename, api])
    json_headers = {
        'Accept' : 'application/json',
        'Content-Type' : 'application/json',
    }
    response = requests.post(endpoint, json = body, headers = json_headers)

    if response.ok:
        return response.json()

    # Show the server's explanation before the HTTP error propagates.
    print(response.text)
    response.raise_for_status()
    return response.json()

def fetch(typename, body, get_all = False, remove_meta = True):
    """
    fetch accesses the Data Lake using read_data_json, and converts the response into a Pandas dataframe. 
    fetch is used for all non-timeseries data in the Data Lake, and will call read_data_json as many times 
    as required to access all of the relevant data for a given typename and body.
    ------
    typename: The type you want to access, i.e. 'OutbreakLocation', 'LineListRecord', 'BiblioEntry', etc.
    body: The spec you want to pass. For examples, see the API documentation. The dictionary is not modified.
    get_all: If True, get all records and ignore any limit argument passed in the body. If False, use the limit argument passed in the body. The default is False.
    remove_meta: If True, remove metadata about each record (columns whose name contains 'meta' or 'version'). If False, include it. The default is True.
    """
    if get_all:
        page_size = 2000
        offset = 0
        pages = []
        has_more = True

        while has_more:
            # Build a per-page copy so the caller's body/spec is left untouched.
            page_spec = {**body['spec'], 'limit': page_size, 'offset': offset}
            page_body = {**body, 'spec': page_spec}
            response_json = read_data_json(typename, 'fetch', page_body)
            pages.append(pd.json_normalize(response_json['objs']))
            has_more = response_json['hasMore']
            offset += page_size

        # DataFrame.append was removed in pandas 2.0; concatenate all pages once.
        # Row labels restart at 0 for every page, as they did with append.
        df = pd.concat(pages)

    else:
        response_json = read_data_json(typename, 'fetch', body)
        df = pd.json_normalize(response_json['objs'])

    if remove_meta:
        df = df.drop(columns = [c for c in df.columns if 'meta' in c or 'version' in c])

    return df
    
def evalmetrics(typename, body, remove_meta = True):
    """
    evalmetrics requests timeseries data from the Data Lake through read_data_json and returns it
    as a Pandas dataframe with a single datetime column named "dates".
    ------
    typename: The type you want to access, i.e. 'OutbreakLocation', 'LineListRecord', 'BiblioEntry', etc.
    body: The spec you want to pass. For examples, see the API documentation.
    remove_meta: If True, keep only columns whose name matches 'dates', 'data' or 'missing'. If False, keep every column. The default is True.
    """
    payload = read_data_json(typename, 'evalmetrics', body)

    # flatten the nested result, then expand list-valued cells into rows
    frame = pd.json_normalize(payload['result']).apply(pd.Series.explode)
    if remove_meta:
        frame = frame.filter(regex = 'dates|data|missing')

    # split the columns into date columns and everything else
    date_cols = []
    other_cols = []
    for col in frame.columns:
        if 'dates' in col:
            date_cols.append(col)
        else:
            other_cols.append(col)

    # keep only the first date column and call it "dates"
    frame = frame.filter(items = date_cols[:1] + other_cols)
    frame = frame.rename(columns = {date_cols[0] : "dates"})
    frame["dates"] = pd.to_datetime(frame["dates"])

    return frame

#### Streamlined request for single item

In [3]:
def fetch_one(typename: str, body: dict, objs_only=True) -> dict:
    """
    Performs a single 'fetch' API call and returns its JSON output
    
    Args:
        typename: the C3.ai type name
        body: the body of the request
        objs_only: if True, return just the list of objects, each with its
            'meta' entry removed; if False, return the full response
        
    Returns:
        The list under 'objs' when objs_only is True, otherwise the whole
        JSON response as a dictionary
    
    """

    response = read_data_json(typename, 'fetch', body)
    if not objs_only:
        return response

    records = response['objs']
    for record in records:
        # drop the per-record metadata if it is present
        record.pop('meta', None)

    return records

### Load the location codes for the US into a Pandas DataFrame

In [4]:
def get_us_locations(file_name='./config/C3-ai-Location-IDs.xlsx'): 
    """ Reads the 'County IDs' sheet of the C3 ai spreadsheet and keeps the US rows
    
    Args:
        file_name: the name of the spreadsheet, resolved relative to '.'
        
    Returns:
        Pandas dataframe with the rows whose Country is 'United States'
    
    """

    workbook_path = path.join('.', file_name)
    # header=2: the column names are on the third row of the sheet
    county_ids = pd.read_excel(workbook_path, sheet_name='County IDs', header=2)
    in_us = county_ids.Country == 'United States'

    return county_ids[in_us]

### Get the basic population data for each of the 3429 counties

In [5]:
def make_outbreaklocation_body(county_id: str) -> dict:
    """ Builds the request body that selects one county in the outbreak location API 
    
    Args:
        county_id: the ID for the County
    
    Returns:
        The request body: a 'spec' holding a filter on the county id
    
    """
    id_filter = "id == '{}'".format(county_id)
    spec = {"filter": id_filter}
    return {"spec": spec}

# Example: request the outbreak-location record for a single county id.
fetch_one('outbreaklocation', make_outbreaklocation_body('Autauga_Alabama_UnitedStates'))


[{'hospitalIcuBeds': 6,
  'hospitalStaffedBeds': 55,
  'hospitalLicensedBeds': 85,
  'latestTotalPopulation': 55869.0,
  'populationOfAllChildren': 55869.0,
  'latestLaborForce': 25541,
  'latestEmployedPopulation': 24953,
  'latestUnemployedPopulation': 588,
  'latestUnemploymentRate': 2.302180807329392,
  'laborForceOfAllChildren': 25541,
  'locationType': 'county',
  'populationCDS': 55869,
  'location': {'value': {'id': 'Autauga_Alabama_UnitedStates'},
   'timestamp': '2021-08-04T00:00:00Z'},
  'fips': {'id': '01001'},
  'id': 'Autauga_Alabama_UnitedStates',
  'name': 'Autauga',
  'version': 38863297,
  'typeIdent': 'EP_LOC'}]

In [6]:
def load_population_data(file_name='./config/counties.json'):
    """ Loads population data for all US counties and stores it as JSON in file_name

    Counties already present in file_name with data are not requested again,
    so an interrupted run can be resumed. A county whose request fails is
    recorded as None and retried on the following pass (up to 5 retries);
    counties that still fail remain None in the saved file.

    Args:
        file_name: the JSON file used both as the resume cache and as the output

    Returns:
        None. The result is written to file_name as a dictionary keyed by
        county id.

    """
    max_retries = 5
    save_every = 100

    def save(county_data):
        with open(file_name, 'w') as file:
            json.dump(county_data, file)

    county_ids = list(get_us_locations()['County id'])

    # Resume from an earlier run when a readable cache file exists.
    try:
        with open(file_name) as file:
            county_data = json.load(file)
    except (OSError, ValueError):
        county_data = {}

    for _ in range(max_retries + 1):
        # Missing counties and earlier failures (stored as None) are both pending.
        pending = [county for county in county_ids if county_data.get(county) is None]
        if not pending:
            break

        fetched = 0
        for county in pending:
            try:
                data = fetch_one('outbreaklocation', make_outbreaklocation_body(county))
                county_data[county] = data[0]
            except Exception as err:
                # Best effort: record the failure and keep going with the next county.
                county_data[county] = None
                print(f'Problem with {county}: {err}')
            else:
                fetched += 1
                if fetched % save_every == 0:
                    # Periodic checkpoint so a crash loses at most save_every counties.
                    print(f'Saving: {fetched}')
                    save(county_data)
            sleep(1)

        # Write to the same file the checkpoints and the resume logic use.
        save(county_data)
        
def get_counties_df(file_name='./config/counties.json'):
    """ Loads the saved county data and returns it with one row per county

    Args:
        file_name: the JSON file holding a dictionary keyed by county id

    Returns:
        Pandas dataframe indexed by county id, one column per field

    """
    with open(file_name) as handle:
        raw = json.load(handle)

    # counties arrive as columns here, fields as the index
    by_county = pd.DataFrame.from_dict(raw)

    # pivot: rebuild the frame from one Series per county so counties become rows
    rows = [series for _, series in by_county.items()]
    return pd.DataFrame(rows, index=by_county.columns, columns=by_county.index)
    
# Display the county dataframe built from the default ./config/counties.json file.
get_counties_df()  

Unnamed: 0,hospitalIcuBeds,hospitalStaffedBeds,hospitalLicensedBeds,latestTotalPopulation,location,fips,id,name,version,typeIdent
Autauga_Alabama_UnitedStates,6.0,55.0,85.0,55869.0,{'value': {'id': 'Autauga_Alabama_UnitedStates...,{'id': '01001'},Autauga_Alabama_UnitedStates,Autauga,3735560.0,EP_LOC
Baldwin_Alabama_UnitedStates,51.0,362.0,386.0,223234.0,{'value': {'id': 'Baldwin_Alabama_UnitedStates...,{'id': '01003'},Baldwin_Alabama_UnitedStates,Baldwin,3801096.0,EP_LOC
Barbour_Alabama_UnitedStates,5.0,30.0,74.0,24686.0,{'value': {'id': 'Barbour_Alabama_UnitedStates...,{'id': '01005'},Barbour_Alabama_UnitedStates,Barbour,3801096.0,EP_LOC
Bibb_Alabama_UnitedStates,4.0,25.0,35.0,22394.0,"{'value': {'id': 'Bibb_Alabama_UnitedStates'},...",{'id': '01007'},Bibb_Alabama_UnitedStates,Bibb,3997704.0,EP_LOC
Blount_Alabama_UnitedStates,6.0,25.0,25.0,57826.0,{'value': {'id': 'Blount_Alabama_UnitedStates'...,{'id': '01009'},Blount_Alabama_UnitedStates,Blount,3801096.0,EP_LOC
...,...,...,...,...,...,...,...,...,...,...
Unassigned_Wisconsin_UnitedStates,,,,,{'value': {'id': 'Unassigned_Wisconsin_UnitedS...,,Unassigned_Wisconsin_UnitedStates,Unassigned,3080195.0,EP_LOC
Unassigned_Wyoming_UnitedStates,,,,,{'value': {'id': 'Unassigned_Wyoming_UnitedSta...,,Unassigned_Wyoming_UnitedStates,Unassigned,3080201.0,EP_LOC
Unassigned_Guam_UnitedStates,,,,,,,Unassigned_Guam_UnitedStates,Unknown,1.0,EP_LOC
Unassigned_NorthernMarianaIslands_UnitedStates,,,,,{'value': {'id': 'Unassigned_NorthernMarianaIs...,,Unassigned_NorthernMarianaIslands_UnitedStates,Unknown,458755.0,EP_LOC


### Get the County Stats from the Census Bureau

### New source of county data
#### co-est2019-annres.xlsx
#### from US Census Bureau at https://www.census.gov/data/datasets/time-series/demo/popest/2010s-counties-total.html

In [7]:
def get_county_stats_df(file_name='./config/county_stats.csv'):
    """ Get county land area (LND110210) by FIPS code

    Args:
        file_name: the CSV file to read, resolved relative to '.'

    Returns:
        Pandas dataframe with only the 'fips' and 'LND110210' columns

    """
    stats = pd.read_csv(path.join('.', file_name))
    wanted = ['fips', 'LND110210']
    return stats[wanted]

# Display the fips / land-area table read from the default ./config/county_stats.csv file.
get_county_stats_df()

# Equivalent data https://www.kaggle.com/benhamner/2016-us-election

Unnamed: 0,fips,LND110210
0,0,3531905.43
1,1000,50645.33
2,1001,594.44
3,1003,1589.78
4,1005,884.88
...,...,...
3190,56037,10426.65
3191,56039,3995.38
3192,56041,2081.26
3193,56043,2238.55


### Functions to Download the Evalmetrics Data

In [8]:
def get_last_file_date(county: str) -> str:
    """ Retrieves the last date through which data was loaded for the specified county
    
    The date is taken from the YYYY-MM-DD that immediately precedes the
    '.psv' extension of the files in ./data whose name starts with the
    county id.
    
    Args:
            county: The county requested
            
    Returns:
            A string representing the last date processed.  If the county has never been processed, a
            default date of 2020-01-01 is returned
            
    """

    default_date = '2020-01-01'
    # Anchor on the literal ".psv" suffix so only the trailing (end) date of
    # the file name is considered, not an earlier date elsewhere in the name.
    end_date = re.compile(r'(\d{4}-\d{2}-\d{2})\.psv$')

    files = glob(path.join('.', 'data', f'{county}*.psv'))
    matches = (end_date.search(file) for file in files)
    found = [match.group(1) for match in matches if match]

    # ISO dates compare correctly as strings; never go below the default.
    return max([default_date, *found])
            
    
def download_evalmetrics_data():
    """ Downloads the evalmetrics data from the last download 
    through current 
    
    For every county in ./config/counties.json that has details, the metric
    expressions are requested in batches and each batch is written to
    ./data/<county>-part-<batch index>-<last date>-<today>.psv as a
    pipe-separated file. Counties whose newest file already ends today are
    skipped. A failing batch is reported and skipped; the other batches are
    still attempted.
    """

    today = datetime.now().strftime('%Y-%m-%d')

    # Number of expressions sent per request.
    # NOTE(review): presumably sized to an API per-request limit - confirm.
    batch_size = 4

    expressions = [    "JHU_ConfirmedCases", 
                       "JHU_ConfirmedDeaths", 
                       "JHU_ConfirmedRecoveries",
                       "NYT_ConfirmedCases",
                       "NYT_ConfirmedDeaths",
                       "NYT_AllCausesDeathsWeekly_Deaths_AllCauses",
                       "NYT_AllCausesDeathsWeekly_Excess_Deaths",
                       "NYT_AllCausesDeathsWeekly_Expected_Deaths_AllCauses",
                       "NYT_AllCausesDeathsMonthly_Deaths_AllCauses",
                       "NYT_AllCausesDeathsMonthly_Excess_Deaths",
                       "NYT_AllCausesDeathsMonthly_Expected_Deaths_AllCauses",
                       # "CDS_Active",
                       # "CDS_Cases",
                       # "CDS_Deaths",
                       # "CDS_Discharged",
                       # "CDS_GrowthFactor",
                       # "CDS_Hospitalized",
                       # "CDS_Hospitalized_Current",
                       # "CDS_ICU",
                       # "CDS_ICU_Current",
                       # "CDS_Recovered",
                       # "CDS_Tested",
                       "TotalPopulation",
                       "Male_Total_Population",
                       "Female_Total_Population",
                       "MaleAndFemale_Under18_Population",
                       "MaleAndFemale_AtLeast65_Population",
                       "BLS_LaborForcePopulation",
                       "BLS_EmployedPopulation",
                       "BLS_UnemployedPopulation",
                       "BLS_UnemploymentRate",
                       "AverageDailyTemperature",
                       "AverageDewPoint",
                       "AverageRelativeHumidity",
                       "AverageSurfaceAirPressure",
                       "AveragePrecipitation",
                       "AverageWindSpeed",
                       "AverageWindDirection",
                       "AveragePrecipitationTotal",]

    # Stepping by batch_size keeps a trailing partial batch, which the
    # previous floor-division count would have dropped.
    batches = [expressions[start:start + batch_size]
               for start in range(0, len(expressions), batch_size)]

    # Get the list of counties
    counties_file = path.join('.', 'config', 'counties.json')
    with open(counties_file) as file:
        counties = json.load(file)

    # Iterate through counties saving the time series data
    for counter, (county, details) in enumerate(counties.items()):
        # Progress indicator: one dot per county, line break every 120 counties
        print('.', end='')
        if counter and not counter % 120:
            print()

        # Skip if missing county details
        if not details:
            continue

        # Get the last date we processed
        last_date = get_last_file_date(county)

        if last_date == today:
            continue

        # Get the data for the county from the last date processed
        for i, batch in enumerate(batches):
            body = {"spec" : {
                                "ids" : [county],
                                "expressions": batch, 
                                "start" : last_date,
                                "end" : today,
                                "interval" : "DAY",
                            }
                    }

            try:
                df = evalmetrics("outbreaklocation", body)
                file_name = path.join('.', 'data', f'{county}-part-{i}-{last_date}-{today}.psv')
                df.to_csv(file_name, sep='|')
            except Exception as e:
                # Best effort: report the failure and continue with the next batch
                print(f'Error processing {county}: {e}')

            # Pause between requests
            sleep(1)
  
# download_evalmetrics_data()           

### Load all the downloaded data and produce raw DataFrame (OLD - not used)

In [9]:
# def append_to_dataframe(files: list) -> pd.DataFrame:
#     """ Appends all files passed as an argument into a single dataframe
    
#     Args:
#         files: list of file names
        
#     Returns:
#         A data frame with all the data from the files populated
    
#     """
#     name_pattern = re.compile('([\w_]+)')
#     county_df = None

#     for file in files:
#         base: str = path.basename(file)
#         if base.startswith('Unassigned') or base.startswith('Outof') or base.startswith('.'):
#             continue
#         try:
#             county = name_pattern.match(base).group(1)
#             records = []
#             with open(file) as fd:
#                 reader = csv.reader(fd, delimiter='|')
#                 # advance past the header
#                 next(iter(reader))
#                 records = [[county, row[1], row[2], row[4], row[6]] for row in reader]
            
#             temp_df = pd.DataFrame(records, columns=['county', 'date', 
#                                                      'confirmed_cases_running', 
#                                                      'confirmed_deaths_running', 
#                                                      'confirmed_recoveries_running'])
            
#             if float(temp_df.confirmed_deaths_running[-1:].values) < 2:
#                 continue
            
#             # turn running amounts into daily amounts
#             temp_df['prev_day_cases'] = ['0.0'] + list(temp_df.confirmed_cases_running[:-1])
#             temp_df['confirmed_cases'] = (temp_df.confirmed_cases_running.astype('float') - 
#                                           temp_df.prev_day_cases.astype('float'))
            
#             temp_df['prev_day_deaths'] = ['0.0'] + list(temp_df.confirmed_deaths_running[:-1])
#             temp_df['confirmed_deaths'] = (temp_df.confirmed_deaths_running.astype('float') - 
#                                            temp_df.prev_day_deaths.astype('float'))
            
#             temp_df['prev_day_recoveries'] = ['0.0'] + list(temp_df.confirmed_recoveries_running[:-1])
#             temp_df['confirmed_recoveries'] = (temp_df.confirmed_recoveries_running.astype('float') - 
#                                                temp_df.prev_day_recoveries.astype('float'))
            
            
#             temp_df = temp_df.drop(columns=['confirmed_cases_running', 'prev_day_cases',
#                                             'confirmed_deaths_running', 'prev_day_deaths',
#                                             'confirmed_recoveries_running', 
#                                             'prev_day_recoveries',])
#             if county_df is None:
#                 county_df = temp_df
#             else:
#                 county_df = pd.concat([county_df, temp_df], ignore_index=True)
#         except Exception as e:
#             print(f'Failure on file {base} with error {e}')
        
#     return county_df

# def join_evalmetrics_to_county(evalmetrics_df, counties_df, county_stats_df):
#     df = evalmetrics_df.merge(counties_df, how='left', left_on='county', right_index=True, sort=True)
#     # df.fips = [int(fips['id']) for fips in df.fips]
#     fips = []
#     for f in df.fips:
#         if isinstance(f, dict):
#             fips.append(int(f['id']))
#         else:
#             fips.append(f)
#     df.fips = fips
#     df = df.merge(county_stats_df, how='left', left_on='fips', right_on='fips')
#     df = df.drop(columns=['location', 'id', 'name', 'version', 'typeIdent'])
    
#     return df.sort_values(by=['county', 'date'])



### Save the raw dataframe to a file

In [10]:
def save_raw_df(df: pd.DataFrame):
    """ Pickles the dataframe to raw_evalmetrics_df.pkl in the current directory

    Args:
        df: the dataframe to save

    """
    target = path.join('.', 'raw_evalmetrics_df.pkl')
    df.to_pickle(target)

### Runners
#### The following runners can take a significant amount of time to process

##### Download Evalmetrics Data

In [13]:
# Run the download; it prints one '.' per county and a message for each failed request.
download_evalmetrics_data()

........................Error processing Dallas_Alabama_UnitedStates: ('Connection aborted.', TimeoutError(10060, 'A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond', None, 10060, None))
.................................................................................................
........................................................................................................................
........................................................................................................................
........................................................................................................................
........................................................................................................................
..................................................................................................

#### Convert raw files to dataframes

In [None]:
def load_and_normalize_df(filename):
    """ Reads one pipe-separated file from ./data and shortens its column names

    Args:
        filename: name of the file inside ./data

    Returns:
        Pandas dataframe indexed by the file's second column, without the
        'Unnamed: 0' column, where each column name is reduced to its last
        two dot-separated parts

    """
    frame = pd.read_csv(path.join('.', 'data', filename), delimiter='|', index_col=1)
    frame = frame.drop(columns=['Unnamed: 0'])

    short_names = {}
    for col in frame.columns:
        short_names[col] = '.'.join(col.split('.')[-2:])

    return frame.rename(columns=short_names)

def merge_county_parts_to_dataframe(filenames):
    """ Loads every part file and joins them side by side on their index

    Args:
        filenames: names of the part files inside ./data; must not be empty

    Returns:
        Pandas dataframe starting with the columns of the first file, with
        the columns of each following file joined on

    """
    merged = load_and_normalize_df(filenames[0])
    for part_name in filenames[1:]:
        merged = merged.join(load_and_normalize_df(part_name))
    return merged


def save_county_merged_parts_df(county, df):
    """ Pickles a county dataframe to ./processed_data/county_merged_parts/<county>.pkl

    Args:
        county: the county id, used as the file name
        df: the dataframe to save

    """
    out_dir = path.join('.', 'processed_data', 'county_merged_parts')
    df.to_pickle(path.join(out_dir, f'{county}.pkl'))
    

def get_counties_from_files():
    """ Lists the counties that have downloaded part files in ./data

    Only '.psv' files whose name contains '-part-' are considered, so stray
    directory entries (hidden files, checkpoints, notes) are not mistaken
    for counties.

    Returns:
        Sorted list of unique county ids (the file name up to '-part')

    """
    data_dir = path.join('.', 'data')
    counties = {
        name.split('-part')[0]
        for name in os.listdir(data_dir)
        if '-part-' in name and name.endswith('.psv')
    }
    # Sorted so the processing order is the same on every run.
    return sorted(counties)


def get_dates_for_county(county):
    """ Lists the download date ranges available for a county

    Looks at the entries in ./data whose name starts with the county id and
    extracts the 'YYYY-MM-DD-YYYY-MM-DD' range from each name. Entries
    without such a range are ignored.

    Args:
        county: the county id

    Returns:
        Sorted list of the unique date-range strings

    """
    date_range = re.compile(r'\d{4}-\d{2}-\d{2}-\d{4}-\d{2}-\d{2}')
    dates = set()
    for file in glob(f'./data/{county}*'):
        match = date_range.search(file)
        if match:
            dates.add(match.group(0))
    # ISO-formatted ranges sort chronologically as plain strings.
    return sorted(dates)
    
def get_county_files_for_date(county, dt):
    """ Finds the part files of a county for one date range

    Args:
        county: the county id
        dt: the date-range string that ends the file name before '.psv'

    Returns:
        List of the matching paths in ./data, each cut to start at the
        first occurrence of the county id

    """
    names = []
    for full_path in glob(f'./data/{county}-part-*-{dt}.psv'):
        start = full_path.index(county)
        names.append(full_path[start:])
    return names
                    
def process_county(county, county_population_stats):
    """ Builds, saves and returns the merged dataframe for one county

    The part files of every downloaded date range are joined column-wise,
    the date ranges are stacked row-wise in chronological order, and each
    population statistic is added as a constant column.

    Args:
        county: the county id
        county_population_stats: mapping (e.g. a pandas Series) of statistic
            name to value for this county

    Returns:
        The merged Pandas dataframe, which is also pickled through
        save_county_merged_parts_df

    Raises:
        ValueError: if no part files are found for the county

    """
    frames = []
    for dt in sorted(get_dates_for_county(county)):
        files = get_county_files_for_date(county, dt)
        # A date range can exist without any '-part-' files; skip it.
        if files:
            frames.append(merge_county_parts_to_dataframe(files))

    if not frames:
        raise ValueError(f'No downloaded data files found for {county}')

    # Stack the date ranges; DataFrame.append was removed in pandas 2.0.
    df = pd.concat(frames)

    # .items() works for both Series and dict (Series.iteritems was removed in pandas 2.0).
    for k, v in county_population_stats.items():
        df[k] = v
    save_county_merged_parts_df(county, df)
    return df


def get_county_population_stats():
    """ Combines the county population data with the county land-area statistics

    Returns:
        Pandas dataframe indexed by county 'id', with 'fips' as a plain value
        (an int where the source held a dict with an 'id'), the columns of
        the county stats left-joined on 'fips', and the 'location', 'name',
        'version' and 'typeIdent' columns removed

    """
    population_df = get_counties_df()  
    land_area_df = get_county_stats_df()

    # fips arrives as {'id': '01001'} where present; anything else is kept as it is
    population_df.fips = [
        int(entry['id']) if isinstance(entry, dict) else entry
        for entry in population_df.fips
    ]

    merged = population_df.merge(land_area_df, how='left', left_on='fips', right_on='fips')
    merged = merged.set_index('id')
    return merged.drop(columns=['location', 'name', 'version', 'typeIdent'])

    
def process_counties():
    """ Runs process_county for every county that has files in ./data

    Each county is processed with its own row of the population statistics.
    """
    county_names = get_counties_from_files()
    stats_by_county = get_county_population_stats()  

    for name in county_names:
        county_stats = stats_by_county.loc[name]
        process_county(name, county_stats)

        
# get_county_population_stats()
# Build and save the merged dataframe for every county found in ./data.
process_counties()        





### Collect Demographics

In [14]:
def make_outbreaklocation_body(county_id: str) -> dict:
    """ Builds the request body that selects one county in the outbreak location API 
    
    Args:
        county_id: the ID for the County
    
    Returns:
        The request body: a 'spec' holding a filter on the county id
    
    """
    id_filter = "id == '{}'".format(county_id)
    spec = {"filter": id_filter}
    return {"spec": spec}

# Example: request the outbreak-location record for a single county id.
fetch_one('outbreaklocation', make_outbreaklocation_body('Autauga_Alabama_UnitedStates'))


[{'hospitalIcuBeds': 6,
  'hospitalStaffedBeds': 55,
  'hospitalLicensedBeds': 85,
  'latestTotalPopulation': 55869.0,
  'populationOfAllChildren': 55869.0,
  'latestLaborForce': 25541,
  'latestEmployedPopulation': 24953,
  'latestUnemployedPopulation': 588,
  'latestUnemploymentRate': 2.302180807329392,
  'laborForceOfAllChildren': 25541,
  'locationType': 'county',
  'populationCDS': 55869,
  'location': {'value': {'id': 'Autauga_Alabama_UnitedStates'},
   'timestamp': '2021-08-04T00:00:00Z'},
  'fips': {'id': '01001'},
  'id': 'Autauga_Alabama_UnitedStates',
  'name': 'Autauga',
  'version': 38863297,
  'typeIdent': 'EP_LOC'}]