# Setup

In [1]:
pip install kaggle

Note: you may need to restart the kernel to use updated packages.


In [1]:
import pyspark
from pyspark.sql import SparkSession

# Create (or reuse) a Hive-enabled Spark session for this notebook.
# The chain is parenthesised so no line depends on a trailing backslash.
spark = (
    SparkSession.builder
    .master("local")
    .appName("jupyter-spark")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.driver.memory", "6g")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "8")
    # Cap on the total size of results returned to the driver by collect-style
    # actions (including toPandas). It was 8g, i.e. larger than the 6g driver
    # heap, so the guard could never trigger before the driver ran out of
    # memory. Keeping it below the heap turns that into a clear Spark error.
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.sql.warehouse.dir", "/users/hive/warehouse")
    # Hadoop options are stored as strings; pass the canonical lowercase form.
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .enableHiveSupport()
    .getOrCreate()
)

# Expose the SparkContext and silence everything below ERROR in the cell output.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

23/06/26 23:03:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# import opendatasets as od
# od.download("https://www.kaggle.com/datasets/grouplens/movielens-20m-dataset")

In [3]:
!mkdir ~/.kaggle

In [5]:
# List the Kaggle config directory to check that kaggle.json is in place
# before the download cell runs.
!ls ~/.kaggle

kaggle.json


In [22]:
import os
import kaggle
# Download and unzip the MovieLens 20M dataset into ./movielens.
# The download is skipped when an extracted file is already on disk, so
# re-running the notebook does not fetch the whole archive again.
download_dir = 'movielens'
if not os.path.exists(os.path.join(download_dir, 'rating.csv')):
    kaggle.api.dataset_download_files('grouplens/movielens-20m-dataset', path=download_dir, unzip=True)

# Reading Data Frames

In [2]:
from pyspark.sql.functions import split, col, explode, to_timestamp, concat_ws

from pyspark.sql import functions as f
def _read_movielens_csv(file_name):
    """Read ./movielens/<file_name> as a DataFrame, using the header row and inferred column types."""
    return (
        spark.read
        .option("inferSchema", "true")
        .option("header", "true")
        .csv("./movielens/" + file_name)
    )


# movie: genres arrive as one '|'-separated string; turn them into an array column.
movie = _read_movielens_csv("movie.csv").select(
    "movieId",
    "title",
    split(col("genres"), "\\|").alias("genres"),
)

# rating: convert the timestamp column with to_timestamp.
rating = _read_movielens_csv("rating.csv").withColumn("timestamp", to_timestamp("timestamp"))

link = _read_movielens_csv("link.csv")
tag = _read_movielens_csv("tag.csv")

genome_tags = _read_movielens_csv("genome_tags.csv")
genome_score = _read_movielens_csv("genome_scores.csv")

                                                                                

## Creating the DataFrame for the recommendation algorithm

In [3]:
# Per-movie rating summary: the mean rating and the number of ratings received.
movie_rating = rating.groupBy("movieId").agg(
    f.mean("rating").alias("avg_rating"),
    f.count("movieId").alias("number_of_votes"),
)

In [4]:
# Attach each genome tag's text to its (movie, tag) relevance score.
same_tag = genome_score["tagId"] == genome_tags["tagId"]
relevance_scores = genome_score.join(genome_tags, same_tag).select(
    genome_score["movieId"],
    genome_score["tagId"],
    genome_score["relevance"],
    genome_tags["tag"],
)

In [5]:
# recommendation_df = movie.join(movie_rating, movie["movieId"] == movie_rating["movieId"])\
#                     .join(tag, movie["movieId"] == tag["movieId"])\
#                     .select(movie.movieId, movie.title,explode(movie.genres).alias("genres"), movie_rating.avg_rating,
#                             movie_rating.number_of_votes, tag.tag)\
#                     .groupBy("movieId").agg(f.collect_list("tag").alias("tags"), 
#                                          f.collect_set("genres").alias("genres"))\
#                     .select("movieId", concat_ws(" ", "tags").alias("tags"), concat_ws(" ","genres").alias("genres"))

