# Spark-Matcher Deduplicator example 

This notebook shows how to use `spark_matcher` for deduplication. We start with the imports needed to create a Spark session:

In [None]:
%config Completer.use_jedi = False  # for proper autocompletion
import os
from pyspark.sql import SparkSession
import pandas as pd
pd.set_option('max_colwidth', None)

When creating the Spark session, make sure `spark.jars` points to the correct location of the GraphFrames jar file:

In [None]:
spark = (SparkSession
             .builder
             .master("local")
             .enableHiveSupport()
             .config('spark.jars', os.path.join('..', 'external_dependencies', 'graphframes-0.8.1-spark3.0-s_2.12.jar'))
             .getOrCreate())
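A wrong jar path is easy to miss when the session is created, so it can help to check the path first. The helper below is a minimal sketch; `resolve_jar` is a hypothetical name and not part of `spark_matcher` or PySpark.

```python
import os


def resolve_jar(path):
    """Return the absolute path of a jar file, or raise if it does not exist.

    Hypothetical helper for illustration; not part of spark_matcher.
    """
    abs_path = os.path.abspath(path)
    if not os.path.isfile(abs_path):
        raise FileNotFoundError(f"GraphFrames jar not found at {abs_path}")
    return abs_path


# Intended use (assumes the jar location used in this notebook):
# jar_path = resolve_jar(os.path.join('..', 'external_dependencies',
#                                     'graphframes-0.8.1-spark3.0-s_2.12.jar'))
# ... .config('spark.jars', jar_path) ...
```

Failing early with a clear message keeps a misplaced jar from showing up only as a less obvious error later in the notebook.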

In this notebook we use some example data that comes with `spark_matcher`:

In [None]:
from spark_matcher.data import load_data

In [None]:
sdf = load_data(spark, kind='stoxx50')

This dataset has a single column, `name`, which holds the concatenated name and address of Eurostoxx 50 companies.

In [None]:
sdf.limit(5).toPandas()

| | name |
|---|---|
| 0 | adidas ag adi dassler strasse 1 91074 germany |
| 1 | adidas ag adi dassler strasse 1 91074 herzogenaurach |
| 2 | adidas ag adi dassler strasse 1 91074 herzogenaurach germany |
| 3 | airbus se 2333 cs leiden netherlands |
| 4 | airbus se 2333 cs netherlands |


We use `spark_matcher`'s `Deduplicator` to group different representations of an entity under the same entity identifier. In the example above, there are three records for the 'adidas ag' entity with small differences. We want these three records to receive the same identifier, so that it is clear they refer to the same entity.

Let's first import the `Deduplicator` and create an instance.

In [None]:
from spark_matcher.deduplicator.deduplicator import Deduplicator

In [None]:
myDeduplicator = Deduplicator(spark, col_names=['name'], checkpoint_dir='path_to_checkpoints')

Now we are ready to fit the `Deduplicator` using active learning: you are shown candidate pairs and label each one as a match or not.
Enter 'y' if a pair is a match and 'n' if it is not. Entering 'p' returns to the previous pair so you can correct a label, and 's' skips a pair. You are notified when the model has converged, after which you can stop training by entering 'f'.
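To make the idea of active learning concrete, the sketch below shows one common query strategy, uncertainty sampling: the next pair to label is the one the current classifier is least sure about. Whether `spark_matcher` selects pairs exactly this way is an assumption, `most_uncertain_pair` is a hypothetical helper, and the probabilities are made up for illustration.

```python
def most_uncertain_pair(scored_pairs):
    """Return the (pair, probability) whose probability is closest to 0.5."""
    return min(scored_pairs, key=lambda item: abs(item[1] - 0.5))


# Made-up match probabilities from a hypothetical, partially trained classifier.
scored_pairs = [
    (("safran sa paris france",
      "safran sa 2  boulevard du general martial valin paris france"), 0.62),
    (("deutsche boerse frankfurt",
      "adidas ag adi dassler strasse 1 91074 herzogenaurach germany"), 0.03),
    (("bayer aktiengesellschaft 51368 leverkusen germany",
      "bayerische motoren werke aktiengesellschaft munich germany"), 0.47),
]

# The bayer / bayerische motoren werke pair is picked: 0.47 is nearest to 0.5.
pair, score = most_uncertain_pair(scored_pairs)
```

Labelling the most ambiguous pairs first is what lets a model of this kind converge after only a handful of answers.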

In [None]:
myDeduplicator.fit(sdf)

Nr. 1 (0+/0-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: fresenius se   co  kgaa else kroner strasse 1 61352 bad homburg vor der hohe germany

name_2: fresenius se   co  kgaa else kroner strasse 1 61352 bad homburg vor der hohe germany



 y



Nr. 2 (1+/0-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: deutsche boerse frankfurt

name_2: adidas ag adi dassler strasse 1 91074 herzogenaurach germany



 n



Nr. 3 (1+/1-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: deutsche post ag platz der deutschen post 53113 germany

name_2: deutsche boerse



 n



Nr. 4 (1+/2-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: telefonica  s a  ronda de la comunicacion

name_2: telefonica  s a  28050 madrid



 y



Nr. 5 (2+/2-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: deutsche boerse 60485 frankfurt

name_2: deutsche post ag platz der deutschen post bonn germany



 n



Nr. 6 (2+/3-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: koninklijke ahold delhaize n v   provincialeweg 11

name_2: koninklijke philips n v  amstelplein 2 1096 bc



 y



Nr. 7 (3+/3-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: deutsche telekom ag 53113 germany

name_2: deutsche post ag platz der deutschen post bonn germany



 p



Nr. 7 (3+/3-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: koninklijke ahold delhaize n v   provincialeweg 11

name_2: koninklijke philips n v  amstelplein 2 1096 bc



 n



Nr. 8 (3+/4-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: safran sa paris france

name_2: safran sa 2  boulevard du general martial valin paris france



 y



Nr. 9 (4+/4-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: bayer aktiengesellschaft 51368 leverkusen germany

name_2: bayerische motoren werke aktiengesellschaft munich germany



 n



Nr. 10 (4+/5-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: koninklijke philips n v  amstelplein 2 1096 bc

name_2: koninklijke ahold delhaize n v   provincialeweg 11 netherlands



 n



Nr. 11 (4+/6-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: deutsche boerse 60485 frankfurt

name_2: deutsche telekom ag 53113 bonn germany



 n


Classifier converged, enter 'f' to stop training

Nr. 12 (4+/7-)
Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish

name_1: koninklijke ahold delhaize n v   1506 ma zaandam netherlands

name_2: koninklijke philips n v  amstelplein 2 1096 bc



 f


<spark_matcher.deduplicator.deduplicator.Deduplicator at 0x7f854157e790>

The `Deduplicator` is now trained and can be used to predict on all data. This can be the data used for training or new data that was not seen by the model yet.
By default the `threshold` is 0.5. A lower threshold results in more matches but also in more incorrect matches.

In [None]:
result = myDeduplicator.predict(sdf)
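The effect of the threshold can be illustrated without Spark. The sketch below uses made-up match probabilities for pairs from the example data and a hypothetical helper `matches_at`; treating a score equal to the threshold as a match is an assumption, not something the library documentation in this notebook confirms.

```python
def matches_at(scores, threshold=0.5):
    """Keep the pairs whose match probability is at least `threshold`."""
    return [pair for pair, score in scores.items() if score >= threshold]


# Made-up probabilities, for illustration only.
scores = {
    ("airbus se 2333 cs leiden netherlands",
     "airbus se 2333 cs netherlands"): 0.91,
    ("telefonica  s a  ronda de la comunicacion",
     "telefonica  s a  28050 madrid"): 0.58,
    ("koninklijke ahold delhaize n v   provincialeweg 11",
     "koninklijke philips n v  amstelplein 2 1096 bc"): 0.35,
    ("deutsche boerse frankfurt",
     "adidas ag adi dassler strasse 1 91074 herzogenaurach germany"): 0.02,
}

default_matches = matches_at(scores)       # airbus and telefonica pairs
lenient_matches = matches_at(scores, 0.3)  # also the ahold/philips pair
```

With these numbers, lowering the threshold from 0.5 to 0.3 adds the Ahold Delhaize / Philips pair, which is an incorrect match. That is the trade-off described above.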

Now let's have a look at the results:

In [None]:
df_result = result.toPandas()
df_result.sort_values('name').head()

| | name | entity_identifier |
|---|---|---|
| 79 | adidas ag adi dassler strasse 1 91074 germany | 137438953472 |
| 10 | adidas ag adi dassler strasse 1 91074 herzogenaurach | 137438953472 |
| 53 | adidas ag adi dassler strasse 1 91074 herzogenaurach germany | 137438953472 |
| 89 | airbus se 2333 cs leiden netherlands | 34359738368 |
| 44 | airbus se 2333 cs netherlands | 34359738368 |


As shown above, all records belonging to 'Adidas AG' share one `entity_identifier`. The same applies to 'Airbus SE'.
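One way to get from pairwise matches to a shared identifier is to treat records as nodes and matches as edges, then give every connected group one id. The GraphFrames dependency suggests a graph-based grouping of this kind, but the sketch below is not the library's implementation: it uses a plain-Python union-find, `assign_entities` is a hypothetical name, and the matched pairs are assumed for illustration.

```python
def assign_entities(records, matched_pairs):
    """Give every record an entity id; records linked by matches share one."""
    parent = {r: r for r in records}

    def find(r):
        # Walk up to the root, shortening the path along the way.
        while parent[r] != r:
            parent[r] = parent[parent[r]]
            r = parent[r]
        return r

    for a, b in matched_pairs:
        parent[find(a)] = find(b)  # merge the two groups

    roots = {}  # root record -> small integer id, in order of first appearance
    return {r: roots.setdefault(find(r), len(roots)) for r in records}


records = [
    "adidas ag adi dassler strasse 1 91074 germany",
    "adidas ag adi dassler strasse 1 91074 herzogenaurach",
    "adidas ag adi dassler strasse 1 91074 herzogenaurach germany",
    "airbus se 2333 cs leiden netherlands",
    "airbus se 2333 cs netherlands",
]
# Assumed pairwise matches; the first and third adidas records are not paired directly.
matched_pairs = [(records[0], records[1]), (records[1], records[2]),
                 (records[3], records[4])]

entity_ids = assign_entities(records, matched_pairs)
```

The first and third adidas records still end up in the same entity because both match the second one. The ids here are small integers and will not resemble the values in the table above.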

If you want to use the `Deduplicator` later without having to retrain, you can save the `Deduplicator` and load it later:

In [None]:
myDeduplicator.save('myDeduplicator.pkl')

In [None]:
myRestoredDeduplicator = Deduplicator(spark)

In [None]:
myRestoredDeduplicator.load('myDeduplicator.pkl')

The restored `Deduplicator` can now be used to predict on new data.