## Fuzzy Match POC with Apache Spark
The objective of this project is to test native Spark functions for string similarity analysis, using a variety of similarity algorithms.

### Approaches

- 1st Approach: Use native Scala Spark SQL fuzzy match functions, cross-joining the input dataset with the target dataset, which makes the computation time quadratic in the dataset size
- 2nd Approach: Use Term Frequency-Inverse Document Frequency (TF-IDF) vectors to narrow down candidate matches first, and only then apply the native Scala Spark SQL fuzzy match functions
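A quick check of what "quadratic" means here, using the dataset sizes from the benchmark table at the end of this notebook: a cross join scores every (target, comparison) pair, so roughly doubling both inputs roughly quadruples the work.

```python
# Dataset sizes taken from the benchmark table in this notebook
runs = {
    "5k targets": (5_000, 63_635),
    "10k targets": (10_000, 126_304),
}

# A cross join produces |targets| * |comparison| rows to score
pairs = {label: t * c for label, (t, c) in runs.items()}
for label, n in pairs.items():
    print(f"{label}: {n:,} pairs")

# Both inputs roughly double, so the pair count roughly quadruples
growth = pairs["10k targets"] / pairs["5k targets"]
print(f"growth factor: {growth:.2f}")
```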

References:
- [Josh Taylor: Fuzzy matching at scale](https://towardsdatascience.com/fuzzy-matching-at-scale-84f2bfd0c536#:~:text=The%20problem%20with%20Fuzzy%20Matching%20on%20large%20data&text=In%20computer%20science%2C%20this%20is,that%20works%20in%20quadratic%20time.)

```python
import os
import sys

import names  # third-party package that generates random person names
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import NearestNeighbors

# Make Spark's Python processes use the same interpreter as this notebook
# (must be set before the Spark session is created)
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Create the session first so the app name is applied, then reuse its SparkContext
spark = SparkSession.builder.appName('Fuzzy Match POC').getOrCreate()
sc = spark.sparkContext
```

### Prepare datasets
`targets` holds the names to be looked up inside `comparison`; `comparison` contains every target name plus misspelled variants generated by letter substitution.
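The substitution rule in the next cell can be tried out in isolation, without Spark or the `names` package. `make_variants` is a hypothetical helper written only for this illustration: for every ordered pair of distinct letters it replaces *all* occurrences of the first letter with the second, just like `name.replace(shuffle_char, shuffle_char_aux)`.

```python
def make_variants(name, alphabet):
    """Return the name plus every variant where all occurrences of one
    letter from `alphabet` are replaced by a different letter from `alphabet`."""
    variants = {name}
    for src in alphabet:
        for dst in alphabet:
            if src != dst:
                # str.replace swaps every occurrence; letters absent from the
                # name reproduce the original, which the set deduplicates
                variants.add(name.replace(src, dst))
    return variants


print(sorted(make_variants("ANA", ["A", "N", "E"])))
# ['AAA', 'AEA', 'ANA', 'ENE', 'NNN']
```

Because substitutions of letters that do not occur in a name collapse back to the original, each name contributes far fewer than the 1 + 13 × 12 = 157 rows that the 13 `shuffle` letters would allow.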

```python
# Letters used to generate misspelled variants of each name
shuffle = ['A', 'E', 'I', 'O', 'U', 'H', 'R', 'P', 'B', 'J', 'N', 'M', 'G']

target_names = set()
comparison_names = set()

# The printed output below comes from a run with 10000 names (see the benchmark table)
for _ in range(5000):
    name = names.get_full_name().upper()
    target_names.add(name)
    comparison_names.add(name)

    # Add one variant per ordered pair of distinct letters,
    # replacing every occurrence of the first letter with the second
    for shuffle_char in shuffle:
        for shuffle_char_aux in shuffle:
            if shuffle_char != shuffle_char_aux:
                comparison_names.add(name.replace(shuffle_char, shuffle_char_aux))

# Dicts are unhashable, so deduplicate on the strings first, then build the records
targets = [{'SimilarityWith': n} for n in target_names]
comparison = [{'Name': n} for n in comparison_names]

targets_df = spark.createDataFrame(targets).alias('t')
comparison_df = spark.createDataFrame(comparison).alias('c')

print(len(targets))
print(len(comparison))

print(f"CrossJoined dataset size: {len(targets) * len(comparison)}")
```

```
10000
126304
CrossJoined dataset size: 1263040000
```


<hr>

### Conventional CrossJoin Approach

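```python
# Normalized Levenshtein similarity: 1 - edit distance / length of the longer string
lev = F.expr('1 - levenshtein(Name, SimilarityWith) / greatest(length(Name), length(SimilarityWith))')
# balogo_jarowinkler is not a built-in Spark SQL function: it must already be
# registered in this session (presumably a Scala Jaro-Winkler UDF) before `jaro` is used
jaro = F.expr('balogo_jarowinkler(Name, SimilarityWith)')
```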
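For reference, the `lev` expression corresponds to the plain-Python computation below: the edit distance divided by the length of the longer string, subtracted from 1. This is an illustrative sketch for small inputs only; the Spark job itself relies on the built-in `levenshtein` SQL function.

```python
def levenshtein(a: str, b: str) -> int:
    """Dynamic-programming edit distance (insert, delete, substitute each cost 1)."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            curr.append(min(
                prev[j] + 1,              # deletion
                curr[j - 1] + 1,          # insertion
                prev[j - 1] + (ca != cb)  # substitution (free if chars match)
            ))
        prev = curr
    return prev[-1]


def lev_similarity(name: str, target: str) -> float:
    """1 - distance / max(len), mirroring the `lev` column expression."""
    longest = max(len(name), len(target))
    if longest == 0:
        return 1.0  # two empty strings are identical
    return 1 - levenshtein(name, target) / longest


# Three E -> A substitutions over 13 characters: 1 - 3/13
print(lev_similarity("GAORGA POWALL", "GEORGE POWELL"))
```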

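```python
# Score every (target, comparison) pair as the mean of both metrics and keep scores above 0.8
df = targets_df.crossJoin(comparison_df)
df = (df.withColumn('Similarity', (lev + jaro) / 2)
        .filter(F.col('Similarity') > 0.8)
        .cache())

print(f"Filtered Fuzzy Match count: {df.count()}")
df.show()
```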

```
Filtered Fuzzy Match count: 199549
+----------------+----------------+------------------+
|  SimilarityWith|            Name|        Similarity|
+----------------+----------------+------------------+
|   GEORGE POWELL|   GEORGE POWELL|               1.0|
|   GEORGE POWELL|   GAORGA POWALL|0.8153846153846154|
|   GEORGE POWELL|   GIORGI POWILL|0.8153846153846154|
|   GEORGE POWELL|   GUORGU POWULL|0.8153846153846154|
|   GEORGE POWELL|   GEARGE PAWELL| 0.882051282051282|
|   GEORGE POWELL|   GEERGE PEWELL|0.8396270396270396|
|   GEORGE POWELL|   GEIRGE PIWELL| 0.882051282051282|
|   GEORGE POWELL|   GEURGE PUWELL| 0.882051282051282|
|MAURICE ERICKSON|MAURICE ERICKSON|               1.0|
|MAURICE ERICKSON|MEURICE ERICKSON|             0.915|
|MAURICE ERICKSON|MIURICE ERICKSON|             0.905|
|MAURICE ERICKSON|MOURICE ERICKSON|              0.95|
|MAURICE ERICKSON|MUURICE ERICKSON|              0.95|
|MAURICE ERICKSON|MAURICA ARICKSON|            0.9125|
|MAURICE ERICKSON|MAURICI IRIC
```
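`balogo_jarowinkler` is not available in a stock Spark installation. For readers without it, this plain-Python sketch of the standard Jaro-Winkler similarity shows what the `jaro` column is meant to measure. It uses the common defaults (prefix scale 0.1, prefix capped at 4 characters); whether `balogo_jarowinkler` uses the same parameters is an assumption.

```python
def jaro(s1: str, s2: str) -> float:
    """Jaro similarity from matching characters within a window and transpositions."""
    if s1 == s2:
        return 1.0
    len1, len2 = len(s1), len(s2)
    if len1 == 0 or len2 == 0:
        return 0.0
    # Characters match if equal and no further apart than this window
    window = max(max(len1, len2) // 2 - 1, 0)
    matched1 = [False] * len1
    matched2 = [False] * len2
    matches = 0
    for i, ch in enumerate(s1):
        for j in range(max(0, i - window), min(i + window + 1, len2)):
            if not matched2[j] and s2[j] == ch:
                matched1[i] = matched2[j] = True
                matches += 1
                break
    if matches == 0:
        return 0.0
    # Matched characters that appear in a different order count as half-transpositions
    s1_matched = [c for c, m in zip(s1, matched1) if m]
    s2_matched = [c for c, m in zip(s2, matched2) if m]
    t = sum(a != b for a, b in zip(s1_matched, s2_matched)) / 2
    return (matches / len1 + matches / len2 + (matches - t) / matches) / 3


def jaro_winkler(s1: str, s2: str, prefix_scale: float = 0.1, max_prefix: int = 4) -> float:
    """Boost the Jaro score for strings sharing a common prefix of up to max_prefix chars."""
    j = jaro(s1, s2)
    prefix = 0
    for a, b in zip(s1[:max_prefix], s2[:max_prefix]):
        if a != b:
            break
        prefix += 1
    return j + prefix * prefix_scale * (1 - j)


print(round(jaro_winkler("MARTHA", "MARHTA"), 4))  # 0.9611
```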

```python
# Collect the filtered matches on the driver and save them for inspection
pandas_df = df.toPandas()
pandas_df.to_csv('conventional.csv', index=False)
```

<hr>

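### Term Frequency-Inverse Document Frequency (TF-IDF) Approach

Unlike the cross join above, this approach runs locally with scikit-learn: each target name becomes a TF-IDF vector of character n-grams, and every comparison name is matched to its single nearest target.

Function that splits a string into overlapping character n-grams (2-character bigrams with the default `n=2`):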

```python
def ngrams(string, n=2):
    # Slide a window of width n over the string: zip stops at the shortest slice
    ngs = zip(*[string[i:] for i in range(n)])
    return [''.join(gram) for gram in ngs]
```
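A quick look at what this analyzer produces. The comparison with scikit-learn's built-in character analyzer (`analyzer='char', ngram_range=(2, 2)`) is shown only as a possible alternative; it yields the same bigrams for single-spaced names, because it additionally collapses runs of whitespace.

```python
from sklearn.feature_extraction.text import TfidfVectorizer


def ngrams(string, n=2):
    ngs = zip(*[string[i:] for i in range(n)])
    return [''.join(gram) for gram in ngs]


print(ngrams("ANN LEE"))       # ['AN', 'NN', 'N ', ' L', 'LE', 'EE']
print(ngrams("ANN LEE", n=3))  # ['ANN', 'NN ', 'N L', ' LE', 'LEE']

# Built-in character analyzer with the same n-gram size and no lowercasing
char_bigrams = TfidfVectorizer(analyzer='char', ngram_range=(2, 2),
                               lowercase=False).build_analyzer()
print(char_bigrams("ANN LEE") == ngrams("ANN LEE"))  # True
```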

```python
# Plain Python lists of names for scikit-learn
targets_list = [x['SimilarityWith'] for x in targets]
comparison_list = [x['Name'] for x in comparison]

# Fit TF-IDF on the targets using the custom n-gram analyzer
vectorizer = TfidfVectorizer(min_df=1, analyzer=ngrams, lowercase=False)
tfidf = vectorizer.fit_transform(targets_list)
# Default metric: Euclidean distance between the L2-normalized TF-IDF rows
nbrs = NearestNeighbors(n_neighbors=1, n_jobs=-1).fit(tfidf)


def getNearestN(query):
    """Return the distance to, and index of, the closest target for each query name."""
    queryTFIDF_ = vectorizer.transform(query)
    distances, indices = nbrs.kneighbors(queryTFIDF_)
    return distances, indices


distances, indices = getNearestN(comparison_list)

# One row per comparison name: [distance, closest target, comparison name]
matches = []
for i, j in enumerate(indices):
    temp = [round(distances[i][0], 2), targets_list[j[0]], comparison_list[i]]
    matches.append(temp)
```
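The next cell rescales the distances by the largest one in the run, so the resulting score depends on the run. If a run-independent score is wanted (an assumption, not part of the original experiment), `NearestNeighbors` can return cosine distance directly with `metric='cosine'`, which equals `1 - cosine similarity`. The short name lists are made-up examples, and `analyzer='char', ngram_range=(2, 2)` stands in for the custom `ngrams` function to keep the block self-contained.

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import NearestNeighbors

targets_list = ["GEORGE POWELL", "MAURICE ERICKSON"]
comparison_list = ["GEORGE POWELL", "GEARGE PAWELL", "MOURICE ERICKSON"]

# Character bigrams, case preserved (equivalent to ngrams(..., n=2) for single-spaced names)
vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 2), lowercase=False)
tfidf = vectorizer.fit_transform(targets_list)

# Cosine metric on sparse input; kneighbors returns cosine distances
nbrs = NearestNeighbors(n_neighbors=1, metric='cosine').fit(tfidf)
distances, indices = nbrs.kneighbors(vectorizer.transform(comparison_list))

for name, dist, idx in zip(comparison_list, distances[:, 0], indices[:, 0]):
    similarity = 1 - dist  # cosine distance -> cosine similarity in [0, 1]
    print(f"{name:<18} -> {targets_list[idx]:<18} {similarity:.2f}")
```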

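```python
matches_df = pd.DataFrame(matches, columns=['Similarity', 'Matched name', 'Original name'])

# 'Similarity' still holds a distance here: rescale it relative to the largest
# distance of this run (1 = zero distance, 0 = farthest match of the run)
m = matches_df['Similarity'].max()
matches_df['Similarity'] = 1 - matches_df['Similarity'] / m
# Optional score filter (the "> 0.3" rows of the benchmark table)
#matches_df = matches_df.loc[matches_df['Similarity'] > 0.3]

matches_df = matches_df.sort_values(by=['Similarity'], ascending=False)
matches_df.to_csv('tf-idf.csv', index=False)
print(len(matches_df))
```

```
126304
```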
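Why the raw `distances` already behave like a cosine score: `TfidfVectorizer` L2-normalizes each row by default (`norm='l2'`), and `NearestNeighbors` uses Euclidean distance by default. For unit vectors $u, v$ at Euclidean distance $d$ the two are tied by a fixed identity, so the distance could be converted into a cosine similarity directly instead of being rescaled by the run-dependent maximum `m`:

$$
d^2 = \lVert u - v \rVert^2 = \lVert u \rVert^2 + \lVert v \rVert^2 - 2\,u \cdot v = 2 - 2\cos(u, v)
\quad\Longrightarrow\quad
\cos(u, v) = 1 - \frac{d^2}{2}
$$

Since TF-IDF weights are non-negative, $\cos(u, v) \in [0, 1]$ and therefore $d \in [0, \sqrt{2}]$. This assumes the query name shares at least one n-gram with the target vocabulary; an all-zero row is left unnormalized, so the identity does not apply to it.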


### Benchmarks

| Approach | Targets | Comparisons | Cross-joined dataset size | Hit count | Score filter | Duration |
|--|--|--|--|--|--|--|
| Conventional | 10,000 | 126,304 | 1,263,040,000 | 199,549 | > 0.8 | ~191s (4m 41s) |
| TF-IDF | 10,000 | 126,304 | - | 91,622 | > 0.3 | 15.7s |
| TF-IDF | 10,000 | 126,304 | - | 126,304 | N/A | 15s |
| TF-IDF | 5,000 | 63,635 | - | 46,411 | > 0.3 | 4.6s |
| Conventional | 5,000 | 63,635 | 318,175,000 | 82,011 | > 0.8 | ~31.2s (2m 1.2s) |

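* On the machine used for these runs, Spark takes roughly 1m 30s to initialize, regardless of dataset size. The conventional-approach durations exclude this start-up time; the value in parentheses is the total (for example, 4m 41s - 1m 30s = 3m 11s = 191s).
* TF-IDF hit counts are not directly comparable with the cross join: the TF-IDF approach keeps exactly one nearest target per comparison name, while the cross join keeps every pair above the score threshold.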