In [1]:
#######################################
###!@0 START INIT ENVIRONMENT
# Colab-only setup: mount Google Drive, install a Java 8 runtime, unpack the
# Spark 3.0.3 distribution from the shared drive and install findspark.
from google.colab import drive
drive.mount('/content/drive')
# Java runtime for Spark; apt output is silenced with -qq and the redirect.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Extracts into the current working directory; SPARK_HOME below assumes that is /content.
!tar xf /content/drive/Shareddrives/DA231-2021-Aug-Public/spark-3.0.3-bin-hadoop2.7.tgz
# NOTE(review): unpinned install - consider pinning the findspark version for reproducibility.
!pip install -q findspark
import os
# Read by findspark.init() / the Spark launcher in the next cell to locate Java and Spark.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

###!@0 END INIT ENVIRONMENT

Mounted at /content/drive


In [2]:
#######################################
###!@1 START OF PYSPARK INIT
# Make pyspark importable from the SPARK_HOME configured in the previous cell.
import findspark
findspark.init()
findspark.find()  # resolved Spark home; the value is not used

from pyspark.sql import SparkSession

# NOTE(review): master "local" runs a single worker thread; "local[*]" would use all cores.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
# Spark is ready to go within Colab! Showing the session displays its summary.
spark
###!@1 END OF PYSPARK INIT

In [3]:
#######################################
###!@2 START OF DEFINING INPUT FILES
# All MovieLens 25M CSVs live in one folder on the project's shared drive.
ML25M_DIR = "/content/drive/Shareddrives/Project-Building-a-Movie-Catalog/ml-25m"
genomeScoresFile = ML25M_DIR + "/genome-scores.csv"
genomeTagsFile = ML25M_DIR + "/genome-tags.csv"
moviesFile = ML25M_DIR + "/movies.csv"
ratingsFile = ML25M_DIR + "/ratings.csv"
tagsFile = ML25M_DIR + "/tags.csv"
###!@2 END OF DEFINING INPUT FILES

In [4]:
#######################################
###!@3 Initialising spark context
from pyspark import SparkContext

# Reuse the SparkContext owned by the session built in the PYSPARK INIT cell
# (the same active context SparkContext.getOrCreate() hands back); it is
# needed below for sc.broadcast(...).
sc = spark.sparkContext

##########################

In [5]:
#Importing all necessary packages
from pyspark.ml.clustering import LocalLDAModel
from pyspark.ml.feature import CountVectorizerModel, VectorAssembler, BucketedRandomProjectionLSHModel, IDFModel
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, FloatType, StringType, ArrayType, StructType, StructField
from pyspark import StorageLevel
from pyspark.sql.functions import expr
from pyspark.sql.functions import lit
from pyspark.sql.functions import levenshtein 
from pyspark.sql.functions import col
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

from gensim.models import Phrases
from array import array
import time
import re
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.snowball import SnowballStemmer
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
import nltk
import numpy as np
import warnings  
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


True

In [6]:
# Silence UserWarnings emitted from gensim modules (gensim is only used here for the Phrases model).
warnings.filterwarnings(action='ignore',category=UserWarning,module='gensim')  

In [7]:
# Loading data and pre-trained models

# Raw MovieLens tables, with column types inferred from the CSV contents.
moviesDF = spark.read.option("header",True).option("inferSchema",True).csv(moviesFile)
ratingsDF = spark.read.option("header",True).option("inferSchema",True).csv(ratingsFile)
tagsDF = spark.read.option("header",True).option("inferSchema",True).csv(tagsFile)

# Genome (tag, movie) pairs with relevance >= 0.7, joined to their tag text.
# The genome CSVs are read without inferSchema, so every column is a string;
# relevance is cast explicitly so the threshold is a numeric comparison.
# No sort here: the join below does not keep row order, so sorting the
# genome scores first would only cost a full shuffle.
genome_scores_df = (
    spark.read.csv(genomeScoresFile, header='true')
    .filter(f.col("relevance").cast("double") >= 0.7)
)
genome_tags_df = spark.read.csv(genomeTagsFile, header='true').withColumnRenamed("tagId", "gtagId")
genome_df = (
    genome_scores_df
    .join(genome_tags_df, f.col("tagId") == f.col("gtagId"))
    .drop("gtagId")
    .select("tagId", "movieId", "tag")
)

