In [None]:
import pandas as pd
import json

from datetime import datetime

from google.cloud import bigquery, storage

In [None]:
import dabl

from sklearn import svm
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, auc
from sklearn.preprocessing import OneHotEncoder

In [None]:
# Shared BigQuery client; the query helpers defined below reference this module-level name.
client = bigquery.Client()

# FETCH DATA AND LABEL IT

In [None]:
def export_and_upload_df(dataframe, file_prefix='boost_save_inducement', bucket_name='prod_boost_ml_datasets'):
    """Write ``dataframe`` to a timestamped local CSV and upload it to a GCS bucket.

    Kept consistent with the later redefinition of this function in the notebook (same defaults,
    same return value), so whichever definition is active behaves the same way.

    Args:
        dataframe: pandas DataFrame to export (written without its index).
        file_prefix: prefix of the generated file name.
        bucket_name: name of the destination Google Cloud Storage bucket.

    Returns:
        The generated file name, which is also the object (blob) name in the bucket.
    """
    file_name = f"{file_prefix}_{datetime.today().strftime('%Y_%m_%dT%H:%M:%S')}.csv"
    dataframe.to_csv(file_name, index=False)

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    blob.upload_from_filename(f"./{file_name}")

    print(f"File {file_name} uploaded to {bucket_name}.")
    return file_name

In [None]:
def obtain_monthly_save_aggregates():
    """Aggregate SAVING_EVENT transactions per calendar month and unit.

    Returns:
        DataFrame with columns save_year, save_month, unit, sum, average and count, newest month
        first.
    """
    # Group by year as well as month: EXTRACT(MONTH ...) alone yields 1-12, so without the year the
    # same month of different years would be merged into a single aggregate row.
    sql = """
        select EXTRACT(YEAR from TIMESTAMP_MILLIS(time_transaction_occurred)) as save_year,
            EXTRACT(MONTH from TIMESTAMP_MILLIS(time_transaction_occurred)) as save_month, unit,
            sum(amount) as sum, avg(amount) as average, count(*) as count from ops.user_behaviour
            where transaction_type = 'SAVING_EVENT' group by save_year, save_month, unit
            order by save_year desc, save_month desc;
    """
    return client.query(sql).to_dataframe()

In [None]:
def obtain_boosts_with_saves():
    """Fetch every boost offer paired with every SAVING_PAYMENT_SUCCESSFUL event of the same user.

    The left join on user_id yields one row per (boost offer, save) pair -- saves both before and
    after the boost -- and a single row with null save columns for boost offers whose user has no
    saves at all. ``time_from_boost_to_save`` is the save time minus the boost time in whole hours
    (negative for saves that preceded the boost).

    Returns:
        DataFrame with user_id, event_type, context, boost_creation_time, save_completion_time and
        time_from_boost_to_save.
    """
    sql = """
    with boost_offers as (
            select *, TIMESTAMP_MILLIS(created_at) as creation_timestamp 
            from ops.all_user_events 
            where event_type like 'BOOST_CREATED%'

    ), save_events as (
            select *, TIMESTAMP_MILLIS(created_at) as creation_timestamp 
            from ops.all_user_events 
            where event_type = 'SAVING_PAYMENT_SUCCESSFUL'
    )
    select boost_offers.user_id, boost_offers.event_type, boost_offers.context, 
        boost_offers.creation_timestamp as boost_creation_time, save_events.creation_timestamp as save_completion_time,  
        TIMESTAMP_DIFF(save_events.creation_timestamp, boost_offers.creation_timestamp, HOUR) as time_from_boost_to_save
    from boost_offers left join save_events on boost_offers.user_id = save_events.user_id
    """
    
    df = client.query(sql).to_dataframe()
    return df

