<a href="https://colab.research.google.com/github/rajeshmore1/Entity_Resolution/blob/main/Entity_Resolution_10Feb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import requests
from io import BytesIO
import pandas as pd
import numpy as np

pd.options.display.max_colwidth = None
url = "https://raw.githubusercontent.com/tomonori-masui/entity-resolution/main/data/musicbrainz_200k.csv"
res = requests.get(url)
df = pd.read_csv(BytesIO(res.content))

In [None]:
df.head()

In [None]:
english_cids = df[
    df.language.str.lower().str.contains("^en|^eg", na=False)
].CID.unique()

df = df[df.CID.isin(english_cids)].reset_index(drop=True)

In [None]:
df.shape

In [None]:
#cleaning
for col in ["title", "artist", "album"]:
    df[col] = (
        df[col]
        .str.lower()
        .replace("[^a-z0-9]", " ", regex=True)  # replacing special characters with a space
        .replace(" +", " ", regex=True)         # removing consecutive spaces
        .str.strip()                            # removing leading and tailing spaces
    )

df.loc[df.number.notna(), "number"] = (
    df[df.number.notna()]
    .number.replace("[^0-9]", "", regex=True)              # removing non-digits
    .apply(lambda x: str(int(x)) if len(x) > 0 else None)  # removing leading zeros
)

In [75]:
df.head()

Row(TID=4, CID=2, CTID=1, SourceID=3, id='unk.', number='17', title='mustard gas there and back again lane', length='2.15', artist='action painting', album='NaN', year="'95", language='English')

In [None]:
df.info()

#featurization and blocking

In the context of ER, featurization means transforming existing columns into derived features that can inform whether disparate records refer to the same thing. Blocking means selecting a targeted subset of features to use as self join keys to efficiently create potential match candidate pairs. The reason these two steps are grouped in our discussion is blocking keys are often derived from features.

Good featurization enables efficient blocking and also good match accuracy downstream, therefore it is a critical piece of the ER process. It is also the least defined step, where a lot of creativity can be injected into the process.

In [49]:
pip install pyspark



In [73]:
import pyspark
from pyspark.sql import SparkSession


In [74]:
#spark = SparkSession.builder.appName("entity_resolution").config("spark.jars.packages", "graphframes:graphframes:0.6").getOrCreate()
spark = SparkSession.builder.master("local[*]").config("spark.jars.packages", "graphframes:graphframes:0.7.0-spark2.4-s_2.11").getOrCreate()

df = spark.createDataFrame(df)

PySparkTypeError: [SHOULD_NOT_DATAFRAME] Argument `data` should not be a DataFrame.

In [52]:
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql import Window as w

from pyspark.ml.linalg import DenseVector, SparseVector
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer, CountVectorizer, StopWordsRemover, NGram, Normalizer, VectorAssembler, Word2Vec, Word2VecModel, PCA
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.linalg import VectorUDT, Vectors
import tensorflow_hub as hub
from pyspark.sql.functions import udf

def tokenize(df, string_cols):
  output = df
  for c in string_cols:
    output = output.withColumn('temp', f.coalesce(f.col(c), f.lit('')))
    tokenizer = RegexTokenizer(inputCol='temp', outputCol=c+"_tokens", pattern = "\\W")
    remover = StopWordsRemover(inputCol=c+"_tokens", outputCol=c+"_swRemoved")
    output = tokenizer.transform(output)
    output = remover.transform(output)\
      .drop('temp', c+"_tokens")

  return output

def top_kw_from_tfidf(vocab, n=3):
  @udf(returnType=t.ArrayType(t.StringType()))
  def _(arr):
    inds = arr.indices
    vals = arr.values
    top_inds = vals.argsort()[-n:][::-1]
    top_keys = inds[top_inds]
    output = []

    for k in top_keys:
      kw = vocab.value[k]
      output.append(kw)

    return output
  return _

def tfidf_top_tokens(df, token_cols, min_freq=1):
  output = df
  for c in token_cols:
    pre = c
    cv = CountVectorizer(inputCol=pre, outputCol=pre+'_rawFeatures', minDF=min_freq)
    idf = IDF(inputCol=pre+"_rawFeatures", outputCol=pre+"_features", minDocFreq=min_freq)
    normalizer = Normalizer(p=2.0, inputCol=pre+"_features", outputCol=pre+'_tfidf')
    stages = [cv, idf, normalizer]
    pipeline = Pipeline(stages=stages)
    model = pipeline.fit(output)
    output = model.transform(output)\
      .drop(pre+'_rawFeatures', pre+'_features')

    cvModel = model.stages[0]
    vocab = spark.sparkContext.broadcast(cvModel.vocabulary)
    output = output.withColumn(pre+'_top_tokens', top_kw_from_tfidf(vocab, n=5)(f.col(pre+"_tfidf")))

  return output


# magic function to load model only once per executor
MODEL = None
def get_model_magic():
  global MODEL
  if MODEL is None:
      MODEL = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
  return MODEL

