# Collaborative Filtering using Matrix Factorization with Weighted Alternating Least Squares (WALS)

The following code shows the steps required to build a collaborative filtering recommendation engine:

1. Import required data from database
2. Create user-product mapping for all insurance products
3. Define key functions and accuracy measures
4. Build and train model
5. Perform hyperparameter tuning

In [None]:

import os
PROJECT = "astute-veld-253418"  # Masters Project
BUCKET = "masters-research"  # Storage bucket for insurance dataset
REGION = "us-central1"  # Cloud server region

# Do not change these
# Export the settings as environment variables so the shell cells can read them.
os.environ.update({
    "PROJECT": PROJECT,
    "BUCKET": BUCKET,
    "REGION": REGION,
    "TFVERSION": "1.13",
})

In [None]:
%%bash
# Configure environment.
# The %%bash cell magic has to be the very first line of the cell, so the
# descriptive comment lives below it as an ordinary shell comment.
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

In [None]:
import tensorflow as tf
# Show the installed TensorFlow version; later cells use TF 1.x APIs such as tf.contrib
print(tf.__version__)

Fetch the necessary data from the big data warehouse.

In [None]:
from google.cloud import bigquery
# BigQuery client bound to the project configured in PROJECT
bq = bigquery.Client(project = PROJECT)

# Select member id, product type (as Product) and policy time (as Time) from the Masters table
sql = """
SELECT Mem_ID , TypeName as Product, policy_time as Time  FROM `astute-veld-253418.Masters.Masters` 
"""

# Run the query and load the full result into a pandas DataFrame
df = bq.query(sql).to_dataframe()
df.head()

Some statistical exploration of the data 

In [None]:
# Summary statistics of the frame (stored in `stats`; not displayed by this cell)
stats = df.describe()
# Histogram of the raw Time column with a log-scaled count axis
df[["Time"]].plot(kind="hist", logy=True, bins=100, figsize=[8,5])

In [None]:
# Min-max scale the Time column only: shift so the minimum becomes 0 ...
df[["Time"]] = df[["Time"]] - df[["Time"]].min()
# ... then divide by the (shifted) maximum so the largest value becomes 1
df[["Time"]] = df[["Time"]] / df[["Time"]].max()

In [None]:
# Same histogram as for the raw data, drawn again after rescaling the Time column
df[["Time"]].plot(kind="hist", logy=True, bins=100, figsize=[8,5])
#Now the value is scaled

We need to create a local storage folder to import the dataset for analysis.

In [None]:
#Local directory to store insurance data
%%bash
rm -rf data
mkdir data
#Convert data to CSV
df.to_csv(path_or_buf = "data/collab_raw.csv", index = False, header = False)
!head data/collab_raw.csv #Sample rows

Build the mapping function. This is essential to maintain keys which accurately map the users to products when performing recommendations.

In [None]:
#This is the core mapping function
import pandas as pd
import numpy as np
def create_mapping(values, filename):
    """Number the distinct entries of *values* and save the numbering to a file.

    Args:
        values: object exposing ``unique()`` (e.g. a pandas Series).
        filename: path of the text file to (over)write; one ``value,id`` line
            is written per distinct value.

    Returns:
        dict mapping each distinct value to its 0-based position in
        ``values.unique()``.
    """
    with open(filename, 'w') as ofp:
        distinct = values.unique()
        value_to_id = dict(zip(distinct, range(len(distinct))))
        ofp.writelines(
            "{},{}\n".format(value, idx) for value, idx in value_to_id.items()
        )
    return value_to_id
# Read the raw data back from the CSV with explicit column names and dtypes,
# so that member ids and product names are kept as strings.
# The builtin `float` replaces `np.float`, a deprecated alias of the same type
# that newer NumPy releases no longer provide.
df = pd.read_csv(filepath_or_buffer = "data/collab_raw.csv",
                 header = None,
                 names = ["Mem_ID", "Product", "Time"],
                 dtype = {"Mem_ID": str, "Product": str, "Time": float})
