## Importing Spark

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import gc

In [2]:
### creating a spark session

In [3]:
# Start (or attach to) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('movie_recommender')
    .getOrCreate()
)

21/12/05 19:31:56 WARN Utils: Your hostname, amenemope-HP-250-G5-Notebook-PC resolves to a loopback address: 127.0.1.1; using 192.168.43.219 instead (on interface wlp2s0)
21/12/05 19:31:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/05 19:32:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### reading the rating data

In [4]:
# u.data is tab-separated and has no header row, so the columns come back with
# Spark's positional names (_c0 .. _c3). header=False is the only header setting
# given here; a conflicting .option('header', 'true') would just be overridden by it.
data = spark.read.csv('u.data', sep='\t', header=False, inferSchema=True)

                                                                                

In [5]:
# Preview the first five rating rows to check how the columns were parsed.
data.limit(5).show()

                                                                                

+---+---+---+---------+
|_c0|_c1|_c2|      _c3|
+---+---+---+---------+
|196|242|  3|881250949|
|186|302|  3|891717742|
| 22|377|  1|878887116|
|244| 51|  2|880606923|
|166|346|  1|886397596|
+---+---+---+---------+



### renaming the rating columns

In [6]:
# Replace Spark's positional column names with descriptive ones.
data = (
    data
    .withColumnRenamed('_c0', 'userID')
    .withColumnRenamed('_c1', 'movieID')
    .withColumnRenamed('_c2', 'rating')
    .withColumnRenamed('_c3', 'timeStamp')
)
data

DataFrame[userID: int, movieID: int, rating: int, timeStamp: int]

### reading the movies dataset

In [7]:
# The movies file is comma-separated and carries its own header row.
# NOTE(review): the first parsed row holds tag values such as 0.1, 0.2, 1.1, ...
# instead of 0/1 flags, which looks like a stray second header row in u.movies
# (and would explain why the tag columns are inferred as double) -- TODO confirm.
movies = spark.read.csv('u.movies', header=True, inferSchema=True)
movies

DataFrame[movieID: int, movie title: string, release date: string, video release date: string, IMDbURL: string, unknown: int, Action: double, Adventure: double, Animation: double, Children: double, Comedy: double, Crime: double, Documentary: double, Drama: double, Fantasy: double, Film-Noir: double, Horror: double, Musical: double, Mystery: double, Romance: double, Sci-Fi: double, Thriller: double, War: double, Western: double]

### selecting the tags available

In [8]:
# Columns that describe the movie itself; every remaining column is used as a tag.
non_tags = [
    'movieID',
    'movie title',
    'release date',
    'video release date',
    'IMDbURL',
    'unknown',
]
tags = [column for column in movies.columns if column not in non_tags]
tags

['Action',
 'Adventure',
 'Animation',
 'Children',
 'Comedy',
 'Crime',
 'Documentary',
 'Drama',
 'Fantasy',
 'Film-Noir',
 'Horror',
 'Musical',
 'Mystery',
 'Romance',
 'Sci-Fi',
 'Thriller',
 'War',
 'Western']

### showing the tag values of the first 5 movies

In [9]:
# Pull the tag columns of the first five movies back to the driver as Row objects.
movies.select(*tags).take(5)

