# Entity Resolution using Artificial Neural Networks

Another very popular implementation choice for entity resolution is a standard
classification network.
In this notebook we're looking into the difference between training a feed
forward classifier using synthetic comparison data and vector data obtained
using pre-defined character-level word embeddings.

In [1]:
from matching.ml.datasets.test_blocking_dataset import block_engine
!test -f ~/requirements.txt && pip install -r ~/requirements.txt

In [2]:
import itertools
from dataclasses import dataclass

import numpy as np

from matchescu.matching.entity_reference import RawComparison

In [3]:
import os
import polars as pl

from matchescu.matching.extraction import CsvDataSource, Traits

Most of our other notebooks use the Abt-Buy dataset and we're going to use it
here, too.

In [4]:
LANG = "en"
DATADIR = os.path.abspath("../../data")
LEFT_CSV_PATH = os.path.join(DATADIR, "abt-buy", "Abt.csv")
RIGHT_CSV_PATH = os.path.join(DATADIR, "abt-buy", "Buy.csv")
GROUND_TRUTH_PATH = os.path.join(DATADIR, "abt-buy", "abt_buy_perfectMapping.csv")

Let's create some word embeddings using the `fasttext` library.
We need to download a prebuilt model.

In [5]:
import fasttext

from fasttext.util import download_model


download_model(LANG, if_exists="ignore")
ft_model = fasttext.load_model(f"cc.{LANG}.300.bin")

When computing the word embeddings, we're going to account for the training
dataset's particularities by using an algorithm similar to SIF (smooth inverse
frequency) in a similar way [@deepmatcher] uses it.

In [6]:
import re

# Model hyperparameters
SIF_ALPHA = 1.0
INPUT_SIZE = ft_model.get_dimension()
TOKEN_RE = re.compile(r'[\W_]+', re.IGNORECASE)

Let's implement some helper functions that help us with determining SIF.

In [7]:
from typing import Any, Generator, Iterable

import torch
from matchescu.typing import Record


@dataclass
class TokenEmbedding:
    token: str
    embedding: torch.Tensor


def tokenize_attribute_value(value: Any) -> Generator[str, None, None]:
    if value is None:
        return
    if not isinstance(value, str):
        value = str(value)
    value = value.lower()
    yield from TOKEN_RE.split(value)


def embed_str(value: str) -> torch.Tensor:
    return torch.from_numpy(ft_model.get_word_vector(value))


def token_embedding(value: str) -> TokenEmbedding:
    return TokenEmbedding(value, embed_str(value))


def tokenize_words(record: tuple) -> list[list[TokenEmbedding]]:
    return [
        [token_embedding(token) for token in tokenize_attribute_value(value)]
        for value in record
    ]


def tokenize_all(
    records: Iterable[Record],
) -> Generator[list[list[TokenEmbedding]], None, None]:
    yield from (tokenize_words(record) for record in records)


def build_normalized_unigram_frequencies(
    processed_data_sources: Iterable[list[list[TokenEmbedding]]],
) -> dict[str, float]:
    token_frequencies = {}
    word_count = 0
    for ds in processed_data_sources:
        for tokenized_record in ds:
            for token_embedding in tokenized_record:
                token_frequency = token_frequencies.get(token_embedding.token, 0)
                token_frequency += 1
                token_frequencies[token_embedding.token] = token_frequency
                word_count += 1
    return {
        token: token_frequency / word_count
        for token, token_frequency in token_frequencies.items()
    }


def compute_token_weights(
    token_embeddings: list[TokenEmbedding],
    frequency_table: dict[str, float],
    a: float = 1.0,
) -> torch.Tensor:
    return torch.Tensor(
        [(a / (a + frequency_table.get(te.token, 1000000))) for te in token_embeddings]
    )

Now we define our data sources so we can extract entity references.

In [8]:
from typing import Hashable
from matchescu.typing import EntityReference
from matchescu.matching.blocking import BlockEngine


abt_traits = Traits().int([0]).string([1, 2]).currency([3])
abt = CsvDataSource(name="abt", traits=abt_traits).read_csv(LEFT_CSV_PATH)
buy_traits = Traits().int([0]).string([1, 2, 3]).currency([4])
buy = CsvDataSource(name="buy", traits=buy_traits).read_csv(RIGHT_CSV_PATH)
gt = set(
    pl.read_csv(
        os.path.join(DATADIR, "abt-buy", "abt_buy_perfectMapping.csv"),
        ignore_errors=True,
    ).iter_rows()
)


