# Multi-Faceted B2B Name Matching

This notebook converts the script into a step-by-step workflow.

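Each section isolates one key concept (numbers match the section headings below; setup is the unnumbered first cell):
0. Setup and dependencies
1. Business-specific normalization rules
2. Parsing and profile creation
3. Match configuration and scoring components
4. Hard gates, penalties, and decision bands
5. Sample data and labels
6. Ranking and one-to-one controls
7. Labeled evaluation metrics and demo run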


In [None]:
# Core imports used across the notebook.
# pandas: tabular data operations
# rapidfuzz: fast fuzzy string similarity
# jellyfish/unidecode: optional phonetic + text normalization helpers

from __future__ import annotations

from dataclasses import dataclass
import re
import unicodedata
from typing import Dict, List, Set, Tuple

import pandas as pd
from rapidfuzz import fuzz

try:
    import jellyfish
except Exception:
    jellyfish = None

try:
    from unidecode import unidecode
except Exception:
    unidecode = None


## 1) Business Normalization Dictionaries

This section defines the rules used to normalize company names.

Why this matters:
- Legal suffixes (`Ltd`, `Limited`, `Inc`, `Co`, `LLC`, etc.) are often present in one source and absent in another.
- Synonyms and abbreviations (`MFG` vs `Manufacturing`) should map to a shared canonical token.
- Generic words (`Group`, `Holdings`) can create false positives if not controlled.


In [None]:
# Legal suffix tokens removed during normalization.
# These usually do not distinguish one company from another.
# Entries are lowercase because _tokenize_business lowercases names before
# the lookup.
# NOTE(review): "l l c" is the spelling left over once the dots of "L.L.C."
# are replaced by spaces; it only has an effect if the tokenizer matches
# multi-token phrases -- verify against _tokenize_business.
LEGAL_SUFFIXES = {
    "inc", "incorporated", "corp", "corporation", "co", "company",
    "ltd", "limited", "llc", "l l c", "llp", "lp", "plc",
    "gmbh", "sa", "ag", "bv", "pte", "pty", "sarl",
}

# Extra words that are often non-unique in business names.
# _tokenize_business drops these in the same pass as the legal suffixes.
BUSINESS_NOISE = {
    "the", "group", "holdings", "holding", "partners", "ventures",
    "international", "global",
}

# Token-level normalization aliases for common abbreviations.
# Looked up by _normalize_token after the token is stripped and lowercased.
# NOTE(review): _tokenize_business rewrites "&" to " and " before tokens are
# looked up, so the "&" entry only matters for direct _normalize_token calls.
# NOTE(review): singular "service" / "system" have no alias here, so they
# stay distinct from "services" / "systems" -- confirm that is intended.
TOKEN_ALIASES = {
    "&": "and",
    "intl": "international",
    "int'l": "international",
    "technologies": "technology",
    "tech": "technology",
    "svcs": "services",
    "svc": "services",
    "mfg": "manufacturing",
    "manufacture": "manufacturing",
    "mgmt": "management",
}

# Aliases used when building anchor keys (last significant token).
# _normalize_anchor applies them to the anchor; _normalize_token also applies
# them, so in practice every token is mapped through this table.
ANCHOR_ALIASES = {
    "oneil": "oneill",
    "o'neil": "oneill",
    "hlth": "health",
}

# If anchor is very generic, score should be discounted.
# _penalty_points adds a penalty when both names share an anchor from this set.
GENERIC_CORE_TOKENS = {
    "solutions", "systems", "services", "consulting",
    "logistics", "trading", "management", "technology",
}


## 2) Data Model + Parsing Helpers

We build a `ParsedBusiness` profile once per name and reuse it.

Why this helps:
- Keeps matching logic fast and deterministic.
- Separates raw text from normalized/computed features.
- Produces reusable keys for blocking and strict gates.


In [None]:
@dataclass
class ParsedBusiness:
    """Pre-computed matching profile for one business name.

    Attributes:
        raw: Original input name from the source system.
        normalized: Canonical string after cleanup and token mapping.
        tokens: Normalized tokens in their original order.
        token_set: The same tokens as a set, used for overlap checks.
        anchor: Last meaningful token (e.g. 'logistics', 'industrial').
        first_token: First meaningful token (feeds the blocking key).
        acronym: Acronym derived from the normalized tokens.
        metaphone: Optional phonetic fingerprint ("" when unavailable).
        soundex: Optional phonetic fingerprint ("" when unavailable).
        block_key: Blocking key that limits candidate comparisons.
    """

    raw: str
    normalized: str
    tokens: List[str]
    token_set: Set[str]
    anchor: str
    first_token: str
    acronym: str
    metaphone: str
    soundex: str
    block_key: str


def _ascii_fold(value: str) -> str:
    """Reduce ``value`` to plain ASCII.

    Uses ``unidecode`` when that optional package was imported; otherwise
    falls back to NFKD decomposition and drops whatever is not ASCII.
    """
    if unidecode is None:
        decomposed = unicodedata.normalize("NFKD", value)
        return decomposed.encode("ascii", "ignore").decode("ascii")
    return unidecode(value)


def _safe_metaphone(value: str) -> str:
    """Return the Metaphone code of ``value``.

    Yields "" when ``value`` is empty or the optional ``jellyfish`` package
    is not available.
    """
    if value and jellyfish is not None:
        return jellyfish.metaphone(value)
    return ""


def _safe_soundex(value: str) -> str:
    """Return the Soundex code of ``value``.

    Yields "" when ``value`` is empty or the optional ``jellyfish`` package
    is not available.
    """
    if value and jellyfish is not None:
        return jellyfish.soundex(value)
    return ""


def _safe_jaro_winkler(a: str, b: str) -> float:
    """Return the Jaro-Winkler similarity of ``a`` and ``b`` in [0.0, 1.0].

    Jaro-Winkler is often useful for near-typo business names. An empty
    string on either side scores 0.0.

    ``jellyfish`` is used when available. Without it the score comes from
    rapidfuzz's own Jaro-Winkler implementation, so the value keeps the same
    meaning (and ``MatchConfig.min_jaro_winkler`` keeps the same strictness)
    whether or not the optional package is installed. ``fuzz.WRatio`` is a
    different, more lenient metric and is kept only as a last resort for
    rapidfuzz builds that do not ship ``rapidfuzz.distance.JaroWinkler``.
    """
    if not a or not b:
        return 0.0
    if jellyfish is not None:
        return float(jellyfish.jaro_winkler_similarity(a, b))

    try:
        # Local import: only needed on the jellyfish-less path.
        from rapidfuzz.distance import JaroWinkler
    except ImportError:
        return fuzz.WRatio(a, b) / 100.0
    return float(JaroWinkler.normalized_similarity(a, b))


def _normalize_token(token: str) -> str:
    """Map a single raw token to its canonical form.

    The token is stripped and lowercased, then passed through
    ``TOKEN_ALIASES`` and ``ANCHOR_ALIASES`` (in that order).

    Apostrophes are honoured first so that apostrophe-bearing aliases such as
    "int'l" or "o'neil" can match. If the token still contains an apostrophe
    afterwards, the apostrophes are removed and the alias lookups are retried,
    so "o'neill" and "oneill", or "mcdonald's" and "mcdonalds", end up as the
    same token instead of two different ones.

    Returns:
        The canonical token; may be "" if the input held only apostrophes or
        whitespace.
    """
    token = token.strip().lower()
    token = TOKEN_ALIASES.get(token, token)
    token = ANCHOR_ALIASES.get(token, token)

    if "'" in token:
        # No alias consumed the apostrophe form: fall back to the bare
        # spelling and give the alias tables one more chance on it.
        bare = token.replace("'", "")
        bare = TOKEN_ALIASES.get(bare, bare)
        token = ANCHOR_ALIASES.get(bare, bare)

    return token


def _tokenize_business(value: str) -> List[str]:
    """Split a (pre-folded) business name into significant canonical tokens.

    Steps:
      1. Lowercase, spell out "&" as "and", and turn every character other
         than a-z, 0-9, whitespace and apostrophe into a space.
      2. Map each token through ``_normalize_token``.
      3. Drop legal suffixes and generic noise words.

    Multi-word entries of ``LEGAL_SUFFIXES`` (e.g. "l l c", which is what
    "L.L.C." becomes after step 1) are matched as token sequences. A plain
    per-token membership test can never match them, which would leave the
    stray tokens "l", "l", "c" in the name and make "c" its anchor.

    Returns:
        The remaining tokens in their original order (possibly empty).
    """
    # 1) normalize punctuation/symbols
    cleaned = value.lower().replace("&", " and ")
    cleaned = re.sub(r"[^a-z0-9\s']", " ", cleaned)

    # 2) map tokens to canonical aliases (empties are discarded right away)
    tokens = [_normalize_token(t.strip("'")) for t in cleaned.split() if t.strip("'")]
    tokens = [t for t in tokens if t]

    # Multi-word suffixes as token tuples, longest first so the longest
    # phrase wins; the secondary sort key keeps the order deterministic.
    phrases = sorted(
        (tuple(entry.split()) for entry in LEGAL_SUFFIXES if len(entry.split()) > 1),
        key=lambda phrase: (-len(phrase), phrase),
    )

    # 3) drop legal suffixes and generic noise
    filtered: List[str] = []
    position = 0
    while position < len(tokens):
        matched = next(
            (p for p in phrases if tuple(tokens[position:position + len(p)]) == p),
            None,
        )
        if matched is not None:
            position += len(matched)
            continue

        token = tokens[position]
        position += 1
        if token in LEGAL_SUFFIXES or token in BUSINESS_NOISE:
            continue
        filtered.append(token)
    return filtered


def _normalize_anchor(token: str) -> str:
    """Map an anchor token through ``ANCHOR_ALIASES``; falsy input gives ""."""
    return ANCHOR_ALIASES.get(token, token) if token else ""


def parse_and_normalize_name(raw_name: str) -> ParsedBusiness:
    """Build the reusable matching profile for one business name.

    The function name is kept for compatibility with existing script usage.

    Args:
        raw_name: The name as found in the source system. ``None`` and
            scalar missing markers (NaN, ``pd.NA``, ``NaT``) are treated as
            an empty name; any other non-string value is converted with
            ``str()``.

    Returns:
        A ``ParsedBusiness`` carrying the raw text plus the normalized
        string, tokens, anchor, acronym, phonetic fingerprints and block key.
        An empty name produces empty features.
    """
    # Missing markers must not be stringified: str(float("nan")) is "nan",
    # which would be parsed as a real one-token company name and would match
    # every other missing name exactly.
    is_missing = raw_name is None or (
        not isinstance(raw_name, str)
        and pd.api.types.is_scalar(raw_name)
        and bool(pd.isna(raw_name))
    )
    raw_name = "" if is_missing else str(raw_name)
    folded = _ascii_fold(raw_name)

    tokens = _tokenize_business(folded)
    normalized = " ".join(tokens)

    # Feature engineering for matching controls.
    acronym = "".join(t[0] for t in tokens if t)
    anchor = _normalize_anchor(tokens[-1]) if tokens else ""
    first_token = tokens[0] if tokens else ""

    # Blocking key: first initial + anchor phonetic fingerprint. The plain
    # anchor is used when no Soundex code is available.
    anchor_soundex = _safe_soundex(anchor)
    first_initial = first_token[0] if first_token else ""
    block_key = f"{first_initial}:{anchor_soundex or anchor}"

    # Whole-name phonetic codes are computed on the tokens run together.
    compact = normalized.replace(" ", "")

    return ParsedBusiness(
        raw=raw_name,
        normalized=normalized,
        tokens=tokens,
        token_set=set(tokens),
        anchor=anchor,
        first_token=first_token,
        acronym=acronym,
        metaphone=_safe_metaphone(compact),
        soundex=_safe_soundex(compact),
        block_key=block_key,
    )


## 3) Match Configuration + Scoring Components

We combine multiple signals into one score:
- exact normalized match
- token sort ratio
- token set ratio
- Jaro-Winkler similarity
- phonetic agreement
- acronym similarity

Weighted scoring avoids over-relying on one fuzzy metric.


In [None]:
@dataclass
class MatchConfig:
    """Tunable thresholds, gates and weights for name matching.

    Attributes:
        auto_threshold: Minimum score for the ``auto_match`` band.
        review_threshold: Minimum score for the ``manual_review`` band.
        reject_threshold: Minimum score for the ``weak_match`` band; anything
            lower is rejected.
        min_token_set: Minimum token-set ratio a pair must reach.
        min_jaro_winkler: Minimum Jaro-Winkler score (0-100) a pair must reach.
        require_anchor_match: Whether both names must share the same anchor.
        require_token_overlap: Whether ``min_shared_tokens`` is enforced.
        min_shared_tokens: Minimum number of shared tokens when enforced.
        top_margin_required: Score gap required between the top candidate
            and the runner-up.
        weights: Signal weights for the final score. ``None`` selects the
            defaults. A custom mapping may name any subset of the default
            signals; it is copied, so later changes to the caller's dict do
            not leak into the configuration.

    Raises:
        ValueError: If ``weights`` names an unknown signal or its values do
            not add up to a positive total. Both cases would otherwise only
            surface later, while scoring, as ``KeyError`` or
            ``ZeroDivisionError``.
    """

    # Decision thresholds for output bands.
    auto_threshold: float = 92.0
    review_threshold: float = 84.0
    reject_threshold: float = 74.0

    # Minimum component quality gates.
    min_token_set: float = 88.0
    min_jaro_winkler: float = 86.0

    # Gate switches.
    require_anchor_match: bool = True
    require_token_overlap: bool = True
    min_shared_tokens: int = 1

    # Difference required between top and runner-up candidates.
    top_margin_required: float = 4.0

    # Signal weights for final score.
    weights: Dict[str, float] | None = None

    def __post_init__(self) -> None:
        # Built per instance so configurations never share one dict.
        default_weights = {
            "exact": 0.30,
            "token_sort": 0.20,
            "token_set": 0.25,
            "jaro_winkler": 0.10,
            "phonetic": 0.10,
            "acronym": 0.05,
        }
        if self.weights is None:
            self.weights = default_weights
            return

        custom = dict(self.weights)
        unknown = sorted(set(custom) - set(default_weights))
        if unknown:
            raise ValueError(f"unknown weight keys: {unknown}")
        if sum(custom.values()) <= 0:
            raise ValueError("weights must add up to a positive total")
        self.weights = custom


def _anchors_match(left: ParsedBusiness, right: ParsedBusiness) -> bool:
    """Return True only when both anchors are non-empty and identical."""
    if not left.anchor or not right.anchor:
        return False
    return left.anchor == right.anchor


def _shared_token_count(left: ParsedBusiness, right: ParsedBusiness) -> int:
    """Count the distinct tokens that appear in both profiles."""
    common = left.token_set & right.token_set
    return len(common)


def _component_scores(left: ParsedBusiness, right: ParsedBusiness) -> Dict[str, float]:
    """Compute the individual similarity signals (each 0-100) for a pair.

    Returns a dict with the keys ``exact``, ``token_sort``, ``token_set``,
    ``jaro_winkler``, ``phonetic`` and ``acronym``, in that order.
    """
    name_a, name_b = left.normalized, right.normalized

    # Each agreeing phonetic fingerprint contributes half of the signal;
    # empty fingerprints never count as agreement.
    metaphone_agrees = bool(left.metaphone) and left.metaphone == right.metaphone
    soundex_agrees = bool(left.soundex) and left.soundex == right.soundex
    phonetic = 50.0 * (int(metaphone_agrees) + int(soundex_agrees))

    # Acronym similarity only exists when both sides have an acronym.
    if left.acronym and right.acronym:
        acronym = float(fuzz.ratio(left.acronym, right.acronym))
    else:
        acronym = 0.0

    scores: Dict[str, float] = {}
    scores["exact"] = 100.0 if name_a and name_a == name_b else 0.0
    scores["token_sort"] = float(fuzz.token_sort_ratio(name_a, name_b))
    scores["token_set"] = float(fuzz.token_set_ratio(name_a, name_b))
    scores["jaro_winkler"] = _safe_jaro_winkler(name_a, name_b) * 100.0
    scores["phonetic"] = phonetic
    scores["acronym"] = acronym
    return scores


## 4) Controls: Gates, Penalties, and Decision Bands

This is where false positives are reduced.

Controls applied:
- Hard gate on anchor alignment: both names must end in the same significant token after normalization
- Minimum token overlap and component minimums (token-set ratio and Jaro-Winkler)
- Penalties for underspecified (single-token) names, token-count mismatches, and generic anchors
- Decision bands (`auto_match`, `manual_review`, `weak_match`, `reject`)


In [None]:
def _penalty_points(left: ParsedBusiness, right: ParsedBusiness) -> float:
    """Return the points to subtract from a pair's weighted score.

    Penalties accumulate:
      * 15 when either name has at most one token (ambiguous short name);
      * 8 for a token-count gap of two or more, 3 for a gap of one;
      * 7 when both names share a non-empty anchor from GENERIC_CORE_TOKENS;
      * 5 when the anchors are equal and exactly one token is shared.
    """
    left_count, right_count = len(left.tokens), len(right.tokens)
    total = 0.0

    # Very short entity names are ambiguous.
    if min(left_count, right_count) <= 1:
        total += 15.0

    # Large token-count mismatch increases risk.
    gap = abs(left_count - right_count)
    if gap >= 2:
        total += 8.0
    elif gap == 1:
        total += 3.0

    same_anchor = left.anchor == right.anchor

    # Generic anchors require extra caution.
    if same_anchor and left.anchor and left.anchor in GENERIC_CORE_TOKENS:
        total += 7.0

    # The names agree on a single token while having equal anchors.
    if same_anchor and _shared_token_count(left, right) == 1:
        total += 5.0

    return total


def _passes_hard_gates(
    left: ParsedBusiness,
    right: ParsedBusiness,
    components: Dict[str, float],
    config: MatchConfig,
) -> Tuple[bool, str]:
    """Check a pair against the hard gates, in a fixed order.

    Returns ``(True, "passed")`` when every gate holds, otherwise
    ``(False, reason)`` where ``reason`` names the first gate that failed.
    """
    if config.require_anchor_match and not _anchors_match(left, right):
        return False, "failed_anchor_gate"

    if config.require_token_overlap:
        if _shared_token_count(left, right) < config.min_shared_tokens:
            return False, "failed_token_overlap_gate"

    # Component minimums, checked in this order: (signal, floor, reason).
    component_floors = (
        ("token_set", config.min_token_set, "failed_token_set_minimum"),
        ("jaro_winkler", config.min_jaro_winkler, "failed_jaro_minimum"),
    )
    for signal, floor, reason in component_floors:
        if components[signal] < floor:
            return False, reason

    return True, "passed"


def _decision_band(score: float, config: MatchConfig) -> str:
    """Translate a score into its decision band.

    Thresholds are tried from strictest to loosest; a score below all of
    them is a "reject".
    """
    ladder = (
        (config.auto_threshold, "auto_match"),
        (config.review_threshold, "manual_review"),
        (config.reject_threshold, "weak_match"),
    )
    for floor, band in ladder:
        if score >= floor:
            return band
    return "reject"


def _blocked_candidate_indices(left_profile: ParsedBusiness, right_profiles: Dict[int, ParsedBusiness]) -> List[int]:
    """Pick the right-side indices worth scoring against ``left_profile``.

    Tries, in order: candidates with the same block key, candidates with a
    matching anchor, and finally every candidate (to preserve recall for
    edge cases). The first non-empty tier is returned.
    """
    wanted_key = left_profile.block_key
    same_block = [idx for idx, cand in right_profiles.items() if cand.block_key == wanted_key]
    if same_block:
        return same_block

    same_anchor = [idx for idx, cand in right_profiles.items() if _anchors_match(left_profile, cand)]
    return same_anchor or list(right_profiles)


def score_name_pair(
    left: ParsedBusiness,
    right: ParsedBusiness,
    config: MatchConfig,
) -> Tuple[float, Dict[str, float], str]:
    """Score one pair of parsed names.

    Returns:
        ``(score, components, gate_reason)``. A pair that fails a hard gate
        scores 0.0 and carries the failing gate's reason. Otherwise the score
        is the weight-normalized blend of the components minus the penalty
        points, floored at 0 and rounded to two decimals, with reason
        "passed".
    """
    components = _component_scores(left, right)

    gates_ok, reason = _passes_hard_gates(left, right, components, config)
    if not gates_ok:
        return 0.0, components, reason

    weights = config.weights
    blended = sum(components[name] * weights[name] for name in weights)
    blended = blended / sum(weights.values())

    after_penalty = blended - _penalty_points(left, right)
    return round(max(0.0, after_penalty), 2), components, reason


## 5) Sample Data and Labels

This sample intentionally includes:
- legal suffix variations (`Ltd`, `Limited`, `Co`, `Company`, `Inc`)
- abbreviation variations (`MFG` vs `Manufacturing`)
- distractor records to test false-positive resistance


In [None]:
def build_sample_dataframes() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Return the demo ``(left, right)`` frames.

    ``left`` has columns ``record_id`` (1..8) and ``name``; ``right`` has
    columns ``customer_id`` (101..110) and ``full_name``.
    """
    left_names = [
        "North Star Logistics Ltd.",
        "Acme Industrial Co.",
        "Blue River Technologies Inc",
        "Summit Health Services LLC",
        "Redwood Manufacturing Limited",
        "Pioneer Energy Holdings",
        "Global Trade Partners Inc.",
        "Urban Data Systems Co",
    ]
    right_names = [
        "North Star Logistics Limited",
        "ACME Industrial Company",
        "Blue River Technology Incorporated",
        "Summit Health Service",
        "Redwood MFG Ltd",
        "Pioneer Energy",
        "Global Trading Partners",
        "Urban Data System Inc",
        "Northern Star Logistics Co",
        "Acme Logistics Inc",
    ]

    # Ids are consecutive, so they are derived from the list lengths.
    left_ids = list(range(1, len(left_names) + 1))
    right_ids = list(range(101, 101 + len(right_names)))

    left = pd.DataFrame({"record_id": left_ids, "name": left_names})
    right = pd.DataFrame({"customer_id": right_ids, "full_name": right_names})
    return left, right


def build_sample_labels() -> pd.DataFrame:
    """Return the labeled demo pairs.

    Columns: ``left_name``, ``right_name`` and ``is_true_match`` (1 for a
    true match, 0 for a known non-match).
    """
    labeled_pairs = [
        ("North Star Logistics Ltd.", "North Star Logistics Limited", 1),
        ("Acme Industrial Co.", "ACME Industrial Company", 1),
        ("Blue River Technologies Inc", "Blue River Technology Incorporated", 1),
        ("Pioneer Energy Holdings", "Pioneer Energy", 1),
        ("Acme Industrial Co.", "Acme Logistics Inc", 0),
        ("Urban Data Systems Co", "Global Trading Partners", 0),
    ]
    return pd.DataFrame(
        labeled_pairs,
        columns=["left_name", "right_name", "is_true_match"],
    )


## 6) Ranking Pipeline and One-to-One Assignment

The ranking process does the following:
1. Parse left and right names into profiles.
2. Use blocking to limit candidate comparisons.
3. Score each candidate with gates + penalties.
4. Apply decision bands.
5. Enforce one-to-one preference and top-margin checks.


In [None]:
def rank_matches(
    df_left: pd.DataFrame,
    df_right: pd.DataFrame,
    left_col: str = "name",
    right_col: str = "full_name",
    top_n: int = 3,
    config: MatchConfig | None = None,
) -> pd.DataFrame:
    """Rank right-side candidates for every left-side name.

    Pipeline:
      1. Parse both name columns into profiles (missing names become "").
      2. Use blocking to limit the candidates scored per left record.
      3. Score each candidate (gates + penalties) and drop rejected pairs.
      4. Per left record, keep the ``top_n`` best candidates. Only the best
         one may stay ``auto_match``; it is sent to manual review as well
         when the runner-up is closer than ``config.top_margin_required``.
      5. Per right record, keep only the best-scoring left candidate and send
         it to manual review when the next left candidate is too close.

    Args:
        df_left: Frame holding the names to match.
        df_right: Frame holding the candidate names.
        left_col: Name column in ``df_left``.
        right_col: Name column in ``df_right``.
        top_n: Candidates kept per left record before right-side dedup.
        config: Matching configuration; defaults to ``MatchConfig()``.

    Returns:
        One row per surviving (left, right) pair, sorted by ``left_index``
        and descending ``match_score``, with a fresh integer index. The frame
        is empty (and has no columns) when no pair survives.
    """
    config = config or MatchConfig()

    left_profiles = {idx: parse_and_normalize_name(v) for idx, v in df_left[left_col].fillna("").items()}
    right_profiles = {idx: parse_and_normalize_name(v) for idx, v in df_right[right_col].fillna("").items()}

    rows: List[Dict[str, object]] = []

    for left_idx, left_name in left_profiles.items():
        per_left: List[Dict[str, object]] = []
        candidate_indices = _blocked_candidate_indices(left_name, right_profiles)

        for right_idx in candidate_indices:
            right_name = right_profiles[right_idx]
            score, details, gate_reason = score_name_pair(left_name, right_name, config)
            band = _decision_band(score, config)
            if band == "reject":
                continue

            per_left.append(
                {
                    "left_index": left_idx,
                    "left_name": left_name.raw,
                    "left_normalized": left_name.normalized,
                    "left_anchor": left_name.anchor,
                    "right_index": right_idx,
                    "right_name": right_name.raw,
                    "right_normalized": right_name.normalized,
                    "right_anchor": right_name.anchor,
                    "match_score": score,
                    "decision_band": band,
                    "gate_reason": gate_reason,
                    "shared_tokens": _shared_token_count(left_name, right_name),
                    "penalty_points": round(_penalty_points(left_name, right_name), 2),
                    **{f"component_{k}": round(v, 2) for k, v in details.items()},
                }
            )

        per_left.sort(key=lambda r: r["match_score"], reverse=True)

        if len(per_left) >= 2:
            # If top two candidates are too close, force manual review.
            top_delta = per_left[0]["match_score"] - per_left[1]["match_score"]
            per_left[0]["top_margin"] = round(top_delta, 2)
            if top_delta < config.top_margin_required and per_left[0]["decision_band"] == "auto_match":
                per_left[0]["decision_band"] = "manual_review"
                per_left[0]["gate_reason"] = "tight_runner_up_margin"

            # A left record can be auto-matched to at most one right record.
            # Without this, a runner-up could remain "auto_match" while the
            # better-scoring top candidate had just been sent to review.
            for runner_up in per_left[1:]:
                if runner_up["decision_band"] == "auto_match":
                    runner_up["decision_band"] = "manual_review"
                    runner_up["gate_reason"] = "outranked_by_top_candidate"
        elif len(per_left) == 1:
            # Sentinel margin for an uncontested candidate.
            per_left[0]["top_margin"] = 999.0

        rows.extend(per_left[:top_n])

    result = pd.DataFrame(rows)
    if result.empty:
        return result

    # One-to-one preference: keep best left candidate per right record.
    result = result.sort_values(["right_index", "match_score"], ascending=[True, False])
    dedup_rows = []
    for _, group in result.groupby("right_index", as_index=False):
        top = group.iloc[0].copy()
        if len(group) >= 2:
            # Two left records compete for the same right record.
            margin = float(group.iloc[0]["match_score"] - group.iloc[1]["match_score"])
            if margin < config.top_margin_required and top["decision_band"] == "auto_match":
                top["decision_band"] = "manual_review"
                top["gate_reason"] = "right_side_competition"
        dedup_rows.append(top)

    final = pd.DataFrame(dedup_rows)
    final = final.sort_values(["left_index", "match_score"], ascending=[True, False])
    return final.reset_index(drop=True)


## 7) Labeled Evaluation and Demo Run

This final section gives quick metrics to validate tuning choices.

Key metric focus for production tuning:
- Precision (keep this high to avoid false positives)
- False positive rate
- Recall (track tradeoff when tightening controls)


In [None]:
def evaluate_with_labels(
    labels_df: pd.DataFrame,
    left_col: str = "left_name",
    right_col: str = "right_name",
    label_col: str = "is_true_match",
    config: MatchConfig | None = None,
) -> pd.DataFrame:
    """Score labeled pairs and summarize the auto-match quality.

    A pair is predicted as a match when its score reaches
    ``config.auto_threshold``. Only labels 0 and 1 are counted.

    Returns:
        A two-column frame (``metric``, ``value``) with precision, recall,
        false_positive_rate (each rounded to four decimals; 0.0 when the
        denominator is zero) followed by the tp, fp, fn and tn counts.
    """
    config = config or MatchConfig()

    # Confusion-matrix cells keyed by (actual, predicted).
    tally: Dict[Tuple[int, int], int] = {}

    for _, row in labels_df.iterrows():
        left_profile = parse_and_normalize_name(row[left_col])
        right_profile = parse_and_normalize_name(row[right_col])
        score, _, _ = score_name_pair(left_profile, right_profile, config)
        predicted = 1 if score >= config.auto_threshold else 0
        cell = (int(row[label_col]), predicted)
        tally[cell] = tally.get(cell, 0) + 1

    tp = tally.get((1, 1), 0)
    fp = tally.get((0, 1), 0)
    fn = tally.get((1, 0), 0)
    tn = tally.get((0, 0), 0)

    def _rate(hits: int, total: int) -> float:
        # An empty denominator is reported as 0.0.
        return hits / total if total else 0.0

    precision = _rate(tp, tp + fp)
    recall = _rate(tp, tp + fn)
    fpr = _rate(fp, fp + tn)

    metric_names = ["precision", "recall", "false_positive_rate", "tp", "fp", "fn", "tn"]
    metric_values = [round(precision, 4), round(recall, 4), round(fpr, 4), tp, fp, fn, tn]
    return pd.DataFrame({"metric": metric_names, "value": metric_values})


# Run end-to-end demo.
# Build the sample frames, rank candidates with the default configuration,
# and score the labeled pairs with that same configuration.
left_df, right_df = build_sample_dataframes()
config = MatchConfig()
matches_df = rank_matches(left_df, right_df, left_col="name", right_col="full_name", top_n=3, config=config)
metrics_df = evaluate_with_labels(build_sample_labels(), config=config)

# Show the inputs first, then the ranked matches and the metrics.
# NOTE(review): `display` is neither imported nor defined in this file;
# presumably the IPython/Jupyter built-in -- confirm before running this
# as a plain script.
print("Sample Left DataFrame")
display(left_df)
print("Sample Right DataFrame")
display(right_df)
print("Ranked Matches")
display(matches_df)
print("Labeled Evaluation")
display(metrics_df)
