In [0]:
# Display the SparkContext `sc`. It is not created anywhere in this notebook,
# so it is presumably pre-initialized by the notebook runtime (Databricks) — confirm.
sc

In [0]:
%sh
# Install NLTK and fetch only the corpus this notebook actually uses: the English
# stopword list read via `stopwords.words("english")`. Downloading "all" pulls every
# NLTK dataset, which is large and unnecessary here.
# TODO(review): pin an nltk version (pip install nltk==X.Y.Z) for reproducible runs.
pip install nltk
python -m nltk.downloader stopwords

In [0]:
from nltk.corpus import stopwords
import re
import string
import math

In [0]:
movie_metadata = sc.textFile('/FileStore/tables/movie_metadata.tsv')
plot_summaries = sc.textFile('/FileStore/tables/plot_summaries.txt')

# English stopword list from NLTK (needs the `stopwords` corpus downloaded in the setup cell).
stop_words = set(stopwords.words("english"))

# Translation table that deletes ASCII punctuation; built once rather than once per record.
# https://stackoverflow.com/questions/265960/best-way-to-strip-punctuation-from-a-string
punct_table = str.maketrans('', '', string.punctuation)

# Each summary line is "movie_id<TAB>summary". Lines without a tab cannot be parsed
# (indexing the summary field would raise IndexError), so they are skipped.
# The tab is not in string.punctuation, so it survives the punctuation strip.
plot_summaries_punct = (plot_summaries
                        .filter(lambda line: '\t' in line)
                        .map(lambda line: line.translate(punct_table)))

# (movie_id, summary) with stopwords removed.
# split('\t', 1) keeps any text after a second tab; split() with no argument splits on
# any run of whitespace, so it never produces empty words.
plot_summaries_stop_wds = (plot_summaries_punct
                           .map(lambda line: line.split('\t', 1))
                           .map(lambda parts: (parts[0],
                                               ' '.join(w for w in parts[1].split()
                                                        if w.lower() not in stop_words))))

# ((movie_id, lowercase_word), 1) for every word occurrence.
plot_summaries_mapped = plot_summaries_stop_wds.flatMap(
    lambda x: (((x[0], word.lower()), 1) for word in x[1].split()))

# ((movie_id, word), count_of_word_in_that_movie_summary)
term_frequency = plot_summaries_mapped.reduceByKey(lambda x, y: x + y)

# (word, document_frequency): (movie_id, word) keys are unique after the reduce above,
# so counting them per word gives the number of documents containing the word.
document_frequency = term_frequency.map(lambda x: (x[0][1], 1)).reduceByKey(lambda x, y: x + y)

# N = number of parsed summaries.
total_doc = plot_summaries_punct.count()

# (word, log(N/df)); cached because every query joins against it.
log_of_N_by_document_freq = (document_frequency
                             .mapValues(lambda doc_freq: math.log(total_doc / doc_freq))
                             .cache())

# (word, (movie_id, count_of_word_in_that_movie_summary)) so it can be joined on word.
term_frequency_for_join = term_frequency.map(lambda x: (x[0][1], (x[0][0], x[1])))

# (word, ((movie_id, count_of_word_in_that_movie_summary), log(N/df)))
tf_idf = term_frequency_for_join.join(log_of_N_by_document_freq)

# ((movie_id, word), tf * idf). Kept distributed and cached instead of collect()-ing the
# whole index onto the driver and re-parallelizing it.
tf_idf_data = tf_idf.map(lambda x: ((x[1][0][0], x[0]), x[1][0][1] * x[1][1])).cache()

# (movie_id, movie_name) from the metadata TSV (id in column 0, name in column 2),
# used to turn query results into movie names. Rows with fewer than three columns are skipped.
movie_id_and_name = (movie_metadata
                     .map(lambda line: line.split('\t'))
                     .filter(lambda cols: len(cols) > 2)
                     .map(lambda cols: (cols[0], cols[2]))
                     .cache())

In [0]:
# Input word, can be changed as per needs
word_input = "scifi"