In [None]:
def obtain_boosts_with_prior_redemptions():
    """Fetch every boost offer joined to the same user's boost redemptions made before it.

    Returns one row per (boost offer, earlier redemption) pair, plus one row with null redemption
    columns for each boost offer whose user had not redeemed any boost before that offer.

    Returns:
        DataFrame with user_id, event_type, context, boost_creation_time, boost_redemption_time and
        time_from_boost_to_last_redeem (whole hours, redemption minus offer).
    """
    # The "earlier than the offer" condition lives in the ON clause rather than in WHERE. Filtering
    # in WHERE after a left join dropped boosts whose user had only redeemed *after* the offer (those
    # rows are neither null nor earlier), so such boosts vanished instead of being reported as "no
    # prior redemption". Comparing the timestamps directly also keeps redemptions made less than an
    # hour before the offer, which TIMESTAMP_DIFF(..., HOUR) truncates to 0 and "< 0" excluded.
    sql = """
    with boost_offers as (
            select *, TIMESTAMP_MILLIS(created_at) as creation_timestamp
            from ops.all_user_events
            where event_type like 'BOOST_CREATED%'

    ), boost_redemptions as (
            select *, TIMESTAMP_MILLIS(created_at) as creation_timestamp
            from ops.all_user_events
            where event_type = 'BOOST_REDEEMED'
    )
    select boost_offers.user_id, boost_offers.event_type, boost_offers.context,
        boost_offers.creation_timestamp as boost_creation_time, boost_redemptions.creation_timestamp as boost_redemption_time,
        TIMESTAMP_DIFF(boost_redemptions.creation_timestamp, boost_offers.creation_timestamp, HOUR) as time_from_boost_to_last_redeem
    from boost_offers left join boost_redemptions
        on boost_offers.user_id = boost_redemptions.user_id
        and boost_redemptions.creation_timestamp < boost_offers.creation_timestamp
    """

    return client.query(sql).to_dataframe()

In [None]:
def parse_context_and_set_boost_id(df):
    """Decode each row's JSON ``context`` and derive the boost identifiers.

    Adds three columns:
      * ``parsed_context`` - the decoded JSON object,
      * ``boost_id`` - ``parsed_context["boostId"]``,
      * ``boost_user_id`` - ``"<boost_id>::<user_id>"``, used as the row key downstream.

    Args:
        df: DataFrame with a JSON-string ``context`` column and a string ``user_id`` column.

    Returns:
        A new DataFrame with the added columns. The frame passed in is not modified; callers may
        pass a filtered slice, and assigning columns into one triggers SettingWithCopyWarning.
    """
    parsed_context = df["context"].apply(json.loads)
    boost_id = parsed_context.apply(lambda context: context["boostId"])
    return df.assign(
        parsed_context=parsed_context,
        boost_id=boost_id,
        # and this functions as our index
        boost_user_id=boost_id + "::" + df["user_id"],
    )

In [None]:
def extract_prior_save_counts(prior_save_counts):
    """Count, per boost/user pair, the saves the user made before the boost.

    Args:
        prior_save_counts: boost/save rows already restricted to saves preceding the boost; must
            contain ``boost_user_id`` and ``save_completion_time``.

    Returns:
        DataFrame indexed by ``boost_user_id`` with a single ``boost_prior_saves`` column: the
        number of non-null ``save_completion_time`` values in each group.
    """
    print('Past rows: ', prior_save_counts.shape)
    # Count just the one column per group instead of transforming every column and de-duplicating
    # afterwards; this also leaves the caller's frame unmodified.
    return (
        prior_save_counts.groupby("boost_user_id")["save_completion_time"]
        .count()
        .rename("boost_prior_saves")
        .to_frame()
    )

In [None]:
def extract_time_since_latest_save(prior_save_df):
    """Days between each boost and the most recent save that preceded it.

    Rows are ordered by ``save_completion_time``, and the last one per ``boost_user_id`` is kept.
    Its ``time_from_boost_to_save`` hour offset is converted to a non-negative number of days.

    Returns:
        DataFrame with columns ``boost_user_id`` and ``days_since_latest_save``.
    """
    ordered_saves = prior_save_df.sort_values("save_completion_time")
    latest_per_pair = ordered_saves.groupby("boost_user_id", as_index=False).last()
    hours_offset = latest_per_pair["time_from_boost_to_save"]
    latest_per_pair["days_since_latest_save"] = (hours_offset / 24).abs()
    return latest_per_pair.loc[:, ["boost_user_id", "days_since_latest_save"]]

