
# Advanced Company Profiling & Data Validation (UK Companies)

**Objective:** Profile, cleanse, deduplicate, match and enrich UK company contact records using the **Companies House REST API**, then produce clear **reports and charts**.  
> Put your `Company.csv` in this folder before running the notebook.

**Pipeline**
1. Data Collection (load `Company.csv`)
2. Profiling
3. Cleansing
4. Deduplication
5. Matching & Enrichment (Companies House REST API)
6. Reporting & visualisation
7. Packaging / outputs


## 0. Setup & Configuration

In [None]:

# Install deps if needed (uncomment for first run)
# %pip install -r requirements.txt

import os
import re
from pathlib import Path

import pandas as pd
import numpy as np
import requests
from tqdm import tqdm
from rapidfuzz import fuzz, process
import matplotlib.pyplot as plt

# Optional: read API key from .env if present
# Best-effort: a missing python-dotenv package (or any failure while loading)
# is ignored, in which case only the real process environment is consulted.
# This must run before os.getenv() below so a key from .env is visible.
try:
    from dotenv import load_dotenv
    load_dotenv()
except Exception:
    pass

# Input file and output folder, both relative to the current working directory.
DATA_PATH = Path("Company.csv")  # Ensure this file exists in the project root
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)  # no error if the folder already exists

# Companies House API configuration
# Obtain an API key: https://developer.company-information.service.gov.uk/get-started
# Auth: Basic Auth with API key as username, empty password.
# Falls back to an empty string when the variable is unset.
COMPANIES_HOUSE_API_KEY = os.getenv("COMPANIES_HOUSE_API_KEY", "")
BASE_URL = "https://api.company-information.service.gov.uk"  # Official base

# One shared session so every request carries the same Basic Auth credentials.
session = requests.Session()
session.auth = (COMPANIES_HOUSE_API_KEY, "")

# Print only whether a key is set, never the key itself.
print("API key configured:", bool(COMPANIES_HOUSE_API_KEY))


## 1. Data Collection

In [None]:

# Load input CSV
# Validate with an explicit exception rather than `assert`: assertions are
# stripped when Python runs with -O, which would let a missing file surface
# later as a less helpful pandas error.
if not DATA_PATH.exists():
    raise FileNotFoundError("Company.csv not found. Place it next to this notebook.")
df_raw = pd.read_csv(DATA_PATH)
print("Rows:", len(df_raw))
display(df_raw.head())


## 2. Profiling

In [None]:

import pandas as pd


