# CloudXLab Movie Recomedation

In [1]:
# Import the liberary to setup the pyspark envrionment in python Jupyter notebook
import os
import sys

os.environ["SPARK_HOME"] = "/usr/spark2.4.3"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
from pyspark.sql import SparkSession

# Create the spark session
spark = SparkSession.builder.master("local[*]").appName("cxl_movie_recommender").getOrCreate()
spark

### Read the data file 
Here read the data file using `spark.read.text` not using the `spark.read.csv` as the file have two delaminators and till Spark3 csv except only one, otherwise it gives error.</p>

If using spark 3 then it is good to use the csv as we don't need to write couple step which is actually create the data frame from a single value dataframe to the multicolumn(as per data file). Below are the steps
- read file as a text which return the Dataframe
- now split the data with the delaminator `::` using map and here we have to convert the dataframe to rdd.
- Convert RDD to dataframe with the supplied header(as no header in data file) as well.
- Now need to convert the data type as well as currently all are `String` type

In [3]:
## Read the file from the HDFS folder.
# This method return the dataframe with the value as the column header name
data_file = spark.read.text("/data/ml-1m/ratings.dat")
data_file.show(5)

+--------------------+
|               value|
+--------------------+
|1::1193::5::97830...|
|1::661::3::978302109|
|1::914::3::978301968|
|1::3408::4::97830...|
|1::2355::5::97882...|
+--------------------+
only showing top 5 rows



###### Canvert single column data frame to multi column

In [4]:
#using the map create the column by using the split

data_rdd = data_file.rdd.map(lambda data: data[0].split('::'))

header_col = ['userId', 'movieId', 'rating', 'timestamp']

rating_df = data_rdd.toDF(header_col)



###### The above code can written as this way as well
```python
rating_df = data_file.rdd.map(lambda data: data[0].split('::')).toDF(header_col)
```

In [5]:
# Display the data for checking, but in actual code it is not required as it execute as we are taking action
rating_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1193|     5|978300760|
|     1|    661|     3|978302109|
|     1|    914|     3|978301968|
|     1|   3408|     4|978300275|
|     1|   2355|     5|978824291|
+------+-------+------+---------+
only showing top 5 rows



In [6]:
# Checking the schema
rating_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



###### Change the data type

In [7]:

# here chnage the datatype from string to int for all except timestamp which is converted into long

#Import the liberary for datatype
from pyspark.sql.types import IntegerType, LongType

# Here read each column except timestamp for interger conversion.
for col in header_col[:-1] :
    rating_df = rating_df.withColumn(col, rating_df[col].cast(IntegerType()))

rating_df = rating_df.withColumn('timestamp', rating_df['timestamp'].cast(LongType()))

# Again print the schema again to check
rating_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: long (nullable = true)



### Build the Model
###### Train Test Split

In [8]:
# Split into the train and test of 70 30 ratio
train_df, test_df = rating_df.randomSplit([0.7, 0.3], seed=123)

###### Build the model using ALS

In [9]:
# Import the ALS to build the model. Here using the same parameter used by the training.
from pyspark.ml.recommendation import ALS
als = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='movieId', ratingCol='rating')

model = als.fit(train_df)

###### Model apply on test and preditct the rating 

In [10]:
# Predict the rating in the test dataset.
predictions = model.transform(test_df)
# print the first five row of the prediction
predictions.take(5)

[Row(userId=4169, movieId=148, rating=3, timestamp=976588402, prediction=3.5438382625579834),
 Row(userId=3184, movieId=148, rating=4, timestamp=968708953, prediction=4.268411636352539),
 Row(userId=1069, movieId=148, rating=2, timestamp=974945135, prediction=3.6641809940338135),
 Row(userId=1150, movieId=148, rating=2, timestamp=974875106, prediction=2.8830413818359375),
 Row(userId=3829, movieId=148, rating=2, timestamp=965940170, prediction=3.246178388595581)]

###### Evaluate the model

In [11]:
import math
def print_rmse(pred_df):
    result = pred_df.rdd.map(lambda data: math.pow(data[2]-data[4], 2)).filter(lambda x: not math.isnan(x))
    result = result.reduce(lambda x,y: x+y)
    rmse = math.sqrt(result/pred_df.count())

    print('rmse of the model is {}%'.format(round(rmse, 2)*100))

In [12]:
print_rmse(predictions)

rmse of the model is 91.0%


## Cross Validation and Hyper parameter tuning

By using this cross validation and hyper parameter can find the best model

- Here set the different parameter in range so that cross validation use every combination and evalute the result based on the provided evaluator.
- After geeting the best model on first iteration tune the paramter again and run the cross validation again. 
- Need todo this process till that look there is not much chnages.
- Here I am doing only two iteration as demostration purpose. Also not putting much params as it take huge time.

In [13]:
# Import the require liberary.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator


###### Combine all steps in a method

In [14]:
# this is a method which run the cross validation and then print best params
def model_tunning(estimator, estimatorParams, evaluator, dataset) :
    cv = CrossValidator(estimator=estimator, estimatorParamMaps=estimatorParams, evaluator=evaluator, numFolds=5)

    model = cv.fit(dataset)
    
    print('---------Best params---------')
    print('Rank:', model.bestModel._java_obj.parent().getRank())
    print('RegParam:', model.bestModel._java_obj.parent().getRegParam())
    
    return model.bestModel

###### Iteration 1

In [15]:
# setting up the initail hyper parameter for tunning
hyper_param = ParamGridBuilder()\
                            .addGrid(als.rank, [x for x in range(5, 15, 5) ])\
                            .addGrid(als.regParam, [x * 0.01 for x in range(1, 4, 2)])\
                            .build()

# Setting up the evaluation matrics so that cross validation use to find the best one.
evaluator = RegressionEvaluator(metricName='rmse', labelCol="rating", predictionCol="prediction")

best_model = model_tunning(als, hyper_param, evaluator, train_df)

---------Best params---------
Rank: 5
RegParam: 0.01


###### Iteration 2

In [16]:
# Tune the parameters
hyper_param = ParamGridBuilder()\
                            .addGrid(als.rank, [3, 5])\
                            .addGrid(als.regParam, [0.005, 0.01])\
                            .build()

# Here just doing for learning and training purpose as I put more range then it will take more time
#second iteration
best_model = model_tunning(als, hyper_param, evaluator, train_df)

---------Best params---------
Rank: 3
RegParam: 0.005


###### Model apply on test and preditct the rating 

In [17]:
predictions = best_model.transform(test_df)

predictions.show(5)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|  4169|    148|     3|976588402|  3.018169|
|  3184|    148|     4|968708953| 3.5615373|
|  1069|    148|     2|974945135| 3.8229532|
|  1150|    148|     2|974875106| 3.1401463|
|  3829|    148|     2|965940170| 3.0570917|
+------+-------+------+---------+----------+
only showing top 5 rows



###### RMSE of the new best model

In [18]:
print_rmse(predictions)

rmse of the model is 89.0%


##### Note: Here the original one gives the better result compare to the results coems from the cross validation. Even on the first iteration it shows the same as original.

</p>This is just for the demo purpose. I enjoy this :)