In [None]:
# for numerical work
import pandas as pd
import numpy as np

import pymongo

import datetime
import json

from pandas.io.json import json_normalize
from pymongo import MongoClient

import pickle

import re


# Read the MongoDB connection settings from the local credentials file.
with open('../creds/creds.json') as creds_file:
    creds = json.load(creds_file)

client = MongoClient(creds['connection_string'])

# Handle to the `ml` database.
ml = client['ml']

# Make sure the summaries collection exists before it is queried.
existing_collections = ml.list_collection_names()
if 'requestEvents60Summaries' not in existing_collections:
    ml.create_collection('requestEvents60Summaries')

# Sort summaries newest-first on `request_created`; at most one document is returned.
newest_summary_cursor = (
    ml['requestEvents60Summaries']
    .find()
    .sort('request_created', -1)
    .limit(1)
)
latest_record_list = list(newest_summary_cursor)

if not latest_record_list:
    # No summaries stored yet: load every request set.
    print("No pre-processed records, processing all.")
    all_requests = list(client['ml']['requestEvents60'].find())
else:
    # Creation time of the most recently summarised request.
    latest_record_time = latest_record_list[0]['request_created']
    print('Most recent request time:', latest_record_time)

    # Load only request sets created at or after that time.
    # NOTE(review): `$gte` re-selects the request that produced
    # `latest_record_time` itself - confirm that re-processing it is intended.
    created_filter = {'request.created': {'$gte': latest_record_time}}
    all_requests = list(client['ml']['requestEvents60'].find(created_filter))