In [None]:
def extract_time_since_first_save(prior_save_df):
    """Days between each boost and the earliest save that preceded it.

    Rows are ordered by ``save_completion_time``, and the first one per ``boost_user_id`` is kept.
    Its ``time_from_boost_to_save`` hour offset is converted to a non-negative number of days.

    Returns:
        DataFrame with columns ``boost_user_id`` and ``days_since_first_save``.
    """
    # as_index=False keeps boost_user_id as an ordinary column for the caller's merges.
    chronological = prior_save_df.sort_values("save_completion_time")
    earliest_per_pair = chronological.groupby("boost_user_id", as_index=False).first()
    hours_offset = earliest_per_pair["time_from_boost_to_save"]
    earliest_per_pair["days_since_first_save"] = (hours_offset / 24).abs()
    return earliest_per_pair.loc[:, ["boost_user_id", "days_since_first_save"]]

In [None]:
def extract_prior_redemption(df):
    """Flag, per boost/user pair, whether the user had redeemed a boost before this one.

    Args:
        df: boost offers joined to earlier boost redemptions; needs ``context``, ``user_id``,
            ``time_from_boost_to_last_redeem`` and ``boost_redemption_time`` columns.

    Returns:
        DataFrame with columns ``boost_user_id`` and boolean ``has_prior_redeemed``.
    """
    # Work on a copy so the caller's frame does not silently gain the parsed-context columns.
    df = parse_context_and_set_boost_id(df.copy())
    print('Priors, length: ', df.shape)
    # groupby(...).last() takes the last non-null value of each column within a group, so the
    # redemption time is non-null whenever any earlier redemption exists for the pair.
    adjusted_df = df.sort_values("time_from_boost_to_last_redeem").groupby("boost_user_id", as_index=False).last()
    adjusted_df["has_prior_redeemed"] = adjusted_df.boost_redemption_time.notna()
    return adjusted_df[["boost_user_id", "has_prior_redeemed"]]

