Task 1

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *

In [2]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [3]:
conf = SparkConf().set("spark.executor.instances", "2").set("spark.executor.cores", "1").set("spark.executor.memory", "1g")
spark = SparkSession.builder.appName("Zhuravev_spark").config(conf=conf).master(master="yarn").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/15 16:43:01 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [11]:
!hdfs dfs -rm -r ml-latest-small
!hdfs dfs -put ml-latest-small

Deleted ml-latest-small


In [12]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])
spark.read.format("csv").option("header", "True").schema(ratings_schema).load("ml-latest-small/ratings.csv").count()

                                                                                

100836

In [14]:
tags_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])
spark.read.format("csv").option("header", "True").schema(tags_schema).load("ml-latest-small/tags.csv").count()

3683

Task 2

In [15]:
ratings = spark.read.format("csv").option("header", "True").schema(ratings_schema).load("ml-latest-small/ratings.csv")
tags = spark.read.format("csv").option("header", "True").schema(tags_schema).load("ml-latest-small/tags.csv")

In [16]:
unique_users = ratings.select("userId").distinct().count()
unique_movies = ratings.select("movieId").distinct().count()
print("Unique users:", unique_users)
print("Unique movies:", unique_movies)

                                                                                

Unique users: 610
Unique movies: 9724


In [17]:
num_high_ratings = ratings.filter(ratings["rating"] >= 4.0).count()
print(f"Number of ratings >= 4.0: {num_high_ratings}")

[Stage 18:>                                                         (0 + 1) / 1]

Number of ratings >= 4.0: 48580


                                                                                

In [18]:
print("Top 100 movies by average rating:")
top_100_movies = ratings.groupBy("movieId").agg(f.avg("rating").alias("average_rating"))\
    .orderBy(f.desc("average_rating"), f.asc("movieId")).show(100)

Top 100 movies by average rating:
+-------+--------------+
|movieId|average_rating|
+-------+--------------+
|     53|           5.0|
|     99|           5.0|
|    148|           5.0|
|    467|           5.0|
|    495|           5.0|
|    496|           5.0|
|    626|           5.0|
|    633|           5.0|
|    876|           5.0|
|   1140|           5.0|
|   1151|           5.0|
|   1310|           5.0|
|   1349|           5.0|
|   1631|           5.0|
|   1759|           5.0|
|   2075|           5.0|
|   2196|           5.0|
|   2512|           5.0|
|   2824|           5.0|
|   2969|           5.0|
|   2972|           5.0|
|   3073|           5.0|
|   3086|           5.0|
|   3096|           5.0|
|   3303|           5.0|
|   3473|           5.0|
|   3496|           5.0|
|   3531|           5.0|
|   3567|           5.0|
|   3637|           5.0|
|   3678|           5.0|
|   3687|           5.0|
|   3792|           5.0|
|   3795|           5.0|
|   3851|           5.0|
|   3939|       

In [25]:
print("Average time delta:")
tags_with_ratings = tags.alias("tags").join(ratings.alias("ratings"), on=['userId', 'movieId'])
time_diff = tags_with_ratings.withColumn("time_diff", (f.col("tags.timestamp") - f.col("ratings.timestamp")))
time_diff.select(f.mean('time_diff').alias('avg_delta')).show()

Average time delta:
+--------------------+
|           avg_delta|
+--------------------+
|2.6243727372266974E7|
+--------------------+



In [27]:
print("Average of all averaged ratings:")
avg_ratings_per_user = ratings.groupBy("userId").agg(f.mean('rating').alias('avg_rating'))
overall_avg_rating = avg_ratings_per_user.select(f.mean('avg_rating').alias('avg_of_avg_ratings')).show()

Average of all averaged ratings:
+------------------+
|avg_of_avg_ratings|
+------------------+
|3.6572223377474016|
+------------------+



Task 3

In [28]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDRegressor
from pyspark.ml.linalg import Vectors, VectorUDT

Train TfidfVectorizer and  SGDRegressor

In [68]:
tags_with_ratings = tags.alias("tags").join(ratings.alias("ratings"), on=['userId', 'movieId'])
tags_with_ratings = tags_with_ratings.groupBy("tag").agg(f.mean('rating').alias('avg_rating'))
tags_with_ratings_pd = tags_with_ratings.toPandas()

tfidf_vect = TfidfVectorizer()
tags_features = tfidf_vect.fit_transform(tags_with_ratings_pd['tag'])

sgd_regressor = SGDRegressor()
sgd_regressor.fit(tags_features, tags_with_ratings_pd['avg_rating'])

Create UDF, predicting rating by tag

In [73]:
def predict_rating(tag):
    transformed_tag = tfidf_vect.transform([tag])
    predicted_rating = sgd_regressor.predict(transformed_tag)
    return float(predicted_rating[0])

predict_rating_udf = udf(predict_rating, FloatType())

Apply UDF to tags, then compare predicted rating with real

In [74]:
tags_with_predicted_ratings = tags_with_ratings.withColumn("predicted_rating", predict_rating_udf(tags_with_ratings['tag']))
tags_with_predicted_ratings.show(50)

+--------------------+------------------+----------------+
|                 tag|        avg_rating|predicted_rating|
+--------------------+------------------+----------------+
|               anime| 4.166666666666667|       3.5808225|
|                hope|               4.0|       3.5582614|
|wrongful imprison...|               5.0|       3.6914632|
|               1970s|3.8333333333333335|       3.5362763|
|              freaks|               4.5|       3.6246061|
|                 art|              3.75|       3.7475877|
|          creativity|               5.0|       3.6919618|
|  intelligent sci-fi|               4.5|       4.1977453|
|        Heartwarming|               3.5|       3.6027691|
|             lyrical|              3.25|       3.4592369|
|              ransom|               5.0|       3.6904726|
|               mafia| 4.666666666666667|       3.8126597|
|              sequel|3.7142857142857144|       3.6473424|
|          Emma Stone|               3.5|       3.869351

Calculate RMSE

In [85]:
ratings_with_predictions = tags_with_predicted_ratings.withColumn("squared_error", (f.col("avg_rating") - f.col("predicted_rating"))**2)
mse = ratings_with_predictions.groupBy().avg("squared_error")
errors = mse.withColumn("RMSE", f.sqrt("avg(squared_error)"))
errors.select("RMSE").show()

+------------------+
|              RMSE|
+------------------+
|0.7818799487962442|
+------------------+



                                                                                

Всего было 4 джобы и 4 стейджа (7 всего, 3 пропущено)