def _id(ref: EntityReference) -> Hashable:
    return ref[0]

We know from our logistic regression notebook that blocking and filtering both
dramatically improve training effectiveness and speed up performance.
In this next step, we extract candidate pairs without sacrificing too much of
the comparison quality in relation to the ground truth.

In [9]:
block_engine = BlockEngine().add_source(abt, _id).add_source(buy, _id).tf_idf(0.26)
block_engine.filter_candidates_jaccard(0.5)
block_engine.update_candidate_pairs(False)
metrics = block_engine.calculate_metrics(gt)

print("Pair completeness:", metrics.pair_completeness)
print("Pair quality:", metrics.pair_quality)
print("Reduction ratio:", metrics.reduction_ratio)

Pair completeness: 0.9134001823154057
Pair quality: 0.04418573885434581
Reduction ratio: 0.9807895619644


In [10]:
all_references = tokenize_all(block_engine)
token_frequency_table = build_normalized_unigram_frequencies(all_references)
print(", ".join(f"{k}={round(v, 4)}" for k, v in itertools.islice(token_frequency_table.items(), 10)))

552=0.0, sony=0.0077, turntable=0.0003, pslx350h=0.0, belt=0.0002, drive=0.0014, system=0.0052, 33=0.0001, 1=0.0064, 3=0.0051


In [11]:
from typing import Iterator


class SifTransform:
    def __init__(
        self,
        frequency_table: dict[str, float],
        alpha: float = 1.0,
        input_dim: int = 300,
        excluded_cols: list[int] = None,
    ) -> None:
        self._ft = frequency_table
        self._a = alpha
        self._n = input_dim
        self._excluded_cols = set(excluded_cols or [])

    def _create_tensor(self, attr_value: Any) -> torch.Tensor:
        token_embeddings = list(
            map(token_embedding, tokenize_attribute_value(attr_value))
        )
        if len(token_embeddings) < 1:
            return torch.zeros(self._n)
        token_weights = compute_token_weights(
            token_embeddings, self._ft, self._a
        ).reshape(len(token_embeddings), 1)
        word_embeddings = torch.atleast_2d(
            torch.stack([te.embedding for te in token_embeddings])
        )
        weighted_embeddings = token_weights * word_embeddings
        colwise_weighted_sum = weighted_embeddings.sum(dim=0)
        total_weight = token_weights.sum().float()
        return colwise_weighted_sum / total_weight

    def _transform(self, ref: EntityReference) -> Iterator:
        for idx, value in enumerate(ref):
            if idx not in self._excluded_cols:
                yield self._create_tensor(value)
            else:
                yield value

    def __call__(self, ref: EntityReference) -> EntityReference:
        return tuple(self._transform(ref))

In [12]:
sif = SifTransform(token_frequency_table, SIF_ALPHA, INPUT_SIZE, excluded_cols=[0])

Let's add a few more helper functions that enable us to train an ANN classifier.

In [58]:
import torch

from sklearn.model_selection import train_test_split
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader


def get_torch_device():
    return f"mps:{torch.mps.device_count()-1}" if torch.mps.is_available() else "cpu"


def get_torch_generator():
    return torch.Generator(device=get_torch_device())


def create_dataloader(
    input_dataset: Dataset, batch_size: int = 32, shuffle: bool = True
):
    return DataLoader(
        input_dataset,
        batch_size=batch_size,
        shuffle=shuffle,
    )

In [60]:
class NpDataset(Dataset):
    def __init__(self, features: np.ndarray, labels: np.ndarray):
        self._features = features
        self._targets = labels

    def __len__(self):
        return len(self._features)

    def __getitem__(self, idx):
        features = torch.tensor(self._features[idx], dtype=torch.float32)
        targets = torch.tensor(self._targets[idx], dtype=torch.float32)
        return features, targets

In [64]:
from torch import nn


class FFClassifier(torch.nn.Module):
    def __init__(self, input_dim: int, hidden_size: int):
        super().__init__()
        self._layers = torch.nn.Sequential(
            nn.Linear(input_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 8),
            nn.ReLU(),
            nn.Linear(8, 1),
            nn.Sigmoid()
        )

    def forward(self, input_tensor):
        return self._layers(input_tensor)

