In [1]:
"""
Snowflake + DataRobot Prediction API example code.

1. Data extracted via Snowflake python connector
2. Python scoring http request sent
3. Data written back to Snowflake via connector as raw json and flattened in Snowflake
4. Data flattened in python
5. Batch Scoring Script scoring

*******
NOTE: 
Write back is only shown as an example - the process used here may be ok on some databases,
but Snowflake should ingest data back via stage objects
*******

v1.0 Mike Taveirne (doyouevendata) 1/17/2020
"""

'\nSnowflake + DataRobot Prediction API example code.\n\n1. Data extracted via Snowflake python connector\n2. Python scoring http request sent\n3. Data written back to Snowflake via connector as raw json and flattened in Snowflake\n4. Data flattened in python\n5. Batch Scoring Script scoring\n\n*******\nNOTE: \nWrite back only shown as an example - the processed used here may be ok on some databases,\nbut Snowflake should ingest data back via stage objects\n*******\n\nv1.0 Mike Taveirne (doyouevendata) 1/17/2020\n'

In [2]:
import datetime
import json
import os
import subprocess
import sys

import pandas as pd
import requests
import snowflake.connector
from pandas.io.json import json_normalize

import my_creds

In [3]:
# Snowflake connection settings.
# Account and credentials are read from the local `my_creds` module; the target
# database and schema are fixed for this demo.
SNOW_ACCOUNT, SNOW_USER, SNOW_PASS = (
    my_creds.SNOW_ACCOUNT,
    my_creds.SNOW_USER,
    my_creds.SNOW_PASS,
)
SNOW_DB, SNOW_SCHEMA = 'TITANIC', 'PUBLIC'

In [4]:
# Open a Snowflake session scoped to the configured database and schema.
# `ctx` (connection) and `cur` (cursor) are intentionally left open here
# because later cells reuse both of them.
ctx = snowflake.connector.connect(
    user=SNOW_USER,
    password=SNOW_PASS,
    account=SNOW_ACCOUNT,
    database=SNOW_DB,
    schema=SNOW_SCHEMA,
    protocol='https'
)

# create a cursor
cur = ctx.cursor()

# Pull the passenger features to score. The table name is left unqualified so
# it resolves against the database/schema passed to connect() above; changing
# SNOW_DB / SNOW_SCHEMA therefore redirects this query as well.
sql = (
    "select passengerid, pclass, name, sex, age, sibsp, parch, fare, cabin, embarked "
    "from passengers"
)
cur.execute(sql)

# fetch results into dataframe
df = cur.fetch_pandas_all()
df.head()

Unnamed: 0,PASSENGERID,PCLASS,NAME,SEX,AGE,SIBSP,PARCH,FARE,CABIN,EMBARKED
0,892,3,"Kelly, Mr. James",male,34.5,0,0,7.8292,,Q
1,893,3,"Wilkes, Mrs. James (Ellen Needs)",female,47.0,1,0,7.0,,S
2,894,2,"Myles, Mr. Thomas Francis",male,62.0,0,0,9.6875,,Q
3,895,3,"Wirz, Mr. Albert",male,27.0,0,0,8.6625,,S
4,896,3,"Hirvonen, Mrs. Alexander (Helga E Lindqvist)",female,22.0,1,1,12.2875,,S


In [5]:
# DataRobot settings: secrets are read from the local `my_creds` module, while
# the deployment id and prediction host below are specific to this demo.
API_KEY = my_creds.API_KEY
USERNAME = my_creds.USERNAME
DEPLOYMENT_ID = '5e27561dfbfc8805cf61ac08'
DATAROBOT_KEY = my_creds.DATAROBOT_KEY
# replace with the load balancer for your prediction instance(s)
DR_PREDICTION_HOST = 'https://datarobot-support.orm.datarobot.com'

# Request headers sent with the scoring call.
headers = {
    'Content-Type': 'text/plain; charset=UTF-8',
    'datarobot-key': DATAROBOT_KEY,
}

# Prediction endpoint for the deployment.
url = '{dr_prediction_host}/predApi/v1.0/deployments/{deployment_id}/predictions'.format(
    dr_prediction_host=DR_PREDICTION_HOST,
    deployment_id=DEPLOYMENT_ID,
)

In [6]:
# Send the extracted rows to the DataRobot prediction endpoint as CSV.
#  * index=False keeps the DataFrame index out of the payload; with the default
#    (index=True) it would be written as an extra unnamed first column. This
#    matches how input.csv is written for the batch scoring script.
#  * The body is encoded explicitly so the bytes on the wire match the
#    charset declared in `headers`; a str body would be encoded by the HTTP
#    layer using a library-version-dependent default.
predictions_response = requests.post(
    url,
    auth=(USERNAME, API_KEY),
    data=df.to_csv(index=False).encode('utf-8'),
    headers=headers,
    # business key passed through
    params={'passthroughColumns': 'PASSENGERID'},
)