In [None]:
# if there are new requests to process
if len(all_requests) > 0:
    # Build one record per (request, event) pair so that every event becomes a row.
    flat_requests = [{'request': rs['request'],
                      'event': event} for rs in all_requests for event in rs['events']]

    # Flatten the nested records into a dataframe and turn the 'NaN' /
    # 'Infinity' string placeholders into real missing values.
    all_events = pd.DataFrame(json_normalize(flat_requests)).replace(['NaN', 'Infinity'], np.nan)

    # Order the events chronologically within each request.
    df_with_id = all_events.sort_values(by=['request._id', 'event.created'])

    # Time of the previous event of the same request, and the seconds elapsed
    # since it. `.dt.total_seconds()` gives NaN for the first event of every
    # request; converting the timedelta with `pd.to_numeric(...) * 1e-9` can
    # instead turn the missing value (NaT) into the int64 sentinel, i.e. a huge
    # negative duration that distorts the min/mean/std computed further down.
    df_with_id['previous_event_time'] = df_with_id.groupby(['request._id'])['event.created'].shift(1)
    df_with_id['event.time_since_last_event'] = (
        df_with_id['event.created'] - df_with_id['previous_event_time']
    ).dt.total_seconds()

    # 'Infinity' strings were already replaced by NaN above, so there is
    # nothing left to convert to np.inf at this point.

    # Columns that must be numeric for the aggregations below.
    numeric_columns = ['request.metadata.amount',
                       'request.metadata.rate',
                       'request.metadata.cents',
                       'request.value',
                       'event.metadata.amount',
                       'event.metadata.rate',
                       'event.metadata.cents',
                       'event.value']
    for numeric_col in numeric_columns:
        df_with_id[numeric_col] = pd.to_numeric(df_with_id[numeric_col])

    # Whole days between 2017-11-01 and the event (vectorised, NaN for missing dates).
    df_with_id['event.days_since_nov'] = (
        df_with_id['event.created'] - datetime.datetime(year=2017, month=11, day=1)
    ).dt.days

    # replace older bitcoin labels with new format
    df_with_id.loc[df_with_id['event.eventLabel'].str.lower() == 'bitcoin', 'event.eventLabel'] = 'BTC'

    # create unique category identifiers
    df_with_id['event.category_action_label'] = df_with_id['event.eventCategory']+'_'+df_with_id['event.eventAction']+'_'+df_with_id['event.eventLabel']
    df_with_id['event.category_action'] = df_with_id['event.eventCategory']+'_'+df_with_id['event.eventAction']

    # Drop columns whose most common value type is `list`, because they can't
    # be processed. The type object itself is compared: comparing it with the
    # string "<class 'list'>" is always False and never dropped anything.
    list_drops = [col for col in df_with_id.columns
                  if df_with_id[col].apply(type).value_counts().index[0] is list]
    df_with_id = df_with_id.drop(list_drops, axis=1)

    # Drop some other columns; a batch that does not contain them is fine.
    df_with_id = df_with_id.drop(['event.metadata.authResponseEIN.body.data.token_type','event.metadata.authResponseEIN.headers.map.content-type'], axis=1, errors='ignore')

    # Column groups that drive the per-request summaries built further down.
    #
    # categorical_columns: one-hot encoded with pd.get_dummies and then summed
    # per request.
    categorical_columns = ['event.category_action',
                            'event.category_action_label',
                            'event.metadata.addressCity',
                            'event.metadata.addressCountry',
                            'event.metadata.addressProvince',
                            'event.metadata.city',
                            'event.metadata.country',
                            'event.metadata.currency',
                            'event.metadata.instrument',
                            'event.metadata.mongoResponse.product',
                            'event.metadata.product',
                            'event.metadata.productId',
                            'event.metadata.prossessorError.billingDetails.city',
                            'event.metadata.prossessorError.billingDetails.country',
                            'event.metadata.prossessorError.billingDetails.state',
                            'event.metadata.prossessorError.card.type',
                            'event.metadata.prossessorError.currencyCode',
                            'event.metadata.prossessorResponse.billingDetails.city',
                            'event.metadata.prossessorResponse.billingDetails.country',
                            'event.metadata.prossessorResponse.billingDetails.province',
                            'event.metadata.prossessorResponse.billingDetails.state',
                            'event.metadata.prossessorResponse.card.cardType',
                            'event.metadata.prossessorResponse.card.type',
                            'event.metadata.prossessorResponse.card_type',
                            'event.metadata.prossessorResponse.currency',
                            'event.metadata.prossessorResponse.currencyCode',
                            'event.metadata.province',
                            'event.metadata.requestParams.currency',
                            'event.metadata.requestParams.product',
                            'event.metadata.type']


    # unique_columns: summarised per request as the number of distinct values
    # (n_unique) and the number of missing values (n_NaN). Some card-type
    # columns appear in both this list and categorical_columns.
    unique_columns = ['event.metadata.bankName',
                     'event.metadata.cardHolder',
                     'event.metadata.cardId',
                     'event.metadata.cardName',
                     'event.metadata.cardNumberLastFour',
                     'event.metadata.cardPrefix',
                     'event.metadata.cardSuffix',
                     'event.metadata.email',
                     'event.metadata.firstName',
                     'event.metadata.fullName',
                     'event.metadata.lastName',
                     'event.metadata.mongoResponse.email',
                     'event.metadata.name',
                     'event.metadata.prossessorError.card.cardExpiry.month',
                     'event.metadata.prossessorError.card.cardExpiry.year',
                     'event.metadata.prossessorError.card.lastDigits',
                     'event.metadata.prossessorError.card.type',
                     'event.metadata.prossessorResponse.card.cardExpiry.month',
                     'event.metadata.prossessorResponse.card.cardExpiry.year',
                     'event.metadata.prossessorResponse.card.cardType',
                     'event.metadata.prossessorResponse.card.lastDigits',
                     'event.metadata.prossessorResponse.card.type',
                     'event.metadata.prossessorResponse.card_expiry_month',
                     'event.metadata.prossessorResponse.card_expiry_year',
                     'event.metadata.prossessorResponse.card_suffix',
                     'event.metadata.prossessorResponse.card_type',
                     'event.metadata.prossessorResponse.profile.email',
                     'event.metadata.prossessorResponse.profile.firstName',
                     'event.metadata.prossessorResponse.profile.lastName',
                     'event.metadata.requestParams.card_id',
                     'event.metadata.requestParams.email']

    # numerical_per_currency: aggregated (mean/median/max/min/std) per request
    # AND per event.category_action_label.
    numerical_per_currency = ['event.metadata.amount',
                             'event.metadata.blockioResponse.data.amount_sent',
                             'event.metadata.blockioResponse.data.amount_withdrawn',
                             'event.metadata.lastTradedPx',
                             'event.metadata.mongoResponse.amount',
                             'event.metadata.mongoResponse.price',
                             'event.metadata.price',
                             'event.metadata.prossessorResponse.amount',
                             'event.metadata.rate',
                             'event.metadata.requestParams.amount',
                             'event.metadata.requestParams.price',
                             'event.metadata.requestParams.product_amount']

    # numerical_overall: aggregated (mean/median/max/min/std) per request only.
    numerical_overall = ['event.metadata.cents',
                         'event.metadata.prossessorResponse.charge_amount',
                         'event.metadata.requestParams.charge_amount',
                         'event.value',
                         'event.time_since_last_event',
                         'event.days_since_nov']

    # De-duplicated union of all groups. It goes through a set, so the order
    # is not stable between runs. Not referenced again in the visible cells.
    all_columns = list(set(categorical_columns + numerical_per_currency + numerical_overall + unique_columns))

    # convert columns to either numeric or categorical
    def convert_to_numeric_or_lower_str(column):
        '''Return `column` converted to numbers, or lower-cased if that fails.

        Parameters
        ----------
        column : pandas.Series
            Values to convert.

        Returns
        -------
        pandas.Series
            `pd.to_numeric(column)` when every value parses as a number,
            otherwise `column.str.lower()`.
        '''
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            # Only conversion failures fall back to text handling; anything
            # else (e.g. KeyboardInterrupt) is no longer swallowed.
            return column.str.lower()

    # Request id plus the columns whose distinct-value counts are summarised.
    unique_selection = ['request._id'] + unique_columns
    unique_data = df_with_id[unique_selection]


    def n_unique(series):
        '''Count the distinct non-null values in a pandas Series.'''
        non_null = series.dropna()
        return len(non_null.unique())


    def n_NaN(series):
        '''Count the null (NaN/None) entries in a pandas Series.'''
        missing_mask = series.isnull()
        return missing_mask.sum()

    # Per request: number of distinct values and number of missing values for
    # every uniqueness column.
    unique_by_request = unique_data.groupby(['request._id'])[unique_columns]
    unique_summary = unique_by_request.agg([n_unique, n_NaN])

    # Collapse the (column, statistic) pairs into flat '<column>_<statistic>' names.
    unique_summary.columns = [
        column if statistic == '' else column + '_' + statistic
        for column, statistic in unique_summary.columns.ravel()
    ]

    # Summarise the numeric columns whose values are specific to a currency
    # (such as the metadata.amount field): statistics are computed per request
    # and per event.category_action_label, then spread into one column per
    # '<label>_<column>_<statistic>' with one row per request.
    #
    # The rows with no value in any of these columns are dropped with a
    # non-inplace dropna: the column selection already returns a new frame,
    # and mutating that frame in place raises SettingWithCopyWarning.
    numerical_by_currency_data = df_with_id[['request._id','event.category_action_label']+numerical_per_currency].dropna(axis=0, how='all', subset=numerical_per_currency)
    # NOTE(review): how `as_index=False` combines with a list of aggregations
    # (and the reset_index below) differs between pandas versions - confirm
    # the resulting columns against the installed version.
    groupby_agg = numerical_by_currency_data.groupby(['request._id','event.category_action_label'], as_index=False)[numerical_per_currency].agg(['mean','median','max','min','std'])
    # flatten the (column, statistic) pairs into '<column>_<statistic>'
    groupby_agg.columns = [col[0] if col[1] == '' else col[0]+'_'+col[1] for col in groupby_agg.columns.ravel()]
    groupby_agg = groupby_agg.reset_index()
    # long format: one row per (request, label, statistic column)
    groupby_agg = groupby_agg.melt(id_vars=['request._id','event.category_action_label'])
    # prefix each statistic name with the label it belongs to
    groupby_agg['variable'] = groupby_agg['event.category_action_label']+'_'+groupby_agg['variable'].astype(str)
    groupby_agg = groupby_agg.drop('event.category_action_label', axis=1)
    # back to wide format, indexed by request id
    groupby_agg = groupby_agg.pivot(index='request._id', columns='variable', values='value').reset_index().set_index('request._id')
    numerical_by_currency_summary = groupby_agg

    # Summarise the numeric columns that are not specific to a currency (like
    # the value): mean/median/max/min/std per request.
    #
    # Rows with no value in any of these columns are dropped with a
    # non-inplace dropna: the column selection already returns a new frame,
    # and mutating that frame in place raises SettingWithCopyWarning.
    numerical_overall_data = df_with_id[['request._id']+numerical_overall].dropna(axis=0, how='all', subset=numerical_overall)
    groupby_agg = numerical_overall_data.groupby(['request._id'])[numerical_overall].agg(['mean','median','max','min','std'])
    # flatten the (column, statistic) pairs into '<column>_<statistic>'
    groupby_agg.columns = [col[0] if col[1] == '' else col[0]+'_'+col[1] for col in groupby_agg.columns.ravel()]
    numerical_overall_summary = groupby_agg

    # Categorical data has to be turned into binary indicator columns; start
    # from the request id plus the categorical columns.
    categorical_selection = ['request._id'] + categorical_columns
    categorical_data = df_with_id[categorical_selection]

    def string_to_lower(col):
        '''Lower-case a pandas Series of strings; return other Series unchanged.

        Parameters
        ----------
        col : pandas.Series
            Series to convert.

        Returns
        -------
        pandas.Series
            `col.str.lower()` when the `.str` accessor is available, otherwise
            `col` itself.
        '''
        try:
            return col.str.lower()
        except AttributeError:
            # `.str` is unavailable on non-string data; leave those as they are.
            return col

    # Lower-case the categorical columns one column at a time. Applying
    # string_to_lower with axis=1 handed it whole rows instead, which
    #   * also ran the lower-casing over 'request._id', altering the key the
    #     summaries are joined on, and
    #   * returned NaN for every non-string cell of a mixed-type row.
    lowered_categoricals = categorical_data[categorical_columns].apply(string_to_lower)
    categorical_data = pd.concat([categorical_data[['request._id']], lowered_categoricals], axis=1)

    # One indicator column per category value (plus one for missing values),
    # then count the occurrences of each value per request.
    categorical_data = pd.get_dummies(categorical_data, columns=categorical_columns, dummy_na=True)
    groupby_agg = categorical_data.groupby(['request._id']).agg(['sum'])
    # flatten the (column, statistic) pairs into '<column>_<statistic>'
    groupby_agg.columns = [col[0] if col[1] == '' else col[0]+'_'+col[1] for col in groupby_agg.columns.ravel()]

    categorical_data_summary = groupby_agg

    # Join every other per-request summary onto the uniqueness summary
    # (index-on-index joins keyed by request id).
    data_summary = unique_summary
    for summary_part in (numerical_by_currency_summary,
                         numerical_overall_summary,
                         categorical_data_summary):
        data_summary = data_summary.join(summary_part)

    # Build a frame of the requests themselves; prefix the columns with
    # 'request.' so that they match the naming used for the event data.
    requests = [request_set['request'] for request_set in all_requests]
    requests_df = pd.DataFrame(json_normalize(requests))
    requests_df = requests_df.add_prefix('request.')

    # Keep the id, email and creation time, and one-hot encode the category.
    request_fields = ['request._id',
                      'request.metadata.email',
                      'request.created',
                      'request.eventCategory']
    request_df = pd.get_dummies(requests_df[request_fields], columns=['request.eventCategory'])

    # Attach the event summaries to each request.
    data = request_df.set_index('request._id').join(data_summary)
    
    def convert(name):
        '''Turn a camelCase, dotted column name into snake_case.

        Underscores are inserted at the camelCase word boundaries, the result
        is lower-cased, periods become underscores and each double underscore
        is collapsed into a single one.
        '''
        word_split = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
        fully_split = re.sub('([a-z0-9])([A-Z])', r'\1_\2', word_split)
        snake = fully_split.lower()
        for old, new in (('.', '_'), ('__', '_')):
            snake = snake.replace(old, new)
        return snake
    
    # Rename every column with convert(): camelCase becomes snake_case and
    # periods become underscores, e.g. 'request.metadata.email' ends up as
    # 'request_metadata_email'. Later cells must use the renamed columns.
    data.columns = [convert(col) for col in data.columns]
    
    # Disabled: write the summaries back to the requestEvents60Summaries
    # collection.
    # NOTE(review): to_dict(orient='records') does not include the index, so
    # the request id would not be stored - confirm before re-enabling.
