In [None]:
# %pip install "blocklib==0.1.7" "pandas==2.3.2" "blockingpy==0.2.8" "numpy==1.26.4"
# Python 3.12.10

In [None]:
from blocklib import generate_candidate_blocks
import itertools, time, pandas as pd
from blockingpy import Blocker
import faiss, os
from datetime import datetime
# Let FAISS use every CPU the OS reports. os.cpu_count() returns None when the
# count cannot be determined, so fall back to a single thread in that case.
faiss.omp_set_num_threads(os.cpu_count() or 1) # 12

In [3]:
# Load the three benchmark datasets; the first CSV column becomes the index.
df_1k = pd.read_csv("data_1000.csv", index_col=0)
df_10k = pd.read_csv("data_10000.csv", index_col=0)
df_100k = pd.read_csv("data_100000.csv", index_col=0)
dfs = [df_1k, df_10k, df_100k]


# Normalise missing values to empty strings. The frames are mutated in place so
# that df_1k / df_10k / df_100k themselves (not only the entries of `dfs`) hold
# the cleaned data.
for df in dfs:
    # Blank out only the entries that are really missing. Converting to str and
    # then replacing the substring "nan" would also cut "nan" out of genuine
    # values that merely contain those three letters.
    df['gmina'] = df['gmina'].astype(str).where(df['gmina'].notna(), "")
    df.fillna("", inplace=True)
    # Stringified pandas NA markers are treated as missing as well.
    df.replace("<NA>", "", inplace=True)

In [None]:
def block_metrics(cand_blocks, true_ids, total_true_pairs):
    """Score candidate blocks against ground-truth entity ids.

    Parameters
    ----------
    cand_blocks : mapping
        Block key -> collection of record positions. Every position is used
        to index ``true_ids``.
    true_ids : sequence
        Ground-truth entity id of each record; its length is taken as the
        number of records.
    total_true_pairs : int
        Number of true matching pairs in the data (the recall denominator).

    Returns
    -------
    tuple
        ``(cand_pairs, rr, recall)`` where

        * ``cand_pairs`` is the sum of ``k * (k - 1) // 2`` over all blocks
          with at least two members. A pair that appears in several blocks is
          counted once per block.
        * ``rr`` is the reduction ratio ``1 - cand_pairs / all_pairs``. When
          there are fewer than two records no pair exists and ``0.0`` is
          returned instead of dividing by zero.
        * ``recall`` is the share of ``total_true_pairs`` found in at least
          one block (each matching pair counted once), or ``1.0`` when
          ``total_true_pairs`` is zero.
    """
    n_records = len(true_ids)
    cartesian = n_records * (n_records - 1) // 2

    cand_pairs = 0
    matched_pairs = set()

    for block in cand_blocks.values():
        k = len(block)
        if k < 2:
            # A singleton (or empty) block proposes no comparison.
            continue
        cand_pairs += k * (k - 1) // 2

        for i, j in itertools.combinations(block, 2):
            if true_ids[i] == true_ids[j]:
                # Store the pair in a canonical order so that the same match
                # found in another block is not counted twice.
                matched_pairs.add((i, j) if i < j else (j, i))

    # Guard the degenerate case of 0 or 1 records (no possible pairs).
    rr = 1 - cand_pairs / cartesian if cartesian else 0.0
    recall = len(matched_pairs) / total_true_pairs if total_true_pairs else 1.0
    return cand_pairs, rr, float(recall)


# Attributes of each record that are handed to blocklib.
blocking_features = ["imie", "nazwisko", "by", "bm", "bd", "gmina"]