In [59]:
from matchescu.matching.ml.datasets._blocking import BlockDataSet


def match_non_match_ratio(arr):
    k = np.count_nonzero(arr)
    n = len(arr)
    if n == k:
        return 0
    return round(k / (n - k), 5)


attr_cmp_config = (
    RawComparison()
    .levenshtein("name", 1, 1)
    .levenshtein("description", 2, 2)
    .levenshtein("name_manufacturer", 1, 3)
    .levenshtein("description_manufacturer", 2, 3)
    .diff("price", 3, 4)
)
attr_ds = BlockDataSet(block_engine, gt, _id, _id).attr_compare(attr_cmp_config)
attr_ds.cross_sources()

X = attr_ds.feature_matrix
y = attr_ds.target_vector.reshape(
    len(attr_ds.target_vector), 1
)
print(X.shape, y.shape)
print(len(y[y == 1]))
print("total comparisons:", len(X))
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.6, stratify=y)
X_cv, X_test, y_cv, y_test = train_test_split(
    X_test, y_test, train_size=0.5, stratify=y_test
)

print(
    "training dataset size:",
    X_train.shape,
    y_train.shape,
    "; match to non-match ratio:",
    match_non_match_ratio(y_train),
)
print(
    "cross-validation dataset size:",
    X_cv.shape,
    y_cv.shape,
    "; match to non-match ratio:",
    match_non_match_ratio(y_cv),
)
print(
    "test dataset size:",
    X_test.shape,
    y_test.shape,
    "; match to non-match ratio:",
    match_non_match_ratio(y_test),
)

total comparisons: 29155
training dataset size: (17493, 5) (17493, 1) ; match to non-match ratio: 0.08672
cross-validation dataset size: (5831, 5) (5831, 1) ; match to non-match ratio: 0.08666
test dataset size: (5831, 5) (5831, 1) ; match to non-match ratio: 0.08686


In [66]:
from matchescu.matching.ml import TorchEngine


N_EPOCHS = 10
TORCH_DEV = get_torch_device()

train = NpDataset(X_train, y_train)
cv = NpDataset(X_cv, y_cv)
test = NpDataset(X_test, y_test)
loss_function = torch.nn.BCELoss()

print("comparison vector input size:", len(attr_cmp_config))
print("training on device:", TORCH_DEV)

matcher = FFClassifier(len(attr_cmp_config), 10)
attr_engine = TorchEngine(
    matcher, loss_function, Adam(params=matcher.parameters(), lr=1e-3), TORCH_DEV
)
attr_engine.train(create_dataloader(train), create_dataloader(cv), N_EPOCHS)

comparison vector input size: 5
training on device: mps:0
Epoch 1/10, Train Loss: 0.4002, Val Loss: 0.2866
Epoch 2/10, Train Loss: 0.2793, Val Loss: 0.2726
Epoch 3/10, Train Loss: 0.2631, Val Loss: 0.2559
Epoch 4/10, Train Loss: 0.2461, Val Loss: 0.2569
Epoch 5/10, Train Loss: 0.2328, Val Loss: 0.2374
Epoch 6/10, Train Loss: 0.2380, Val Loss: 0.2267
Epoch 7/10, Train Loss: 0.2238, Val Loss: 0.2260
Epoch 8/10, Train Loss: 0.2230, Val Loss: 0.2221
Epoch 9/10, Train Loss: 0.2221, Val Loss: 0.2261
Epoch 10/10, Train Loss: 0.2314, Val Loss: 0.2256


FFClassifier(
  (_layers): Sequential(
    (0): Linear(in_features=5, out_features=10, bias=True)
    (1): ReLU()
    (2): Linear(in_features=10, out_features=8, bias=True)
    (3): ReLU()
    (4): Linear(in_features=8, out_features=1, bias=True)
    (5): Sigmoid()
  )
)

In [None]:
test_loss = attr_engine.evaluate(create_dataloader(test), compute_stats=True)
print(f"Test Loss: {test_loss:.4f}")
print(
    "Precision: %(precision).4f, Recall: %(recall).4f, F1: %(f1).4f"
    % attr_engine.stats
)

