Skip to content

Commit

Permalink
name inversion
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinL committed Nov 23, 2020
1 parent 5b5fa0e commit 9126e35
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "splink"
version = "0.2.8"
version = "0.2.9"
description = "Implementation in Apache Spark of the EM algorithm to estimate parameters of Fellegi-Sunter's canonical model of record linkage."
authors = ["Robin Linacre <robinlinacre@hotmail.com>", "Sam Lindsay", "Theodore Manassis"]
license = "MIT"
Expand Down
16 changes: 13 additions & 3 deletions splink/case_statements.py
Expand Up @@ -279,10 +279,10 @@ def sql_gen_case_stmt_numeric_perc_4(
return c


def _sql_gen_get_or_list(col_name, other_name_cols, threshold=0.94):
def _sql_gen_get_or_list_jaro(col_name, other_name_cols, threshold=0.94):
# Note the ifnull 1234 just ensures that if one of the other columns is null, the jaro score is lower than the threshold
ors = [
f"jaro_winkler_sim({col_name}_l, ifnull({n}_r, '1234')) > {threshold}"
f"jaro_winkler_sim(ifnull({col_name}_l, '1234abcd5678'), ifnull({n}_r, '987pqrxyz654')) > {threshold}"
for n in other_name_cols
]
ors_string = " OR ".join(ors)
Expand All @@ -295,6 +295,7 @@ def sql_gen_gammas_name_inversion_4(
gamma_col_name=None,
threshold1=0.94,
threshold2=0.88,
include_dmeta=False,
):
"""Generate a case expression which can handle name inversions where e.g. surname and forename are inverted
Expand All @@ -304,15 +305,24 @@ def sql_gen_gammas_name_inversion_4(
gamma_col_name (str, optional): The name of the column, for the alias e.g. surname
threshold1 (float, optional): Jaro threshold for almost exact match. Defaults to 0.94.
threshold2 (float, optional): Jaro threshold for close match. Defaults to 0.88.
include_dmeta (bool, optional): Also allow a dmetaphone match at threshold2
Returns:
str: A sql string
"""

dmeta_statment = ""
if include_dmeta:
dmeta_statment = f"""
when Dmetaphone({col_name}_l) = Dmetaphone({col_name}_r) then 1
when DmetaphoneAlt({col_name}_l) = DmetaphoneAlt({col_name}_r) then 1
"""

c = f"""case
when {col_name}_l is null or {col_name}_r is null then -1
when jaro_winkler_sim({col_name}_l, {col_name}_r) > {threshold1} then 3
when {_sql_gen_get_or_list(col_name, other_name_cols, threshold1)} then 2
when {_sql_gen_get_or_list_jaro(col_name, other_name_cols, threshold1)} then 2
{dmeta_statment}
when jaro_winkler_sim({col_name}_l, {col_name}_r) > {threshold2} then 1
else 0 end"""
if gamma_col_name is not None:
Expand Down
98 changes: 97 additions & 1 deletion tests/test_case_statements_spark.py
Expand Up @@ -6,6 +6,7 @@
sql_gen_case_stmt_array_combinations_leven_3,
sql_gen_case_stmt_array_combinations_jaro_3,
sql_gen_case_stmt_array_combinations_jaro_dmeta_4,
sql_gen_gammas_name_inversion_4,
)


Expand Down Expand Up @@ -74,4 +75,99 @@ def test_size_intersection(spark):

result = list(df_pd["result_jd4"])
expected = [3, 3, 2, 1, -1, -1]
assert result == expected
assert result == expected


def test_name_inversion(spark):
    """Check the gamma levels produced by ``sql_gen_gammas_name_inversion_4``.

    ``name_1`` is compared left-vs-right, with ``name_2``..``name_4`` supplied
    as the other name columns in which an inverted match may be found. Each
    case below pairs one row of input values with the gamma level expected
    for it, using the function's default thresholds and no dmetaphone.
    """
    columns = [
        "name_1_l",
        "name_1_r",
        "name_2_l",
        "name_2_r",
        "name_3_l",
        "name_3_r",
        "name_4_l",
        "name_4_r",
    ]

    # (row values in `columns` order, expected gamma for name_1)
    cases = [
        # name_1 identical on both sides -> top level
        (
            ("john", "john", "richard", "richard",
             "michael", "michael", "smith", "smith"),
            3,
        ),
        # name_1_l equals name_2_r -> inversion level
        (
            ("richard", "john", "john", "richard",
             "michael", "michael", "smith", "smith"),
            2,
        ),
        # name_1_l is a near match of name_2_r (jonathon / jonathan)
        (
            ("jonathon", "richard", "richard", "jonathan",
             "michael", "michael", "smith", "smith"),
            2,
        ),
        # name_1_l only resembles name_4_r phonetically (caitlin / katelyn);
        # include_dmeta is left at its default, so this is not a match
        (
            ("caitlin", "michael", "richard", "richard",
             "michael", "smith", "smith", "katelyn"),
            0,
        ),
        # name_1 differs and matches none of the other right-hand columns
        (
            ("john", "james", "richard", "richard",
             "michael", "michael", "smith", "smith"),
            0,
        ),
        # null on one side of name_1 -> -1
        (
            (None, "james", "richard", "richard",
             "michael", "michael", "smith", "smith"),
            -1,
        ),
        # inversion is still detected when other right-hand columns are null
        (
            ("richard", "john", "john", "richard",
             "michael", None, "smith", None),
            2,
        ),
    ]

    data_list = [dict(zip(columns, values)) for values, _ in cases]
    expected = [gamma for _, gamma in cases]

    df = spark.createDataFrame(Row(**x) for x in data_list)
    df.createOrReplaceTempView("df")

    sql = f"""
    select
    {sql_gen_gammas_name_inversion_4("name_1",["name_2","name_3","name_4"])} as result_1
    from df
    """

    df_pd = spark.sql(sql).toPandas()

    result = list(df_pd["result_1"])
    assert result == expected

0 comments on commit 9126e35

Please sign in to comment.