# blocklib P-Sig schema. "signatureSpecs" holds three signature strategies,
# each given as a list of per-feature extraction rules.
psig_schema = {
    "type": "p-sig",
    "version": 1,
    "config": {
        "blocking-features": blocking_features,
        "filter": {"type": "ratio", "max": 0.02, "min": 0.0},
        "blocking-filter": {
            "type": "bloom filter",
            "number-hash-functions": 20,
            "bf-len": 4096,
        },
        "signatureSpecs": [
            # Strategy 1: single characters picked from four features.
            [
                {"type": "characters-at", "feature": feature, "config": {"pos": pos}}
                for feature, pos in (
                    ("imie", [0]),
                    ("nazwisko", [0]),
                    ("by", [-1]),
                    ("gmina", [0]),
                )
            ],
            # Strategy 2: metaphone code of the surname.
            [{"type": "metaphone", "feature": "nazwisko"}],
            # Strategy 3: last two surname characters plus first "bm" character.
            [
                {"type": "characters-at", "feature": feature, "config": {"pos": pos}}
                for feature, pos in (
                    ("nazwisko", [-2, -1]),
                    ("bm", [0]),
                )
            ],
        ],
    },
}

# Accumulator for benchmark rows.
results = []

# BlockingPy `control_ann` settings, one dict per benchmarked configuration.
control_ann_voy = dict(voyager=dict(k_search=30))

control_ann_faiss_lsh = dict(
    faiss=dict(k_search=5, index_type="lsh", lsh_nbits=1),
)

control_ann_faiss_hnsw = dict(
    faiss=dict(
        k_search=5,
        index_type="hnsw",
        hnsw_M=12,
        hnsw_ef_construction=200,
        hnsw_ef_search=200,
    ),
)

control_ann_voy_fast = dict(
    voyager=dict(k_search=30, distance="cosine", M=5, ef_construction=60),
)

# Frames the benchmark loop iterates over: df_1k, df_10k, df_100k, in that order.
dfs = [df_1k, df_10k, df_100k] 

In [5]:

results = []


def _date_part_as_text(part: pd.Series, width: int) -> pd.Series:
    """Render a numeric date component as zero-padded text, "" where missing."""
    text = part.astype("Int64").astype(str).str.zfill(width)
    # Without this mask a missing component would become the literal "<NA>".
    return text.where(part.notna(), "")