def profile_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Build a one-row-per-column profile of *df*.

    Args:
        df: The frame to profile.

    Returns:
        A DataFrame with the columns ``column``, ``dtype``, ``non_null``,
        ``nulls``, ``unique`` (distinct non-null values) and
        ``sample_values`` (the first three non-null values as strings).
        The schema is the same even when *df* has no columns.
    """
    fields = ["column", "dtype", "non_null", "nulls", "unique", "sample_values"]
    summary = []
    for col in df.columns:
        s = df[col]
        summary.append({
            "column": col,
            "dtype": s.dtype.name,
            "non_null": int(s.notna().sum()),
            "nulls": int(s.isna().sum()),
            "unique": int(s.nunique(dropna=True)),
            "sample_values": list(s.dropna().astype(str).head(3)),
        })
    # Passing `columns` keeps the header stable for a frame with no columns.
    return pd.DataFrame(summary, columns=fields)

# Column-level profile of the raw input
profile = profile_dataframe(df_raw)
display(profile)

# Share of missing values per column (a 0-1 fraction), worst columns first
missing_mask = df_raw.isna()
null_pct = missing_mask.mean().sort_values(ascending=False)
null_table = null_pct.to_frame("null_pct")
display(null_table)


## 3. Cleansing

In [None]:

import unicodedata
import string

# Canonical spelling for trailing legal-form tokens of a company name.
# normalize_company_name() looks up only the LAST token, and does so after
# upper-casing and stripping punctuation, so the keys that contain "."
# ("LTD.", "L.L.P") cannot be hit by that lookup; the stripped forms
# "LTD" and "LLP" are what actually match.
LEGAL_SUFFIX_MAP = {
    "LTD": "LIMITED",
    "LTD.": "LIMITED",
    "LIMITED": "LIMITED",
    "PLC": "PLC",
    "LLP": "LLP",
    "L.L.P": "LLP",
}

_WHITESPACE_RUN = re.compile(r"\s+")


def normalize_whitespace(s: str) -> str:
    """Collapse each run of whitespace in *s* to one space and trim both ends."""
    collapsed = _WHITESPACE_RUN.sub(" ", s)
    return collapsed.strip()

# Deletion table for every ASCII punctuation character, built once.
_PUNCT_TABLE = str.maketrans("", "", string.punctuation)


def strip_punct(s: str) -> str:
    """Return *s* with all ASCII punctuation characters removed."""
    return s.translate(_PUNCT_TABLE)

def normalize_company_name(name: str) -> str:
    """Return a comparison-friendly form of a company name.

    The name is NFKC-normalised, upper-cased, stripped of ASCII punctuation
    and whitespace-collapsed; a trailing legal-form token is then replaced
    by its canonical spelling from ``LEGAL_SUFFIX_MAP``. Non-string or
    blank input yields ``""``.
    """
    if not (isinstance(name, str) and name.strip()):
        return ""
    upper = unicodedata.normalize("NFKC", name).upper()
    flattened = normalize_whitespace(strip_punct(upper))
    words = [word for word in flattened.split(" ") if word]
    if words:
        # Only the final token is treated as a legal suffix.
        *head, last = words
        words = head + [LEGAL_SUFFIX_MAP.get(last, last)]
    return " ".join(words)

# Standard UK postcode with all whitespace removed: outward code
# (A9, A99, A9A, AA9, AA99, AA9A) followed by the 3-character inward code.
_UK_POSTCODE_RE = re.compile(r"[A-Z]{1,2}[0-9][A-Z0-9]?[0-9][A-Z]{2}")


def normalize_postcode(pc: str) -> str:
    """Return a canonical form of a postcode for use in matching keys.

    Values that look like a standard UK postcode are rewritten as
    ``"<outward> <inward>"`` with exactly one space, so ``"sw1a1aa"`` and
    ``" SW1A  1AA "`` both become ``"SW1A 1AA"``. Anything else is only
    upper-cased and whitespace-collapsed, as before.

    Args:
        pc: Raw postcode value; non-strings (e.g. NaN) are treated as missing.

    Returns:
        The normalised postcode, or ``""`` for non-string input.
    """
    if not isinstance(pc, str):
        return ""
    upper = pc.upper()
    compact = re.sub(r"\s+", "", upper)
    if _UK_POSTCODE_RE.fullmatch(compact):
        # The inward code is always the final three characters.
        return f"{compact[:-3]} {compact[-3:]}"
    # Not a recognisable UK postcode: keep the conservative normalisation.
    return re.sub(r"\s+", " ", upper).strip()

def normalize_email(email: str) -> str:
    """Trim and lower-case an e-mail address; non-string input yields ``""``."""
    return email.strip().lower() if isinstance(email, str) else ""

def _first_present(columns, candidates):
    """Return the first name in *candidates* that is in *columns*, else None."""
    return next((cand for cand in candidates if cand in columns), None)


def cleanse(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with normalised matching columns added.

    Added columns:
        ``postcode_norm``      from ``postcode``/``Postcode`` (``""`` if absent)
        ``email_norm``         from ``email``/``Email`` (``""`` if absent)
        ``company_name_norm``  from the detected name column
        ``block_key``          space-less normalised name + ``"|"`` + postcode

    The name column is the first of a fixed list of candidate headers that
    exists in *df*; if none exists, the first column is used.

    Args:
        df: Raw input records. It is not modified.

    Returns:
        A new DataFrame containing the original columns plus the four above.
    """
    out = df.copy()
    # Heuristic column detection
    name_col = _first_present(
        out.columns,
        ["company_name", "CompanyName", "name", "Name", "Company", "Organisation", "organisation"],
    )
    if name_col is None:
        # Fall back to first column
        name_col = out.columns[0]

    postcode_col = _first_present(out.columns, ["postcode", "Postcode"])
    if postcode_col is not None:
        out["postcode_norm"] = out[postcode_col].apply(normalize_postcode)
    else:
        out["postcode_norm"] = ""

    # Email normalization if present (both capitalisations, like postcode)
    email_col = _first_present(out.columns, ["email", "Email"])
    if email_col is not None:
        out["email_norm"] = out[email_col].apply(normalize_email)
    else:
        out["email_norm"] = ""

    # Stringify only real values. A blanket astype(str) would turn a missing
    # name into the literal "nan", which then normalises to the bogus company
    # name "NAN" and pollutes both dedup keys and API lookups.
    out["company_name_norm"] = (
        out[name_col]
        .map(lambda value: normalize_company_name(str(value)) if pd.notna(value) else "")
        .astype(str)
    )
    # Blocking key for dedup (tune as needed)
    out["block_key"] = (
        out["company_name_norm"].str.replace(" ", "", regex=False) + "|" + out["postcode_norm"]
    )
    return out

