# Defining, predicting, and preventing disengaged users in FinTech (Part 2)
*__Part 2: Developing an intervention using a contextual recommender system and designing an AB testing experiment to test the intervention__*


In the previous article, we covered a data-driven strategy for addressing disengaged users in a fintech setting. We started by defining engagement, then built a predictive model and analyzed the results. The model's task was to predict which users would be disengaged in the following month. It achieved an AUC of 0.94, and ATM use in the previous month was one of the strong predictors, which suggests that ATM access is one of the primary services. This article covers techniques for experimenting with ways to prevent these users from disengaging.

In [None]:
from pyspark.sql import functions as f, SparkSession, DataFrame, Window
from pyspark.sql.functions import col, lit, udf
from IPython.display import display
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

In [71]:
# Conf and set up

spark = (SparkSession.builder.master("local[*]")
                     .config("spark.sql.repl.eagerEval.enabled", True)
                     .getOrCreate())


transaction = spark.read.option('compression', 'gzip').option('inferSchema', 'true').csv('data/raw_data', header=True)
transaction.limit(5)

user_id,birth_year,home_country,home_city,device,num_contacts,plan,num_successful_referrals,joined_date,transaction_id,created_date,merchant_country,merchant_city,card_presence,direction,transactions_type,transactions_state,amount_usd
015aff8c321742c6b...,1992,SR,New Cassandra,Apple,8,STANDARD,0,2018-06-16 02:33:...,ae3a61b8c0254daca...,2018-06-15 06:41:...,,,,INBOUND,TOPUP,COMPLETED,6.29
015aff8c321742c6b...,1992,SR,New Cassandra,Apple,8,STANDARD,0,2018-06-16 02:33:...,5f8f57ed3aa74775a...,2018-06-16 00:16:...,,,,INBOUND,TOPUP,REVERTED,1.36
015aff8c321742c6b...,1992,SR,New Cassandra,Apple,8,STANDARD,0,2018-06-16 02:33:...,7986278db1824634a...,2018-06-16 01:37:...,,,,OUTBOUND,FEE,COMPLETED,6.36
015aff8c321742c6b...,1992,SR,New Cassandra,Apple,8,STANDARD,0,2018-06-16 02:33:...,596f2f7d24754ef48...,2018-06-18 03:09:...,,,,INBOUND,TOPUP,COMPLETED,13.3
015aff8c321742c6b...,1992,SR,New Cassandra,Apple,8,STANDARD,0,2018-06-16 02:33:...,6f8ad9c8af9a40fdb...,2018-06-28 00:50:...,BR,da Rosa,False,OUTBOUND,CARD_PAYMENT,COMPLETED,2.84


# Contextual Recommender System


Context is essential in recommender systems. Taking the day of the week as the context, the drinks, movies, or items we consume on a Monday night may not be the same as on a Friday night. Adding recency, weather, geolocation, preferences, or other information as context can make recommendations more personalized, but it also makes the system more complex to design. Finding which context is useful in practice requires substantial experimentation.
To start, we will use the month of the year as the context. Since the data is limited (there is no geodata or merchant information), we can reframe the problem and scope it down: for each user, rank the top five cities where ATM services are likely to be used, and offer promotional rates for those cities. For example, users who usually use ATMs in New York may be more likely to use an ATM in Florida in February but in Chicago in August.

### Designing Contextual Recommendation using ALS

Alternating Least Squares (ALS) is a common technique for collaborative filtering. Like other matrix-factorization techniques, ALS decomposes the original matrix (users and their interactions) into two matrices U and V such that their product closely approximates the original matrix. What sets ALS apart is how it learns and approximates U and V.
While other algorithms use gradient descent or other methods, ALS uses a method similar to Ordinary Least Squares (OLS) in regression problems. A simplified explanation of how ALS learns:

1. Initialize U and V pseudo-randomly.
2. Hold U fixed and learn V: for each column of the original matrix, fit a variation of OLS with the user factors in U as the features and that column as the label.
3. Same as step 2 with the roles switched: hold V fixed and learn U, with the item factors in V as the features and the corresponding row of the original matrix as the label.
4. Repeat steps 2 and 3 until convergence or until the maximum number of iterations is reached.
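
The alternating steps above can be sketched in a few lines of NumPy. This is a simplified illustration, not Spark's implementation: it assumes a small dense matrix in which every cell is observed, uses ridge-regularized least squares as the "variation of OLS", and stores the factors so that the approximation is `U @ V.T`. The function name `als_sketch`, its parameters, and the toy matrix `R` are made up for this example.

```python
import numpy as np

def als_sketch(R, rank=2, reg=0.1, n_iter=20, seed=0):
    """Factor R (users x items) into U (users x rank) and V (items x rank)."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    # Step 1: pseudo-random initialization of both factor matrices
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    ridge = reg * np.eye(rank)
    for _ in range(n_iter):
        # Step 2: hold U fixed; regularized least squares for every item at once
        V = np.linalg.solve(U.T @ U + ridge, U.T @ R).T
        # Step 3: hold V fixed; regularized least squares for every user at once
        U = np.linalg.solve(V.T @ V + ridge, V.T @ R.T).T
    return U, V

# Toy interaction matrix (hypothetical counts): 5 users x 4 items
R = np.array([[5., 3., 0., 1.],
              [4., 0., 0., 1.],
              [1., 1., 0., 5.],
              [1., 0., 0., 4.],
              [0., 1., 5., 4.]])

U, V = als_sketch(R, rank=2)
approx = U @ V.T  # reconstructed matrix, same shape as R
print(np.round(approx, 2))
```

Unlike this sketch, a production implementation such as Spark's fits only the observed entries (or weights them by confidence for implicit feedback) and distributes the per-user and per-item solves.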

Spark ML provides an ALS implementation, so we don't have to implement the algorithm from scratch. Spark ML also helps when the data is large, since collaborative filtering produces large, sparse matrices.

### Features for contextual recommender using ALS

Similar to other collaborative filtering techniques, ALS takes users and their interactions as input. One possible way to add context is to simply split each interaction by context, so that every combination of context and item is treated as its own item. This approach is not recommended when the context has many possible values, as it further increases the size and sparsity of the matrix.

ALS is evaluated by comparing the original matrix with the reconstructed matrix. The comparison can be measured with the mean absolute error (MAE), or with the root mean squared error (RMSE), which penalizes large errors more heavily.
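
To make the difference between the two metrics concrete, here is a minimal sketch in plain Python. The helper names and the toy values are made up for illustration; the notebook itself relies on Spark's `RegressionEvaluator`.

```python
import math

def mean_absolute_error(actual, predicted):
    """Average of the absolute differences."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

def root_mean_squared_error(actual, predicted):
    """Square root of the average squared difference."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

actual = [1, 2, 3, 1]
small_errors = [2, 3, 4, 2]    # every prediction is off by 1
one_big_error = [1, 2, 3, 5]   # a single prediction is off by 4

for name, predicted in [('small errors', small_errors), ('one big error', one_big_error)]:
    print(name,
          'MAE:', mean_absolute_error(actual, predicted),
          'RMSE:', root_mean_squared_error(actual, predicted))
```

Both prediction sets have the same MAE, but the set with a single large miss has the higher RMSE.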

In [68]:
features = (transaction
            .where('transactions_type=="ATM"')
            .groupBy('user_id',  
                     f.concat_ws('|',
                                 f.date_format(f.col('created_date'), "MMMM"),
                                 f.when(f.col('transactions_type')=='ATM', 'ATM').otherwise('NonATM'),
                                 'merchant_city').alias('context'))
            .agg(f.count('merchant_city').alias('count'))
            .orderBy('user_id', 'context', 'count')
            )

features.limit(10)

user_id,context,count
00009eb59e5143159...,April|ATM|New Julia,1
00009eb59e5143159...,March|ATM|Aragão,2
00009eb59e5143159...,May|ATM|Bushtown,3
00009eb59e5143159...,May|ATM|New Travi...,1
00009eb59e5143159...,May|ATM|Roweshire,1
0001623b9afc43748...,April|ATM|Amberside,3
0001623b9afc43748...,April|ATM|Ballia,1
0001623b9afc43748...,April|ATM|East De...,1
0001623b9afc43748...,April|ATM|Jodifort,1
0001623b9afc43748...,April|ATM|New Joseph,1


In [43]:
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer

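# Model
# Note: with implicitPrefs=True, 'count' is treated as the confidence of an implicit
# preference, so predictions are preference scores rather than predicted counts.
# RMSE and MAE computed against 'count' are therefore only a rough guide.
als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True, seed=777, nonnegative=True, coldStartStrategy='drop',
          userCol='user_index', itemCol='context_index', ratingCol='count')
rmse = RegressionEvaluator(metricName='rmse', labelCol='count',
                           predictionCol='prediction')

mae = RegressionEvaluator(metricName='mae', labelCol='count',
                          predictionCol='prediction')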

# Convert IDs into Index
userid_to_index = StringIndexer(inputCol='user_id', outputCol='user_index', handleInvalid='keep')
context_to_index = StringIndexer(inputCol='context', outputCol='context_index', handleInvalid='keep')
id_to_index = Pipeline(stages=[userid_to_index, context_to_index]).fit(features)

features_indexed = id_to_index.transform(features)
user_mapping = features_indexed.select('user_id', 'user_index').distinct()
context_mapping = features_indexed.select('context', 'context_index').distinct()


# Training and evaluation
training, test = features_indexed.randomSplit([0.8, 0.2])
model = als.fit(training)
predictions = model.transform(test)
rmse_value = rmse.evaluate(predictions)
mae_value = mae.evaluate(predictions)
print('RMSE: ', rmse_value, 'MAE: ', mae_value)

RMSE:  2.3485932867050283 MAE:  1.7029504149895982


In [48]:
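# Serving
from pyspark.ml import PipelineModel
from pyspark.ml.recommendation import ALSModel

def predict_cities(users: DataFrame, model: ALSModel, indexer: PipelineModel,
                   user_mapping: DataFrame, context_mapping: DataFrame,
                   top_n=10, month=None):
    ''' Generate the top n city recommendations for a subset of users,
        optionally filtered to a single month as the context
    '''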
    users = indexer.transform(users.withColumn('context', f.lit('')))
    
    recommendations_index = (model.recommendForUserSubset(users, top_n)
                             .select('user_index', f.explode('recommendations').alias('r'))
                             .select('user_index', f.col('r.context_index'), f.col('r.rating'))
                            )

    recommendations = (recommendations_index.join(user_mapping, ['user_index'], 'left')
                       .join(context_mapping, ['context_index'], 'left')
                       .withColumn('c', f.split(f.col('context'), r'\|'))
                       .select('user_id', f.col('c')[0].alias('context_month'), f.col('c')[1].alias('context_type'), f.col('c')[2].alias('recommended_city'), 'rating')
                       .orderBy(f.desc('rating'), 'user_id', 'context_type', 'context_month')
                      )

    if month is None:
        return recommendations
    else:
        return recommendations.where(f.col('context_month')==f.lit(month))

predict_cities(features.select('user_id').distinct().limit(100), model, 
               id_to_index, user_mapping, context_mapping, 10, 'August')

user_id,context_month,context_type,recommended_city,rating
7dd8a2168e474526a...,August,ATM,New York,0.35182032
628d6a4c8f084994b...,August,ATM,Jessicabury,0.12443738
4acbb9c424264241a...,August,ATM,Jessicabury,0.11963231
628d6a4c8f084994b...,August,ATM,West Monicaland,0.10954083
4acbb9c424264241a...,August,ATM,West Monicaland,0.107432485
6c52a1c328f3450ba...,August,ATM,New York,0.10485288
d1cca52cde5245518...,August,ATM,North Darren,0.09719041
a097d7e832cc4adfb...,August,ATM,New York,0.09043781
dfbc87f7daab47cf9...,August,ATM,Jessicabury,0.084438555
dfbc87f7daab47cf9...,August,ATM,West Monicaland,0.07145732


# AB Testing

In [49]:
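# Keep only users with at least one transaction in the last 60 days of the data
heartbeat = transaction.select(f.max('created_date')).collect()[0][0] - pd.Timedelta('60 days')

# Stratify users by birth year and device
all_users = (transaction.filter(f.col('created_date')>f.lit(heartbeat))
             .select('user_id', f.concat_ws('|', 'birth_year', 'device').alias('stratum'), 'home_city')
             .distinct()
            )

# Sample about half of every stratum into group A (seed 555)
fractions = all_users.select('stratum').distinct().withColumn('frac', lit(0.5)).rdd.collectAsMap()

group_A = all_users.sampleBy('stratum', fractions, 555)
# Everyone not sampled into group A forms group B
group_B = all_users.subtract(group_A)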


print('All users: ', all_users.count(), 'Group A:', group_A.count(), 'Group B:', group_B.count())

All users:  185987 Group A: 93440 Group B: 92547
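
Because `sampleBy` does not guarantee an exact 50/50 split, it can be worth checking that the observed group sizes are consistent with the intended allocation before running the experiment. The sketch below uses a chi-square goodness-of-fit test with one degree of freedom on the counts printed above. The function name `srm_check` is made up for this example, and the significance threshold to apply is left to the experimenter.

```python
import math

def srm_check(n_a, n_b, expected_share_a=0.5):
    """Chi-square goodness-of-fit test (1 degree of freedom) on two group sizes."""
    total = n_a + n_b
    expected_a = total * expected_share_a
    expected_b = total - expected_a
    chi2 = (n_a - expected_a) ** 2 / expected_a + (n_b - expected_b) ** 2 / expected_b
    # Upper tail of a chi-square distribution with 1 degree of freedom
    p_value = math.erfc(math.sqrt(chi2 / 2))
    return chi2, p_value

# Group sizes taken from the output above
chi2, p_value = srm_check(93440, 92547)
print('chi2:', round(chi2, 4), 'p-value:', round(p_value, 4))
```

A very small p-value would suggest that the split deviates from the intended ratio by more than chance alone, which is a reason to re-check the assignment logic before trusting the experiment's results.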


In [69]:
promotion_month = 'August'
recommended_cities = predict_cities(group_B, model, id_to_index, user_mapping, 
                                    context_mapping, 10, promotion_month)

promotions = (group_B.join(recommended_cities, ['user_id'], 'left')
             .select('user_id', 
                     f.lit('Group_B').alias('group'),
                     f.lit(promotion_month).alias('promotion_month'),
                     'home_city',
                     'recommended_city',
                     f.when(f.col('recommended_city').isNull(), f.col('home_city')).otherwise(f.col('recommended_city')).alias('promotion_city'),
                    )
             .distinct()
             .orderBy('user_id', 'promotion_city')
             )

promotions              

user_id,group,promotion_month,home_city,recommended_city,promotion_city
00009eb59e5143159...,Group_B,August,East Brianmouth,,East Brianmouth
0001623b9afc43748...,Group_B,August,Haridwar,Jessicabury,Jessicabury
00061b403fd84d91a...,Group_B,August,Vieja Tonga,,Vieja Tonga
002583a3828e49079...,Group_B,August,Port Richard,,Port Richard
00265da8830d489aa...,Group_B,August,New Jennifer,,New Jennifer
00266dbf306f478ba...,Group_B,August,Thompsonfurt,Vancouver,Vancouver
002b638849be490aa...,Group_B,August,Lake Elizabethburgh,,Lake Elizabethburgh
002c28b399c148dc9...,Group_B,August,Douglasview,,Douglasview
002ceec7ee22437aa...,Group_B,August,New York,Jessicabury,Jessicabury
002ceec7ee22437aa...,Group_B,August,New York,West Monicaland,West Monicaland
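
Once the promotion month has passed, the two groups can be compared on an engagement metric. The sketch below is one possible way to do that, assuming the metric is the share of users in each group who stayed engaged and using a two-sided two-proportion z-test. The function name and all counts are hypothetical and only illustrate the calculation; they are not results from this dataset.

```python
import math

def two_proportion_z_test(success_a, n_a, success_b, n_b):
    """Two-sided z-test for the difference between two proportions (B minus A)."""
    p_a = success_a / n_a
    p_b = success_b / n_b
    # Pooled proportion under the null hypothesis of no difference
    pooled = (success_a + success_b) / (n_a + n_b)
    standard_error = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / standard_error
    # Two-sided p-value from the standard normal distribution
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value

# Hypothetical counts: engaged users out of all users in each group
z, p_value = two_proportion_z_test(success_a=400, n_a=1000,   # group A: no promotion
                                   success_b=450, n_b=1000)   # group B: promotion
print('z:', round(z, 3), 'p-value:', round(p_value, 4))
```

In this setup group A acts as the control and group B receives the promotions, so a positive `z` indicates higher engagement in the promoted group.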
