In [0]:
##########
# PART 1 #
##########

import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.tag import pos_tag
from nltk.chunk import ne_chunk

In [0]:
# Fetch the NLTK data packages needed before the Part 1 helpers can run.
# NOTE(review): the mapping below is the usual one for these NLTK functions — confirm
# against the installed NLTK version.
nltk.download('punkt')  # tokenizer models, presumably used by word_tokenize
nltk.download('averaged_perceptron_tagger')  # tagger model, presumably used by pos_tag
nltk.download('maxent_ne_chunker')  # named-entity chunker model, presumably used by ne_chunk
nltk.download('words')  # word list, presumably a dependency of the NE chunker

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package maxent_ne_chunker to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package maxent_ne_chunker is already up-to-date!
[nltk_data] Downloading package words to /root/nltk_data...
[nltk_data]   Package words is already up-to-date!


Out[106]: True

In [0]:
def extract_named_entities(text):
    """Return the named entities found in ``text`` as a list of strings.

    The text is tokenized, POS-tagged and passed through ``ne_chunk``. Every
    node of the resulting tree that exposes a ``label`` attribute is treated
    as an entity, and the words of that node are joined with single spaces.
    """
    chunked = ne_chunk(pos_tag(word_tokenize(text)))

    entities = []
    for node in chunked:
        # Plain (word, tag) tuples have no ``label``; only chunked subtrees do.
        if not hasattr(node, 'label'):
            continue
        entities.append(" ".join(leaf[0] for leaf in node))
    return entities

In [0]:
# Load the Gutenberg text file into an RDD
text_rdd = sc.textFile('dbfs:/FileStore/tables/moby_dick.txt')

# Run entity extraction on each record and flatten the per-record lists
named_entities_rdd = text_rdd.flatMap(extract_named_entities)