In [None]:
def clean_up_and_construct_labels(boosts_with_saves, boosts_with_prior_redeemed):
    """Turn raw boost/save and boost/redemption pairs into one labelled row per boost and user.

    Steps:
      1. Drop the two users with the most rows.
      2. Decode each boost's JSON context into features.
      3. Label whether the first save after the boost came within 24 hours.
      4. Attach per-pair history features: number of earlier saves, days since the latest and the
         first earlier save, and whether the user had redeemed a boost before.

    Args:
        boosts_with_saves: boost offers joined to the same user's saves, one row per boost/save
            pair (as returned by ``obtain_boosts_with_saves``).
        boosts_with_prior_redeemed: boost offers joined to earlier boost redemptions (as returned
            by ``obtain_boosts_with_prior_redemptions``).

    Returns:
        DataFrame with one row per ``boost_user_id``. The merges are inner joins, so only pairs
        present in every joined table are kept -- in particular, only boosts that have both an
        earlier and a later save by the same user.

    Neither input frame is modified.
    """
    unit_convertors = {'WHOLE_CURRENCY': 1, 'WHOLE_CENT': 100, 'HUNDREDTH_CENT': 10000}

    # Work on a copy: the column assignments below must not leak into the caller's frame.
    df = boosts_with_saves.copy()
    print('Starting count: ', df.shape)

    df['user_id_count'] = df.groupby(['user_id'])['boost_creation_time'].transform('count')

    # we remove the top 2, because they are team members often testing, so distort
    outlier_user_ids = df['user_id'].value_counts().head(2).index.tolist()
    # .copy() so later column assignments act on an independent frame, not a filtered view
    df = df[~df['user_id'].isin(outlier_user_ids)].copy()
    print('With outlier top users stripped: ', df.shape)

    df = parse_context_and_set_boost_id(df)

    # here we have our label: was the (first, after filtering below) save within 24 hours of the boost
    df["is_save_within_day"] = df["time_from_boost_to_save"] < 24

    # boost amounts come as an integer in some unit; normalise them to whole currency
    df["boost_amount_whole_currency"] = df["parsed_context"].apply(
        lambda context: int(context["boostAmount"]) / unit_convertors[context["boostUnit"]])

    df["boost_type"] = df["parsed_context"].apply(lambda context: context["boostType"])
    df["boost_category"] = df["parsed_context"].apply(lambda context: context["boostCategory"])
    df["boost_type_category"] = df["boost_type"] + "::" + df["boost_category"]

    print('Categories: ', df["boost_type_category"].unique())

    df["day_of_month"] = df["boost_creation_time"].dt.day
    df["hour_of_day"] = df["boost_creation_time"].dt.hour
    df["day_of_week"] = df["boost_creation_time"].dt.dayofweek

    # Split saves into before/after the boost by comparing the timestamps themselves.
    # time_from_boost_to_save is a whole number of hours (BigQuery's TIMESTAMP_DIFF truncates), so a
    # save less than an hour after -- or before -- the boost has the value 0. Masks of "< 0" / "> 0"
    # on that column dropped those rows from both sides, losing the fastest responses, which are all
    # positives. Pairs with no save at all (NaT) fall into neither mask.
    prior_save_mask = df["save_completion_time"] < df["boost_creation_time"]
    future_save_mask = df["save_completion_time"] > df["boost_creation_time"]

    # likely a way to do these more simply, but for now doing groups & sorts differently
    prior_saves = df[prior_save_mask]
    prior_save_counts = extract_prior_save_counts(prior_saves.copy())
    days_since_latest_save = extract_time_since_latest_save(prior_saves.copy())
    days_since_first_save = extract_time_since_first_save(prior_saves.copy())

    # then we discard the past -- NOTE: it looks like doing this, because of the sort, discards any
    # boost type categories that do not have more than 2 redemptions -- should fix soon
    with_future_saves = df[future_save_mask].copy()
    print('And with future saves: ', with_future_saves['boost_type_category'].unique())

    # keep only the first save after each boost -- the one the label refers to
    with_next_save = with_future_saves.sort_values("save_completion_time").groupby("boost_user_id").first()
    print('Unique types, with next save: ', with_next_save['boost_type_category'].unique())

    print('Now with just future saves crossed: ', with_future_saves.shape, ' and next save only: ', with_next_save.shape)

    # pass a copy so the caller's redemption frame is never mutated, whatever the helper does
    with_prior_redemption = extract_prior_redemption(boosts_with_prior_redeemed.copy())

    # and finally we strip out the surplus boost-save pairs (by retaining only the opening)
    # at the moment an inner join, but we may want to turn this into joining from those with saves
    final_df = pd.merge(with_next_save, prior_save_counts, on='boost_user_id')
    print('Categories, final DF: ', final_df["boost_type_category"].unique())

    final_df = pd.merge(final_df, days_since_latest_save, on='boost_user_id')
    final_df = pd.merge(final_df, days_since_first_save, on='boost_user_id')
    print("And finally, stripped to just one per: ", final_df.shape)

    final_df = pd.merge(final_df, with_prior_redemption, on='boost_user_id')
    print("And now with boolean on prior redemption: ", final_df.shape)

    return final_df

In [None]:
def add_withdraw_then_save(df):
    """Attach, per user, how many saves followed an admin-settled withdrawal.

    Args:
        df: DataFrame with a ``user_id`` column.

    Returns:
        ``df`` left-merged on ``user_id`` with two added columns:
          * ``subsequent_save_count`` - for each of the user's settled withdrawals, the number of
            saves made after it, summed over all withdrawals (0 when there are none);
          * ``has_withdrawn_and_saved`` - whether that count is positive.
    """
    withdrawals_with_next_save = """
        with withdrawal_events as (
          select user_id, event_type, timestamp_millis(time_transaction_occurred) as withdrawal_time, context
          from ops.all_user_events where event_type = 'ADMIN_SETTLED_WITHDRAWAL'
        ),
        save_events as (
          select user_id, transaction_type, timestamp_millis(time_transaction_occurred) as save_time, amount, unit
          from ops.user_behaviour where transaction_type = 'SAVING_EVENT'
        )
        select withdrawal_events.user_id, withdrawal_events.withdrawal_time as withdrawal_time,
        min(save_events.save_time) as next_save_time, count(save_events.save_time > withdrawal_time) as subsequent_save_count,
        from withdrawal_events left join save_events on withdrawal_events.user_id = save_events.user_id
        where (save_events.save_time > withdrawal_time)
        group by user_id, withdrawal_events.withdrawal_time
    """

    wdf = client.query(withdrawals_with_next_save).to_dataframe()
    saves_per_user = wdf[['user_id', 'subsequent_save_count']].groupby('user_id').sum()
    with_withdrawal = df.merge(saves_per_user, how='left', on='user_id')
    # Users with no settled withdrawal followed by a save have no row in the query result (the
    # WHERE filter on save_events makes the left join behave like an inner join), so the left merge
    # leaves NaN. For a count, that means zero, not "missing".
    with_withdrawal['subsequent_save_count'] = with_withdrawal['subsequent_save_count'].fillna(0).astype(int)
    with_withdrawal['has_withdrawn_and_saved'] = with_withdrawal['subsequent_save_count'] > 0
    return with_withdrawal