df_clean = cleanse(df_raw)
display(df_clean.head())


## 4. Deduplication

In [None]:

# Rule-based pass: keep the first row for each identical combination of
# normalized name, postcode and email
dedup_cols = ["company_name_norm","postcode_norm","email_norm"]
before = len(df_clean)
is_repeat = df_clean.duplicated(subset=dedup_cols, keep="first")
df_dedup = df_clean.loc[~is_repeat].copy()
print(f"Exact dedup removed {before - len(df_dedup)} rows")

# Fuzzy dedup within blocks using RapidFuzz
def fuzzy_cluster_block(block_df: pd.DataFrame, name_col="company_name_norm", threshold=95):
    """Greedily group the rows of one block by name similarity.

    Rows are visited in order. Each row not yet assigned starts a new cluster
    and acts as its representative; every later unassigned row whose
    ``fuzz.token_set_ratio`` against the representative is at least
    *threshold* joins that cluster.

    Args:
        block_df: Rows belonging to a single block.
        name_col: Column holding the names to compare.
        threshold: Minimum similarity score for a row to join a cluster.

    Returns:
        A list of clusters, each a list of index labels from *block_df*,
        with the representative first.
    """
    labels = block_df.index.tolist()
    values = block_df[name_col].tolist()
    assigned = set()
    groups = []
    for pos, (label, anchor) in enumerate(zip(labels, values)):
        if label in assigned:
            continue
        assigned.add(label)
        members = [label]
        # Only rows after the representative can still be unassigned.
        for other_label, other_name in zip(labels[pos + 1:], values[pos + 1:]):
            if other_label in assigned:
                continue
            if fuzz.token_set_ratio(anchor, other_name) >= threshold:
                members.append(other_label)
                assigned.add(other_label)
        groups.append(members)
    return groups

# Cluster the rows of every block; a one-row block is its own cluster
clusters = []
for block_val, block_df in df_dedup.groupby("block_key"):
    if len(block_df) > 1:
        clusters.extend(fuzzy_cluster_block(block_df, threshold=93))
    else:
        clusters.append([block_df.index[0]])

# The first member of each cluster is its canonical record
canonical_idx = [members[0] for members in clusters]
df_canonical = df_dedup.loc[canonical_idx].copy().reset_index(drop=True)
print("Canonical records:", len(df_canonical), "from", len(df_dedup))

# Record which cluster every deduplicated row fell into and persist it
cluster_map = {
    row_label: cluster_no
    for cluster_no, members in enumerate(clusters)
    for row_label in members
}
df_dedup["cluster_id"] = df_dedup.index.map(cluster_map)
df_dedup.to_csv(OUTPUT_DIR / "dedup_clusters.csv", index=False)


## 5. Matching (Companies House) & Enrichment

In [None]:

from urllib.parse import quote

def ch_search_company(query: str, items_per_page: int = 5):
    """Call the Companies House company-search endpoint for *query*.

    Args:
        query: Search text; it is URL-quoted before being sent.
        items_per_page: Number of results requested.

    Returns:
        The decoded JSON response, or
        ``{"rate_limited": True, "status_code": 429}`` when the API answers
        HTTP 429 so the caller can decide how to retry.

    Raises:
        RuntimeError: If no API key is configured.
        requests.HTTPError: For any other error status (via ``raise_for_status``).
    """
    if not COMPANIES_HOUSE_API_KEY:
        raise RuntimeError("Set COMPANIES_HOUSE_API_KEY env var before calling Companies House API.")
    endpoint = f"{BASE_URL}/search/companies?q={quote(query)}&items_per_page={items_per_page}"
    response = session.get(endpoint, timeout=20)
    if response.status_code != 429:
        response.raise_for_status()
        return response.json()
    # Rate-limited; hand back a marker instead of raising
    return {"rate_limited": True, "status_code": response.status_code}