#     # convert pandas dataframe to json
#     data_json = data.to_dict(orient='records')
    
#     requestEvents60Summaries = client['ml']['requestEvents60Summaries']
#     requestEvents60Summaries.insert_many(data_json)

# Nothing was loaded. `data` is not created on this path, so cells below that
# read `data` only work after a run that had requests to process.
else:
    print('No new requests to process.')

## Processing Data for the Actual Machine Learning Algorithm

In [None]:
# Load at most 1000 request sets.
sample_cursor = client['ml']['requestEvents60'].find().limit(1000)
all_request_sets = list(sample_cursor)

# One record per (request, event) pair. This rebinds `flat_requests`, which
# the processing cell above also assigns.
flat_requests = []
for request_set in all_request_sets:
    for event in request_set['events']:
        flat_requests.append({'request': request_set['request'], 'event': event})

In [None]:
# Flatten the nested request/event records into a table; nested keys become
# dotted column names such as 'request._id'.
flat = json_normalize(flat_requests)

In [None]:
# Show every column name of the flattened sample.
list(flat.columns)

In [None]:
def remove_whitelist_emails(df, email_col):
    '''Return the rows of `df` whose email is not in the whitelist collection.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with one row per request.
    email_col : str
        Name of the column in `df` that holds the email address.

    Returns
    -------
    pandas.DataFrame
        The rows of `df` whose `email_col` value is not among the emails read
        from production.emailWhitelistCollection.
    '''
    # NOTE(review): the whitelist is filtered on level 'BLOCKED', exactly like
    # the blacklist query in get_fraud_labels - confirm this is the intended level.
    whitelist_docs = client['production']['emailWhitelistCollection'].find({'level': 'BLOCKED'})
    wl_emails = {doc['email'] for doc in whitelist_docs}

    # Use the column the caller asked for instead of a hard-coded name.
    return df[~df[email_col].isin(wl_emails)]