In [None]:
boosts_with_saves = obtain_boosts_with_saves()
boosts_with_redeems = obtain_boosts_with_prior_redemptions()

In [None]:
data = clean_up_and_construct_labels(boosts_with_saves, boosts_with_redeems)

In [None]:
data = add_withdraw_then_save(data).rename(columns={'subsequent_save_count': 'saves_after_withdraw'})

In [None]:
data.shape

In [None]:
data.has_prior_redeemed.value_counts()

In [None]:
data.is_save_within_day.value_counts()

In [None]:
result_store = {}

In [None]:
result_store['n'] = len(data)
result_store['n_positive'] = data.is_save_within_day.value_counts()[True]
print(result_store)

In [None]:
data.dtypes

In [None]:
def feature_extraction(data):
    """Select the model's input columns plus the ``is_save_within_day`` label.

    Returns:
        A new DataFrame holding only those columns, in the order listed below.
    """
    model_inputs = [
        "boost_amount_whole_currency",
        "day_of_month",
        "boost_prior_saves",
        "boost_type_category",
        "days_since_latest_save",
        "has_prior_redeemed",
        "days_since_first_save",
        "day_of_week",
        "saves_after_withdraw",
        "has_withdrawn_and_saved",
    ]
    label = "is_save_within_day"
    return data.loc[:, model_inputs + [label]]

In [None]:
# Keep only the model features and the label; this frame is what gets exported and modelled below.
feature_frame = feature_extraction(data)
feature_frame.dtypes

In [None]:
def export_and_upload_df(dataframe, file_prefix='boost_save_inducement', bucket_name='prod_boost_ml_datasets'):
    """Save ``dataframe`` as a timestamped CSV and upload it to a Cloud Storage bucket.

    NOTE: this redefines the earlier ``export_and_upload_df`` in this notebook; from here on, this
    version is the one called.

    Args:
        dataframe: pandas DataFrame to export (written without its index).
        file_prefix: prefix of the generated file name.
        bucket_name: destination bucket.

    Returns:
        The CSV file name, which is also the object name in the bucket.
    """
    gcs = storage.Client()
    timestamp = datetime.today().strftime('%Y_%m_%dT%H:%M:%S')
    csv_name = f"{file_prefix}_{timestamp}.csv"
    dataframe.to_csv(csv_name, index=False)

    target_blob = gcs.bucket(bucket_name).blob(csv_name)
    target_blob.upload_from_filename(f"./{csv_name}")

    print(f"File {csv_name} uploaded to {bucket_name}.")
    return csv_name

In [None]:
# Upload the feature frame; the returned file name is used later to build the GCS URI for AutoML.
latest_dataset = export_and_upload_df(feature_frame)

# USE DABL AUTO TOOLS

In [None]:
# Let dabl clean/type the feature frame, then visualise the features against the label.
dabl_data = dabl.clean(feature_frame)
dabl_data.dtypes  # NOTE(review): not the cell's last expression, so nothing is displayed
dabl.plot(dabl_data, target_col='is_save_within_day')

In [None]:
X = dabl_data.drop("is_save_within_day", axis=1)
Y = dabl_data.is_save_within_day