# Artifacts produced by the tag-clustering pipeline.
# NOTE(review): the input CSVs are referenced under ".../Shareddrives/..." while
# these use ".../Shared drives/..."; both must resolve on the mounted drive.
CLUSTERING_DIR = "/content/drive/Shared drives/Project-Building-a-Movie-Catalog/Clustering_by_tags"
lda_features_df = spark.read.parquet(CLUSTERING_DIR + "/lda_features.parquet")
movies_df = spark.read.parquet(CLUSTERING_DIR + "/movies.parquet")
cv_model = CountVectorizerModel.load(CLUSTERING_DIR + "/cv_model")
tfidf_model = IDFModel.load(CLUSTERING_DIR + "/tfidf_model")
lda_model = LocalLDAModel.load(CLUSTERING_DIR + "/lda_model")
lsh_model = BucketedRandomProjectionLSHModel.load(CLUSTERING_DIR + "/lsh_model")

In [8]:
# Run a small job on each frame and mark both for caching (default storage level).
# NOTE(review): persist() is lazy and is called *after* take(1), so these takes
# do not populate the cache; the first action after persist() does.
# The cell displays the value of its last expression (the persisted ratingsDF).
movies = movies_df.take(1)
movies_df.persist()
ratings = ratingsDF.take(1)
ratingsDF.persist()

DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

In [9]:
# Average rating of each movie; movies with fewer than 10 ratings are ignored.
avgRating_temp = ratingsDF.groupBy("movieId").agg(f.avg("rating"), f.count("rating"))
avgRating = (
    avgRating_temp
    .where(col("count(rating)") >= 10)
    .withColumnRenamed("avg(rating)", "ratingAvg")
    .select("movieId", "ratingAvg")
)

# Split each "Title (YYYY)" into title and year; movies whose last title word
# does not parse as a year are dropped. Genres become an array and the rows
# are ordered best rated first.
movieNameDF = moviesDF.select("title", "movieId", "genres")
movieNameJoinDF = (
    movieNameDF.join(avgRating, "movieId")
    .withColumn("Year", f.element_at(f.split(f.col('title'), ' '), -1))      # last word, e.g. "(1995)"
    .withColumn("Year", f.expr("substring(Year, 2, 4)").cast(IntegerType()))  # digits inside the parentheses
    .withColumn("title", f.expr("substring(title, 1, length(title)-7)"))      # strip the trailing " (YYYY)"
    .withColumn("genres", f.split(col("genres"), "\\|"))
    .filter(col("Year").isNotNull())
    .orderBy("ratingAvg", ascending=False)
)

In [10]:
#persisting dataframes
# movieNameJoinDF is queried on every search (searchByTitle, searchByGerner,
# getSearchResult), so keep it cached, spilling to disk if memory runs short.
movieNameJoinDF.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[movieId: int, title: string, genres: array<string>, ratingAvg: double, Year: int]

In [11]:
# Initialising broadcast variables required for text pre-processing

# Stop words are only used for membership tests (text_preprocessing,
# remove_stop_words), so ship them as a frozenset for O(1) lookups.
stop_words = sc.broadcast(frozenset(stopwords.words('english')))
vocab = cv_model.vocabulary
lemmatizer = sc.broadcast(WordNetLemmatizer())
# NOTE(review): stemmer is not used by any function in the cells shown.
stemmer = sc.broadcast(SnowballStemmer("english"))

# Tokens that text_preprocessing drops from search text.
# NOTE(review): 'fransisco' looks like a misspelling of 'francisco', which is
# therefore not excluded - confirm against the tag data / trained vocabulary.
excluded_words=['original','jay','bob','pack','base','based','new','york','san','fransisco']
# Token rewrites applied by text_preprocessing (e.g. 'scifi' -> 'sci-fi').
sim_words={'scifi':'sci-fi','sci':'sci-fi','romance':'romantic','violence':'violent'}
similar_words = sc.broadcast(sim_words)
excluded_tags = sc.broadcast(excluded_words)

# Copies the LDA `topicDistribution` vector into a `values` column (used by get_similar_movies).
vecAssembler = VectorAssembler(inputCols=["topicDistribution"], outputCol="values")

In [12]:
def getAsList(s):
  """
  Get a list of genres available in the entire dataset.

  Collects `s` to the driver and returns the `genre` field of every row,
  in the order collect() yields them.
  """
  return [row.genre for row in s.collect()]

