In [1]:
import os
import io
import datetime
import logging
import sys

import numpy as np
import pandas as pd
from pandas.io.json import json_normalize 

import civis
import civis.io
from civis.futures import CivisFuture

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor, GradientBoostingClassifier, RandomForestRegressor
from civis.ml import ModelPipeline

import uuid
import json
from pprint import pprint
import tempfile
import concurrent.futures
from concurrent.futures import wait
from collections import namedtuple

  from numpy.core.umath_tests import inner1d


In [2]:
# Default feature lists for the Rainbow Modeling Frame. The feature table carries one
# 0/1 flag column per list size (frame_large, frame_medium, frame_small).
feature_table = civis.io.read_civis_sql(
    sql='select * from bernie_nmarchio2.feature_list order by sort_order asc',
    use_pandas=True,
    database='Bernie 2020')


def _flagged_features(flag_column):
    """Return the feature names whose `flag_column` equals 1, followed by 'state_code'."""
    flagged = feature_table.loc[feature_table[flag_column] == 1, 'feature_name']
    return list(flagged) + ['state_code']


feature_list_large = _flagged_features('frame_large')
feature_list_medium = _flagged_features('frame_medium')
feature_list_small = _flagged_features('frame_small')

# Column inventory of the modeling frame (every column except person_id), in table order.
# NOTE(review): table_columns is not read by any other cell visible in this notebook.
table_columns = civis.io.read_civis_sql(
    sql='''select ordinal_position as position, column_name, data_type 
    from information_schema.columns 
    where table_name = 'rainbow_modeling_frame' and table_schema = 'bernie_data_commons' and column_name != 'person_id'
    order by ordinal_position;''',
    use_pandas=True,
    database='Bernie 2020')

In [7]:
# USER INPUT CELL
# Every value a user is expected to tune lives in this cell.

# DV table parameters
DATABASE = 'Bernie 2020'
# Primary key in both the DV table and the Modeling Frame
PRIMARY_KEY = 'person_id'
# Table containing recoded Dependent Variables keyed to the PRIMARY_KEY
DV_TABLE = 'bernie_nmarchio2.action_pop_dvs'
# List of binarized dependent variables (accepts 1, 0, and null values) in DV_TABLE
DV_LIST = ['attendee_or_host',
           'attendee',
           'kickoff_party_attendee',
           'canvasser_attendee',
           'phonebank_attendee',
           'rally_barnstorm_attendee',
           'kickoff_party_rally_barnstorm_attendee',
           'canvasser_phonebank_attendee',
           'donor_1plus_times',
           'donor_27plus_usd',
           'bernie_action']

# Hyperparameter grid shared by the tree-based estimators
GRID_SEARCH = {'cross_validation_parameters':
               {'n_estimators': [500, 1000], 'max_depth': [2, 3, 5]}
              }
# Candidate estimators and the extra settings to use with each.
# Each estimator appears exactly once: a dict literal silently keeps only the last
# value for a repeated key, which previously replaced the empty sparse_logistic entry
# with the tree grid (n_estimators / max_depth do not apply to logistic regression).
# NOTE(review): no cell visible in this notebook reads ESTIMATOR; the training cell
# hard-codes 'sparse_logistic'.
ESTIMATOR = {'sparse_logistic': {},
             'gradient_boosting_classifier': GRID_SEARCH,
             'random_forest_classifier': GRID_SEARCH,
             'extra_trees_classifier': GRID_SEARCH,
             'multilayer_perceptron_classifier': {'cross_validation_parameters': 'hyperband'},
             'stacking_classifier': {}
            }

# Modeling frame table parameters
# Table containing covariates and keyed to PRIMARY_KEY
MODELING_FRAME = 'bernie_data_commons.rainbow_modeling_frame'
# Columns in the Modeling Frame to exclude from feature list (i.e., strings or incomplete coverage)
EXCLUSION_COLUMNS = ['state_code']

