In [16]:
import numpy as np
import pandas as pd
import math
from datetime import datetime

import fblearner.flow.api as flow
from pvc import Dataset, query, preview, load, upload
from pvc.operators.dataset.hivedataset import HiveDataset
from fblearner.flow.api import types


In [15]:
NAMESPACE = 'instagram'
VALIDATION_RESULT_HIVE_TABLE = 'fips_prediction_validation_result'
PREDICTION_RESULT_HIVE_TABLE = 'fips_prediction_prediction_result'
PERDICTION_PERCENTILES_HIVE_TABLE = 'fips_prediction_prediction_percentiles'
NUMERICAL_FEATURE_AGGREGATION_TABLE = 'fips_prediction_numerical_feature_agg'
CATEGORICAL_FEATURE_AGGREGATION_TABLE = 'fips_prediction_categorical_feature_agg'

In [2]:
df_jhu = pd.read_csv(f"data/us/aggregate_jhu.csv")

# Get rid of the aggregate country data
df_jhu = df_jhu.drop([0])
df_jhu['FIPS'] = df_jhu['FIPS'].map(lambda f : str(f))

def alter(fips):
    if len(fips) == 4:
        return '0' + fips
    return fips
df_jhu['FIPS'] = df_jhu['FIPS'].map(alter)
df_jhu = df_jhu.set_index('FIPS')
df_jhu['fips'] = df_jhu.index.map(lambda s : int(s))

In [3]:
features = ['POP_ESTIMATE_2018', 'Area in square miles - Land area', 'Density per square mile of land area - Population', 'Total_Male', 'Total_Female', 'Total_age0to17', 
            'Total_age18to64','Total_age65plus', 'Active Physicians per 100000 Population 2018 (AAMC)', 'Active General Surgeons per 100000 Population 2018 (AAMC)',
           'Non-profit hospital beds per 1000 people (2019)', 'Employed_2018', 'Unemployment_rate_2018'
           , 'Total hospital beds per 1000 people (2019)', 'Total nurse practitioners (2019)',
           'Total Hospitals (2019)','fips']

In [4]:
df = df_jhu[features]
df.iloc[2]

POP_ESTIMATE_2018                                            218022.000000
Area in square miles - Land area                               1589.780000
Density per square mile of land area - Population               114.600000
Total_Male                                                   105657.000000
Total_Female                                                 112365.000000
Total_age0to17                                                47110.000000
Total_age18to64                                              126341.000000
Total_age65plus                                               44571.000000
Active Physicians per 100000 Population 2018 (AAMC)             217.100000
Active General Surgeons per 100000 Population 2018 (AAMC)         7.600000
Non-profit hospital beds per 1000 people (2019)                   0.800000
Employed_2018                                                 90456.000000
Unemployment_rate_2018                                            3.600000
Total hospital beds per 1

In [5]:
df = df[df.fips % 1000 != 0] # remove aggregate states
# df = df[df.State != 'PR']   # peurto rico has some weird data...
df = df[df.POP_ESTIMATE_2018 > 1000] # restrict to large counties since getting lots of data is difficult

# fill out missing data
df.at['02158', 'Area in square miles - Land area'] = 19673
df.at['02158', 'Density per square mile of land area - Population'] = 0.44
df.at['46102', 'Area in square miles - Land area'] = 2097
df.at['46102', 'Density per square mile of land area - Population'] = 6.5

In [17]:
features_df = pd.melt(df, id_vars=['fips'], value_vars=features[:-1], \
                 var_name='feature_name', value_name='feature_value')

In [11]:
def gbdt_train(ds, model_config):
    partition_dict = {'ds': ds}
    if model_config.training_set_partition is not None:
        partition_dict.update(model_config.training_set_partition)
    train_gbdt_input = {
        'feature_ds':
            ds - timedelta(days=model_config.prediction_timespan),
        'label_data': pcvdataset()(Dataset)(
            table=model_config.training_set_table,
            partition=partition_dict),
        'training_validation_split':
            model_config.get('training_validation_split'),
        'training_negative_sampling_rate':
            model_config.get('training_negative_sampling_rate'),
    }
    train_gbdt_input = dict(
        (k, v) for k, v in train_gbdt_input.items() if v is not None)
    workflow_metadata = WorkflowRunMetadataMutation(
        name='GBDT Training, {0}'.format(
            model_config.model_name))
    workflow_output = RunWorkflowOperator(
        TrainGbdtModelWorkflow,
        train_gbdt_input,
        metadata=workflow_metadata,
        memoize=True,
        version="v0")
    return workflow_output