[Row(Action=0.1, Adventure=0.2, Animation=1.1, Children=1.2, Comedy=1.3, Crime=0.3, Documentary=0.4, Drama=0.5, Fantasy=0.6, Film-Noir=0.7, Horror=0.8, Musical=0.9, Mystery=0.1, Romance=0.11, Sci-Fi=0.12, Thriller=0.13, War=0.14, Western=0.15),
 Row(Action=0.0, Adventure=0.0, Animation=0.0, Children=0.0, Comedy=0.0, Crime=0.0, Documentary=0.0, Drama=0.0, Fantasy=0.0, Film-Noir=0.0, Horror=0.0, Musical=0.0, Mystery=0.0, Romance=0.0, Sci-Fi=0.0, Thriller=1.0, War=0.0, Western=0.0),
 Row(Action=1.0, Adventure=0.0, Animation=0.0, Children=0.0, Comedy=1.0, Crime=0.0, Documentary=0.0, Drama=1.0, Fantasy=0.0, Film-Noir=0.0, Horror=0.0, Musical=0.0, Mystery=0.0, Romance=0.0, Sci-Fi=0.0, Thriller=0.0, War=0.0, Western=0.0),
 Row(Action=0.0, Adventure=0.0, Animation=0.0, Children=0.0, Comedy=0.0, Crime=1.0, Documentary=0.0, Drama=1.0, Fantasy=0.0, Film-Noir=0.0, Horror=0.0, Musical=0.0, Mystery=0.0, Romance=0.0, Sci-Fi=0.0, Thriller=1.0, War=0.0, Western=0.0),
 Row(Action=0.0, Adventure=0.0, Ani

### assembling the tag columns into a single feature vector

In [10]:
from pyspark.ml.feature import VectorAssembler

# Pack the individual tag columns into one vector column named "tags".
assembler = VectorAssembler(inputCols=tags, outputCol="tags")
assembled_tags = assembler.transform(movies)
assembled_tags

DataFrame[movieID: int, movie title: string, release date: string, video release date: string, IMDbURL: string, unknown: int, Action: double, Adventure: double, Animation: double, Children: double, Comedy: double, Crime: double, Documentary: double, Drama: double, Fantasy: double, Film-Noir: double, Horror: double, Musical: double, Mystery: double, Romance: double, Sci-Fi: double, Thriller: double, War: double, Western: double, tags: vector]

### showing the assembled tags

In [26]:
# Print the assembled vectors (first 20 rows, long cells truncated).
assembled_tags.select('tags').show(n=20, truncate=True)

21/12/05 17:29:11 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+
|                tags|
+--------------------+
|[0.1,0.2,1.1,1.2,...|
|     (18,[15],[1.0])|
|(18,[0,4,7],[1.0,...|
|(18,[5,7,15],[1.0...|
|      (18,[7],[1.0])|
|(18,[7,14],[1.0,1...|
|(18,[3,4,7],[1.0,...|
|      (18,[7],[1.0])|
|(18,[7,16],[1.0,1...|
|(18,[5,15],[1.0,1...|
|(18,[5,15],[1.0,1...|
|      (18,[4],[1.0])|
|(18,[7,13],[1.0,1...|
|      (18,[7],[1.0])|
|(18,[4,13],[1.0,1...|
|(18,[0,4,5,10,15]...|
|      (18,[7],[1.0])|
|      (18,[7],[1.0])|
|(18,[7,13],[1.0,1...|
|(18,[0,1,4,11,15]...|
+--------------------+
only showing top 20 rows



[Stage 8:>                                                          (0 + 1) / 1]                                                                                

In [27]:
import numpy as np
from pyspark.ml.linalg import *
from pyspark.sql.types import * 
from pyspark.sql.functions import *

In [28]:
### cosine similarity function

In [29]:
def cos_sim(a,b):
    """Return the cosine similarity of two equal-length numeric vectors.

    Parameters
    ----------
    a, b : sequence of numbers, numpy array, or object exposing ``toArray()``
        The two vectors to compare. Objects with a ``toArray()`` method are
        converted through it before the computation.

    Returns
    -------
    float
        ``dot(a, b) / (norm(a) * norm(b))``. When either vector has zero norm
        the similarity is undefined and ``0.0`` is returned instead of NaN.
    """
    vec_a = np.asarray(a.toArray() if hasattr(a, "toArray") else a, dtype=float)
    vec_b = np.asarray(b.toArray() if hasattr(b, "toArray") else b, dtype=float)

    denominator = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    # Guard against 0/0: a NaN score would sort above every real score when the
    # results are ordered descending, pushing tag-less movies to the top.
    if denominator == 0.0:
        return 0.0
    return float(np.dot(vec_a, vec_b) / denominator)

In [30]:
# Collect every movie title to the driver as a plain Python list for local searching.
movies_list = [row[0] for row in movies.select('movie title').collect()]

                                                                                

In [31]:
### this function searches for a movie

In [32]:
def search(movie_name, titles=None):
    """Print and return the titles that contain ``movie_name``.

    The match is a case-insensitive substring test, so ``"of death"`` finds
    ``"Kiss of Death (1995)"``.

    Parameters
    ----------
    movie_name : str
        Text to look for inside each title.
    titles : iterable of str, optional
        Titles to search. Defaults to the notebook-level ``movies_list``.

    Returns
    -------
    list of str
        The matching titles, in the order they appear in ``titles``.
    """
    if titles is None:
        titles = movies_list

    needle = movie_name.casefold()
    # Entries that are not strings (e.g. a null title) are skipped rather than
    # raising a TypeError on the substring test.
    result = [
        title for title in titles
        if isinstance(title, str) and needle in title.casefold()
    ]
    for title in result:
        print(title)
    return result

In [33]:
### recommendation function

In [35]:
def recommend(movie_name=None, top_n=10):
    """Show the movies whose tag vectors are most similar to a chosen movie.

    Parameters
    ----------
    movie_name : str, optional
        Exact title of the reference movie. When omitted, the user is prompted
        for a search term (matches are printed by ``search``) and then for the
        exact title to use.
    top_n : int, optional
        Number of recommendations to show. Defaults to 10.

    Returns
    -------
    None
        The recommended titles are printed with ``DataFrame.show()``.

    Raises
    ------
    ValueError
        If no movie has the title ``movie_name``.
    """
    if movie_name is None:
        query = input('search for a movie: ')
        search(query)
        movie_name = input('movie choice: ')

    # Fetch the chosen movie's id and tag values as plain Python values.
    # Iterating a DataFrame yields its Column objects rather than row values, so
    # building the comparison array from a DataFrame would compare every row
    # with its own tags instead of with the chosen movie's.
    chosen = (
        assembled_tags
        .filter(assembled_tags['movie title'] == movie_name)
        .select('movieID', *tags)
        .first()
    )
    if chosen is None:
        raise ValueError(f'no movie titled {movie_name!r} was found')
    movie_id = chosen[0]
    film_vector = [float(value) for value in chosen[1:]]

    similarity = udf(cos_sim, FloatType())
    scored = assembled_tags.withColumn(
        'coSim',
        similarity(col('tags'), array(*[lit(value) for value in film_vector])),
    )

    result = (
        scored
        # The reference movie always matches itself perfectly; leave it out.
        .filter(col('movieID') != movie_id)
        # NaN sorts above every number in a descending sort, so drop undefined scores.
        .filter(~isnan(col('coSim')))
        .orderBy(col('coSim').desc())
        .limit(top_n)
    )
    result.select('movie title').show()

In [None]:
# Interactive: prompts for a search term, then for the exact title to base the recommendations on.
recommend()

search for a movie: death
Sudden Death (1995)
Love and Death on Long Island (1997)
Death and the Maiden (1994)
Letter From Death Row, A (1998)
Kiss of Death (1995)
Marked for Death (1990)
Two Deaths (1995)
Three Lives and Only One Death (1996)
Death in the Garden (Mort en ce jardin, La) (1956)
Death in Brunswick (1991)
movie choice: Sudden Death (1995)