# Output table parameters
# Schema to contain prediction tables
SCHEMA = 'bernie_nmarchio2'
# String that will be concatenated in front of the output table's name
PREFIX = 'actionpop'

# Sampling parameters
# Non-response training data
    # If set to True it will automatically select people not in DV_TABLE at random from Phoenix (assumes person_id is PRIMARY_KEY)
    # If set to False it will automatically select people where the DV equals 0 from the DV_TABLE
SAMPLE_FROM_PHOENIX = True
# Number of non-response classes per target class (default is 2)
    # Parameter accepts any integer or decimal
CLASS_BALANCE = 2
# Maximum number of targets to randomly sample from DV_TABLE
MAX_TARGET_COUNT = 30000

In [8]:
# Today's date as YYYYMMDD; used as a suffix for model names and output tables
datestamp = datetime.date.today().strftime('%Y%m%d')

In [9]:
# Counts of positive classes: one "sum(<dv>) as <dv>" column per dependent variable,
# computed over the whole DV table in a single query.
dv_sql_targets = "\n,".join(f"sum({dv_name}) as {dv_name}" for dv_name in DV_LIST)
sql_collapse_targets = "select " + dv_sql_targets + " from " + DV_TABLE + ";"
sql_count_targets = civis.io.read_civis_sql(sql=sql_collapse_targets, database=DATABASE)


In [15]:
# Determine how many negatives to sample per DV (to avoid class imbalance problems).
# The positive count is first capped at MAX_TARGET_COUNT (written back into
# sql_count_targets, which later cells read), then multiplied by CLASS_BALANCE.
# NOTE(review): row index 1 is presumably the first data row after a header row — confirm.
target_counts = sql_count_targets[1]
sample_share = []
for position in range(len(DV_LIST)):
    if int(target_counts[position]) > MAX_TARGET_COUNT:
        target_counts[position] = MAX_TARGET_COUNT
    sample_share.append(round(int(target_counts[position]) * CLASS_BALANCE))

In [16]:
# Builds the negative-class ("zero") sampling subquery for each dependent variable.
# NOTE(review): this loop only rebinds dv_item, random_sample and zero_sample; it sends
# nothing to the database, and the "Create training views" cell rebuilds the same
# strings itself, so this cell appears to be redundant.
for i in range(len(DV_LIST)):
    dv_item = DV_LIST[i]
    # Number of negatives to draw for this DV (computed in the sample_share cell)
    random_sample = sample_share[i]
    if SAMPLE_FROM_PHOENIX is True:
        # Negatives are Phoenix people with no row in DV_TABLE, restricted by the
        # is_deceased / reg_* flags in the WHERE clause; the key is hard-coded as person_id.
        zero_sample = f'''(select p.person_id, 0 as {dv_item} from phoenix_analytics.person p left join (select person_id from {DV_TABLE}) d on p.person_id = d.person_id where d.person_id is null and is_deceased = false and reg_record_merged = false and reg_on_current_file = true and reg_voter_flag = true order by random() limit {random_sample})'''   
    if SAMPLE_FROM_PHOENIX is False:
        # Negatives are rows of DV_TABLE where this DV equals 0
        zero_sample = f'''(select {PRIMARY_KEY}, {dv_item} from {DV_TABLE} where {dv_item} = 0 order by random() limit {random_sample})'''


