# Web API for an Interactive Recommender Engine with Apache Spark MLlib using real-world e-commerce data


Vadym Ovcharenko [LinkedIn](https://www.linkedin.com/in/vadymovcharenko/) [GitHub](https://github.com/bolein)


## Abstract
In this project, I used data from a real e-commerce website to prototype two types of recommendation models. The motivation was to provide an evidance for a large pharmaceutical company that their data can bring value. The data was uset to prototype 2 types of recommendation models. To make a more realistic use-case, a data pipeline was built to enable consistent daily model retraining with new data. Both models were wrapped with REST APIs for access from mobile and web applications. As a result, a pilot version of a web-based recommendation engine provides meaningful suggestions for associated products and personal recommendations. If the customer discovers value in this development, they will release more data for further usage.

## Introduction
There are several common types of recommendation systems that are used in e-commerce today

**TODO** Intro

## Data collection
### Data extraction
The data is warehoused in the customer’s database and is available using a special bus called E-COM. E-COM is a relatively new technology in the customer's data infrastructure and is actively developed. Because of if this fact, I faced a few problems while working with it. For example, at first, there was no method to dowload the whole collection of transactions. Collaborationg with the E-COM development team, we were able to solve issues quickly.

E-COM exposes a REST API to interact with it. The API is well [documented](https://docs.google.com/document/d/12qB6IpXknP48yfyHkfkvrCxZ-NvEKq-hMchIRLeuHdc/edit#heading=h.91wpu9x8qw). For this project, I used two endpoints specifically:
* [GET /orders](https://docs.google.com/document/d/12qB6IpXknP48yfyHkfkvrCxZ-NvEKq-hMchIRLeuHdc/edit#heading=h.mzszi182k2p7) - Get all orders
* [GET /goods](https://docs.google.com/document/d/12qB6IpXknP48yfyHkfkvrCxZ-NvEKq-hMchIRLeuHdc/edit#heading=h.yclivj7eu413) - Get all goods (with filtering options)

Both models require transaction data. To perform analysis I had to retrieve the data and store it locally. For this purpose I created the following script that downloaded the data from API page by page. I then converted the data to JSON format and stored it on disk. 

E-COM provides sensitive user data, therefore the firwall blocks request for non-whitelisted hosts. To access the E-COM, one IP was whitelisted for our organization, and it belongs to a server that uses E-COM for other purposes. To be able to access E-COM for this project, I had to set up a VPN server on the whitelisted host. I used [this guide](https://www.digitalocean.com/community/tutorials/how-to-set-up-an-openvpn-server-on-ubuntu-16-04#prerequisites) to set up an OpenVPN server on Ubuntu 16.04. This was my first experience with setting up VPN servers and it turned out to be a relatvely simple procedure. I am confused why VPN service providers charge over 10 dollars per month for a [public VPN](https://nordvpn.com) server with limited number of connections, while you can bootstrap a low-cost machine on [Digital Ocean](https://www.digitalocean.com/) and get a dedicated VPN server with UNLIMITED number of connections for less than 10$/month!

In [3]:
page_count = 9999999999999 # init with max value
params = {'page': 1, 'per-page':100}
orders = []

In [4]:
from IPython.display import clear_output, display
import base64
import sys
import time
import requests

def download_all_orders():
    global page_count, orders
    auth_token = 'U2l0ZU96OkFWNzREOA=='
    headers = {'Authorization': 'Basic {}'.format(auth_token)}
    url = 'http://ws.erkapharm.com:8990/ecom/hs/orders?expand=basket'
    
    while params['page'] < page_count:
        started = time.time()
        res = requests.get(url, params=params, headers=headers).json()
        orders += res['orders']
        page_count = res['pageCount']
        params['page'] = params['page'] + 1
        clear_output(wait=True)
        progress = params['page']*1./res['pageCount'] * 100
        ellapsed_seconds = time.time() - started
        left_seconds = ellapsed_seconds * (page_count - params['page'])
        print('progress: {:.2f}% Remaining: {} minutes'.format(progress, int(left_seconds/60)))
        
    return orders

In [8]:
orders = download_all_orders()

In [9]:
len(orders)

99700

In [10]:
import json

with open('orders_complete.json', 'w') as outfile:
    json.dump(orders, outfile)

The downloading process took about 3 hours. To better understand the progress of the task, I calculated the progress rate and ETA. The process took about 3 hours to get about 100,000 records through the API. I fetched 100 records per time. I guess that the process took a long time because of a latency overhead - the VPN server is located in Germany and E-COM instance is in Russia.

The output file is only 63.4 MB, but once again, we're using transaction data only from one of the sources (one website). The company has multiple transaction sources, such as other websites and offline stores (around 1500 stores around the country). But for the purposes of this demo project, we only have access to one of the sources, however, one of the most important ones.

## Data preprocessing

For this project, I chose to use Spark MlLib since it provides a great Python API and uses functional-style operators which are very familiar and very pleasing to me. To test Spark code on my local machine, I used [this guide](https://towardsdatascience.com/how-to-use-pyspark-on-your-computer-9c7180075617) and executed spark code directly in this notebook. From now on, I imagined that I was dealing with a huge amount of data, and used PySpark for all operations

### Transforming phone numbers
As a result of the Data collection step, we have a dataset of well-structured transaction records. However, there are some problems with this data that require additional pre-processing. Since E-COM is a data bus, there data flows from different sources and sometimes does not follow the same conventions. For example, the phone number field, which is thought to be a primary user identifier in transaction data happens in different formats:


In [1]:
## pyspark --conf "spark.mongodb.output.uri=mongodb://127.0.0.1:27017/cs554.associationRules" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.1
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
rddjson = sc.textFile('orders_complete.json')
df = sqlContext.read.json(rddjson)

In [2]:
df.registerTempTable('orders')
phoneLengths = spark.sql('SELECT COUNT(*), LENGTH(clientTel), first(clientTel) as example FROM orders GROUP BY LENGTH(clientTel)')
phoneLengths.limit(5).show()

+--------+-----------------+-----------------+
|count(1)|length(clientTel)|          example|
+--------+-----------------+-----------------+
|    4762|               15|  +7(916)539-6205|
|   91516|               17|+7 (905) 545-5709|
|    3422|               11|      79153706507|
+--------+-----------------+-----------------+



We see that there's at least 3 types of different phone formats (depending on the string length). I removed all non-digits.

In [3]:
from pyspark.sql.functions import udf

strToNum = udf(lambda phone: ''.join(filter(str.isdigit, phone)))
df = df.withColumn('clientTelNumeric', strToNum(df.clientTel))
df.select('clientTel', 'clientTelNumeric').limit(10).show()

+-----------------+----------------+
|        clientTel|clientTelNumeric|
+-----------------+----------------+
|+7 (905) 545-5709|     79055455709|
|+7 (915) 182-9688|     79151829688|
|+7 (967) 135-7431|     79671357431|
|+7 (123) 456-7890|     71234567890|
|+7 (123) 456-7890|     71234567890|
|+7 (916) 714-9823|     79167149823|
|+7 (915) 215-0103|     79152150103|
|+7 (926) 172-6883|     79261726883|
|+7 (967) 170-4118|     79671704118|
|+7 (966) 192-6058|     79661926058|
+-----------------+----------------+



### Fitlering out duplicate transactions

There are also some duplicate records in the data. Duplicate transactions can confuse frequent pattern mining algorithms and collaborative filtering models as they will assume that every transaction record is unique. Consequenty, duplicates will artificially increase the frequency of certain datasets. 

In [4]:
df = df.dropDuplicates(['id']).cache()
df.select('clientTelNumeric').distinct().count()/df.count()

0.686037575585981

After parsing only numeric values from the `clientTel` field and dropping duplicate records, we can see that our transaction database has more than **30% of returning customers**. It means that the dataset is a decent case for building collaborative recomendation models.

## Model building


## Building association rules

I decided to start with [Frequent Pattern Mining](https://spark.apache.org/docs/2.3.2/mllib-frequent-pattern-mining.html). I used the FP-growth algorithm, since I studied it in Data Mining class and familiar with how the algorithm works. It turned out, that there's not much information on the Web on how to implement a piece of software using Spark MLlib that will provide association rules as an output of the algorithm. 

To create an FP-Growth model, we need to extract product IDs from transaction records. 

In [5]:
from pyspark.sql.functions import *
# explode items, distinct them and aggregate back (to remove duplicate items in transactions)
transactions = df.select('id', explode('basket.goodsId')) \
                    .groupby('id') \
                    .agg(collect_set('col').alias('items')) \
                    .cache()

In [6]:
from pyspark.ml.fpm import FPGrowth
fp = FPGrowth(minSupport=0.0005, minConfidence=0, numPartitions=4)
fpm = fp.fit(transactions)

Mining association rules takes quite some time. With the `minSupport` property, we can filter out infrequent itemsets. Depending on the implementation of the recommendation engine, it might make sense to mine as many association rules as possible by keeping the `minSupport` as low as possible. If a website has a "frequently bought together" section, it should be filled with items. However, if we want to create a pop-up sreen after click on the "add to cart" with a recommended item, we should have a high confidence in such recommendation.

Confidence is a probability of the consequent given the antecedent (e.g. how likely is the user to purchase consequent in case he already purchased antecedent). Since the dataset is relatively small, we use a low confidence threshold to generate association rules.

In [13]:
fpm.associationRules.sort(desc('confidence')).show(5)

+----------+----------+-------------------+
|antecedent|consequent|         confidence|
+----------+----------+-------------------+
|   [81976]|   [81977]| 0.5106382978723404|
|  [121286]|  [103180]|0.41626794258373206|
|  [103180]|  [121286]| 0.3096085409252669|
|  [114265]|  [114892]|0.27807486631016043|
|  [121563]|   [97807]| 0.2742616033755274|
+----------+----------+-------------------+
only showing top 5 rows



### Validating results
We generated a table of association rules that can be used by our Web service to give product recommendations. I decided to fetch product names to intuitively evaluate the results of the algorithm.

In [9]:
from pyspark.sql.functions import udf
import requests

def fetchProductNames(ids):
    names = []
    for id in ids:
        url = 'http://138.68.86.83/api/goods/{}?region=77'.format(id)
        res = requests.get(url).json()
        names.append(res['name'])
    return names

In [22]:
pandasRules = rules.limit(5).toPandas()
pandasRules['antecedentName'] = pandasRules.antecedent.map(fetchProductNames)
pandasRules['consequentName'] = pandasRules.consequent.map(fetchProductNames)
pandasRules

Unnamed: 0,antecedent,consequent,confidence,antecedentName,consequentName
0,[81976],[81977],0.510638,[Бриллиантовый зеленый р-р спирт 1% 5мл фломас...,[Йода р-р 5% 5мл фломастер]
1,[121286],[103180],0.416268,[PL Контейнер д/биопроб стер. 60мл с крыш и ло...,[PL Контейнер д/биопроб универс. 120мл полим. ...
2,[103180],[121286],0.309609,[PL Контейнер д/биопроб универс. 120мл полим. ...,[PL Контейнер д/биопроб стер. 60мл с крыш и ло...
3,[114265],[114892],0.278075,[Нью Лайф Бинт марл мед стер 7м х 14см уп N1/СТМ],[Нью Лайф Бинт марл мед стер 5м х 10см уп N1/СТМ]
4,[121563],[97807],0.274262,[Ля Рош Позе Эфаклар Гель для умывания очищающ...,[Ля Рош-Позе Эфаклар Дуо+ Крем-гель корректиру...


These results, indeed, make sense. The first rule is "brilliant green" -> "iodine solution". 

In [32]:
test = fpm.associationRules\
            .filter(array_contains('antecedent', 2)) \
            .sort(desc('confidence'))\
            .toJSON()\
            .collect()

In [33]:
'[{}]'.format(','.join(test))

'[]'

### Exporting results to MongoDB
The generated association rules will be used as lookup tables for our future HTTP server. Rules dataframe can be fetched directly, however, since it is hold in Spark context, it creates a significant overhead to find a specific row in the dataframe. Especially if we aim to create a relatively fast web-service. Therefore, we need to export the association rules out of Spark context. We could have stored it in another `json` file, but text files do not support random access, so there would not be any benefit. Additionally, in a real world case the file size might be enormous (i.e. for millions of rules). Thus, we need to store the rules in a database. For this task, I decided to use [MongoDB](http://api.mongodb.com/python/current/index.html), because it supports storing collections of millions of records in a distributed fashion.

In [11]:
# write the dataframe to MongoDB using mongo connector (uri specified in spark context configuration)
rules.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()

## Collaborative filtering 

### Matrix Factorization
One of the most effective methods of collaborative filtering is Matrix Factorization. Matrix factorization is one of the algorithms from recommender systems family and as the name suggests it factorize a matrix, i.e., decompose a matrix in two(or more) matrices such that once you multiply them you get your original matrix back. In case of the recommendation system, we will typically start with an interaction/rating matrix between users and items and matrix factorization algorithm will decompose this matrix in user and item feature matrix which is also known as embeddings. Example of interaction matrix would be user-movie ratings for movie recommender, user-product purchase flag for transaction data, etc.

![](https://cdn-images-1.medium.com/max/1600/0*Qy8Pku8FJXM60iAs)

Typically user/item embeddings capture latent features about attributes of users and item respectively. Essentially, latent features are the representation of user/item in an arbitrary space which represents how a user rate a movie. In the example of a movie recommender, an example of user embedding might represent affinity of a user to watch serious kind of movie when the value of the latent feature is high and comedy type of movie when the value is low. Similarly, a movie latent feature may have a high value when the movie is more male driven and when it’s more female-driven the value is typically low.

In case of product recomendations, the interaction matrix is very sparse and there are no explicit ratings. The approach used in spark.ml to deal with such data is taken from Collaborative Filtering for Implicit Feedback Datasets [12]. Essentially, instead of trying to model the matrix of ratings directly, this approach treats the data as numbers representing the strength in observations of user actions (such as the number of clicks, or the cumulative duration someone spent viewing a movie). Those numbers are then related to the level of confidence in observed user preferences, rather than explicit ratings given to items. The model then tries to find latent factors that can be used to predict the expected preference of a user for an item.

In this project, I used the total number of purchases for each item as an interaction measure. Alternating Least Squares algorithm implementation in `spark.ml` was used to calculate the latent factors.

In [8]:
# transform the transactions, aggregating total number of purchases per-item, per-user
implicitRatings = df.select('id', 
                            # hash phone numbers to fit in Integer type
                            (col('clientTelNumeric').cast(LongType()) * 67853 % 2**31).alias('telHashed'), 
                            explode('basket').alias('product'), 
                            'product.goodsId', 
                            'product.quantity') \
                    .groupBy('telHashed', 'goodsId') \
                    .agg(sum('quantity').alias('purchases')) \
                    .cache()

In [9]:
implicitRatings.show(5)

+----------+-------+---------+
| telHashed|goodsId|purchases|
+----------+-------+---------+
|1121302175|  16588|        1|
|1569021973| 116978|       30|
|  21874890|  51767|        1|
|  86765306|  26694|        1|
|2022784980| 106619|        2|
+----------+-------+---------+
only showing top 5 rows



In [10]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# split into the train and test set
training, test = implicitRatings.randomSplit([0.8, 0.2])

# train an ALS model
als = ALS(maxIter=5, regParam=0.01, userCol="telHashed", itemCol="goodsId", ratingCol="purchases",
          coldStartStrategy="drop", implicitPrefs=True, nonnegative=True)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="purchases", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
rmse

3.215298356790437

Fitting the best model is challenging and requires a lot of hyperparameter tuning.

One of the most effective use-cases of Matrix Factorization is finding similar items by distance between item embeddings. The item-based approach is proven to be more effective and flexible than other collaborative filtering techniques. It does not require retraining of the whole model for every new interaction and usually yields better results. 

### Validation
Validating a recommendation model with implicit recomedations is quite tricky. Basic RMSE does not fit in this case, because there is no information about movies that the user has rated negatively. Therefore, it is suggested to use recall-based metrics for such recommenders. Several metrics have been introduced, the most important being the Mean Percentage Ranking (MPR), also known as Percentile Ranking. However, there is no utility in Spark ML package to evaluate the model in such way. 

## Deployment

To serve the results over http, I decided to use [Flusk](https://github.com/pallets/flask) - a simple web-server framework for python.