In [1]:
import pyspark
from pyspark.sql import SparkSession, functions as F

sc = pyspark.SparkContext('local[*]')
spark = SparkSession \
    .builder \
    .getOrCreate()

# Loading Data

### Reading the Lens data

In [2]:
stripYear = F.udf(lambda title: title[:-7])
movies_ddf = (spark.read.csv('../../data/movies.csv', header=True, inferSchema=True)
              .drop('genres')
              .withColumn('Title', stripYear(F.col('title'))))
movies_ddf.show(5, False)
print(f"{movies_ddf.count()} movies")

+-------+---------------------------+
|movieId|Title                      |
+-------+---------------------------+
|1      |Toy Story                  |
|2      |Jumanji                    |
|3      |Grumpier Old Men           |
|4      |Waiting to Exhale          |
|5      |Father of the Bride Part II|
+-------+---------------------------+
only showing top 5 rows

86537 movies


In [3]:
ratings_ddf = (spark.read.csv('../../data/ratings.csv', header=True, inferSchema=True)
                .drop('timestamp'))
ratings_ddf.show(5, False)
print(f"{ratings_ddf.count()} ratings")

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |1      |4.0   |
|1     |110    |4.0   |
|1     |158    |4.0   |
|1     |260    |4.5   |
|1     |356    |5.0   |
+------+-------+------+
only showing top 5 rows

33832162 ratings


In [4]:
lens_ddf = (ratings_ddf
  .groupby('movieId')
  .avg('rating')
  .select(F.col('movieId'), F.col('avg(rating)').alias('Rating'))
  .join(movies_ddf, 'movieId'))
lens_ddf.show(5, False)

+-------+------------------+-------------------------+
|movieId|Rating            |Title                    |
+-------+------------------+-------------------------+
|1959   |3.633800573431029 |Out of Africa            |
|1591   |2.6484306887532694|Spawn                    |
|1580   |3.595333426758223 |Men in Black (a.k.a. MIB)|
|1645   |3.56697669143084  |The Devil's Advocate     |
|44022  |3.2430483271375463|Ice Age 2: The Meltdown  |
+-------+------------------+-------------------------+
only showing top 5 rows



### Reading the IMDB data

In [5]:
IMDB_df = (spark.read.csv('../../data/imdb_sample.csv', sep=';', header='true')
           .select('Id', 'Title', F.col('ImdbScore').alias('Rating')))
IMDB_df.show(5, False)
IMDB_df.count()

+---+------------------------+------+
|Id |Title                   |Rating|
+---+------------------------+------+
|1  |The Shawshank Redemption|9.2   |
|2  |The Godfather           |9.2   |
|3  |The Godfather: Part II  |9     |
|4  |Pulp Fiction            |8.9   |
|5  |Schindler's List        |8.9   |
+---+------------------------+------+
only showing top 5 rows



100

## When joining on (exact) Title

In [6]:
result = IMDB_df.join(lens_ddf, 'Title')
result.count()

72

## Record Linkage (Fuzzy Matching)

### Prepare join column by doing multiple transformations

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover, Tokenizer, NGram, HashingTF, MinHashLSH, RegexTokenizer, SQLTransformer

In [8]:
sqlTrans = SQLTransformer(statement="SELECT *, lower(Title) lower FROM __THIS__")
df1 = sqlTrans.transform(lens_ddf)
tokenizer = Tokenizer(inputCol="lower", outputCol="token")
df2 = tokenizer.transform(df1)
remover = StopWordsRemover(inputCol="token", outputCol="stop")
df3 = remover.transform(df2)
sqlTrans = SQLTransformer(statement="SELECT *, concat_ws(' ', stop) concat FROM __THIS__")
df4 = sqlTrans.transform(df3)
rtokenizer = RegexTokenizer(pattern="", inputCol="concat", outputCol="char", minTokenLength=1)
df5 = rtokenizer.transform(df4)
ngram = NGram(n=2, inputCol="char", outputCol="ngram")
df6 = ngram.transform(df5)
hashtf = HashingTF(inputCol="ngram", outputCol="vector")
df7 = hashtf.transform(df6)
minhash = MinHashLSH(inputCol="vector", outputCol="lsh", numHashTables=3)
model = minhash.fit(df7)
model.setInputCol("vector")