In [17]:
# Create one training table per dependent variable:
#   {SCHEMA}.{PREFIX}_training_{i} = sampled positives UNION ALL sampled negatives,
#   inner-joined to the chosen feature columns of the modeling frame.
training_queries = []
for i in range(len(DV_LIST)):
    # Pick the feature list from the (capped) positive count times 3.
    # NOTE(review): the factor 3 presumably approximates total training rows at the
    # default CLASS_BALANCE of 2 (1 positive + 2 negatives) — confirm.
    approx_rows = int(sql_count_targets[1][i]) * 3
    # if/elif/else keeps the three ranges mutually exclusive. The previous
    # `> 1000 & ... <= 2000` test parsed as a chained comparison around a bitwise AND
    # and was true for almost every count, so the small list was overridden by medium.
    if approx_rows <= 1000:
        feature_columns = feature_list_small
    elif approx_rows <= 2000:
        feature_columns = feature_list_medium
    else:
        feature_columns = feature_list_large
    feature_select = "\n,".join(str(feature) for feature in feature_columns)

    dv_item = DV_LIST[i]
    print(dv_item)
    random_sample = sample_share[i]
    if SAMPLE_FROM_PHOENIX:
        # Negatives: Phoenix people absent from DV_TABLE (key hard-coded as person_id)
        zero_sample = f'''(select p.person_id, 0 as {dv_item} from phoenix_analytics.person p left join (select person_id from {DV_TABLE}) d on p.person_id = d.person_id where d.person_id is null and is_deceased = false and reg_record_merged = false and reg_on_current_file = true and reg_voter_flag = true order by random() limit {random_sample})'''
    else:
        # Negatives: rows of DV_TABLE where this DV equals 0
        zero_sample = f'''(select {PRIMARY_KEY}, {dv_item} from {DV_TABLE} where {dv_item} = 0 order by random() limit {random_sample})'''

    # The positive sample is limited by MAX_TARGET_COUNT so it stays consistent with
    # the cap applied when sample_share was computed.
    training_sql = f"""DROP TABLE IF EXISTS {SCHEMA}.{PREFIX}_training_{i} CASCADE;
    CREATE TABLE {SCHEMA}.{PREFIX}_training_{i} AS 
    (select * from (
    (select {PRIMARY_KEY}, {dv_item} from {DV_TABLE} where {dv_item} = 1 order by random() limit {MAX_TARGET_COUNT}) 
    union all 
    {zero_sample})
    inner join
    (select {PRIMARY_KEY}, {feature_select} from {MODELING_FRAME}) using({PRIMARY_KEY}));"""
    create_training_sql = civis.io.query_civis(training_sql, database=DATABASE)
    training_queries.append(create_training_sql)

# All queries were submitted above so they can run concurrently. Block here until each
# one finishes (a failed query raises), so that training never starts against a table
# that does not exist yet.
for training_query in training_queries:
    training_query.result()
    

0
1
2
3
4
5
6
7
8
9


In [18]:
# Train models: submit one sparse_logistic ModelPipeline per dependent variable,
# each trained on its own {SCHEMA}.{PREFIX}_training_{index} table.
model_list = []
train_list = []

for index, dv in enumerate(DV_LIST):
    print(f'TRAINING >>> {dv}')

    # The other DVs. The assert trips if dv is listed more than once in DV_LIST.
    # NOTE(review): exc_list is not passed to the pipeline; EXCLUSION_COLUMNS is.
    exc_list = list(DV_LIST)
    exc_list.remove(dv)
    assert dv not in exc_list

    name = f"{dv}_{datestamp}"
    model = ModelPipeline(
        model='sparse_logistic',
        dependent_variable=dv,
        primary_key=PRIMARY_KEY,
        excluded_columns=EXCLUSION_COLUMNS,
        calibration='sigmoid',
        model_name=name,
        memory_requested=15000,
    )

    # Train only on rows where this DV is populated
    where_string = f'{dv} is not null'
    train = model.train(
        table_name=f"{SCHEMA}.{PREFIX}_training_{index}",
        database_name=DATABASE,
        sql_where=where_string,
    )

    model_list.append(model)
    train_list.append(train)


TRAINING >>> attendee
TRAINING >>> kickoff_party_attendee
TRAINING >>> canvasser_attendee
TRAINING >>> phonebank_attendee
TRAINING >>> rally_barnstorm_attendee
TRAINING >>> kickoff_party_rally_barnstorm_attendee
TRAINING >>> canvasser_phonebank_attendee
TRAINING >>> donor_1plus_times
TRAINING >>> donor_27plus_usd
TRAINING >>> bernie_action


