In [0]:
%pip install nltk

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import numpy as np
import nltk
from nltk.corpus import stopwords

In [0]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [0]:
nltk.download("stopwords")

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Out[132]: True

In [0]:
# Load plot summaries
plot_summaries = spark.read.option("delimiter", "\t").csv("dbfs:/FileStore/plot_summaries.txt")
plot_summaries = plot_summaries.selectExpr("_c0 as movie_id", "_c1 as plot_summary")

In [0]:
# Load movie metadata
movie_metadata = spark.read.option("delimiter", "\t").csv("dbfs:/FileStore/movie_metadata.tsv")
movie_metadata = movie_metadata.selectExpr("_c0 as movie_id", "_c2 as movie_name")

In [0]:
# Convert DataFrames to RDDs
plot_summaries_rdd = plot_summaries.rdd
movie_metadata_rdd = movie_metadata.rdd

In [0]:
# Load NLTK stopwords
stopwords = set(stopwords.words("english"))

In [0]:
# Tokenization and Stopword Removal
def tokenize_and_filter(row):
    movie_id, plot_summary = row
    words = plot_summary.lower().split()
    filtered_words = [word for word in words if word not in stopwords]
    return [(movie_id, word) for word in filtered_words]

words_rdd = plot_summaries_rdd.flatMap(tokenize_and_filter)

In [0]:
# TF Calculation
tf_rdd = words_rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(lambda a, b: a + b)

In [0]:
# DF Calculation
df_rdd = words_rdd.map(lambda x: (x[1], 1)).distinct().reduceByKey(lambda a, b: a + b)

In [0]:
# IDF Calculation
num_docs = plot_summaries_rdd.count()
idf_rdd = df_rdd.map(lambda x: (x[0], np.log(num_docs / x[1])))

In [0]:
# TF-IDF Calculation
tf_idf_rdd = tf_rdd.map(lambda x: (x[0][1], (x[0][0], x[1]))).join(idf_rdd).map(lambda x: (x[1][0][0], (x[0], x[1][0][1] * x[1][1])))

In [0]:
# Convert to Lookup Table for easier access
tf_idf_lookup = tf_idf_rdd.collectAsMap()

In [0]:
def search(query, tf_idf_lookup, movie_metadata_rdd, top_n=10):
    query_terms = query.lower().split()
    query_terms = [term for term in query_terms if term not in stopwords]

    if len(query_terms) == 1:
        # Single-term query: Rank by TF-IDF
        term = query_terms[0]
        results = tf_idf_rdd.filter(lambda x: x[1][0] == term).map(lambda x: (x[0], x[1][1])).takeOrdered(top_n, key=lambda x: -x[1])
    else:
        # Multi-term query: Rank by cosine similarity
        query_vector = {}
        for term in query_terms:
            query_vector[term] = 1  # Binary representation for simplicity

        # Compute cosine similarity for each document
        def compute_cosine_similarity(doc_id, tf_idf_entries):
            doc_vector = {}
            for term, tf_idf in tf_idf_entries:
                doc_vector[term] = tf_idf
            dot_product = sum(query_vector.get(term, 0) * doc_vector.get(term, 0) for term in query_vector)
            query_norm = np.sqrt(sum(val ** 2 for val in query_vector.values()))
            doc_norm = np.sqrt(sum(val ** 2 for val in doc_vector.values()))
            if query_norm == 0 or doc_norm == 0:
                return (doc_id, 0)
            return (doc_id, dot_product / (query_norm * doc_norm))

        results = tf_idf_rdd.groupByKey().map(lambda x: compute_cosine_similarity(x[0], x[1])).takeOrdered(top_n, key=lambda x: -x[1])

    # Map movie IDs to movie names
    movie_id_to_name = movie_metadata_rdd.collectAsMap()
    return [(movie_id_to_name.get(movie_id, "Unknown"), score) for movie_id, score in results]

In [0]:
# Read and Process search terms
search_terms_path = "dbfs:/FileStore/searchQueries.txt"
search_terms = spark.read.text(search_terms_path).rdd.flatMap(lambda x: x).collect()

In [0]:
for query in search_terms:
    print(f"Results for query: {query}")
    results = search(query, tf_idf_lookup, movie_metadata_rdd)
    for movie_name, score in results:
        print(f"{movie_name}: {score}")
    print("\n" + "=" * 50 + "\n")

Results for query: action
Crayon Shin-chan: Action Kamen vs Leotard Devil: 117.17952618841805
Action Man: Robot Atak: 85.22147359157677
Crayon Shin-chan: The Storm Called The Jungle: 63.916105193682576
Rosencrantz & Guildenstern Are Dead: 42.610736795788384
Bombaat: 42.610736795788384
Smallpox: 42.610736795788384
West Side Story: 42.610736795788384
The 40-Year-Old Virgin: 31.958052596841288
The Daredevil Men: 31.958052596841288
Crayon Shin-chan: The Storm Called: Operation Golden Spy: 31.958052596841288


Results for query: romantic
The Little Rascals: 42.610736795788384
Genova: 42.610736795788384
It Had to Be You!: 31.958052596841288
Boogie Town: 31.958052596841288
Once: 31.958052596841288
Gidget: 31.958052596841288
A Cold Wind in August: 31.958052596841288
Billy's Hollywood Screen Kiss: 31.958052596841288
He's Just Not That Into You: 31.958052596841288
Cocktail: 31.958052596841288


Results for query: comedy
Where the Truth Lies: 42.610736795788384
General Motors 50th Anniversary Sho