# <center>DATA643: Recommender System </center>
## <center> Final Project </center>
### <i> <center> Harpreet Shoker, Rose Koh, Summer 2018 </center> </i>

## Notebook4_ALS

In this notebook, we perform matrix factorization using Alternating Least Squares on implicit feedback data.

#### ALS with Spark ML library 

Spark's machine learning library provides a collaborative filtering implementation based on Alternating Least Squares. The implementation has the parameters below; where the DataFrame-based `pyspark.ml` API used later in this notebook names a parameter differently, that name is given in parentheses:

* numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure in the RDD-based API; defaults to 10 in `pyspark.ml`).
* rank is the number of latent factors in the model.
* iterations (`maxIter`) is the number of iterations to run.
* lambda (`regParam`) specifies the regularization parameter in ALS.
* implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
* alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

See documentation at https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html
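
To make the roles of `rank`, `iterations`, `lambda` and `alpha` concrete, here is a minimal dense NumPy sketch of implicit-feedback ALS in the style of the Hu, Koren and Volinsky paper linked later in this notebook. It is an illustration on a hypothetical toy matrix, not Spark's or `implicit`'s implementation; the function name `implicit_als` and the toy data are assumptions made for this example.

```python
import numpy as np

def implicit_als(R, rank=2, iterations=10, lam=0.1, alpha=15.0, seed=0):
    """Toy dense implicit-feedback ALS; R is (users x items) interaction counts."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    P = (R > 0).astype(float)   # binary preference: did the user interact at all?
    C = 1.0 + alpha * R         # confidence grows with the interaction count
    X = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    Y = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    reg = lam * np.eye(rank)
    for _ in range(iterations):
        # Hold item factors fixed and solve a weighted ridge regression per user
        for u in range(n_users):
            Cu = np.diag(C[u])
            X[u] = np.linalg.solve(Y.T @ Cu @ Y + reg, Y.T @ Cu @ P[u])
        # Hold user factors fixed and solve the same problem per item
        for i in range(n_items):
            Ci = np.diag(C[:, i])
            Y[i] = np.linalg.solve(X.T @ Ci @ X + reg, X.T @ Ci @ P[:, i])
    return X, Y

# Hypothetical purchase counts: 3 users x 4 products
R = np.array([[3, 0, 1, 0],
              [0, 2, 0, 4],
              [5, 0, 2, 0]], dtype=float)
X, Y = implicit_als(R)
scores = X @ Y.T  # predicted preference for every (user, product) pair
```

Production implementations avoid the dense diagonal matrices used here and only loop over observed entries, which is what makes the method practical on a matrix as sparse as the one built below.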

---

## Get Data

In [2]:
import os
from pathlib import Path

datasets_path = os.path.join(os.getcwd(), 'data')
dt_path = os.path.join(datasets_path, 'instacart_2017_05_01.tar.gz')

In [3]:
from subprocess import check_output
print(check_output(["ls", "./data/instacart_2017_05_01"]).decode("utf8"))

aisles.csv
departments.csv
order_products__prior.csv
order_products__train.csv
orders.csv
products.csv



In [4]:
import pandas as pd

# Order and User dataset
order_products_prior = pd.read_csv('./data/instacart_2017_05_01/order_products__prior.csv')
order_products_train = pd.read_csv('./data/instacart_2017_05_01/order_products__train.csv')
orders = pd.read_csv('./data/instacart_2017_05_01/orders.csv')
# Products dataset
products = pd.read_csv('./data/instacart_2017_05_01/products.csv')

## Test data

In [5]:
def test_data(path, orders, order_products_train):
    """
    Make test data and save it in the given path as .csv
    """
    # read `orders` and filter eval_set == train
    orders_train = orders.loc[(orders.eval_set == "train")].reset_index()
    orders_userid = orders_train[["order_id", "user_id"]]
    
    # `orders_userid` and `order_products_train` lengths should match
    assert len(orders_userid["order_id"].unique()) == len(order_products_train["order_id"].unique())

    # Convert `order_products_train` to the same format: one row per order with a list of its products
    orders_productid = order_products_train[["order_id", "product_id"]]
    orders_productid = orders_productid.groupby("order_id")["product_id"].apply(list).reset_index().rename(columns={"product_id": "products"})

    # `orders_productid` and `orders_userid` sizes should match
    assert orders_productid.size == orders_userid.size

    # merge `orders_userid` and `orders_productid` on order_id
    user_products_test = pd.merge(orders_userid, orders_productid, on="order_id")
    user_products_test = user_products_test[["user_id", "products"]]

    # save as .csv
    user_products_test.to_csv(path, index_label=False)
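
The core of `test_data` is the `groupby(...).apply(list)` step followed by the merge on `order_id`. A minimal sketch on hypothetical toy frames (the ids below are made up for illustration) shows the resulting shape:

```python
import pandas as pd

# Hypothetical order lines: one row per (order, product)
order_products = pd.DataFrame({
    "order_id":   [1, 1, 2, 3, 3, 3],
    "product_id": [196, 25133, 22963, 15349, 19057, 16185],
})
# Hypothetical order-to-user lookup
orders_userid = pd.DataFrame({"order_id": [1, 2, 3], "user_id": [10, 20, 30]})

# Collapse the order lines into one list of products per order
baskets = (order_products.groupby("order_id")["product_id"]
           .apply(list)
           .reset_index()
           .rename(columns={"product_id": "products"}))

# Attach the user and keep only the user and the basket
user_products = pd.merge(orders_userid, baskets, on="order_id")[["user_id", "products"]]
```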

In [6]:
%%time
import time

# Generate test data if it doesn't exist.
# Set the flag below to True to force regeneration even when the file is present.
if_test_data_exists = False
test_data_path = "./data/user_products__test.csv"

if if_test_data_exists or not Path(test_data_path).is_file():
    test_data(test_data_path, orders, order_products_train)

user_products_test_df = pd.read_csv(test_data_path)

CPU times: user 181 ms, sys: 41.3 ms, total: 222 ms
Wall time: 222 ms


In [7]:
print(user_products_test_df.shape)
user_products_test_df.head()

(131209, 2)


| | user_id | products |
|---|---|---|
| 0 | 1 | [196, 25133, 38928, 26405, 39657, 10258, 13032... |
| 1 | 2 | [22963, 7963, 16589, 32792, 41787, 22825, 1364... |
| 2 | 5 | [15349, 19057, 16185, 21413, 20843, 20114, 482... |
| 3 | 7 | [12053, 47272, 37999, 13198, 43967, 40852, 176... |
| 4 | 8 | [15937, 5539, 10960, 23165, 22247, 4853, 27104... |
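
Because the baskets were written to CSV and read back, the `products` column holds strings that look like lists rather than Python lists. A minimal sketch of converting them back, assuming `ast.literal_eval` is acceptable for this trusted file (the two rows are a shortened, hypothetical excerpt):

```python
import ast
import io
import pandas as pd

csv_text = 'user_id,products\n1,"[196, 25133, 38928]"\n2,"[22963, 7963]"\n'
df = pd.read_csv(io.StringIO(csv_text))

# Straight after read_csv each basket is a plain string
was_string = isinstance(df.loc[0, "products"], str)

# Parse each string into a real list of product ids
df["products"] = df["products"].apply(ast.literal_eval)
```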


## Utility Matrix

In [8]:
def user_item_prior_df(path, orders, order_products_prior):
    """
    Make prior user-product dataframe and save it as .csv
    """   
    
    # read `orders` and filter eval_set == prior
    orders_user_prior = orders.loc[orders.eval_set == "prior"]
    orders_user_prior = orders_user_prior[["order_id", "user_id"]]
    
    # merge `orders_user_prior` and `order_products_prior` on order_id
    merged = pd.merge(orders_user_prior, order_products_prior[["order_id", "product_id"]], on="order_id")
    user_item_prior = merged[["user_id", "product_id"]]
    # count how often each user bought each product across prior orders; this count is the "quantity"
    user_item_prior = user_item_prior.groupby(["user_id", "product_id"]).size().reset_index().rename(columns={0:"quantity"})
    
    # save as .csv
    user_item_prior.to_csv(path, index_label=False)
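
The `quantity` column is a count of rows per (user, product) pair. A small sketch on hypothetical data shows what the `groupby(...).size()` step produces:

```python
import pandas as pd

# Hypothetical merged order lines: user 1 bought product 196 in two different orders
merged = pd.DataFrame({
    "user_id":    [1, 1, 1, 2, 2],
    "product_id": [196, 196, 10258, 196, 12427],
})

# One row per (user, product) pair with the number of times it occurs
counts = (merged.groupby(["user_id", "product_id"])
          .size()
          .reset_index(name="quantity"))
```

The result is sorted by the grouping keys, so user 1 appears first with a `quantity` of 2 for product 196.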

In [9]:
%%time
# Generate users' prior purchases data if it doesn't exist.
# Note: the flag is set to True here, so the file is regenerated on every run.
if_user_prod_df_exists = True
matrix_df_path = "./data/user_products__prior.csv"

if if_user_prod_df_exists or not Path(matrix_df_path).is_file():
    user_item_prior_df(matrix_df_path, orders, order_products_prior)

user_item_prior = pd.read_csv(matrix_df_path)
user_item_prior["user_id"] = user_item_prior["user_id"].astype("category")
user_item_prior["product_id"] = user_item_prior["product_id"].astype("category")

CPU times: user 1min 18s, sys: 9.8 s, total: 1min 28s
Wall time: 1min 22s


In [10]:
print(user_item_prior.shape)
user_item_prior.head()

(13307953, 3)


| | user_id | product_id | quantity |
|---|---|---|---|
| 0 | 1 | 196 | 10 |
| 1 | 1 | 10258 | 9 |
| 2 | 1 | 10326 | 1 |
| 3 | 1 | 12427 | 10 |
| 4 | 1 | 13032 | 3 |


## User-Item Matrix

In [11]:
import scipy.sparse as sparse
import numpy as np

def build_user_item_matrix(path, user_item_prior):
    """
    Make the sparse item-user matrix of users' order history and save it as .npz
    rows = products
    columns = users
    """
    user_item_matrix = sparse.coo_matrix((user_item_prior["quantity"],
                                          (user_item_prior["product_id"].cat.codes.copy(),
                                           user_item_prior["user_id"].cat.codes.copy())))    
    sparse.save_npz(path, user_item_matrix)
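
Note that the matrix is indexed by pandas category codes, not by the raw ids, so row 0 is the smallest `product_id` rather than product 0. A minimal sketch on hypothetical data shows the construction and how `cat.categories` maps a code back to an id:

```python
import pandas as pd
import scipy.sparse as sparse

df = pd.DataFrame({
    "user_id":    [1, 1, 5, 7],
    "product_id": [196, 10258, 196, 12427],
    "quantity":   [10, 9, 2, 1],
})
df["user_id"] = df["user_id"].astype("category")
df["product_id"] = df["product_id"].astype("category")

# rows = product codes, columns = user codes
matrix = sparse.coo_matrix((df["quantity"].to_numpy(),
                            (df["product_id"].cat.codes.to_numpy(),
                             df["user_id"].cat.codes.to_numpy())))

# Lookup tables from a row or column index back to the original id
product_ids = list(df["product_id"].cat.categories)
user_ids = list(df["user_id"].cat.categories)
```

Any recommendation read off the factorized matrix has to be translated through these lookup tables before it can be compared with the ids in the test data.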

In [12]:
# Build the sparse products x users matrix of quantities bought from the prior dataset
if_user_item_matrix_exists = False
matrix_path = "./data/user_item_matrix.npz"

if if_user_item_matrix_exists or not Path(matrix_path).is_file():
    build_user_item_matrix(matrix_path, user_item_prior)  

user_item_matrix=sparse.load_npz(matrix_path).tocsr().astype(np.float32)

In [13]:
user_item_matrix.shape
user_item_matrix

<49677x206209 sparse matrix of type '<class 'numpy.float32'>'
	with 13307953 stored elements in Compressed Sparse Row format>

In [14]:
sparsity = (1 - (user_item_matrix.size / (user_item_matrix.shape[0] * user_item_matrix.shape[1])))
print(('The sparsity of user_item_matrix is ') +  str(round(sparsity,6)*100) + '%')

The sparsity of user_item_matrix is 99.8701%
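
The reported figure can be checked by hand from the shape and the stored-element count printed above, since sparsity is one minus the share of cells that hold a value:

```python
# Numbers taken from the matrix summary printed above
n_products, n_users = 49677, 206209
stored = 13307953

total_cells = n_products * n_users   # 10,243,844,493 possible (product, user) pairs
density = stored / total_cells       # share of pairs with at least one purchase
sparsity = 1 - density
sparsity_pct = round(sparsity * 100, 4)
```

Only about 0.13% of all product-user pairs have any purchase recorded.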


## Alternating Least Squares - Implicit Matrix Factorization

In [15]:
import pickle
import implicit

def confidence_matrix(user_item_matrix, alpha):
    """
    Given a utility matrix, return it scaled by `alpha`
    as a confidence matrix of double precision.
    For more details, see http://yifanhu.net/PUB/cf.pdf
    """
    return (user_item_matrix * alpha).astype("double")
    
from implicit.als import AlternatingLeastSquares

def build_imf(user_item_matrix, **kwargs):
    """
    Given the utility matrix and model parameters,
    builds the model and writes it to disk at kwargs["path"]
    """
    start = time.time()
    
    # Build model
    print("Building IMF model with alpha: {} ...".format(kwargs["alpha"]))
    model = AlternatingLeastSquares()
    model.approximate_similar_items = False
    
    model.fit(confidence_matrix(user_item_matrix, kwargs["alpha"]))

    # Save model to disk
    with open(kwargs["path"], "wb+") as f:
        pickle.dump(model, f, pickle.HIGHEST_PROTOCOL)
    
    print("Completed in {:.2f}s".format(time.time() - start))
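
The paper linked in the docstring defines confidence as c = 1 + alpha * r. The `confidence_matrix` helper stores only the alpha * r term for observed pairs, which keeps the matrix sparse. A minimal sketch with hypothetical counts shows that scaling touches only the stored entries:

```python
import numpy as np
import scipy.sparse as sparse

# Hypothetical 2 x 3 matrix of purchase counts
counts = sparse.csr_matrix(np.array([[10, 0, 0],
                                     [0, 1, 3]], dtype=np.float32))
alpha = 15

# Same operation as confidence_matrix: scalar multiply, then cast to double
confidence = (counts * alpha).astype("double")
```

The number of stored elements is unchanged, so the cost of the scaling is proportional to the number of observed purchases rather than to the full matrix size.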

In [16]:
# Specify the model parameters and build the model.
# Alpha values in the range [10, 50] with a step size of 5 were tried;
# alpha = 15 gave the best overall recall.
model_params = {"alpha": 15} 
model_params["path"] = "./models/implicit_matrix_factorization/{}.imf".format(model_params["alpha"])
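
The notebook does not show how recall was computed during the alpha search. One common definition, given here only as an assumption, is recall@k: the fraction of a user's actual products that appear among the top k recommendations. The helper name `recall_at_k` and the example lists are hypothetical.

```python
def recall_at_k(recommended, actual, k=10):
    """Share of `actual` items found in the first k entries of `recommended`."""
    actual_set = set(actual)
    if not actual_set:
        return 0.0
    hits = len(set(recommended[:k]) & actual_set)
    return hits / len(actual_set)

# The grid described in the comment above: 10, 15, ..., 50
alphas = list(range(10, 55, 5))

# Hypothetical user: 2 of the 3 purchased products are in the top 4 recommendations
example = recall_at_k([196, 1, 2, 25133], [196, 25133, 38928], k=4)
```

A search would build one model per value in `alphas`, average this score over the test users, and keep the alpha with the highest mean.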

In [17]:
REBUILD_MODEL = False
if REBUILD_MODEL or not Path(model_params["path"]).exists():
    build_imf(user_item_matrix, **model_params)

In [18]:
with open(model_params["path"], "rb") as f:
    imf_model = pickle.load(f)

## ALS with Spark ML library

In [19]:
os.environ['PYSPARK_SUBMIT_ARGS'] = "--conf spark.driver.memory=2g  pyspark-shell"

from pyspark.sql import SparkSession

# Start Spark session with local master and 2 cores
spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("ALS") \
    .getOrCreate()

sc=spark.sparkContext

In [20]:
# read in user_item_prior from "./data/user_products__prior.csv"
data = spark.read.csv(matrix_df_path, inferSchema=True, header=True)
data.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)



In [21]:
data.describe().show()

+-------+------------------+------------------+-----------------+
|summary|           user_id|        product_id|         quantity|
+-------+------------------+------------------+-----------------+
|  count|          13307953|          13307953|         13307953|
|   mean|         6653976.0|102998.69201506798|25513.50658301844|
| stddev|3841675.2677940084| 59436.76555726704|14224.29023480084|
|    min|                 0|                 1|                1|
|    max|          13307952|            206209|            49688|
+-------+------------------+------------------+-----------------+
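
The summary above suggests the columns are shifted by one: `user_id` runs from 0 to 13307952 (the row count minus one), `product_id` tops out at 206209 and `quantity` at 49688. One likely cause, sketched below on a toy frame, is that `to_csv(..., index_label=False)` still writes the row index while leaving it out of the header, so a reader that assigns header names by position labels the index as `user_id`. The toy values are hypothetical.

```python
import io
import pandas as pd

df = pd.DataFrame({"user_id": [1, 1], "product_id": [196, 10258], "quantity": [10, 9]})

# Same call as in the notebook: index written, but no header field for it
buf = io.StringIO()
df.to_csv(buf, index_label=False)
lines = buf.getvalue().splitlines()

# Dropping the index entirely keeps header and rows aligned
buf_no_index = io.StringIO()
df.to_csv(buf_no_index, index=False)
lines_no_index = buf_no_index.getvalue().splitlines()
```

In the first file the header has three names while every row has four fields. If that is what happened here, the model below was fitted on shifted columns, and writing the file with `index=False` would be one way to avoid it.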



In [22]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [23]:
%%time
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.1, implicitPrefs=True, nonnegative=True,
          coldStartStrategy="drop",
          userCol='user_id', itemCol='product_id', ratingCol='quantity')

als.setSeed(23)
model = als.fit(data)

CPU times: user 32.6 ms, sys: 17 ms, total: 49.6 ms
Wall time: 3min 18s


In [24]:
print('Factorized user matrix with rank = %d' % model.rank)
model.userFactors.show(5)

print('-'*40)

print('Factorized item matrix with rank = %d' % model.rank)
model.itemFactors.show(5)

Factorized user matrix with rank = 10
+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[0.0, 0.0, 0.0, 0...|
| 10|[0.0, 0.0, 0.0, 0...|
| 20|[0.0, 0.0, 0.0, 0...|
| 30|[0.0, 0.0, 0.0, 0...|
| 40|[0.0, 0.0, 0.0, 0...|
+---+--------------------+
only showing top 5 rows

----------------------------------------
Factorized item matrix with rank = 10
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.0, 0.0, 0.0, 0...|
| 20|[0.0, 0.0, 0.0, 0...|
| 30|[0.0, 0.0, 0.0, 0...|
| 40|[0.0, 0.0, 0.0, 0...|
| 50|[0.0, 0.0, 0.0, 0...|
+---+--------------------+
only showing top 5 rows
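
Each `features` entry is a vector of length `rank` (10 here), and the predicted preference of a user for an item is the dot product of their two vectors. A minimal NumPy sketch with hypothetical random factors (not the values fitted above) shows how a top-N list follows from that:

```python
import numpy as np

rank = 10
rng = np.random.default_rng(23)

# Hypothetical non-negative factors: one user and five candidate items
user_vec = rng.random(rank)
item_vecs = rng.random((5, rank))

# Predicted preference of the user for each item
scores = item_vecs @ user_vec

# Indices of the three highest-scoring items, best first
top3 = np.argsort(-scores)[:3]
```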

