## Лаба 9. Ваш собственный алгоритм рекомендаций фильмов

### Дедлайн

⏰ Четверг, 13 июня 2019 года, 23:59.

### Задача

Нужно построить рекомендательный алгоритм наилучшей предсказательной точности по метрике RMSE.

### Обработка данных на вход

Имеются следующие входные данные:

* Имеется табличка рейтингов `train.csv`. Формат: `userId, movieId, rating`. Вам отдаётся случайная половина этой таблички (рандомизация по items и users), половина остается нам. Мы используем скрытый датасет для оценки качества предсказания по RMSE. 
* Табличка `tags.csv` с текстовыми тэгами, которые пользователя проставил фильму. Формат: `userId, movieId, tag`.
* Табличка `movies.csv` с названием фильма и его жанром. Формат: `movieId, title, genres`.
* Табличка `links.csv` соответствия id фильма с базами данных imdb и themoviedb, в которых можно найти дополнительные характеристики фильмов. Формат: `movieId, imdbId, tmdbId`. 
* Табличка `test.csv`, для которой надо сгенерировать предсказания.

Для выполнения работы вам следует взять все файлы из папки на HDFS `/labs/lab09data/`.

### Подсказки

* Вы можете использовать любые алгоритмы и их смеси (NP, User-user, Item-item, SVD, ALS...) для предсказания рейтингов. 
* Вы также можете использовать дополнительные content данные для обогащения вашего алгоритма (`tags.csv, movies.csv, links.csv`)
* Обучив свой алгоритм на таблице с рейтингами `train.csv`, нужно сгенерировать предсказания для таблички `test.csv` и засабмитить.
* В `test.csv` могут попасть пользователи, по которым у вас в `train.csv` нет (или мало) наблюдений. Придумайте, что с ними делать (дефолтные рейтинги?).

### Проверка

Мы будем оценивать точность работы вашего алгоритма по метрике среднего квадратичного отклонения предсказанного рейтинга от истинного рейтинга RMSE: 

<img width="350px" src="laba09_rmse.png">

**Важно!** Для точной проверки сохраняйте порядок и количество строк исходного файла `test.csv` (сортировка там идет по целочисленному `userId`, а потом по `movieId`). Названия колонок и сепаратор так же следует сохранить. Файл `test.csv` сам является образцом для засылки.

**Если RMSE вашей рекомендательной системы будет ниже 0.9, то лаба будет засчитана.**

Результат следует сохранить в файл `lab09.csv` в своей домашней директории.

