**1. Setup**

In [None]:
# Start Spark
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions
from pyspark import SparkConf
from pyspark.context import SparkContext

# Build (or reuse) the Spark session for this notebook on the standalone
# cluster master, with 2g driver memory and 2 cores / 2g per executor.
spark = (
    SparkSession.builder
    .appName("Project")
    .master("spark://10.10.28.60:7077")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "2g")
    .config("spark.local.dir", "/tmp/spark-temp")
    .getOrCreate()
)

# Handles derived from the session: the SparkContext and a SQLContext on top of it.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

print(spark.version)

In [None]:
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.feature import StopWordsRemover, VectorAssembler
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.ml.feature import IDF
from operator import add
from pyspark.ml import Pipeline, PipelineModel


from pyspark.sql.functions import *

from pyspark.sql.types import *

import pandas as pd

**2. Data Preparation**




In [None]:
# Load the three MovieLens tables (movies, ratings, tags) from HDFS parquet files.
movies, ratings, tags = (
    spark.read.parquet('hdfs://master5:9000/user/dis/movielens/movies.parquet'),
    spark.read.parquet('hdfs://master5:9000/user/dis/movielens/ratings.parquet'),
    spark.read.parquet('hdfs://master5:9000/user/dis/movielens/tags.parquet'),
)

In [None]:
# Keep df_* aliases for the loaded tables and register each one as a temporary
# view so it can also be queried with Spark SQL.
df_movies, df_ratings, df_tags = movies, ratings, tags

df_movies.createOrReplaceTempView("movies")
df_ratings.createOrReplaceTempView("ratings")
df_tags.createOrReplaceTempView("tags")

In [None]:
# Normalise every tag: trim it, remove inner spaces so a multi-word tag becomes
# a single token, and append one space so tags stay separated once concatenated.
tags_text = spark.sql("SELECT movieId, CONCAT(REPLACE(TRIM(tag), ' ', ''),' ') as tag FROM tags")

# Build one row per movie holding all of its tags as a single string.
# This stays in the DataFrame API instead of going through an RDD and
# reduceByKey(add): rows are not serialised through Python, and a NULL tag
# (CONCAT yields NULL for a NULL input) no longer makes str + None raise
# TypeError; collect_list skips NULLs and concat_ws of an empty list is ''.
# The order of tags inside one movie's string is not deterministic, which was
# equally true of the reduce.
tags_by_movie_df = (
    tags_text
    .groupBy('movieId')
    .agg(functions.concat_ws('', functions.collect_list('tag')).alias('tag'))
    .withColumnRenamed('movieId', 'movie_id')
    .orderBy('movie_id', ascending=True)
)


In [None]:
# HDFS prefixes for persisted artefacts. Other cells append a file name with a
# plain `+` and no '/' in between (model_path + 'pipe_txt',
# output_path + 'movieId_vecs.parquet'), so the results land next to these
# prefixes (e.g. '.../output-3pipe_txt'), not inside them as directories.
# NOTE(review): confirm whether a trailing '/' was intended here.
model_path = 'hdfs://master5:9000/user/dis/output-3'
output_path = 'hdfs://master5:9000/user/dis/output-11'

In [None]:
# Text-feature pipeline over the concatenated per-movie tag string:
#   tag -> token -> nostopwrd -> rawFeature -> idf_vec
#                   nostopwrd -> word_vec
#   idf_vec + word_vec -> comb_vec
# The regex is a raw string: '\w' inside a plain literal is an invalid escape
# sequence and emits a DeprecationWarning/SyntaxWarning on recent Pythons.
# With gaps=False the pattern describes the tokens themselves, not the separators.
regexTokenizer = RegexTokenizer(gaps=False, pattern=r'\w+', inputCol='tag', outputCol='token')
stopWordsRemover = StopWordsRemover(inputCol='token', outputCol='nostopwrd')
countVectorizer = CountVectorizer(inputCol="nostopwrd", outputCol="rawFeature")
iDF = IDF(inputCol="rawFeature", outputCol="idf_vec")
# Fixed seed so the learned 50-dimensional embeddings are reproducible across runs.
word2Vec = Word2Vec(vectorSize=50, minCount=5, inputCol='nostopwrd', outputCol='word_vec', seed=123)
vectorAssembler = VectorAssembler(inputCols=['idf_vec', 'word_vec'], outputCol='comb_vec')
pipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, countVectorizer, iDF, word2Vec, vectorAssembler])

# Fit every stage on the per-movie tag strings.
pipeline_mdl = pipeline.fit(tags_by_movie_df)

