## Splink deduplication demo 

In this demo we de-duplicate a small dataset.

The purpose is to provide an end-to-end example of how to use the package.

I print the output at each stage using `spark_dataframe.show()`. This is for instructional purposes only: it degrades performance and shouldn't be used in a production setting.

## Step 1:  Imports and setup

The following is just boilerplate code that sets up the Spark session and sets some other non-essential configuration options.

In [1]:
import pandas as pd 
pd.options.display.max_columns = 500
pd.options.display.max_rows = 100

SparkSession available as 'spark'.

In [2]:
import logging 
logging.basicConfig()  # Means logs will print in Jupyter Lab

# Set to DEBUG if you want splink to log the SQL statements it's executing under the hood
logging.getLogger("splink").setLevel(logging.INFO)

In [3]:
from utility_functions.demo_utils import get_spark
spark = get_spark() # See utility_functions/demo_utils.py for how to set up Spark

## Step 2:  Read in data

In [4]:
# Read from S3 rather than the local filesystem so this works in an EMR Notebook
df = spark.read.parquet("s3://splink-emr-init-test/data/fake_1000.parquet")
df.show(5)

+---------+----------+-------+----------+------+--------------------+-----+
|unique_id|first_name|surname|       dob|  city|               email|group|
+---------+----------+-------+----------+------+--------------------+-----+
|        0|    Julia |   null|2015-10-29|London| hannah88@powers.com|    0|
|        1|    Julia | Taylor|2015-07-31|London| hannah88@powers.com|    0|
|        2|    Julia | Taylor|2016-01-27|London| hannah88@powers.com|    0|
|        3|    Julia | Taylor|2015-10-29|  null|  hannah88opowersc@m|    0|
|        4|      oNah| Watson|2008-03-23|Bolton|matthew78@ballard...|    1|
+---------+----------+-------+----------+------+--------------------+-----+
only showing top 5 rows

## Step 3:  Configure splink using the `settings` object

Most of `splink`'s configuration options are stored in a settings dictionary. This dictionary allows significant customisation, and can therefore get quite complex.

💥 We provide a tool for authoring valid settings dictionaries, which includes tooltips and autocomplete. You can find it [here](http://robinlinacre.com/splink_settings_editor/).

Any options you specify override the defaults built into splink. For the purposes of this demo, we specify a simple settings dictionary, so we rely on these sensible defaults for everything else.

To help with authoring and validation of the settings dictionary, we have written a [json schema](https://json-schema.org/), which can be found [here](https://github.com/moj-analytical-services/splink/blob/master/splink/files/settings_jsonschema.json).  




In [5]:
settings = {
    "link_type": "dedupe_only",
    "blocking_rules": [
        "l.first_name = r.first_name",
        "l.surname = r.surname",
        "l.dob = r.dob"
    ],
    "comparison_columns": [
        {
            "col_name": "first_name",
            "num_levels": 3,
            "term_frequency_adjustments": True
        },
        {
            "col_name": "surname",
            "num_levels": 3,
            "term_frequency_adjustments": True
        },
        {
            "col_name": "dob"
        },
        {
            "col_name": "city"
        },
        {
            "col_name": "email"
        }
    ],
    "additional_columns_to_retain": ["group"],
    "em_convergence": 0.01
}

In words, this settings dictionary says:
- We are performing a deduplication task (the other options are `link_only` and `link_and_dedupe`)
- We will only generate comparisons between pairs of records that satisfy at least one of the rules in the `blocking_rules` array
- When comparing records, we will use information from the `first_name`, `surname`, `dob`, `city` and `email` columns to compute a match score.
- For `first_name` and `surname`, string comparisons will have three levels:
    - Level 2: Strings are (almost) exactly the same
    - Level 1: Strings are similar 
    - Level 0: No match
- We will make adjustments for term frequencies on the `first_name` and `surname` columns
- We will retain the `group` column in the results even though it is not used as part of comparisons. This is a labelled dataset, and `group` identifies the true matches: records with the same `group` value pertain to the same person
- Consider the algorithm to have converged when no parameter changes by more than 0.01 between iterations
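To make the blocking step concrete, here is a minimal pure-Python sketch of which candidate pairs the three equality rules produce. It is an illustration only, not how splink executes blocking (splink runs the rules as SQL joins in Spark rather than looping over every pair). The records are simplified from the first five rows shown in Step 2, `candidate_pairs` is a hypothetical helper, and we assume each unordered pair is generated once.

```python
from itertools import combinations

# Hypothetical in-memory records simplified from the first five rows in Step 2;
# None stands in for a null value.
records = [
    {"unique_id": 0, "first_name": "Julia", "surname": None, "dob": "2015-10-29"},
    {"unique_id": 1, "first_name": "Julia", "surname": "Taylor", "dob": "2015-07-31"},
    {"unique_id": 2, "first_name": "Julia", "surname": "Taylor", "dob": "2016-01-27"},
    {"unique_id": 3, "first_name": "Julia", "surname": "Taylor", "dob": "2015-10-29"},
    {"unique_id": 4, "first_name": "oNah", "surname": "Watson", "dob": "2008-03-23"},
]


def candidate_pairs(records, blocking_columns):
    """Return (unique_id_l, unique_id_r) pairs that satisfy at least one rule.

    Each entry in blocking_columns stands for a rule "l.<col> = r.<col>".
    As in SQL, a null never equals anything, so None never satisfies a rule.
    """
    pairs = set()
    # combinations() yields each unordered pair once: no self-pairs and no
    # (r, l) duplicate of an (l, r) pair
    for left, right in combinations(records, 2):
        if any(left[c] is not None and left[c] == right[c] for c in blocking_columns):
            pairs.add((left["unique_id"], right["unique_id"]))
    return pairs


print(sorted(candidate_pairs(records, ["first_name", "surname", "dob"])))
```

Record 0 is paired with record 3 by both the `first_name` and `dob` rules but appears only once, and record 4 matches nothing, so it generates no comparisons at all.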

## Step 4:  Estimate match scores using the Expectation Maximisation algorithm
Columns are assumed to be strings by default.  See the 'comparison vector settings' notebook for details of configuration options.

In [6]:
from splink import Splink

linker = Splink(settings, spark, df=df)
df_e = linker.get_scored_comparisons()
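`get_scored_comparisons()` runs the Expectation Maximisation algorithm inside Spark. For intuition, here is a simplified single-machine sketch of EM for the Fellegi-Sunter model in NumPy. It assumes every comparison is a binary agree/disagree (no multi-level comparisons, nulls or term frequency adjustments), uses synthetic data and arbitrary starting values, and is not splink's implementation. The `em_convergence` argument plays the same role as the setting of that name: iteration stops once no parameter changes by more than this amount.

```python
import numpy as np


def _posterior(gamma, lam, m, u):
    """P(match | comparison vector), assuming columns are independent given match status."""
    like_match = np.prod(m ** gamma * (1 - m) ** (1 - gamma), axis=1)
    like_non_match = np.prod(u ** gamma * (1 - u) ** (1 - gamma), axis=1)
    return lam * like_match / (lam * like_match + (1 - lam) * like_non_match)


def fellegi_sunter_em(gamma, lam=0.5, em_convergence=0.01, max_iter=100):
    """Estimate lambda, m and u from 0/1 comparison vectors (1 = values agree).

    gamma has shape (n_pairs, n_columns). Returns (lam, m, u, match_probability).
    """
    gamma = np.asarray(gamma, dtype=float)
    n_cols = gamma.shape[1]
    m = np.full(n_cols, 0.9)  # arbitrary starting values
    u = np.full(n_cols, 0.1)
    for _ in range(max_iter):
        # E-step: probability that each pair is a match under current parameters
        p = _posterior(gamma, lam, m, u)
        # M-step: re-estimate parameters, weighting each pair by p (or 1 - p)
        new_lam = p.mean()
        new_m = (p[:, None] * gamma).sum(axis=0) / p.sum()
        new_u = ((1 - p)[:, None] * gamma).sum(axis=0) / (1 - p).sum()
        # Converged when no parameter moves by more than em_convergence
        change = max(abs(new_lam - lam), np.abs(new_m - m).max(), np.abs(new_u - u).max())
        lam, m, u = new_lam, new_m, new_u
        if change < em_convergence:
            break
    # Score every pair with the final parameters
    return lam, m, u, _posterior(gamma, lam, m, u)


# Synthetic example: 2,000 pairs, about 30% true matches, three comparison columns
rng = np.random.default_rng(0)
is_match = rng.random(2000) < 0.3
true_m = np.array([0.95, 0.90, 0.85])
true_u = np.array([0.05, 0.10, 0.20])
agree_prob = np.where(is_match[:, None], true_m, true_u)
gamma = (rng.random((2000, 3)) < agree_prob).astype(int)

lam, m, u, p = fellegi_sunter_em(gamma)
print(round(lam, 3), m.round(3), u.round(3))
```

Because the data are synthetic, the estimated `m` and `u` can be checked against `true_m` and `true_u`, which is not possible on real data.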


## Step 5: Inspect results 



In [7]:
# Inspect main dataframe that contains the match scores
cols_to_inspect = ["match_probability","unique_id_l","unique_id_r","group_l", "group_r", "first_name_l","first_name_r","surname_l","surname_r","dob_l","dob_r","city_l","city_r","email_l","email_r",]

df_e.toPandas()[cols_to_inspect].sort_values(["unique_id_l", "unique_id_r"]).head(10)

      match_probability  unique_id_l  unique_id_r  group_l  group_r  \
2              0.985811            0            1        0        0   
1              0.985811            0            2        0        0   
0              0.999646            0            3        0        0   
4              0.983115            1            2        0        0   
3              0.916171            1            3        0        0   
2290           0.027342            1           89        0       18   
2289           0.027342            1          142        0       26   
2288           0.027342            1          148        0       26   
4821           0.792436            1          246        0       43   
2287           0.039123            1          362        0       62   

     first_name_l first_name_r surname_l surname_r       dob_l       dob_r  \
2          Julia        Julia       None    Taylor  2015-10-29  2015-07-31   
1          Julia        Julia       None    Taylor  2015-10-29

In [8]:
df_e.toPandas()

      tf_adjusted_match_prob  match_probability  unique_id_l  unique_id_r  \
0                   0.999989           0.999646            0            3   
1                   0.999566           0.985811            0            2   
2                   0.999566           0.985811            0            1   
3                   0.988398           0.916171            1            3   
4                   0.997802           0.983115            1            2   
...                      ...                ...          ...          ...   
5706                0.999920           0.999920          984          985   
5707                0.999884           0.999884          991          996   
5708                0.987535           0.987535          991          994   
5709                0.999998           0.999998          992          996   
5710                0.999824           0.999824          992          994   

     first_name_l first_name_r  gamma_first_name  \
0          Julia       
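Because the `group` column holds the true matches, the scored pairs can be checked against it. The sketch below, using a hypothetical helper `pairwise_precision_recall`, computes pairwise precision and recall at a probability threshold. Recall here is only measured over pairs that survived blocking, so true matches excluded by the blocking rules are not counted. The sample frame copies the ten rows displayed from `df_e` earlier in this step; in the notebook you would pass `df_e.toPandas()` instead.

```python
import pandas as pd


def pairwise_precision_recall(pairs, threshold, prob_col="match_probability"):
    """Precision and recall of 'prob_col > threshold' against group_l == group_r."""
    predicted = pairs[prob_col] > threshold
    actual = pairs["group_l"] == pairs["group_r"]
    tp = int((predicted & actual).sum())
    fp = int((predicted & ~actual).sum())
    fn = int((~predicted & actual).sum())
    precision = tp / (tp + fp) if tp + fp else float("nan")
    recall = tp / (tp + fn) if tp + fn else float("nan")
    return precision, recall


# The ten rows displayed earlier in this step
sample = pd.DataFrame({
    "match_probability": [0.985811, 0.985811, 0.999646, 0.983115, 0.916171,
                          0.027342, 0.027342, 0.027342, 0.792436, 0.039123],
    "group_l": [0] * 10,
    "group_r": [0, 0, 0, 0, 0, 18, 26, 26, 43, 62],
})
print(pairwise_precision_recall(sample, 0.99))
print(pairwise_precision_recall(sample, 0.5))
```

Passing `prob_col="tf_adjusted_match_prob"` compares the term-frequency-adjusted scores instead.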

The `params` property of the `linker` is an object that contains a lot of diagnostic information about how the match probability was computed. The following cells demonstrate some of its functionality.

In [17]:
params = linker.params
params.probability_distribution_chart()

alt.HConcatChart(...)

An alternative representation of the parameters displays them in terms of the effect different values in the comparison vectors have on the match probability:

In [18]:
# adjustment_factor_chart() has been removed from more recent versions of splink, so this cell raises an AttributeError
params.adjustment_factor_chart()

'Params' object has no attribute 'adjustment_factor_chart'
Traceback (most recent call last):
AttributeError: 'Params' object has no attribute 'adjustment_factor_chart'



In [19]:
# If charts aren't displaying correctly in your notebook, you can write them to an HTML file (by default splink_charts.html)
# This function writes the file with Python's open(), so it only works on the local filesystem, not on a distributed file system like S3
params.all_charts_write_html_file(filename="splink_charts.html", overwrite=True)

[Errno 13] Permission denied: 'splink_charts.html'
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/splink/params.py", line 534, in all_charts_write_html_file
    with open(filename, "w") as f:
PermissionError: [Errno 13] Permission denied: 'splink_charts.html'
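The error above means the notebook's working directory is not writable. One possible workaround, sketched here with a hypothetical helper `writable_chart_path`, is to pick a directory the process can write to (the working directory if possible, otherwise the system temporary directory) and pass that path to `all_charts_write_html_file`. The file still ends up on the driver's local disk, not on S3.

```python
import os
import tempfile


def writable_chart_path(filename="splink_charts.html"):
    """Return a path for filename in the working directory if it is writable,
    otherwise in the system temporary directory."""
    cwd = os.getcwd()
    directory = cwd if os.access(cwd, os.W_OK) else tempfile.gettempdir()
    return os.path.join(directory, filename)


path = writable_chart_path()
print(path)
# In the notebook (not run here):
# params.all_charts_write_html_file(filename=path, overwrite=True)
```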



You can also generate a report that explains how the match probability was computed for an individual comparison row.

Note that you need to convert the row to a dictionary for this to work.

In [23]:
from splink.intuition import intuition_report
row_dict = df_e.toPandas().sample(1).to_dict(orient="records")[0]
print(intuition_report(row_dict, params))

Initial probability of match (prior) = λ = 0.4007764458656311

Comparison of first_name.  Values are:
first_name_l: Alfie 
first_name_r: None
Comparison has 3 levels
𝛾 for this comparison = gamma_first_name = -1
Amongst matches, m = P(𝛾|match) = 1.0
Amongst non matches, u = P(𝛾|non-match) = 1.0
Bayes factor = m/u = 1.0
New probability of match (updated belief): 0.4007764458656311

Comparison of surname.  Values are:
surname_l: Tyolr
surname_r: Taylor
Comparison has 3 levels
𝛾 for this comparison = gamma_surname = 1
Amongst matches, m = P(𝛾|match) = 0.058804191648960114
Amongst non matches, u = P(𝛾|non-match) = 0.00023243666510097682
Bayes factor = m/u = 252.99017099309168
New probability of match (updated belief): 0.9941247826389082

Comparison of dob.  Values are:
dob_l: 2005-06-20
dob_r: 2005-06-20
Comparison has 2 levels
𝛾 for this comparison = gamma_dob = 1
Amongst matches, m = P(𝛾|match) = 0.8753615617752075
Amongst non matches, u = P(𝛾|non-match) = 0.0313960500061512
Bayes facto
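Each step of the report multiplies the current odds of a match by the comparison's Bayes factor, m/u, and converts back to a probability. The sketch below reproduces the first two updates from the numbers printed in the report. `update_probability` is a hypothetical helper written for illustration, not part of splink.

```python
def update_probability(prior, m, u):
    """Update a match probability with one comparison: posterior odds = prior odds * (m / u)."""
    bayes_factor = m / u
    prior_odds = prior / (1 - prior)
    posterior_odds = prior_odds * bayes_factor
    return posterior_odds / (1 + posterior_odds)


# Values copied from the intuition report above
prior = 0.4007764458656311
# first_name is null on one side (gamma = -1), so m = u = 1 and the belief is unchanged
after_first_name = update_probability(prior, 1.0, 1.0)
# surname is similar but not identical (gamma = 1)
after_surname = update_probability(after_first_name, 0.058804191648960114, 0.00023243666510097682)
print(after_first_name, after_surname)
```

The result agrees with the "updated belief" of about 0.99412 that the report prints after the surname comparison.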

## Step 6: Use graphframes to resolve links into groups
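Resolving pairwise links into groups amounts to finding connected components in a graph whose nodes are records and whose edges are pairs scored above a threshold. GraphFrames does this in a distributed way in Spark. For intuition, here is a minimal single-machine sketch using union-find, a different algorithm from the one GraphFrames uses. The edges come from the first five rows of `df_e` shown in Step 5, filtered with the same 0.99 threshold on `tf_adjusted_match_prob`; this sketch labels each component by its smallest id.

```python
def connected_components(node_ids, edges):
    """Label each node with the smallest id in its connected component (union-find)."""
    parent = {n: n for n in node_ids}

    def find(x):
        # Follow parent pointers to the root, halving the path as we go
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            # Keep the smaller id as the root so each label is the component minimum
            parent[max(root_src, root_dst)] = min(root_src, root_dst)

    return {n: find(n) for n in node_ids}


# (unique_id_l, unique_id_r, tf_adjusted_match_prob) from the rows shown in Step 5
scored = [(0, 3, 0.999989), (0, 2, 0.999566), (0, 1, 0.999566),
          (1, 3, 0.988398), (1, 2, 0.997802)]
edges = [(l, r) for l, r, prob in scored if prob > 0.99]
print(connected_components([0, 1, 2, 3, 4], edges))
```

The pair (1, 3) falls below the threshold, but records 1 and 3 still end up in the same group because both link to record 0. Transitive grouping like this is the main effect of the connected components step.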

In [21]:
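from graphframes import GraphFrame

# Nodes: every record id that appears on either side of a scored comparison.
# Records that appear in no comparison pair are not nodes, so they are absent from the results.
df_e.createOrReplaceTempView("df_e")
sql = """
select unique_id_l as id
from df_e
union
select unique_id_r as id
from df_e
"""
nodes = spark.sql(sql)

# Edges: only pairs whose term-frequency-adjusted match probability exceeds 0.99
sql = """
select
unique_id_l as src,
unique_id_r as dst,
tf_adjusted_match_prob
from df_e
where tf_adjusted_match_prob > 0.99
"""
edges = spark.sql(sql)

g = GraphFrame(nodes, edges)

# Each connected component is one estimated entity. GraphFrames requires a
# checkpoint directory for this (set with spark.sparkContext.setCheckpointDir)
cc = g.connectedComponents()

# Attach the component id to the original records, ordered by the true `group`
# so it can be compared with `estimated_group`
cc.createOrReplaceTempView("cc")
df.createOrReplaceTempView("df")
sql = """
select cc.component as estimated_group, df.*
from cc
left join
df 
on cc.id = df.unique_id
order by group, estimated_group
"""
results = spark.sql(sql)
results.toPandas().head(30)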

    estimated_group  unique_id first_name    surname         dob  \
0                 0          2     Julia      Taylor  2016-01-27   
1                 0          0     Julia        None  2015-10-29   
2                 0          1     Julia      Taylor  2015-07-31   
3                 0          3     Julia      Taylor  2015-10-29   
4                 4          6     Watson      Noah   2008-03-23   
5                 4         11     Watson      Noah   2008-01-21   
6                 4          8     Watson      Noah   2008-06-15   
7                 4          5      Noah      Watson  2008-03-23   
8                 4         10     Watson      Noah   2008-03-23   
9                 4          4       oNah     Watson  2008-03-23   
10                7          7      Noah      Watson  2008-02-05   
11                9          9      Noah      Watson  2008-01-19   
12               12         12      Noah        None  2008-03-23   
13               13         13     Molly        