# RECOMMENDER SYSTEMS with SPARK

## Set up Spark environment

### Installation

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!rm spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark


### Import from Python
1. With Spark and Java installed, we first need to set the relevant environment variables so that Colab can use them. More specifically, set the locations of Java and Spark as follows:

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

Note that these variables must be set from Python rather than with `!export` in Bash: each `!` command runs in its own subshell, so anything exported there is lost as soon as the command finishes, whereas `os.environ` persists for the whole runtime.
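A minimal sketch of the difference, using only the standard library (`DEMO_VAR` is a hypothetical variable name): a value exported inside a child shell disappears with that shell, while a value set through `os.environ` is inherited by every later child process, which is how later `!` commands and Spark's launcher see `JAVA_HOME` and `SPARK_HOME`.

```python
import os
import subprocess

os.environ.pop("DEMO_VAR", None)  # start from a clean state

# Exporting inside a child shell (like a `!export ...` line) only affects that shell.
subprocess.run("export DEMO_VAR=from_shell", shell=True, check=True)
leaked_from_shell = "DEMO_VAR" in os.environ
print(leaked_from_shell)  # False

# Setting the variable in the Python process makes later child processes inherit it.
os.environ["DEMO_VAR"] = "from_python"
seen_by_child = subprocess.run(
    "echo $DEMO_VAR", shell=True, capture_output=True, text=True, check=True
).stdout.strip()
print(seen_by_child)  # from_python
```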



2. We can now import Spark from Python using the `findspark` package installed earlier with `pip`, and then create the Spark Session that we will use from now on:

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local[*]")\
                    .config("spark.driver.memory", "16g")\
                    .getOrCreate()

Note: from [Spark's official documentation](http://spark.apache.org/docs/2.4.7/submitting-applications.html#master-urls):
> local[*]:	Run Spark locally with as many worker threads as logical cores on your machine.
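To make the master-URL semantics concrete, here is a hypothetical helper `local_threads` (not part of Spark's API) that returns the number of worker threads each local master URL implies, following the same documentation table: `local` uses one thread, `local[K]` uses K, `local[K,F]` uses K threads with F max task failures, and `*` means one thread per logical core.

```python
import os
import re

def local_threads(master: str) -> int:
    """Hypothetical helper: worker threads implied by a local master URL."""
    if master == "local":
        return 1  # a single worker thread, no parallelism
    m = re.fullmatch(r"local\[(\*|\d+)(?:\s*,\s*\d+)?\]", master)
    if m is None:
        raise ValueError(f"not a local master URL: {master!r}")
    k = m.group(1)
    # '*' means as many threads as logical cores on this machine
    return (os.cpu_count() or 1) if k == "*" else int(k)

print(local_threads("local"))       # 1
print(local_threads("local[4]"))    # 4
print(local_threads("local[4,2]"))  # 4 (the second number is maxFailures)
print(local_threads("local[*]"))    # same as os.cpu_count()
```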

Check that we successfully created a Spark Session:

In [4]:
spark

## Fulfill the requirements

### 1. Requirement 1

Study a state-of-the-art (SOTA) collaborative filtering approach and present your findings.

Presented in class.

### 2. Requirement 2

Implement the chosen algorithm studied above on Google Colab and Spark environment.

In [5]:
%tensorflow_version 1.x

TensorFlow 1.x selected.


In [6]:
# mount Google Drive, where the dataset and the Spark files are stored
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
import os
os.chdir('/content/drive/My Drive/bigdata-rs')
!ls

'Copy of PySpark Colab - CQ17'	 spark-2.4.7-bin-hadoop2.7.tgz.1
 ml-1m				 summary_ml1m.txt
 spark-2.4.7-bin-hadoop2.7


In [8]:
import numpy as np
from time import time

import tensorflow as tf
import sys

In [9]:
seed = int(time())
np.random.seed(seed)

In [10]:
def loadData(path='./', valfrac=0.1, delimiter='::', seed=1234,
             transpose=False):
    '''
    loads ml-1m data

    :param path: path to the ratings file
    :param valfrac: fraction of data to use for validation
    :param delimiter: delimiter used in data file
    :param seed: random seed for validation splitting
    :param transpose: flag to transpose output matrices (swapping users with movies)
    :return: train ratings (n_u, n_m), valid ratings (n_u, n_m)
    '''
    np.random.seed(seed)

    tic = time()
    print('reading data...')
    data = np.loadtxt(path, skiprows=0, delimiter=delimiter).astype('int32')
    print('data read in', time() - tic, 'seconds')

    n_u = np.unique(data[:, 0]).shape[0]  # number of users
    n_m = np.unique(data[:, 1]).shape[0]  # number of movies
    n_r = data.shape[0]  # number of ratings

    # these dictionaries define a mapping from user/movie id to user/movie number (contiguous from zero)
    udict = {}
    for i, u in enumerate(np.unique(data[:, 0]).tolist()):
        udict[u] = i
    mdict = {}
    for i, m in enumerate(np.unique(data[:, 1]).tolist()):
        mdict[m] = i

    # shuffle indices
    idx = np.arange(n_r)
    np.random.shuffle(idx)

    trainRatings = np.zeros((n_u, n_m), dtype='float32')
    validRatings = np.zeros((n_u, n_m), dtype='float32')

    for i in range(n_r):
        u_id = data[idx[i], 0]
        m_id = data[idx[i], 1]
        r = data[idx[i], 2]

        # the first few ratings of the shuffled data array are validation data
        if i <= valfrac * n_r:
            validRatings[udict[u_id], mdict[m_id]] = int(r)
        # the rest are training data
        else:
            trainRatings[udict[u_id], mdict[m_id]] = int(r)

    if transpose:
        trainRatings = trainRatings.T
        validRatings = validRatings.T

    print('loaded dense data matrix')

    return trainRatings, validRatings
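The per-rating Python loop in `loadData` can be replaced by NumPy fancy indexing. Below is a sketch under that assumption; `split_dense` is a hypothetical name, it takes an already-loaded `(n_r, 3+)` integer array, and it uses its own `RandomState`, so its shuffle differs from `loadData`'s. It also puts exactly `int(valfrac * n_r)` ratings in validation, while the `<=` test in `loadData` puts one more.

```python
import numpy as np

def split_dense(data, valfrac=0.1, seed=1234, transpose=False):
    """Hypothetical vectorized variant of the split done in loadData.

    data: int array with columns (user id, movie id, rating, ...).
    Returns dense (n_u, n_m) train and validation matrices, 0 meaning "missing".
    """
    rng = np.random.RandomState(seed)
    # return_inverse maps raw ids to contiguous indices 0..n-1, like udict/mdict
    users, u_idx = np.unique(data[:, 0], return_inverse=True)
    movies, m_idx = np.unique(data[:, 1], return_inverse=True)
    ratings = data[:, 2].astype('float32')

    n_r = data.shape[0]
    perm = rng.permutation(n_r)
    n_val = int(valfrac * n_r)            # first n_val shuffled ratings -> validation
    val, tr = perm[:n_val], perm[n_val:]

    train = np.zeros((users.size, movies.size), dtype='float32')
    valid = np.zeros_like(train)
    train[u_idx[tr], m_idx[tr]] = ratings[tr]
    valid[u_idx[val], m_idx[val]] = ratings[val]
    if transpose:
        train, valid = train.T, valid.T
    return train, valid

# toy ratings: (user id, movie id, rating)
data = np.array([[10, 100, 5], [10, 200, 3], [20, 100, 4], [30, 300, 1], [20, 300, 2],
                 [30, 200, 4], [10, 300, 2], [20, 200, 5], [30, 100, 3], [40, 100, 1]])
tr_toy, va_toy = split_dense(data, valfrac=0.2, seed=0)
print(tr_toy.shape, int((va_toy > 0).sum()))  # (4, 3) 2
```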


In [11]:
# define network functions
def kernel(u, v):
    """
    Sparsifying kernel function

    :param u: input vectors [n_in, 1, n_dim]
    :param v: output vectors [1, n_hid, n_dim]
    :return: input to output connection matrix
    """
    dist = tf.norm(u - v, ord=2, axis=2)
    hat = tf.maximum(0., 1. - dist**2)
    return hat
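The kernel computes $\hat{w}_{ij} = \max(0, 1 - \|u_i - v_j\|_2^2)$: a connection has full strength when the two embeddings coincide and is exactly zero once they are at distance 1 or more, which is what makes the effective weight matrix sparse. A NumPy re-implementation for illustration, with made-up 2-D embeddings:

```python
import numpy as np

def kernel_np(u, v):
    """NumPy version of `kernel`: u [n_in, 1, n_dim], v [1, n_hid, n_dim] -> [n_in, n_hid]."""
    dist = np.linalg.norm(u - v, ord=2, axis=2)   # broadcasting gives [n_in, n_hid]
    return np.maximum(0.0, 1.0 - dist ** 2)

u = np.array([[[0.0, 0.0]], [[0.5, 0.0]], [[2.0, 0.0]]])  # three input embeddings
v = np.array([[[0.0, 0.0]]])                             # one hidden-unit embedding at the origin
w_hat = kernel_np(u, v)
print(w_hat.ravel())  # entries 1.0, 0.75 and 0.0 (distance 2 is past the cutoff)
```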

In [12]:
def kernel_layer(x, n_hid, n_dim, activation, lambda_s,
                 lambda_2, name):
    """
    a kernel sparsified layer

    :param x: input [batch, channels]
    :param n_hid: number of hidden units
    :param n_dim: number of dimensions to embed for kernelization
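    :param activation: output activation
    :param lambda_s: weight of the L2 penalty on the sparsifying kernel w_hat
    :param lambda_2: weight of the L2 penalty on the weight matrix W
    :param name: layer name for scoping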
    :return: layer output, regularization term
    """

    # define variables
    with tf.variable_scope(name):
        W = tf.get_variable('W', [x.shape[1], n_hid])
        n_in = x.get_shape().as_list()[1]
        u = tf.get_variable('u', initializer=tf.random.truncated_normal([n_in, 1, n_dim], 0., 1e-3))
        v = tf.get_variable('v', initializer=tf.random.truncated_normal([1, n_hid, n_dim], 0., 1e-3))
        b = tf.get_variable('b', [n_hid])

    # compute sparsifying kernel
    # as u and v move further from each other for some given pair of neurons, their connection
    # decreases in strength and eventually goes to zero.
    w_hat = kernel(u, v)

    # compute regularization terms
    sparse_reg = tf.contrib.layers.l2_regularizer(lambda_s)
    sparse_reg_term = tf.contrib.layers.apply_regularization(sparse_reg, [w_hat])

    l2_reg = tf.contrib.layers.l2_regularizer(lambda_2)
    l2_reg_term = tf.contrib.layers.apply_regularization(l2_reg, [W])

    # compute output
    W_eff = W * w_hat
    y = tf.matmul(x, W_eff) + b
    y = activation(y)
    return y, sparse_reg_term + l2_reg_term
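A NumPy sketch of the same layer, useful for checking shapes and the regularizer by hand. It relies on the fact that `tf.contrib.layers.l2_regularizer(scale)` evaluates to `scale * sum(w ** 2) / 2`; the toy sizes are hypothetical, and `u`, `v` use plain Gaussian noise with the same 1e-3 scale as the truncated-normal initialization above. It also shows a consequence of that initialization: all embeddings start close together, so `w_hat` is almost 1 everywhere and the layer begins nearly dense; sparsity only appears as training pushes `u` and `v` apart.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def kernel_layer_np(x, W, b, u, v, lambda_s, lambda_2, activation=sigmoid):
    """NumPy sketch of kernel_layer: forward pass plus its regularization term."""
    dist = np.linalg.norm(u - v, ord=2, axis=2)     # [n_in, n_hid]
    w_hat = np.maximum(0.0, 1.0 - dist ** 2)        # sparsifying kernel
    y = activation(x @ (W * w_hat) + b)             # W_eff = W * w_hat (element-wise)
    # l2_regularizer(scale) applied to w is scale * sum(w ** 2) / 2
    reg = lambda_s * np.sum(w_hat ** 2) / 2 + lambda_2 * np.sum(W ** 2) / 2
    return y, reg, w_hat

rng = np.random.RandomState(0)
n_in, n_hid, n_dim = 4, 3, 5
x = rng.rand(2, n_in)
W = rng.randn(n_in, n_hid)
b = np.zeros(n_hid)
u = 1e-3 * rng.randn(n_in, 1, n_dim)   # same scale as the initialization in kernel_layer
v = 1e-3 * rng.randn(1, n_hid, n_dim)

y, reg, w_hat = kernel_layer_np(x, W, b, u, v, lambda_s=0.013, lambda_2=60.0,
                                activation=lambda z: z)
print(w_hat.min())  # very close to 1: every connection starts at (almost) full strength
```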

In [13]:
def kernelNet(dataset='ml-1m', seed=seed):

  path = './' + dataset+ '/ratings.dat'
  # load data
  tr, vr = loadData(path, delimiter='::',
                    seed=seed, transpose=True, valfrac=0.1)

  tm = np.greater(tr, 1e-12).astype('float32')  # masks indicating non-zero entries
  vm = np.greater(vr, 1e-12).astype('float32')

  n_m = tr.shape[0]  # number of movies
  n_u = tr.shape[1]  # number of users (may be switched depending on 'transpose' in loadData)

  # Set hyper-parameters
  n_hid = 500
  lambda_2 = 60.0
  lambda_s = 0.013
  n_layers = 2
  output_every = 50  # L-BFGS iterations between two evaluations on the validation set
  n_epoch = n_layers * 10 * output_every
  verbose_bfgs = True
  use_gpu = True
  if not use_gpu:
      os.environ['CUDA_VISIBLE_DEVICES'] = ''
      
  # Input placeholders
  R = tf.placeholder("float", [None, n_u])

  # Instantiate network
  y = R
  reg_losses = None
  for i in range(n_layers):
      y, reg_loss = kernel_layer(x=y, n_hid=n_hid, n_dim=5, activation=tf.nn.sigmoid, lambda_s=lambda_s,
                  lambda_2=lambda_2, name=str(i))
      reg_losses = reg_loss if reg_losses is None else reg_losses + reg_loss
  prediction, reg_loss = kernel_layer(x=y, n_hid=n_u, n_dim=5, activation=tf.identity, lambda_s=lambda_s,
                  lambda_2=lambda_2, name='out')
  reg_losses = reg_losses + reg_loss

  # Compute loss (symbolic)
  diff = tm*(R - prediction)
  sqE = tf.nn.l2_loss(diff)
  loss = sqE + reg_losses

  # Instantiate L-BFGS Optimizer
  optimizer = tf.contrib.opt.ScipyOptimizerInterface(loss, options={'maxiter': output_every,
                                                                    'disp': verbose_bfgs,
                                                                    'maxcor': 10},
                                                    method='L-BFGS-B')


  summary_file = 'summary_' + dataset + '.txt'

  # Training and validation loop
  init = tf.global_variables_initializer()
  with tf.Session() as sess:
      sess.run(init)
      for i in range(int(n_epoch / output_every)):
          optimizer.minimize(sess, feed_dict={R: tr}) #do maxiter optimization steps
          pre = sess.run(prediction, feed_dict={R: tr}) #predict ratings

          error = (vm * (np.clip(pre, 1., 5.) - vr) ** 2).sum() / vm.sum() #compute validation error
          error_train = (tm * (np.clip(pre, 1., 5.) - tr) ** 2).sum() / tm.sum() #compute train error

          print('.-^-._' * 12)
          print('epoch:', i, 'validation rmse:', np.sqrt(error), 'train rmse:', np.sqrt(error_train))
          print('.-^-._' * 12)

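      # append the final validation RMSE, train RMSE and seed to the summary file
      # (sys.argv is dropped: inside a notebook it only holds the kernel launcher's arguments)
      with open(summary_file, 'a') as file:
          file.write(str(np.sqrt(error)) + ' ' + str(np.sqrt(error_train))
                     + ' ' + str(seed) + '\n')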
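Reading the objective off the code above: $M$ is the 0/1 mask `tm`, $R$ the training matrix `tr`, $\hat{R}$ the output `prediction`, and $l$ runs over the three kernel layers (two hidden, one output), each with kernel $\hat{W}^{(l)}$ (`w_hat`) and weights $W^{(l)}$ (`W`). `tf.nn.l2_loss` contributes the factor $\tfrac{1}{2}$, and since $M$ is binary, $M_{ij}^2 = M_{ij}$:

$$
\mathcal{L} = \frac{1}{2}\sum_{i,j} M_{ij}\left(R_{ij}-\hat{R}_{ij}\right)^2
+ \frac{\lambda_s}{2}\sum_{l}\left\|\hat{W}^{(l)}\right\|_F^2
+ \frac{\lambda_2}{2}\sum_{l}\left\|W^{(l)}\right\|_F^2
$$

The validation RMSE printed in the loop clips predictions to the rating range and averages only over the validation mask $V$ (`vm`); $\hat{R}$ is always computed from the training matrix, because `prediction` is evaluated with `feed_dict={R: tr}`:

$$
\mathrm{RMSE}_{\mathrm{val}} = \sqrt{\frac{\sum_{i,j} V_{ij}\left(\operatorname{clip}(\hat{R}_{ij},1,5)-R^{\mathrm{val}}_{ij}\right)^2}{\sum_{i,j} V_{ij}}}
$$

The training RMSE has the same form with $M$ and $R$ in place of $V$ and $R^{\mathrm{val}}$.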

### 3. Requirement 3

Run your implementation on the standard benchmark MovieLens 1M movie ratings.

In [14]:
kernelNet(dataset='ml-1m', seed=seed)

reading data...
data read in 10.036483526229858 seconds
loaded dense data matrix
INFO:tensorflow:Optimization terminated with:
  Message: b'STOP: TOTAL NO. of ITERATIONS REACHED LIMIT'
  Objective function value: 441513.218750
  Number of iterations: 50
  Number of functions evaluations: 54
.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._
epoch: 0 validation rmse: 0.88670534 train rmse: 0.87031895
.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._.-^-._
INFO:tensorflow:Optimization terminated with

### 4. Requirement 4

#### 4.1. Implement the baseline algorithm supported by Apache Spark and another SOTA approach that is comparable to your approach

##### 4.1.1. Implement the ALS algorithm by Spark

In [15]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

def dataLoaderALS(path='./ml-1m/ratings.dat', delimiter='::',
                  seed=seed, valfrac=0.1):
  # seed and valfrac mirror loadData's signature but are unused here:
  # the train/test split is done afterwards with randomSplit
  schema = StructType([
        StructField("userId", IntegerType(), True),
        StructField("movieId", IntegerType(), True),
        StructField("rating", DoubleType(), True),
        StructField("timestamp", IntegerType(), True),
        ])
  # parse each delimited line into (userId, movieId, rating, timestamp)
  rdd = spark.sparkContext.textFile(path)\
        .map(lambda x: x.split(delimiter))\
        .map(lambda x: [int(x[0]), int(x[1]), float(x[2]), int(x[3])])
  data = spark.createDataFrame(data=rdd, schema=schema)
  return data


In [16]:
path = os.getcwd() + '/ml-1m/ratings.dat'
data = dataLoaderALS(path, '::', seed, 0.1)
data = data.drop('timestamp')
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [17]:
# split data into train and test sets with 90:10 proportions
train, test = data.randomSplit([0.9, 0.1], seed=seed)
# cache to reduce time taken
train.cache()

DataFrame[userId: int, movieId: int, rating: double]
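Note that `randomSplit` does not cut the data at an exact 90/10 boundary. Conceptually, each row draws a uniform random number and lands in the split whose cumulative-weight interval contains it (weights are normalized if they do not sum to 1), so the test set is only approximately 10% of the ratings. The NumPy sketch below imitates that idea on synthetic row ids; `random_split` is a hypothetical helper, not Spark's implementation, which samples each partition with a seeded sampler.

```python
import numpy as np

def random_split(n_rows, weights, seed):
    """Hypothetical helper: one uniform draw per row, bucketed by cumulative weights."""
    w = np.asarray(weights, dtype=float)
    bounds = np.cumsum(w / w.sum())           # e.g. [0.9, 1.0]
    bounds[-1] = 1.0                          # guard against floating-point round-off
    draws = np.random.RandomState(seed).rand(n_rows)
    split_of_row = np.searchsorted(bounds, draws, side='right')
    return [np.flatnonzero(split_of_row == k) for k in range(len(w))]

train_ids, test_ids = random_split(1_000_000, [0.9, 0.1], seed=42)
print(len(train_ids), len(test_ids))  # close to, but generally not exactly, 900000 and 100000
```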

In [18]:
# import the ALS algorithm we will be using
from pyspark.ml.recommendation import ALS

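# instantiate the model with the "drop" cold-start strategy, which removes test rows
# whose user or movie was not seen during training (their prediction would be NaN)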
model = ALS(coldStartStrategy="drop")

In [19]:
# set the column names for the required data
model.setItemCol("movieId")\
    .setUserCol("userId")\
    .setRatingCol("rating")

ALS_687af8489039

In [20]:
model = model.fit(train)

In [21]:
predictions = model.transform(test)

In [23]:
# import the regression evaluator
from pyspark.ml.evaluation import RegressionEvaluator

# instantiate the evaluator, specifying the desired metric "rmse" and the columns
# that contain the predictions and the actual values
evaluator = RegressionEvaluator(metricName="rmse", predictionCol="prediction", labelCol="rating")

In [24]:
# evaluate the output of our model
rmse = evaluator.evaluate(predictions)
print('The ALS RMSE is ' + str(rmse))

The ALS RMSE is 0.8665453923948111
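For reference, this is what the evaluator computes once the cold-start rows are gone. With `coldStartStrategy="drop"`, rows whose user or movie never appeared in `train` are removed from `predictions`; without it, their NaN predictions would make the RMSE itself NaN. A NumPy sketch on made-up numbers (the arrays are hypothetical, not taken from the run above):

```python
import numpy as np

def rmse_drop_nan(ratings, predictions):
    """RMSE over rows with a defined prediction, mimicking coldStartStrategy="drop"."""
    ratings = np.asarray(ratings, dtype=float)
    predictions = np.asarray(predictions, dtype=float)
    keep = ~np.isnan(predictions)          # cold-start rows carry NaN predictions
    err = predictions[keep] - ratings[keep]
    return float(np.sqrt(np.mean(err ** 2)))

ratings     = [4.0, 3.0, 5.0, 2.0]
predictions = [3.5, 3.0, np.nan, 3.0]      # third row: unseen movie, so NaN
value = rmse_drop_nan(ratings, predictions)
print(value)  # sqrt((0.25 + 0 + 1) / 3), about 0.6455
```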


Remark: Spark's ALS algorithm achieves an RMSE of approximately 0.867.

##### 4.1.2. Implement another SOTA

In [1]:
!git clone https://github.com/gtshs2/Autorec.git

Cloning into 'Autorec'...
remote: Enumerating objects: 45, done.
remote: Total 45 (delta 0), reused 0 (delta 0), pack-reused 45
Unpacking objects: 100% (45/45), done.


In [5]:
%tensorflow_version 1.x

TensorFlow 1.x selected.


In [6]:
%cd Autorec/
!python main.py

Streaming output truncated to the last 5000 lines.
Testing // Epoch 333 //  Total cost = 75849.66  RMSE = 0.85465 Elapsed time : 0 sec
Training // Epoch 334 //  Total cost = 234187.26 Elapsed time : 0 sec
Testing // Epoch 334 //  Total cost = 76249.35  RMSE = 0.85696 Elapsed time : 0 sec
Training // Epoch 335 //  Total cost = 233902.24 Elapsed time : 0 sec
Testing // Epoch 335 //  Total cost = 76509.70  RMSE = 0.85850 Elapsed time : 0 sec
Training // Epoch 336 //  Total cost = 233208.69 Elapsed time : 0 sec
Testing // Epoch 336 //  Total cost = 76379.53  RMSE = 0.85774 Elapsed time : 0 sec
Training // Epoch 337 //  Total cost = 233245.68 Elapsed time : 0 sec
Testing // Epoch 337 //  Total cost = 76076.51  RMSE = 0.85600 Elapsed time : 0 sec
Training // Epoch 338 //  Total cost = 233067.86 Elapsed time : 0 sec
Testing // Epoch 338 //  Total cost = 75922.28  RMSE = 0.85507 Elapsed time : 0 sec
Training // Epoch 339 //  Total cost = 233521.92 Elapsed time : 0 sec
Testing // 

We implemented the I-AutoRec algorithm, which reaches an RMSE of approximately 0.849 at epoch 2000.
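I-AutoRec (Sedhain et al., 2015) reconstructs each item's partially observed vector of user ratings $r$ through one hidden layer, $h(r) = f(W\,g(Vr + \mu) + b)$, with $g$ a sigmoid and $f$ the identity, and trains only on observed entries plus an L2 penalty on $W$ and $V$. The NumPy sketch below computes that forward pass and masked objective for a batch of items; the sizes, initialization and `lam` value are hypothetical, and this is not the code from the gtshs2/Autorec repository.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def i_autorec_loss(R, M, V, mu, W, b, lam):
    """Masked I-AutoRec objective for a batch of item vectors.

    R: [n_items, n_users] ratings (0 where missing), M: same-shape 0/1 mask,
    V: [k, n_users], mu: [k], W: [n_users, k], b: [n_users].
    """
    H = sigmoid(R @ V.T + mu)                  # g(V r + mu) for every item, [n_items, k]
    R_hat = H @ W.T + b                        # identity output layer, [n_items, n_users]
    sq_err = np.sum(M * (R - R_hat) ** 2)      # only observed ratings count
    reg = lam / 2 * (np.sum(W ** 2) + np.sum(V ** 2))
    return R_hat, sq_err + reg

rng = np.random.RandomState(0)
n_items, n_users, k = 6, 8, 3
R = rng.randint(0, 6, size=(n_items, n_users)).astype(float)  # 0 = missing, 1..5 = rating
M = (R > 0).astype(float)
V, mu = 0.1 * rng.randn(k, n_users), np.zeros(k)
W, b = 0.1 * rng.randn(n_users, k), np.zeros(n_users)
R_hat, loss = i_autorec_loss(R, M, V, mu, W, b, lam=1.0)
print(R_hat.shape, loss)
```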

#### 4.2. Compare performances on MovieLens 1M, using RMSE.

The results show that I-AutoRec achieves a lower RMSE than ALS on MovieLens 1M (0.849 vs. 0.867).

### 5. Requirement 5

Discuss the performance of the three above approaches on another dataset, which differs from the benchmark above in terms of data size (should be bigger) and/or sparsity.

We chose the MovieLens 10M dataset: 10 million ratings, 10,680 movies, 71,000 users, density = 0.013.  
Because of RAM limits we could not run the algorithms above on this dataset, so we collected RMSE results reported in published papers to compare the algorithms' performance.
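The RAM limit follows directly from the dense representation used by `loadData` and `kernelNet`. A back-of-the-envelope estimate from the figures above; it ignores the TensorFlow graph, the raw ratings array and Python overhead, so the real peak would be higher:

```python
# Rough memory estimate for the dense approach, using the MovieLens 10M sizes quoted above.
n_movies, n_users, n_ratings = 10_680, 71_000, 10_000_000
bytes_per_float32 = 4

cells = n_movies * n_users                        # entries in one dense movie x user matrix
density = n_ratings / cells                       # fraction of observed entries
one_matrix_gb = cells * bytes_per_float32 / 1e9   # one dense float32 matrix
# tr, vr, tm, vm: four dense float32 matrices exist before the network is even built
total_gb = 4 * one_matrix_gb

print(f"density = {density:.4f}, one matrix = {one_matrix_gb:.2f} GB, four = {total_gb:.2f} GB")
# density = 0.0132, one matrix = 3.03 GB, four = 12.13 GB
```

About 12 GB for the input matrices alone is on the order of the total RAM of a standard Colab runtime, even though only about 1.3% of the entries hold a rating.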

In addition, Spark's ALS recommender is a matrix factorization algorithm optimized with Alternating Least Squares with Weighted-λ-Regularization (ALS-WR), so we use the reported ALS-WR result as the result of the baseline algorithm provided by Spark.
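The ALS-WR idea can be sketched in NumPy. With item factors $Y$ fixed, each user's factor vector solves a ridge regression over the items that user rated, $x_u = (Y_{I_u}^\top Y_{I_u} + \lambda n_u I)^{-1} Y_{I_u}^\top r_u$, where $n_u$ is the user's number of ratings (the "weighted-λ" part); items are then updated symmetrically, and the two steps alternate. This is a toy, single-machine sketch with hypothetical `rank`, `lam` and `n_iter` values and explicit feedback only; Spark's ALS distributes the same kind of solves over blocks (its defaults are `rank=10`, `maxIter=10`, `regParam=0.1`).

```python
import numpy as np

def als_wr(R, rank=2, lam=0.1, n_iter=20, seed=0):
    """Toy ALS-WR on a dense matrix R (0 = missing). Returns user and item factors."""
    rng = np.random.RandomState(seed)
    n_users, n_items = R.shape
    M = R > 0                                   # observed entries
    X = 0.1 * rng.randn(n_users, rank)          # user factors (overwritten by the first step)
    Y = 0.1 * rng.randn(n_items, rank)          # item factors
    I = np.eye(rank)

    def solve(F, ratings, mask):
        # one regularized least-squares problem per row, lambda scaled by its rating count
        out = np.zeros((ratings.shape[0], rank))
        for row in range(ratings.shape[0]):
            idx = mask[row]
            n_row = idx.sum()
            if n_row == 0:
                continue                        # no ratings: leave the factor at zero
            F_r = F[idx]
            A = F_r.T @ F_r + lam * n_row * I
            out[row] = np.linalg.solve(A, F_r.T @ ratings[row, idx])
        return out

    for _ in range(n_iter):
        X = solve(Y, R, M)          # fix items, update users
        Y = solve(X, R.T, M.T)      # fix users, update items
    return X, Y

R = np.array([[5, 3, 0, 1, 4],
              [4, 0, 0, 1, 5],
              [1, 1, 0, 5, 2],
              [0, 1, 5, 4, 0]], dtype=float)
X, Y = als_wr(R)
train_rmse = np.sqrt(np.mean((R - X @ Y.T)[R > 0] ** 2))
print(train_rmse)
```

Each half-step minimizes the regularized objective exactly for one factor matrix, so the objective never increases from one iteration to the next.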

| Method           | RMSE   | Reported in                                                                                             |
| ---------------- | ------ | ------------------------------------------------------------------------------------------------------- |
| ALS-WR           | 0.7830 | Strub, F., Mary, J., and Gaudel, R. Hybrid recommender system based on autoencoders                     |
| SparseFC         | 0.769  | Lorenz K. Muller, Julien N.P. Martel, Giacomo Indiveri. Kernelized Synaptic Weight Matrices             |
| CF-NADE 2 layers | 0.771  | Zheng, Y., Tang, B., Ding, W., and Zhou, H. A neural autoregressive approach to collaborative filtering |

Remark: SparseFC achieves the lowest RMSE of the three (0.769), marginally ahead of CF-NADE (0.771) and clearly ahead of ALS-WR (0.7830), on a dataset that is both larger and sparser than MovieLens 1M.

## References
- Installing PySpark on Colab: seminar notebook [PySpark Dataframe](https://colab.research.google.com/drive/1mlULkE3YAnWnWsXj2woGvFInmiR6mV8C?usp=sharing)
- SparseFC source code: https://github.com/lorenzMuller/kernelNet_MovieLens
- ALS implementation: [Spark Collaborative Filtering documentation](https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html)
- I-AutoRec source code: https://github.com/gtshs2/Autorec
- RMSE results of the algorithms: [Steffen Rendle, Li Zhang, Yehuda Koren. On the Difficulty of Evaluating Baselines](https://arxiv.org/pdf/1905.01395.pdf)