@udf(returnType=VectorUDT())
def encode_sentence(x):
  model = get_model_magic()
  emb = model([x]).numpy()[0]
  return Vectors.dense(emb)

blocking_df = tokenize(df, ['title',	'artist','album'])
blocking_df = tfidf_top_tokens(blocking_df, [c+'_swRemoved' for c in ['title',	'artist','album']])
blocking_df = blocking_df.withColumn('title_encoding', encode_sentence(f.coalesce(f.col('title'), f.lit(''))))\
  .withColumn('artist_encoding', encode_sentence(f.coalesce(f.col('artist'), f.lit(''))))\
  .withColumn('blocking_keys',
                f.array_union(f.col('title_swRemoved_top_tokens'), f.array_union(f.col('artist_swRemoved_top_tokens'), f.col('album_swRemoved_top_tokens')))
             )\
  .withColumn('uid', f.concat_ws('|', 'id', 'SourceID'))

In [53]:
# magic function to load model only once per executor
MODEL = None
def get_model_magic():
  global MODEL
  if MODEL is None:
      MODEL = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
  return MODEL
@udf(returnType=VectorUDT())
def encode_sentence(x):
  model = get_model_magic()
  emb = model([x]).numpy()[0]
  return Vectors.dense(emb)

# candidate pair generation and match scoring

In [54]:
!pip install graphframes




In [55]:
import graphframes

In [56]:
blocking_df.show()

+---+-----+----+--------+---------------+------+--------------------+--------+--------------------+--------------------+----+--------+--------------------+--------------------+--------------------+---------------------+--------------------------+----------------------+---------------------------+---------------------+--------------------------+--------------------+--------------------+--------------------+-----------------+
|TID|  CID|CTID|SourceID|             id|number|               title|  length|              artist|               album|year|language|     title_swRemoved|    artist_swRemoved|     album_swRemoved|title_swRemoved_tfidf|title_swRemoved_top_tokens|artist_swRemoved_tfidf|artist_swRemoved_top_tokens|album_swRemoved_tfidf|album_swRemoved_top_tokens|      title_encoding|     artist_encoding|       blocking_keys|              uid|
+---+-----+----+--------+---------------+------+--------------------+--------+--------------------+--------------------+----+--------+----------

In [57]:
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql import Window as w
import numpy as np
from graphframes import GraphFrame
"""
keep_cols = ['source', 'name', 'description', 'manufacturer', 'price',
              'name_swRemoved', 'description_swRemoved', 'manufacturer_swRemoved',
             'name_swRemoved_tfidf', 'description_swRemoved_tfidf', 'manufacturer_swRemoved_tfidf',
             'name_encoding', 'description_encoding']
"""
keep_cols = [ 'title', 'length', 'artist', 'album',
                'year', 'language', 'title_swRemoved', 'artist_swRemoved', 'album_swRemoved',
                'title_swRemoved_tfidf', 'artist_swRemoved_tfidf',
                'album_swRemoved_tfidf',
                'title_encoding', 'artist_encoding', 'blocking_keys']


LARGEST_BLOCK = 100

node = blocking_df.select(f.col('uid').alias('id'), *keep_cols)
keep_pairs = blocking_df.select(f.explode('blocking_keys').alias('blocking_key'), 'uid')\
  .groupBy('blocking_key')\
  .agg(
    f.count('uid').alias('block_size'),
    f.collect_set('uid').alias('uid'),
  )\
  .filter(f.col('block_size').between(2,LARGEST_BLOCK))\
  .select('blocking_key', f.explode('uid').alias('uid'))

left = keep_pairs.withColumnRenamed('uid', 'src')
right = keep_pairs.withColumnRenamed('uid', 'dst')

candidate_pairs = left.join(right, ['blocking_key'], 'inner')\
  .filter(f.col('src') < f.col('dst'))\
  .select('src', 'dst')\
  .distinct()

#g = GraphFrame(node, candidate_pairs)

In [61]:
candidate_pairs.show()

+-----------------+-----------------+
|              src|              dst|
+-----------------+-----------------+
|MBox41116018-HH|2|    WoM22988127|1|
|    160610-A054|4|    214856-A019|4|
|       11494061|5|MBox17282825-HH|2|
|    214856-A019|4|     68205-A038|4|
|    193216-A060|4|     WoM2241521|1|
|    184432-A055|4|    225414-A013|4|
|    184432-A055|4|    233739-A019|4|
|       11121229|5|   7866219MB-01|3|
|    233739-A019|4|     88409-A015|4|
|     38282-A066|4|    WoM20957349|1|
|     28669-A064|4|     WoM2241521|1|
|    222772-A011|4|    WoM32698435|1|
|    126902-A015|4|    WoM32698435|1|
|     214267-A02|4|MBox28752110-HH|2|
|    118021-A055|4|     25482-A049|4|
|    197493-A049|4|    WoM32698435|1|
|    197493-A049|4|     25526-A031|4|
|    204596-A028|4|    213216-A015|4|
|    204596-A028|4|    221072-A012|4|
|    178212-A021|4|    180258-A048|4|
+-----------------+-----------------+
only showing top 20 rows



