# Установка PySpark

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [0]:
!wget -q www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

In [3]:
!tar -xvf spark-2.4.4-bin-hadoop2.7.tgz

spark-2.4.4-bin-hadoop2.7/
spark-2.4.4-bin-hadoop2.7/R/
spark-2.4.4-bin-hadoop2.7/R/lib/
spark-2.4.4-bin-hadoop2.7/R/lib/sparkr.zip
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/INDEX
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/html/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/html/R.css
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/html/00Index.html
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/aliases.rds
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/AnIndex
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/SparkR.rdx
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/SparkR.rdb
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/paths.rds
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/worker/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/worker/worker.R
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/worker/daemon.R
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/tests/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/tests/testthat/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/tests/testthat/te

In [0]:
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Получаем данные

In [7]:
!wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip

--2019-11-19 21:39:25--  http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip’


2019-11-19 21:39:27 (1.47 MB/s) - ‘ml-latest-small.zip’ saved [978202/978202]



In [8]:
!unzip ml-latest-small.zip

Archive:  ml-latest-small.zip
   creating: ml-latest-small/
  inflating: ml-latest-small/links.csv  
  inflating: ml-latest-small/tags.csv  
  inflating: ml-latest-small/ratings.csv  
  inflating: ml-latest-small/README.txt  
  inflating: ml-latest-small/movies.csv  


# Загружаем данные

In [0]:
ratings = spark.read.csv('ml-latest-small/ratings.csv', header=True, inferSchema=True)
movies = spark.read.csv('ml-latest-small/movies.csv', header=True, inferSchema=True)

In [10]:
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [11]:
movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

## Подход 1 - фильмы с наибольшим средним рейтингом и как минимум 150 отзывами

In [0]:
from pyspark.sql import functions as F

In [0]:
ratings_with_avg = ratings.groupBy('movieId').agg(F.count(ratings.rating).alias('count'), F.avg(ratings.rating).alias('avg'))

In [14]:
ratings_with_avg.show()

+-------+-----+------------------+
|movieId|count|               avg|
+-------+-----+------------------+
|   1580|  165| 3.487878787878788|
|   2366|   25|              3.64|
|   3175|   75|              3.58|
|   1088|   42| 3.369047619047619|
|  32460|    4|              4.25|
|  44022|   23| 3.217391304347826|
|  96488|    4|              4.25|
|   1238|    9| 4.055555555555555|
|   1342|   11|               2.5|
|   1591|   26|2.6346153846153846|
|   1645|   51| 3.411764705882353|
|   4519|    9|3.3333333333333335|
|   2142|   10|               2.7|
|    471|   40|              3.55|
|   3997|   12|1.8333333333333333|
|    833|    6|               2.0|
|   3918|    9|3.2777777777777777|
|   7982|    4|              3.25|
|   1959|   15|3.6666666666666665|
|  68135|   10|              3.55|
+-------+-----+------------------+
only showing top 20 rows



## Практика 1.
1. Добавьте в DataFrame название фильма
2. Оставьте в DataFrame только те фильмы, которые набрали не менее 150 оценок
3. Выведите top-25 фильмов с наивысшим средним баллом

In [0]:
ratings_with_avg = ratings_with_avg.join(movies, on='movieId').select('movieId', 'title', 'count', 'avg')

In [0]:
ratings_with_avg_amd_more_than_150 = ratings_with_avg[ratings_with_avg['count'] >= 150]

In [17]:
ratings_with_avg_amd_more_than_150.orderBy('avg', ascending=False).show(25)