In [13]:
def createGernerdictionary():
  """
  Create Dictionary for genres.

  Despite the name, this returns a Python list: the distinct genre names found
  by exploding the `genres` array of every row of movieNameJoinDF.
  """
  distinct_genres = (
    movieNameJoinDF
    .filter(col('genres').isNotNull())
    .withColumn("genre", f.explode(col("genres")))
    .select("genre")
    .distinct()
  )
  return getAsList(distinct_genres)

In [14]:
#build similar words dataframe
def get_semantically_sim(word):
  """
  Return WordNet neighbours of `word` as a de-duplicated list.

  For every synset of `word`, this collects the lemma names of all hyponyms of
  each of its hypernyms ("sibling" concepts), plus the synset's own lemma names.
  The order of the returned list is unspecified (it comes from a set); the list
  is empty when WordNet has no synset for `word`.
  """
  related = set()
  for synset in wordnet.synsets(word):
    for parent in synset.hypernyms():
      for sibling in parent.hyponyms():
        related.update(sibling.lemma_names())
    related.update(lemma.name() for lemma in synset.lemmas())
  return list(related)
# One (vocabulary term, WordNet neighbours) row per CountVectorizer vocabulary term.
word_mappings = [(term, get_semantically_sim(term)) for term in vocab]
word_synonyms_df = spark.createDataFrame(data=word_mappings, schema=["word", "synonyms"])

In [15]:
# Genre lookup table: one (genre, synonym) row per genre in the dataset.
# Each genre is currently its own only synonym; WordNet expansion via
# get_semantically_sim is left disabled.
gernerList = createGernerdictionary()
word_mappings_gerner = [(genre, [genre]) for genre in gernerList]
gerner_synonyms_df = (
    spark.createDataFrame(data=word_mappings_gerner, schema=["word", "synonyms"])
    .withColumn("synonyms", f.explode(col("synonyms")))
)
gerner_synonyms_df.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[word: string, synonyms: string]

In [16]:
def searchByTitle(inputString,yearMin,yearMax,limit,input_tags=None):
  """
  Search for movies based on movie name.

  Prints up to `limit` movies released between `yearMin` and `yearMax`
  (inclusive), in two groups:
    1. full matches - titles containing `inputString` (case-insensitive),
       best rated first;
    2. partial matches filling the remaining slots - titles containing any
       search token from `input_tags`, ranked by number of matching tokens,
       then rating, excluding titles already listed in (1).

  Args:
    inputString: raw search text.
    yearMin, yearMax: inclusive release-year bounds.
    limit: maximum number of rows to show.
    input_tags: DataFrame of search tokens in a `value` column; defaults to
      the notebook-level `inputtags`.

  Returns:
    The first result Row (a full match when there is one), or None when
    nothing matched.
  """
  if input_tags is None:
    input_tags = inputtags

  moviesInRange = movieNameJoinDF.where(col("Year").between(yearMin, yearMax))

  # movieNameJoinDF is ordered by ratingAvg descending, so limit() keeps the best rated.
  movieNameFullStringDF = (
    moviesInRange
    .where(f.upper(col("title")).contains(inputString.upper()))
    .limit(limit)
  )
  fullStringCount = movieNameFullStringDF.count()

  # Full matches are removed *before* sorting and limiting: a join does not
  # keep row order, so limiting after it would pick arbitrary partial matches.
  movieNamePartialStringDF = (
    moviesInRange
    .join(input_tags, f.upper(col('title')).contains(f.upper(col('value'))))
    .groupBy("movieId","title","genres","ratingAvg","Year")
    .count()
    .join(movieNameFullStringDF, "title", "left_anti")
    .orderBy("count","ratingAvg", ascending=False)
    .limit(max(limit - fullStringCount, 0))
    .select("movieId","title","genres","ratingAvg","Year")
  )

  titleSearchResult = movieNameFullStringDF.union(movieNamePartialStringDF)
  titleSearchResult.persist(StorageLevel.MEMORY_AND_DISK)
  try:
    titleSearchResult.select("title","genres","ratingAvg", "Year").show(truncate=False)
    firstRows = titleSearchResult.head(1)
  finally:
    # Every search caches a new DataFrame; release it once the rows needed
    # here are on the driver so repeated searches do not pile up cached blocks.
    titleSearchResult.unpersist()

  return firstRows[0] if firstRows else None