# Stop here with a descriptive error when scoring failed, so the following
# cells never run against an error payload.
if predictions_response.status_code != 200:
    raise RuntimeError(
        "error {status_code}: {content}".format(
            status_code=predictions_response.status_code,
            content=predictions_response.content,
        )
    )

In [7]:
# Peek at the JSON structure of the first three scored records.
predictions_response.json()["data"][:3]

[{'predictionValues': [{'value': 0.1192797848, 'label': 1.0},
   {'value': 0.8807202152, 'label': 0.0}],
  'predictionThreshold': 0.5,
  'prediction': 0.0,
  'rowId': 0,
  'passthroughValues': {'PASSENGERID': '892'}},
 {'predictionValues': [{'value': 0.371408663, 'label': 1.0},
   {'value': 0.628591337, 'label': 0.0}],
  'predictionThreshold': 0.5,
  'prediction': 0.0,
  'rowId': 1,
  'passthroughValues': {'PASSENGERID': '893'}},
 {'predictionValues': [{'value': 0.1242133111, 'label': 1.0},
   {'value': 0.8757866889, 'label': 0.0}],
  'predictionThreshold': 0.5,
  'prediction': 0.0,
  'rowId': 2,
  'passthroughValues': {'PASSENGERID': '894'}}]

In [8]:
# Wrap the parsed response in a DataFrame: one column per top-level key of the
# JSON payload, so each row of `data` holds one prediction record.
df_response = pd.DataFrame(predictions_response.json())
df_response.head()

Unnamed: 0,data
0,"{'predictionValues': [{'value': 0.1192797848, ..."
1,"{'predictionValues': [{'value': 0.371408663, '..."
2,"{'predictionValues': [{'value': 0.1242133111, ..."
3,"{'predictionValues': [{'value': 0.1251585249, ..."
4,"{'predictionValues': [{'value': 0.5486341299, ..."


# Load JSON and flatten in Snowflake

In [9]:
# (Re)create the landing table: a single VARIANT column holding one raw
# prediction record per row.
ctx.cursor().execute(
    "create or replace table passenger_scored_json(json_rec variant)"
)

<snowflake.connector.cursor.SnowflakeCursor at 0x123dd2f90>

In [10]:
# Only the first five records are written back for the demo.
df5 = df_response.head()

# this is not the proper way to insert data into snowflake, but is used for quick demo convenience.
# snowflake ingest should be done via snowflake stage objects.
#
# The record is passed as a bound parameter rather than formatted into the SQL
# text, so quoting/escaping of the value is handled by the connector.
insert_sql = "insert into passenger_scored_json select parse_json(%s)"
for ind, row in df5.iterrows():
    # json.dumps emits real JSON (double quotes, true/false/null); str() on a
    # dict would emit a Python repr, which is only JSON-like.
    cur.execute(insert_sql, (json.dumps(row['data']),))
    print(row['data'])

