In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
from pyspark.sql.functions import col, sqrt
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Normalizer
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
import pyspark.sql.functions as F

def cosine_similarity(matrix, movieId):
    """Compute the cosine similarity between one movie and every row of ``matrix``.

    Parameters
    ----------
    matrix : DataFrame
        Table with a ``movieId`` column followed by numeric rating columns.
        Everything after the first field of a row is treated as that movie's
        rating vector, so ``movieId`` is assumed to be the first column.
    movieId : int
        Identifier of the movie to compare all other movies against.

    Returns
    -------
    RDD
        ``matrix.rdd`` mapped to ``(movieId, similarity)`` pairs, where
        ``similarity`` is a Python ``float``. The similarity is ``0.0``
        whenever either vector has zero length (no ratings at all).

    Raises
    ------
    ValueError
        If ``matrix`` contains no row with the requested ``movieId``.
    """
    target = matrix.filter(matrix.movieId == movieId).first()
    if target is None:
        # Fail early with a clear message instead of "NoneType is not subscriptable".
        raise ValueError(f"movieId {movieId!r} not found in the ratings matrix")

    target_vector = Vectors.dense(target[1:])
    norm_target = Vectors.norm(target_vector, 2)

    def cos_sim(row):
        """Return ``(movieId, similarity)`` for a single row of the matrix."""
        vector = Vectors.dense(row[1:])
        denominator = norm_target * Vectors.norm(vector, 2)
        if denominator == 0:
            # Cosine is undefined for a zero vector; report "no similarity"
            # rather than NaN or a division error.
            return (row.movieId, 0.0)
        return (row.movieId, float(target_vector.dot(vector) / denominator))

    return matrix.rdd.map(cos_sim)

In [3]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = SparkSession.builder.appName("MovieAnalyzer").getOrCreate()

# One-time setup (run in a shell, not in this notebook): before loading, the
# input files were uploaded to HDFS with the following commands:
#   ./hdfs dfs -copyFromLocal /home/ubuntu/Desktop/ratings.csv /HW1_3_ratings
#   ./hdfs dfs -copyFromLocal /home/ubuntu/Desktop/movies.csv /HW1_3_movies


In [4]:
# Load the ratings, keeping only the columns needed for the movie/user matrix.
ratings_df = (
    spark.read
    .csv("/HW1_3_ratings", header=True, inferSchema=True)
    .select("userId", "movieId", "rating")
)

# Load the movie catalogue, keeping only the id and the title.
movies_df = (
    spark.read
    .csv("/HW1_3_movies", header=True, inferSchema=True)
    .select("movieId", "title")
)
# (movieId, title) pairs, keyed by movieId so they can be joined with other pair RDDs.
movies_rdd = movies_df.rdd.map(lambda movie: (movie["movieId"], movie["title"]))

In [5]:
# Movie x user matrix: one row per movie, one column per userId holding the
# summed rating; user/movie pairs without a rating are filled with 0.
ratings_by_movie = ratings_df.groupBy("movieId")
ratings_per_user = ratings_by_movie.pivot("userId").sum("rating")
movie_ratings = ratings_per_user.fillna(0)

In [6]:
# (movieId, similarity) pairs: similarity of every movie to the target movie (id 589).
similarities_rdd = cosine_similarity(matrix=movie_ratings, movieId=589)

In [7]:
# Attach titles: joining (movieId, similarity) with (movieId, title)
# yields (movieId, (similarity, title)) records.
joined_rdd = similarities_rdd.join(other=movies_rdd)

In [8]:
# Ten movies most similar to the target. The target itself (id 589) is excluded;
# keep this id in sync with the one passed to cosine_similarity.
# Records are reshaped to (movieId, title, similarity) and `top` returns the
# ten largest by similarity, in descending order, as a plain Python list --
# without sorting and shuffling the whole RDD as sortBy(...).take(10) would.
top_10_similar_movies_rdd = joined_rdd.filter(lambda x: x[0] != 589) \
                                     .map(lambda x: (x[0], x[1][1], x[1][0])) \
                                     .top(10, key=lambda movie: movie[2])

In [9]:
# Tabulate the (movieId, title, similarity) records and render them in the notebook.
result_columns = ['movieId', 'title', 'similarity']
data_to_display = pd.DataFrame(data=top_10_similar_movies_rdd, columns=result_columns)
display(data_to_display)

Unnamed: 0,movieId,title,similarity
0,480,Jurassic Park (1993),0.719983
1,1240,"Terminator, The (1984)",0.695724
2,110,Braveheart (1995),0.659827
3,592,Batman (1989),0.645603
4,457,"Fugitive, The (1993)",0.637561
5,377,Speed (1994),0.630092
6,1196,Star Wars: Episode V - The Empire Strikes Back...,0.61853
7,380,True Lies (1994),0.611164
8,296,Pulp Fiction (1994),0.610284
9,356,Forrest Gump (1994),0.600886
