<a href="https://colab.research.google.com/github/vaaraaf/Search_Engine_for_Movies_Using_PySpark/blob/main/Search_Engine_for_Movies_Using_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from collections import Counter
from math import log, sqrt

from pyspark.ml.feature import StopWordsRemover
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session and keep a handle on its SparkContext
spark = (
    SparkSession.builder
    .appName("MovieSearch")
    .getOrCreate()
)
sc = spark.sparkContext

# Raw inputs, each loaded as an RDD of text lines
summaries = sc.textFile('./plot_summaries.txt')
metadata = sc.textFile('./movie_metadata.tsv')

# Extra words to drop in addition to the list StopWordsRemover provides
additional_stop_words = ['movie', 'movies', 'serie', 'series', 'scene', 'scenes']

# Combined stop-word set: StopWordsRemover's words plus the extras above
stopWords = set(StopWordsRemover().getStopWords())
stopWords.update(additional_stop_words)

# Number of lines in the summaries file, used as the document count N
N = summaries.count()
print("Total Documents (N):", N)

Total Documents (N): 42306


In [None]:
# Function to preprocess text (lowercase, trim punctuation, drop stop words)
def preprocess_text(text, stop_words=None):
    """Split ``text`` into normalized tokens with stop words removed.

    Each whitespace-separated word is lowercased and has leading/trailing
    ``.,!?()[]{}"`` characters trimmed. The stop-word check runs on the
    trimmed token, so words such as ``"The,"`` or ``"(a)"`` are filtered
    like their bare forms. Tokens that are empty after trimming (pure
    punctuation such as ``"()"``) are dropped.

    Args:
        text: The raw string to tokenize.
        stop_words: Optional collection of lowercase words to exclude.
            Defaults to the notebook-level ``stopWords`` set.

    Returns:
        A list of tokens in their original order (duplicates kept).
    """
    if stop_words is None:
        stop_words = stopWords

    tokens = []
    for raw_word in text.split():
        token = raw_word.lower().strip('.,!?()[]{}"')
        # Filter after trimming so attached punctuation cannot hide a stop word.
        if token and token not in stop_words:
            tokens.append(token)
    return tokens

# Extract (movie ID, tokens) from each summary line.
# The line is split once, on the first tab only: the part before it is the
# movie ID and everything after it is the summary. This avoids splitting the
# line twice and keeps summary text that happens to contain a further tab.
doc_tokens = summaries.map(lambda line: line.split('\t', 1)) \
                      .map(lambda parts: (parts[0], preprocess_text(parts[1])))

# Compute term frequency (TF) for each document: {word: occurrences}.
# Counter tallies in one pass over the tokens. The RDD is cached because both
# the DF computation and the TF-IDF computation below read it.
doc_tf = doc_tokens.mapValues(lambda words: dict(Counter(words))).cache()

# Compute document frequency (DF): number of documents containing each term
df = doc_tf.flatMap(lambda x: [(word, 1) for word in x[1]]) \
           .reduceByKey(lambda a, b: a + b)

df_dict = dict(df.collect())  # Bring DF to the driver as a plain dictionary
df_broadcast = sc.broadcast(df_dict)  # Ship one read-only copy to the executors

# Compute TF-IDF for each document: tf * log(N / df).
# Cached because it is collected below and queried again by the search cell.
tf_idf = doc_tf.map(lambda x: (x[0],
    {word: tf * log(N / df_broadcast.value.get(word, 1)) for word, tf in x[1].items()}
)).cache()

# Driver-side {movie ID: {word: tf-idf}} map used for cosine similarity
doc_vectors = tf_idf.collectAsMap()

In [None]:
# Number of results to keep. The result variable keeps its historical name
# `top_10_ids` because the following cell reads it.
TOP_K = 10


