In [1]:
import os

import altair as alt
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from splink.datasets import splink_datasets
from splink.duckdb.linker import DuckDBLinker
pd.options.display.max_rows = 1000

# --- SQL Server connection settings ------------------------------------------
# The user name and password can be overridden through the environment so real
# credentials never need to be committed with the notebook. The fallbacks are
# the values that were previously hard-coded here, so a local demo run behaves
# exactly as before.
# NOTE(review): drop the fallbacks once the environment variables are set up.
SQLSERVER_HOST = 'localhost:47777'
SQLSERVER_DRIVER = 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
SQLSERVER_USER = os.environ.get('DATAHUB_DB_USER', 'datahubadmin')
SQLSERVER_PASSWORD = os.environ.get('DATAHUB_DB_PASSWORD', 'datahub')

# Local Spark session; the Microsoft JDBC driver is pulled in as a package.
sparkdriver = (
    SparkSession.builder.master('local').appName('demoapp')
    .config('spark.jars.packages', 'com.microsoft.sqlserver:mssql-jdbc:9.4.1.jre8')
    .getOrCreate()
)


def read_sqlserver_query(database, query, spark=None):
    """Run ``query`` against a SQL Server database and return a Spark DataFrame.

    Parameters
    ----------
    database : str
        Database name placed in the JDBC URL (``databaseName=...``).
    query : str
        SQL text handed to Spark's JDBC reader through the ``query`` option.
    spark : SparkSession, optional
        Session used for the read; defaults to the notebook's ``sparkdriver``.

    Returns
    -------
    pyspark.sql.DataFrame
        The result of the JDBC read.
    """
    session = sparkdriver if spark is None else spark
    return (
        session.read.format('jdbc')
        .option('url', f'jdbc:sqlserver://{SQLSERVER_HOST};databaseName={database}')
        .option('driver', SQLSERVER_DRIVER)
        .option('user', SQLSERVER_USER)
        .option('password', SQLSERVER_PASSWORD)
        .option('query', query)
        .load()
    )


# All three extracts expose the same nine columns. dob is cast to varchar in
# SQL, and Phone_number is aliased to phone in the two HealthSystem queries so
# the column names line up with the CouncilData query.
df_1 = read_sqlserver_query(
    'HealthSystem',
    'SELECT [unique_id]      ,[first_name]      ,[surname]      , cast([dob] as varchar(50)) dob      ,[email]      ,[CHI_Number]      ,[address]      ,Phone_number as phone     ,[postcode]  FROM [dbo].[PatientData]',
)

df_2 = read_sqlserver_query(
    'HealthSystem',
    'SELECT [unique_id]      ,[first_name]      ,[surname]      ,cast([dob] as varchar(50)) dob     ,[email]      ,[CHI_Number]      ,[address]      ,Phone_number as phone      ,[postcode]  FROM [dbo].[LegacyPatientData]',
)

df_3 = read_sqlserver_query(
    'CouncilData',
    'SELECT [unique_id]      ,[first_name]      ,[surname]      ,cast([dob] as varchar(50)) dob     ,[email]      ,[CHI_Number]      ,[address]      ,[phone]      ,[postcode]  FROM [CouncilData].[dbo].[ResidentData]',
)

# Register the Java Jaro-Winkler implementation as a Spark SQL function.
# NOTE(review): this class is not part of the mssql-jdbc package configured
# above, so the jar providing it must reach the classpath some other way -
# confirm how it is supplied.
sparkdriver.udf.registerJavaFunction('jaro_winkler', 'uk.gov.moj.dash.linkage.JaroWinklerSimilarity', DoubleType())

# Quick look at the first rows of each source.
df_1.show(5)
df_2.show(5)
df_3.show(5)

+---------+----------+---------+----------+--------------------+----------+--------------------+------------------+---------+
|unique_id|first_name|  surname|       dob|               email|CHI_Number|             address|             phone| postcode|
+---------+----------+---------+----------+--------------------+----------+--------------------+------------------+---------+
|        1|   Yesenia|     Sosa|1996-03-05| jared47@example.net|   000275Z|461 Davila Cove S...|  705-346-7155x236| NY 23398|
|        2|     Tammy|    Jones|1942-04-10|wilsoncaitlin@exa...|   0002962|3235 Patterson Ex...|(361)937-6629x0692| TX 62203|
|        3|    Joseph|Cervantes|1915-10-01|longkimberly@exam...|    00033Q|33646 Smith Islan...|      545-506-3491| MT 12286|
|        4|   Timothy|    Young|2019-03-17|ihernandez@exampl...|    0006WP|6306 Andrews Vall...| 544-618-4901x9242| VT 80875|
|        5| Stephanie|     Lane|1921-10-25|masonrhonda@examp...|    000RQL|745 Kathryn Exten...|  804.548.5638x391| TX

In [2]:



import splink.spark.comparison_template_library as ctl
import splink.spark.comparison_library as cl
from splink.spark.blocking_rule_library import block_on
from functools import wraps

import splink.spark.comparison_library as cl
import splink.spark.comparison_template_library as ctl
from splink.spark.blocking_rule_library import block_on

