In [1]:
import networkx as nx
import requests, math, logging
from concurrent.futures import ThreadPoolExecutor, as_completed

# show only warnings+ by default
logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(levelname)s: %(message)s')

DATASET_SCOPES = ["sanctions","peps","regulatory","crime"]

def is_valid(v):
    if v is None: 
        return False
    if isinstance(v, str):
        vv = v.strip().lower()
        if vv in ("", "nan", "unknown", "-"):
            return False
    if isinstance(v, float) and math.isnan(v):
        return False
    return True

def build_entity_payload(nid, data):
    name = data.get("the_name")
    if not is_valid(name):
        return None

    t = data.get("type","").strip().lower()
    if t=="company":
        schema="Company"
    elif t=="person":
        schema="Person"
    else:
        return None

    props = {"name":[name.strip()]}
    if schema=="Company" and is_valid(data.get("the_identifier")):
        props["registrationNumber"] = [data["the_identifier"].strip()]
    if is_valid(data.get("the_country")):
        c = data["the_country"].strip()
        props["jurisdiction" if schema=="Company" else "nationality"] = [c]
    if is_valid(data.get("the_postal_code")):
        props["postalCode"] = [data["the_postal_code"].strip()]
    if is_valid(data.get("the_town")):
        props["town"] = [data["the_town"].strip()]

    props.setdefault("identifiers", []).append({"source":"GraphID","value":str(nid)})
    return {"schema":schema,"properties":props}

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

def _match_batch(scope, batch, all_queries, yente_base):
    """
    Thread‐friendly batch matcher.
    Returns list of (nid, record_dict).
    """
    url = f"{yente_base}/match/{scope}"
    payload = {"queries": {nid: all_queries[nid] for nid in batch}}
    try:
        r = requests.post(url, json=payload, timeout=30)
        r.raise_for_status()
    except Exception as e:
        logging.error("Scope %s batch failed: %s", scope, e)
        return [
            (nid, {
                "match_found": False,
                "dataset": scope
            }) 
            for nid in batch
        ]

    out = []
    reps = r.json().get("responses", {})
    for nid, res in reps.items():
        hits = res.get("results", [])
        if hits:
            top = hits[0]
            entity = top.get("entity", {})

            # 1) top‐level name property
            matched_name = entity.get("name")
            # 2) fallback to properties.name[0]
            if not matched_name:
                matched_name = entity.get("properties", {})\
                                     .get("name", [None])[0]
            # 3) fallback to ID
            if not matched_name:
                matched_name = entity.get("id")

            rec = {
                "match_found":   True,
                "matched_name":  matched_name,
                "matched_entity": entity,       # <-- full dict for debugging
                "dataset":       scope,
                "score":         top.get("score")
            }
        else:
            rec = {
                "match_found": False,
                "dataset":     scope
            }
        out.append((nid, rec))
    return out

def screen_graph_multi_threads(
    graph: nx.DiGraph,
    yente_base: str="http://localhost:8000",
    batch_size: int=50,
    max_workers: int=None
) -> nx.DiGraph:
    # build all queries
    all_q = {}
    for nid, data in graph.nodes(data=True):
        p = build_entity_payload(nid, data)
        if p:
            all_q[nid] = p
    if not all_q:
        logging.error("No valid nodes to screen.")
        return graph

    nodes = list(all_q.keys())
    total_batches = sum(math.ceil(len(nodes)/batch_size) for _ in DATASET_SCOPES)
    print(f"{len(nodes)} nodes → {total_batches} batches across {len(DATASET_SCOPES)} scopes.")

    if max_workers is None:
        import os
        max_workers = os.cpu_count() or 4

    with ThreadPoolExecutor(max_workers=max_workers) as exe:
        futures = []
        for scope in DATASET_SCOPES:
            for batch in chunks(nodes, batch_size):
                futures.append(
                    exe.submit(_match_batch, scope, batch, all_q, yente_base)
                )

        completed = 0
        for fut in as_completed(futures):
            completed += 1
            for nid, record in fut.result():
                graph.nodes[nid].setdefault("screening", {})[record["dataset"]] = record

            # progress every 100 batches
            if completed == 1 or completed % 100 == 0 or completed == len(futures):
                print(f"  completed {completed}/{len(futures)} batches")

    return graph