Проверка осуществляется [автоматическим скриптом](http://lk.newprolab.com/lab/laba09) из Личного кабинета.

## Решение

In [2]:
# Запуск pyspark
import os
import sys
os.environ["PYSPARK_SUBMIT_ARGS"]='pyspark-shell'
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [4]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Load and Review Data

In [3]:
!hadoop fs -ls /labs/lab09data/

Found 5 items
-rw-r--r--   3 hdfs hdfs     561456 2018-09-13 13:49 /labs/lab09data/links.csv
-rw-r--r--   3 hdfs hdfs    1390073 2018-09-13 13:49 /labs/lab09data/movies.csv
-rw-r--r--   3 hdfs hdfs   11364859 2018-09-13 13:49 /labs/lab09data/tags.csv
-rw-r--r--   3 hdfs hdfs  169754328 2018-09-13 13:49 /labs/lab09data/test.csv
-rw-r--r--   3 hdfs hdfs  169755196 2018-09-13 13:49 /labs/lab09data/train.csv


### train.csv

In [5]:
train = spark.read.csv('/labs/lab09data/train.csv', inferSchema=True, header=True)
train.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|   2244|   4.5|
|     1|   2464|   4.5|
|     1|   6361|   4.5|
|     1|  10620|   4.5|
|     1|  12012|   4.0|
+------+-------+------+
only showing top 5 rows



In [8]:
train.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [5]:
%time train.summary().show()

+-------+------------------+------------------+------------------+
|summary|            userId|           movieId|            rating|
+-------+------------------+------------------+------------------+
|  count|          10531564|          10531564|          10531564|
|   mean|114525.73807926344|13712.597933602265|3.5218638466233507|
| stddev| 66094.26611665961| 7860.215369765642| 1.058361838990463|
|    min|                 1|                 1|               0.5|
|    25%|             57387|              6812|               3.0|
|    50%|            114363|             13705|               3.5|
|    75%|            171482|             20611|               4.0|
|    max|            229060|             27302|               5.0|
+-------+------------------+------------------+------------------+

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 24.8 s


In [6]:
train.rdd.getNumPartitions()

2

### test.csv

In [7]:
test = spark.read.csv('/labs/lab09data/test.csv', inferSchema=True, header=True)
test.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|   1414|   0.0|
|     1|   2346|   0.0|
|     1|   5278|   0.0|
|     1|   9303|   0.0|
|     1|  11817|   0.0|
+------+-------+------+
only showing top 5 rows



In [10]:
test.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [7]:
%time test.summary().show()

+-------+------------------+------------------+--------+
|summary|            userId|           movieId|  rating|
+-------+------------------+------------------+--------+
|  count|          10531564|          10531564|10531564|
|   mean|114525.72723301116|13711.874988273346|     0.0|
| stddev| 66094.26609743753|  7860.38945378376|     0.0|
|    min|                 1|                 1|     0.0|
|    25%|             57387|              6812|     0.0|
|    50%|            114373|             13705|     0.0|
|    75%|            171494|             20611|     0.0|
|    max|            229060|             27303|     0.0|
+-------+------------------+------------------+--------+

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 16.4 s


In [8]:
test.rdd.getNumPartitions()

2

### ALS in Spark MLlib

In [9]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [11]:
# define rmse evaluator
evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='rating')

#### Journal

maxIter | regParam | rank | nonnegative | rmse_train | rmse_test | clamp/na.fill | time
:--- | :---: | :---: | :---: | :---: | :--- | :--- |
10 | 0.1 | 8 | True | 0.7673 | 0.8321 | -/3.5 |
20 | 0.01 | 10 | True | 0.6922  | 0.8581 | 0.5/3.5  |1m57s
50 | 0.01 | 20 | True | 0.6177 | 0.8758 | 3.5/3.5  | 7m37s
30 | 0.1 | 8 | True | 0.7547 | 0.8276  | 0.5/5.0/3.52  | 2m9s

In [52]:
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=30, regParam=0.1, rank=8, nonnegative=True, coldStartStrategy="nan",\
          userCol='userId', itemCol='movieId', ratingCol='rating')

% time model = als.fit(train)

CPU times: user 16 ms, sys: 12 ms, total: 28 ms
Wall time: 2min 9s


In [53]:
train.rdd.getNumPartitions()

2

In [64]:
#Let see how the model perform on train set
predict_train = model.transform(train)
%time predict_train.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
| 75052|    148|   3.0| 2.9280884|
|109268|    148|   3.0| 2.8745887|
| 19362|    463|   3.0| 3.2669127|
|  6990|    463|   4.5| 3.8141394|
|189548|    463|   4.0| 3.4103394|
+------+-------+------+----------+
only showing top 5 rows

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 13.6 s


In [65]:
predict_train.rdd.getNumPartitions()

200

In [66]:
predict_train = predict_train.coalesce(4).cache()

In [67]:
predict_train.rdd.getNumPartitions()

4

In [68]:
# check the root mean squared error on train set
%time rmse_train = evaluator.evaluate(predict_train)
print(f'Root mean squared error of the train_data: {rmse_train}')

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 18.7 s
Root mean squared error of the train_data: 0.7547829501877498


#### Predict test ratings

In [54]:
# predict test data
predict_test = model.transform(test)
%time predict_test.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|155572|    148|   0.0| 2.9904509|
| 94231|    148|   0.0| 3.3644466|
|178586|    148|   0.0| 2.6055057|
|  3855|    148|   0.0| 2.4866748|
|198955|    463|   0.0| 3.7445066|
+------+-------+------+----------+
only showing top 5 rows

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 19.4 s