def cosine_similarity(doc_vector, query_vector):
    """Return the cosine similarity between two sparse {word: weight} vectors.

    Args:
        doc_vector: Mapping of word -> TF-IDF weight for a document.
        query_vector: Mapping of word -> TF-IDF weight for the query.

    Returns:
        The cosine of the angle between the vectors, or 0.0 when they share
        no words or either vector has zero length.
    """
    common_words = doc_vector.keys() & query_vector.keys()
    if not common_words:
        return 0.0  # No similarity if there are no matching words

    dot_product = sum(doc_vector[word] * query_vector[word] for word in common_words)
    doc_norm = sqrt(sum(value ** 2 for value in doc_vector.values()))
    query_norm = sqrt(sum(value ** 2 for value in query_vector.values()))

    if not doc_norm or not query_norm:
        return 0.0
    return dot_product / (doc_norm * query_norm)


query = input("Enter your search query: ").strip().lower()
query_words = preprocess_text(query)

if not query_words:
    # Nothing left after preprocessing (empty input or only stop words):
    # report no results instead of ranking every document at score 0.
    top_10_ids = []
    print("No searchable terms in query:", repr(query))

elif len(query_words) == 1:
    # -------------------- SINGLE-WORD QUERY (Part A) --------------------
    search_word = query_words[0]

    # TF-IDF score of the word in every document; documents that do not
    # contain the word (score 0) are not relevant and are dropped.
    word_tf_idf = tf_idf.map(lambda x: (x[0], x[1].get(search_word, 0))) \
                        .filter(lambda pair: pair[1] > 0)

    # Highest-scoring documents, selected without sorting the whole RDD
    top_10_ids = word_tf_idf.takeOrdered(TOP_K, key=lambda x: -x[1])

    print("Using TF-IDF ranking for:", search_word)

else:
    # -------------------- MULTI-WORD QUERY (Part B) --------------------

    # Compute query TF
    query_tf = {word: query_words.count(word) for word in set(query_words)}

    # Compute query TF-IDF using the same IDF values as documents
    query_tf_idf = {word: query_tf[word] * log(N / (df_broadcast.value.get(word, 1)))
                    for word in query_tf}

    # Cosine similarity against every document; keep only documents with a
    # positive score so unrelated documents are never reported as matches.
    scored = ((doc_id, cosine_similarity(doc_vector, query_tf_idf))
              for doc_id, doc_vector in doc_vectors.items())
    cosine_similarities = [(doc_id, score) for doc_id, score in scored if score > 0]

    # Get the documents with the highest cosine similarity
    top_10_ids = sorted(cosine_similarities, key=lambda x: -x[1])[:TOP_K]

    print("Using Cosine Similarity for:", query)

Enter your search query: taxi
Using TF-IDF ranking for: taxi


In [None]:
# Build a movie ID -> title lookup from the metadata file
# (tab-separated; field 0 is read as the ID and field 2 as the title)
metadata_rdd = metadata.map(lambda row: row.split('\t')) \
                       .map(lambda cols: (cols[0], cols[2]))
metadata_dict = dict(metadata_rdd.collect())

# Pair each ranked document's title with its score; IDs that are missing
# from the metadata are shown as "Unknown"
top_10_movies_with_scores = []
for doc_id, score in top_10_ids:
    title = metadata_dict.get(doc_id, "Unknown")
    top_10_movies_with_scores.append((title, score))

# Report as: Movie name (Score: score)
print("Top 10 Relevant Movies:")
for movie, score in top_10_movies_with_scores:
    print(f"{movie} (Score: {score:.4f})")

Top 10 Relevant Movies:
If Only (Score: 54.0506)
Decalogue V (Score: 36.0337)
Devudu Chesina Manushulu (Score: 31.5295)
Unnaruge Naan Irundhal (Score: 22.5211)
Midnight (Score: 22.5211)
All a Bir-r-r-rd (Score: 22.5211)
Ghost Dad (Score: 22.5211)
Man on the Moon (Score: 22.5211)
One Cab's Family (Score: 22.5211)
Marriage With a Fool (Score: 22.5211)