In [68]:
vector_cmp_config = (
    RawComparison()
    .tensor_diff("name", 1, 1)
    .tensor_diff("description", 2, 2)
    .tensor_diff("name_manufacturer", 1, 3)
    .tensor_diff("description_manufacturer", 2, 3)
    .tensor_diff("price", 3, 4)
)
vector_ds = BlockDataSet(block_engine, gt, _id, _id).vector_compare(vector_cmp_config)
vector_ds.transforms.append(sif)
vector_ds.cross_sources()

X = vector_ds.feature_matrix
y = vector_ds.target_vector.reshape(
    len(vector_ds.target_vector), 1
)
print(X.shape, y.shape)
print(len(y[y == 1]))
print("total comparisons:", len(X))
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.6, stratify=y)
X_cv, X_test, y_cv, y_test = train_test_split(
    X_test, y_test, train_size=0.5, stratify=y_test
)


def match_non_match_ratio(arr):
    k = np.count_nonzero(arr)
    n = len(arr)
    if n == k:
        return 0
    return round(k / (n - k), 5)


print(
    "training dataset size:",
    X_train.shape,
    y_train.shape,
    "; match to non-match ratio:",
    match_non_match_ratio(y_train),
)
print(
    "cross-validation dataset size:",
    X_cv.shape,
    y_cv.shape,
    "; match to non-match ratio:",
    match_non_match_ratio(y_cv),
)
print(
    "test dataset size:",
    X_test.shape,
    y_test.shape,
    "; match to non-match ratio:",
    match_non_match_ratio(y_test),
)

(29155, 1500) (29155, 1)
2327
total comparisons: 29155
training dataset size: (17493, 1500) (17493, 1) ; match to non-match ratio: 0.08672
cross-validation dataset size: (5831, 1500) (5831, 1) ; match to non-match ratio: 0.08666
test dataset size: (5831, 1500) (5831, 1) ; match to non-match ratio: 0.08686


In [78]:
CLASSIFIER_INPUT_SIZE = ft_model.get_dimension()*len(vector_cmp_config)
N_EPOCHS = 10
TORCH_DEV = get_torch_device()

train = NpDataset(X_train, y_train)
cv = NpDataset(X_cv, y_cv)
test = NpDataset(X_test, y_test)
loss_function = torch.nn.BCELoss()

print("comparison vector input size:", len(attr_cmp_config))
print("training on device:", TORCH_DEV)

vector_matcher = FFClassifier(CLASSIFIER_INPUT_SIZE, 100)
vector_engine = TorchEngine(
    vector_matcher, loss_function, Adam(params=vector_matcher.parameters(), lr=5e-3), TORCH_DEV
)
vector_engine.train(create_dataloader(train), create_dataloader(cv), 10)

comparison vector input size: 5
training on device: mps:0
Epoch 1/10, Train Loss: 0.2720, Val Loss: 0.2581
Epoch 2/10, Train Loss: 0.2501, Val Loss: 0.2510
Epoch 3/10, Train Loss: 0.2346, Val Loss: 0.2471
Epoch 4/10, Train Loss: 0.2225, Val Loss: 0.2423
Epoch 5/10, Train Loss: 0.2085, Val Loss: 0.2429
Epoch 6/10, Train Loss: 0.1987, Val Loss: 0.2495
Epoch 7/10, Train Loss: 0.1881, Val Loss: 0.2370
Epoch 8/10, Train Loss: 0.1784, Val Loss: 0.2320
Epoch 9/10, Train Loss: 0.1662, Val Loss: 0.2346
Epoch 10/10, Train Loss: 0.1559, Val Loss: 0.2445


FFClassifier(
  (_layers): Sequential(
    (0): Linear(in_features=1500, out_features=100, bias=True)
    (1): ReLU()
    (2): Linear(in_features=100, out_features=8, bias=True)
    (3): ReLU()
    (4): Linear(in_features=8, out_features=1, bias=True)
    (5): Sigmoid()
  )
)

In [80]:
test_loss = vector_engine.evaluate(create_dataloader(test), compute_stats=True)
print(f"Test Loss: {test_loss:.4f}")
print(
    "Precision: %(precision).4f, Recall: %(recall).4f, F1: %(f1).4f"
    % vector_engine.stats
)

Test Loss: 0.2470
Precision: 0.5545, Recall: 0.2618, F1: 0.3557


We can see a marked improvement over the attribute comparisons using simple
distance metrics.