# Example (in Jupyter):
G = nx.DiGraph()
G.add_node("C1", type="Company", the_name="BlackRock, Inc.", the_identifier="US012345", the_postal_code="10022", the_town="New York", the_country="US")
G.add_node("P1", type="Person", the_name="Jane Doe", the_country="US")

# Run with 10 threads:
screened = screen_graph_multi_threads(G, batch_size=20, max_workers=10)

import json
for nid, data in screened.nodes(data=True):
    print(nid, json.dumps(data["screening"], indent=2))



2 nodes → 4 batches across 4 scopes.
  completed 1/4 batches
  completed 4/4 batches
C1 {
  "regulatory": {
    "match_found": true,
    "matched_name": null,
    "matched_entity": {},
    "dataset": "regulatory",
    "score": 0.75
  },
  "crime": {
    "match_found": true,
    "matched_name": null,
    "matched_entity": {},
    "dataset": "crime",
    "score": 0.75
  },
  "peps": {
    "match_found": false,
    "dataset": "peps"
  },
  "sanctions": {
    "match_found": false,
    "dataset": "sanctions"
  }
}
P1 {
  "regulatory": {
    "match_found": false,
    "dataset": "regulatory"
  },
  "crime": {
    "match_found": false,
    "dataset": "crime"
  },
  "peps": {
    "match_found": true,
    "matched_name": null,
    "matched_entity": {},
    "dataset": "peps",
    "score": 0.9
  },
  "sanctions": {
    "match_found": false,
    "dataset": "sanctions"
  }
}


In [4]:
graph = nx.read_graphml(f"/Users/wiktorrajca/Documents/GitHub/Data-Science-Honors-Thesis/code/procurement_graph_france_build_full_clean.graphml")

In [None]:
screened_graph = screen_graph_multi_threads(graph, batch_size=50, max_workers=10)

408728 nodes → 32700 batches across 4 scopes.
  completed 1/32700 batches


In [2]:
import os
import json
import math
import logging
import pandas as pd
import networkx as nx
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

# show only warnings+ by default
logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(levelname)s: %(message)s')

DATASET_SCOPES = ["sanctions", "peps", "regulatory", "crime"]

def is_valid(v):
    if v is None:
        return False
    if isinstance(v, str):
        vv = v.strip().lower()
        if vv in ("", "nan", "unknown", "-"):
            return False
    if isinstance(v, float) and math.isnan(v):
        return False
    return True

def build_entity_payload(nid, data):
    name = data.get("the_name")
    if not is_valid(name):
        return None

    t = data.get("type", "").strip().lower()
    if t == "company":
        schema = "Company"
    elif t == "person":
        schema = "Person"
    else:
        return None

    props = {"name": [name.strip()]}
    if schema == "Company" and is_valid(data.get("the_identifier")):
        props["registrationNumber"] = [data["the_identifier"].strip()]
    if is_valid(data.get("the_country")):
        c = data["the_country"].strip()
        props["jurisdiction" if schema == "Company" else "nationality"] = [c]
    if is_valid(data.get("the_postal_code")):
        props["postalCode"] = [data["the_postal_code"].strip()]
    if is_valid(data.get("the_town")):
        props["town"] = [data["the_town"].strip()]

    props.setdefault("identifiers", []).append({"source": "GraphID", "value": str(nid)})
    return {"schema": schema, "properties": props}

def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

