In [222]:
# --- Run configuration ---
# Directory and file name of the daily counts CSV to ingest; the two strings
# are concatenated as-is when the file is read, so data_dir must end in '/'.
# NOTE(review): data_dir is an absolute path on one developer's machine and
# has to be edited before anyone else can run this notebook.
data_dir = '/Users/kelseyrobb/Documents/GitHub/measles/data-ingest/inputs-covid/'
data_file = 'daily_covid_counts-3-9.csv'

# Constant values written to the metric_id / data_source columns of every
# row that gets uploaded.
metric_id = 25
data_source = 'Johns Hopkins CSSE COVID-19 Data'

# database to use... metric for prod, metric_test for test
# NOTE(review): the default below points at prod; switch to metric_test for
# dry runs.
db_name = 'metric'
#db_name = 'metric_test'

In [223]:
# List of filters describing rows to leave out of the metric upload. Each
# entry is a dict whose keys are column names of the frame being uploaded
# (so place ids and datetime ids, not names or dates) and whose values are
# either a single value or a list of values.
#
# A row is excluded by an entry only if it matches ALL keys of that entry;
# separate entries are applied one after another.
#
# e.g.
# to exclude everything for Venezuela
# {'place_id': 244}
#
# to exclude these two periods for Samoa
# {'place_id': 251, 'datetime_id': [14550, 14519]}
#
# to exclude one period for every place (only rows whose datetime_id equals
# this value are matched; list every id explicitly to drop a range)
# {'datetime_id': 14580}

exclusion_list = [
    #{'place_id': 244},
    #{'place_id': 251, 'datetime_id': [14550, 14519, 14580, 14611]},
    #{'place_id': 48, 'datetime_id': [14277, 14305, 14336, 14366, 14397, 14427, 14458, 14489, 14519, 14550, 14580, 14611]},
    #{'datetime_id': [14642, 14671, 14702, 14732, 14763, 14793, 14824, 14855, 14885, 14916, 14946]}
]

In [224]:
import datetime
import json
from urllib.parse import quote

import pandas as pd

In [225]:
from psycopg2.sql import SQL, Identifier, Placeholder
from psycopg2.extras import execute_batch

import psycopg2

from sqlalchemy import create_engine

In [226]:
from modules.connect import get_secret

In [227]:
def exclude(df, exclusion_list):
    """Drop the rows of ``df`` that match any entry of ``exclusion_list``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to filter. It is not modified; a filtered frame is returned.
    exclusion_list : iterable of dict
        Each dict maps a column name of ``df`` to either a single value or a
        collection (list, tuple, set, frozenset) of values. A row matches an
        entry only when it satisfies every key of that entry. Entries are
        applied one after another, so a row is dropped if it matches any of
        them. An empty dict carries no condition and is skipped.

    Returns
    -------
    pandas.DataFrame
        ``df`` without the matching rows (``df`` itself when nothing was
        applied).

    Raises
    ------
    KeyError
        If an entry names a column that ``df`` does not have.
    """
    for exclusion in exclusion_list:

        # AND all per-column conditions together: only rows that meet every
        # condition of this entry are candidates for removal.
        final_mask = None

        for key, value in exclusion.items():
            if isinstance(value, (list, tuple, set, frozenset)):
                mask = df[key].isin(value)
            else:
                mask = (df[key] == value)

            final_mask = mask if final_mask is None else (final_mask & mask)

        # An empty dict has no conditions, so there is nothing to exclude.
        if final_mask is None:
            continue

        old_len = df.shape[0]
        # Invert the mask: keep everything that did NOT match the entry.
        df = df[~final_mask]

        print(f"Went from {old_len} to {df.shape[0]} rows, excluded {old_len - df.shape[0]} rows")

    return df

In [228]:
# Parse the JSON text returned by get_secret() into a dict; later cells read
# its 'host', 'username' and 'password' entries to connect to the database.
rds_secret = json.loads(get_secret())

In [229]:
def get_table(schema, table, engine):
    """Load every row and column of ``schema.table`` into a DataFrame.

    ``schema`` and ``table`` are pasted into the SQL text without any
    quoting or escaping, so pass only trusted, hard-coded identifiers.
    ``engine`` is handed to ``pandas.read_sql`` as the connection.
    """
    query = "SELECT * FROM {}.{}".format(schema, table)
    frame = pd.read_sql(query, engine)
    return frame

In [230]:
# Build the SQLAlchemy URL for the target database. The username and
# password are percent-encoded so that characters such as '@', ':', '/' or
# '%' in the credentials cannot be mistaken for URL delimiters; values
# without special characters are left unchanged by the encoding.
conn_str = "postgresql://{user}:{password}@{host}:{port}/{db}".format(
    user=quote(rds_secret['username'], safe=''),
    password=quote(rds_secret['password'], safe=''),
    host=rds_secret['host'],
    port='5432',
    db=db_name,
)