def bench_once(data: pd.DataFrame, seed: int) -> list[dict]:
    """Run every blocking method once on ``data`` and collect one row each.

    The blocklib schemata (P-Sig and a lambda-fold LSH schema seeded with
    ``seed``) are run first, followed by four BlockingPy configurations.

    Parameters
    ----------
    data : pd.DataFrame
        Input records. The columns read here are ``imie``, ``imie2``,
        ``nazwisko``, ``data_ur``, ``gmina``, ``kraj`` and ``true_id``.
    seed : int
        Stored as ``run`` in every row, used as ``random_state`` of the
        lambda-fold schema and as ``random_seed`` of BlockingPy.

    Returns
    -------
    list[dict]
        One dict per method, in execution order, with the keys ``run``,
        ``algorithm``, ``dataset_size``, ``pairs``, ``reduction_ratio``,
        ``recall`` and ``time_sec``. Only the blocking call itself is timed.
    """
    out = []

    lsh_schema = {
        "type": "lambda-fold",
        "version": 1,
        "config": {
            "blocking-features": [1, 2],
            "Lambda": 6,
            "K": 110,
            "bf-len": 4096,
            "num-hash-funcs": 12,
            "random_state": seed,
            "input-clks": False,
        },
    }
    schemata = [("P-Sig", psig_schema), ("λ-fold LSH", lsh_schema)]

    # ---- blocklib input: positional ids plus the blocking features ----
    bl = data.copy().reset_index(drop=True)
    dob = pd.to_datetime(bl["data_ur"], errors="coerce", format="%Y/%m/%d")
    # Unparseable / missing dates become "" so they follow the same
    # empty-string convention as the other features instead of contributing
    # the characters of a literal "<NA>" to the signatures.
    bl["by"] = _date_part_as_text(dob.dt.year, 4)
    bl["bm"] = _date_part_as_text(dob.dt.month, 2)
    bl["bd"] = _date_part_as_text(dob.dt.day, 2)

    bl["id"] = bl.index
    header = ["id"] + blocking_features
    records = bl[header].values.tolist()
    true_ids = bl["true_id"].to_numpy()
    total_true_pairs = int(
        bl.groupby("true_id").size().apply(lambda k: k * (k - 1) // 2).sum()
    )

    # ---- BlockingPy input: one text field per record plus true blocks ----
    df = data.copy()
    df["txt_raw_kraj"] = (
        df["imie"] + " " + df["imie2"] + " " + df["nazwisko"] + " " +
        df["data_ur"] + " " + df["gmina"] + " " + df["kraj"].astype(str)
    )
    df["x"] = range(len(df))
    df["block"] = df["true_id"]
    tb = df[["x", "block"]]
    n_all_pairs = len(df) * (len(df) - 1) // 2

    for label, schema in schemata:
        s_tim = time.perf_counter()
        cand = generate_candidate_blocks(records, schema, header=header, verbose=False)
        e_tim = time.perf_counter()

        pairs, rr, recall = block_metrics(cand.blocks, true_ids, total_true_pairs)
        out.append({
            "run": seed, "algorithm": label, "dataset_size": len(bl),
            "pairs": pairs, "reduction_ratio": rr, "recall": recall,
            "time_sec": round(e_tim - s_tim, 3),
        })

    blocker = Blocker()

    # (row label, ANN backend, control_ann settings), in reporting order.
    ann_runs = [
        ("BlockingPy (voyager)", "voyager", control_ann_voy),
        ("BlockingPy (faiss_hnsw)", "faiss", control_ann_faiss_hnsw),
        ("BlockingPy (faiss_lsh)", "faiss", control_ann_faiss_lsh),
        ("BlockingPy (voyager) - fast", "voyager", control_ann_voy_fast),
    ]

    for name, ann, control in ann_runs:
        t0 = time.perf_counter()
        res = blocker.block(x=df["txt_raw_kraj"], ann=ann, verbose=0,
                            random_seed=seed, control_ann=control)
        t1 = time.perf_counter()
        # Evaluation against the true blocks is deliberately left untimed.
        res = blocker.eval(res, tb)

        out.append({
            "run": seed, "algorithm": name, "dataset_size": len(df),
            # The pair count is reconstructed from a float ratio; round rather
            # than truncate so floating-point error cannot drop it by one.
            "pairs": int(round(n_all_pairs * (1 - float(res.reduction_ratio)))),
            "reduction_ratio": res.reduction_ratio,
            "recall": res.metrics["recall"],
            "time_sec": round(t1 - t0, 3),
        })

    return out

In [None]:
N_RUNS = 10

# Start from an empty list so that re-running this cell on its own does not
# append a second copy of every row to results left from an earlier execution
# (which would silently distort the means and standard deviations below).
results = []

for run in range(N_RUNS):
    seed = 42 + run
    for _df in dfs:
        results.extend(bench_once(_df, seed))
    print(f"[{run+1}/{N_RUNS}] {datetime.now().isoformat(timespec='seconds')} seed={seed} — done", flush=True)


# Raw per-run rows, with a fixed column order for the CSV export.
cols = ["run", "algorithm", "dataset_size", "time_sec", "recall", "reduction_ratio", "pairs"]
results_df = pd.DataFrame(results)[cols]

# Mean and sample standard deviation of each metric per (algorithm, dataset size).
summary = (
    results_df
    .groupby(["algorithm", "dataset_size"], as_index=False)
    .agg(
        n_runs=("run", "nunique"),
        time_sec_mean=("time_sec", "mean"),
        time_sec_sd=("time_sec", "std"),
        recall_mean=("recall", "mean"),
        recall_sd=("recall", "std"),
        rr_mean=("reduction_ratio", "mean"),
        rr_sd=("reduction_ratio", "std"),
        pairs_mean=("pairs", "mean"),
        pairs_sd=("pairs", "std"),
    )
)

results_df.to_csv("all_runs_raw_cpu.csv", index=False)
summary.to_csv("all_runs_summary_cpu.csv", index=False)
summary, results_df.tail(10)