In [19]:
# Extract successful models: keep only the (model, training run) pairs whose training
# metadata contains a non-empty 'output' entry.
model_output = []
train_output = []
jobs_list = []

for m, t in zip(model_list, train_list):
    try:
        has_output = len(t.metadata['output']) > 0
    except Exception as err:
        # Catch Exception rather than using a bare except so KeyboardInterrupt and
        # SystemExit still propagate, and show the reason instead of hiding it.
        print('Job failure', repr(err))
        continue
    if not has_output:
        print('Job failure (no output)')
        continue
    jobs_list.append(t)
    model_output.append(m)
    train_output.append(t)
    print('Job success')

# Building the lists directly (instead of unpacking zip(*...)) keeps this cell from
# raising ValueError when no job succeeded; both lists are then simply empty.
if not jobs_list:
    print('No training job succeeded')

Job success
Job success
Job success
Job success
Job success
Job success
Job success
Job success
Job success
Job success


In [20]:
# Generate validation metrics: one record per successful training run, pulled from the
# run's metadata, then assembled into a DataFrame indexed by run_id.
metric_order = ['job_id', 'run_id', 'dv', 'model', 'time_of_train_run', 'n_rows', 'n_features',
                'auc', 'deciles', 'confusion_matrix', 'accuracy', 'p_correct',
                'pop_incidence_true', 'feature_list']

metrics_list = []
for training_run in train_output:
    info = training_run.metadata
    run_info = info['run']
    data_info = info['data']
    scores = info['metrics']
    metrics_list.append({
        'job_id': training_run.job_id,
        'run_id': training_run.run_id,
        # 'y' is joined into a single string naming the dependent variable
        'dv': ''.join(run_info['configuration']['data']['y']),
        'model': info['model']['model'],
        'time_of_train_run': run_info['time_of_run'],
        'n_rows': data_info['n_rows'],
        'n_features': data_info['n_cols'],
        'auc': scores['roc_auc'],
        'deciles': scores['deciles'],
        'confusion_matrix': scores['confusion_matrix'],
        'accuracy': scores['accuracy'],
        'p_correct': scores['p_correct'],
        'pop_incidence_true': scores['pop_incidence_true'],
        'feature_list': info['model']['parameters']['relvars'],
    })

validation_df = pd.DataFrame.from_records(metrics_list, columns=metric_order, index='run_id')
validation_df