from splink.spark.linker import SparkLinker

# Splink model definition: which columns are compared, and which blocking
# rules generate the candidate pairs that get scored.
settings = {
    "link_type": "link_and_dedupe",
    "comparisons": [
        ctl.name_comparison("first_name"),
        ctl.name_comparison("surname"),
        ctl.name_comparison("CHI_Number"),
        ctl.date_comparison("dob", cast_strings_to_date=False),
        cl.exact_match("email", term_frequency_adjustments=True),
    ],
    "blocking_rules_to_generate_predictions": [
        "l.first_name = r.first_name and l.surname = r.surname",
        "l.dob = r.dob",  # plain SQL string blocking rule
        "l.CHI_Number = r.CHI_Number",
        block_on("postcode"),
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
    "max_iterations": 10,
    "em_convergence": 0.01
}

# One linker over the three source frames, running on the notebook's Spark session.
linker = SparkLinker([df_1, df_2, df_3], settings, spark=sparkdriver)

# Rules passed to estimate_probability_two_random_records_match in the
# training cell.
deterministic_rules = [
    "l.first_name = r.first_name and levenshtein(r.dob, l.dob) <= 1",
    "l.surname = r.surname and levenshtein(r.dob, l.dob) <= 1",
    "l.first_name = r.first_name and levenshtein(r.surname, l.surname) <= 2",
    "levenshtein(r.first_name, l.first_name) <= 2 and levenshtein(r.surname, l.surname) <= 2",
    "levenshtein(r.CHI_Number, l.CHI_Number) <= 2"
    ]

# Spark checkpoint directory. It can be overridden with SPLINK_CHECKPOINT_DIR
# so the notebook is not tied to one user's machine; the fallback is the path
# that was previously hard-coded here.
CHECKPOINT_DIR = os.environ.get(
    "SPLINK_CHECKPOINT_DIR",
    "C:/Users/seanj/Documents/MyProjects/Splink",
)
sc = sparkdriver.sparkContext  # Access the SparkContext
sc.setCheckpointDir(CHECKPOINT_DIR)  # Set checkpoint directory


In [3]:


# Step 1 - estimate the probability that two random records match, using the
# deterministic rules together with the recall passed for them.
linker.estimate_probability_two_random_records_match(
    deterministic_rules,
    recall=0.6,
)

# Step 2 - estimate u probabilities from randomly sampled record pairs.
linker.estimate_u_using_random_sampling(max_pairs=5e5)

# Step 3 - three expectation-maximisation passes, each with its own training
# blocking rule.
training_blocking_rule = "l.first_name = r.first_name and l.surname = r.surname"
training_session_fname_sname = (
    linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)
)

training_blocking_rule = "l.dob = r.dob"
training_session_dob = (
    linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)
)

training_blocking_rule = "l.CHI_Number = r.CHI_Number"
training_session_chi_number = (
    linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)
)

# Step 4 - score candidate pairs with a match-probability threshold of 0.9.
results = linker.predict(threshold_match_probability=0.9)


Probability two random records match is estimated to be  0.00022.
This means that amongst all possible pairwise record comparisons, one in 4,551.29 are expected to match.  With 66,084,756 total possible comparisons, we expect a total of around 14,520.00 matching pairs
----- Estimating u probabilities using random sampling -----
u probability not trained for CHI_Number - Exact match CHI_Number (comparison vector value: 4). This usually means the comparison level was never observed in the training data.
u probability not trained for CHI_Number - Damerau_levenshtein <= 1 (comparison vector value: 3). This usually means the comparison level was never observed in the training data.

Estimated u probabilities using random sampling

Your model is not yet fully trained. Missing estimates for:
    - first_name (no m values are trained).
    - surname (no m values are trained).
    - CHI_Number (some u values are not trained, no m values are trained).
    - dob (no m values are trained).
    - e

In [4]:

# Pull the pairwise predictions into pandas and write them to CSV.
# The path is kept in one place so the confirmation message always names the
# file that was actually written (it previously reported "results.csv").
OUTPUT_CSV_PATH = 'OutputFull.csv'

df_e = results.as_pandas_dataframe()
df_e.to_csv(OUTPUT_CSV_PATH)

print(f"Results written to {OUTPUT_CSV_PATH} successfully!")

Results written to results.csv successfully!


In [None]:
import os

import pandas as pd
from sqlalchemy import create_engine, text

# Pairs at or above this probability are treated as the same individual.
# This is the same value the predict cell passes as threshold_match_probability.
MATCH_THRESHOLD = 0.9