In [5]:
# Build one text document per movie containing its distinct user tags and genres.

# 1. Movies that have ratings, joined to every user tag applied to them.
rated_movies = movie.join(movie_rating, movie["movieId"] == movie_rating["movieId"])
tagged_movies = rated_movies.join(tag, movie["movieId"] == tag["movieId"])

# 2. One row per (tag application, genre) pair.
one_row_per_genre = tagged_movies.select(
    movie["movieId"],
    movie["title"],
    explode(movie["genres"]).alias("genres"),
    movie_rating["avg_rating"],
    movie_rating["number_of_votes"],
    tag["tag"],
)

# 3. Collapse back to one row per movie and join the two sets into a single string.
recommendation_df = (
    one_row_per_genre
    .groupBy("movieId")
    .agg(
        f.collect_set("tag").alias("tags"),
        f.collect_set("genres").alias("genres"),
    )
    .select("movieId", concat_ws(" ", "tags", "genres").alias("tags_genres"))
)

# Feature Engineering

In [6]:
import requests
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, VectorAssembler, IDF, Normalizer

# Fetch the stop-word list (whitespace-separated words) used by StopWordsRemover.
# The timeout keeps the cell from hanging on an unresponsive server, and
# raise_for_status() prevents an HTTP error page from being silently split
# into "stop words".
stop_words_response = requests.get(
    'http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words',
    timeout=30,
)
stop_words_response.raise_for_status()
stop_words = stop_words_response.text.split()

# Pipeline stages, in order: tokenise -> drop stop words -> term counts -> TF-IDF -> normalise.
tags_genres_tokenizer = RegexTokenizer(inputCol='tags_genres', outputCol='token_tags_genres')

remove_stop_words = StopWordsRemover(
    inputCol="token_tags_genres",
    outputCol="filtered_tags_genres",
    stopWords=stop_words,
    caseSensitive=False,
)

count_vectorizer = CountVectorizer(inputCol="filtered_tags_genres", outputCol="tf_tags_genres")

idf = IDF(inputCol='tf_tags_genres', outputCol='tfidf_tags_genres')

normalizer = Normalizer(inputCol="tfidf_tags_genres", outputCol="features")

In [7]:
# Random 50% / 40% / 10% split of the movie documents, seeded with 0.
split_weights = [0.5, 0.4, 0.1]
df_training, df_validation, df_test = recommendation_df.randomSplit(split_weights, seed=0)

# Feature-engineering pipeline: the stages run in the order listed.
fe_pipe = Pipeline(
    stages=[
        tags_genres_tokenizer,
        remove_stop_words,
        count_vectorizer,
        idf,
        normalizer,
    ]
)

In [8]:
# from pyspark.ml.functions import vector_to_array
# features = fe_pipe.fit(recommendation_df).transform(recommendation_df).select("features").rdd.flatMap(lambda x: x).collect()

In [8]:
from pyspark.ml.functions import vector_to_array
# Fit the feature pipeline on every movie document and compute its feature vectors.
fe_pipe_model = fe_pipe.fit(recommendation_df)
transformed_df = fe_pipe_model.transform(recommendation_df)

# Keep movieId next to the dense feature array so that a row position in any
# matrix built from these features can be traced back to the movie it describes.
features_df = transformed_df.select(
    "movieId",
    vector_to_array("features").alias("features_array"),
)

                                                                                

In [9]:
# Pull the whole feature table to the driver as a pandas DataFrame.
# NOTE(review): the recorded run of this cell ended with a Py4JNetworkError
# ("Answer from Java side is empty"); presumably the driver JVM ran out of
# memory while collecting the dense arrays — confirm before re-running at scale.
x = features_df.toPandas()

ERROR:root:Exception while sending command.                                     
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:44569)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gat

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:44569)