# Persist the fitted pipeline, replacing any previous copy, so it can be
# reloaded without refitting.
pipeline_mdl.write().overwrite().save(model_path + 'pipe_txt')

In [None]:
# Reload the fitted pipeline from HDFS and run it over the per-movie tag strings.
pipeline_mdl = PipelineModel.load(model_path + 'pipe_txt')
tags_by_movie_trf_df = pipeline_mdl.transform(tags_by_movie_df)

# Keep only the movie id and its Word2Vec vector, and persist that pair as
# parquet so later cells can load it without repeating the text transformation.
movieId_vecs = tags_by_movie_trf_df.select('movie_id', 'word_vec')
(
    movieId_vecs.write
    .mode('overwrite')
    .parquet(output_path + 'movieId_vecs.parquet')
)

In [None]:
import numpy as np

def CosineSim(vec1, vec2):
    """
    Calculate the cosine similarity between two vectors.

    Args:
    vec1 (array-like): First vector.
    vec2 (array-like): Second vector, same length as vec1.

    Returns:
    float: dot(vec1, vec2) / (|vec1| * |vec2|) as a plain Python float,
    or 0.0 when either vector has zero norm (the ratio is undefined there).
    """
    norm_vec1 = np.linalg.norm(vec1)
    norm_vec2 = np.linalg.norm(vec2)
    # Guard against division by zero; return a float (not int 0) so the result
    # type is the same on every path.
    if norm_vec1 == 0 or norm_vec2 == 0:
        return 0.0

    # float() unwraps the numpy scalar so callers always get a Python float.
    return float(np.dot(vec1, vec2) / (norm_vec1 * norm_vec2))

In [None]:
# Reload the persisted (movie_id, word_vec) pairs from parquet.
movieId_vecs = spark.read.parquet(output_path + 'movieId_vecs.parquet')

# Bring every pair onto the driver as plain (movie_id, word_vec) tuples so
# similarity scores can be computed locally.
all_movieId_vecs = [
    (row[0], row[1])
    for row in movieId_vecs.select('movie_id', 'word_vec').collect()
]

In [None]:
import time
def getSimilarMovies(m_id, sim_mos_limit=5):
    """
    Find the movies whose tag vectors are most similar to one input movie.

    Args:
        m_id (int or str): ID of the input movie; converted with int().
        sim_mos_limit (int, optional): The maximum number of similar movies to return. Defaults to 5.

    Returns:
        pyspark.sql.DataFrame: Columns movie_id (long), score (double) and
        input_movie_id (string), holding at most sim_mos_limit rows: the movies
        with the highest cosine similarity to the input, the input itself excluded.

    Raises:
        IndexError: If m_id has no row in movieId_vecs.
    """
    # Explicit result schema. score is a cosine similarity, so it has to be a
    # double; declaring it here means the column types do not depend on any
    # implicit coercion.
    schema = StructType([
        StructField("movie_id", LongType(), True),
        StructField("score", DoubleType(), True),
        StructField("input_movie_id", StringType(), True),
    ])

    print(m_id)
    m_id = int(m_id)
    # collect()[0] raises IndexError when the movie has no stored vector;
    # callers rely on that to detect unknown movies.
    input_vec = movieId_vecs.select('word_vec')\
                .filter(movieId_vecs['movie_id'] == m_id)\
                .collect()[0][0]

    start_time = time.time()
    # all_movieId_vecs already lives on the driver, so score it right here
    # instead of shipping it back to the cluster only to sort it.
    scored = [
        (other_id, float(CosineSim(input_vec, other_vec)))
        for other_id, other_vec in all_movieId_vecs
        if other_id is not None and other_id != m_id
    ]
    print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

    scored.sort(key=lambda pair: pair[1], reverse=True)
    # A non-positive limit yields an empty result. (The builtin max() is not
    # used here because `from pyspark.sql.functions import *` shadows it.)
    top_scored = scored[:sim_mos_limit] if sim_mos_limit > 0 else []

    rows = [(other_id, score, str(m_id)) for other_id, score in top_scored]
    return spark.createDataFrame(rows, schema)

In [None]:
def getMovieDetails(in_mos):
    """
    Attach title and genres to every movie in the input DataFrame.

    Args:
        in_mos (pyspark.sql.DataFrame): A DataFrame with a movie_id column.

    Returns:
        pyspark.sql.DataFrame: All columns of in_mos plus the title and genres
        columns of df_movies, for the rows whose movie_id matches a movieId there.
    """
    lhs = in_mos.alias("a")
    rhs = df_movies.alias("b")

    # Every input column, qualified with its alias, followed by the two detail columns.
    kept_cols = [col('a.' + name) for name in lhs.columns]
    kept_cols.extend([col('b.title'), col('b.genres')])

    # Inner join: input rows without a matching movie are dropped.
    joined = lhs.join(rhs, col("a.movie_id") == col("b.movieId"), 'inner')
    return joined.select(kept_cols)