MinHashLSHModel: uid=MinHashLSH_651afdbe70c8, numHashTables=3

In [9]:
#model.transform(df7).show(10,False)
#df6.count()
model.getOutputCol()

'lsh'

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover, Tokenizer, NGram, HashingTF, MinHashLSH, RegexTokenizer, SQLTransformer

model = Pipeline(stages=[
    SQLTransformer(statement="SELECT *, lower(Title) lower FROM __THIS__"),
    Tokenizer(inputCol="lower", outputCol="token"),
    StopWordsRemover(inputCol="token", outputCol="stop"),
    SQLTransformer(statement="SELECT *, concat_ws(' ', stop) concat FROM __THIS__"),
    RegexTokenizer(pattern="", inputCol="concat", outputCol="char", minTokenLength=1),
    NGram(n=2, inputCol="char", outputCol="ngram"),
    HashingTF(inputCol="ngram", outputCol="vector"),
    MinHashLSH(inputCol="vector", outputCol="lsh", numHashTables=3)
]).fit(lens_ddf)

result_lens = model.transform(lens_ddf)
result_lens = result_lens.filter(F.size(F.col("ngram")) > 0)
print(f"Example transformation ({result_lens.count()} movies left):")
result_lens.select('movieId', 'Title', 'concat', 'char', 'ngram', 'vector', 'lsh').show(1)

