# Elastic Search Update Notebook

This notebook is used to update the elastic search index with the latest datasets

In [None]:
!pip install xgboost
!pip install --upgrade --force-reinstall git+https://github.com/rbilleci/pandora.git
!pip install requests-aws4auth
!pip install elasticsearch

In [None]:
import boto3
import pandas as pd
import pandora
import pandora.data.age_distribution as age_dist
import pandora.data.oxford_data as oxford
import pandora.data.population as population
import pandora.data.temperatures as temperatures
import shutil
import os
import uuid
import numpy as np
from sklearn.preprocessing import RobustScaler
from pandora.data import geo, continent, country_code, working_day
from pandora import loader, encoders
from pandora.core_fields import DATE, COUNTRY_CODE
from sagemaker import get_execution_role, session
import datetime
from pathlib import Path
from elasticsearch import helpers
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
from logging import INFO, basicConfig, info
import warnings

In [None]:
# setup logging
basicConfig(level=INFO, format='%(asctime)s\t%(levelname)s\t%(filename)s\t%(message)s')
warnings.filterwarnings('ignore', category=FutureWarning)  # ignore FutureWarning from scikit learn

In [None]:
role = get_execution_role()
region = boto3.Session().region_name
bucket =  session.Session(boto3.Session()).default_bucket()

In [None]:
pd.options.display.max_columns = None
pd.options.display.max_rows = None
pd.options.display.max_info_columns = 1000

# Download the data files

In [None]:
import pandora.data.oxford_data_update

# Generate Dataset

In [None]:

# determine the last date we have from the oxford data set
# which is the first day we'll begin prediction from
prediction_start_date = pd.read_csv(oxford.module.location, keep_default_na=False, na_values='')['date'].max()
prediction_start_date = datetime.datetime.strptime(prediction_start_date, '%Y-%m-%d').date()

# the data range should cover the ground truth data + the time window we are predicting into
days_to_predict = 90
start_date = datetime.date(2020, 1, 1)
end_date = prediction_start_date + datetime.timedelta(days=days_to_predict)

# the imputation window might extend far before or after
# the range we are predicting, this is so we have more data samples for imputation calculations
imputation_window_start_date =  datetime.date(2020, 1, 1)
imputation_window_end_date =  datetime.date(2021, 12, 31)

# load the data
df = loader.load(start_date,
                 end_date,
                 imputation_window_start_date,
                 imputation_window_end_date,
                 geo.module,
                 [
                     country_code.module,
                     continent.module,
                     population.module,
                     age_dist.module,
                     temperatures.module,
                     oxford.module,
                     working_day.module
                 ])

# Add computed column

In [None]:
def compute_ma(field, window_size):
    df[f"{field}_ma_{window_size}"] = df.groupby('geo_code')[field].rolling(window_size, center=False).mean().fillna(0).reset_index(0, drop=True)

def add_working_day_tomorrow(grouped):
    grouped['working_day' + '_tomorrow'] = grouped['working_day'].copy().shift(-1).bfill().ffill()
    return grouped


def add_working_day_yesterday(grouped):
    grouped['working_day' + '_yesterday'] = grouped['working_day'].copy().shift(1).bfill().ffill()
    return grouped

def transform_column_order(df):
    df = df.reindex(sorted(df.columns), axis=1)  # Sort columns by name
    df_label = df['predicted_new_cases']
    df = df.drop(labels=['predicted_new_cases'], axis=1)
    df.insert(0, 'predicted_new_cases', df_label)
    return df

# Compute number of new cases and deaths each day
# Replace negative values (which do not make sense for these columns) with 0
df['new_cases'] = df.groupby('geo_code').confirmed_cases.diff().fillna(0)
df['new_cases'] = df['new_cases'].clip(lower=0)

# add predicted new cases
df['predicted_new_cases'] = df.groupby('geo_code').new_cases.shift(-1).fillna(0)
df['predicted_new_cases'] = df['predicted_new_cases'].clip(lower=0)

# add confirmed cases as percent of population
df['new_cases_as_percent_of_population'] = df['new_cases'] / df['population']
df['confirmed_cases_as_percent_of_population'] = df['confirmed_cases'] / df['population']

# Add moving averages
for window_size in [3, 7, 21]:
    compute_ma('new_cases', window_size)
    compute_ma('confirmed_cases', window_size)
    compute_ma('specific_humidity', window_size)    
    compute_ma('temperature', window_size)        
    compute_ma('c1_school_closing', window_size)        
    compute_ma('c2_workplace_closing', window_size)        
    compute_ma('c3_cancel_public_events', window_size)        
    compute_ma('c4_restrictions_on_gatherings', window_size)   
    compute_ma('c5_close_public_transport', window_size)        
    compute_ma('c6_stay_at_home_requirements', window_size)        
    compute_ma('c7_restrictions_on_internal_movement', window_size)        
    compute_ma('c8_international_travel_controls', window_size)   
    compute_ma('h1_public_information_campaigns', window_size)        
    compute_ma('h2_testing_policy', window_size)        
    compute_ma('h3_contact_tracing', window_size)        
    compute_ma('h6_facial_coverings', window_size)     
    compute_ma('working_day', window_size)        
    compute_ma('new_cases_as_percent_of_population', window_size)     
    compute_ma('confirmed_cases_as_percent_of_population', window_size)     