def best_match_for_name(name: str):
    """Return the search result whose title is most similar to *name*.

    Up to ten candidates are requested via ``ch_search_company`` and scored
    with ``fuzz.token_set_ratio`` on upper-cased text; on a tie the earlier
    result wins. No minimum score is applied.

    Returns:
        The best-scoring result item, or ``None`` when the call was
        rate-limited or returned no items.
    """
    payload = ch_search_company(name, items_per_page=10)
    rate_limited = isinstance(payload, dict) and payload.get("rate_limited")
    if rate_limited:
        return None
    candidates = payload.get("items", [])
    if not candidates:
        return None

    target = name.upper()

    def similarity(candidate):
        # Prefer "title"; fall back to "company_name" when it is empty.
        label = candidate.get("title", "") or candidate.get("company_name", "")
        return fuzz.token_set_ratio(target, (label or "").upper())

    return max(candidates, key=similarity)

# Fields copied from the best Companies House search hit onto each record
enrich_cols = [
    "company_number","company_status","company_type","date_of_creation",
    "address_snippet","kind","title","links"
]

matches = []
failed_lookups = 0
for name in tqdm(df_canonical["company_name_norm"], desc="Matching Companies House"):
    if not name:
        # A blank name cannot be matched; do not spend an API call on it.
        matches.append(None)
        continue
    try:
        m = best_match_for_name(name)
    except Exception as e:
        # Best-effort: one failed lookup must not abort the whole run, but it
        # should not vanish silently either. Show the first error in full and
        # summarise the rest after the loop.
        failed_lookups += 1
        if failed_lookups == 1:
            print(f"Companies House lookup failed for {name!r}: {e}")
        m = None
    matches.append(m)

if failed_lookups:
    print(f"{failed_lookups} lookups failed and were recorded as unmatched")

# Build the enrichment table field by field instead of
# json_normalize(...)[enrich_cols]: json_normalize flattens nested objects
# into dotted column names (so a nested "links" object would not yield a
# "links" column), and selecting a column that no match happens to carry
# raises KeyError. Here every row has every enrich column, with None where
# the record is unmatched or the field is absent.
df_matches = pd.DataFrame(
    [{col: (m or {}).get(col) for col in enrich_cols} for m in matches],
    columns=enrich_cols,
)
df_enriched = pd.concat([df_canonical.reset_index(drop=True), df_matches.reset_index(drop=True)], axis=1)
display(df_enriched.head())

df_enriched.to_csv(OUTPUT_DIR / "enriched_companies.csv", index=False)


## 6. Reporting & Visualisation

In [None]:

# KPIs: a record counts as enriched/matched when it has a company_number
has_match_col = "company_number" in df_enriched
matched_mask = df_enriched["company_number"].notna() if has_match_col else None
kpis = {
    "input_rows": len(df_raw),
    "after_exact_dedup": len(df_dedup),
    "canonical_records": len(df_canonical),
    "enriched_records": int(matched_mask.sum()) if has_match_col else 0,
    "match_rate_pct": round(100.0 * (matched_mask.mean() if has_match_col else 0.0), 2),
}
print(kpis)

# Bar chart of company status, with unmatched records shown as their own bar
if "company_status" in df_enriched:
    status_counts = (
        df_enriched["company_status"]
        .fillna("unmatched")
        .value_counts()
        .sort_values(ascending=False)
    )
    plt.figure()
    status_counts.plot(kind="bar", title="Company status distribution")
    plt.xlabel("status")
    plt.ylabel("count")
    plt.tight_layout()
    plt.show()


## 7. Outputs

In [None]:

# Persist the column profile of the raw input
profile_path = OUTPUT_DIR / "profile.csv"
df_profile = profile_dataframe(df_raw)
df_profile.to_csv(profile_path, index=False)

# Persist the enriched records as Parquet, then list what was written
parquet_path = OUTPUT_DIR / "enriched_companies.parquet"
df_enriched.to_parquet(parquet_path, index=False)
print("Files written to:", OUTPUT_DIR.resolve())
list(OUTPUT_DIR.glob("*"))



## 8. Notes & Next Steps
- Tune **normalisation** and **blocking** to fit your data (e.g., include city or first line of address).
- Adjust **fuzzy thresholds**; a lower threshold finds more matches but risks more false positives.
- Use **exponential backoff** on HTTP 429 (rate limiting: ~600 requests / 5 minutes); as written, a rate-limited lookup is simply recorded as unmatched.
- Consider caching API responses locally to avoid re-calling the same names.
- For high volumes, batch processing and resume checkpoints are recommended.