In [13]:
def gbdt_predict(ds, model_config, training_output):
    partition_dict = {'ds': ds}
    if model_config.predicting_set_partition is not None:
        partition_dict.update(model_config.predicting_set_partition)
    predict_gbdt_input = {
        'feature_ds': ds,
        'user_set': pcvdataset()(Dataset)(
                table=model_config.predicting_set_table,
                partition=partition_dict),
        'gbdt_model': training_output.model.path,
    }
    workflow_metadata = WorkflowRunMetadataMutation(
        name='GBDT Training, {0}'.format(
            model_config.model_name))
    workflow_output = RunWorkflowOperator(
        PredictWithGbdtModelWorkflow,
        predict_gbdt_input,
        metadata=workflow_metadata,
        memoize=True,
        version="v0")
    return workflow_output

In [14]:
def gbdt_validate(ds, model_config, training_output):
    predict_gbdt_input = {
        'feature_ds': ds - timedelta(days=model_config.prediction_timespan),
        'user_set': training_output.validation_set,
        'gbdt_model': training_output.model.path,
    }
    workflow_metadata = WorkflowRunMetadataMutation(
        name='GBDT Evaluating, {0}'.format(
            model_config.model_name))
    workflow_output = RunWorkflowOperator(
        PredictWithGbdtModelWorkflow,
        predict_gbdt_input,
        metadata=workflow_metadata,
        memoize=True,
        version="v0")
    return workflow_output

In [102]:
@flow.flow_async()
@flow.typed()
def upload_features_df_to_hive(
    features_df: pd.DataFrame,
) -> HiveDataset:
    dataset = upload(
    schema=types.STRUCT(
        ('number', types.PARTITION),
        ('fips', types.INT),
        ('feature_name', types.TEXT),
        ('feature_value', types.FLOAT),
    ),
    dataframe=features_df,
    lines_delimiter=r"\n",
    fields_delimiter=r"\t",
    collection_items_delimiter=",",
    map_keys_delimiter=":",
    namespace='instagram',
#     tablename='fips_feature_names_and_values',
    partition=dict(number=1),
    retention=180,
)

In [None]:
# upload_features_df_to_hive(features_df) # partition into fips so notebook doesn't crash

In [18]:
nyt_us_counties_df = pd.read_csv('data/us/covid/nyt_us_counties.csv', sep=',', header=None)

In [19]:
# gets list of all fips numbers
def get_fips():
    Y = pd.read_csv(f"data/us/covid/deaths.csv")
    return set(Y.countyFIPS.values)

def get_date(datestr, formatstr='%Y-%m-%d'):
    return datetime.strptime(datestr, formatstr)

In [44]:
class CumDeathCounter():
    def __init__(self):
        self.cum_deaths = pd.read_csv(f"data/us/covid/deaths.csv")
        self.cum_deaths = self.cum_deaths.iloc[1:]
        fips_list = self.cum_deaths.countyFIPS.values
        
        self.cache = {}
        for fips in fips_list:
            self.cache[fips] = self.get_cum_deaths(fips)
            
    def get_cum_deaths(self, fips, clip_zeros=False):
        idx = self.cum_deaths.index[self.cum_deaths['countyFIPS'] == fips].values[0]
        county_deaths = self.cum_deaths.loc[self.cum_deaths['countyFIPS'] == fips]
        dates = pd.to_datetime(county_deaths.columns[4:].values).map(lambda dt : str(dt))
#         X = np.array([(get_date(d[:10]) - get_date('2020-01-01')).days for d in dates])
        X = np.array([get_date(d[:10]).date().isoformat() for d in dates])
        y = []
        for i in range(4, len(county_deaths.columns)):
            y.append(county_deaths.loc[idx,county_deaths.columns[i]])
        if not clip_zeros:
            return X, y
        for i in range(len(y)):
            if y[i] != 0:
                return X[i:], y[i:]
            
    def getY(self, fips):
        return self.cache[fips]
    
    def getCache(self):
        return self.cache
    
class CumCaseCounter():
    def __init__(self):
        self.cum_cases = pd.read_csv(f"data/us/covid/confirmed_cases.csv")
        self.cum_cases = self.cum_cases.iloc[1:]
        self.cum_cases = self.cum_cases.iloc[:, :-1]
        
        fips_list = self.cum_cases.countyFIPS.values
        
        self.cache = {}
        for fips in fips_list:
            self.cache[fips] = self.get_cum_cases(fips)
        
    def get_cum_cases(self, fips,clip_zeros=False):
        idx = self.cum_cases.index[self.cum_cases['countyFIPS'] == fips].values[0]
        county_cases = self.cum_cases.loc[self.cum_cases['countyFIPS'] == fips]
        dates = pd.to_datetime(county_cases.columns[4:].values).map(lambda dt : str(dt))