# Add working day information for tomorrow, and today
df = df.groupby('geo_code').apply(lambda group: add_working_day_tomorrow(group)).reset_index(drop=True)
df = df.groupby('geo_code').apply(lambda group: add_working_day_yesterday(group)).reset_index(drop=True)
df['npi_sum'] = df['c1_school_closing'] + df['c2_workplace_closing'] + \
                df['c3_cancel_public_events'] + df['c4_restrictions_on_gatherings'] + \
                df['c5_close_public_transport'] + df['c6_stay_at_home_requirements'] + \
                df['c7_restrictions_on_internal_movement'] + df['c8_international_travel_controls'] + \
                df['h1_public_information_campaigns'] + df['h2_testing_policy'] + \
                df['h3_contact_tracing'] + df['h6_facial_coverings']

# Drop unused columns
df = transform_column_order(df)
df = df.sort_values(['geo_code', 'date'])

# Prepare dataset for machine learning

In [None]:
# only work within the specified range
df_ml = df.loc[df['date'] < pd.to_datetime(prediction_start_date)]
df_ml = df_ml.drop(labels=['country_name',
                        'country_code3',
                        'country_code_numeric',
                        'confirmed_deaths',
                        'region_name',
                        'month',
                        'quarter',
                        'week'], axis=1)

def encode(df_x):
    # convert the date to an integer value
    df_x['date_day'] = df_x['date'].apply(lambda x: x.day)

    # encode the geo data
    df_x = encoders.BinaryEncoder('continent').fit_transform(df_x).drop(labels=['continent'], axis=1)
    df_x = encoders.BinaryEncoder('geo_code').fit_transform(df_x).drop(labels=['geo_code'], axis=1)
    df_x = encoders.BinaryEncoder('country_code').fit_transform(df_x).drop(labels=['country_code'], axis=1)

    # dy of week
    df_x = encoders.OneHotEncoder('day_of_week').fit_transform(df_x)
    df_x['day_of_week'] = df_x['day_of_week'] / 7.0
    df_x = encoders.CyclicalEncoder('day_of_week').fit_transform(df_x).drop(labels=['day_of_week'], axis=1)

    # day of month
    df_x['day_of_month'] = df_x['day_of_month'] / 31.0 # keep it simple
    df_x = encoders.BinaryEncoder('day_of_month').fit_transform(df_x)
    df_x = encoders.CyclicalEncoder('day_of_month').fit_transform(df_x).drop(labels=['day_of_month'], axis=1)

    # day of year
    df_x['day_of_year'] = df_x['day_of_year'] / 366.0 # keep it simple
    df_x = encoders.BinaryEncoder('day_of_year').fit_transform(df_x)
    df_x = encoders.CyclicalEncoder('day_of_year').fit_transform(df_x).drop(labels=['day_of_year'], axis=1)
    return df_x

df_ml = encode(df_ml)

# Get the train, val, test split



In [None]:
days_for_validation = 0
days_for_test = 14

def split(df: pd.DataFrame, 
          days_for_validation: int, 
          days_for_test: int) -> (pd.DataFrame, pd.DataFrame, pd.DataFrame):
    
    # First, sort the data by date
    df = df.sort_values('date')

    # Determine the maximum date
    date_start_test = df[DATE].max() - pd.to_timedelta(days_for_test - 1, unit='d')
    date_start_validation = date_start_test - pd.to_timedelta(days_for_validation, unit='d')

    df_train = df[df['date'] < date_start_validation]
    df_validation = df[(df['date'] >= date_start_validation) & (df['date'] < date_start_test)]
    df_test = df[df['date'] >= date_start_test]

    # Debug the outpoint
    print(f"Training Range:   {df_train['date'].min().date()} - {df_train['date'].max().date()}")
    print(f"Validation Range: {df_validation['date'].min().date()} - {df_validation['date'].max().date()}")
    print(f"Test Range:       {df_test['date'].min().date()} - {df_test['date'].max().date()}")

    # Sanity Check
    if len(df.index) != len(df_train.index) + len(df_validation.index) + len(df_test.index):
        raise Exception('entries do not add up')

    return df_train, df_validation, df_test

df_train_prescaled, df_validation_prescaled, df_test_prescaled = split(df_ml, days_for_validation, days_for_test)