In [17]:
def searchByGerner(input, yearMin=None, yearMax=None, input_tags=None, limit=5):
  """
  Search by genre.

  Every search token in `input_tags` (a DataFrame with a `value` column) is
  compared, upper-cased, with the genre names in gerner_synonyms_df. A
  Levenshtein distance below 2 counts as a match, so one-letter typos still
  hit (e.g. "comdy" matches "Comedy"). Movies in the year window whose genre
  array contains a matched genre are ranked by how many matched genres they
  have, then by average rating, and the top `limit` rows are printed.

  Args:
    input: raw search text. Not used (the tokens come from `input_tags`);
      kept so existing calls keep working.
    yearMin, yearMax: inclusive release-year bounds. When omitted, the
      notebook-level `yearMin` / `yearMax` globals are used, so one-argument
      calls keep reading them.
    input_tags: search-token DataFrame; defaults to the notebook-level `inputtags`.
    limit: number of rows to print (default 5).
  """
  # The parameters shadow the notebook globals of the same name, hence globals().
  if yearMin is None:
    yearMin = globals()['yearMin']
  if yearMax is None:
    yearMax = globals()['yearMax']
  if input_tags is None:
    input_tags = inputtags

  matchedGenres = (
    gerner_synonyms_df
    .join(input_tags, levenshtein(f.upper(gerner_synonyms_df.synonyms), f.upper(input_tags.value)) < 2)
    .select("word")
    .distinct()
  )
  outDf = (
    movieNameJoinDF.where(col("Year").between(yearMin, yearMax))
    .join(matchedGenres, f.array_contains(col('genres'), col('word')))
    .groupBy("title","genres","ratingAvg","Year")
    .count()
    .orderBy("count","ratingAvg",ascending=False)
    .limit(limit)
  )

  outDf.select("title","genres","ratingAvg","Year").show(truncate=False)

In [18]:
def text_preprocessing(text):
  """
  Turn free text into a list of lemmatised tokens.

  Steps:
    * replace every character that is not a word character, whitespace or
      '-' with a space, then tokenize with NLTK;
    * merge word pairs with the broadcast gensim Phrases model (`bigram`);
    * drop stop words, excluded tags and tokens of length <= 2;
    * rewrite tokens listed in `similar_words` (e.g. 'scifi' -> 'sci-fi');
    * lemmatise each surviving token as a verb.
  """
  cleaned = re.sub(r'[^\w\s-]',' ',text)
  phrased = bigram.value[nltk.word_tokenize(cleaned)]
  lemm = lemmatizer.value
  replacements = similar_words.value
  tokens = []
  for token in phrased:
    if token in stop_words.value or token in excluded_tags.value or len(token) <= 2:
      continue
    token = replacements.get(token, token)
    tokens.append(lemm.lemmatize(token, "v"))
  return tokens

def remove_stop_words(text):
  """
  remove stop-words

  `text` is an already-split sequence of words. Returns a new list without
  the words found in the broadcast `stop_words`, keeping the original order.
  Matching is exact (case-sensitive).
  """
  return list(filter(lambda token: token not in stop_words.value, text))

def train_bigrams(genome_df):
  """
  Fit a gensim Phrases model on the genome tag strings and return it.

  Each row's `tag` is split on single spaces into one token list. genome_df
  has one row per (tag, movie) pair, so a tag relevant to many movies
  contributes many identical lists. Phrases uses min_count=1, threshold=2.
  """
  tag_rows = genome_df.select('tag').collect()
  sentences = [row[0].split(" ") for row in tag_rows]
  return Phrases(sentences, min_count=1, threshold=2)