+-------+--------------------+-----+------------------+
|movieId|               title|count|               avg|
+-------+--------------------+-----+------------------+
|    318|Shawshank Redempt...|  317| 4.429022082018927|
|    858|Godfather, The (1...|  192|         4.2890625|
|   2959|   Fight Club (1999)|  218| 4.272935779816514|
|     50|Usual Suspects, T...|  204| 4.237745098039215|
|    260|Star Wars: Episod...|  251| 4.231075697211155|
|    527|Schindler's List ...|  220|             4.225|
|   1196|Star Wars: Episod...|  211|4.2156398104265405|
|   1198|Raiders of the Lo...|  200|            4.2075|
|    296| Pulp Fiction (1994)|  307| 4.197068403908795|
|   2571|  Matrix, The (1999)|  278| 4.192446043165468|
|    356| Forrest Gump (1994)|  329| 4.164133738601824|
|    593|Silence of the La...|  279| 4.161290322580645|
|   2028|Saving Private Ry...|  188|4.1462765957446805|
|   1210|Star Wars: Episod...|  196| 4.137755102040816|
|   4226|      Memento (2000)|  159| 4.122641509

In [18]:
ratings_with_avg_amd_more_than_150.describe().show()

+-------+------------------+--------------------+------------------+------------------+
|summary|           movieId|               title|             count|               avg|
+-------+------------------+--------------------+------------------+------------------+
|  count|                43|                  43|                43|                43|
|   mean|1409.3488372093022|                null|204.27906976744185| 3.943627246165199|
| stddev|1723.2235971839154|                null|  42.7135116245965|0.3107530572975634|
|    min|                 1|Ace Ventura: Pet ...|               157| 3.040372670807453|
|    max|              7153|Usual Suspects, T...|               329| 4.429022082018927|
+-------+------------------+--------------------+------------------+------------------+



## Подход 2 - Колобративная фильтрация

![alt text](https://upload.wikimedia.org/wikipedia/commons/thumb/5/52/Collaborative_filtering.gif/300px-Collaborative_filtering.gif)

это один из методов построения прогнозов (рекомендаций) в рекомендательных системах, использующий известные предпочтения (оценки) группы пользователей для прогнозирования неизвестных предпочтений другого пользователя. Его основное допущение состоит в следующем: те, кто одинаково оценивал какие-либо предметы в прошлом, склонны давать похожие оценки другим предметам и в будущем. 

(c) Wikipedia
https://ru.wikipedia.org/wiki/Коллаборативная_фильтрация

Для решения будем использовать алгоритм [ALS](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html)

![alt text](https://www.researchgate.net/profile/Huu_Hoa_Nguyen/publication/314071424/figure/fig1/AS:570666408529920@1513068882014/An-example-of-matrix-factorization.png)

Параметры:
* numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
* rank is the number of features to use (also referred to as the number of latent factors).
* iterations is the number of iterations of ALS to run. ALS typically converges to a reasonable solution in 20 iterations or less.
* lambda specifies the regularization parameter in ALS.
* implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
* alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

In [0]:
# Удаляем не нужную колонку
ratings = ratings.drop('timestamp')

In [0]:
from pyspark.mllib.recommendation import ALS
import math

In [0]:
# Разбиваем датасет на train и test
ratings_train, ratings_test = ratings.randomSplit([0.8, 0.2])

In [22]:
ratings_test.count()

20266

In [23]:
ratings_train.count()

80570

In [0]:
iterations = 10
rank = 8

In [0]:
model = ALS.train(ratings_train, rank, iterations=iterations)

In [26]:
ratings_test.select('userId', 'movieId').show()

+------+-------+
|userId|movieId|
+------+-------+
|     1|     50|
|     1|     70|
|     1|    163|
|     1|    260|
|     1|    349|
|     1|    367|
|     1|    736|
|     1|    923|
|     1|   1032|
|     1|   1049|
|     1|   1060|
|     1|   1198|
|     1|   1219|
|     1|   1220|
|     1|   1282|
|     1|   1408|
|     1|   1473|
|     1|   1517|
|     1|   1552|
|     1|   1573|
+------+-------+
only showing top 20 rows



In [0]:
predictions = model.predictAll(ratings_test.select('userId', 'movieId').rdd)

In [0]:
predictions_converted = predictions.map(lambda r: (r[0], r[1], r[2]))

In [0]:
predictions_df = predictions_converted.toDF(["userId", "movieId", "rating_pred"])

In [30]:
predictions_df.show()

+------+-------+------------------+
|userId|movieId|       rating_pred|
+------+-------+------------------+
|   288|   1084|  3.84491335906689|
|   600|   1084| 2.877877151245005|
|   402|   1084|3.6065368087237806|
|     6|   1084| 4.054556363842397|
|   103|   1084| 4.361266172794276|
|   315|   1084|3.0671498117917797|
|   160|   3702|1.8647499406185184|
|   122|   3702|4.2838549487297435|
|   298|   3702| 3.455726206842579|
|   303|   3702| 3.595588100154446|
|   428|   6754|2.7033725509079813|
|   274|   6754|3.2410579042416425|
|   594|   6754| 4.479564887761359|
|   331|   6754|2.5181295484303936|
|   381|   6754| 3.486078581498663|
|   111|   6754|3.0856524431115635|
|   561|   6754|2.8418693878581074|
|   610|  44828|3.8879500430503486|
|   307|  44828|3.0652412063919403|
|   454|   5618| 5.009583133794776|
+------+-------+------------------+
only showing top 20 rows



## Практика 2
1. Посчитайте среднее абсолютное отклонение предсказанной оценки от реальной оценки

In [0]:
from pyspark.sql.functions import abs

In [0]:
result = ratings_test.join(predictions_df, on=['userId', 'movieId'])

In [0]:
result = result.withColumn('difference', F.abs(result['rating'] - result['rating_pred']))

In [34]:
result.orderBy('difference', ascending=False).show()

+------+-------+------+--------------------+------------------+
|userId|movieId|rating|         rating_pred|        difference|
+------+-------+------+--------------------+------------------+
|     4|   2843|   5.0| -2.6371142698190333| 7.637114269819033|
|   502|    520|   5.0| -1.7482872104323715|6.7482872104323715|
|   542|   2710|   0.5|   6.853540916787756| 6.353540916787756|
|   261|  27611|   4.5|  10.779081950762937|6.2790819507629365|
|   207|   2384|   0.5|   6.643708737077766| 6.143708737077766|
|   543|  89904|   0.5|   6.571171901498214| 6.071171901498214|
|   508|   2872|   1.5|   7.419334492080935| 5.919334492080935|
|   418|  45672|   0.5|   6.311038659584842| 5.811038659584842|
|   301|   2950|   3.0| -2.6798717834799923| 5.679871783479992|
|   591|   2840|   2.0|   7.638201057453573| 5.638201057453573|
|   344|   2617|   2.0|   7.487443810869818| 5.487443810869818|
|   329|   1884|   5.0| -0.3925605205990337| 5.392560520599034|
|     3|   2851|   5.0| -0.3786044619191

## Предсказание

In [35]:
USER_ID_FOR_PREDICTION = 320
data_for_prediction = ratings_train[ratings_train['userId']==USER_ID_FOR_PREDICTION]
data_for_prediction.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|   320|   1013|   3.5|
|   320|   3534|   4.0|
|   320|   3703|   3.0|
|   320|   3745|   3.5|
|   320|   4387|   4.0|
|   320|   5785|   4.0|
|   320|   6283|   3.5|
|   320|   6857|   3.5|
|   320|  26555|   3.5|
|   320|  31184|   4.0|
|   320|  42718|   3.5|
|   320|  59315|   4.0|
|   320|  62999|   3.5|
|   320|  68358|   4.0|
|   320|  71535|   4.0|
|   320|  72998|   4.0|
+------+-------+------+



In [36]:
data_for_prediction.join(movies, on='movieId').show()

+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|   1013|   320|   3.5|Parent Trap, The ...|Children|Comedy|R...|
|   3534|   320|   4.0|      28 Days (2000)|               Drama|
|   3703|   320|   3.0|Road Warrior, The...|Action|Adventure|...|
|   3745|   320|   3.5|   Titan A.E. (2000)|Action|Adventure|...|
|   4387|   320|   4.0|Kiss of the Drago...|              Action|
|   5785|   320|   4.0|Jackass: The Movi...|Action|Comedy|Doc...|
|   6283|   320|   3.5|Cowboy Bebop: The...|Action|Animation|...|
|   6857|   320|   3.5|Ninja Scroll (Jûb...|Action|Adventure|...|
|  26555|   320|   3.5|Spies Like Us (1985)|              Comedy|
|  31184|   320|   4.0|Appleseed (Appuru...|Action|Animation|...|
|  42718|   320|   3.5|District 13 (Banl...| Action|Crime|Sci-Fi|
|  59315|   320|   4.0|     Iron Man (2008)|Action|Adventure|...|
|  62999| 

In [0]:
predicted_user_watched = data_for_prediction.rdd.map(lambda r: r[1])

In [38]:
predicted_user_watched.take(20)

[1013,
 3534,
 3703,
 3745,
 4387,
 5785,
 6283,
 6857,
 26555,
 31184,
 42718,
 59315,
 62999,
 68358,
 71535,
 72998]

In [0]:
predicted_user_watched = predicted_user_watched.take(20)

In [0]:
prediction_user_unrated = movies.rdd.filter(lambda m: m[0] not in predicted_user_watched).map(lambda m: (USER_ID_FOR_PREDICTION, m[0]))

In [41]:
prediction_user_unrated.take(10)

[(320, 1),
 (320, 2),
 (320, 3),
 (320, 4),
 (320, 5),
 (320, 6),
 (320, 7),
 (320, 8),
 (320, 9),
 (320, 10)]

## Практика 3
1. Предскажите значения рейтингов для тех фильмов, которые пользователь еще не ввидел (prediction_user_unrated)
2. На основе предсказанных значений постройте top-10 фильмов для пользователя

In [0]:
user_predictions = model.predictAll(prediction_user_unrated)

In [43]:
user_predictions.take(5)

[Rating(user=320, product=1084, rating=2.9505190632478477),
 Rating(user=320, product=6400, rating=2.078768135113463),
 Rating(user=320, product=3702, rating=3.2486556990913975),
 Rating(user=320, product=6754, rating=3.5252056127597426),
 Rating(user=320, product=81132, rating=1.8885091628349677)]

In [0]:
updf = user_predictions.toDF(['userId', 'movieId', 'rating_pred'])

In [45]:
updf.join(movies, on='movieId').show()

+-------+------+------------------+--------------------+--------------------+
|movieId|userId|       rating_pred|               title|              genres|
+-------+------+------------------+--------------------+--------------------+
|   1084|   320|2.9505190632478477|Bonnie and Clyde ...|         Crime|Drama|
|   6400|   320| 2.078768135113463|Murder on a Sunda...|         Documentary|
|   3702|   320|3.2486556990913975|      Mad Max (1979)|Action|Adventure|...|
|   6754|   320|3.5252056127597426|   Underworld (2003)|Action|Fantasy|Ho...|
|  81132|   320|1.8885091628349677|       Rubber (2010)|Action|Adventure|...|
|   6308|   320| 2.578051065541861| Legal Eagles (1986)|Comedy|Crime|Romance|
|  91622|   320|4.0299418143307255|  Young Adult (2011)|        Comedy|Drama|
|  44828|   320| 2.434383476348877|      Slither (2006)|Comedy|Horror|Sci-Fi|
|   5618|   320|3.7503864346505518|Spirited Away (Se...|Adventure|Animati...|
|  26158|   320|3.6120888009914465|Closely Watched T...|    Come

In [46]:
updf.join(movies, on='movieId').orderBy('rating_pred', ascending=False).show()

+-------+------+------------------+--------------------+--------------------+
|movieId|userId|       rating_pred|               title|              genres|
+-------+------+------------------+--------------------+--------------------+
|   2843|   320| 6.863961770553104|Black Cat, White ...|      Comedy|Romance|
| 103984|   320|  5.66754761017932|Great Beauty, The...|        Comedy|Drama|
|  89904|   320|5.4813783862262735|   The Artist (2011)|Comedy|Drama|Romance|
|    945|   320| 5.424784056388437|      Top Hat (1935)|Comedy|Musical|Ro...|
|  79224|   320| 5.409605878604892|Karate Kid, The (...|Action|Children|D...|
|  53123|   320| 5.316978316386292|         Once (2006)|Drama|Musical|Rom...|
|   4373|   320|  5.22919453547258|  Pootie Tang (2001)|              Comedy|
|   7346|   320| 5.137648145387921|Girl Next Door, T...|      Comedy|Romance|
|   1290|   320| 5.132726761475286|Some Kind of Wond...|       Drama|Romance|
|   3925|   320|  5.10929461247228|Stranger Than Par...|        

##  Индивидуальное предсказание

In [47]:
model.predict(320, 48322)

3.1259904380434027

## Сохранение модели

In [0]:
model.save(spark.sparkContext, 'model')

## Домашнее задание
2 варианта.
1. Вариант легкий: Решите задачу классификации цветков ирисов с использованием PySpark
2. Вариант сложный: Решите задачу классификации пассажиров титаника с использованием PySpark (https://www.kaggle.com/c/titanic)

**При выполнении ДЗ не разрешается:**
1. Использовать библиотеку pandas
2. Использовать библиотеку sklearn

Полезные импорты:
1. from pyspark.ml.classification import LogisticRegression
2. from pyspark.ml.evaluation import MulticlassClassificationEvaluator - для оценки качества работы алгоритма
3. from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler - для предобработки признаков

Полезные ссылки:
1. https://spark.apache.org/docs/latest/ml-classification-regression.html#classification - алгоримты классификации в pyspark
2. https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa - пример решения задачи классификации на pyspark

In [0]:
iris_train = spark.read.csv('train_iris.csv', header=True, inferSchema=True)
iris_test = spark.read.csv('test_iris.csv', header=True, inferSchema=True)

In [0]:
from pyspark.ml.feature import VectorAssembler

In [0]:
vectorAssembler = VectorAssembler(inputCols= ['sepal length', 'sepal width', 'petal length', 'petal width'], outputCol = 'features')

In [0]:
iris_train = vectorAssembler.transform(iris_train)
iris_test = vectorAssembler.transform(iris_test)

In [0]:
from pyspark.ml.classification import LogisticRegression

In [0]:
logistic = LogisticRegression(featuresCol = 'features', labelCol = 'species')

In [0]:
logistic_model = logistic.fit(iris_train)

In [0]:
logistic_predictions = logistic_model.transform(iris_test)

In [58]:
logistic_predictions.select('prediction', 'species', 'features').show()

+----------+-------+-----------------+
|prediction|species|         features|
+----------+-------+-----------------+
|       2.0|      2|[5.8,2.8,5.1,2.4]|
|       2.0|      1|[5.9,3.2,4.8,1.8]|
|       2.0|      2|[6.0,3.0,4.8,1.8]|
|       1.0|      1|[5.2,2.7,3.9,1.4]|
|       0.0|      0|[5.1,3.8,1.9,0.4]|
|       2.0|      2|[7.2,3.2,6.0,1.8]|
|       0.0|      0|[5.5,3.5,1.3,0.2]|
|       2.0|      2|[6.9,3.1,5.1,2.3]|
|       0.0|      0|[4.8,3.0,1.4,0.1]|
|       1.0|      1|[6.1,2.9,4.7,1.4]|
|       2.0|      2|[4.9,2.5,4.5,1.7]|
|       1.0|      1|[5.7,2.9,4.2,1.3]|
|       2.0|      2|[6.3,2.9,5.6,1.8]|
|       0.0|      0|[4.6,3.6,1.0,0.2]|
|       0.0|      0|[5.0,3.4,1.6,0.4]|
|       2.0|      2|[6.5,3.0,5.8,2.2]|
|       2.0|      1|[5.4,3.0,4.5,1.5]|
|       0.0|      0|[5.2,4.1,1.5,0.1]|
|       0.0|      0|[5.7,3.8,1.7,0.3]|
|       1.0|      1|[5.8,2.7,4.1,1.0]|
+----------+-------+-----------------+
only showing top 20 rows



In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="species", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(logistic_predictions)

In [71]:
accuracy

0.92