# Scale

In [None]:


df_train = df_train_prescaled.copy()
df_validation = df_validation_prescaled.copy()
df_test = df_test_prescaled.copy()

scalers = {}


for feature_name in df_ml.columns.values:
    if feature_name == 'date' or feature_name == 'predicted_new_cases':
        continue
    scalers[feature_name] = RobustScaler()
    df_train[feature_name] = scalers[feature_name].fit_transform(df_train_prescaled[[feature_name]])
        
if len(df_validation_prescaled) > 0:        
    for feature_name in df_ml.columns.values:
        if feature_name == 'date' or feature_name == 'predicted_new_cases':
            continue
        df_validation[feature_name] = scalers[feature_name].transform(df_validation_prescaled[[feature_name]])

if len(df_test_prescaled) > 0:        
    for feature_name in df_ml.columns.values:
        if feature_name == 'date' or feature_name == 'predicted_new_cases':
            continue        
        df_test[feature_name] = scalers[feature_name].transform(df_test_prescaled[[feature_name]])

df_train = df_train.drop(labels=['date'], axis=1)
df_validation = df_validation.drop(labels=['date'], axis=1)
df_test = df_test.drop(labels=['date'], axis=1)


# HPO Baby!

In [15]:

# this is a manual step to determine the best parameters...

# note that HPO does not have a header column
"""
df_train.to_csv(f"hpo_train.csv", index=False, header=False)
df_validation.to_csv(f"hpo_validation.csv", index=False, header=False)
df_test.to_csv(f"hpo_test.csv", index=False, header=False)

# push to s3
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join('hpo', 'train')).upload_file('hpo_train.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join('hpo', 'validation')).upload_file('hpo_validation.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join('hpo', 'test')).upload_file('hpo_test.csv')
"""

# Let's Train

In [None]:
import xgboost as xgb
import numpy as np
from sklearn.metrics import mean_squared_error


train_x, train_y, validation_x, validation_y, test_x, test_y = (
    df_train.iloc[:, 1:], df_train.iloc[:, :1],
    df_validation.iloc[:, 1:], df_validation.iloc[:, :1],
    df_test.iloc[:, 1:], df_test.iloc[:, :1])


data_dmatrix = xgb.DMatrix(data=train_x,label=train_y)



params = {
    'alpha': 0.734,
    #"booster": "gbtree",
    'colsample_bytree': 0.9247351903173018,
    "colsample_bylevel": 0.11338785332863967,
    "eta": 0.2857594277665818,
    "gamma": 0.7840414267431137,
    #'learning_rate': 0.1,
    "max_delta_step": 0,
    'max_depth': 43, 
    "max_leaves": 0,
    "min_child_weight": 0.015174585370894897,
    "objective": "reg:squarederror",
    "subsample": 0.5982549859025215
}


params_baseline = {
    'max_depth':6,
    'min_child_weight': 1,
    'eta':.3,
    'subsample': 1,
    'colsample_bytree': 1,
    'objective':'reg:squarederror'
}

cv_results = xgb.cv(dtrain=data_dmatrix, 
                    params=params, 
                    nfold=10,
                    num_boost_round=20,
                    early_stopping_rounds=10,
                    metrics="rmse", 
                    as_pandas=True,
                    verbose_eval=True,
                    seed=123)
print(cv_results.tail(100))
print((cv_results["test-rmse-mean"]).tail(1))




[0]	train-rmse:7030.52534+146.13767	test-rmse:7017.84634+1264.82343
[1]	train-rmse:6014.82485+152.46684	test-rmse:6051.61816+1363.17175
[2]	train-rmse:5153.38862+151.06150	test-rmse:5271.16826+1461.30391
[3]	train-rmse:4452.34253+162.47070	test-rmse:4639.33069+1552.80275
[4]	train-rmse:3857.96174+167.17000	test-rmse:4126.22151+1645.06974
[5]	train-rmse:3365.56177+160.58723	test-rmse:3722.10637+1718.97338
[6]	train-rmse:2938.35625+157.62463	test-rmse:3396.18481+1778.71113
[7]	train-rmse:2574.99138+155.65825	test-rmse:3147.93709+1830.01902
[8]	train-rmse:2282.22495+156.46754	test-rmse:2971.96692+1868.51628
[9]	train-rmse:2041.65498+163.99757	test-rmse:2819.78458+1900.49468
[10]	train-rmse:1818.10314+152.91726	test-rmse:2690.87959+1929.98852
[11]	train-rmse:1618.93767+143.72061	test-rmse:2602.85756+1949.31768
[12]	train-rmse:1446.69102+137.13164	test-rmse:2537.62733+1963.47213
[13]	train-rmse:1308.88207+128.32482	test-rmse:2488.94933+1975.21751
[14]	train-rmse:1186.30573+126.88331	test-rm

