# *Internal DataRobot or GoPuff Only*
# *!!! Do not share with customers !!!*
# Choosing the Optimum Selections per Customer
#### Overview
In this notebook, we continue our exploration of a marketing use case that predicts the likelihood of responding to several different methods of outreach.
For each customer, we have several different "methods" of reaching out to them:
- `email`
- `push` (an in-app push notification to the device)
- `webhook` (a text message to the customer's cell phone number)

We can also consider several "journeys" that hit several channels within the same hour or so:

- `email + push`
- `email + push + push`
- `email + push + push + push`
- `email + webhook`
- `push + push`
- `push + push + push`

In addition to those 9 different choices, we can choose when to initiate the outreach:
- `day of week` (7 different choices)
- `hour of day` (24 different choices)
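Together, these choices define the search space for each customer: 9 channel options × 7 days × 24 hours. The sketch below enumerates that space with `itertools.product`. The option strings follow the spelling used later in the brute-force cell (for example `email+push`, with no spaces), and we are assuming that matches the encoding in the dataset.

```python
from itertools import product

# Channel options, days (1-7) and hours (0-23) as described above
channels = ["email", "push", "webhook",
            "email+push", "email+push+push", "email+push+push+push",
            "email+webhook", "push+push", "push+push+push"]
days = range(1, 8)
hours = range(0, 24)

# Every (channel, day, hour) combination a single customer could receive
search_space = list(product(channels, days, hours))
print(len(search_space))  # 9 * 7 * 24 = 1512
```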

#### Background
In previous work, we've already designed the dataset and built a model to predict the `probability of response` within 12 hours. Note that this was only possible because there was enough outreach history covering all days of the week and many different hours of the day.



### Library Imports

In [1]:
### Standard Imports - Sorry PEP8 fans, do not look below
import pandas as pd, numpy as np, os, re, json
from pathlib import Path
from datetime import datetime

## Specific Imports

#### I save my API token as an environment variable.
API_TOKEN = os.environ.get("DR_API_TOKEN", "OR__pasteyourtokenherefromthedatarobotbyclickinginthetopright")
ENDPOINT_URL = "https://app.datarobot.com/api/v2"

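# Optimizer app credentials (the token is read from an environment variable instead of being hard-coded;
# the variable name OPTIMIZER_APP_TOKEN is our own choice)
app_url = 'https://5ebe59b00c7ebb310dae5fc1.apps.datarobot.com'
token = os.environ.get("OPTIMIZER_APP_TOKEN", "OR__pasteyouroptimizerapptokenhere")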

# Deployment id
deployment_id = '5ec4671924f0ff0797025582'

### Display options for notebooks
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_rows', 25)

### set path directories
curr_dir = Path(os.getcwd())
print('Current Directory is: ', str(curr_dir))
data_dir = Path(curr_dir.parents[0] / 'data/')
artifacts_dir = Path(curr_dir.parents[0] / 'artifacts/')

Current Directory is:  /Users/josh.berry/_use cases/gopuff/outbound channel optimization/best action with optimization app/notebooks


In [2]:
### Common project specific variables
FILENAME = '1K_to_score_example.csv'  # original data

FILENAME_OPT_CACHE = 'optimizer_cache.csv'
FILENAME_OPT_RESULT = 'optimizer_answer.csv'

FILENAME_BRUTE_CACHE = 'temp2.csv'
FILENAME_BRUTE_RESULT = 'temp2_scored.csv'

### Helper Functions

In [3]:
# helper function to reduce memory footprint of the dataframe
def reduce_mem_usage(df, verbose=True):
    import numpy as np
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2    
    for col in df.columns:
        col_type = df[col].dtypes
        if col_type in numerics:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                # float16 is never used: anything that fits in float32 is stored as float32
                if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
    end_mem = df.memory_usage().sum() / 1024**2
    if verbose: 
        print('Mem. usage decreased to {:5.2f} MB ({:.1f}% reduction)'.format(end_mem, 100 * (start_mem - end_mem) / start_mem))
    return df
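As an alternative to the helper above, pandas has a built-in `pd.to_numeric(..., downcast=...)`. It picks the smallest integer dtype that holds the values, and for floats it stops at float32, like `reduce_mem_usage`. This is a sketch rather than a drop-in replacement. `downcast_numeric` is a hypothetical name, and details such as precision checks on float downcasting can vary between pandas versions.

```python
import numpy as np
import pandas as pd

def downcast_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast integer and float columns using pandas' pd.to_numeric."""
    out = df.copy()  # leave the input frame untouched
    for col in out.select_dtypes(include=[np.integer]).columns:
        # smallest signed integer dtype that can hold every value
        out[col] = pd.to_numeric(out[col], downcast="integer")
    for col in out.select_dtypes(include=[np.floating]).columns:
        # pandas does not go below float32 when downcasting floats
        out[col] = pd.to_numeric(out[col], downcast="float")
    return out

example = pd.DataFrame({"counts": np.array([0, 50, 100], dtype="int64"),
                        "revenue": np.array([1.5, 2.25, 3.0], dtype="float64"),
                        "domain": ["gmail", "yahoo", "gmail"]})
small = downcast_numeric(example)
print(small.dtypes)
```

Non-numeric columns such as `domain` pass through unchanged.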

### Data Import

In [4]:
indata = reduce_mem_usage(pd.read_csv(Path(data_dir) / FILENAME))

Mem. usage decreased to  0.10 MB (46.2% reduction)


-----------------------------
We'll use a random sample of our training data to illustrate how we would take new data and figure out the best channel for each customer to maximize the probability of response.

In [5]:
indata['event_time_hour_of_day'].unique()

array([18, 23,  6,  0, 13, 20,  1, 22, 10,  4, 17, 21, 19,  2,  9,  3,  8,
       16,  5, 14, 15, 12,  7])

In [6]:
# (Optional) increase dataframe size for more testing
times_to_repeat = 1

We'll drop the target variable, since we wouldn't know it at prediction time. We'll also blank out the columns that come from "the future" relative to the (implicit) snapshot date used to query this dataset.

In [7]:
df = indata.copy()   
df = pd.concat([df]*times_to_repeat, ignore_index=True)
df['fake_customerid'] = df.index
df.drop(columns=['has_session_w_atc','z_cost'], inplace=True)

In [8]:
df.head(5)

| | campaign_concat_str | last_event_to_snapshot_gap | last_session_time_hod | last_session_time_dow | email_domain | email_ext | rfm_label | recency_score | frequency_score | monetary_value_score | user_lifespan | cumulative_order_num | cumulative_revenue | event_counts | mean_gap | last_campaign_name_cleaned | last_order_locationid_categorical | order_lifespan_bin_combo | event_name_transformed | event_time_day_of_week | event_time_hour_of_day | fake_customerid |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | lifecycle register to purchase lifecycle regis... | 5.0 | 18.0 | 5.0 | gmail | com | 00_Lost | 0 | 0 | 0 | 17 | 1 | 26.5 | 6 | 2.091667 | lifecycle register to purchase | LOC: 19 | 0 \| 2 | email | 7 | 18 | 0 |
| 1 | t non fam free delivery | 59.0 | 14.0 | 0.0 | gmail | com | 06_Promising | 4 | 0 | 2 | 450 | 2 | 16.34 | 1 | | t4 non fam free delivery | LOC: 182 | 1 \| 5 | push | 4 | 23 | 1 |
| 2 | days of deals day new-act-dorm ncoup new years... | 3.0 | 4.0 | 4.0 | gmail | com | 02_At_Risk | 1 | 2 | 2 | 211 | 6 | 92.940002 | 14 | 3.86218 | valentines day launch chocolatesroses no alc m... | LOC: 106 | 3 \| 4 | email | 4 | 23 | 2 |
| 3 | lifecycle register to purchase lifecycle regis... | 1.0 | 4.0 | 6.0 | gmail | com | 06_Promising | 4 | 0 | 2 | 16 | 1 | 10.36 | 10 | 1.37963 | lifecycle 1st to 2nd | LOC: 106 | 0 \| 2 | email | 7 | 6 | 3 |
| 4 | | | | | gmail | com | neverbuyer | 0 | 0 | 0 | 155 | 0 | 0.0 | 0 | | | LOC: UNKNOWN | 0 \| 4 | email | 4 | 0 | 4 |


--------------------------
## Optimizer App Method

With this method, we:
1. cache a .csv file to disk
2. call the REST API of the optimizer app that is already set up
3. wait for the results to download

In [None]:
future_cols = ['event_name_transformed','event_time_day_of_week','event_time_hour_of_day'] 

# optimizer app wants to see blank columns
for c in future_cols:
    df[c]=''

# optimizer app works by caching a file to disk and then sending it
df.to_csv(Path(data_dir) / FILENAME_OPT_CACHE, index=False)

############# Using retry request #############
import json
import time
import requests
from functools import wraps


headers = {"Authorization": f"Bearer {token}"}

ATTEMPT_COUNT = 1024
ATTEMPT_INCREMENT = 60

class MaxAttemptsLimitReached(Exception):
    pass

class retry_on_false:
    """
    Helper decorator class that retries the wrapped call until it returns a truthy value, before giving up.
    """
    def __init__(self, attempts_count: int = 20, time_wait: int = 60) -> None:
        # use the values passed to the decorator
        self.attempts_count = attempts_count
        self.time_wait = time_wait  # sec
    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for _ in range(self.attempts_count):
                results = func(*args, **kwargs)
                if results:
                    return results
                time.sleep(self.time_wait)  # wait before polling again
            raise MaxAttemptsLimitReached("Maximum attempts limit reached.")

        return wrapper
    
def send_dataset(path):
    with open(path, 'rb') as f:  # upload files in binary mode
        response = requests.post(f"{app_url}/api/uploadAsync", files={'file': f}, headers=headers, timeout=(5, 60))
        if response.status_code > 400:
            raise Exception(response.content)
        data = response.json()
        if response.status_code == 400:
            raise Exception('\n'.join(data['errors']))
            
        return data['dataset']
    
@retry_on_false(attempts_count=ATTEMPT_COUNT) 
def wait_dataset_readiness(dataset_id):
    response = requests.get(f"{app_url}/api/datasets/{dataset_id}", headers=headers, timeout=(3, ATTEMPT_INCREMENT))
    response.raise_for_status()
    dataset_info = response.json()
    if dataset_info['task']['status'] in ('CANCELED', 'ERRORED', 'FINISHED'):
        return dataset_info
    print(f"Dataset {dataset_id} is not ready yet. Waiting...")
    
    return False

def download_optimization_results(dataset_id):
    # stream the response so the file is written chunk by chunk (no artificial sleep between chunks)
    with requests.get(f"{app_url}/api/datasets/{dataset_id}/download", headers=headers, stream=True) as response:
        response.raise_for_status()
        path = str(Path(data_dir) / FILENAME_OPT_RESULT)
        with open(path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)

    return path

def optimize_dataset(path):
    dataset = send_dataset(path)
    print(f"Optimization job created. Dataset ID: {dataset['id']}")
    print("Waiting for optimization results...")
    dataset = wait_dataset_readiness(dataset['id'])
    if dataset['task']['status'] in ('ERRORED', 'CANCELED'):
        print(dataset['task'].get('error_message'))
        return
    print("Optimization finished. Downloading...")
    results_path = download_optimization_results(dataset['id'])
    print(f"Finished downloading optimizations into: {results_path}")
    
    return results_path


start = time.monotonic()
optimize_dataset(Path(data_dir) / FILENAME_OPT_CACHE)
print('TIME: ', round((time.monotonic() - start) / 60), 'Min')

Optimization job created. Dataset ID: 5ee21943c51c3b00018f5696
Waiting for optimization results...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 is not ready yet. Waiting...
Dataset 5ee21943c51c3b00018f5696 
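The `retry_on_false` decorator above polls at a fixed 60-second interval. Another option is to poll with capped exponential backoff: check quickly at first, then wait longer between checks. The sketch below is our own generic helper (`poll_until` is a hypothetical name) and is not part of the optimizer app's API.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

def poll_until(check: Callable[[], Optional[T]], max_attempts: int = 20,
               base_delay: float = 1.0, max_delay: float = 60.0) -> T:
    """Call `check` until it returns a truthy value, backing off exponentially.

    The wait doubles after each failed attempt (base_delay, 2*base_delay, ...)
    but never exceeds max_delay. Raises TimeoutError after max_attempts calls.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        result = check()
        if result:
            return result
        if attempt < max_attempts:  # no need to sleep after the final attempt
            time.sleep(delay)
            delay = min(delay * 2, max_delay)
    raise TimeoutError(f"no result after {max_attempts} attempts")
```

To use it, the body of `wait_dataset_readiness` would need to exist as an undecorated function, for example a hypothetical `check_dataset(dataset_id)`. You would then call `poll_until(lambda: check_dataset(dataset_id), base_delay=5, max_delay=60)`.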

##### Results
Hyperopt is not designed for this type of problem, but, surprisingly, it was the faster of the two search methods we tried:
- 1K  : 28 min
- 10K : 284 min

Grid search should be ideal for this problem, since there are only 3 variables to optimize and only 1,512 possibilities per row, but it was actually slower:
- 1K  : 512 min (est.), would not finish
- 10K : 20,480 min (est.), not realistically possible

This execution time is concerning for bigger datasets. We assume that runtime will continue to scale linearly with the number of rows, because watching the results appear in the Optimizer UI shows the progress incrementing with each record that is solved.
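If runtime does scale linearly, one measured (rows, minutes) pair is enough to project larger runs. This small sketch plugs in the Hyperopt timing reported above (284 min for 10K rows). The helper name is ours, and the linear-scaling assumption is the one stated in the text, not a guarantee.

```python
def extrapolate_minutes(measured_rows: int, measured_minutes: float, target_rows: int) -> float:
    """Linear extrapolation: runtime grows in proportion to the row count."""
    return measured_minutes * target_rows / measured_rows

minutes = extrapolate_minutes(10_000, 284, 10_000_000)
hours = minutes / 60
days = hours / 24
print(f"{minutes:,.0f} min = {hours:,.0f} h = {days:,.0f} days")  # 284,000 min = 4,733 h = 197 days
```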

In [None]:
result_opt = pd.read_csv(Path(data_dir) / FILENAME_OPT_RESULT)
result_opt.head(3)

-------------------------------
## Brute Force Method
To compare execution time, we'll try a brute-force method that scores every possible combination and then takes the maximum:
1. Create cartesian product with all possible combinations
2. Cache file to disk
3. Submit for scoring against deployment and download results
4. Rank the predictions within each customer and keep the top choice
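Here is a minimal, end-to-end illustration of steps 1, 3 and 4 on toy data. It is not the notebook's pipeline. `fake_score` is a hypothetical stand-in for the deployment's predicted probability, and caching to disk (step 2) is skipped. It also relies on `merge(how="cross")`, which needs pandas 1.2 or later. Finally, `groupby(...).idxmax()` replaces the rank-and-filter step, and both approaches keep one row per customer.

```python
import pandas as pd

# Toy customers and a reduced option grid (the real grid has 9 x 7 x 24 = 1,512 rows)
customers = pd.DataFrame({"fake_customerid": [0, 1], "recency_score": [0, 4]})
grid = (pd.DataFrame({"event_name_transformed": ["email", "push"]})
        .merge(pd.DataFrame({"event_time_day_of_week": [1, 7]}), how="cross")
        .merge(pd.DataFrame({"event_time_hour_of_day": [9, 18]}), how="cross"))

# Step 1: cartesian product, one row per (customer, option) pair
to_score = customers.merge(grid, how="cross")

# Step 3 stand-in: a made-up scoring rule instead of the real deployment
def fake_score(frame: pd.DataFrame) -> pd.Series:
    bonus = (frame["event_name_transformed"] == "push") * 0.1
    evening = (frame["event_time_hour_of_day"] >= 17) * 0.05
    return (0.2 + 0.01 * frame["recency_score"] + bonus + evening
            + 0.001 * frame["event_time_day_of_week"])

to_score["prediction"] = fake_score(to_score)

# Step 4: keep the highest-scoring row for each customer
best = to_score.loc[to_score.groupby("fake_customerid")["prediction"].idxmax()]
print(best[["fake_customerid", "event_name_transformed",
            "event_time_day_of_week", "event_time_hour_of_day", "prediction"]])
```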

In [9]:
df = indata.copy()   
df = pd.concat([df]*times_to_repeat, ignore_index=True)
df['fake_customerid'] = df.index
df.drop(columns=['has_session_w_atc','z_cost'], inplace=True)

future_cols = ['event_name_transformed','event_time_day_of_week','event_time_hour_of_day'] 
df.drop(columns=future_cols, inplace=True)

import time
start = time.monotonic()

# make a list of options for the 3 columns to optimize
options_event_name_transformed = ['email','push','webhook','email+push','email+push+push','email+push+push+push','email+webhook','push+push','push+push+push']
options_event_time_day_of_week = list(range(1, 8))   # days 1..7
options_event_time_hour_of_day = list(range(0, 24))  # hours 0..23

# turn each list into a dataframe
df_options = pd.DataFrame({'event_name_transformed': options_event_name_transformed})
df_day = pd.DataFrame({'event_time_day_of_week': options_event_time_day_of_week})
df_hour = pd.DataFrame({'event_time_hour_of_day': options_event_time_hour_of_day})

# add a constant join key so that merging on it yields a cartesian product
# (pandas >= 1.2 can do this directly with merge(how='cross'))
for frame in (df_options, df_day, df_hour, df):
    frame['dummy'] = 1

# cartesian product: 7 days x 24 hours x 9 options = 1,512 rows per customer
dfDayWeek = pd.merge(df_day, df_hour, on='dummy')
dfDayWeekOpt = pd.merge(dfDayWeek, df_options, on='dummy')
dfToScore = pd.merge(df, dfDayWeekOpt, on='dummy').drop(columns='dummy')

# cache to disk for a fair comparison
dfToScore.to_csv(Path(data_dir) / FILENAME_BRUTE_CACHE, index=False)

# now submit for scoring | https://datarobot-public-api-client.readthedocs-hosted.com/en/v2.20.0/entities/batch_predictions.html

#######################################################################
infile = str(Path(data_dir) / FILENAME_BRUTE_CACHE)
outfile = str(Path(data_dir) / FILENAME_BRUTE_RESULT)

import datarobot as dr
dr.Client(endpoint=ENDPOINT_URL, token=API_TOKEN, connect_timeout=9999, max_retries=99)

dr.BatchPredictionJob.score_to_file(deployment_id,infile,outfile, passthrough_columns=['fake_customerid','event_name_transformed','event_time_day_of_week','event_time_hour_of_day'])

scored = pd.read_csv(Path(data_dir) / FILENAME_BRUTE_RESULT)
scored['rank'] = scored.groupby('fake_customerid')['has_session_w_atc_1.0_PREDICTION'].rank(ascending=False, method='first')

result_brute = scored[scored['rank']==1]

print('TIME: ', round((time.monotonic() - start) / 60), 'Min')

TIME:  5 Min


##### Results

Each customer row is expanded into 24 × 7 × 9 = 1,512 candidate rows:
- 1K  : 4 min
- 10K : 37 min

In [None]:
result_brute.head(3)

----------------------------------
## Comparing Accuracy of Results

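Here we can see that the brute-force method is not only faster but also more accurate, in the sense that it always finds the option with the highest predicted probability for each customer.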

In [None]:
result_brute['has_session_w_atc_1.0_PREDICTION'].sum()

In [None]:
result_opt['optimized_prediction'].sum()

#### Disagreements
Let's filter for records where the optimum choice disagrees between both methods.

In [None]:
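# set the index only if it hasn't been set already (makes the cell safe to re-run)
if 'fake_customerid' in result_brute.columns:
    result_brute = result_brute.set_index(['fake_customerid'])
if 'fake_customerid' in result_opt.columns:
    result_opt = result_opt.set_index(['fake_customerid'])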

joined = pd.merge(result_brute[['has_session_w_atc_1.0_PREDICTION']], result_opt[['optimized_prediction']], left_index=True, right_index=True)

joined['absdiff'] = np.abs(joined['has_session_w_atc_1.0_PREDICTION'] - joined['optimized_prediction'])

disagrees = joined[joined['absdiff']>0.000001]

In [None]:
disagree_opt = result_opt.loc[disagrees.index][['optimized_event_time_day_of_week','optimized_event_time_hour_of_day','optimized_event_name_transformed','optimized_prediction']]
disagree_opt

Focusing on the first disagreement (fake_customerid=1), we can see that the optimizer tool chose the 9th-best option.

In [None]:
disagree_brute = scored[scored['fake_customerid'].isin(disagrees.index)].sort_values(['fake_customerid','rank'])
disagree_brute[disagree_brute['fake_customerid']==1][:10]
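To read off the brute-force rank of the optimizer's pick directly, one option is to join the optimizer's chosen combination back onto the ranked brute-force scores. The toy sketch below uses made-up values, and its column names mirror the notebook's `scored` and `result_opt` frames. With real data, the day and hour columns may need matching dtypes before the merge.

```python
import pandas as pd

# Hypothetical brute-force scores: 3 options for one customer
scored = pd.DataFrame({
    "fake_customerid": [1, 1, 1],
    "event_name_transformed": ["push", "email", "webhook"],
    "event_time_day_of_week": [7, 7, 2],
    "event_time_hour_of_day": [18, 9, 12],
    "has_session_w_atc_1.0_PREDICTION": [0.31, 0.27, 0.12],
})
scored["rank"] = (scored.groupby("fake_customerid")["has_session_w_atc_1.0_PREDICTION"]
                  .rank(ascending=False, method="first"))

# Hypothetical optimizer output for the same customer
result_opt = pd.DataFrame({
    "fake_customerid": [1],
    "optimized_event_name_transformed": ["email"],
    "optimized_event_time_day_of_week": [7],
    "optimized_event_time_hour_of_day": [9],
}).set_index("fake_customerid")

# Strip the "optimized_" prefix so the columns line up with the brute-force ones
chosen = result_opt.reset_index().rename(columns=lambda c: c.replace("optimized_", ""))
opt_rank = chosen.merge(scored, on=["fake_customerid", "event_name_transformed",
                                    "event_time_day_of_week", "event_time_hour_of_day"])
print(opt_rank[["fake_customerid", "rank"]])  # the optimizer's pick is ranked 2nd here
```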

## Conclusion
When it comes to scaling this to millions of customers, we're stuck between a rock and a hard place.

#### Optimization App
*Pros* 
 - Only sending records as-is (send 1 row per customer)

*Cons* 
 - Not always optimal (although the difference is minor)
 - Will take too long to execute (extrapolating linearly from the 10K run, 10M rows => ~284,000 minutes => ~4,700 hours => ~200 days)
 
#### Brute Force
*Pros*
 - Answer will be optimal
 - More efficient in calculations per row

*Cons* 
 - Every possible choice must be pre-populated (1,512 rows per customer), so we'll quickly be limited by memory and/or disk space
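To put a number on the memory concern: the expanded table has 1,512 rows per customer, so its size grows with the customer count. The sketch below makes a rough estimate. The 200 bytes per row is purely an illustrative assumption, not a measurement from this dataset.

```python
OPTIONS_PER_CUSTOMER = 9 * 7 * 24  # 1,512 candidate rows per customer

def brute_force_rows(n_customers: int) -> int:
    """Rows in the fully expanded (customer x option) table."""
    return n_customers * OPTIONS_PER_CUSTOMER

def approx_gib(n_rows: int, bytes_per_row: int) -> float:
    """Rough size estimate; bytes_per_row is an assumption, not a measurement."""
    return n_rows * bytes_per_row / 1024**3

rows = brute_force_rows(10_000_000)
print(f"{rows:,} rows")  # 15,120,000,000 rows
print(f"~{approx_gib(rows, 200):,.0f} GiB at an assumed 200 bytes per row")
```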


### Next Steps
Is there a way to intelligently cache the results to disk and perform the Brute Force operation in chunks, without exceeding memory limits?
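One possible direction, sketched under assumptions:
1. Process customers in fixed-size chunks.
2. Expand only the current chunk against the option grid.
3. Score it, and keep only the winning row per customer.

Peak memory then scales with `chunk_size × 1,512` rather than the full expansion. `score` is a hypothetical callable standing in for the deployment. In practice, each chunk could be written to disk and scored with `dr.BatchPredictionJob.score_to_file` as above. The sketch uses `merge(how="cross")`, which needs pandas 1.2 or later.

```python
import pandas as pd
from typing import Callable

def best_option_in_chunks(customers: pd.DataFrame, grid: pd.DataFrame,
                          score: Callable[[pd.DataFrame], pd.Series],
                          id_col: str = "fake_customerid",
                          chunk_size: int = 1_000) -> pd.DataFrame:
    """Brute-force the best option per customer, chunk_size customers at a time."""
    winners = []
    for start in range(0, len(customers), chunk_size):
        chunk = customers.iloc[start:start + chunk_size]
        expanded = chunk.merge(grid, how="cross")           # candidate rows for this chunk only
        expanded["prediction"] = score(expanded).to_numpy()
        best_idx = expanded.groupby(id_col)["prediction"].idxmax()
        winners.append(expanded.loc[best_idx])               # one row per customer survives
    return pd.concat(winners, ignore_index=True)

# Toy usage: 5 customers, 3 channel options, processed 2 customers at a time
customers = pd.DataFrame({"fake_customerid": range(5), "recency_score": [0, 1, 2, 3, 4]})
grid = pd.DataFrame({"event_name_transformed": ["email", "push", "webhook"]})

def toy_score(frame: pd.DataFrame) -> pd.Series:
    # hypothetical rule: push scores highest for everyone
    return 0.1 * frame["recency_score"] + (frame["event_name_transformed"] == "push") * 0.5

result = best_option_in_chunks(customers, grid, toy_score, chunk_size=2)
print(result)
```

Only the per-customer winners are kept, so disk or memory usage for the results is one row per customer.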