In [55]:
predict_test.rdd.getNumPartitions()

200

In [56]:
predict_test = predict_test.coalesce(4).cache()

In [39]:
%time predict_test.summary().show()

+-------+------------------+------------------+--------+----------+
|summary|            userId|           movieId|  rating|prediction|
+-------+------------------+------------------+--------+----------+
|  count|          10531564|          10531564|10531564|  10531564|
|   mean|114525.72723301116|13711.874988273346|     0.0|       NaN|
| stddev| 66094.26609743979|7860.3894537839815|     0.0|       NaN|
|    min|                 1|                 1|     0.0|       0.0|
|    25%|             57415|              6812|     0.0| 2.9837773|
|    50%|            114390|             13705|     0.0|  3.546124|
|    75%|            171498|             20611|     0.0| 4.0359564|
|    max|            229060|             27303|     0.0|       NaN|
+-------+------------------+------------------+--------+----------+

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 28.5 s


#### Fill in NaN and zero values in predict_test with avgRating (approx 3.5)

In [57]:
predict_clean = predict_test \
            .withColumn("prediction", when(col("prediction") < 0.5, 0.5).otherwise(col("prediction"))) \
            .withColumn("prediction", when(col("prediction") > 5.0, 5.0).otherwise(col("prediction"))) \
            .na.fill(3.52)
%time predict_clean.summary().show()

+-------+------------------+------------------+--------+------------------+
|summary|            userId|           movieId|  rating|        prediction|
+-------+------------------+------------------+--------+------------------+
|  count|          10531564|          10531564|10531564|          10531564|
|   mean|114525.72723301116|13711.874988273346|     0.0| 3.415006611794796|
| stddev| 66094.26609743858| 7860.389453783982|     0.0|0.6512843965369679|
|    min|                 1|                 1|     0.0|               0.5|
|    25%|             57406|              6812|     0.0|3.0246684551239014|
|    50%|            114412|             13705|     0.0| 3.477482795715332|
|    75%|            171523|             20611|     0.0| 3.869719982147217|
|    max|            229060|             27303|     0.0|               5.0|
+-------+------------------+------------------+--------+------------------+

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 32.3 s


In [58]:
predict_clean.rdd.getNumPartitions()

4

#### Make output dataframe

In [59]:
# make output dataframe
output = predict_clean.select('userId', 'movieId', col('prediction').alias('rating')) \
                    .orderBy(['userId', 'movieId'])
%time output.show(5)

+------+-------+------------------+
|userId|movieId|            rating|
+------+-------+------------------+
|     1|   1414| 3.961900234222412|
|     1|   2346| 4.125975608825684|
|     1|   5278| 3.106297492980957|
|     1|   9303| 4.092289447784424|
|     1|  11817|4.3787150382995605|
+------+-------+------------------+
only showing top 5 rows

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 530 ms


In [60]:
output.rdd.getNumPartitions()

200

#### Write csv file to hdfs and copy to local

In [61]:
# write csv file with predictions
%time output.coalesce(1).write.csv('/user/sergey.zaytsev/lab09', header=True, sep=',', mode='overwrite')

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 19.9 s


In [62]:
!hadoop fs -ls /user/sergey.zaytsev/lab09

Found 2 items
-rw-r--r--   3 sergey.zaytsev sergey.zaytsev          0 2019-06-11 16:42 /user/sergey.zaytsev/lab09/_SUCCESS
-rw-r--r--   3 sergey.zaytsev sergey.zaytsev  321316751 2019-06-11 16:42 /user/sergey.zaytsev/lab09/part-00000-1f06842c-c7a0-4d67-b4d3-968b3034057b-c000.csv


In [63]:
# Copy output file to local directory
!hadoop fs -copyToLocal /user/sergey.zaytsev/lab09/part-00000-1f06842c-c7a0-4d67-b4d3-968b3034057b-c000.csv ~/

### shut down spark

In [69]:
sc.stop()

## Content Data (Ignore for now)

### tags.csv

### movies.csv

### links.csv