In [273]:
print(cv_results.head(100))

    train-rmse-mean  train-rmse-std  test-rmse-mean  test-rmse-std
0         14.448232        0.457032       14.128955       3.542234
1         13.281850        0.453128       13.022488       3.631210
2         12.247323        0.451868       12.034971       3.731681
3         11.357578        0.527235       11.216321       3.785879
4         10.516548        0.530564       10.433459       3.872672
5          9.828135        0.591498        9.789183       3.954284
6          9.142688        0.583163        9.192409       4.022693
7          8.485017        0.562325        8.619303       4.099982
8          7.929716        0.564755        8.147279       4.160788
9          7.432303        0.567991        7.728239       4.210274
10         6.981730        0.556943        7.346896       4.272597
11         6.590199        0.541217        7.070919       4.311739
12         6.198319        0.515502        6.778163       4.365013
13         5.871631        0.526149        6.519623       4.40

# Stip off last day

The Oxford dataset often has partial data on the last 1 or 2 days of data.
So, we cut out the last day in the file

In [157]:
max_date = df['date'].max()
df = df.loc[df['date'] < max_date]

# Write the data files

In [134]:
# let's sorty by geo code and date first
df = df.sort_values(['geo_code', 'date'])

In [135]:
filename_prefix = 'ground-truth'
dir_output = 'output'
dir_output_geo = 'output/geo'

# recreate the directory, deleting any existing content
shutil.rmtree(dir_output, ignore_errors=True)
Path(dir_output_geo).mkdir(parents=True, exist_ok=True)


# for each geography, write a JSON and CSV file
for geo in df['geo_code'].unique():
    df_geo = df.loc[df['geo_code'] == geo]
    df_geo.to_json(f"{dir_output_geo}/{filename_prefix}-{geo.replace('/', '-')}.json", 
                   orient='records', 
                   lines=False, 
                   date_format='iso')
    df_geo.to_csv(f"{dir_output_geo}/{filename_prefix}-{geo.replace('/', '-')}.csv", index=False)

# also write the complete output
df_geo.to_json(f"{dir_output}/{filename_prefix}.json", 
               orient='records', 
               lines=False, 
               date_format='iso')
df_geo.to_csv(f"{dir_output}/{filename_prefix}.csv", index=False)


# Load to Elastic Search


In [138]:

endpoint = boto3.client('es').describe_elasticsearch_domain(DomainName=f"es-pandora-{region}")['DomainStatus']['Endpoint']
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, 'es', session_token=credentials.token)
        
es = Elasticsearch(
    hosts = [{'host': endpoint, 'port': 443}],
    http_auth = awsauth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)

In [139]:


def decorate_json(documents, _index, doc_type):
    for doc in documents:
        doc['date'] = doc['date'].split('T')[0]
        doc['predicted'] = False
        doc['predicted_date_start'] = max_date
        yield {
            "_index": _index,
            "_type": doc_type,
            "_id": f"{doc['geo_code']}-{doc['date']}",
            "_source": doc }
        
            
for fn in os.listdir(dir_output_geo):
    if fn.endswith(".json"):
        file = open(f"{dir_output_geo}/{fn}")
        documents = json.loads(file.read())
        documents = decorate_json(documents, 'my_index_01', 'update')
        helpers.bulk(es, documents)

2021-02-07 23:31:13,382	INFO	base.py	POST https://search-es-pandora-eu-central-1-cqze54lhcquh7gcq7sbt7nccma.eu-central-1.es.amazonaws.com:443/_bulk [status:200 request:0.485s]
2021-02-07 23:31:13,508	INFO	base.py	POST https://search-es-pandora-eu-central-1-cqze54lhcquh7gcq7sbt7nccma.eu-central-1.es.amazonaws.com:443/_bulk [status:200 request:0.096s]
2021-02-07 23:31:13,644	INFO	base.py	POST https://search-es-pandora-eu-central-1-cqze54lhcquh7gcq7sbt7nccma.eu-central-1.es.amazonaws.com:443/_bulk [status:200 request:0.105s]
2021-02-07 23:31:13,778	INFO	base.py	POST https://search-es-pandora-eu-central-1-cqze54lhcquh7gcq7sbt7nccma.eu-central-1.es.amazonaws.com:443/_bulk [status:200 request:0.105s]
2021-02-07 23:31:13,867	INFO	base.py	POST https://search-es-pandora-eu-central-1-cqze54lhcquh7gcq7sbt7nccma.eu-central-1.es.amazonaws.com:443/_bulk [status:200 request:0.060s]
2021-02-07 23:31:13,977	INFO	base.py	POST https://search-es-pandora-eu-central-1-cqze54lhcquh7gcq7sbt7nccma.eu-central-1