Example transformation (82905 movies left):
+-------+-------------+------+------------------+--------------------+--------------------+--------------------+
|movieId|        Title|concat|              char|               ngram|              vector|                 lsh|
+-------+-------------+------+------------------+--------------------+--------------------+--------------------+
|   1959|Out of Africa|africa|[a, f, r, i, c, a]|[a f, f r, r i, i...|(262144,[5118,318...|[[1.79617336E8], ...|
+-------+-------------+------+------------------+--------------------+--------------------+--------------------+
only showing top 1 row



In [22]:
model.stages[-1]

MinHashLSHModel: uid=MinHashLSH_7d2003f0168a, numHashTables=3

In [23]:
# Use pipeline previous defined
result_imdb = model.transform(IMDB_df)
filtered = result_imdb.filter(F.size(F.col("ngram")) < 1)
print(f"Filtered out rows: {filtered.count()}")
filtered.select('id', 'Title', 'concat', 'char', 'ngram', 'vector').show()
result_imdb = result_imdb.filter(F.size(F.col("ngram")) > 0)
print(f"Example transformation ({result_imdb.count()} movies left):")
result_imdb.select('id', 'Title', 'concat', 'char', 'ngram', 'vector', 'lsh').show(1)

Filtered out rows: 1
+---+-----+------+----+-----+--------------+
| id|Title|concat|char|ngram|        vector|
+---+-----+------+----+-----+--------------+
| 69|    M|     m| [m]|   []|(262144,[],[])|
+---+-----+------+----+-----+--------------+

Example transformation (99 movies left):
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| id|               Title|              concat|                char|               ngram|              vector|                 lsh|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  1|The Shawshank Red...|shawshank redemption|[s, h, a, w, s, h...|[s h, h a, a w, w...|(262144,[6924,327...|[[1.95771365E8], ...|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row



### Join based on Jaccard Distance

In [24]:
result = model.stages[-1].approxSimilarityJoin(result_imdb, result_lens, 0.5, "jaccardDist")
print(f"{result.count()} matches")
(result
 .select('datasetA.id', 'datasetA.Title', 'datasetB.Title', 'jaccardDist')
 .sort(F.col('datasetA.id'))
 .show(20, True))

434 matches
+---+--------------------+--------------------+--------------------+
| id|               Title|               Title|         jaccardDist|
+---+--------------------+--------------------+--------------------+
|  1|The Shawshank Red...|Shawshank Redempt...| 0.05555555555555558|
| 10|          Fight Club|   Female Fight Club|              0.4375|
| 10|          Fight Club|          Fight Club|                 0.0|
| 10|          Fight Club|   Zombie Fight Club|              0.4375|
|100| Inglorious Basterds|Inglourious Basterds| 0.10526315789473684|
|100| Inglorious Basterds|The Real Inglorio...|                0.36|
| 11|The Lord of the R...|Lord of the Rings...|0.045454545454545414|
| 12|        Forrest Gump|        Forrest Gump|                 0.0|
| 13|          Goodfellas|          Goodfellas|                 0.0|
| 14|One Flew Over the...|One Flew Over the...| 0.04761904761904767|
| 15|Star Wars Episode...|Star Wars: Episod...|  0.1578947368421053|
| 16|           Incept

In [26]:
result.printSchema()

root
 |-- datasetA: struct (nullable = false)
 |    |-- Id: string (nullable = true)
 |    |-- Title: string (nullable = true)
 |    |-- Rating: string (nullable = true)
 |    |-- lower: string (nullable = true)
 |    |-- token: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- stop: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- concat: string (nullable = false)
 |    |-- char: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- ngram: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- vector: vector (nullable = true)
 |    |-- lsh: array (nullable = true)
 |    |    |-- element: vector (containsNull = true)
 |-- datasetB: struct (nullable = false)
 |    |-- movieId: integer (nullable = true)
 |    |-- Rating: double (nullable = true)
 |    |-- Title: string (nullable = true)
 |    |-- lower: string (nullable = true)
 |    |-- token: array

### Optimization: Only keep single row with lowest distance

In [28]:
from pyspark.sql import Window
w = Window.partitionBy('datasetA.id')
result = (result
           .withColumn('minDist', F.min('jaccardDist').over(w))
           .where(F.col('jaccardDist') == F.col('minDist'))
           .drop('minDist'))
print(f"{result.count()} matches")
(result
 .select('datasetA.Title', 'datasetB.Title', 'jaccardDist')
 .sort(F.col('datasetA.id'))
 .show(20,False))

141 matches
+---------------------------------------------+--------------------------------------------------+--------------------+
|Title                                        |Title                                             |jaccardDist         |
+---------------------------------------------+--------------------------------------------------+--------------------+
|The Shawshank Redemption                     |Shawshank Redemption, The                         |0.05555555555555558 |
|Fight Club                                   |Fight Club                                        |0.0                 |
|Inglorious Basterds                          |Inglourious Basterds                              |0.10526315789473684 |
|The Lord of the Rings: Fellowship of the Ring|Lord of the Rings: The Fellowship of the Ring, The|0.045454545454545414|
|Forrest Gump                                 |Forrest Gump                                      |0.0                 |
|Goodfellas                 

* Missed IDs: 27, 34, 49, 67, 69, 70, 89, 96
* Faulty Matches: 2, 20, 29, 31, 62

## Final DataFrame

In [11]:
result.select('datasetA.Title', 'datasetB.Title', 'datasetA.Rating', 'datasetB.Rating').show(5)

+--------------------+--------------------+------+------------------+
|               Title|               Title|Rating|            Rating|
+--------------------+--------------------+------+------------------+
|               Alien|               Alien|   8.5| 4.055518882196001|
|Star Wars Episode...|Star Wars: Episod...|   8.7| 4.144122313069856|
|       The Lion King|       The Lion King|   8.4|  3.14922480620155|
|The Lord of the R...|Lord of the Rings...|   8.8| 4.091188818716808|
|Once upon a Time ...|    Once Upon a Time|   8.6|3.3636363636363638|
+--------------------+--------------------+------+------------------+
only showing top 5 rows