#         X = np.array([(get_date(d[:10]) - get_date('2020-01-01')).days for d in dates])
        X = np.array([get_date(d[:10]).date().isoformat() for d in dates])
        y = []
        for i in range(4, len(county_cases.columns)):
            y.append(county_cases.loc[idx,county_cases.columns[i]])
        if not clip_zeros:
            return X, y
        for i in range(len(y)):
            if y[i] != 0:
                return X[i:], y[i:]
            
    def getY(self, fips):
        return self.cache[fips]
    
    def getCache(self):
        return self.cache

In [45]:
cum_death_counter = CumDeathCounter()
cum_case_counter = CumCaseCounter()

In [54]:
cum_deaths = cum_death_counter.getCache()

In [55]:
cum_deaths_df = pd.DataFrame(cum_deaths)

In [84]:
cum_deaths_df_transposed = cum_deaths_df.T.reset_index()

In [85]:
print(cum_deaths_df_transposed)

      index                                                  0  \
0      1001  [2020-01-22, 2020-01-23, 2020-01-24, 2020-01-2...   
1      1003  [2020-01-22, 2020-01-23, 2020-01-24, 2020-01-2...   
2      1005  [2020-01-22, 2020-01-23, 2020-01-24, 2020-01-2...   
3      1007  [2020-01-22, 2020-01-23, 2020-01-24, 2020-01-2...   
4      1009  [2020-01-22, 2020-01-23, 2020-01-24, 2020-01-2...   
...     ...                                                ...   
3141  56037  [2020-01-22, 2020-01-23, 2020-01-24, 2020-01-2...   
3142  56039  [2020-01-22, 2020-01-23, 2020-01-24, 2020-01-2...   
3143  56041  [2020-01-22, 2020-01-23, 2020-01-24, 2020-01-2...   
3144  56043  [2020-01-22, 2020-01-23, 2020-01-24, 2020-01-2...   
3145  56045  [2020-01-22, 2020-01-23, 2020-01-24, 2020-01-2...   

                                                      1  
0     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...  
1     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...  
2     [0, 0, 0, 0, 0, 0, 0, 0, 0,

In [94]:
cum_deaths_df_transposed = cum_deaths_df_transposed.rename(columns={'index':'fips', 0:'ds', 1:'label'})

In [83]:
def explode(df, lst_cols, fill_value='', preserve_index=False):
    # make sure `lst_cols` is list-alike
    if (lst_cols is not None
        and len(lst_cols) > 0
        and not isinstance(lst_cols, (list, tuple, np.ndarray, pd.Series))):
        lst_cols = [lst_cols]
    # all columns except `lst_cols`
    idx_cols = df.columns.difference(lst_cols)
    # calculate lengths of lists
    lens = df[lst_cols[0]].str.len()
    # preserve original index values    
    idx = np.repeat(df.index.values, lens)
    # create "exploded" DF
    res = (pd.DataFrame({
                col:np.repeat(df[col].values, lens)
                for col in idx_cols},
                index=idx)
             .assign(**{col:np.concatenate(df.loc[lens>0, col].values)
                            for col in lst_cols}))
    # append those rows that have empty lists
    if (lens == 0).any():
        # at least one list in cells is empty
        res = (res.append(df.loc[lens==0, idx_cols], sort=False)
                  .fillna(fill_value))
    # revert the original index order
    res = res.sort_index()
    # reset index if requested
    if not preserve_index:        
        res = res.reset_index(drop=True)
    return res

In [98]:
death_labels_df = explode(cum_deaths_df_transposed, ['ds', 'label'], fill_value='')

In [99]:
death_labels_df

Unnamed: 0,fips,ds,label
0,1001,2020-01-22,0
1,1001,2020-01-23,0
2,1001,2020-01-24,0
3,1001,2020-01-25,0
4,1001,2020-01-26,0
...,...,...,...
279989,56045,2020-04-15,0
279990,56045,2020-04-16,0
279991,56045,2020-04-17,0
279992,56045,2020-04-18,0


In [106]:
# @flow.flow_async()
# @flow.typed()
def upload_labels_df_to_hive(
    labels_df: pd.DataFrame,
    ds: str,
) -> HiveDataset:
    dataset = upload(
    schema=types.STRUCT(
        ('ds', types.PARTITION),
        ('number', types.PARTITION),
        ('fips', types.INT),
#         ('features', types.TEXT),
        ('label', types.FLOAT),
    ),
    dataframe=labels_df,
    lines_delimiter=r"\n",
    fields_delimiter=r"\t",
    collection_items_delimiter=",",
    map_keys_delimiter=":",
    namespace='instagram',
#     tablename='fips_features_labels_training_data',
    partition=dict(ds=ds, number=1),
    retention=180,
)

In [107]:
upload_labels_df_to_hive(death_labels_df, '2020-04-21')