In [65]:
from graphframes import *

In [67]:
node.show()

+-----------------+--------------------+--------+--------------------+--------------------+----+--------+--------------------+--------------------+--------------------+---------------------+----------------------+---------------------+--------------------+--------------------+--------------------+
|               id|               title|  length|              artist|               album|year|language|     title_swRemoved|    artist_swRemoved|     album_swRemoved|title_swRemoved_tfidf|artist_swRemoved_tfidf|album_swRemoved_tfidf|      title_encoding|     artist_encoding|       blocking_keys|
+-----------------+--------------------+--------+--------------------+--------------------+----+--------+--------------------+--------------------+--------------------+---------------------+----------------------+---------------------+--------------------+--------------------+--------------------+
|           unk.|3|mustard gas there...|    2.15|     action painting|                 NaN| '95| Englis

In [71]:
!curl -L -o "/usr/local/lib/python3.10/dist-packages/pyspark/jars/graphframes-0.8.0-spark2.4-s_2.11.jar" http://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.0-spark2.4-s_2.11/graphframes-0.8.0-spark2.4-s_2.11.jar


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   164  100   164    0     0   1086      0 --:--:-- --:--:-- --:--:--  1093
100   146  100   146    0     0    202      0 --:--:-- --:--:-- --:--:--   695


In [72]:
g = GraphFrame(node, candidate_pairs)



Py4JJavaError: An error occurred while calling o3329.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [62]:
@udf("double")
def dot(x, y):
  if x is not None and y is not None:
    return float(x.dot(y))
  else:
    return 0

def null_safe_levenshtein_sim(c1, c2):
  output = f.when(f.col(c1).isNull() | f.col(c2).isNull(), 0)\
            .otherwise(1 - f.levenshtein(c1, c2) / f.greatest(f.length(c1), f.length(c2)))
  return output

def null_safe_num_sim(c1, c2):
  output = f.when(f.col(c1).isNull() | f.col(c2).isNull(), 0)\
            .when((f.col(c1) == 0) & (f.col(c2) == 0), 1)\
            .when((f.col(c1) == 0) | (f.col(c2) == 0), 0)\
            .otherwise(1 - f.abs(f.col(c1) - f.col(c2)) / f.greatest(c1, c2))
  return output

def null_safe_token_overlap(c1, c2):
  # is the overlap a significant part of the shorter string
  output = f.when(f.col(c1).isNull() | f.col(c2).isNull(), 0)\
            .when((f.size(f.array_distinct(c1)) == 0) | (f.size(f.array_distinct(c2)) == 0), 0)\
            .otherwise(f.size(f.array_intersect(c1, c2)) / f.least(f.size(f.array_distinct(c1)), f.size(f.array_distinct(c1))))
  return output

def calc_sim(df):
  df = df.withColumn('title_lev', null_safe_levenshtein_sim('src.title', 'dst.title'))\
      .withColumn('artist_lev', null_safe_levenshtein_sim('src.artist', 'dst.artist'))\
      .withColumn('album_lev', null_safe_levenshtein_sim('src.album', 'dst.album'))\
      .withColumn('title_token_sim', null_safe_token_overlap('src.title_swRemoved', 'dst.title_swRemoved'))\
      .withColumn('artist_token_sim', null_safe_token_overlap('src.artist_swRemoved', 'dst.artist_swRemoved'))\
      .withColumn('album_token_sim', null_safe_token_overlap('src.album_swRemoved', 'dst.album_swRemoved'))\
      .withColumn('year_sim', null_safe_num_sim('src.year', 'dst.year'))\
      .withColumn('title_tfidf_sim', dot(f.col('src.title_swRemoved_tfidf'), f.col('dst.title_swRemoved_tfidf')))\
      .withColumn('description_tfidf_sim', dot(f.col('src.description_swRemoved_tfidf'), f.col('dst.description_swRemoved_tfidf')))\
      .withColumn('artist_tfidf_sim', dot(f.col('src.artist_swRemoved_tfidf'), f.col('dst.artist_swRemoved_tfidf')))\
      .withColumn('title_encoding_sim', dot(f.col('src.title_encoding'), f.col('dst.title_encoding')))\
      .withColumn('album_encoding_sim', dot(f.col('src.album_encoding'), f.col('dst.album_encoding')))

  metrics = ['artist_lev', 'album_lev', 'title_lev', 'year_sim', 'title_tfidf_sim', 'description_tfidf_sim',
             'artist_tfidf_sim', 'title_encoding_sim', 'album_encoding_sim',
             'title_token_sim', 'artistr_token_sim', 'album_token_sim'
            ]

  df = df.withColumn('overall_sim', reduce(add, [f.col(c) for c in metrics]) / len(metrics))
  return df


distance_df = calc_sim(g.triplets)

distance_df.write.mode('overwrite').parquet("YOUR_STORAGE_PATH/amazon_google_distance.parquet")

NameError: name 'g' is not defined

#### The next step after candidate pair generation, is to score the candidate pair match likelihood. This is crucial to removing non-matches and creating the final resolved entities.