In [1]:
import pyspark
from pyspark.sql import SparkSession, functions as F

sc = pyspark.SparkContext('local[*]')
spark = SparkSession \
    .builder \
    .getOrCreate()

# Loading Data

### Reading the Lens data

In [2]:
stripYear = F.udf(lambda title: title[:-7])
movies_ddf = (spark.read.csv('movies.csv', header=True, inferSchema=True)
              .drop('genres')
              .withColumn('Title', stripYear(F.col('title'))))
movies_ddf.show(5, False)
print(f"{movies_ddf.count()} movies")

+-------+---------------------------+
|movieId|Title                      |
+-------+---------------------------+
|1      |Toy Story                  |
|2      |Jumanji                    |
|3      |Grumpier Old Men           |
|4      |Waiting to Exhale          |
|5      |Father of the Bride Part II|
+-------+---------------------------+
only showing top 5 rows

62423 movies


In [3]:
ratings_ddf = (spark.read.csv('ratings.csv', header=True, inferSchema=True)
                .drop('timestamp'))
ratings_ddf.show(5, False)
print(f"{ratings_ddf.count()} ratings")

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |296    |5.0   |
|1     |306    |3.5   |
|1     |307    |5.0   |
|1     |665    |5.0   |
|1     |899    |3.5   |
+------+-------+------+
only showing top 5 rows

25000095 ratings


In [4]:
lens_ddf = (ratings_ddf
  .groupby('movieId')
  .avg('rating')
  .select(F.col('movieId'), F.col('avg(rating)').alias('Rating'))
  .join(movies_ddf, 'movieId'))
lens_ddf.show(5, False)

+-------+------------------+-------------------------+
|movieId|Rating            |Title                    |
+-------+------------------+-------------------------+
|1088   |3.25002094679514  |Dirty Dancing            |
|1580   |3.5817083457378187|Men in Black (a.k.a. MIB)|
|3175   |3.6077836141619484|Galaxy Quest             |
|44022  |3.2593627146699773|Ice Age 2: The Meltdown  |
|175197 |2.754918032786885 |The Dark Tower           |
+-------+------------------+-------------------------+
only showing top 5 rows



### Reading the IMDB data

In [5]:
IMDB_df = (spark.read.csv('imdb_sample.csv', sep=';', header='true')
           .select('Id', 'Title', F.col('ImdbScore').alias('Rating')))
IMDB_df.show(5, False)
IMDB_df.count()

+---+------------------------+------+
|Id |Title                   |Rating|
+---+------------------------+------+
|1  |The Shawshank Redemption|9.2   |
|2  |The Godfather           |9.2   |
|3  |The Godfather: Part II  |9     |
|4  |Pulp Fiction            |8.9   |
|5  |Schindler's List        |8.9   |
+---+------------------------+------+
only showing top 5 rows



100

## When joining on (exact) Title

In [6]:
result = IMDB_df.join(lens_ddf, 'Title')
result.count()

64

## Record Linkage (Fuzzy Matching)

### Prepare join column by doing multiple transformations

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover, Tokenizer, NGram, HashingTF, MinHashLSH, RegexTokenizer, SQLTransformer

model = Pipeline(stages=[
    SQLTransformer(statement="SELECT *, lower(Title) lower FROM __THIS__"),
    Tokenizer(inputCol="lower", outputCol="token"),
    StopWordsRemover(inputCol="token", outputCol="stop"),
    SQLTransformer(statement="SELECT *, concat_ws(' ', stop) concat FROM __THIS__"),
    RegexTokenizer(pattern="", inputCol="concat", outputCol="char", minTokenLength=1),
    NGram(n=2, inputCol="char", outputCol="ngram"),
    HashingTF(inputCol="ngram", outputCol="vector"),
    MinHashLSH(inputCol="vector", outputCol="lsh", numHashTables=3)
]).fit(lens_ddf)

result_lens = model.transform(lens_ddf)
result_lens = result_lens.filter(F.size(F.col("ngram")) > 0)
print(f"Example transformation ({result_lens.count()} movies left):")
result_lens.select('movieId', 'Title', 'concat', 'char', 'ngram', 'vector', 'lsh').show(1)