In [19]:
def query_preprocessing(text,movies_df):
  """
  Map every whitespace-separated word of `text` onto the CountVectorizer vocabulary.

  For each word:
    * if it is in `vocab`, keep it;
    * otherwise, if a vocabulary term lists it among its WordNet neighbours
      (word_synonyms_df.synonyms), use that term;
    * otherwise use the vocabulary term with the smallest Levenshtein
      distance to the word (the word is skipped only if the lookup table is empty).

  Args:
    text: already preprocessed query text.
    movies_df: unused; kept for call compatibility.

  Returns:
    The mapped words joined by single spaces.
  """
  vocab_set = set(vocab)  # O(1) membership instead of scanning the list per word
  tokens = []
  for word in text.split():
    if word in vocab_set:
      tokens.append(word)
      continue
    # head(1) runs one job, unlike rdd.isEmpty() followed by first().
    synonymHit = word_synonyms_df.filter(f.array_contains(word_synonyms_df.synonyms, word)).head(1)
    if synonymHit:
      tokens.append(synonymHit[0].word)
      continue
    closest = (
      word_synonyms_df
      .select('word', f.levenshtein(f.lit(word), f.col('word')).alias('score'))
      .orderBy(f.col('score'))
      .head(1)
    )
    if closest:
      tokens.append(closest[0].word)
  return ' '.join(tokens)

In [20]:
def get_lda_features(cv_model,tfidf_model,lda_model,lda_features_df,movies_df,text,id=-1):
  """
  Get LDA features for the given text or movie id.

  If `id` is not -1 and lda_features_df has rows with cMovieId == id, those
  precomputed rows are returned unchanged. Otherwise a one-document DataFrame
  is built from `text` and pushed through cv_model -> tfidf_model -> lda_model:
    * non-empty text is cleaned with text_preprocessing and mapped onto the
      vocabulary with query_preprocessing;
    * if nothing usable remains, a random vocabulary term (unseeded
      np.random) is used instead;
    * the resulting tag string is tokenised again with text_preprocessing.
  movies_df is only forwarded to query_preprocessing.
  """
  if id != -1:
    precomputed = lda_features_df.filter(f.col("cMovieId")==id)
    if not precomputed.rdd.isEmpty():
      return precomputed

  text = text.strip()
  if text:
    cleaned = ' '.join(x for x in text_preprocessing(text))
    search_tag = query_preprocessing(cleaned,movies_df).strip()
  else:
    search_tag = text  # already stripped, i.e. empty
  if not search_tag:
    search_tag = vocab[np.random.randint(len(vocab), size=1)[0]]

  tag_tokens = text_preprocessing(search_tag)
  tag_df = spark.createDataFrame(data=[(tag_tokens,0)], schema=["tags","index"])
  tf_idf_df = tfidf_model.transform(cv_model.transform(tag_df))
  return lda_model.transform(tf_idf_df)

def get_similar_movies(cv_model,tfidf_model,lda_model,lsh_model,lda_features_df,movies_df,text,id=-1,n=10):
  """
  Get similar movies for the given text or movie id.

  The LDA topic vector comes from get_lda_features (precomputed for `id` when
  available, otherwise inferred from `text`). The LSH model returns the `n`
  approximate nearest rows of movies_df; they are ordered by distance, ties
  broken by descending `rating`, and only title, genres and rating are kept.
  """
  lda_df = get_lda_features(cv_model,tfidf_model,lda_model,lda_features_df,movies_df,text,id)
  first_row = vecAssembler.transform(lda_df).take(1)[0]
  query_vector = Vectors.dense(first_row['values'])
  neighbours = lsh_model.approxNearestNeighbors(movies_df, query_vector, n)
  ranked = neighbours.orderBy(f.col('distCol'), f.col('rating').desc())
  return ranked.select("title","genres","rating")


In [21]:
def _year_or_default(value, default):
  """Coerce a year bound to int; None or a blank string yield `default`."""
  if value is None or (isinstance(value, str) and not value.strip()):
    return default
  return int(value)


