# HW02

### Part 1

In [1]:
!pip install pyspark
!pip install scikit-learn
!pip install pandas

[0m

In [2]:
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as f
from pyspark.sql.types import *

import pandas as pd

from sklearn.pipeline import make_pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDRegressor
from sklearn.metrics import mean_squared_error

In [3]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [4]:
conf = SparkConf() \
    .setAppName("popov_spark") \
    .set("spark.executor.instances", "2") \
    .set("spark.executor.cores", "1") \
    .set("spark.executor.memory", "1g")

In [5]:
spark = SparkSession.builder \
    .config(conf=conf) \
    .master(master="yarn") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/26 01:37:03 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


![yarn1](./img/yarn1.png)
![yarn2](./img/yarn2.png)
![spark](./img/spark.png)

In [6]:
data_dir = 'ml-latest-small'
ratings_path = os.path.join(data_dir, 'ratings.csv')
tags_path = os.path.join(data_dir, 'tags.csv')

In [7]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

ratings_df = spark.read \
    .option("header", True) \
    .schema(ratings_schema) \
    .csv(ratings_path)
print(f"Ratings length: {ratings_df.count()}")

[Stage 0:>                                                          (0 + 1) / 1]

Ratings length: 100836


                                                                                

In [8]:
tags_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])

tags_df = spark.read \
    .option("header", True) \
    .schema(tags_schema)\
    .csv(tags_path)
print(f"Tags length: {tags_df.count()}")

Tags length: 3683


**For each dataset load:**
- 2 stages - csv load and count
- 6 tasks - 3 per stage
![stages1](./img/stages1.png)
![stages2](./img/stages2.png)

In [9]:
!hdfs dfs -rm -r ml-latest-small

Deleted ml-latest-small


In [10]:
!hdfs dfs -put ml-latest-small .

### Part 2

In [11]:
ratings_df.take(5)

[Row(userId=1, movieId=1, rating=4.0, timestamp=964982703),
 Row(userId=1, movieId=3, rating=4.0, timestamp=964981247),
 Row(userId=1, movieId=6, rating=4.0, timestamp=964982224),
 Row(userId=1, movieId=47, rating=5.0, timestamp=964983815),
 Row(userId=1, movieId=50, rating=5.0, timestamp=964982931)]

In [12]:
tags_df.take(5)

[Row(userId=2, movieId=60756, tag='funny', timestamp=1445714994),
 Row(userId=2, movieId=60756, tag='Highly quotable', timestamp=1445714996),
 Row(userId=2, movieId=60756, tag='will ferrell', timestamp=1445714992),
 Row(userId=2, movieId=89774, tag='Boxing story', timestamp=1445715207),
 Row(userId=2, movieId=89774, tag='MMA', timestamp=1445715200)]

- Посчитать количество уникальных фильмов и уникальных юзеров в таблице “ratings”

In [13]:
movies_count = ratings_df \
    .select("movieId") \
    .distinct() \
    .count()
print(f"Movies unique: {movies_count}")

Movies unique: 9724


In [14]:
user_id_count = ratings_df \
    .select("userId") \
    .distinct() \
    .count()
print(f"Users unique: {user_id_count}")

Users unique: 610


- Посчитать, сколько было поставлено оценок >= 4.0

In [15]:
good_rating_count = ratings_df \
    .where(f.col("rating") >= 4.0) \
    .count()
good_rating_count

48580

- Вывести топ100 фильмов с самым высоким рейтингом

In [16]:
top100_movies_ids = ratings_df \
    .groupBy("movieId") \
    .agg(f.mean("rating").alias("movieRating")) \
    .sort(f.desc("movieRating")) \
    .limit(100)

top100_movies = tags_df \
    .join(f.broadcast(top100_movies_ids), on=['movieId'], how="inner") \
    .select(f.col("tag").alias("movieName"), "movieRating") \
    .sort(f.desc("movieRating"))

top100_movies.show(5)

+------------+-----------+
|   movieName|movieRating|
+------------+-----------+
|  irreverent|        5.0|
|    Cambodia|        5.0|
|       crime|        5.0|
|human rights|        5.0|
|      murder|        5.0|
+------------+-----------+
only showing top 5 rows



- Посчитать разницу во времени в секундах между временем тегирования пользователя данного фильма и временем, когда пользователь поставил оценку фильму. В качестве ответа выведете среднюю дельту по времени.

In [17]:
delta = tags_df.alias('l') \
    .join(ratings_df.alias('r'), on=['userId', 'movieId'], how='inner') \
    .withColumn('delta', f.col('l.timestamp') - f.col('r.timestamp')) \
    .groupBy('userId', 'movieId') \
    .mean('delta') \
    .cache()

mean_delta = int(delta.agg(f.mean('avg(delta)')).first()['avg(avg(delta))'])
print(f'Mean delta: {mean_delta}')
delta.show(5)

Mean delta: 44536397
+------+-------+------------+
|userId|movieId|  avg(delta)|
+------+-------+------------+
|    62| 108190|        13.0|
|   474|    412|1.62008122E8|
|   474|    551|1.62708921E8|
|   474|   1348| 8.4353215E7|
|   474|   1513| 9.0488464E7|
+------+-------+------------+
only showing top 5 rows



- Посчитать среднюю оценку от каждого пользователя, в качестве ответа выведете среднее от всех усредненных оценок всех пользователей

In [18]:
mean = ratings_df \
    .groupBy("userId") \
    .mean("rating") \
    .agg(f.mean("avg(rating)")) \
    .first()["avg(avg(rating))"]

mean

3.6572223377474016

### Part 3

In [19]:
learning_df_spark = ratings_df \
    .join(tags_df, on=['movieId'], how='inner') \
    .select("tag", "rating") \
    .cache()

learning_df = learning_df_spark.toPandas()
learning_df.head(5)

Unnamed: 0,tag,rating
0,fun,4.0
1,pixar,4.0
2,pixar,4.0
3,old,4.0
4,moldy,4.0


In [20]:
len(learning_df)

233213

In [21]:
model = make_pipeline(TfidfVectorizer(), SGDRegressor(random_state=42))

In [22]:
model.fit(learning_df['tag'], learning_df['rating'])

In [23]:
model.predict(["jqehioqhro", "Horror movie"])

array([3.3679659, 4.0436624])

In [24]:
@f.pandas_udf(DoubleType())
def tags_rating(tags: pd.Series) -> pd.Series:
    return pd.Series(model.predict(tags))

In [25]:
learning_df_spark = learning_df_spark \
    .withColumn("tags_rating", tags_rating("tag"))

learning_df_spark.show(10)

+-------------+------+------------------+
|          tag|rating|       tags_rating|
+-------------+------+------------------+
|          fun|   4.0|  4.08019481944535|
|        pixar|   4.0|3.9155194596438916|
|        pixar|   4.0|3.9155194596438916|
|          old|   4.0|3.4093771019280896|
|        moldy|   4.0| 3.410239597025357|
|serial killer|   5.0|3.8375621859910116|
| twist ending|   5.0| 4.097225346883247|
|      mystery|   5.0| 3.988479121686566|
|        heist|   5.0|4.0552802333775135|
| twist ending|   5.0| 4.097225346883247|
+-------------+------+------------------+
only showing top 10 rows



                                                                                

![tags-rating](./img/tags-rating.png)

In [26]:
rmse = learning_df_spark \
    .agg((f.mean((f.col("tags_rating") - f.col("rating")) ** 2) ** 0.5)) \
    .first()[0]
rmse

                                                                                

0.9423286411653277

**1 stage, 3 tasks**
![metric](./img/metric.png)