def _match_batch(scope, batch, all_queries, yente_base):
    """
    Thread-friendly batch matcher.
    Returns list of (nid, record_dict).
    """
    url = f"{yente_base}/match/{scope}"
    payload = {"queries": {nid: all_queries[nid] for nid in batch}}
    try:
        r = requests.post(url, json=payload, timeout=60)
        r.raise_for_status()
    except Exception as e:
        logging.error("Scope %s batch failed: %s", scope, e)
        return [
            (nid, {
                "match_found": False,
                "dataset": scope
            })
            for nid in batch
        ]

    out = []
    reps = r.json().get("responses", {})
    for nid, res in reps.items():
        hits = res.get("results", [])
        if hits:
            top = hits[0]
            entity = top.get("entity", {})

            # 1) top-level name property
            matched_name = entity.get("name")
            # 2) fallback to properties.name[0]
            if not matched_name:
                matched_name = entity.get("properties", {})\
                                     .get("name", [None])[0]
            # 3) fallback to ID
            if not matched_name:
                matched_name = entity.get("id")

            rec = {
                "match_found":   True,
                "matched_name":  matched_name,
                "matched_entity": entity,       # full dict for debugging
                "dataset":       scope,
                "score":         top.get("score")
            }
        else:
            rec = {
                "match_found": False,
                "dataset":     scope
            }
        out.append((nid, rec))
    return out

def screen_graph_multi_threads(
    graph: nx.DiGraph,
    yente_base: str = "http://localhost:8000",
    batch_size: int = 50,
    max_workers: int = None,
    checkpoint_file: str = "screening_checkpoint.csv",
    checkpoint_frequency: int = 100
) -> nx.DiGraph:
    """
    Multi-threaded screening with CSV checkpointing & resume support.
    - checkpoint_file: path to the CSV (overwritten at start, appended thereafter)
    - checkpoint_frequency: how many batches between writes
    """
    # 1) Load or initialize checkpoint
    if os.path.exists(checkpoint_file):
        df_exist = pd.read_csv(checkpoint_file, dtype=str)
        skip_pairs = set(zip(df_exist['node_id'], df_exist['dataset']))
    else:
        pd.DataFrame(
            columns=['node_id','dataset','match_found','matched_name','score','record_json']
        ).to_csv(checkpoint_file, index=False)
        skip_pairs = set()

    # 2) Build all queries
    all_q = {}
    for nid, data in graph.nodes(data=True):
        p = build_entity_payload(nid, data)
        if p:
            all_q[nid] = p
    if not all_q:
        logging.error("No valid nodes to screen.")
        return graph

    nodes = list(all_q.keys())

    # 3) Prepare pending batches
    batches = []
    for scope in DATASET_SCOPES:
        pending = [nid for nid in nodes if (nid, scope) not in skip_pairs]
        for b in chunks(pending, batch_size):
            batches.append((scope, b))

    total_batches = len(batches)
    print(f"{len(nodes)} nodes → {total_batches} pending batches across {len(DATASET_SCOPES)} scopes.")

    if max_workers is None:
        max_workers = os.cpu_count() or 4

    # 4) Execute & checkpoint
    with ThreadPoolExecutor(max_workers=max_workers) as exe:
        future_map = {
            exe.submit(_match_batch, scope, batch, all_q, yente_base): (scope, batch)
            for scope, batch in batches
        }

        completed = 0
        buffer = []

        for fut in as_completed(future_map):
            completed += 1
            scope, batch = future_map[fut]
            try:
                results = fut.result()
            except Exception as e:
                logging.error("Batch %s/%s failed: %s", scope, batch, e)
                results = [(nid, {"match_found": False, "dataset": scope}) for nid in batch]

            for nid, rec in results:
                buffer.append({
                    'node_id':      nid,
                    'dataset':      rec.get('dataset', scope),
                    'match_found':  rec.get('match_found', False),
                    'matched_name': rec.get('matched_name', ''),
                    'score':        rec.get('score', None),
                    'record_json':  json.dumps(rec)
                })

            # flush buffer to CSV periodically or at end
            if checkpoint_frequency > 0 and (completed % checkpoint_frequency == 0 or completed == total_batches):
                pd.DataFrame(buffer).to_csv(
                    checkpoint_file,
                    mode='a',
                    index=False,
                    header=False
                )
                buffer.clear()
                print(f"  → checkpointed after {completed} batches")

            # progress log
            if completed == 1 or completed % 100 == 0 or completed == total_batches:
                print(f"  completed {completed}/{total_batches} batches")

    return graph