def getSearchResult(inputString,userId,yearMin,yearMax):
  """
  Master function to get movies based on the title, genres and suggestion based on tag-clustering.

  Prints three result tables, each followed by its wall-clock time:
    1. title matches (searchByTitle, up to 10 rows);
    2. genre matches (searchByGerner);
    3. movies similar by tag clustering (get_similar_movies). The seed movie is
       the one whose title equals `inputString` exactly, else the first title
       match; with neither, the features come from the search text alone.

  Args:
    inputString: raw search text.
    userId: currently unused.
    yearMin, yearMax: inclusive release-year bounds. Ints or numeric strings
      (as returned by input()) are accepted; None or a blank string fall back
      to 1800 and 2021. A non-numeric string raises ValueError.
  """
  limit = 10
  yearMin = _year_or_default(yearMin, 1800)
  yearMax = _year_or_default(yearMax, 2021)

  start_time_title = time.time()
  print("Movies based on Title: ")
  firstMovieRow = searchByTitle(inputString,yearMin,yearMax,limit)
  print("--- %s seconds ---\n" % (time.time() - start_time_title))

  start_time_genre = time.time()
  print("Movies based on Genre: ")
  # NOTE(review): searchByGerner gets only the search text and reads the
  # notebook-level yearMin/yearMax, not the bounds normalised above.
  searchByGerner(inputString)
  print("--- %s seconds ---\n" % (time.time() - start_time_genre))

  print("Similar movies based on Tag Clustering: ")
  start_time_tagcluster = time.time()
  exactMatch = movieNameJoinDF.where(col("title") == inputString).head(1)
  if exactMatch:
    seedMovieId = exactMatch[0].movieId
  elif firstMovieRow is not None:
    seedMovieId = firstMovieRow.movieId
  else:
    seedMovieId = -1  # no seed movie: features are derived from the search text
  similarMovies = get_similar_movies(cv_model,tfidf_model,lda_model,lsh_model,lda_features_df,movies_df,inputString,id=seedMovieId)
  similarMovies.show(truncate=False)
  print("--- %s seconds ---\n" % (time.time() - start_time_tagcluster))

In [22]:
def get_users_clusters(k=9, seed=1):
    """
    Cluster users by their per-genre average ratings with k-means.

    Uses the notebook-level `avgGenreRating` (one row per userId) and
    `genre_columns`: the genre columns are assembled into one `genre` vector
    and KMeans is fitted on it.

    Args:
      k: number of clusters. The default of 9 presumably comes from an offline
        sweep of k over 2..99 that kept the highest ClusteringEvaluator score
        (silhouette by default) - TODO confirm.
      seed: KMeans seed, fixed so the clustering is reproducible.

    Returns:
      DataFrame with columns userId and cluster, ordered by userId.
    """
    genre_vector = VectorAssembler(inputCols = genre_columns, outputCol = 'genre')
    genres_data = genre_vector.transform(avgGenreRating).select('userId', 'genre')

    model = KMeans().setK(k).setSeed(seed).setFeaturesCol('genre').fit(genres_data)
    user_clusters = model.transform(genres_data).select('userId', 'prediction')

    # NOTE(review): collect() + createDataFrame ships every user to the driver.
    # It also detaches the result from the ratingsDF lineage it derives from;
    # callers join it back against ratingsDF, which is presumably why the round
    # trip is here - confirm before replacing it.
    user_cluster_rows = user_clusters.collect()
    user_clusters_df = spark.createDataFrame(user_cluster_rows).orderBy('userId').withColumnRenamed('prediction','cluster')

    return user_clusters_df

In [23]:
def get_similar_user_movie_recommendation(user, moviesDF=moviesDF, ratingsDF=ratingsDF, recommendations=None, n=10):
    """
    Print the top `n` recommended movies (title, genres) for `user`,
    highest cluster-average rating first.

    Args:
      user: user id.
      moviesDF, ratingsDF: not used; kept so existing calls keep working.
      recommendations: the DataFrame returned by login_processing(user). When
        omitted it is computed from `user` here, rather than read from a
        notebook-level variable that may belong to a different user.
      n: number of rows to show (default 10).
    """
    if recommendations is None:
        recommendations = login_processing(user)
    # Sort before projecting so the sort column is still present.
    recommendations.orderBy('avg_rating', ascending=False) \
        .select('title', 'genres') \
        .show(n, truncate=False)