Unnamed: 0_level_0,job_id,dv,model,time_of_train_run,n_rows,n_features,auc,deciles,confusion_matrix,accuracy,p_correct,pop_incidence_true,feature_list
run_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
192913901,53594821,attendee,sparse_logistic,2019-12-21T00:28:28Z,90000,466,0.881263,"[0.016555555555555556, 0.03144444444444444, 0....","[[52953, 7047], [9434, 20566]]",0.816878,"[0.88255, 0.6855333333333333]","[0.6666666666666666, 0.3333333333333333]","[civis_2020_marriage, civis_2020_ideology_libe..."
192913902,53594822,kickoff_party_attendee,sparse_logistic,2019-12-21T00:27:46Z,90000,466,0.880999,"[0.018444444444444444, 0.028777777777777777, 0...","[[52847, 7153], [9381, 20619]]",0.816289,"[0.8807833333333334, 0.6873]","[0.6666666666666666, 0.3333333333333333]","[civis_2020_marriage, civis_2020_ideology_libe..."
192913903,53594823,canvasser_attendee,sparse_logistic,2019-12-21T00:24:14Z,46524,466,0.885538,"[0.01590713671539123, 0.026649473457984095, 0....","[[27362, 3654], [4753, 10755]]",0.819298,"[0.8821898375032241, 0.6935130255352077]","[0.6666666666666666, 0.3333333333333333]","[civis_2020_ideology_liberal, civis_2018_congr..."
192913905,53594824,phonebank_attendee,sparse_logistic,2019-12-21T00:23:30Z,28590,466,0.889394,"[0.015040223854494578, 0.024833857992305003, 0...","[[16875, 2185], [2889, 6641]]",0.822525,"[0.8853620146904512, 0.6968520461699895]","[0.6666666666666666, 0.3333333333333333]","[civis_2020_marriage, civis_2020_ideology_libe..."
192913907,53594825,rally_barnstorm_attendee,sparse_logistic,2019-12-21T00:31:17Z,90000,466,0.895905,"[0.011777777777777778, 0.023777777777777776, 0...","[[53455, 6545], [8593, 21407]]",0.8318,"[0.8909166666666667, 0.7135666666666667]","[0.6666666666666666, 0.3333333333333333]","[civis_2020_ideology_liberal, civis_2018_congr..."
192913909,53594827,kickoff_party_rally_barnstorm_attendee,sparse_logistic,2019-12-21T00:28:42Z,90000,466,0.880999,"[0.01711111111111111, 0.03388888888888889, 0.0...","[[53016, 6984], [9516, 20484]]",0.816667,"[0.8836, 0.6828]","[0.6666666666666666, 0.3333333333333333]","[civis_2020_marriage, civis_2020_ideology_libe..."
192913910,53594828,canvasser_phonebank_attendee,sparse_logistic,2019-12-21T00:26:49Z,67806,466,0.884553,"[0.017254092316767437, 0.02846607669616519, 0....","[[39828, 5376], [6964, 15638]]",0.81801,"[0.8810724714627024, 0.691885673834174]","[0.6666666666666666, 0.3333333333333333]","[civis_2020_marriage, civis_2020_ideology_libe..."
192913911,53594829,donor_1plus_times,sparse_logistic,2019-12-21T00:27:57Z,90000,466,0.867322,"[0.017, 0.036, 0.058888888888888886, 0.1088888...","[[52526, 7474], [10178, 19822]]",0.803867,"[0.8754333333333333, 0.6607333333333333]","[0.6666666666666666, 0.3333333333333333]","[civis_2020_marriage, civis_2020_ideology_libe..."
192913912,53594830,donor_27plus_usd,sparse_logistic,2019-12-21T00:30:55Z,90000,466,0.889335,"[0.013111111111111112, 0.025444444444444443, 0...","[[53206, 6794], [9032, 20968]]",0.824156,"[0.8867666666666667, 0.6989333333333333]","[0.6666666666666666, 0.3333333333333333]","[civis_2020_marriage, civis_2020_children_pres..."
192913916,53594831,bernie_action,sparse_logistic,2019-12-21T00:28:57Z,90000,466,0.869545,"[0.018666666666666668, 0.03344444444444444, 0....","[[52658, 7342], [10050, 19950]]",0.806756,"[0.8776333333333334, 0.665]","[0.6666666666666666, 0.3333333333333333]","[civis_2020_marriage, civis_2020_ideology_libe..."


In [32]:
# Write validation metrics to Redshift.
# run_id is the DataFrame index, and dataframe_to_civis does not upload the index, so
# reset_index() turns it back into an ordinary column before the upload.
create_validation_table = civis.io.dataframe_to_civis(
    df=validation_df.reset_index(),
    database=DATABASE,
    table=f'{SCHEMA}.{PREFIX}_validation_{datestamp}',
    existing_table_rows='drop')
# Block until the import finishes (a failed import raises here) so that later cells,
# such as the GRANT on this table, do not run before it exists.
create_validation_table.result()