engine = create_engine(conn_str)

In [231]:
# Load the datetime lookup table, keep only the rows whose `day` flag is
# true, and add a `date` column holding the calendar date of each `dt`
# value so the rows can later be matched against the CSV's dates.
dt_df = (
    get_table('public', 'datetime', engine)
    .loc[lambda frame: frame['day'], :]
    .assign(date=lambda frame: frame['dt'].dt.date)
)

In [232]:
# Load the place lookup table and keep just the columns listed below, in
# this order.
place_q = get_table('public', 'place', engine)

place_columns = [
    'place_id', 'name', 'description', 'fips', 'iso',
    'place_type', 'geom_type', 'poly_id', 'point_id',
    'iso2', 'region_sdg', 'region',
]
place_df = place_q.reindex(columns=place_columns)

In [233]:
# Read the raw counts CSV; the path is the plain concatenation of the two
# configuration strings.
data = pd.read_csv(data_dir + data_file)

#data.head()

In [234]:
# Attach a place_id to every CSV row by matching the CSV's 'Country/Region'
# value against the place table's `name` (DataFrame.join defaults to a left
# join, so CSV rows with no matching place stay in the result without a
# place_id).
# NOTE(review): the rename only takes effect when reset_index() emits a
# column literally called 'index', i.e. when the joined index has no name;
# confirm this still holds for the pandas version in use.
data_place = data.set_index('Country/Region').join(place_df.loc[:,['place_id', 'name']].set_index('name')).reset_index().rename(columns={'index': 'name'})

#data_place.head()

In [235]:
data_place.head()

Unnamed: 0,name,Lat,Long,1/22/20,1/23/20,1/24/20,1/25/20,1/26/20,1/27/20,1/28/20,...,2/29/20,3/1/20,3/2/20,3/3/20,3/4/20,3/5/20,3/6/20,3/7/20,3/8/20,place_id
0,Afghanistan,33.0,65.0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,3,2
1,Alameda County,37.6017,-121.7195,0,0,0,0,0,0,0,...,0,0,0,1,0,0,0,1,0,425
2,Algeria,28.0339,1.6596,0,0,0,0,0,0,0,...,0,0,2,2,7,0,5,0,2,67
3,Andorra,42.5063,1.5218,0,0,0,0,0,0,0,...,0,0,1,0,0,0,0,0,0,7
4,Anhui,31.8257,117.2264,1,8,6,24,21,10,36,...,0,0,0,0,0,0,0,0,0,316


In [236]:
#data_place.loc[(data_place.iso == 'VEN'), :]

In [237]:
# Find the per-day count columns by parsing every column label as an m/d/yy
# date instead of hard-coding the list, so an export covering more days is
# picked up without editing this cell. Labels that are not dates (name, Lat,
# Long, place_id, ...) fail to parse and are skipped.
is_date_col = pd.to_datetime(data_place.columns, format='%m/%d/%y', errors='coerce').notna()
date_cols = ['place_id'] + list(data_place.columns[is_date_col])

# Reshape from wide (one column per day) to long (one row per place and
# day). NOTE: the frame keeps its historical name `monthly` because later
# cells refer to it, but the values are daily.
monthly = data_place.loc[:, date_cols].set_index(['place_id']).stack().reset_index()

monthly.columns = ['place_id', 'day', 'value']

#monthly.head()

In [238]:
monthly.head()

Unnamed: 0,place_id,day,value
0,2,1/22/20,0
1,2,1/23/20,0
2,2,1/24/20,0
3,2,1/25/20,0
4,2,1/26/20,0


In [239]:
# Parse the m/d/yy column labels (now values of `day`) into timestamps in
# one vectorised call rather than calling strptime once per row.
monthly['date'] = pd.to_datetime(monthly['day'], format='%m/%d/%y')

monthly.head()

Unnamed: 0,place_id,day,value,date
0,2,1/22/20,0,2020-01-22
1,2,1/23/20,0,2020-01-23
2,2,1/24/20,0,2020-01-24
3,2,1/25/20,0,2020-01-25
4,2,1/26/20,0,2020-01-26


In [240]:
# Look up the datetime-table id (dt_id) for every observation by joining on
# the calendar date, then drop the date index again. This is a left join, so
# observations whose date has no row in dt_df are kept without a dt_id.
# NOTE(review): monthly's `date` holds parsed timestamps while dt_df's
# `date` holds datetime.date objects; the join relies on pandas aligning the
# two, so verify dt_id is populated after upgrading pandas.
dated = monthly.set_index('date').join(dt_df.loc[:,['dt_id', 'date']].set_index('date')).reset_index(drop=True)

In [241]:
dated.head()

Unnamed: 0,place_id,day,value,dt_id
0,2,1/22/20,0,14632
1,425,1/22/20,0,14632
2,67,1/22/20,0,14632
3,7,1/22/20,0,14632
4,316,1/22/20,1,14632


