In [1]:
# installing spark in colab and creating spark session

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

import findspark
findspark.init()

findspark.find()

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

sc = spark.sparkContext

In [2]:
plotSummaries = sc.textFile('/content/plot_summaries.txt')

In [4]:
moviesMetadata = spark.read.csv('/content/movie.metadata.tsv', sep=r'\t', header=False) # here we get dataframe
movies = moviesMetadata.rdd # converts dataframe to rdd
movieNames = movies.map(lambda row : (row._c0, row._c2)) # maps over rdd
movies = movieNames.collectAsMap()

In [14]:
summaries = plotSummaries.map(lambda line : line.split('\t')).map(lambda x : (x[0], x[1])) # get key-value tuple pair using maps

In [15]:
summaries.take(5)

[('23890098',
  "Shlykov, a hard-working taxi driver and Lyosha, a saxophonist, develop a bizarre love-hate relationship, and despite their prejudices, realize they aren't so different after all."),
 ('31186339',
  'The nation of Panem consists of a wealthy Capitol and twelve poorer districts. As punishment for a past rebellion, each district must provide a boy and girl  between the ages of 12 and 18 selected by lottery  for the annual Hunger Games. The tributes must fight to the death in an arena; the sole survivor is rewarded with fame and wealth. In her first Reaping, 12-year-old Primrose Everdeen is chosen from District 12. Her older sister Katniss volunteers to take her place. Peeta Mellark, a baker\'s son who once gave Katniss bread when she was starving, is the other District 12 tribute. Katniss and Peeta are taken to the Capitol, accompanied by their frequently drunk mentor, past victor Haymitch Abernathy. He warns them about the "Career" tributes who train intensively at speci

In [16]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [17]:
import string
from nltk.corpus import stopwords
# return words which are not in the list of stopwords(which is list of stopwords and punctuated tokens)
def removeInvalidWords(words):
    stopWords = list(stopwords.words('english'))
    puncs = string.punctuation+' '
    ansList = []
    for word in words:
        word = word.strip(puncs)
        if len(word) == 0:
            continue
        if word not in stopWords:
            ansList.append(word)
    return ansList

In [18]:
# converts summary to list of words and removes stopwords from them
def removeStopWords(summary):
    words = summary.lower().split(" ")
    newWords = removeInvalidWords(words)
    return newWords

In [19]:
# maps through summaries and returns list of summary words
summaries = summaries.map(lambda x : (x[0], removeStopWords(x[1])))

In [20]:
N = summaries.count() # total count of all movie documents

In [21]:
# calculating tij which is count of ith word in a jth movie document
summary = summaries.flatMapValues(lambda x : x)
tij = summary.map(lambda t : ((t[0], t[1]), 1)).reduceByKey(lambda x, y : x+y)
tij.take(10)

[(('32137084', 'together'), 1),
 (('19755481', 'lisa'), 7),
 (('34501845', 'become'), 1),
 (('31083557', 'kasia'), 1),
 (('3177319', 'however'), 1),
 (('61494', 'boarded'), 1),
 (('204774', 'opera'), 6),
 (('6004920', 'encounters'), 2),
 (('6215073', 'city'), 2),
 (('33383946', 'complications'), 1)]

In [22]:
# calculating count of a ith word in all the movie documents
ni = summary.distinct().map(lambda t : (t[1], t[0])).reduceByKey(lambda x, y : x+" "+y).map(lambda t : (t[0], len(t[1].split(" "))))
ni = ni.collectAsMap()
ni

{'enters': 2040,
 'also': 11072,
 'vaguely': 49,
 'rope': 438,
 'path': 641,
 'luxury': 211,
 'wife': 7519,
 'describes': 468,
 'book': 1568,
 'arrivals': 24,
 'says': 4113,
 'turks': 23,
 'large': 2705,
 'sharpened': 16,
 'becomes': 7972,
 'position': 1108,
 'reunion': 332,
 'women': 2766,
 'pile': 245,
 'believes': 2710,
 'spree': 197,
 'residents': 486,
 'died': 2465,
 'meets': 6491,
 'authentic': 55,
 'leaves': 6662,
 'taught': 314,
 'serious': 700,
 'returns': 6325,
 'enough': 2360,
 'send': 1230,
 'sizes—granting': 1,
 'turn': 3395,
 'drinking': 929,
 'battle': 2200,
 'directly': 491,
 'approach': 593,
 'wants': 5527,
 'even': 5562,
 'exclusive': 134,
 'shubhavari': 1,
 'across': 2692,
 'mitchell': 174,
 'leading': 1872,
 'share': 1411,
 'well': 5287,
 'seen': 2835,
 'frustrated': 788,
 'finds': 10318,
 'sole': 320,
 'join': 2198,
 'unaware': 1163,
 'war': 3778,
 'tales': 187,
 'grew': 334,
 'portray': 65,
 'boss': 1855,
 'video': 874,
 'plans': 3152,
 'spent': 659,
 'bouquet': 6

In [23]:
# calculating and saving the results of tf-idf for the dataset
import math
tfIdf = tij
tfIdf = tfIdf.map(lambda x : (x[0], float(x[1]*float(math.log(N/ni[x[0][1]]))))).sortBy(lambda x : -x[1])
tfIdf = tfIdf.collectAsMap()

tfIdf

{('21768047', 'keroro'): 991.503712647511,
 ('6283620', 'hyun'): 949.5408600838645,
 ('6283620', 'soo'): 939.2580213502184,
 ('22175264', 'timmy'): 837.9563369335178,
 ('15465392', 'lucian'): 822.9354120726825,
 ('7548602', 'marcellus'): 818.4367606898276,
 ('4457831', 'morgaine'): 809.6039991199793,
 ('6283620', 'eun'): 742.9310033533852,
 ('28443267', 'luk'): 711.5791405531826,
 ('4969625', 'orry'): 703.0771571305083,
 ('3926180', 'franklin'): 692.1518727474786,
 ('11467890', 'sabata'): 687.893177540087,
 ('11584626', 'hyeon-su'): 671.119104533667,
 ('15769706', 'vamsi'): 669.2002252019616,
 ('2650227', 'janie'): 665.4244736118126,
 ('10619079', 'genocyber'): 660.4664203347199,
 ('21673015', 'ged'): 651.0703908640374,
 ('25092646', 'obie'): 637.4103691767776,
 ('35581515', 'hushpuppy'): 617.8556835389315,
 ('2065944', 'shogo'): 617.4912951400033,
 ('3257858', 'gidget'): 614.9407474828836,
 ('23325348', 'cherie'): 601.0220151998877,
 ('20887118', 'cavil'): 596.5503151410373,
 ('130806

In [24]:
movies

{'975900': 'Ghosts of Mars',
 '3196793': 'Getting Away with Murder: The JonBenét Ramsey Mystery',
 '28463795': 'Brun bitter',
 '9363483': 'White Of The Eye',
 '261236': 'A Woman in Flames',
 '13696889': 'The Gangsters',
 '18998739': "The Sorcerer's Apprentice",
 '10408933': "Alexander's Ragtime Band",
 '9997961': 'Contigo y aquí',
 '2345652': 'City of the Dead',
 '175026': 'Sarah and Son',
 '24229100': 'Lady Snowblood 2: Love Song of Vengeance',
 '6631279': 'Little city',
 '171005': 'Henry V',
 '18296435': 'Aaah Belinda',
 '11250635': 'The Mechanical Monsters',
 '30388930': '1919',
 '77856': 'Mary Poppins',
 '32456683': 'Die Fahne von Kriwoj Rog',
 '33420460': 'Keep the Change',
 '175024': "The Devil's Holiday",
 '612710': 'New Rose Hotel',
 '21926710': 'White on Rice',
 '33427105': 'Freddy and the Song of the South Pacific',
 '31983669': 'Road to Life',
 '17715326': 'Camera Thrills',
 '22087420': 'Ferdinando I, re di Napoli',
 '20604092': 'Anbu Thozhi',
 '21344842': 'Middle Age Spread

In [25]:
def searchEngine_single(query, topHitsCount=10):
#     output = tfIdf.filter(lambda x : x[0][1] == query).take(10)  
    output = []
    for values in tfIdf.keys():
        if values[1] == query:
            output.append(values[0])
    ans = []
    for val in range(min(len(output), topHitsCount)):
        ans.append(movies[output[val]])
    print("Results : "+str(ans))

In [26]:
from scipy import spatial
def searchEngine_multiple(query, topHitsCount=10):
    qcount = {}
    for q in query:
        if q not in qcount.keys():
            qcount[q] = 1
        else:
            qcount[q]+=1
    words = []
    qVector = []
    for key, value in qcount.items():
        words.append(key)
        qVector.append(value)
 
    movieId = list(movies.keys())
    cosine = {}
    for doc in movieId:
        dVector = []
        for word in words:
            t = (doc, word)
            if t in tfIdf:
                dVector.append(tfIdf[t])
            else:
                dVector.append(0)
        s = set(dVector)
        if len(s) == 1 and 0 in s:
            continue
        result = 1 - spatial.distance.cosine(qVector, dVector)
        cosine[doc] = result
    
    sortedHits = list(sorted(cosine.items(), key=lambda item: -item[1])) # list maintains the order of sort
    sortedHits = sortedHits[0:min(len(sortedHits), topHitsCount)]
    ans=[]
    for k in range(len(sortedHits)):# getting movie names for the resultant movie ids
        temp_key = sortedHits[k][0] # movie id
        #sortedHits[k]= (movies[temp_key], sortedHits[k][1])
        ans.append(movies[temp_key])
    print("Results : "+str(ans))

In [30]:
query_file = spark.read.csv("/content/Queries-1.csv") # all search queries stored at first row of the csv file with delimiter','
queries=[]
for i in query_file.collect()[0]:
    queries.append([removeStopWords(i.strip().lower()),i])
for query in queries:
    print("\n")
    print('For Query "'+ query[1]+'"')
    if len(query[0])==0:
        print("Result: Query is invalid")
        continue
    
    if len(query[0])>1:
        searchEngine_multiple(query[0])
    else:
        searchEngine_single(query[0][0])



For Query "horror"
Results : ['Garo: Red Requiem', 'The Last Horror Film', 'The Pagemaster', 'Microwave Massacre', 'Kiba Gaiden', 'The Pit and the Pendulum', 'Fright Night', 'Dark House', 'Poison', 'Alien invasion arizona']


For Query "funny"
Results : ["Garfield's Fun Fest", 'The Last Circus', 'Funny Man', '14 Carrot Rabbit', 'Ullathai Allitha', 'Bheja fry', 'Heart Like a Wheel', 'Main Sunder Hoon', 'Decoding Annie Parker', 'War Dogs']


For Query "The heart"
Results : ['Care Bears Movie II: A New Generation', 'The Care Bears Adventure in Wonderland', 'Crank: High Voltage', 'Captain Sindbad', 'Snow White', "Pirates of the Caribbean: At World's End", 'Bordello of Blood', 'Dragonheart: A New Beginning', '21 Grams', 'Frankenstein Conquers the World']


For Query "Funny Movies with Action scenes"
Results : ['The Muppets: A Celebration of 30 Years', 'Three the Hard Way', 'Megazone 23', 'Tony', "Fellini: I'm a Born Liar", 'Ullathai Allitha', 'Nell', 'Imtihaan', 'Om Shanti Om', 'My Name i