if __name__ == "__main__":
    # Example usage in a standalone script
    G = nx.DiGraph()
    G.add_node("C1", type="Company", the_name="BlackRock, Inc.",
               the_identifier="US012345", the_postal_code="10022",
               the_town="New York", the_country="US")
    G.add_node("P1", type="Person", the_name="Jane Doe", the_country="US")

    screened = screen_graph_multi_threads(
        G,
        yente_base="http://localhost:8000",
        batch_size=20,
        max_workers=10,
        checkpoint_file="screening_checkpoint.csv",
        checkpoint_frequency=50
    )

    # Print out the raw screening CSV path for later ingestion
    print("Screening results checkpointed to:", os.path.abspath("screening_checkpoint.csv"))

2 nodes → 4 pending batches across 4 scopes.
  completed 1/4 batches
  → checkpointed after 4 batches
  completed 4/4 batches
Screening results checkpointed to: /Users/wiktorrajca/Documents/GitHub/Data-Science-Honors-Thesis/code/screening_checkpoint.csv


In [6]:
screen_graph_multi_threads(graph)

408728 nodes → 32700 pending batches across 4 scopes.
  completed 1/32700 batches
  → checkpointed after 100 batches
  completed 100/32700 batches
  → checkpointed after 200 batches
  completed 200/32700 batches
  → checkpointed after 300 batches
  completed 300/32700 batches
  → checkpointed after 400 batches
  completed 400/32700 batches
  → checkpointed after 500 batches
  completed 500/32700 batches
  → checkpointed after 600 batches
  completed 600/32700 batches
  → checkpointed after 700 batches
  completed 700/32700 batches
  → checkpointed after 800 batches
  completed 800/32700 batches
  → checkpointed after 900 batches
  completed 900/32700 batches
  → checkpointed after 1000 batches
  completed 1000/32700 batches
  → checkpointed after 1100 batches
  completed 1100/32700 batches
  → checkpointed after 1200 batches
  completed 1200/32700 batches
  → checkpointed after 1300 batches
  completed 1300/32700 batches
  → checkpointed after 1400 batches
  completed 1400/32700 batche

<networkx.classes.digraph.DiGraph at 0x1162a3ac0>

In [3]:
graph_it = nx.read_graphml(f"/Users/wiktorrajca/Documents/GitHub/Data-Science-Honors-Thesis/code/procurement_graph_italy_build_full_clean.graphml")

In [4]:
screen_graph_multi_threads(graph_it)

344110 nodes → 27532 pending batches across 4 scopes.
  completed 1/27532 batches
  → checkpointed after 100 batches
  completed 100/27532 batches
  → checkpointed after 200 batches
  completed 200/27532 batches
  → checkpointed after 300 batches
  completed 300/27532 batches
  → checkpointed after 400 batches
  completed 400/27532 batches
  → checkpointed after 500 batches
  completed 500/27532 batches
  → checkpointed after 600 batches
  completed 600/27532 batches
  → checkpointed after 700 batches
  completed 700/27532 batches
  → checkpointed after 800 batches
  completed 800/27532 batches
  → checkpointed after 900 batches
  completed 900/27532 batches
  → checkpointed after 1000 batches
  completed 1000/27532 batches
  → checkpointed after 1100 batches
  completed 1100/27532 batches
  → checkpointed after 1200 batches
  completed 1200/27532 batches
  → checkpointed after 1300 batches
  completed 1300/27532 batches
  → checkpointed after 1400 batches
  completed 1400/27532 batche

2025-05-01 22:12:22,135 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-01 22:12:22,751 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-01 22:12:23,343 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-01 22:12:24,646 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-01 22:12:25,172 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-01 22:12:25,842 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-01 22:12:30,521 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read time

  → checkpointed after 3100 batches
  completed 3100/27532 batches