def get_fraud_labels(user_emails):
    '''Return a 0/1 label per email: 1 when the email is blacklisted.

    Parameters
    ----------
    user_emails : iterable of str
        Emails to label, in order.

    Returns
    -------
    numpy.ndarray
        One entry per email: 1 if it appears in
        production.emailBlacklistCollection with level 'BLOCKED', else 0.
    '''
    blacklist_docs = client['production']['emailBlacklistCollection'].find({'level': 'BLOCKED'})
    # a set gives constant-time membership checks
    bl_emails = {doc['email'] for doc in blacklist_docs}

    return np.array([1 if user in bl_emails else 0 for user in user_emails])


# The columns of `data` were renamed to snake_case when the summaries were
# built, so the email column is 'request_metadata_email'; looking it up under
# its original dotted name raises a KeyError.
email_col = 'request_metadata_email'

# Remove whitelist emails. The copy makes the filtered frame independent, so
# adding the label column below does not trigger SettingWithCopyWarning.
data = remove_whitelist_emails(data, email_col).copy()

# get the fraud labels
data['fraud'] = get_fraud_labels(data[email_col])

# fill na values with zero
data = data.fillna(0)

print("Dataframe is",np.sum(data.memory_usage())*1e-9,'gigabytes in memory')

print("Saving to hdf5 file")

# `key` is passed by keyword, which every pandas version accepts.
data.to_hdf('../lstm_data_prep_pipeline/results/all_request_summaries.hdf5', key='table')

print("Done")

In [None]:
# Sketch of a model metadata record, made syntactically valid: the entries are
# now comma-separated and the training time is a quoted string.
# NOTE(review): all values look like placeholders - confirm where this record
# is meant to be stored before relying on it.
model_metadata_template = {
    'model': 'Previous 60 Minutes Random Forest',
    'created': 'date the model was created',
    'n_examples': 11000,
    'n_fraudulent': 1000,
    'n_notfraudulent': 10000,
    'training_time': '5 mins',
    'training_type': 'full',  # either 'full' or 'update'
}
model_metadata_template