In [None]:
# NOTE(review): an earlier note said this preprocessing "is making things worse" (reason not
# recorded) -- compare results with and without it.
preprocessor = dabl.EasyPreprocessor()
X_trans = preprocessor.fit_transform(X)

In [None]:
# dabl's baseline classifier, fit on the preprocessed matrix with a fixed seed.
fc = dabl.SimpleClassifier(random_state=0)
fc.fit(X_trans, Y)

In [None]:
from sklearn import metrics
# Score on the same preprocessed matrix the classifier was fit on (X_trans, not the raw X), with
# pos_label=True because the label is boolean. pos_label=2 never matched, so every sample counted
# as negative and the curve/AUC were meaningless.
# NOTE(review): this is an in-sample AUC from hard predictions; a held-out split and predicted
# probabilities would give a more informative estimate.
fpr, tpr, thresholds = metrics.roc_curve(Y, fc.predict(X_trans), pos_label=True)
metrics.auc(fpr, tpr)

# STARTING GOOGLE AUTOML SECTION

In [None]:
# Must be the bucket export_and_upload_df uploaded to (its default bucket name is the same).
bucket_name = 'prod_boost_ml_datasets'
dataset_path = f'gs://{bucket_name}/{latest_dataset}'

In [None]:
from google.cloud import automl_v1beta1 as automl
# NOTE(review): identical client_options are assigned again in the client set-up cell below.
client_options = {'api_endpoint': 'eu-automl.googleapis.com:443'}

In [None]:
# NOTE(review): clients created earlier in the notebook (e.g. the BigQuery `client`) were built
# before this line, so presumably only clients constructed afterwards pick up this key file.
# Keep the key file itself out of version control.
%env GOOGLE_APPLICATION_CREDENTIALS=./google_service_account.json

In [None]:
# set up client: AutoML Tables client for the EU regional endpoint
client_options = {'api_endpoint': 'eu-automl.googleapis.com:443'}

automl_client = automl.TablesClient(project='jupiter-production-258809', region='eu', client_options=client_options)

In [None]:
# Create a dataset with the given display name. gcloud currently only allows 32 characters and no
# ':' in it, so use a millisecond epoch timestamp instead of the '%Y_%m_%dT%H:%M:%S' file-name format.
import time
timestamp = int(time.time()*1000.0)
dataset_display_name = f"boost_select_{timestamp}"
print('Dataset display name: ', dataset_display_name)

In [None]:
# Create the AutoML Tables dataset (data is imported in the next cell).
dataset = automl_client.create_dataset(dataset_display_name)

# Display the dataset information.
print("Dataset name: {}".format(dataset.name))
print("Dataset id: {}".format(dataset.name.split("/")[-1]))
print("Dataset display name: {}".format(dataset.display_name))
print("Dataset metadata:")
print("\t{}".format(dataset.tables_dataset_metadata))
print("Dataset example count: {}".format(dataset.example_count))
print("Dataset create time:")
print("\tseconds: {}".format(dataset.create_time.seconds))
print("\tnanos: {}".format(dataset.create_time.nanos))

In [None]:
# dataset_path is built above as a gs:// URI, so the BigQuery ("bq...") branch is not taken here.
path = dataset_path
response = None

if path.startswith("bq"):
    response = automl_client.import_data(
        dataset_display_name=dataset_display_name, bigquery_input_uri=path
    )
else:
    # Get the multiple Google Cloud Storage URIs.
    input_uris = path.split(",")
    response = automl_client.import_data(
        dataset_display_name=dataset.display_name,
        gcs_input_uris=input_uris,
    )

print("Processing import...")
# synchronous check of operation status: result() waits for the import operation to finish.
print("Data imported. {}".format(response.result()))

In [None]:
# Tell AutoML which column is the label to predict.
column_spec_display_name = 'is_save_within_day' #@param {type:'string'}

update_dataset_response = automl_client.set_target_column(
    dataset=dataset,
    column_spec_display_name=column_spec_display_name,
)
update_dataset_response

