# Alias testing

Need to dedupe a toy dataset with several records (aliases) for the same unique person ID.

_e.g. - John Smith/Jonty Smithington, David Jones/Davy Jones etc._

In [1]:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType
import pyspark.sql.functions as f

# Load in a jar that provides extended string comparison functions such as Jaro Winkler.
similarity_jar = 'jars/scala-udf-similarity-0.0.6.jar'

conf = SparkConf().setAll([
    ('spark.driver.extraClassPath', similarity_jar),
    ('spark.jars', similarity_jar),
    # WARNING:
    # The two options below are appropriate only if you're running Spark locally!!!
    ('spark.driver.memory', '4g'),
    ("spark.sql.shuffle.partitions", "8"),
])

# Fetch the active SparkContext, or start one configured with `conf`.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

# Register UDFs from the similarity jar under the names used in Spark SQL.
from pyspark.sql import types

java_udfs = [
    ('jaro_winkler_sim', 'uk.gov.moj.dash.linkage.JaroWinklerSimilarity', types.DoubleType()),
    ('Dmetaphone', 'uk.gov.moj.dash.linkage.DoubleMetaphone', types.StringType()),
]
for udf_name, java_class, return_type in java_udfs:
    spark.udf.registerJavaFunction(udf_name, java_class, return_type)

In [64]:
import pandas as pd 
# Raise pandas' display limits so that wide comparison tables are shown
# in full instead of being truncated.
pd.options.display.max_columns = 500
pd.options.display.max_rows = 500
# IPython magic: render matplotlib output inline in the notebook.
%matplotlib inline

# Toy dataset, one tuple per record. Several records can share a person_id:
# these are the "aliases" of a single person.
columns = ['record_id', 'person_id', 'first_name', 'surname', 'birth_year', 'city']
records = [
    (1, 1, "John",   "Smith",       1970, "Birmingham"),
    (2, 1, "Johnny", "Jones",       1970, "Birmingham"),
    (3, 1, "Jonty",  "Smithington", 1980, "Oxford"),
    (4, 2, "David",  "Jones",       1934, "London"),
    (5, 2, "Dave",   "Jones",       1934, "Reading"),
    (6, 3, "Jonty",  "Smith",       1980, "Birmingham"),
]

# Pivot the rows into the column-oriented dict that pandas is given.
data = {name: list(values) for name, values in zip(columns, zip(*records))}

df = spark.createDataFrame(pd.DataFrame(data))

df.show()

+---------+---------+----------+-----------+----------+----------+
|record_id|person_id|first_name|    surname|birth_year|      city|
+---------+---------+----------+-----------+----------+----------+
|        1|        1|      John|      Smith|      1970|Birmingham|
|        2|        1|    Johnny|      Jones|      1970|Birmingham|
|        3|        1|     Jonty|Smithington|      1980|    Oxford|
|        4|        2|     David|      Jones|      1934|    London|
|        5|        2|      Dave|      Jones|      1934|   Reading|
|        6|        3|     Jonty|      Smith|      1980|Birmingham|
+---------+---------+----------+-----------+----------+----------+



# Dedupe with `splink`

In [65]:
from splink.blocking import block_using_rules, cartesian_block
from splink.gammas import add_gammas

## 1) All comparisons w/ gammas

- `"unique_id_column_name": "record_id"` - all record comparisons
- `"unique_id_column_name": "person_id"` - all comparisons between people, but not within person_id groups

In [66]:
# splink settings shared by every run in this notebook.
# `unique_id_column_name` is set to "person_id" rather than "record_id", so
# records are compared between people but not within a person_id group.
settings = {
    "comparison_columns": [
        {
            "num_levels": 3,
            "col_name": "first_name"
        },
        {
            "num_levels": 3,
            "col_name": "surname"
        },
        {
            "num_levels": 2,
            "col_name": "birth_year",
            "data_type": "numeric"
        },
        {
            "col_name": "city"
        }
    ],
    "blocking_rules": [],
    "link_type": "dedupe_only",
    "unique_id_column_name": "person_id",
    "additional_columns_to_retain": []
}

# Names of the compared columns, used to pick out the `gamma_<col>` columns
# when styling. Defined here because the display line below needs it and no
# earlier cell defines it; without it this cell fails with a NameError on a
# fresh kernel.
comparisons_cols = [col["col_name"] for col in settings["comparison_columns"]]

# No blocking rules are set, so use the cartesian block (all comparisons).
df_comparison = cartesian_block(settings, spark, df=df)
df_gammas = add_gammas(df_comparison, settings, spark)

df_gammas.toPandas().style.background_gradient(cmap='viridis', subset = [f"gamma_{col}" for col in comparisons_cols])

  "You have not specified any blocking rules, meaning all comparisons between the "


Unnamed: 0,person_id_l,person_id_r,first_name_l,first_name_r,gamma_first_name,surname_l,surname_r,gamma_surname,birth_year_l,birth_year_r,gamma_birth_year,city_l,city_r,gamma_city
0,1,2,John,David,0,Smith,Jones,0,1970,1934,0,Birmingham,London,0
1,1,2,John,Dave,0,Smith,Jones,0,1970,1934,0,Birmingham,Reading,0
2,1,3,John,Jonty,0,Smith,Smith,2,1970,1980,0,Birmingham,Birmingham,1
3,1,2,Johnny,David,0,Jones,Jones,2,1970,1934,0,Birmingham,London,0
4,1,2,Johnny,Dave,0,Jones,Jones,2,1970,1934,0,Birmingham,Reading,0
5,1,3,Johnny,Jonty,0,Jones,Smith,0,1970,1980,0,Birmingham,Birmingham,1
6,1,2,Jonty,David,0,Smithington,Jones,0,1980,1934,0,Oxford,London,0
7,1,2,Jonty,Dave,0,Smithington,Jones,0,1980,1934,0,Oxford,Reading,0
8,1,3,Jonty,Jonty,2,Smithington,Smith,1,1980,1980,1,Oxford,Birmingham,0
9,2,3,David,Jonty,0,Jones,Smith,0,1934,1980,0,London,Birmingham,0


### Aggregate by `person_id` 

In [67]:
unique_id_col = "person_id"

# The list must be called `comparisons_cols`: that is the name read by the SQL
# builder below and by every display cell. The original only defined
# `comparison_cols` (singular), so this cell raised a NameError on a fresh kernel.
comparisons_cols = [i['col_name'] for i in settings['comparison_columns']]
comparison_cols = comparisons_cols  # original spelling kept as an alias
#custom_cols = [i['custom_column_name'] for i in settings['comparison_columns']] # different kettle of fish

# Per compared column: gather the distinct left/right values seen for the pair
# of people, and keep the highest gamma achieved by any of their record pairs.
sql_for_cols = [f"collect_set({col}_l) as {col}_l, collect_set({col}_r) as {col}_r, max(gamma_{col}) as gamma_{col}" for col in comparisons_cols]

# One output row per (left person, right person) pair. The query reads from a
# temp view named `df_gammas`, which must be registered before it is run.
sql = f"SELECT {unique_id_col}_l, {unique_id_col}_r, {', '.join(sql_for_cols)} FROM df_gammas WHERE {unique_id_col}_l != {unique_id_col}_r GROUP BY {unique_id_col}_l, {unique_id_col}_r"

In [68]:
# Expose the unblocked gammas under the view name that `sql` reads from,
# then collapse them to one row per pair of people.
df_gammas.createOrReplaceTempView("df_gammas")
df_gammas_agg = spark.sql(sql)

gamma_cols = [f"gamma_{col}" for col in comparisons_cols]
(
    df_gammas_agg
    .orderBy(["person_id_l", "person_id_r"])
    .toPandas()
    .style
    .background_gradient(cmap='viridis', subset=gamma_cols)
)

Unnamed: 0,person_id_l,person_id_r,first_name_l,first_name_r,gamma_first_name,surname_l,surname_r,gamma_surname,birth_year_l,birth_year_r,gamma_birth_year,city_l,city_r,gamma_city
0,1,2,"['Johnny', 'Jonty', 'John']","['Dave', 'David']",0,"['Smith', 'Jones', 'Smithington']",['Jones'],2,"[1980, 1970]",[1934],0,"['Oxford', 'Birmingham']","['Reading', 'London']",0
1,1,3,"['Johnny', 'Jonty', 'John']",['Jonty'],2,"['Smith', 'Jones', 'Smithington']",['Smith'],2,"[1980, 1970]",[1980],1,"['Oxford', 'Birmingham']",['Birmingham'],1
2,2,3,"['Dave', 'David']",['Jonty'],0,['Jones'],['Smith'],0,[1934],[1980],0,"['Reading', 'London']",['Birmingham'],0


## 2) Blocking on `birth_year` or `city`

In [84]:
# Keep a record pair if it agrees on birth year OR on city.
settings["blocking_rules"] = [
    "l.birth_year = r.birth_year",
    "l.city = r.city",
]

df_comparison2 = block_using_rules(settings, spark, df=df)
df_gammas2 = add_gammas(df_comparison2, settings, spark)

gamma_cols2 = [f"gamma_{col}" for col in comparisons_cols]
df_gammas2.toPandas().style.background_gradient(cmap='viridis', subset=gamma_cols2)

Unnamed: 0,person_id_l,person_id_r,first_name_l,first_name_r,gamma_first_name,surname_l,surname_r,gamma_surname,birth_year_l,birth_year_r,gamma_birth_year,city_l,city_r,gamma_city
0,1,3,Jonty,Jonty,2,Smithington,Smith,1,1980,1980,1,Oxford,Birmingham,0
1,1,3,John,Jonty,0,Smith,Smith,2,1970,1980,0,Birmingham,Birmingham,1
2,1,3,Johnny,Jonty,0,Jones,Smith,0,1970,1980,0,Birmingham,Birmingham,1


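Blocking of any kind means the aggregated comparisons only know about the aliases that passed the blocking rules; any alias that was blocked out is invisible to the aggregation. (Here all three of person 1's records happen to pass against person 3, so nothing is lost yet - section 3 shows a case where one is dropped.)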

In [73]:
# Re-point the `df_gammas` view at the blocked comparisons so the same
# aggregation query can be reused unchanged.
df_gammas2.createOrReplaceTempView("df_gammas")
df_gammas_agg2 = spark.sql(sql)

agg2_sorted = df_gammas_agg2.orderBy(["person_id_l", "person_id_r"]).toPandas()
agg2_sorted.style.background_gradient(
    cmap='viridis',
    subset=[f"gamma_{col}" for col in comparisons_cols],
)

Unnamed: 0,person_id_l,person_id_r,first_name_l,first_name_r,gamma_first_name,surname_l,surname_r,gamma_surname,birth_year_l,birth_year_r,gamma_birth_year,city_l,city_r,gamma_city
0,1,3,"['Johnny', 'Jonty', 'John']",['Jonty'],2,"['Smith', 'Jones', 'Smithington']",['Smith'],2,"[1980, 1970]",[1980],1,"['Oxford', 'Birmingham']",['Birmingham'],1


## 3) Blocking on `surname` or `city`

In [83]:
# Keep a record pair if it agrees on surname OR on city.
settings["blocking_rules"] = [
    "l.surname = r.surname",
    "l.city = r.city",
]

df_comparison3 = block_using_rules(settings, spark, df=df)
df_gammas3 = add_gammas(df_comparison3, settings, spark)

gamma_cols3 = [f"gamma_{col}" for col in comparisons_cols]
df_gammas3.toPandas().style.background_gradient(cmap='viridis', subset=gamma_cols3)

Unnamed: 0,person_id_l,person_id_r,first_name_l,first_name_r,gamma_first_name,surname_l,surname_r,gamma_surname,birth_year_l,birth_year_r,gamma_birth_year,city_l,city_r,gamma_city
0,1,3,John,Jonty,0,Smith,Smith,2,1970,1980,0,Birmingham,Birmingham,1
1,1,2,Johnny,David,0,Jones,Jones,2,1970,1934,0,Birmingham,London,0
2,1,2,Johnny,Dave,0,Jones,Jones,2,1970,1934,0,Birmingham,Reading,0
3,1,3,Johnny,Jonty,0,Jones,Smith,0,1970,1980,0,Birmingham,Birmingham,1


In [82]:
# Re-point the `df_gammas` view at the surname/city-blocked comparisons and
# rerun the shared aggregation query.
df_gammas3.createOrReplaceTempView("df_gammas")
df_gammas_agg3 = spark.sql(sql)

agg3_sorted = df_gammas_agg3.orderBy(["person_id_l", "person_id_r"]).toPandas()
agg3_sorted.style.background_gradient(
    cmap='viridis',
    subset=[f"gamma_{col}" for col in comparisons_cols],
)

Unnamed: 0,person_id_l,person_id_r,first_name_l,first_name_r,gamma_first_name,surname_l,surname_r,gamma_surname,birth_year_l,birth_year_r,gamma_birth_year,city_l,city_r,gamma_city
0,1,2,['Johnny'],"['Dave', 'David']",0,['Jones'],['Jones'],2,[1970],[1934],0,['Birmingham'],"['Reading', 'London']",0
1,1,3,"['Johnny', 'John']",['Jonty'],0,"['Smith', 'Jones']",['Smith'],2,[1970],[1980],0,['Birmingham'],['Birmingham'],1


✅ Person 1 and 2 match on `surname` only, so no problem there. 

❓Person 1 and 3 match on `surname` and `city`, but not `first_name` or `birth_year`. 

❌ However Person 1 also has a further record (`record_id = 3`) where `first_name` and `birth_year` both match Person 3 - but it does not pass the blocking on `city` or `surname`, so it is missing from the aggregated comparison.

# Problem

- For any comparison between person A and person B, need knowledge of all "aliases" of all fields for each


# Solution

- Either perform all possible record comparisons between candidates and then aggregate (as above)
- Or perform some sort of person-level blocking/comparisons (haven't fully thought through how it would work, but bound to be a total pain)