In [24]:
def login_processing(user):
  """
  Build the candidate recommendations for `user`.

  Looks up the user's k-means cluster in user_clusters_df and keeps the
  movie_cluster_df rows of that cluster (movies with at least 10 ratings from
  the cluster, with their cluster-average rating). Movies the user has already
  rated are removed. The result is persisted (MEMORY_AND_DISK) and returned,
  with the same columns as movie_cluster_df.

  Raises:
    ValueError: if `user` has no cluster assignment.
  """
  # The notebook renames userId to clusterUserId on user_clusters_df; filter on
  # whichever name is actually present instead of relying on Spark resolving a
  # missing column from the plan underneath.
  idColumn = 'clusterUserId' if 'clusterUserId' in user_clusters_df.columns else 'userId'
  clusterRow = user_clusters_df.filter(col(idColumn) == user).first()
  if clusterRow is None:
    raise ValueError("No cluster assignment for user {}".format(user))
  cluster = clusterRow['cluster']

  # Anti-join against the user's rated movies instead of collecting their ids
  # to the driver and building a potentially huge isin() literal list.
  watched = ratingsDF.filter(col('userId') == user).select('movieId')
  top_recommended_movies = (
    movie_cluster_df
    .filter(col('cluster') == cluster)
    .join(watched, on='movieId', how='left_anti')
    .select(movie_cluster_df.columns)  # a using-join puts movieId first; restore the column order
  )
  top_recommended_movies.persist(StorageLevel.MEMORY_AND_DISK)

  return top_recommended_movies

In [None]:
#### Pre-Processing ####

