
# Mapping Automation: Retrieve → Rank Baseline

This notebook suggests `(after_table, after_attribute)` targets in a **data lake** for each input `(before_table, before_attribute)` using:
- TF‑IDF **word** vectors (capture whole tokens and adjacent‑token pairs)
- TF‑IDF **character** n‑grams (robust to `camelCase` / `snake_case` / typos)
- Lightweight lexical signals (token **Jaccard**, token **containment**, and **SequenceMatcher** from Python stdlib)

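> No external packages beyond pandas, NumPy and scikit‑learn are required (plus an Excel reader such as `openpyxl` if you load the optional history workbook); `difflib` is part of the Python standard library.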


In [None]:

# import standard libs and sklearn pieces
import pandas as pd
import numpy as np
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from difflib import SequenceMatcher  # stdlib, no install needed


In [None]:

# Compact domain synonyms that help short column names match longer ones.
# Each row is (token, alternates written as one space-separated string); the
# alternates are split into a fresh list per token, in the order written.
SYN = {
    token: alternates.split()
    for token, alternates in (
        ("qty", "quantity qty"),
        ("quantity", "quantity qty amount size"),
        ("amt", "amount amt notional"),
        ("ccy", "currency ccy fx"),
        ("currency", "currency ccy fx"),
        ("date", "date dt"),
        ("dt", "date dt"),
        ("id", "id identifier code key"),
        ("identifier", "id identifier code key"),
        ("code", "code id"),
        ("maturity", "maturity mat"),
        ("product", "product prd"),
        ("classification", "classification class category type"),
        ("class", "classification class category type"),
        ("type", "type classification class category"),
        ("source", "source src origin"),
        ("system", "system sys"),
        ("position", "position pos"),
        ("trade", "trade trx transaction"),
    )
}


In [None]:

# normalize a string into space-separated tokens
def normalize(s: str) -> str:
    """Turn a raw table/column name into lowercase, space-separated tokens.

    Missing values (anything ``pd.isna`` reports as missing) become ``""``.
    Other inputs are converted with ``str`` and then split on lower->upper
    camelCase boundaries, on ``_ - / .`` separators, and between letters and
    digits. The result is lowercased, every character outside ``a-z0-9`` is
    replaced by a space, and runs of whitespace are collapsed.
    """
    if pd.isna(s):
        return ""

    # These splits run before lowercasing because the camelCase rule needs
    # the original letter case.
    case_sensitive_steps = (
        (r'(?<=[a-z])(?=[A-Z])', ' '),      # fooBar  -> foo Bar
        (r"[_\-/\.]", " "),                 # foo_bar -> foo bar
        (r"([a-zA-Z])(\d)", r"\1 \2"),      # leg2    -> leg 2
        (r"(\d)([a-zA-Z])", r"\1 \2"),      # 2nd     -> 2 nd
    )
    text = str(s)
    for pattern, replacement in case_sensitive_steps:
        text = re.sub(pattern, replacement, text)

    text = re.sub(r"[^a-z0-9 ]+", " ", text.lower())
    return re.sub(r"\s+", " ", text).strip()


In [None]:

# tokenize and apply lightweight synonym expansion
def expand_synonyms(tokens):
    """Return ``tokens`` plus their ``SYN`` alternates, each listed once.

    Args:
        tokens: Iterable of token strings (expected to be normalized already,
            since ``SYN`` keys are lowercase).

    Returns:
        A list containing every input token and every alternate that ``SYN``
        lists for it, in first-seen order and without duplicates.
    """
    expanded = []
    seen = set()  # O(1) membership test; ``expanded`` keeps the order

    def _add(token):
        if token not in seen:
            seen.add(token)
            expanded.append(token)

    for t in tokens:
        _add(t)
        # A repeated token is still looked up, so its alternates are never
        # skipped; _add() filters whatever is already present.
        for alt in SYN.get(t, ()):
            _add(alt)
    return expanded

def tokens(s: str):
    """Normalize ``s``, split it on spaces, and add synonym alternates."""
    return expand_synonyms(normalize(s).split())

# small lexical utilities for scoring
def seq_ratio(a, b):
    """Return the ``SequenceMatcher`` ratio of the normalized forms of a and b."""
    left = normalize(a)
    right = normalize(b)
    matcher = SequenceMatcher(None, left, right)
    return matcher.ratio()

def jaccard(a_list, b_list):
    """Return |A ∩ B| / |A ∪ B| over the distinct items of both inputs.

    Two empty inputs score ``0.0``.
    """
    left, right = set(a_list), set(b_list)
    union = left | right
    if not union:
        # both sides empty: nothing to compare
        return 0.0
    return len(left & right) / len(union)

def containment(a_list, b_list):
    """Return the share of distinct ``a_list`` items that also occur in ``b_list``.

    An empty ``a_list`` scores ``0.0``.
    """
    wanted = set(a_list)
    if len(wanted) == 0:
        return 0.0
    found = wanted.intersection(b_list)
    return len(found) / len(wanted)


In [None]:

# File locations -- update these to match your environment.
# Optional: workbook of historic before/after pairs; when it loads, its text
# is added to the corpora the vectorizers are fitted on.
hist_path = "Sample input data.xlsx"
# Required: CSV listing every candidate target (after_table, after_attribute).
after_catalog_path = "after_catalog_position.csv"
# Optional: CSV of before columns to score in batch; the batch cell falls back
# to two demo rows when it cannot be read.
before_list_path = "before_columns_to_map.csv"


In [None]:

# Read historical pairs if available. History is optional, so any failure to
# load it falls back to an empty frame -- but the reason is printed instead of
# being swallowed, so an unreadable workbook is not mistaken for "no history".
try:
    hist_raw = pd.read_excel(hist_path, sheet_name=0)
except Exception as exc:
    print(
        f"History not loaded from {hist_path!r} "
        f"({type(exc).__name__}: {exc}); continuing without it."
    )
    hist = pd.DataFrame(columns=["before_table","before_attribute","after_table","after_attribute","applicable_products","mandatory"])
else:
    # map the workbook's display headers onto the snake_case names used below
    hist = hist_raw.rename(columns={
        "Before Table": "before_table",
        "Before Attribute": "before_attribute",
        "After Table": "after_table",
        "After Attribute": "after_attribute",
        "Applicable Products": "applicable_products",
        "Mandatory": "mandatory",
    })

# Read the after-catalog (entire candidate universe). This file is required,
# so read errors propagate.
after_catalog = pd.read_csv(after_catalog_path)

# Explicit check rather than `assert`, which is skipped under `python -O`.
missing_catalog_cols = {"after_table", "after_attribute"} - set(after_catalog.columns)
if missing_catalog_cols:
    raise ValueError(
        "after_catalog needs after_table, after_attribute "
        f"(missing: {sorted(missing_catalog_cols)})"
    )


In [None]:

# Build the text fields the vectorizers are fitted on.
def _normalized_or_blank(frame, column):
    """Return ``frame[column]`` normalized, or an all-blank Series if absent."""
    if column in frame.columns:
        # normalize() already maps missing values to ""
        return frame[column].map(normalize)
    return pd.Series("", index=frame.index)


after_catalog = after_catalog.copy()
after_attr_norm = after_catalog["after_attribute"].map(normalize)
after_catalog["after_text"] = after_catalog["after_table"].map(normalize) + " " + after_attr_norm

if len(hist):
    hist_attr_norm = hist["before_attribute"].map(normalize)
    # applicable_products / mandatory are optional context columns: a history
    # sheet without them must still work (``hist.get(col, "")`` handed back a
    # plain str in that case, which has no ``.fillna``).
    hist["before_text"] = (
        hist["before_table"].map(normalize) + " " +
        hist_attr_norm + " " +
        _normalized_or_blank(hist, "applicable_products") + " " +
        _normalized_or_blank(hist, "mandatory")
    ).str.strip()
    word_corpus = pd.concat([hist["before_text"], after_catalog["after_text"]], ignore_index=True)
    char_corpus = pd.concat([hist_attr_norm, after_attr_norm], ignore_index=True)
else:
    # no history: fit on the catalog text alone
    word_corpus = after_catalog["after_text"]
    char_corpus = after_attr_norm


In [None]:

# Fit the two TF-IDF models: word uni/bi-grams over the combined table +
# attribute text, and 3-5 character n-grams (within word boundaries) over the
# attribute names alone.
word_vec = TfidfVectorizer(ngram_range=(1, 2), min_df=1).fit(word_corpus)
char_vec = TfidfVectorizer(analyzer="char_wb", ngram_range=(3, 5), min_df=1).fit(char_corpus)

# Precompute the after-side matrices once for fast scoring. They are kept in
# the sparse form that transform() returns: cosine_similarity accepts sparse
# input, and a dense catalog x vocabulary array grows very large for big
# catalogs (character n-grams especially).
after_word_mat = word_vec.transform(after_catalog["after_text"])
after_char_mat = char_vec.transform(after_catalog["after_attribute"].map(normalize))


In [None]:

# rank candidates for one (before_table, before_attribute) pair
def rank_candidates(before_table: str, before_attribute: str, topn: int = 10) -> pd.DataFrame:
    """Score every catalog row against one before-column and return the best.

    The score is a fixed-weight blend of five signals:
    0.35 word TF-IDF cosine (table + attribute text), 0.25 character n-gram
    TF-IDF cosine (attribute only), 0.15 ``seq_ratio``, 0.15 token Jaccard
    and 0.10 token containment (the last three on attribute names only).

    Args:
        before_table: Source table name; only feeds the word-level signal.
        before_attribute: Source column name.
        topn: Number of best-scoring candidates to return.

    Returns:
        A copy of the ``topn`` highest-scoring ``after_catalog`` rows (all of
        their columns) plus a ``score`` column, best first, with a fresh
        0..k-1 index. Ties keep catalog order.
    """
    before_attr_norm = normalize(before_attribute)
    before_text = normalize(before_table) + " " + before_attr_norm

    sim_word = cosine_similarity(word_vec.transform([before_text]), after_word_mat)[0]
    sim_char = cosine_similarity(char_vec.transform([before_attr_norm]), after_char_mat)[0]

    b_toks = tokens(before_attribute)

    # Walk the catalog by position, never by index label: the similarity
    # arrays are positional, so labels from iterrows() only line up with them
    # when the catalog happens to carry a default RangeIndex.
    after_attrs = after_catalog["after_attribute"].tolist()
    after_toks = [tokens(a_attr) for a_attr in after_attrs]
    s_seq = np.array([seq_ratio(before_attribute, a_attr) for a_attr in after_attrs], dtype=float)
    s_jac = np.array([jaccard(b_toks, a_toks) for a_toks in after_toks], dtype=float)
    s_cont = np.array([containment(b_toks, a_toks) for a_toks in after_toks], dtype=float)

    scores = (
        0.35 * sim_word +
        0.25 * sim_char +
        0.15 * s_seq +
        0.15 * s_jac +
        0.10 * s_cont
    )

    # stable sort => equal scores are returned in a reproducible order
    order = np.argsort(-scores, kind="stable")[:topn]
    out = after_catalog.iloc[order].copy()
    out["score"] = scores[order]
    return out.reset_index(drop=True)


In [None]:

# Example 1: a camelCase attribute made of several words.
rank_candidates(
    before_table="position table",
    before_attribute="TradeDateQuantity",
    topn=10,
)


In [None]:

# Example 2: business terms run together with no case or separator hints.
rank_candidates(
    before_table="position table",
    before_attribute="productclassificationsource",
    topn=10,
)


In [None]:

# Batch scoring from a CSV that provides columns [before_table, before_attribute].
# If the list cannot be read, two demo rows are scored instead -- and the
# reason is printed so demo output is not mistaken for real results.
try:
    before_df = pd.read_csv(before_list_path)
except Exception as exc:
    print(
        f"Could not read {before_list_path!r} "
        f"({type(exc).__name__}: {exc}); scoring the built-in demo rows instead."
    )
    before_df = pd.DataFrame([
        {"before_table":"position table","before_attribute":"TradeDateQuantity"},
        {"before_table":"position table","before_attribute":"productclassificationsource"},
    ])

# fail early with a clear message rather than a KeyError inside the loop
missing_before_cols = {"before_table", "before_attribute"} - set(before_df.columns)
if missing_before_cols:
    raise ValueError(
        f"before list needs before_table, before_attribute (missing: {sorted(missing_before_cols)})"
    )

rows = []
for bt, ba in zip(before_df["before_table"], before_df["before_attribute"]):
    preds = rank_candidates(bt, ba, topn=5)
    # one wide output row per input column: pred_1_* is the best candidate
    row = {"before_table": bt, "before_attribute": ba}
    ranked = zip(preds["after_table"], preds["after_attribute"], preds["score"])
    for rank, (a_table, a_attr, a_score) in enumerate(ranked, start=1):
        row[f"pred_{rank}_table"] = a_table
        row[f"pred_{rank}_attribute"] = a_attr
        row[f"pred_{rank}_score"] = float(a_score)
    rows.append(row)

pred_df = pd.DataFrame(rows)
pred_df.to_csv("mapping_suggestions.csv", index=False)
pred_df.head(10)


In [None]:

# Simple decision rule: auto-accept the top suggestion when its score clears
# the threshold, otherwise send the row to manual review.
auto_threshold = 0.40  # raise to reduce false positives, lower to auto-map more

def auto_map(row):
    """Return "AUTO_MAP" or "REVIEW" for one suggestion row.

    ``row`` needs ``.get``; a missing or NaN ``pred_1_score`` means "REVIEW".
    """
    top_score = row.get("pred_1_score", np.nan)
    if pd.isna(top_score):
        return "REVIEW"
    return "AUTO_MAP" if top_score >= auto_threshold else "REVIEW"

# Label each suggestion row by calling auto_map on it (axis=1 = row by row),
# then write the labelled table to its own CSV file.
pred_df["decision"] = pred_df.apply(auto_map, axis=1)
pred_df.to_csv("mapping_suggestions_with_decisions.csv", index=False)
# trailing expression: first 10 rows, for display in the notebook
pred_df.head(10)
