# 1. Spark application (70 баллов в сумме)
* Развернуть hadoop-кластер в конфигурации 1 namenode, 1 datanode, 1 resourcemanager, 1 nodemanager. Также развернуть jupyter в контейнере (все перечисленное уже есть в docker-compose (https://github.com/smalyshevv/bigdata-docker-pyspark)) (30 баллов)
* Запустить спарк-сессию (SparkSession) с мастером YARN, 2-мя экзекьюторами и именем приложения “{фамилия}_spark”- перед этим обязательно выйдите из savemode в hdfs (hdfs dfsadmin -safemode leave). Приложить скрин YARN-а, где запущено приложение, приложить скрин UI приложения spark (30 баллов) 
* Прочитать таблицы ratings, tags в директории ml-latest-small; отобразить количество строчек и в том, и в другом датасете. (Не забудьте перекинуть ваши данные из контейнера jupyter в hdfs: 
hdfs dfs -rm -r ml-latest-small - это на всякий случай, если будете перезапускать контейнеры
hdfs dfs -put ml-latest-small .
Приложить скрин spark-ui с выполненной job-ой (можно приложить прям в markdown ячейки, можно положить в той же папке, что и ноутбук). Написать, сколько было выполнено стейджей и тасок. (10 баллов)
* В качестве результата приложите ноутбук с названием hw_spark в папке notebooks и кодом созданной спарк-сессии, скринами, и прочитанными датасетами

In [1]:
# Чтобы не было в винде ошибки /opt/hadoop/etc/hadoop/hadoop-env.sh: line 127: $'\r': command not found
# Запустить один раз
!sed -i 's/\r$//' /opt/hadoop/etc/hadoop/hadoop-env.sh

Установка и импорты библиотек

In [2]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

Создание сессии и загрузка датасетов

In [4]:
conf = SparkConf()\
    .set("spark.executor.instances", "2")\
    .set("spark.executor.cores", "1")\
    .set("spark.executor.memory", "1g")

spark = SparkSession.builder.config(conf=conf).master(master="yarn").appName("Pribludenko_spark").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/14 10:44:36 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [5]:
!hdfs dfs -put notebooks/ml-latest-small

put: `ml-latest-small/links.csv': File exists
put: `ml-latest-small/movies.csv': File exists
put: `ml-latest-small/ratings.csv': File exists
put: `ml-latest-small/README.txt': File exists
put: `ml-latest-small/tags.csv': File exists


Чтение датасетов

In [6]:
from pyspark.sql.types import *

In [7]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

ratings_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(ratings_schema)\
    .load("ml-latest-small/ratings.csv")

print('Кол-во строк',ratings_df.count())

[Stage 2:>                                                          (0 + 1) / 1]

Кол-во строк 100836


                                                                                

In [8]:
tags_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])

tags_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(tags_schema)\
    .load("ml-latest-small/tags.csv")

print('Кол-во строк', tags_df.count())

Кол-во строк 3683


---

# 2. Работа с данными (30 баллов в сумме)


* Посчитать количество уникальных фильмов и уникальных юзеров в таблице “ratings”. (5 баллов)
* Посчитать, сколько было поставлено оценок >= 4.0. (5 баллов)
* Вывести топ100 фильмов с самым высоким рейтингом. (6 баллов)
* Посчитать разницу во времени в секундах между временем тегирования пользователя данного фильма и временем, когда пользователь поставил оценку фильму. В качестве ответа выведете среднюю дельту по времени. (7 баллов)
* Посчитать среднюю оценку от каждого пользователя, в качестве ответа выведете среднее от всех усредненных оценок всех пользователей. (7 баллов)
* Результаты должны быть в том же ноутбуке из прошлого пункта и написаны на pyspark-методах без toPandas.

## Посчитать количество уникальных фильмов и уникальных юзеров в таблице “ratings”

In [9]:
from pyspark.sql.functions import countDistinct

# Фильмы
unique_movies = ratings_df.select(countDistinct("movieId")).alias("unique_movies")
unique_movies.show()

# Пользователи
unique_users = ratings_df.select(countDistinct("userId")).alias("unique_users")
unique_users.show()

                                                                                

+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                   9724|
+-----------------------+

+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                   610|
+----------------------+



## Посчитать, сколько было поставлено оценок >= 4.0

In [10]:
highly_rated_movies = ratings_df.filter(ratings_df["rating"] >= 4.0).count()
print("Кол-во оценок >= 4.0:", highly_rated_movies)

Кол-во оценок >= 4.0: 48580


## Вывести топ100 фильмов с самым высоким рейтингом

In [11]:
from pyspark.sql.functions import avg, col

top_100_movies = ratings_df.groupBy("movieId").agg(avg("rating").alias("average_rating"))
top_100_movies = top_100_movies.orderBy(col("average_rating").desc()).limit(100)
top_100_movies.show()

[Stage 21:>                                                         (0 + 1) / 1]

+-------+--------------+
|movieId|average_rating|
+-------+--------------+
|  26350|           5.0|
|   3795|           5.0|
|  25887|           5.0|
| 157775|           5.0|
|    633|           5.0|
|  33138|           5.0|
|  67618|           5.0|
|    876|           5.0|
|    496|           5.0|
|  27373|           5.0|
| 113829|           5.0|
|  53578|           5.0|
| 152711|           5.0|
| 118894|           5.0|
|     53|           5.0|
| 160644|           5.0|
|    148|           5.0|
|   8911|           5.0|
| 147300|           5.0|
|  84273|           5.0|
+-------+--------------+
only showing top 20 rows



                                                                                

## Посчитать разницу во времени в секундах

In [12]:
from pyspark.sql.functions import abs

tag_and_rating = tags_df.join(ratings_df, ["userId", "movieId"])

time_diff = tag_and_rating.select(abs(tags_df['timestamp'] - ratings_df['timestamp']).alias("time_diff"))
avg_time_diff = time_diff.select(avg("time_diff"))
avg_time_diff.show()

+--------------------+
|      avg(time_diff)|
+--------------------+
|2.9203715568469506E7|
+--------------------+



## Посчитать среднюю оценку от каждого пользователя

In [13]:
user_avg_rating = ratings_df.groupBy("userId").agg(avg("rating").alias("avg_rating"))

overall_avg_rating = user_avg_rating.select(avg("avg_rating"))
overall_avg_rating.show()

+------------------+
|   avg(avg_rating)|
+------------------+
|3.6572223377474016|
+------------------+



---

# 3. UDF (25 баллов в сумме)


* Обучите модель предсказания оценок по тегам с помощью TfidfVectorizer и SGDRegressor из модуля scikit-learn - тут уже можно сконвертировать два датасета в pandas через .toPandas
    * сначала  TfidfVectorizer обучаете на колонке “tag”
    * получаете численные признаки transform-ом от tfidf на той же колонке “tag”
    * обучаете SGDRegressor на новых численных признаках от  TfidfVectorizer-а с лейблом “rating”
* Напишите UDF, которая делает предсказание рейтинга по столбцу “tag”
    * сначала transform от TfidfVectorizer
    * затем predict от SGDRegressor на полученных признаках из 1 этапа
* Примените UDF к spar-dataframe-у и убедитесь, что udf работает (можно вызвать какой нибудь action, например show(50)). Приложите скрин дага вычислений этой джобы в spark-ui. (15 баллов)
* Напишите, чему равен корень суммы квадратов разностей (RMSE) между предсказанным и истинным значением рейтинга (напишите это на pyspark-е). *Напишите, сколько было выполнено стейджей и тасок в рамках джобы. (10 баллов)

In [14]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDRegressor
from sklearn.metrics import mean_squared_error
import numpy as np

# Конвертация в пандас дф
tags_pd = tags_df.toPandas()
ratings_pd = ratings_df.toPandas()

# TfidfVectorizer
tfidf = TfidfVectorizer()
tfidf_matrix = tfidf.fit_transform(tags_pd['tag'])

# Подготовка данных SGDRegressor
X = tfidf_matrix
y = tags_pd.merge(ratings_pd.groupby(['movieId'])['rating'].mean(), on='movieId', how='left')['rating']
y = y.fillna(y.mean())

# Обучение SGDRegressor
sgd = SGDRegressor()
sgd.fit(X, y)

In [15]:
from pyspark.sql import functions as F

def predict_tag_rating(tag):
    # Трансофрмация и предикт
    tfidf_features = tfidf.transform([tag])
    prediction = sgd.predict(tfidf_features)[0]
    return float(prediction)

# UDF функция
predict_rating_udf = F.udf(predict_tag_rating, FloatType())

# Применение UDF функции
tags_df_with_predictions = tags_df.withColumn("predicted_rating", predict_rating_udf(F.col("tag")))
tags_df_with_predictions.show(50)

[Stage 36:>                                                         (0 + 1) / 1]

+------+-------+--------------------+----------+----------------+
|userId|movieId|                 tag| timestamp|predicted_rating|
+------+-------+--------------------+----------+----------------+
|     2|  60756|               funny|1445714994|        3.990981|
|     2|  60756|     Highly quotable|1445714996|       3.3995721|
|     2|  60756|        will ferrell|1445714992|       3.6068056|
|     2|  89774|        Boxing story|1445715207|       3.9034495|
|     2|  89774|                 MMA|1445715200|        3.068867|
|     2|  89774|           Tom Hardy|1445715205|       3.4618847|
|     2| 106782|               drugs|1445715054|        3.807145|
|     2| 106782|   Leonardo DiCaprio|1445715051|       3.8281453|
|     2| 106782|     Martin Scorsese|1445715056|       3.4350429|
|     7|  48516|        way too long|1169687325|       3.4251742|
|    18|    431|           Al Pacino|1462138765|       3.4118843|
|    18|    431|            gangster|1462138749|       3.2631786|
|    18|  

                                                                                

In [16]:
from pyspark.sql.functions import sqrt

# джойним таблички
df_with_true_ratings = tags_df_with_predictions.join(ratings_df, ["userId", "movieId"], "inner")

# Считаем RMSE
rmse = df_with_true_ratings.select(sqrt(avg((col("rating") - col("predicted_rating"))**2)).alias("rmse"))
rmse.show()

                                                                                

+------------------+
|              rmse|
+------------------+
|1.0233588979711892|
+------------------+