# Genres used to build each user's taste profile (one avg_<Genre> feature each).
GENRES = ["Action", "Adventure", "Animation", "Children", "Comedy", "Crime",
          "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical",
          "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]

# Every rating joined with its movie's title and pipe-separated genre string.
ratedDF = ratingsDF.withColumnRenamed("movieId","ratedmovieId")
moviesRatingDF = moviesDF.join(ratedDF, moviesDF.movieId == ratedDF.ratedmovieId).drop("ratedmovieId").drop("timestamp")
# persist() is lazy: mark for caching *before* the first action so that
# action's work is kept instead of being thrown away.
moviesRatingDF.persist(StorageLevel.MEMORY_AND_DISK)
moviesRatingDF.take(1)

# One column per genre: the rating when the movie has that genre, null otherwise,
# so avg() below averages only the user's ratings of movies in that genre.
moviesGenresDF = moviesRatingDF.select(
    "*",
    *[f.when(col("genres").contains(genre), col("rating")).otherwise(f.lit(None).cast('double')).alias(genre)
      for genre in GENRES])

genre_columns = ["avg_" + genre for genre in GENRES]
# Per-user average rating per genre; genres the user never rated become 0.
avgGenreRating = moviesGenresDF.groupBy("userId") \
    .agg(*[f.avg(genre).alias("avg_" + genre) for genre in GENRES]) \
    .na.fill(0)
avgGenreRating.persist()
avgGenreRating.take(1)

user_clusters_df = get_users_clusters().withColumnRenamed('userId','clusterUserId')
user_clusters_df.persist(StorageLevel.MEMORY_AND_DISK)
uc_rows = user_clusters_df.take(1)

ratings_cluster_df = ratingsDF.join(user_clusters_df, ratingsDF.userId == user_clusters_df.clusterUserId)

# Average rating of each movie within each user cluster (at least 10 ratings from the cluster).
avg_cluster_ratings = ratings_cluster_df.groupBy('cluster','movieId') \
    .agg(f.avg(col('rating')).alias('avg_rating'), f.count("rating").alias('rating_count')) \
    .withColumnRenamed('movieId','ratedMovieId') \
    .filter(col('rating_count')>=10)

movie_cluster_df = avg_cluster_ratings.join(moviesDF, moviesDF.movieId == avg_cluster_ratings.ratedMovieId) \
    .select('cluster','movieId','title','genres','avg_rating')
movie_cluster_df.persist(StorageLevel.MEMORY_AND_DISK)
mc_rows = movie_cluster_df.take(1)

user = 1
login_processing(user)

# Every user id that has ratings; used to validate the login prompt.
users = [row[0] for row in ratingsDF.select('userId').distinct().collect()]

bigram = sc.broadcast(train_bigrams(genome_df))

# Smoke-test search. The same year window is used for the globals (read by
# searchByGerner) and for the explicit arguments.
inputValue = "Sample"
yearMin = 1900
yearMax = 2021
inputList = remove_stop_words(inputValue.split())
inputtags = spark.createDataFrame(inputList,StringType())
getSearchResult(inputValue,1,yearMin,yearMax)

In [26]:
# Rebuild the per-cluster movie averages from ratings_cluster_df (defined in the
# pre-processing cell): movies need at least 10 ratings from the cluster.
# NOTE(review): this repeats the tail of the pre-processing cell verbatim;
# consider deleting one copy.
avg_cluster_ratings = ratings_cluster_df.groupBy('cluster','movieId').agg(f.avg(col('rating')).alias('avg_rating'), f.count("rating").alias('rating_count')).withColumnRenamed('movieId','ratedMovieId').filter(col('rating_count')>=10)

movie_cluster_df = avg_cluster_ratings.join(moviesDF, moviesDF.movieId == avg_cluster_ratings.ratedMovieId).select('cluster','movieId','title','genres','avg_rating')
mc_rows = movie_cluster_df.take(1)

# NOTE(review): persist() is lazy and comes after take(1), so that take is not cached.
user_clusters_df.persist(StorageLevel.MEMORY_AND_DISK)
movie_cluster_df.persist(StorageLevel.MEMORY_AND_DISK)

# The cell displays the recommendations DataFrame returned for user 1.
user = 1
login_processing(user)

DataFrame[cluster: bigint, movieId: int, title: string, genres: string, avg_rating: double]

In [34]:
# Ask for a user id until it is an integer belonging to a user with ratings.
# Non-numeric input is re-prompted instead of aborting the cell with ValueError.
known_users = set(users)  # O(1) membership checks
while True:
  try:
    userID = int(input("User ID: "))
  except ValueError:
    print("Please enter a valid User ID!\n")
    continue
  if userID in known_users:
    break
  print("Please enter a valid User ID!\n")

start_time_user = time.time()
# Kept as a notebook-level name: get_similar_user_movie_recommendation may read it.
top_recommended_movies = login_processing(userID)
print("Welcome User ID: {}\n".format(userID))
print("Movies you might like: ")
get_similar_user_movie_recommendation(userID)
print("--- %s seconds ---\n" % (time.time() - start_time_user))

User ID: 1
Welcome User ID: 1

Movies you might like: 
+-----------------------------------------------------------------+---------------------------------+
|title                                                            |genres                           |
+-----------------------------------------------------------------+---------------------------------+
|There Once Was a Dog (1982)                                      |Animation|Children|Comedy        |
|Cashback (2004)                                                  |Comedy|Drama                     |
|Hopscotch (1980)                                                 |Comedy                           |
|Hedgehog in the Fog (1975)                                       |Animation                        |
|Vacations in Prostokvashino (1980)                               |Animation                        |
|Last Year's Snow Was Falling (1983)                              |Animation|Children|Comedy|Fantasy|
|Woman Under the Influence,

In [36]:
def _prompt_year(prompt, default):
  """Ask for a year until the answer is an integer; a blank answer returns `default`."""
  while True:
    answer = input(prompt).strip()
    if not answer:
      return default
    try:
      return int(answer)
    except ValueError:
      print("Please enter a year such as 1995, or leave it blank.")

# Inclusive release-year window for the searches below, stored as ints
# (input() returns strings), with 1800 / 2021 as blank-answer defaults.
yearMin = _prompt_year("Starting year: ", 1800)
yearMax = _prompt_year("Ending year: ", 2021)

Starting year: 1990
Ending year: 2021


In [39]:
# Read a search string and publish its non-stop-word tokens as the notebook-level
# `inputtags` DataFrame (single `value` column) that the search helpers read.
inputValue = input("Search key: ")
inputList = inputValue.split()
inputList = remove_stop_words(inputList)
inputtags = spark.createDataFrame(inputList,StringType())
# getSearchResult prints its tables and has no return statement, so `answer` is None.
answer = getSearchResult(inputValue,userID,yearMin,yearMax)

Search key: comedy
Movies based on Title: 
+-----------------------------------------------------+------------------------+------------------+----+
|title                                                |genres                  |ratingAvg         |Year|
+-----------------------------------------------------+------------------------+------------------+----+
|Louis C.K.: Live at The Comedy Store                 |[Comedy]                |3.944394618834081 |2015|
|Hannibal Buress: Comedy Camisado                     |[Comedy]                |3.761904761904762 |2016|
|Fear City: A Family-Style Comedy (La cité de la peur)|[Comedy]                |3.760869565217391 |1994|
|King of Comedy (Hei kek ji wong)                     |[Comedy, Drama, Romance]|3.7333333333333334|1999|
|Comedians of Comedy, The                             |[Comedy, Documentary]   |3.655982905982906 |2005|
|The Comedians of Comedy: Live at The Troubadour      |[Comedy, Documentary]   |3.611111111111111 |2007|
|God's Comed