In [None]:
# NOTE(review): project_id and compute_region are not used below (automl_client already carries the
# project and region), and the dataset_display_name line is a no-op self-assignment.
project_id = 'jupiter-production-258809'
compute_region = 'eu'
dataset_display_name = dataset_display_name
model_display_name = dataset_display_name
# Training budget in milli node hours: 5000 = 5 node hours.
train_budget_milli_node_hours = 5000
# include_column_spec_names = 'INCLUDE_COLUMN_SPEC_NAMES_HERE'
#    or None if unspecified
# exclude_column_spec_names = 'EXCLUDE_COLUMN_SPEC_NAMES_HERE'
#    or None if unspecified

response = automl_client.create_model(
    model_display_name,
    train_budget_milli_node_hours=train_budget_milli_node_hours,
    dataset_display_name=dataset_display_name,
    include_column_spec_names=None,
    exclude_column_spec_names=None,
)

print("Training model...")
print("Training operation name: {}".format(response.operation.name))
# result() waits for the training operation to complete.
print("Training completed: {}".format(response.result()))

# STARTING SVM SECTION

In [None]:
def ensure_all_one_hots(df):
    """Add any missing ``boost_type_category_<category>`` dummy columns, filled with 0.

    ``pd.get_dummies`` only creates columns for categories present in the data. This makes sure
    every known boost type/category pair has a column. Missing columns are appended at the end in
    the order listed below; existing columns are left untouched.

    Returns:
        A new DataFrame (the input is not modified).
    """
    known_categories = (
        'GAME::CHASE_ARROW',
        'GAME::DESTROY_IMAGE',
        'GAME::TAP_SCREEN',
        'SIMPLE::ROUND_UP',
        'SIMPLE::SIMPLE_SAVE',
        'SIMPLE::TIME_LIMITED',
        'SIMPLE::TARGET_BALANCE',
        'SOCIAL::FRIENDS_ADDED',
        'SOCIAL::NUMBER_FRIENDS',
    )
    expected_columns = [f'boost_type_category_{category}' for category in known_categories]
    missing_columns = {column: 0 for column in expected_columns if column not in df}
    return df.assign(**missing_columns)

In [None]:
X_small = feature_frame[["boost_amount_whole_currency", "day_of_month", "day_of_week", "boost_prior_saves", "days_since_latest_save", "has_withdrawn_and_saved", "boost_type_category"]]
# One-hot encode the boost type/category and the day of week.
# NOTE(review): an earlier note said day_of_week would only be one-hot encoded once there was more
# data (less sparse), but it is already encoded here -- confirm which is intended.
X_encoded = pd.get_dummies(X_small, prefix_sep="_", columns=["boost_type_category", "day_of_week"]) 
X_encoded.dtypes

In [None]:
# Make sure every known boost type/category has a dummy column, even if absent from this data.
X_encoded = ensure_all_one_hots(X_encoded)

In [None]:
X_encoded.dtypes

In [None]:
X_encoded.shape

In [None]:
# Fixed random_state so the split -- and every score computed on it -- is reproducible across runs.
# NOTE(review): consider stratify=data.is_save_within_day, given the class imbalance.
X_train, X_test, Y_train, Y_test = train_test_split(X_encoded, data.is_save_within_day, test_size=0.2, random_state=0)

In [None]:
Y_test.dtypes
# Y_test.value_counts()

In [None]:
X_train.dtypes

In [None]:
# Baseline: linear SVM with 'balanced' class weights, evaluated on the held-out split.
# NOTE(review): probability=True makes fit() run an extra internal cross-validation to calibrate
# probabilities, but only predict() is used below.
clf = svm.SVC(kernel='linear', class_weight='balanced', probability=True)
clf.fit(X_train, Y_train)
precision_recall_fscore_support(Y_test, clf.predict(X_test), average='binary')

In [None]:
clf.predict(X_test)

In [None]:
# Candidate hyper-parameters: linear and RBF kernels, both with balanced class weights.
# see results notebook for removing C = 1000 for the moment
param_grid = [
  {'C': [1, 10, 100], 'kernel': ['linear'], 'class_weight': ['balanced'] },
  {'C': [1, 10, 100], 'gamma': [0.001, 0.0001], 'kernel': ['rbf'], 'class_weight': ['balanced'] },
 ]

In [None]:
search_svc = svm.SVC()