In [22]:
# Score the voterfile: one prediction job per successful model, each writing its scores
# for MODELING_FRAME to {SCHEMA}.{PREFIX}_{dv}_{datestamp}.
scores_list = []
score_futures = []
for m, t in zip(model_output, train_output):
    DV_NAME = ''.join(t.metadata['run']['configuration']['data']['y'])
    print(DV_NAME)
    SCORES_TABLE = f'{SCHEMA}.{PREFIX}_{DV_NAME}_{datestamp}'
    scores_list.append(SCORES_TABLE)
    scores = m.predict(primary_key=PRIMARY_KEY,
                       database_name=DATABASE,
                       table_name=MODELING_FRAME,
                       if_exists='drop',
                       output_table=SCORES_TABLE,
                       disk_space=20)
    score_futures.append(scores)

# All jobs were submitted above so they can run concurrently. Wait for every one of
# them, not just the last: the final output table joins all score tables, and a failed
# job raises here instead of surfacing later as a missing table.
for score_future in score_futures:
    score_future.result()
scores.result()


attendee
kickoff_party_attendee
canvasser_attendee
phonebank_attendee
rally_barnstorm_attendee
kickoff_party_rally_barnstorm_attendee
canvasser_phonebank_attendee
donor_1plus_times
donor_27plus_usd
bernie_action


{'container_id': 53596276,
 'error': None,
 'finished_at': '2019-12-21T03:26:50.000Z',
 'id': 192917915,
 'is_cancel_requested': False,
 'started_at': '2019-12-21T00:42:36.000Z',
 'state': 'succeeded'}

In [23]:
# Generate the SQL fragments for the final output table and for dropping the
# intermediary tables.

# Training tables created earlier, one per DV index
intermed_list = [f"{SCHEMA}.{PREFIX}_training_{i}" for i in range(len(DV_LIST))]
# Per-DV score tables written by the scoring cell.
# NOTE(review): built from DV_LIST, so it also names tables for DVs whose model failed.
table_list = [f"{SCHEMA}.{PREFIX}_{dv}_{datestamp}" for dv in DV_LIST]

# DROP scripts. The training-table script iterates intermed_list; it previously
# referenced an undefined name (view_list) and raised NameError.
drop_intermed_sql = "\n".join(["drop table if exists {tbl};".format(tbl=v) for v in intermed_list])
drop_table_sql = "\n".join(["drop table if exists {tbl};".format(tbl=t) for t in table_list])

# Select-list fragments: the "<dv>_1" column of each score table renamed to <dv>,
# plus its percentile bucket (NTILE(100)) as <dv>_100.
dv_strings = "\n,".join(["{dv_score}_1 as {dv_score}".format(dv_score=dv) for dv in DV_LIST])
dv_tiles = "\n,".join(["NTILE(100) OVER (ORDER BY {dv_tile}_1) AS {dv_tile}_100".format(dv_tile=dv) for dv in DV_LIST])

# Every score table after the first is left-joined to the first on PRIMARY_KEY
join_table = [f' left join {tbl} using({PRIMARY_KEY}) ' for tbl in table_list[1:]]


In [30]:
# Assemble the script that (re)creates the final output table: the primary key, each
# DV score, and each DV percentile, selected from the first score table left-joined to
# the remaining ones.
output_table_name = f"{SCHEMA}.{PREFIX}_output_{datestamp}"
select_list = "\n,".join([f"{PRIMARY_KEY} ", dv_strings, dv_tiles])
joined_tables = ''.join(table_list[0]) + ''.join(join_table)
output_table_sql = f"""
set query_group to 'importers';
set wlm_query_slot_count to 3;
DROP TABLE IF EXISTS {output_table_name};
CREATE TABLE {output_table_name}
  DISTSTYLE KEY
  DISTKEY ({PRIMARY_KEY})
  SORTKEY ({PRIMARY_KEY})
  AS (select {select_list} from {joined_tables});"""


In [31]:
# Show the generated CREATE TABLE script so it can be reviewed before it is run
print(output_table_sql)