# MapReduce count of each entity, ordered by descending count
sorted_named_entities_counts = (
    named_entities_rdd
    .map(lambda name: (name, 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: -pair[1])
)

# Print the ten most frequent named entities with their counts
top_named_entities = sorted_named_entities_counts.take(10)
for entity, count in top_named_entities:
    print(f'{entity}: {count}')

Ahab: 406
Stubb: 236
Queequeg: 209
Pequod: 158
Starbuck: 148
Flask: 91
Jonah: 75
Sperm Whale: 70
Nantucket: 66
Captain: 64


In [0]:
##########
# PART 2 #
##########

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

from numpy import dot
from numpy.linalg import norm

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

from collections import Counter

import string
import math

In [0]:
# Fetch the tokenizer models first, then the stopword corpus
for resource in ('punkt', "stopwords"):
    nltk.download(resource)

# English stopwords held in a set for membership tests in preprocess_text
stop_words = set(stopwords.words('english'))

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [0]:
# Step 1: Load Data
# Read the tab-separated plot summaries, then rename the columns positionally.
new_plot_column_names = ["movie_id", "plot_summary"]
plot_df = spark.read.csv('dbfs:/FileStore/tables/plot_summaries.txt', sep="\t")

# Pair each original column name with its replacement before renaming starts
plot_rename_pairs = list(zip(plot_df.columns, new_plot_column_names))
for old_col, new_col in plot_rename_pairs:
    plot_df = plot_df.withColumnRenamed(old_col, new_col)

In [0]:
# Print the first rows of plot_df to check the renamed columns
plot_df.show()

+--------+--------------------+
|movie_id|        plot_summary|
+--------+--------------------+
|23890098|Shlykov, a hard-w...|
|31186339|The nation of Pan...|
|20663735|Poovalli Induchoo...|
| 2231378|The Lemon Drop Ki...|
|  595909|Seventh-day Adven...|
| 5272176|The president is ...|
| 1952976|{{plot}} The film...|
|24225279|The story begins ...|
| 2462689|Infuriated at bei...|
|20532852|A line of people ...|
|15401493|Lola  attempts to...|
|18188932|Milan and Goran a...|
| 2940516|Bumbling pirate c...|
| 1335380|The film is based...|
| 1480747|{{plot}} Followin...|
|24448645|Despite Lucy's re...|
|15072401|Alan Colby, heir ...|
| 4018288|Debbie's favorite...|
| 4596602|Ashes to Ashes is...|
|15224586|The film follows ...|
+--------+--------------------+
only showing top 20 rows



In [0]:
# Step 1.5: Create mapping of movie ids to movie names
new_movie_column_names = [
    "movie_id",
    "freebase_id",
    "movie_name",
    "release_date",
    "revenue",
    "runtime",
    "languages",
    "countries",
    "genres",
]
movie_df = spark.read.csv("dbfs:/FileStore/tables/movie_metadata.tsv", sep="\t")

# Rename the metadata columns positionally
movie_rename_pairs = list(zip(movie_df.columns, new_movie_column_names))
for old_col, new_col in movie_rename_pairs:
    movie_df = movie_df.withColumnRenamed(old_col, new_col)

# Only the id and the title are needed for the join and for the lookup dict
movie_names_df = movie_df.select("movie_id", "movie_name")

movie_plot_df = plot_df.join(movie_names_df, on="movie_id", how="inner")
movie_id_name_dict = movie_names_df.rdd.collectAsMap()

In [0]:
def preprocess_text(text):
    """Normalize raw text into a space-separated string of content tokens.

    Steps, applied to every token produced by ``word_tokenize``:
      1. strip all ``string.punctuation`` characters and lowercase;
      2. drop tokens that are in ``stop_words``;
      3. drop tokens of two characters or fewer.

    Args:
        text: The raw text, or ``None``.

    Returns:
        The surviving tokens joined by single spaces. ``None`` input yields
        an empty string.
    """
    # A missing value would otherwise make word_tokenize raise and fail the
    # whole job when this function is used as a UDF; treat it as empty text.
    if text is None:
        return ''

    # Build the punctuation-stripping table once instead of once per token.
    punctuation_table = str.maketrans('', '', string.punctuation)

    kept_tokens = []
    for token in word_tokenize(text):
        token = token.translate(punctuation_table).lower()
        # Keep only non-stopwords that are longer than two characters
        # (this also discards tokens emptied by punctuation removal).
        if token not in stop_words and len(token) > 2:
            kept_tokens.append(token)

    return ' '.join(kept_tokens)

In [0]:
# Step 2: Preprocessing
# Wrap preprocess_text as a string-returning UDF, then overwrite the
# plot_summary column with its cleaned version.
preprocess_udf = udf(preprocess_text, StringType())
plot_df = plot_df.withColumn("plot_summary", preprocess_udf(plot_df["plot_summary"]))
plot_df.show()

+--------+--------------------+
|movie_id|        plot_summary|
+--------+--------------------+
|23890098|shlykov hardworki...|
|31186339|nation panem cons...|
|20663735|poovalli induchoo...|
| 2231378|lemon drop kid ne...|
|  595909|seventhday advent...|
| 5272176|president way giv...|
| 1952976|plot film opens 1...|
|24225279|story begins hann...|
| 2462689|infuriated told w...|
|20532852|line people drool...|
|15401493|lola attempts gai...|
|18188932|milan goran two c...|
| 2940516|bumbling pirate c...|
| 1335380|film based events...|
| 1480747|plot following su...|
|24448645|despite lucy rese...|
|15072401|alan colby heir v...|
| 4018288|debbie favorite b...|
| 4596602|ashes ashes set l...|
|15224586|film follows expe...|
+--------+--------------------+
only showing top 20 rows



In [0]:
# Number of rows (plot summaries) in plot_df; the idf step divides N by each term's count
N = plot_df.count()

In [0]:
# Step 3: Calculate TF-IDF
def _term_occurrences(row):
    """Emit one ``((movie_id, term, doc_length), 1)`` record per token of a plot row.

    The summary is split exactly once; re-splitting it for every token (as an
    inline comprehension would) makes the work quadratic in summary length.
    """
    movie_id, summary = row[0], row[1]
    tokens = summary.split()
    doc_length = len(tokens)
    return [((movie_id, token, doc_length), 1) for token in tokens]


# tf records have the shape (term, (movie_id, term_count / doc_length))
tf = (
    plot_df.rdd.flatMap(_term_occurrences)
    .reduceByKey(lambda left, right: left + right)
    .map(lambda kv: (kv[0][1], (kv[0][0], kv[1] / kv[0][2])))
)

In [0]:
# tf carries one record per (term, movie) pair, so counting records per term
# gives the number of movies whose summary contains that term.
document_frequency = tf.map(lambda record: (record[0], 1)).reduceByKey(
    lambda left, right: left + right
)

# idf records have the shape (term, log10(N / document_frequency))
idf = document_frequency.mapValues(lambda doc_count: math.log10(N / doc_count))

In [0]:
# Collect the (term, idf) pairs into a plain dict so tf_idf_terms can look up query terms
idf_dict = idf.collectAsMap()

In [0]:
def _to_tf_idf_row(joined):
    """Flatten a joined ``(term, ((movie_id, tf), idf))`` record into a 5-tuple."""
    term, ((movie_id, term_frequency), inverse_doc_frequency) = joined
    return (
        movie_id,
        term,
        term_frequency,
        inverse_doc_frequency,
        term_frequency * inverse_doc_frequency,
    )


# Rows are (movie_id, term, tf, idf, tf-idf), ordered by movie_id
tf_idf = tf.join(idf).map(_to_tf_idf_row).sortBy(lambda row: row[0])

In [0]:
# Preview the tf_idf rows as a DataFrame with named columns
tf_idf.toDF(["movie_id","term","tf","idf","tf-idf"]).show()

+--------+-----------+--------------------+------------------+--------------------+
|movie_id|       term|                  tf|               idf|              tf-idf|
+--------+-----------+--------------------+------------------+--------------------+
|10000053|    decides|0.003215434083601286| 0.759701209018187|0.002442769160830...|
|10000053|     starts|0.006430868167202572|0.9384273450261305|0.006034902540360968|
|10000053| threatened|0.003215434083601286|1.9062426616547292|0.006129397625899451|
|10000053|   murdered|0.003215434083601286|1.3580580511096214|0.004366746145047014|
|10000053| eventually|0.003215434083601286|0.8083080959140502|0.002599061401652...|
|10000053|     spends|0.003215434083601286|1.5917735084353657|0.005118242792396675|
|10000053|opportunity|0.003215434083601286| 1.577571878532336|0.005072578387563782|
|10000053|      shock|0.003215434083601286|1.7227694489764485|0.005539451604425879|
|10000053|    waiting|0.003215434083601286|1.3604055945656068|0.004374294516

In [0]:
# Collect a {(movie_id, term): tf-idf} dict for the lookups done in compute_cosine_similarity
tf_idf_dict = (
    tf_idf
    .keyBy(lambda row: (row[0], row[1]))
    .mapValues(lambda row: row[4])
    .collectAsMap()
)

In [0]:
# RDD of the distinct movie ids that appear in tf_idf (first field of each row)
movie_ids = tf_idf.map(lambda x: x[0]).distinct()

In [0]:
def print_search_results(query, results):
    """Print a numbered list of ``results`` for ``query``, followed by a separator.

    When ``results`` is empty a "No results found" line is printed instead of
    the list. The dashed separator line is printed in both cases.
    """
    if len(results) > 0:
        print(f"Search results for '{query}':")
        # Ranks are 1-based
        for i, result in enumerate(results, start=1):
            print(f"{i}. {result}")
    else:
        print(f"No results found for '{query}'")
    print('----------------------------------------')

In [0]:
def tf_idf_terms(terms, idf_lookup=None):
    """Compute the tf-idf weight of every distinct term in a query.

    Term frequency is the term's count divided by the total number of query
    terms; the idf comes from ``idf_lookup`` and is 0 for unknown terms.

    Both factors are derived from the same ``Counter`` entry. Pairing a list
    built from the unique terms with one built from all terms (duplicates
    included) misaligns as soon as a term repeats, and attaches the wrong
    idf and the wrong label to the result.

    Args:
        terms: List of preprocessed query tokens (duplicates allowed).
        idf_lookup: Optional mapping term -> idf. Defaults to the
            module-level ``idf_dict``.

    Returns:
        A list of ``(term, tf_idf)`` tuples, one per distinct term, sorted by
        term. An empty ``terms`` list yields an empty list.
    """
    if idf_lookup is None:
        idf_lookup = idf_dict

    total_terms = len(terms)
    return [
        (term, (count / total_terms) * idf_lookup.get(term, 0))
        for term, count in sorted(Counter(terms).items())
    ]

def compute_cosine_similarity(terms, movie_id):
    """Cosine similarity between a query and one movie over the query's terms.

    Both vectors have one component per term returned by ``tf_idf_terms``:
    the query side uses the query's tf-idf weights, the movie side uses the
    movie's weight for the same term from ``tf_idf_dict`` (0 when absent).
    The movie's norm is therefore taken over the query terms only, not over
    its full tf-idf vector.

    Returns 0 when either vector has zero magnitude.
    """
    query_weights = []
    movie_weights = []
    for term, weight in tf_idf_terms(terms):
        query_weights.append(weight)
        movie_weights.append(tf_idf_dict.get((movie_id, term), 0))

    # Product of the two Euclidean norms; zero means there is nothing to compare
    denominator = norm(query_weights) * norm(movie_weights)
    if denominator == 0:
        return 0
    return dot(query_weights, movie_weights) / denominator

In [0]:
# Step 4: Queries
queries = [
    "action",
    "romance",
    "Funny movie with actions scenes",
    "Bloody movie with vampires"
]

for query in queries:
    terms = preprocess_text(query).split()
    if len(terms) == 1:
        # Single-term query: rank movies by their tf-idf weight for that term
        term = terms[0]
        top_movies = (
            tf_idf.filter(lambda x: x[1] == term)
            .sortBy(lambda x: x[4], ascending=False)
            .take(10)
        )
    else:
        # Multi-term (or empty) query: rank movies by cosine similarity.
        # Zero-similarity movies are dropped so that a query matching nothing
        # reports "No results" instead of ten arbitrary titles.
        top_movies = (
            movie_ids.map(lambda x: (x, compute_cosine_similarity(terms, x)))
            .filter(lambda x: x[1] > 0)
            .sortBy(lambda x: x[1], ascending=False)
            .take(10)
        )

    # tf_idf is built from every plot summary, including movies that may have
    # no metadata row; fall back to the raw id rather than raising KeyError.
    top_movie_names = [
        movie_id_name_dict.get(top_movie_id, top_movie_id)
        for top_movie_id, *_ in top_movies
    ]

    print_search_results(query, top_movie_names)

Search results for 'action':
1. Giri
2. Tiger
3. Ganga Ki Kasam
4. Hitler
5. Numbri Aadmi
6. Kaalia
7. Anyay Abichar
8. Crayon Shin-chan: The Storm Called: Operation Golden Spy
9. Cracker Jack
10. Kranti Kshetra
----------------------------------------
Search results for 'romance':
1. Ranmuthu Duwa
2. L'Amour
3. Office Lady Love Juice
4. That Dangerous Age
5. Lovers and Liars
6. Moment by Moment
7. Hollywood Dreams
8. Barefooted Youth
9. Coming Back
10. Dance Pretty Lady
----------------------------------------
Search results for 'Funny movie with actions scenes':
1. The Perfect Holiday
2. Phobia 2
3. Le Distrait
4. My Name Is Joe
5. Full Grown Men
6. Delta Farce
7. Taking Five
8. Meet the Fockers
9. Trail of the Pink Panther
10. Modalasala
----------------------------------------
Search results for 'Bloody movie with vampires':
1. Blood of Dracula's Castle
2. The Blood Spattered Bride
3. Goke, Body Snatcher from Hell
4. The Addiction
5. The Return of Count Yorga
6. 30 Days of Night: B