# Normalise the input the same way summary words were indexed (ASCII punctuation removed,
# lowercased), so inputs such as "Sci-Fi" or "SciFi" match the indexed token "scifi".
query_word = word_input.translate(str.maketrans('', '', string.punctuation)).strip().lower()

# ((movie_id, word), tfidf) rows for the query word; indexed words are already lowercase.
query_filter = tf_idf_data.filter(lambda x: x[0][1] == query_word)

# (movie_id, tfidf) joined with (movie_id, name) -> (movie_id, (tfidf, name)).
# takeOrdered keeps the 10 highest tf-idf scores without fully sorting the RDD.
top_matches = (query_filter
               .map(lambda x: (x[0][0], x[1]))
               .join(movie_id_and_name)
               .takeOrdered(10, key=lambda x: -x[1][0]))

# Movie names only, highest tf-idf first.
[name for _movie_id, (_score, name) in top_matches]

In [0]:
# Input query, can be changed as per need
sentence_query = "A murder mystery weekend"

# Normalise the query like the summaries: strip ASCII punctuation, lowercase, split on
# whitespace, drop stopwords. The raw `sentence_query` string is left untouched so the
# cell can be re-run.
query_clean = sentence_query.translate(str.maketrans('', '', string.punctuation)).lower()
query_words = [word for word in query_clean.split() if word not in stop_words]
words_to_search = set(query_words)

# (word, count_of_word_in_query)
query_tf = sc.parallelize(query_words).map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)

# (word, query_tfidf) where query_tfidf = count * log(N/df). Query words that never occur
# in the corpus have no idf and are dropped by the join.
query_tfidf = (query_tf
               .join(log_of_N_by_document_freq)
               .mapValues(lambda tf_and_idf: tf_and_idf[0] * tf_and_idf[1]))

# Squared norm of the query vector. RDD.sum() returns 0 for an empty RDD
# (reduce() would raise ValueError when no query word is in the corpus).
square_of_query_tfidf = query_tfidf.map(lambda x: x[1] * x[1]).sum()
query_norm = math.sqrt(square_of_query_tfidf)

# (movie_id, sum_of_squares_of_tfidf): squared norm of each document vector.
square_of_document_tfidf = (tf_idf_data
                            .map(lambda x: (x[0][0], x[1] * x[1]))
                            .reduceByKey(lambda x, y: x + y))

# (movie_id, dot_product_with_query): keep only query words, join with the query
# weights on word, multiply the two weights, and sum per movie.
dot_products = (tf_idf_data
                .filter(lambda x: x[0][1] in words_to_search)
                .map(lambda x: (x[0][1], (x[0][0], x[1])))          # (word, (movie_id, doc_tfidf))
                .join(query_tfidf)                                   # (word, ((movie_id, doc_tfidf), query_tfidf))
                .map(lambda x: (x[1][0][0], x[1][0][1] * x[1][1]))  # (movie_id, doc_tfidf * query_tfidf)
                .reduceByKey(lambda x, y: x + y))

# (movie_id, cosine_similarity) = dot / (|doc| * |query|). Documents whose vector norm
# is 0 are skipped to avoid dividing by zero.
cosine_similarity = (dot_products
                     .join(square_of_document_tfidf)                 # (movie_id, (dot, doc_sq_norm))
                     .filter(lambda x: x[1][1] > 0)
                     .mapValues(lambda v: v[0] / (math.sqrt(v[1]) * query_norm)))

# Output: up to 10 (Movie Name, cosine similarity) pairs, most similar first.
# An empty list means no query word carried any weight (all stopwords, words unseen in
# the corpus, or words with idf 0), in which case cosine similarity is undefined.
if query_norm == 0:
    top_10_documents = []
else:
    top_10_documents = (cosine_similarity
                        .join(movie_id_and_name)                     # (movie_id, (similarity, name))
                        .map(lambda x: (x[1][1], x[1][0]))
                        .takeOrdered(10, key=lambda x: -x[1]))
top_10_documents