def build_unified_id_mapping(left_keys, right_keys, is_match):
    """Assign one integer ID per group of transitively linked records.

    Every key that appears on either side of a pair becomes a node. Pairs
    flagged in ``is_match`` join their two nodes, so if A-B and B-C are both
    matches then A, B and C all receive the same ID.

    Parameters
    ----------
    left_keys, right_keys : sequence of hashable
        Record keys for the left and right side of each pair, in row order.
    is_match : sequence of bool
        Whether the pair at the same position counts as a link.

    Returns
    -------
    dict
        Maps each record key to a 1-based integer ID. IDs are numbered in
        order of first appearance, and linked records share an ID.
    """
    parent = {}

    def find(node):
        # Walk up to the representative of the node's group...
        root = node
        while parent[root] != root:
            root = parent[root]
        # ...then point every node on the path straight at it.
        while parent[node] != root:
            parent[node], node = root, parent[node]
        return root

    for left, right, linked in zip(left_keys, right_keys, is_match):
        parent.setdefault(left, left)
        parent.setdefault(right, right)
        if linked:
            left_root, right_root = find(left), find(right)
            if left_root != right_root:
                parent[right_root] = left_root

    group_ids = {}
    mapping = {}
    for node in list(parent):
        mapping[node] = group_ids.setdefault(find(node), len(group_ids) + 1)
    return mapping


# df_e holds the pairwise predictions exported from `results`.
# A record is identified by (source_dataset, unique_id): three source tables
# are linked, so the same unique_id value can appear in more than one of them.
left_keys = list(zip(df_e['source_dataset_l'], df_e['unique_id_l']))
right_keys = list(zip(df_e['source_dataset_r'], df_e['unique_id_r']))
is_match = (df_e['match_probability'] >= MATCH_THRESHOLD).tolist()

# Linked records share a unified ID; records that only appear in pairs below
# the threshold each get their own.
# NOTE(review): `results` comes from linker.predict(threshold_match_probability=0.9),
# so df_e only contains pairs at or above 0.9. Records that never appear in a
# scored pair are absent from df_e and therefore get no unified ID here.
id_mapping = build_unified_id_mapping(left_keys, right_keys, is_match)

# Update the DataFrame with unified/common IDs
df_e['unified_id_l'] = [id_mapping[key] for key in left_keys]
df_e['unified_id_r'] = [id_mapping[key] for key in right_keys]

# Create an SQLAlchemy engine connected to the SQL Server database.
# The URL can be supplied through SPLINK_RESULTS_DB_URL so credentials do not
# have to live in the notebook; the fallback is the previously hard-coded URL.
engine = create_engine(
    os.environ.get(
        'SPLINK_RESULTS_DB_URL',
        'mssql+pyodbc://datahubadmin:datahub@localhost:47777/Splink?driver=ODBC+Driver+17+for+SQL+Server',
    )
)

# Create-if-missing scripts. T-SQL has no CREATE TABLE IF NOT EXISTS, so the
# existence check is done with OBJECT_ID.
# NOTE(review): id_mapping_table now has a source_dataset column. If a table
# with the old two-column layout already exists, migrate or drop it first.
create_id_mapping_table_script = '''
IF OBJECT_ID(N'id_mapping_table', N'U') IS NULL
CREATE TABLE id_mapping_table (
    source_dataset VARCHAR(255),
    original_id VARCHAR(255),
    unified_id INT
)
'''

create_linked_individuals_table_script = '''
IF OBJECT_ID(N'linked_individuals_table', N'U') IS NULL
CREATE TABLE linked_individuals_table (
    match_weight FLOAT,
    match_probability FLOAT,
    source_dataset_l VARCHAR(255),
    source_dataset_r VARCHAR(255),
    unique_id_l VARCHAR(255),
    unique_id_r VARCHAR(255),
    first_name_l VARCHAR(255),
    first_name_r VARCHAR(255),
    gamma_first_name INT,
    bf_first_name FLOAT,
    surname_l VARCHAR(255),
    surname_r VARCHAR(255),
    gamma_surname INT,
    bf_surname FLOAT,
    CHI_Number_l VARCHAR(255),
    CHI_Number_r VARCHAR(255),
    gamma_CHI_Number INT,
    bf_CHI_Number FLOAT,
    dob_l DATE,
    dob_r DATE,
    gamma_dob INT,
    bf_dob FLOAT,
    email_l VARCHAR(255),
    email_r VARCHAR(255),
    gamma_email INT,
    tf_email_l FLOAT,
    tf_email_r FLOAT,
    bf_email FLOAT,
    bf_tf_adj_email FLOAT,
    postcode_l VARCHAR(255),
    postcode_r VARCHAR(255),
    match_key INT,
    unified_id_l INT,
    unified_id_r INT
)
'''

# Run the DDL inside a transaction that commits on success. The statements are
# wrapped in text() because SQLAlchemy 2.x no longer accepts raw strings.
with engine.begin() as connection:
    connection.execute(text(create_id_mapping_table_script))
    connection.execute(text(create_linked_individuals_table_script))

# Convert the id_mapping dictionary to a DataFrame, one row per record.
id_mapping_df = pd.DataFrame(
    [
        (source_dataset, original_id, unified_id)
        for (source_dataset, original_id), unified_id in id_mapping.items()
    ],
    columns=['source_dataset', 'original_id', 'unified_id'],
)

# Store the mapping in the database.
# NOTE: if_exists='append' means re-running this cell inserts the rows again.
id_mapping_df.to_sql('id_mapping_table', con=engine, if_exists='append', index=False)

# Store the scored pairs, now carrying unified IDs, in the database.
df_e.to_sql('linked_individuals_table', con=engine, if_exists='append', index=False)