{'predictionValues': [{'value': 0.1192797848, 'label': 1.0}, {'value': 0.8807202152, 'label': 0.0}], 'predictionThreshold': 0.5, 'prediction': 0.0, 'rowId': 0, 'passthroughValues': {'PASSENGERID': '892'}}
{'predictionValues': [{'value': 0.371408663, 'label': 1.0}, {'value': 0.628591337, 'label': 0.0}], 'predictionThreshold': 0.5, 'prediction': 0.0, 'rowId': 1, 'passthroughValues': {'PASSENGERID': '893'}}
{'predictionValues': [{'value': 0.1242133111, 'label': 1.0}, {'value': 0.8757866889, 'label': 0.0}], 'predictionThreshold': 0.5, 'prediction': 0.0, 'rowId': 2, 'passthroughValues': {'PASSENGERID': '894'}}
{'predictionValues': [{'value': 0.1251585249, 'label': 1.0}, {'value': 0.8748414751, 'label': 0.0}], 'predictionThreshold': 0.5, 'prediction': 0.0, 'rowId': 3, 'passthroughValues': {'PASSENGERID': '895'}}
{'predictionValues': [{'value': 0.5486341299, 'label': 1.0}, {'value': 0.4513658701, 'label': 0.0}], 'predictionThreshold': 0.5, 'prediction': 1.0, 'rowId': 4, 'passthroughValues': {

In [11]:
# Flatten the raw JSON records inside Snowflake: one output row per passenger,
# keeping only the predictionValues entry whose label is 1.
# The source table is referenced unqualified, like the statements that create
# and populate it, so every step resolves against the same session
# database/schema.
ctx.cursor().execute("""
    create or replace table passenger_scored_flattened as
    select json_rec:passthroughValues.PASSENGERID::int as passengerid
         , json_rec:prediction::int as prediction
         , json_rec:predictionThreshold::numeric(10,9) as prediction_threshold
         , f.value:label as prediction_label
         , f.value:value as prediction_score
    from passenger_scored_json, table(flatten(json_rec:predictionValues)) f
    where f.value:label = 1
""")

<snowflake.connector.cursor.SnowflakeCursor at 0x123ebe910>

In [12]:
# Read the flattened table back into a DataFrame to check the result.
# execute() hands back the cursor, so the fetch can be chained onto it.
sql = "select * from passenger_scored_flattened"
df_new = cur.execute(sql).fetch_pandas_all()
df_new.head()

Unnamed: 0,PASSENGERID,PREDICTION,PREDICTION_THRESHOLD,PREDICTION_LABEL,PREDICTION_SCORE
0,892,0,0.5,1,0.1192797848
1,893,0,0.5,1,0.371408663
2,894,0,0.5,1,0.1242133111
3,895,0,0.5,1,0.1251585249
4,896,1,0.5,1,0.5486341299


# Flatten in Python instead

In [13]:
# Flatten the nested response client-side: one row per predictionValues entry,
# carrying the passthrough key, prediction and threshold along as metadata.
# Then keep only the rows for label 1 and give the passthrough column its
# plain name.
df_results = (
    json_normalize(
        data=predictions_response.json()['data'],
        record_path='predictionValues',
        meta=[['passthroughValues', 'PASSENGERID'], 'prediction', 'predictionThreshold'],
    )
    .loc[lambda scored: scored['label'] == 1]
    .rename(columns={"passthroughValues.PASSENGERID": "PASSENGERID"})
)
df_results.head()

Unnamed: 0,value,label,PASSENGERID,prediction,predictionThreshold
0,0.11928,1.0,892,0,0.5
2,0.371409,1.0,893,0,0.5
4,0.124213,1.0,894,0,0.5
6,0.125159,1.0,895,0,0.5
8,0.548634,1.0,896,1,0.5


# Batch scoring script approach

https://github.com/datarobot/batch-scoring

The script is run with its defaults (auto-sampled request size, 4 concurrent request threads).

In [14]:
# Write the rows to score to the file the batch scoring script reads.
df.to_csv('input.csv', index=False)

# Remove any output.csv left over from a previous run. A missing file is the
# normal case on a first run, so it is not treated as an error.
try:
    os.remove('output.csv')
except FileNotFoundError:
    pass

0

In [15]:
# Run the DataRobot batch scoring script against input.csv.
# The command is passed as an argument list (no shell involved), so
# credentials containing quotes or other shell metacharacters cannot break or
# alter the command line. The host is taken from DR_PREDICTION_HOST so both
# scoring paths always target the same prediction server.
batch_script_args = [
    'batch_scoring_deployment_aware',
    '--host={host}/'.format(host=DR_PREDICTION_HOST.rstrip('/')),
    '--user={user}'.format(user=USERNAME),
    '--api_token={api_token}'.format(api_token=API_KEY),
    '--out=output.csv',
    '--datarobot_key={datarobot_key}'.format(datarobot_key=DATAROBOT_KEY),
    '--keep_cols=PASSENGERID',
    '--max_prediction_explanations=3',
    DEPLOYMENT_ID,
    'input.csv',
]

# check=True raises CalledProcessError on a non-zero exit status, so a failed
# run is reported here rather than surfacing later when output.csv is read.
# The exit status stays the cell's displayed value.
subprocess.run(batch_script_args, check=True).returncode

0

In [16]:
# Load the file written by the batch scoring script and preview it.
df_output = pd.read_csv("output.csv")
df_output.head()

Unnamed: 0,row_id,PASSENGERID,0.0,1.0,explanation_1_feature,explanation_1_strength,explanation_2_feature,explanation_2_strength,explanation_3_feature,explanation_3_strength
0,0,892,0.88072,0.11928,NAME,-0.504941,SEX,-0.426696,FARE,-0.308353
1,1,893,0.628591,0.371409,PCLASS,-1.018456,NAME,0.835053,AGE,-0.764217
2,2,894,0.875787,0.124213,SEX,-0.643906,CABIN,-0.315584,NAME,-0.239311
3,3,895,0.874841,0.125159,NAME,-0.402308,CABIN,-0.353527,FARE,-0.182825
4,4,896,0.451366,0.548634,NAME,0.87715,PCLASS,-0.740308,SEX,0.491609