In [None]:
# Take five movie ids from movieId_vecs to use as demo inputs and echo them.
number_movies = movieId_vecs.select("movie_id").limit(5).collect()
for row in number_movies:
    movie_id = row["movie_id"]
    print(movie_id)

In [None]:
# Movie ids of the sampled rows, kept as strings; getSimilarMovies converts
# them back with int().
mids = [str(row.movie_id) for row in number_movies]
print(mids)

# For each sampled movie, show its own details followed by its five most
# similar movies.
for mid in mids:
    print('\ninput movies details:')
    df_movies.select('movieId', 'title', 'genres') \
        .filter(df_movies.movieId == mid).show(truncate=False)
    try:
        sims = getMovieDetails(getSimilarMovies(mid))
        print(f'Top 5 similar movies for input movie {mid}:')
        display(sims.select('input_movie_id', 'movie_id', 'title', 'score').toPandas())
    except IndexError as e:
        # getSimilarMovies raises IndexError when the movie has no stored vector.
        print(f"Error processing movie ID {mid}: {e}")

In [None]:
def getContentRecoms(u_id, sim_mos_limit=5):
    """
    Generates content-based movie recommendations for a given user.

    Up to five of the movies the user rated 3.0 or higher are sampled, the most
    similar movies to each are looked up with getSimilarMovies, movies the user
    already rated 3.0 or higher are removed, and the best-scoring remainder is
    returned with title and genres attached.

    Args:
        u_id (int or str): The ID of the user to generate recommendations for.
        sim_mos_limit (int): The maximum number of similar movies to retrieve for
            each sampled movie, and the maximum number of recommendations returned.

    Returns:
        pyspark.sql.DataFrame: movie_id, score, title and genres of the
        recommended movies; empty when nothing can be recommended.
    """
    # Every movie this user rated >= 3.0. Built with the DataFrame API so u_id
    # is passed as a value and never spliced into SQL text.
    liked_mos = (
        spark.table("ratings")
        .filter((col("rating") >= 3.0) & (col("userId") == u_id))
        .select(col("movieId").alias("movie_id"))
        .distinct()
    )

    # Sample up to 5 of them and materialise the sample ONCE. An unseeded
    # sample().limit() frame can yield different rows each time an action
    # re-evaluates it, so everything below works from this fixed list.
    mos_list = [row.movie_id for row in liked_mos.sample(False, 0.5).limit(5).collect()]
    usr_rev_mos = liked_mos.filter(col("movie_id").isin(mos_list))

    # show the sample details
    print('\nMovies previously reviewed by user {}'.format(u_id))
    getMovieDetails(usr_rev_mos).select(['movie_id', 'title', 'genres']).show(truncate = False)

    # Collect the similar movies of every sampled movie into one DataFrame.
    sim_mos_df = None
    for mo_id in mos_list:
        try:
            one_sim_df = getSimilarMovies(mo_id, sim_mos_limit)
        except IndexError:
            # The movie has no stored tag vector, so it cannot be compared; skip it.
            continue
        sim_mos_df = one_sim_df if sim_mos_df is None else sim_mos_df.union(one_sim_df)

    # Nothing to build on (no qualifying ratings, or none of the sampled movies
    # has a vector): return an empty result with the usual columns.
    if sim_mos_df is None:
        return getMovieDetails(spark.createDataFrame([], 'movie_id long, score double'))

    # Drop candidates the user already rated >= 3.0. The full liked set is used
    # here, not only the sampled movies.
    # NOTE(review): movies the user rated below 3.0 are not excluded - confirm
    # whether they should be.
    a = sim_mos_df.alias("a")
    b = liked_mos.alias("b")
    unseen = a.join(b, col("a.movie_id") == col("b.movie_id"), 'left_outer') \
              .where(col("b.movie_id").isNull()) \
              .select([col('a.movie_id'), col('a.score')])

    # A candidate can be similar to several sampled movies; keep it once, with
    # its best score. functions.max is Spark's aggregate, spelled through the
    # module to make that explicit.
    best = unseen.groupBy('movie_id') \
                 .agg(functions.max('score').alias('score')) \
                 .orderBy('score', ascending = False) \
                 .limit(sim_mos_limit)

    return getMovieDetails(best)

In [None]:
# Example run: content-based recommendations for user 3, converted to pandas for display.
getContentRecoms(3).toPandas()