In [3]:
from pyspark.sql import SparkSession

import pandas as pd


In [2]:
spark = SparkSession.builder \
    .appName("EntityResolution") \
    .getOrCreate()

24/10/22 02:37:39 WARN Utils: Your hostname, selena resolves to a loopback address: 127.0.1.1; using 10.86.1.141 instead (on interface wlp0s20f3)
24/10/22 02:37:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/22 02:37:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/22 02:37:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
df = pd.read_csv("data/english_movies.csv")
spark_df = spark.createDataFrame(df)
spark_df.show()

24/10/22 02:39:12 WARN TaskSetManager: Stage 0 contains a task of very large size (4923 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+---------+--------------------+--------------------+--------------------+-----+---------------+
|  imdb_id|               title|       plot_synopsis|                tags|split|synopsis_source|
+---------+--------------------+--------------------+--------------------+-----+---------------+
|tt1733125|Dungeons & Dragon...|Two thousand year...|            violence|train|           imdb|
|tt0033045|The Shop Around t...|Matuschek's, a gi...|            romantic| test|           imdb|
|tt0113862|  Mr. Holland's Opus|Glenn Holland, no...|inspiring, romant...|train|           imdb|
|tt0408790|          Flightplan|Kyle Pratt (Jodie...|mystery, suspense...|train|           imdb|
|tt0078908|           The Brood|At the Somafree I...|cult, psychedelic...|train|           imdb|
|tt0795493|   Cassandra's Dream|Brothers Terry (C...|tragedy, dramatic...|train|      wikipedia|
|tt0093389|    The Last Emperor|Arrival.\nA train...|              murder|train|           imdb|
|tt0120899|      My Life So Fa

In [7]:
distinct_english_titles = spark_df.select("title").distinct()
movie_list = [row.title for row in distinct_english_titles.collect()]
print("\n Number of distinct English movie titles: ", len(movie_list))
movie_list

24/10/22 02:42:52 WARN TaskSetManager: Stage 4 contains a task of very large size (4923 KiB). The maximum recommended task size is 1000 KiB.



 Number of distinct English movie titles:  7557


['Big Nothing',
 "Evolution's Achilles' Heels",
 'How to Deal',
 'The Honeymoon Killers',
 "Cassandra's Dream",
 'The Final Destination',
 'Thralls',
 'Funny Show Part Two: The Video - Movie',
 'Night of the Lepus',
 'American History X',
 'Extremely Loud & Incredibly Close',
 'Point Break',
 'The Men Who Stare at Goats',
 'Pandora and the Flying Dutchman',
 'Bad Day at Black Rock',
 'The Wild Bunch',
 'A Murder of Crows',
 'Gangs of New York',
 "That's My Boy",
 "Cat's Eye",
 'Confessions of a Shopaholic',
 'Ghost Town',
 'Red: Werewolf Hunter',
 'Chasing Fire',
 'Maps to the Stars',
 'Executive Action',
 'The Heart of No Place',
 'Terms of Endearment',
 'Just Before Dawn',
 "There's Always Vanilla",
 'The Fisher King',
 'The Wolverine',
 'War of the Satellites',
 'Nine to Five',
 'The Fall',
 'The Quatermass Xperiment',
 'Justice League: War',
 'GirlFight: inVite',
 'Dungeons & Dragons: The Book of Vile Darkness',
 'Foul Play',
 'Survival of the Dead',
 'Prom Night',
 'The Thrill of 

In [10]:
from pyspark.sql.functions import col, regexp_extract
import re

In [11]:
escaped_titles = [re.escape(title) for title in movie_list]
movie_pattern = "|".join(escaped_titles)

In [13]:
reddit_df = pd.read_csv("data/reddit_posts.csv")
reddit_df = spark.createDataFrame(reddit_df)
reddit_df.show()

+--------------------+--------------------+-----+-----+---+------------+----------+-----------+----------+
|               title|             content|score|   id|url|num_comments|   created|  subreddit|      date|
+--------------------+--------------------+-----+-----+---+------------+----------+-----------+----------+
|Hellboy II: The G...|                 NaN|    7|am07r|NaN|           0|1262739798|moviecritic|2010-01-01|
|At Play in the Fi...|                 NaN|    6|ammjm|NaN|           2|1262859933|moviecritic|2010-01-01|
|Blacktrospective ...|                 NaN|    1|aklk1|NaN|           0|1262379003|moviecritic|2010-01-01|
|Movie Tuesday « 1...|                 NaN|    1|ako0b|NaN|           0|1262401769|moviecritic|2010-01-01|
|Let the WTFs Fly:...|                 NaN|    1|ale17|NaN|           0|1262615469|moviecritic|2010-01-01|
|20 Best Vampire M...|                 NaN|    1|almm3|NaN|           0|1262663783|moviecritic|2010-01-01|
|One More for Good...|               

In [16]:
entities_matched = reddit_df.withColumn("movie_title", regexp_extract(col("title"), movie_pattern, 0))
entities_matched.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+-----+---+------------+----------+-----------+----------+---------------------------------+
|title                                                                                                                                        |content                                                                                                                                                                                                                         

In [24]:
print("\n Number of entities matched: ", entities_matched.filter(col("movie_title") != "").count())

24/10/22 03:19:58 WARN TaskSetManager: Stage 11 contains a task of very large size (1316 KiB). The maximum recommended task size is 1000 KiB.


 Number of entities matched:  13896


                                                                                

In [18]:
pandas_entities_matched = entities_matched.toPandas()
pandas_entities_matched.to_csv("data/entities_matched.csv", index=False)

24/10/22 03:13:25 WARN TaskSetManager: Stage 10 contains a task of very large size (1316 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

## Spacy NER

In [20]:
import spacy
from spacy.pipeline import EntityRuler

nlp = spacy.load("en_core_web_sm")

In [22]:
ruler = ruler = nlp.add_pipe("entity_ruler", before="ner")

patterns = [{"label": "MOVIE", "pattern": title} for title in movie_list]
ruler.add_patterns(patterns)

In [23]:
nlp.add_pipe(ruler, before="ner")

ValueError: [E966] `nlp.add_pipe` now takes the string name of the registered component factory, not a callable component. Expected string, but got <spacy.pipeline.entityruler.EntityRuler object at 0x7427eeffeac0> (name: 'None').

- If you created your component with `nlp.create_pipe('name')`: remove nlp.create_pipe and call `nlp.add_pipe('name')` instead.

- If you passed in a component like `TextCategorizer()`: call `nlp.add_pipe` with the string name instead, e.g. `nlp.add_pipe('textcat')`.

- If you're using a custom component: Add the decorator `@Language.component` (for function components) or `@Language.factory` (for class components / factories) to your custom component and assign it a name, e.g. `@Language.component('your_name')`. You can then run `nlp.add_pipe('your_name')` to add it to the pipeline.