# ALS (Alternating Least Squares)

* used to solve non-convex optimization problems by alternating between subproblems that are each convex
* a matrix factorization algorithm built for large-scale collaborative filtering problems
* can be run in parallel
* addresses the scalability and sparsity of ratings data

Objective function:
$$
\min_{p^*,q^*} \sum_{(u, i)}\left(r_{u i}-\mathbf{q}_i^T \mathbf{p}_u\right)^2+\lambda\left(\left\|\mathbf{q}_i\right\|^2+\left\|\mathbf{p}_u\right\|^2\right)
$$
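
The alternation behind this objective can be sketched in NumPy. This is a minimal illustration, not Spark's implementation: with $\mathbf{q}_i$ held fixed, each $\mathbf{p}_u$ has a closed-form ridge-regression solution, and vice versa. The helper names (`solve_factors`, `objective`, `als`), the toy matrix, and the choice to penalize each factor vector once are assumptions made for this example.

```python
import numpy as np

def solve_factors(R, mask, fixed, lam):
    """Solve one ridge regression per row of R, holding the other factor matrix fixed."""
    k = fixed.shape[1]
    out = np.zeros((R.shape[0], k))
    for row in range(R.shape[0]):
        seen = mask[row]                      # observed ratings in this row
        F = fixed[seen]                       # factors of the rated counterparts
        A = F.T @ F + lam * np.eye(k)         # regularized normal equations
        b = F.T @ R[row, seen]
        out[row] = np.linalg.solve(A, b)
    return out

def objective(R, mask, P, Q, lam):
    """Squared error on observed ratings plus an L2 penalty on every factor vector."""
    err = ((R - P @ Q.T)[mask] ** 2).sum()
    return err + lam * ((P ** 2).sum() + (Q ** 2).sum())

def als(R, mask, rank=2, lam=0.1, n_iter=10, seed=0):
    rng = np.random.default_rng(seed)
    P = rng.normal(scale=0.1, size=(R.shape[0], rank))  # user factors
    Q = rng.normal(scale=0.1, size=(R.shape[1], rank))  # item factors
    history = []
    for _ in range(n_iter):
        P = solve_factors(R, mask, Q, lam)       # fix Q, solve for all users
        Q = solve_factors(R.T, mask.T, P, lam)   # fix P, solve for all items
        history.append(objective(R, mask, P, Q, lam))
    return P, Q, history

# Toy ratings matrix (rows: users, columns: items); 0 marks a missing rating.
R = np.array([[5, 3, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0
P, Q, history = als(R, mask)
```

Because each half-step solves its subproblem exactly, the values in `history` should never increase from one iteration to the next. The per-row solves are independent of each other, which is what makes the method easy to parallelize.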

> [1] [Kevin Liao, 2018 11, Prototyping a Recommender System Step by Step Part 2: Alternating Least Square (ALS) Matrix Factorization in Collaborative Filtering](https://towardsdatascience.com/prototyping-a-recommender-system-step-by-step-part-2-alternating-least-square-als-matrix-4a76c58714a1)
>
>The most important hyper-parameters of ALS in Spark are listed below; use grid-search cross-validation with RMSE to choose them:
>* maxIter: the maximum number of iterations to run (defaults to 10)
>* rank: the number of latent factors in the model (defaults to 10)
>* regParam: the regularization parameter in ALS, i.e. lambda (the cited article states a default of 1.0; the `pyspark.ml` `ALS` constructor defaults to 0.1)
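
For reference, RMSE is the square root of the mean squared difference between actual and predicted ratings; lower is better. A small plain-Python sketch follows, where the function name `rmse` and the sample ratings are made up for illustration:

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences of ratings."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical ratings, only to show the call.
actual = [4.0, 3.0, 5.0]
predicted = [3.5, 3.0, 4.0]
score = rmse(actual, predicted)
```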

# Data Processing

In [1]:
# fill_in_movie_index
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
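# read_data: movie header rows look like "1:" and have no rating
data = pd.read_csv("combined_data_1.txt", sep=",", header=None, names=['customer_id', 'rating'], usecols=[0, 1])
df_movie = data[pd.isnull(data["rating"])]
# strip the trailing ":" to get the movie id, then forward-fill it onto the rating rows below
data["movie_id"] = df_movie["customer_id"].apply(lambda x: x[:-1])
data["movie_id"] = data["movie_id"].ffill()
# keep only rating rows and cast the ids to integers, since Spark ALS needs numeric id columns
data = data[data["rating"].notna()].astype({"customer_id": int, "movie_id": int})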
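
To see what the forward-fill step does, here is the same idea on a tiny in-memory sample. The rows are made up and only mimic the layout assumed above (a `movie_id:` header line followed by `customer_id,rating,date` lines).

```python
import io
import pandas as pd

# Made-up sample in the assumed layout: "movie_id:" header, then rating rows.
sample = io.StringIO(
    "1:\n"
    "1488844,3,2005-09-06\n"
    "822109,5,2005-05-13\n"
    "2:\n"
    "2059652,4,2005-09-05\n"
)
data = pd.read_csv(sample, sep=",", header=None,
                   names=["customer_id", "rating"], usecols=[0, 1])

is_header = data["rating"].isna()                      # header rows have no rating
data["movie_id"] = data.loc[is_header, "customer_id"].str.rstrip(":")
data["movie_id"] = data["movie_id"].ffill()            # copy each movie id downwards
data = data[~is_header].astype({"customer_id": int, "movie_id": int})
```

After this, every remaining row is a rating that carries the id of the movie header above it.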

# ALS using spark

In [2]:
#[2] [jamenlong, 2017 11, Recommendation Engines Using ALS in PySpark (MovieLens Dataset)](https://www.youtube.com/watch?v=FgGjc5oabrA&ab_channel=jamenlong1)
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

def tune_ALS(train_data, validation_data, maxIter, regParams, ranks):
    """
    grid search function to select the best model based on RMSE of
    validation data
    Parameters
    ----------
    train_data: spark DF with columns ['userId', 'movieId', 'rating']
    
    validation_data: spark DF with columns ['userId', 'movieId', 'rating']
    
    maxIter: int, max number of learning iterations
    
    regParams: list of float, one dimension of hyper-param tuning grid
    
    ranks: list of int, one dimension of hyper-param tuning grid
    
    Return
    ------
    The best fitted ALS model with lowest RMSE score on validation data
    """
    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in regParams:
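            # get ALS model; coldStartStrategy="drop" removes NaN predictions for
            # users/movies unseen in training, which would otherwise make the RMSE NaN
            als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
                      nonnegative=True, coldStartStrategy="drop",
                      maxIter=maxIter, rank=rank, regParam=reg)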
            # train ALS model
            model = als.fit(train_data)
            # evaluate the model by computing the RMSE on the validation data
            predictions = model.transform(validation_data)
            evaluator = RegressionEvaluator(metricName="rmse",
                                            labelCol="rating",
                                            predictionCol="prediction")
            rmse = evaluator.evaluate(predictions)
            print('{} latent factors and regularization = {}: '
                  'validation RMSE is {}'.format(rank, reg, rmse))
            if rmse < min_error:
                min_error = rmse
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and '
          'regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [None]:
# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Movie Rate").getOrCreate()
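# rename to the column names tune_ALS expects before building the Spark DataFrame
ratings = spark.createDataFrame(data.rename(columns={"customer_id": "userId", "movie_id": "movieId"}))
(training, test) = ratings.randomSplit([0.8, 0.2])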

Questions:
1. Predicted ratings are not always between 1 and 5; they are sometimes higher than 5 or lower than 1.
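
One possible way to handle this, offered as an assumption rather than something the cited sources prescribe: ALS fits an unconstrained least-squares model, so nothing keeps a prediction inside the rating scale, and `nonnegative=True` only constrains the factors. A common post-processing step is to clip predictions to the valid range. A NumPy sketch with hypothetical values:

```python
import numpy as np

# Hypothetical raw predictions, some outside the 1-5 rating scale.
raw_predictions = np.array([0.4, 2.7, 5.6, 4.9])

# Clamp every value into [1, 5]; in-range values are left unchanged.
clipped = np.clip(raw_predictions, 1.0, 5.0)
```

In Spark, the same clamp could be expressed on the `prediction` column with `pyspark.sql.functions.greatest` and `least`.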