In [242]:
# Timestamp recorded on every uploaded row, formatted once as YYYY-MM-DD.
# Building the string up front avoids creating a datetime column and then
# overwriting it with strings through `.loc[:, col] = ...`, which newer
# pandas versions try to do in place and may therefore leave as datetimes.
updated_at = datetime.datetime(2020, 3, 9).strftime('%Y-%m-%d')

# Add the constant columns, rename dt_id to the name the observation table
# uses, and order the columns exactly as the INSERT statement lists them.
dated = (
    dated
    .assign(metric_id=metric_id, updated_at=updated_at, data_source=data_source)
    .rename(columns={'dt_id': 'datetime_id'})
    .loc[:, ['metric_id', 'value', 'updated_at', 'data_source', 'place_id', 'datetime_id']]
)

In [243]:
dated.head()

Unnamed: 0,metric_id,value,updated_at,data_source,place_id,datetime_id
0,25,0,2020-03-09,Johns Hopkins CSSE COVID-19 Data,2,14632
1,25,0,2020-03-09,Johns Hopkins CSSE COVID-19 Data,425,14632
2,25,0,2020-03-09,Johns Hopkins CSSE COVID-19 Data,67,14632
3,25,0,2020-03-09,Johns Hopkins CSSE COVID-19 Data,7,14632
4,25,1,2020-03-09,Johns Hopkins CSSE COVID-19 Data,316,14632


In [244]:
# Apply the configured exclusions. exclude() prints one line per exclusion
# entry; the print below reports the overall before/after row counts.
pre_excluded = dated.shape[0]
dated_excluded = exclude(dated, exclusion_list)
print(f"Went from {pre_excluded} to {dated_excluded.shape[0]} rows, excluded {pre_excluded - dated_excluded.shape[0]} rows")

Went from 12643 to 12643 rows, excluded 0 rows


In [245]:
# Turn the frame into a tuple of per-row tuples, in column order, to use as
# the parameter sets for the INSERT statement.
# NOTE(review): assumes to_numpy() yields values the database driver can
# adapt (plain Python ints/strs for this mixed-type frame) - confirm if the
# column dtypes change.
row_list = tuple(tuple(row) for row in dated_excluded.to_numpy())

In [246]:
row_list[0:5]

((25, 0, '2020-03-09', 'Johns Hopkins CSSE COVID-19 Data', 2, 14632),
 (25, 0, '2020-03-09', 'Johns Hopkins CSSE COVID-19 Data', 425, 14632),
 (25, 0, '2020-03-09', 'Johns Hopkins CSSE COVID-19 Data', 67, 14632),
 (25, 0, '2020-03-09', 'Johns Hopkins CSSE COVID-19 Data', 7, 14632),
 (25, 1, '2020-03-09', 'Johns Hopkins CSSE COVID-19 Data', 316, 14632))

In [247]:
len(row_list)

12643

In [248]:
# Upsert statement for the observation table: insert one row per parameter
# tuple (six positional %s placeholders, in the column order listed), and
# when the named unique constraint is violated, overwrite only value and
# updated_at of the existing row with the incoming values.
test_q = """
INSERT INTO observation (
    metric_id,
    value,
    updated_at,
    data_source,
    place_id,
    datetime_id
)
VALUES (
    %s,
    %s,
    %s,
    %s,
    %s,
    %s
)
ON CONFLICT 
ON CONSTRAINT observation_metric_id_place_id_datetime_id_key
DO UPDATE SET (value, updated_at) = (EXCLUDED.value, EXCLUDED.updated_at);
"""

In [249]:
# Sanity check before writing: show which database name is targeted.
db_name

'metric'

In [250]:
# Sanity check before writing: show which database host is targeted.
# NOTE(review): the rendered output exposes the host name; clear it before
# sharing or committing the notebook.
rds_secret['host']

'talus-dev.cvsrrvlopzxr.us-west-1.rds.amazonaws.com'

In [251]:
# Open a cursor to perform database operations
connection_string = f"host={rds_secret['host']} dbname={db_name} user={rds_secret['username']} password={rds_secret['password']}"

conn = psycopg2.connect(connection_string)
cur = conn.cursor()

In [252]:
# Send all upserts in batches. Nothing is persisted until the commit cell
# runs. If any statement fails, roll back so the connection is not left in
# an aborted transaction, then re-raise so the failure stays visible.
try:
    execute_batch(cur, test_q, row_list)
except Exception:
    conn.rollback()
    raise

In [253]:
#for row in row_list:
    #print(row)
    #cur.execute(test_q, row)

In [254]:
# Make the batched upserts permanent in the database named by db_name.
# NOTE(review): the cursor and connection are not closed in the visible
# cells; close them once the upload is finished.
conn.commit()