In [23]:
# Preview the per-row feature arrays held in the pandas frame.
x["features_array"]

0      [0.08650232265396901, 0.0, 0.0, 0.0, 0.5380485...
1      [0.013395785962984625, 0.0655861793938, 0.0340...
2      [0.015610078836633899, 0.025475808431094708, 0...
3      [0.0, 0.09273260814310545, 0.0, 0.0, 0.0, 0.0,...
4      [0.014276575452509412, 0.0, 0.1449486314481251...
                             ...                        
986    [0.04227165128924075, 0.0, 0.21458990721583845...
987    [0.06374009929893923, 0.0, 0.0, 0.0, 0.1982332...
988    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
989    [0.0, 0.08269390259595016, 0.0, 0.0, 0.0, 0.0,...
990    [0.03025540754761347, 0.0, 0.0, 0.0, 0.0, 0.07...
Name: features_array, Length: 991, dtype: object

In [None]:
from pyspark.sql.functions import array, lit
import numpy as np
# Stack the per-row feature arrays into a 2-D matrix (one row per entry of x).
# Iterating over the DataFrame `x` itself yields its column labels, not its
# rows, so the "features_array" column has to be selected explicitly.
y = [list(feature_row) for feature_row in x["features_array"]]
model_output = np.array(y)

In [None]:
# Inspect the dimensions of the feature matrix before computing similarities.
model_output.shape

In [34]:
from sklearn.metrics.pairwise import cosine_similarity
# Pairwise cosine similarity between the rows of model_output;
# sim[i][j] compares row i with row j.
sim = cosine_similarity(model_output)

In [49]:
# Rank every row by its similarity to row 5, most similar first.
# Each entry is a (row_position, similarity) pair.
recommendation_list = sorted(
    enumerate(sim[5]),
    key=lambda pair: pair[1],
    reverse=True,
)

[(5, 1.0000000000000002),
 (1698, 0.4537647173957099),
 (1285, 0.34108921796376507),
 (546, 0.28451342097182647),
 (824, 0.2676947522803445),
 (1083, 0.24910089058120818),
 (904, 0.24777091118105965),
 (791, 0.2437792622817688),
 (521, 0.2276324310676479),
 (1278, 0.22644262988222882),
 (1043, 0.22464269535604214),
 (392, 0.2239162944344881),
 (399, 0.21268778449662698),
 (1249, 0.21164104708447243),
 (888, 0.19638268884163665),
 (672, 0.19564746858109447),
 (883, 0.19445947754029344),
 (941, 0.1852267857198265),
 (1137, 0.18006418397583832),
 (887, 0.1774106885074716),
 (1597, 0.17486876034798624),
 (714, 0.17322963069692127),
 (1656, 0.16647282258984664),
 (934, 0.1657671591869671),
 (1060, 0.16274346837087003),
 (844, 0.16182381344398242),
 (812, 0.15974516171430095),
 (1432, 0.15938953979877682),
 (1533, 0.15491332193382984),
 (348, 0.15453107354330886),
 (1499, 0.1543175169303066),
 (1733, 0.14898723348326018),
 (837, 0.1460087390479508),
 (127, 0.14489442695830282),
 (1066, 0.142

In [47]:
# Show the catalogue rows for the two ids taken from the similarity ranking.
# NOTE(review): those ids are row positions in `sim`, which need not coincide
# with movieId values — confirm the mapping before reading these titles as
# the query movie and its top recommendation.
for picked_id in (5, 1698):
    movie.where(col("movieId") == picked_id).show(3, truncate=False)

+-------+----------------------------------+--------+
|movieId|title                             |genres  |
+-------+----------------------------------+--------+
|5      |Father of the Bride Part II (1995)|[Comedy]|
+-------+----------------------------------+--------+

+-------+----------------+--------+
|movieId|title           |genres  |
+-------+----------------+--------+
|1698   |Boys, Les (1997)|[Comedy]|
+-------+----------------+--------+

