In [13]:
%load_ext autoreload
%autoreload 2
from IPython.core.interactiveshell import InteractiveShell
from IPython.display import IFrame, display
InteractiveShell.ast_node_interactivity = "all"

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# 🔌Hybrid additive playground

Just a place to get linkers running.

In [85]:
from src import locations as loc
from src.data import utils as du
from src.data.star import Star
from src.data.datasets import Dataset
from src.data.probabilities import Probabilities
from src.data.clusters import Clusters
from src.data.validation import Validation
from src.link.splink_linker import SplinkLinker
from src.config import link_pipeline, stopwords
from src.features.clean_complex import clean_comp_names

from splink.duckdb.linker import DuckDBLinker
import splink.duckdb.comparison_library as cl
import splink.duckdb.comparison_template_library as ctl

import uuid
import types
from pathlib import Path
from dotenv import load_dotenv, find_dotenv
import os
import io
import pandas as pd
import duckdb
import json

# Locate a .env file and load its variables into the process environment; the
# setup cell below reads SCHEMA and the table names from it via os.getenv.
load_dotenv(find_dotenv())

True

21/9 TODO:

* Finish adding model column
    * It's in `Linker.link()` and the probabilities table
    * How is the model name declared? At the class instantiation? Feels right...
    * How does `Clusters.add_clusters()` deal with competing models?
        * Does this need a unit test?
    * This might be big enough for a separate MR?
    * How is this dealt with in `Linker.evaluate()`?
        * And in its child classes (e.g. `SplinkLinker`)?

## Setup

In [86]:
# Handles onto the project tables. The schema and every table name are read
# from environment variables.
star = Star(schema=os.getenv("SCHEMA"), table=os.getenv("STAR_TABLE"))
probabilities = Probabilities(
    schema=os.getenv("SCHEMA"),
    table=os.getenv("PROBABILITIES_TABLE"),
    star=star,
)
clusters = Clusters(
    schema=os.getenv("SCHEMA"),
    table=os.getenv("CLUSTERS_TABLE"),
    star=star,
)
validation = Validation(schema=os.getenv("SCHEMA"), table=os.getenv("VALIDATE_TABLE"))

In [76]:
def _clean_names_step():
    """Return a fresh pipeline holding the single company-name cleaning step.

    A new dict is built on every call, so the cluster and dimension pipelines
    remain separate objects that can be edited independently.
    """
    return {
        "clean_comp_names": {
            "function": clean_comp_names,
            "arguments": {
                "primary_col": "company_name",
                "secondary_col": None,
                "stopwords": stopwords,
            },
        }
    }


# Both sides of the link are cleaned identically, so build them from one
# definition instead of two copies that could drift apart.
cluster_pipeline = _clean_names_step()
dim_pipeline = _clean_names_step()

# Splink settings: a link-only job keyed on the "id" column.
linker_settings = {
    "link_type": "link_only",
    "unique_id_column_name": "id",
    "retain_matching_columns": False,
    "retain_intermediate_calculation_columns": False,
    # Candidate pairs must share a non-empty company name or a non-empty postcode.
    "blocking_rules_to_generate_predictions": [
        """
            (l.company_name = r.company_name)
            and (
                l.company_name <> ''
                and r.company_name <> ''
            )
        """,
        """
            (l.postcode = r.postcode)
            and (
                l.postcode <> ''
                and r.postcode <> ''
            )
        """,
    ],
    # Jaro-Winkler on company name (levels at 0.9 and 0.6, with term-frequency
    # adjustments) plus Splink's postcode comparison template.
    "comparisons": [
        cl.jaro_winkler_at_thresholds(
            "company_name", [0.9, 0.6], term_frequency_adjustments=True
        ),
        ctl.postcode_comparison("postcode"),
    ],
}

# Training steps keyed by name.
# NOTE(review): each "function" string presumably names the Splink linker method
# that is called with "arguments" — confirm in SplinkLinker.
train_pipeline = {
    "estimate_probability_two_random_records_match": {
        "function": "estimate_probability_two_random_records_match",
        "arguments": {
            "deterministic_matching_rules": """
                l.company_name = r.company_name
            """,
            "recall": 0.7,
        },
    },
    "estimate_u_using_random_sampling": {
        "function": "estimate_u_using_random_sampling",
        "arguments": {"max_pairs": 1e6},
    },
    "estimate_parameters_using_expectation_maximisation": {
        "function": "estimate_parameters_using_expectation_maximisation",
        "arguments": {
            "blocking_rule": """
                l.company_name = r.company_name
            """
        },
    },
}

## Splink

In [87]:
# Splink-backed linker for dataset 54717 of the star table, using the default
# DuckDB file path exposed by the data utilities.
cl_x_exp = SplinkLinker(
    name="exp_n2_splink_basic",
    dataset=Dataset(star_id=54717, star=star),
    probabilities=probabilities,
    clusters=clusters,
    n=1,
    db_path=du.DEFAULT_DUCKDB_PATH.as_posix(),
    overwrite=True,
)

In [88]:
# Select the matching fields for each side: company name and postcode from
# Companies House for the cluster side, plus the id for the dimension side.
cl_x_exp.get_data(
    # sample=5,
    cluster_select={
        '"companieshouse"."companies"': [
            "company_name as company_name", "postcode as postcode"
        ]
    },
    dim_select=["id", "company_name", "postcode"],
)

In [89]:
# Full evaluation run with the configuration objects defined in Setup. The
# captured log shows outputs going to the Probabilities table (log_output) and
# the run being logged as an MLflow experiment (log_mlflow).
# NOTE(review): prepare_kwargs / link_kwargs are presumably forwarded to
# prepare() and link() — confirm in Linker.evaluate().
cl_x_exp.evaluate(
    link_experiment="cm_hmrc-trade-exporters",
    evaluation_description="Simple company name clean, nothing else",
    prepare_kwargs=dict(
        cluster_pipeline=cluster_pipeline,
        dim_pipeline=dim_pipeline,
        linker_settings=linker_settings,
        train_pipeline=train_pipeline,
    ),
    link_kwargs=dict(threshold=0.7),
    report_dir=Path(loc.PROJECT_DIR, "scratch", "reports", "cm_hmrc-trade-exporters"),
    log_mlflow=True,
    log_output=True,
)

INFO:src.link.linker:Running pipeline
INFO:src.link.linker:Logging outputs to the Probabilities table
INFO:src.link.linker:Logging as MLflow experiment
DEBUG:urllib3.connectionpool:Resetting dropped connection: mlflow--data-science.data.trade.gov.uk
DEBUG:urllib3.connectionpool:http://mlflow--data-science.data.trade.gov.uk:8004 "GET /api/2.0/mlflow/experiments/get-by-name?experiment_name=cm_hmrc-trade-exporters HTTP/1.1" 200 245
DEBUG:urllib3.connectionpool:Resetting dropped connection: mlflow--data-science.data.trade.gov.uk
DEBUG:urllib3.connectionpool:http://mlflow--data-science.data.trade.gov.uk:8004 "GET /api/2.0/mlflow/experiments/get-by-name?experiment_name=cm_hmrc-trade-exporters HTTP/1.1" 200 245
DEBUG:urllib3.connectionpool:Resetting dropped connection: mlflow--data-science.data.trade.gov.uk
DEBUG:urllib3.connectionpool:http://mlflow--data-science.data.trade.gov.uk:8004 "POST /api/2.0/mlflow/runs/create HTTP/1.1" 200 1033
DEBUG:urllib3.connectionpool:Resetting dropped connecti

In [90]:
# Add clusters from this model's probabilities, reusing the linker's name and n
# and the same 0.7 threshold passed to evaluate().
# NOTE(review): the recorded run of this cell was cancelled by a database
# statement timeout while building `to_insert_temp` (see the output below), so
# the call as written did not complete.
clusters.add_clusters(
    probabilities=probabilities,
    models=cl_x_exp.name,
    validation=validation,
    n=cl_x_exp.n,
    threshold=0.7,
    add_unmatched_dims=True,
)

OperationalError: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout

[SQL: 
                drop table if exists to_insert_temp;
                create temporary table to_insert_temp as
                    select
                        distinct on (id_rank.id, id_rank.source)
                        gen_random_uuid() as uuid,
                        id_rank.cluster,
                        id_rank.id,
                        id_rank.source,
                        1 as n
                    from (
                        select
                            distinct on (clus_rank.cluster, clus_rank.source)
                            clus_rank.*,
                            rank() over (
                                partition by
                                    clus_rank.id,
                                    clus_rank.source
                                order by
                                    clus_rank.probability desc
                            ) as id_rank
                        from (
                            select
                                prob.*,
                                rank() over(
                                    partition by
                                        prob.cluster,
                                        prob.source
                                    order by
                                        prob.probability desc
                                ) as clus_rank
                            from
                                probabilities_temp prob
                        ) clus_rank
                        where
                            clus_rank.clus_rank = 1
                            and (
                                not exists (
                                    select
                                        id,
                                        source
                                    from
                                        clusters_temp clus
                                    where
                                        clus.id = clus_rank.id
                                        and clus.source = clus_rank.source
                                )
                                or not exists (
                                    select
                                        cluster,
                                        source
                                    from
                                        clusters_temp clus
                                    where
                                        clus.cluster = clus_rank.cluster
                                        and clus.source = clus_rank.source
                                )
                            )
                        order by
                            clus_rank.cluster,
                            clus_rank.source
                    ) id_rank
                    where
                        id_rank.id_rank = 1
                    order by
                        id_rank.id,
                        id_rank.source;
            ]
(Background on this error at: https://sqlalche.me/e/14/e3q8)

In [None]:
# Sample five rows from the clusters, picking columns from both source tables.
clusters.get_data(
    select={
        '"companieshouse"."companies"': ["company_name", "sic_code_1"],
        '"hmrc"."trade__exporters"': ["address"],
    },
    sample=5,
)

## First level functions (within `evaluate()`)

In [None]:
# Reuse the configuration objects defined once in the Setup section rather than
# repeating them inline, so this step-by-step run cannot drift from the
# configuration that `evaluate()` receives.
cl_x_exp.prepare(
    low_memory=True,
    cluster_pipeline=cluster_pipeline,
    dim_pipeline=dim_pipeline,
    linker_settings=linker_settings,
    train_pipeline=train_pipeline,
)

See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])  # type: ignore[arg-type]


In [None]:
# Link with the same 0.7 threshold used in evaluate().
# NOTE(review): log_output=True presumably writes the results to the
# probabilities table — confirm in Linker.link().
cl_x_exp.link(threshold=0.7, log_output=True)

In [None]:
# Save the linker as a pickle under the raw-data directory.
# NOTE(review): the file is named 'ch_x_exp' while the variable is 'cl_x_exp' —
# confirm which spelling is intended.
cl_x_exp.save(path=Path(loc.DATA_SUBDIR['raw'], 'ch_x_exp.pickle'))

## Second level functions (within `prepare()` and `link()`)

### `prepare()` private methods

In [5]:
# Same cleaning pipelines as the Setup section; passing the shared objects keeps
# this private-method walkthrough in step with prepare() and evaluate().
cl_x_exp._clean_data(
    cluster_pipeline=cluster_pipeline,
    dim_pipeline=dim_pipeline,
)

In [6]:
# NOTE(review): presumably replaces the original record IDs with integer
# `duckdb_id` values and records the mapping in `cl_x_exp.id_lookup` (inspected
# further down) — confirm in SplinkLinker.
cl_x_exp._substitute_ids()

See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])  # type: ignore[arg-type]


In [13]:
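# Kept for reference only (everything here is commented out): a sketch of
# attaching a `_register_tables` method to the existing instance at runtime.
# The next cell calls `cl_x_exp._register_tables()` directly.
# def _register_tables(self):
#     self.con.register('cls', self.cluster_processed)
#     self.con.register('dim', self.dim_processed)

# cl_x_exp._register_tables = types.MethodType(_register_tables, cl_x_exp)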

In [7]:
# NOTE(review): presumably registers the processed cluster and dimension frames
# with the linker's DuckDB connection, as in the commented-out sketch in the
# previous cell — confirm in SplinkLinker.
cl_x_exp._register_tables()

In [5]:
# Build the Splink linker from the shared `linker_settings` defined in Setup,
# not an inline copy that could drift from what evaluate() uses.
cl_x_exp._create_linker(linker_settings=linker_settings)

In [10]:
# Train with the shared `train_pipeline` defined in Setup, not an inline copy
# that could drift from what evaluate() uses.
cl_x_exp._train_linker(train_pipeline=train_pipeline)

INFO:splink.linker:Probability two random records match is estimated to be  2.25e-07.
This means that amongst all possible pairwise record comparisons, one in 4,447,653.50 are expected to match.  With 2,764,864,733,924 total possible comparisons, we expect a total of around 621,645.71 matching pairs
INFO:splink.estimate_u:----- Estimating u probabilities using random sampling -----
INFO:splink.estimate_u:
Estimated u probabilities using random sampling
INFO:splink.settings:
Your model is not yet fully trained. Missing estimates for:
    - company_name (no m values are trained).
    - postcode (no m values are trained).
INFO:splink.em_training_session:
----- Starting EM training session -----

INFO:splink.em_training_session:Estimating the m probabilities of the model by blocking on:

                    l.company_name = r.company_name
                

Parameter estimates will be made for the following comparison(s):
    - postcode

Parameter estimates cannot be made for the following 

### `link()` private methods

* Preds stuff
    * Make preds
    * Rejoin IDs
    * Send to probs table
    * Log params
    * Log metrics (none yet, nothing to eval against)
* Model stuff
    * Add model uuid to predictions table
    * Add model table to hold model name
    * Update unit tests to deal with this

In [8]:
# Summarise the two processed frames (row counts, dtypes, memory use) before
# predicting: first the cluster side, then the dimension side.
cl_x_exp.cluster_processed.info()
cl_x_exp.dim_processed.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10774831 entries, 0 to 10774830
Data columns (total 3 columns):
 #   Column        Dtype 
---  ------        ----- 
 0   id            int64 
 1   company_name  object
 2   postcode      object
dtypes: int64(1), object(2)
memory usage: 246.6+ MB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 256604 entries, 0 to 256603
Data columns (total 3 columns):
 #   Column        Non-Null Count   Dtype 
---  ------        --------------   ----- 
 0   id            256604 non-null  int64 
 1   company_name  256596 non-null  object
 2   postcode      256604 non-null  object
dtypes: int64(1), object(2)
memory usage: 5.9+ MB


In [10]:
# Call the underlying Splink linker directly, filtering predictions at a 0.7
# match-probability threshold (the same value used for link()/evaluate()).
preds = cl_x_exp.linker.predict(threshold_match_probability=0.7)

You have called predict(), but there are some parameter estimates which have neither been estimated or specified in your settings dictionary.  To produce predictions the following untrained trained parameters will use default values.
Comparison: 'company_name':
    m values not fully trained
Comparison: 'company_name':
    u values not fully trained


In [None]:
# Target columns for the probabilities frame built below: {"cluster", "id", "probability", "source"}

In [14]:
# Peek at the lookup: each row maps a `duckdb_id` back to an original `id`.
cl_x_exp.id_lookup.head(1)

Unnamed: 0,duckdb_id,id
0,0,51f3f15d-ea7c-44d9-889c-e6e77918e886


In [31]:
# Map Splink's integer IDs back through the lookup: the left record (id_l)
# resolves to `cluster`, the right record (id_r) to `id`. Then keep only the
# output columns and tag every row with the dataset id as `source`.
probs = (
    preds.as_pandas_dataframe()
    .merge(
        cl_x_exp.id_lookup.rename(columns={"id": "cluster"}),
        how="left",
        left_on="id_l",
        right_on="duckdb_id",
    )
    .merge(cl_x_exp.id_lookup, how="left", left_on="id_r", right_on="duckdb_id")
    .rename(columns={"match_probability": "probability"})
    .loc[:, ["cluster", "id", "probability"]]
    .assign(source=cl_x_exp.dataset.id)
)
probs.head(5)

Unnamed: 0,cluster,id,probability,source
0,3ce18bbd-ca73-4173-bb1f-91e0441e5223,3241808,0.998251,54717
1,a3f03e2d-f976-4596-b345-17b13946bd71,191621,0.843849,54717
2,8b37d820-cfdb-4c28-9910-c0fa6e1bf773,112895,0.998251,54717
3,5259db87-4701-4209-9ac9-36b4c0d11dd7,2997499,0.998251,54717
4,1bb90293-52dd-45a3-95f1-219264d7b760,2411254,0.99738,54717


In [32]:
# Hand the predictions to the probabilities table object. The returned frame,
# displayed below, carries additional `uuid` and `link_type` columns.
probabilities.add_probabilities(probs)

Unnamed: 0,cluster,id,probability,source,uuid,link_type
0,3ce18bbd-ca73-4173-bb1f-91e0441e5223,3241808,0.998251,54717,51d9eeff-46d4-48be-b7cc-24161549450d,link
1,a3f03e2d-f976-4596-b345-17b13946bd71,191621,0.843849,54717,b222c116-2ba4-48d0-9696-f758b0b9c2cc,link
2,8b37d820-cfdb-4c28-9910-c0fa6e1bf773,112895,0.998251,54717,10319d55-52b2-43e7-8afe-95ab59a76fa1,link
3,5259db87-4701-4209-9ac9-36b4c0d11dd7,2997499,0.998251,54717,fc2ce3e6-1ac2-433d-b8c5-e4bdc405dea8,link
4,1bb90293-52dd-45a3-95f1-219264d7b760,2411254,0.997380,54717,8717545c-219f-4208-a8b5-df4eb2234e56,link
...,...,...,...,...,...,...
670,7b4aac6e-1f05-4665-9631-5d33fd4632db,3270689,0.805509,54717,91fb7cdd-9fff-4c38-9956-ff4ae48853f4,link
671,fd2b2db0-ba8a-4559-831e-06f7574d6e0d,160681,0.805509,54717,19bc1708-5513-489d-a356-3f3cb17ce0c2,link
672,841949c0-3a7c-4dae-afbf-8b5d13b1117f,1682657,0.805509,54717,fd48fa28-5f17-496d-b11b-143c4dba5651,link
673,841949c0-3a7c-4dae-afbf-8b5d13b1117f,3193742,0.805509,54717,b04ca25b-6338-4c0b-b3af-a52ff15541fe,link


In [None]:
# Display the whole ID lookup frame.
# NOTE(review): consider `.head()` here; a full frame is rarely needed in output.
cl_x_exp.id_lookup

### `evaluate()`

`evaluate()` is implemented in the `Linker` class and should log each run to MLflow. Let's fix it.

In [6]:
# Configuration for the evaluate() run below; it repeats the values set in the
# Setup section of this notebook.

# One cleaning step per side: clean the company name using the project stopwords.
cluster_pipeline = dict(
    clean_comp_names=dict(
        function=clean_comp_names,
        arguments=dict(primary_col="company_name", secondary_col=None, stopwords=stopwords),
    )
)
dim_pipeline = dict(
    clean_comp_names=dict(
        function=clean_comp_names,
        arguments=dict(primary_col="company_name", secondary_col=None, stopwords=stopwords),
    )
)

# Splink settings: link-only, keyed on "id"; block on a shared non-empty company
# name or a shared non-empty postcode; compare on company name and postcode.
linker_settings = dict(
    link_type="link_only",
    unique_id_column_name="id",
    retain_matching_columns=False,
    retain_intermediate_calculation_columns=False,
    blocking_rules_to_generate_predictions=[
        """
            (l.company_name = r.company_name)
            and (
                l.company_name <> ''
                and r.company_name <> ''
            )
        """,
        """
            (l.postcode = r.postcode)
            and (
                l.postcode <> ''
                and r.postcode <> ''
            )
        """,
    ],
    comparisons=[
        cl.jaro_winkler_at_thresholds(
            "company_name", [0.9, 0.6], term_frequency_adjustments=True
        ),
        ctl.postcode_comparison("postcode"),
    ],
)

# Training steps keyed by name, each giving a function name and its arguments.
train_pipeline = dict(
    estimate_probability_two_random_records_match=dict(
        function="estimate_probability_two_random_records_match",
        arguments=dict(
            deterministic_matching_rules="""
                l.company_name = r.company_name
            """,
            recall=0.7,
        ),
    ),
    estimate_u_using_random_sampling=dict(
        function="estimate_u_using_random_sampling",
        arguments=dict(max_pairs=1e6),
    ),
    estimate_parameters_using_expectation_maximisation=dict(
        function="estimate_parameters_using_expectation_maximisation",
        arguments=dict(
            blocking_rule="""
                l.company_name = r.company_name
            """
        ),
    ),
)

In [7]:
# Evaluation run logged to MLflow only: log_output=False, and the captured log
# has no "Logging outputs to the Probabilities table" line.
cl_x_exp.evaluate(
    link_experiment="cm_hmrc-trade-exporters",
    evaluation_name="Basic link",
    evaluation_description="Simple company name clean, nothing else",
    prepare_kwargs=dict(
        cluster_pipeline=cluster_pipeline,
        dim_pipeline=dim_pipeline,
        linker_settings=linker_settings,
        train_pipeline=train_pipeline,
    ),
    link_kwargs=dict(threshold=0.7),
    report_dir=Path(loc.PROJECT_DIR, "scratch", "reports", "cm_hmrc-trade-exporters"),
    log_mlflow=True,
    log_output=False,
)

INFO:src.link.linker:Running pipeline
INFO:src.link.linker:Logging as MLflow experiment
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): mlflow--data-science.data.trade.gov.uk:8004
DEBUG:urllib3.connectionpool:http://mlflow--data-science.data.trade.gov.uk:8004 "GET /api/2.0/mlflow/experiments/get-by-name?experiment_name=cm_hmrc-trade-exporters HTTP/1.1" 200 245
DEBUG:urllib3.connectionpool:Resetting dropped connection: mlflow--data-science.data.trade.gov.uk
DEBUG:urllib3.connectionpool:http://mlflow--data-science.data.trade.gov.uk:8004 "GET /api/2.0/mlflow/experiments/get-by-name?experiment_name=cm_hmrc-trade-exporters HTTP/1.1" 200 245
DEBUG:git.util:Failed checking if running in CYGWIN due to: FileNotFoundError(2, 'No such file or directory')
DEBUG:urllib3.connectionpool:Resetting dropped connection: mlflow--data-science.data.trade.gov.uk
DEBUG:urllib3.connectionpool:http://mlflow--data-science.data.trade.gov.uk:8004 "POST /api/2.0/mlflow/runs/create HTTP/1.1" 200 1015