Example transformation (58818 movies left):
+-------+-------------+-------------+--------------------+--------------------+--------------------+--------------------+
|movieId|        Title|       concat|                char|               ngram|              vector|                 lsh|
+-------+-------------+-------------+--------------------+--------------------+--------------------+--------------------+
|   1088|Dirty Dancing|dirty dancing|[d, i, r, t, y,  ...|[d i, i r, r t, t...|(262144,[1726,265...|[[9307048.0], [1....|
+-------+-------------+-------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row



In [8]:
# Use pipeline previous defined
result_imdb = model.transform(IMDB_df)
filtered = result_imdb.filter(F.size(F.col("ngram")) < 1)
print(f"Filtered out rows: {filtered.count()}")
filtered.select('id', 'Title', 'concat', 'char', 'ngram', 'vector').show()
result_imdb = result_imdb.filter(F.size(F.col("ngram")) > 0)
print(f"Example transformation ({result_imdb.count()} movies left):")
result_imdb.select('id', 'Title', 'concat', 'char', 'ngram', 'vector', 'lsh').show(1)

Filtered out rows: 1
+---+-----+------+----+-----+--------------+
| id|Title|concat|char|ngram|        vector|
+---+-----+------+----+-----+--------------+
| 69|    M|     m| [m]|   []|(262144,[],[])|
+---+-----+------+----+-----+--------------+

Example transformation (99 movies left):
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| id|               Title|              concat|                char|               ngram|              vector|                 lsh|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  1|The Shawshank Red...|shawshank redemption|[s, h, a, w, s, h...|[s h, h a, a w, w...|(262144,[5535,212...|[[4.6614889E7], [...|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row



### Join based on Jaccard Distance

In [9]:
result = model.stages[-1].approxSimilarityJoin(result_imdb, result_lens, 0.5, "jaccardDist")
print(f"{result.count()} matches")
(result
 .select('datasetA.id', 'datasetA.Title', 'datasetB.Title', 'jaccardDist')
 .sort(F.col('datasetA.id'))
 .show(5, True))

335 matches
+---+--------------------+--------------------+-------------------+
| id|               Title|               Title|        jaccardDist|
+---+--------------------+--------------------+-------------------+
|  1|The Shawshank Red...|Shawshank Redempt...|0.05555555555555558|
|  1|The Shawshank Red...|          Redemption|0.47058823529411764|
| 10|          Fight Club|   Female Fight Club|             0.4375|
| 10|          Fight Club|          Fight Club|                0.0|
| 10|          Fight Club|   Zombie Fight Club|             0.4375|
+---+--------------------+--------------------+-------------------+
only showing top 5 rows



### Optimization: Only keep single row with lowest distance

In [10]:
from pyspark.sql import Window
w = Window.partitionBy('datasetA.id')
result = (result
           .withColumn('minDist', F.min('jaccardDist').over(w))
           .where(F.col('jaccardDist') == F.col('minDist'))
           .drop('minDist'))
print(f"{result.count()} matches")
(result
 .select('datasetA.Title', 'datasetB.Title', 'jaccardDist')
 .sort(F.col('datasetA.id'))
 .show(5))

120 matches
+--------------------+--------------------+--------------------+
|               Title|               Title|         jaccardDist|
+--------------------+--------------------+--------------------+
|The Shawshank Red...|Shawshank Redempt...| 0.05555555555555558|
|          Fight Club|          Fight Club|                 0.0|
| Inglorious Basterds|Inglourious Basterds| 0.10526315789473684|
|The Lord of the R...|Lord of the Rings...|0.045454545454545414|
|        Forrest Gump|        Forrest Gump|                 0.0|
+--------------------+--------------------+--------------------+
only showing top 5 rows



* Missed IDs: 27, 34, 49, 67, 69, 70, 89, 96
* Faulty Matches: 2, 20, 29, 31, 62

## Final DataFrame

In [11]:
result.select('datasetA.Title', 'datasetB.Title', 'datasetA.Rating', 'datasetB.Rating').show(5)

+--------------------+--------------------+------+------------------+
|               Title|               Title|Rating|            Rating|
+--------------------+--------------------+------+------------------+
|               Alien|               Alien|   8.5| 4.055518882196001|
|Star Wars Episode...|Star Wars: Episod...|   8.7| 4.144122313069856|
|       The Lion King|       The Lion King|   8.4|  3.14922480620155|
|The Lord of the R...|Lord of the Rings...|   8.8| 4.091188818716808|
|Once upon a Time ...|    Once Upon a Time|   8.6|3.3636363636363638|
+--------------------+--------------------+------+------------------+
only showing top 5 rows