In [None]:
# NOTE(review): no `scoring` is given, so candidates are ranked by the estimator's default score
# (accuracy), which can favour predicting the majority class on imbalanced labels.
svc_clf = GridSearchCV(search_svc, param_grid, verbose=1)

In [None]:
# Tune on the training split only. Fitting on all of X_encoded included X_test, so the "held-out"
# scores computed below were optimistically biased: the search had already seen the test rows.
svc_clf.fit(X_train, Y_train)

In [None]:
# Held-out precision/recall/F1 for the positive class of the tuned SVM.
# NOTE(review): these are only unbiased estimates if svc_clf was fit on the training split alone.
scores = precision_recall_fscore_support(Y_test, svc_clf.predict(X_test), average='binary')
print(scores)

In [None]:
# support is None when an `average` is requested.
precision, recall, fscore, support = scores

In [None]:
print(recall)

In [None]:
accuracy_score(Y_test, svc_clf.predict(X_test))

In [None]:
# Per-candidate cross-validation results of the grid search.
result_df = pd.DataFrame.from_dict(svc_clf.cv_results_)

In [None]:
result_df

# STARTING GRADIENT BOOSTING SECTION (sklearn GradientBoostingClassifier)

In [None]:
from sklearn.ensemble import GradientBoostingClassifier

In [None]:
gb_clf = GradientBoostingClassifier(min_samples_split=10)
gb_clf.fit(X_train, Y_train)

In [None]:
gb_clf.score(X_test, Y_test)

In [None]:
precision_recall_fscore_support(Y_test, gb_clf.predict(X_test), average='binary')

# PERSIST THE MODEL AND UPLOAD IT

In [None]:
from joblib import dump, load

In [None]:
def persist_model(clf, send_to_storage=False, storage_bucket=None, latest_model_name=None):
    """Serialise ``clf`` with joblib to a timestamped local file and optionally upload it to GCS.

    Args:
        clf: fitted estimator (any joblib-serialisable object).
        send_to_storage: when True, also upload the file to ``storage_bucket``.
        storage_bucket: destination bucket name; required when ``send_to_storage`` is True.
        latest_model_name: currently unused; kept for backward compatibility.

    Returns:
        The local file name (also the blob name when uploaded).

    Raises:
        ValueError: if ``send_to_storage`` is True but no ``storage_bucket`` was given.
    """
    # Validate before dumping so a missing bucket fails fast instead of after the file is written.
    if send_to_storage and not storage_bucket:
        raise ValueError("storage_bucket is required when send_to_storage is True")

    # NOTE: the "incuding" spelling is kept because saved models are located by this prefix.
    file_name = f"boost_incuding_model_{datetime.today().strftime('%Y_%m_%dT%H:%M:%S')}.joblib"
    dump(clf, file_name)
    print(f"Model dumped to {file_name}")

    if send_to_storage:
        storage_client = storage.Client()
        bucket = storage_client.bucket(storage_bucket)
        blob = bucket.blob(file_name)
        blob.upload_from_filename(f"./{file_name}")
        print(f"File {file_name} uploaded to {storage_bucket}.")

    return file_name

In [None]:
persist_model(svc_clf)

In [None]:
restored_clf = load('boost_incuding_model_2020_06_23T18:16:43.joblib')

In [None]:
accuracy_score(Y_test, restored_clf.predict(X_test))

## STORE RESULTS IN DATASTORE

In [None]:
from google.cloud import datastore

In [None]:
datastore_client = datastore.Client()

In [None]:
kind = "TrainingResult"
name = f"boost_inducement_{datetime.today().strftime('%Y_%m_%dT%H:%M:%S')}"
result_key = datastore_client.key(kind, name)
print('Result key: ', result_key)


In [None]:
model_result = datastore.Entity(key=result_key)
# model_result['recall'] = results['recall']
# model_result['precision'] = results['precision']
# model_result['accuracy'] = results['accuracy']
# model_result['n'] = result_store['n']
# model_result['n_positive'] = result_store['n_positive']

# model_result['description'] = 'This is a bit lousy'

model_result.update(result_store)

In [None]:
model_result

In [None]:
datastore_client.put(model_result)