# Write the typed frame back to the same file
df.to_csv(path_or_buf = "data/collab_raw.csv", index = False, header = False)

#Perform the mapping
user_mapping = create_mapping(df["Mem_ID"], "data/users.csv")
item_mapping = create_mapping(df["Product"], "data/items.csv")

In [None]:
# Show the first three lines of every CSV file in the data directory
!head -3 data/*.csv

In [None]:
#Store all mapped data
# Look up each member id / product name in the dictionaries returned by create_mapping
df["userId"] = df["Mem_ID"].map(user_mapping.get)
df["itemId"] = df["Product"].map(item_mapping.get)

In [None]:
# Keep only the enumerated ids and the Time value
mapped_df = df[["userId", "itemId", "Time"]]
# Persist without index or header row
mapped_df.to_csv(path_or_buf = "data/collab_mapped.csv", index = False, header = False)
mapped_df.head()

In [None]:
import pandas as pd
import numpy as np
# Reload the mapped data from disk; the file has no header row, so column names are supplied here
mapped_df = pd.read_csv(filepath_or_buffer = "data/collab_mapped.csv", header = None, names = ["userId", "itemId", "Time"])
mapped_df.head()

In [None]:
# Matrix dimensions: the ids are enumerated from 0, so each count is the largest id plus one
NITEMS = mapped_df["itemId"].max() + 1
NUSERS = mapped_df["userId"].max() + 1
# Round the Time values to two decimals
mapped_df["Time"] = mapped_df["Time"].values.round(2)
summary = "{} items, {} users, {} interactions"
print(summary.format( NITEMS, NUSERS, len(mapped_df) ))

In [None]:
# Preview the first six item groups: item id, the users holding it and their Time values.
# The counter comes from enumerate() rather than a variable named `iter`, which
# would shadow the builtin iter() for the rest of the notebook session.
grouped_by_items = mapped_df.groupby("itemId")
for n_shown, (item, grouped) in enumerate(grouped_by_items, start = 1):
    print(item, grouped["userId"].values, grouped["Time"].values)
    if n_shown > 5:
        break

In [None]:
# Write one tf.train.Example per item to the TFRecord file "data/users_for_item":
# the item id as "key", the ids of its users as "indices", their Time values as "values".
import tensorflow as tf

grouped_by_items = mapped_df.groupby("itemId")
with tf.python_io.TFRecordWriter("data/users_for_item") as ofp:
    for item, grouped in grouped_by_items:
        key_feature = tf.train.Feature(
            int64_list = tf.train.Int64List(value = [item]))
        index_feature = tf.train.Feature(
            int64_list = tf.train.Int64List(value = grouped["userId"].values))
        value_feature = tf.train.Feature(
            float_list = tf.train.FloatList(value = grouped["Time"].values))
        feature_map = {
            "key": key_feature,
            "indices": index_feature,
            "values": value_feature,
        }
        example = tf.train.Example(features = tf.train.Features(feature = feature_map))
        ofp.write(example.SerializeToString())

In [None]:
# Write one tf.train.Example per user to the TFRecord file "data/items_for_user":
# the user id as "key", the ids of their items as "indices", the Time values as "values".
grouped_by_users = mapped_df.groupby("userId")
with tf.python_io.TFRecordWriter("data/items_for_user") as ofp:
    for user, grouped in grouped_by_users:
        key_feature = tf.train.Feature(
            int64_list = tf.train.Int64List(value = [user]))
        index_feature = tf.train.Feature(
            int64_list = tf.train.Int64List(value = grouped["itemId"].values))
        value_feature = tf.train.Feature(
            float_list = tf.train.FloatList(value = grouped["Time"].values))
        feature_map = {
            "key": key_feature,
            "indices": index_feature,
            "values": value_feature,
        }
        example = tf.train.Example(features = tf.train.Features(feature = feature_map))
        ofp.write(example.SerializeToString())

To review, we created the following data files from the raw csv:

1. We created a mapped version, in which member IDs and product names are replaced by the enumerated IDs used by the WALS model
2. We built TFRecords which efficiently store data for users and products

## Perform the Model training using the WALS estimator package

We use a Tensorflow estimator library for WALS. This requires that the data is formatted and preprocessed appropriately as conducted above.

We write an input function to provide the data to the model, and then create the model to do training and evaluation.

In [None]:
# Long listing of the data directory, sorted by modification time with the oldest file first
!ls -lrt data

In [None]:
import os
import tensorflow as tf
from tensorflow.python.lib.io import file_io
from tensorflow.contrib.factorization import WALSMatrixFactorization
  
def read_dataset(mode, args):
    """Build an input function that feeds the TFRecord files to WALSMatrixFactorization.

    Args:
        mode: a tf.estimator.ModeKeys value. TRAIN repeats the data indefinitely;
            any other mode reads it exactly once.
        args: dict providing "input_path", "batch_size", "nitems" and "nusers".

    Returns:
        A zero-argument function returning ``(features, None)``, where
        ``features`` is keyed by WALSMatrixFactorization.INPUT_ROWS,
        INPUT_COLS and PROJECT_ROW.
    """
    def decode_example(protos, vocab_size):
        # Parse one serialized tf.train.Example ("key", "indices", "values")
        # into a sparse tensor, with the key appended as an extra final entry.
        features = {
            "key": tf.FixedLenFeature(shape = [1], dtype = tf.int64),
            "indices": tf.VarLenFeature(dtype = tf.int64),
            "values": tf.VarLenFeature(dtype = tf.float32)}
        parsed_features = tf.parse_single_example(serialized = protos, features = features)
        values = tf.sparse_merge(sp_ids = parsed_features["indices"], sp_values = parsed_features["values"], vocab_size = vocab_size)
        # Important to store key for remapping once batching is done
        # This is to generate the correct row numbers for each batch once completed
        key = parsed_features["key"]
        # The key is appended after the real indices, paired with a placeholder value of 0.0;
        # remap_keys relies on it being the last entry of each example.
        decoded_sparse_tensor = tf.SparseTensor(indices = tf.concat(values = [values.indices, [key]], axis = 0), 
                                                values = tf.concat(values = [values.values, [0.0]], axis = 0), 
                                                dense_shape = values.dense_shape)
        return decoded_sparse_tensor
  
  
    def remap_keys(sparse_tensor):
        # Sparse tensor remapping: runs on a batched tensor, removes the key entries
        # appended by decode_example and uses them as the first index column.
        # Indices are read as two columns here: [:, 0] and [:, 1].
        bad_indices = sparse_tensor.indices # Define shape appropriately
        # Values that we need to fix
        bad_values = sparse_tensor.values # Shape

        # Last value for batch index is user
        # We need to get the user rows
        # define 1 for user, otherwise 0
        # Computed as the difference between consecutive column-0 entries, with a trailing 1 appended.
        # NOTE(review): assumes entries are ordered by column 0 and that it advances by exactly 1
        # between consecutive examples of a batch - TODO confirm.
        user_mask = tf.concat(values = [bad_indices[1:,0] - bad_indices[:-1,0], tf.constant(value = [1], dtype = tf.int64)], axis = 0) 

        # Mask the user rows
        # mask == 0 selects the real entries; mask == 1 selects the appended key entries
        good_values = tf.boolean_mask(tensor = bad_values, mask = tf.equal(x = user_mask, y = 0)) 
        item_indices = tf.boolean_mask(tensor = bad_indices, mask = tf.equal(x = user_mask, y = 0)) 
        user_indices = tf.boolean_mask(tensor = bad_indices, mask = tf.equal(x = user_mask, y = 1))[:, 1] 

        # For every real entry, look up the key of the example it came from
        good_user_indices = tf.gather(params = user_indices, indices = item_indices[:,0]) 

        # Customers and Products indices are rank 1
        # Expand both to column vectors so they can be concatenated into (key, id) index pairs
        good_user_indices_expanded = tf.expand_dims(input = good_user_indices, axis = -1) 
        good_item_indices_expanded = tf.expand_dims(input = item_indices[:, 1], axis = -1) 
        good_indices = tf.concat(values = [good_user_indices_expanded, good_item_indices_expanded], axis = 1)

        # The dense shape of the batched input is reused unchanged
        remapped_sparse_tensor = tf.SparseTensor(indices = good_indices, values = good_values, dense_shape = sparse_tensor.dense_shape)
        return remapped_sparse_tensor

    
    def parse_tfrecords(filename, vocab_size):
        # Build the decode -> repeat -> batch -> remap pipeline for one TFRecord
        # file pattern and return the tensor produced by its one-shot iterator.
        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None # carry on indefinitely
        else:
            num_epochs = 1 # end input from this point

        # The file name is resolved relative to args["input_path"]
        files = tf.gfile.Glob(filename = os.path.join(args["input_path"], filename))

        # Create dataset from file list
        dataset = tf.data.TFRecordDataset(files)
        dataset = dataset.map(map_func = lambda x: decode_example(x, vocab_size))
        dataset = dataset.repeat(count = num_epochs)
        dataset = dataset.batch(batch_size = args["batch_size"])
        # Keys can only be remapped once the examples have been batched
        dataset = dataset.map(map_func = lambda x: remap_keys(x))
        return dataset.make_one_shot_iterator().get_next()
  
    def _input_fn():
        # Rows are read from the per-user file, columns from the per-item file
        features = {
            WALSMatrixFactorization.INPUT_ROWS: parse_tfrecords("items_for_user", args["nitems"]),
            WALSMatrixFactorization.INPUT_COLS: parse_tfrecords("users_for_item", args["nusers"]),
            WALSMatrixFactorization.PROJECT_ROW: tf.constant(True)
        }
        # No labels are supplied
        return features, None

    return _input_fn

The following code tries out the input function by printing two small batches of the row input.

In [None]:
def try_out():
    """Smoke-test the input function by printing two batches of the row input."""
    eval_args = {"input_path": "data", "batch_size": 4, "nitems": NITEMS, "nusers": NUSERS}
    with tf.Session():
        input_fn = read_dataset(mode = tf.estimator.ModeKeys.EVAL, args = eval_args)
        features, _ = input_fn()
        rows = features["input_rows"]

        # Evaluate the same tensor twice, printing the result each time
        for _ in range(2):
            print(rows.eval())

try_out()

## Define Accuracy Measures and Prediction Functions

In [None]:
def find_top_k(user, item_factors, k):
    """Return the indices of the k highest-scoring items for one user.

    Args:
        user: factor vector of a single user.
        item_factors: matrix of item factors, one row per item.
        k: number of items to return.

    Returns:
        int64 tensor of item indices with a leading dimension of 1.
    """
    user_row = tf.expand_dims(input = user, axis = 0)
    # Score of every item = user factors times the transposed item factors
    scores = tf.matmul(a = user_row, b = tf.transpose(a = item_factors))
    best = tf.nn.top_k(input = scores, k = k)
    return tf.cast(x = best.indices, dtype = tf.int64)
    
def batch_predict(args):
    """Write the top-k item ids for every user to ``batch_pred.txt``.

    The factors are read from the model stored in ``args["output_dir"]`` and
    the output file is written to that same directory, one comma-separated
    line per user.

    Args:
        args: dict with "nusers", "nitems", "n_embeds", "output_dir" and "topk".
    """
    with tf.Session():
        estimator = tf.contrib.factorization.WALSMatrixFactorization(
            num_rows = args["nusers"], 
            num_cols = args["nitems"],
            embedding_dimension = args["n_embeds"],
            model_dir = args["output_dir"])
        
        # need to get the row factors for in vocabulary data
        user_factors = tf.convert_to_tensor(value = estimator.get_row_factors()[0]) 
        # The catalogue does not change and data is read in
        item_factors = tf.convert_to_tensor(value = estimator.get_col_factors()[0])

        # Find the top-k items for every user.
        # find_top_k returns a [1, k] tensor per user, so map_fn stacks them to
        # [nusers, 1, k]. Only that middle axis is squeezed: an unrestricted
        # squeeze would also drop the k axis when topk == 1 (or the user axis
        # when there is a single user) and break the per-user loop below.
        per_user_topk = tf.map_fn(
            fn = lambda user: find_top_k(user, item_factors, args["topk"]),
            elems = user_factors,
            dtype = tf.int64)
        topk = tf.squeeze(input = per_user_topk, axis = [1])
        with file_io.FileIO(os.path.join(args["output_dir"], "batch_pred.txt"), mode = 'w') as f:
            for best_items_for_user in topk.eval():
                f.write(",".join(str(x) for x in best_items_for_user) + '\n')
#Training function
def train_and_evaluate(args):
    """Train the WALS model with periodic evaluation, then write batch predictions.

    Args:
        args: dict with "num_epochs", "nusers", "nitems", "batch_size",
            "n_embeds" and "output_dir", plus whatever read_dataset and
            batch_predict read from it.
    """
    # Both step counts are rounded to the nearest integer
    total_steps = int(0.5 + (1.0 * args["num_epochs"] * args["nusers"]) / args["batch_size"])
    eval_every = int(0.5 + args["nusers"] / args["batch_size"])
    print("Will train for {} steps, evaluating once every {} steps".format(total_steps, eval_every))

    def build_experiment(output_dir):
        # The model directory is taken from args rather than from the output_dir argument
        model = tf.contrib.factorization.WALSMatrixFactorization(
            num_rows = args["nusers"],
            num_cols = args["nitems"],
            embedding_dimension = args["n_embeds"],
            model_dir = args["output_dir"])
        train_fn = read_dataset(tf.estimator.ModeKeys.TRAIN, args)
        eval_fn = read_dataset(tf.estimator.ModeKeys.EVAL, args)
        return tf.contrib.learn.Experiment(
            model,
            train_input_fn = train_fn,
            eval_input_fn = eval_fn,
            train_steps = total_steps,
            eval_steps = 1,
            min_eval_frequency = eval_every
        )

    from tensorflow.contrib.learn.python.learn import learn_runner
    learn_runner.run(experiment_fn = build_experiment, output_dir = args["output_dir"])

    # Once training has finished, write the top-k predictions for every user
    batch_predict(args)

Specify the output directory

In [None]:
#Define output location for model
import shutil
# Start from a clean output directory; a missing directory is not an error
shutil.rmtree(path = "wals_trained", ignore_errors=True)
# Short local run: a fraction of an epoch, 10-dimensional embeddings, top-3 predictions per user
train_and_evaluate({
    "output_dir": "wals_trained",
    "input_path": "data/",
    "num_epochs": 0.05,
    "nitems": NITEMS,
    "nusers": NUSERS,

    "batch_size": 512,
    "n_embeds": 10,
    "topk": 3
  })

We check the output directory

In [None]:
# List the contents of the model output directory
!ls wals_trained

In [None]:
# Show the first lines of the batch prediction file written by batch_predict
!head wals_trained/batch_pred.txt

In [None]:
# Export the matrix dimensions as strings so the shell cells can pass them on the command line
os.environ.update(NITEMS = str(NITEMS), NUSERS = str(NUSERS))

## Package up the model as a python module and perform hyperparameter tuning

The following code packages up the model described above so it can be trained and tuned remotely. The above code is packaged into python modules found in 'WALS_packaged'. Results are stored in the output directory 'wals_trained'.

Similarly, hyperparameter tuning relies on the packaged modules found in 'wals_htune'.



In [None]:
%%bash
# Run the packaged module locally.
# The %%bash cell magic has to be the very first line of the cell, so this
# description sits below it as a shell comment.
# NOTE(review): the module name (wals_packaged) and the package path
# (WALS_packaged) differ in case - confirm the actual directory name.
rm -rf wals.tar.gz wals_trained
gcloud ai-platform local train \
    --module-name=wals_packaged.task \
    --package-path=${PWD}/WALS_packaged \
    -- \
    --output_dir=${PWD}/wals_trained \
    --input_path=${PWD}/data \
    --num_epochs=0.01 --nitems=${NITEMS} --nusers=${NUSERS} \
    --job-dir=./tmp

In [None]:
%%bash
# Clear previous runs (the %%bash magic must be the first line of the cell)
rm -rf wals.tar.gz wals_trained

In [None]:
%%bash
# Copy every file in the local data directory to the bucket, using parallel transfers (-m)
gsutil -m cp data/* gs://${BUCKET}/wals/data

In [None]:
%%bash
# We can also submit the job to a remote server for training, using the same core files.
# The %%bash magic must be the first line of the cell. `gcloud ai-platform`
# is used here to match the local training cell.
# NOTE(review): this cell uses the package `walsmodel`, while the local run
# uses `wals_packaged` - confirm which package directory is intended.
OUTDIR=gs://${BUCKET}/wals/model_trained
# Job names carry a UTC timestamp
JOBNAME=wals_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
# Remove any output left by a previous job
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
    --region=$REGION \
    --module-name=walsmodel.task \
    --package-path=${PWD}/walsmodel \
    --job-dir=$OUTDIR \
    --staging-bucket=gs://$BUCKET \
    --scale-tier=BASIC_GPU \
    --runtime-version=$TFVERSION \
    -- \
    --output_dir=$OUTDIR \
    --input_path=gs://${BUCKET}/wals/data \
    --num_epochs=10 --nitems=${NITEMS} --nusers=${NUSERS}

In [None]:
#We can get the latent factors for rows and columns
def get_factors(args):
    """Load the row and column factors of a trained WALS model.

    Args:
        args: dict with "nusers", "nitems", "n_embeds" and "output_dir"
            (the directory holding the trained model).

    Returns:
        Tuple ``(row_factors, col_factors)``: the first element of the
        estimator's row-factor list and of its column-factor list.
    """
    model_kwargs = {
        "num_rows": args["nusers"],
        "num_cols": args["nitems"],
        "embedding_dimension": args["n_embeds"],
        "model_dir": args["output_dir"],
    }
    with tf.Session():
        model = tf.contrib.factorization.WALSMatrixFactorization(**model_kwargs)
        return model.get_row_factors()[0], model.get_col_factors()[0]

In [None]:
    "output_dir": "gs://{}/wals/model_trained".format(BUCKET),
    "nitems": NITEMS,
    "nusers": NUSERS,
    "n_embeds": 10
  }

user_embeddings, item_embeddings = get_factors(args)
print(user_embeddings[:3])
print(item_embeddings[:3])

In [None]:
#Visualize this latent factors and embeddings information using PCA 
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.decomposition import PCA

# Reduce the user embeddings to three principal components
pca = PCA(n_components = 3)
pca.fit(user_embeddings)
user_embeddings_pca = pca.transform(user_embeddings)

# 3-D scatter plot of every 150th user in the reduced space
fig = plt.figure(figsize = (8,8))
ax = fig.add_subplot(111, projection = "3d")
xs, ys, zs = user_embeddings_pca[::150].T
ax.scatter(xs, ys, zs)

### Similarly, we perform hyperparameter tuning using packaged files

In [None]:
%%bash
# The %%bash magic must be the first line of the cell. Inside a bash cell the
# commands are written without the IPython "!" prefix, which bash would not
# recognise.
# Assign the correct directory
cd wals_htune
# Bucket target for storage
BUCKET=gs://masters-research
# Launch the hyperparameter tuning script
gsutil cp -r data/u.data $BUCKET/data/collab_raw.csv
./mltrain.sh local ../data collab_raw.csv --headers --delimiter ,