2025-05-01 22:15:24,670 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-01 22:15:25,194 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-01 22:15:25,866 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-01 22:15:30,542 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-01 22:15:37,615 ERROR: Scope sanctions batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)


  → checkpointed after 3200 batches
  completed 3200/27532 batches
  → checkpointed after 3300 batches
  completed 3300/27532 batches
  → checkpointed after 3400 batches
  completed 3400/27532 batches
  → checkpointed after 3500 batches
  completed 3500/27532 batches
  → checkpointed after 3600 batches
  completed 3600/27532 batches
  → checkpointed after 3700 batches
  completed 3700/27532 batches
  → checkpointed after 3800 batches
  completed 3800/27532 batches
  → checkpointed after 3900 batches
  completed 3900/27532 batches
  → checkpointed after 4000 batches
  completed 4000/27532 batches
  → checkpointed after 4100 batches
  completed 4100/27532 batches
  → checkpointed after 4200 batches
  completed 4200/27532 batches
  → checkpointed after 4300 batches
  completed 4300/27532 batches
  → checkpointed after 4400 batches
  completed 4400/27532 batches
  → checkpointed after 4500 batches
  completed 4500/27532 batches
  → checkpointed after 4600 batches
  completed 4600/27532 bat

2025-05-02 00:18:18,798 ERROR: Scope regulatory batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 00:18:19,112 ERROR: Scope regulatory batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 00:18:19,637 ERROR: Scope regulatory batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 00:18:20,265 ERROR: Scope regulatory batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 00:18:20,884 ERROR: Scope regulatory batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 00:18:21,680 ERROR: Scope regulatory batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 00:18:22,971 ERROR: Scope regulatory batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (re

  → checkpointed after 16900 batches
  completed 16900/27532 batches
  → checkpointed after 17000 batches
  completed 17000/27532 batches
  → checkpointed after 17100 batches
  completed 17100/27532 batches
  → checkpointed after 17200 batches
  completed 17200/27532 batches
  → checkpointed after 17300 batches
  completed 17300/27532 batches
  → checkpointed after 17400 batches
  completed 17400/27532 batches
  → checkpointed after 17500 batches
  completed 17500/27532 batches
  → checkpointed after 17600 batches
  completed 17600/27532 batches
  → checkpointed after 17700 batches
  completed 17700/27532 batches
  → checkpointed after 17800 batches
  completed 17800/27532 batches
  → checkpointed after 17900 batches
  completed 17900/27532 batches
  → checkpointed after 18000 batches
  completed 18000/27532 batches
  → checkpointed after 18100 batches
  completed 18100/27532 batches
  → checkpointed after 18200 batches
  completed 18200/27532 batches
  → checkpointed after 18300 batch

2025-05-02 01:43:46,116 ERROR: Scope crime batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 01:43:46,551 ERROR: Scope crime batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 01:43:47,075 ERROR: Scope crime batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 01:43:47,710 ERROR: Scope crime batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 01:43:48,233 ERROR: Scope crime batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 01:43:49,164 ERROR: Scope crime batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 01:43:50,349 ERROR: Scope crime batch failed: HTTPConnectionPool(host='localhost', port=8000): Read timed out. (read timeout=60)
2025-05-02 01:43:50,

  → checkpointed after 23800 batches
  completed 23800/27532 batches
  → checkpointed after 23900 batches
  completed 23900/27532 batches
  → checkpointed after 24000 batches
  completed 24000/27532 batches
  → checkpointed after 24100 batches
  completed 24100/27532 batches
  → checkpointed after 24200 batches
  completed 24200/27532 batches
  → checkpointed after 24300 batches
  completed 24300/27532 batches
  → checkpointed after 24400 batches
  completed 24400/27532 batches
  → checkpointed after 24500 batches
  completed 24500/27532 batches
  → checkpointed after 24600 batches
  completed 24600/27532 batches
  → checkpointed after 24700 batches
  completed 24700/27532 batches
  → checkpointed after 24800 batches
  completed 24800/27532 batches
  → checkpointed after 24900 batches
  completed 24900/27532 batches
  → checkpointed after 25000 batches
  completed 25000/27532 batches
  → checkpointed after 25100 batches
  completed 25100/27532 batches
  → checkpointed after 25200 batch

<networkx.classes.digraph.DiGraph at 0x10742eb50>