DROP TABLE IF EXISTS bernie_nmarchio2.actionpop_output_20191220;
CREATE TABLE bernie_nmarchio2.actionpop_output_20191220
  DISTSTYLE KEY
  DISTKEY (person_id)
  SORTKEY (person_id)
  AS (select person_id 
,attendee_1 as attendee
,kickoff_party_attendee_1 as kickoff_party_attendee
,canvasser_attendee_1 as canvasser_attendee
,phonebank_attendee_1 as phonebank_attendee
,rally_barnstorm_attendee_1 as rally_barnstorm_attendee
,kickoff_party_rally_barnstorm_attendee_1 as kickoff_party_rally_barnstorm_attendee
,canvasser_phonebank_attendee_1 as canvasser_phonebank_attendee
,donor_1plus_times_1 as donor_1plus_times
,donor_27plus_usd_1 as donor_27plus_usd
,bernie_action_1 as bernie_action
,NTILE(100) OVER (ORDER BY attendee_1) AS attendee_100
,NTILE(100) OVER (ORDER BY kickoff_party_attendee_1) AS kickoff_party_attendee_100
,NTILE(100) OVER (ORDER BY canvasser_attendee_1) AS canvasser_attendee_100
,NTILE(100) OVER (ORDER BY phonebank_attendee_1) AS phonebank_attendee_100
,NTILE(100) OVER (ORDER

In [29]:
# Create final output table
# Submits the script held in output_table_sql and blocks on the result; the last line
# displays the finished query's state.
# NOTE(review): the saved output of this cell is a CivisJobFailure ("cancelled by WLM
# abort action"), so the recorded run did not produce the output table.
create_output_table = civis.io.query_civis(sql=output_table_sql, database=DATABASE)
create_output_table.result().state


CivisJobFailure: Error in SQL: Query (118398) cancelled by WLM abort action
DETAIL:  
  -----------------------------------------------
  error:  Query (118398) cancelled by WLM abort action
  code:      1078
  context:   Query (118398) cancelled by WLM abort action
  query:     0
  location:  abort_query_action.cpp:105
  process:   wlm [pid=77361]
  -----------------------------------------------

In [None]:
# Drop intermediary tables
# Runs the DROP statements in drop_intermed_sql, then those in drop_table_sql (the
# per-DV score tables named {SCHEMA}.{PREFIX}_{dv}_{datestamp}). Each query is awaited
# and its final state displayed.
# NOTE(review): the final output table is built by joining those score tables, so this
# cell should only be run after the output table has been created successfully.
drop_intermed_query = civis.io.query_civis(sql=drop_intermed_sql, database=DATABASE)
drop_intermed_query.result().state

drop_tables_query = civis.io.query_civis(sql=drop_table_sql, database=DATABASE)
drop_tables_query.result().state

In [None]:
# Show both DROP scripts for review
print(drop_intermed_sql)
print(drop_table_sql)

In [33]:
# Grant team on tables
# Gives group bernie_data ALL on the schema and SELECT on the dated output and
# validation tables, then waits for the query and displays its final state.
# Both tables must already exist for the SELECT grants to succeed.
grant_statement = f"""
GRANT ALL ON SCHEMA {SCHEMA} TO GROUP bernie_data;
GRANT SELECT ON {SCHEMA}.{PREFIX}_output_{datestamp} TO GROUP bernie_data;
GRANT SELECT ON {SCHEMA}.{PREFIX}_validation_{datestamp} TO GROUP bernie_data;
"""
grant_team = civis.io.query_civis(sql=grant_statement, database=DATABASE)
grant_team.result().state

'succeeded'

In [34]:
# Show the GRANT script that was run
print(grant_statement)


GRANT ALL ON SCHEMA bernie_nmarchio2 TO GROUP bernie_data;
GRANT SELECT ON bernie_nmarchio2.actionpop_output_20191220 TO GROUP bernie_data;
GRANT SELECT ON bernie_nmarchio2.actionpop_validation_20191220 TO GROUP bernie_data;

