### Scrape REGULAR elections results from a landing page

The settings are at the bottom of the main function - untoggle as needed

In [None]:
import requests
from bs4 import BeautifulSoup
import json
import time

BASE_URL = "https://democracy.kent.gov.uk:9071"
HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; ElectionScraper/1.0)"
}

def scrape_election(eid, election_date, rpid=None):
    print(f"\n📋 Scraping election {eid} on {election_date}...\n")

    # Construct the master page URL
    start_url = f"{BASE_URL}/mgElectionElectionAreaResults.aspx?Page=all&EID={eid}"
    if rpid:
        start_url += f"&RPID={rpid}"

    # --- Step 1: Get all division result links
    res = requests.get(start_url, headers=HEADERS)
    soup = BeautifulSoup(res.text, "html.parser")
    links = []
    seen = set()

    for a in soup.select("a"):
        href = a.get("href", "")
        text = a.get_text(strip=True)
        if "mgElectionAreaResults.aspx" in href and "ID=" in href:
            full_url = BASE_URL + "/" + href.lstrip("/")
            if full_url not in seen:
                seen.add(full_url)
                links.append((text, full_url))

    print(f"🔗 Found {len(links)} division links...")

    # --- Step 2: Scrape each division
    results = []
    failed = []

    for i, (name, url) in enumerate(links):
        print(f"[{i+1}/{len(links)}] Scraping: {name}")
        try:
            division_data = parse_division_page(name, url)
            division_data["election_date"] = election_date
            results.append(division_data)
            time.sleep(0.4)
        except Exception as e:
            print(f"⚠️ Failed on {name}: {e}")
            failed.append({"division": name, "url": url, "error": str(e)})

    # --- Step 3: Save results
    out_file = f"../data/elections/kent_results_{election_date}.json"
    with open(out_file, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    if failed:
        with open(f"failed_{election_date}.json", "w", encoding="utf-8") as f:
            json.dump(failed, f, indent=2)

    print(f"\n✅ Saved {len(results)} results to '{out_file}'")
    if failed:
        print(f"⚠️ {len(failed)} divisions failed — see 'failed_{election_date}.json'")

# ------------------------------------------
# Helper: Parse individual division page
# ------------------------------------------

def parse_division_page(name, url):
    res = requests.get(url, headers=HEADERS)
    soup = BeautifulSoup(res.text, "html.parser")
    tables = soup.select("table.mgStatsTable")

    if len(tables) < 1:
        return {
            "division": name,
            "url": url,
            "status": "no_tables_found"
        }

    # Try to locate correct tables by caption
    candidate_table = next((t for t in tables if "Candidate" in t.get_text() or "results" in t.get_text().lower()), None)
    summary_table = next((t for t in tables if "Voting Summary" in t.get_text()), None)

    if not candidate_table or not summary_table:
        return {
            "division": name,
            "url": url,
            "status": "incomplete_data"
        }

    # --- Candidate table
    candidates = []
    candidate_rows = candidate_table.find_all("tr")[1:]
    for row in candidate_rows:
        cols = [td.get_text(strip=True) for td in row.find_all("td")]
        if len(cols) != 5:
            continue
        candidates.append({
            "name": cols[0],
            "party": cols[1],
            "votes": int(cols[2].replace(",", "")),
            "percentage": cols[3],
            "outcome": cols[4]
        })

    # --- Summary table
    summary = {}
    summary_rows = summary_table.find_all("tr")[1:]
    for row in summary_rows:
        cols = [td.get_text(strip=True) for td in row.find_all("td")]
        if len(cols) != 2 or not cols[0].strip():
            continue
        key = cols[0].lower().replace(" ", "_")
        val = cols[1].replace(",", "")
        summary[key] = int(val) if val.isdigit() else val

    return {
        "division": name,
        "url": url,
        "status": "ok",
        "candidates": candidates,
        "summary": summary
    }

# ------------------------------------------
# Example usage
# ------------------------------------------

if __name__ == "__main__":
    #scrape_election(eid=51, election_date="2025-05-01")
    #scrape_election(eid=32, election_date="2021-05-06")
    #scrape_election(eid=20, election_date="2017-05-04")
    # scrape_election(eid=12, election_date="2013-05-02")
    scrape_election(eid=3,  election_date="2009-06-04")


### By-election results scraping

In [None]:
import requests
from bs4 import BeautifulSoup
import json
import time
import re
from dateutil.parser import parse as parse_date

BASE_URL = "https://democracy.kent.gov.uk"
FINAL_HOST = "https://democracy.kent.gov.uk:9071"
START_URL = f"{BASE_URL}/mgManageElectionResults.aspx?bcr=1"
HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; ByElectionScraper/1.0)"
}

def try_parse_int(s):
    try:
        return int(s.replace(",", ""))
    except:
        return s

def parse_name_and_date(text):
    """
    Extracts clean division name and ISO election date from mixed link formats.
    Supports:
        - "Election results for Division Name, 6 July 2023"
        - "Division Name by-election, 21/11/2024"
    """
    # Try flexible match: split last comma-separated part as date
    match = re.match(r"^(.*?),\s*(\d{1,2}[/\s]\w+[/\s]\d{4})$", text)
    if match:
        name = match.group(1).strip()
        date_str = match.group(2).strip()
        try:
            election_date = parse_date(date_str, dayfirst=True).date().isoformat()
        except:
            election_date = None
        return name, election_date

    # Fallback: just return as-is
    return text.strip(), None

def parse_division_page(name, url, election_date):
    res = requests.get(url, headers=HEADERS)
    soup = BeautifulSoup(res.text, "html.parser")
    tables = soup.select("table.mgStatsTable")

    if not tables:
        return {"division": name, "url": url, "election_date": election_date, "status": "no_tables_found"}

    candidate_table = None
    for table in tables:
        headers = [th.get_text(strip=True).lower() for th in table.find_all("th")]
        if any("candidate" in h or "votes" in h for h in headers):
            candidate_table = table
            break

    summary_table = next((t for t in tables if "Voting Summary" in t.get_text()), None)

    if not candidate_table:
        return {"division": name, "url": url, "election_date": election_date, "status": "no_candidate_table"}

    candidates = []
    rows = candidate_table.find_all("tr")[1:]
    for row in rows:
        cols = [td.get_text(strip=True) for td in row.find_all("td")]
        if len(cols) >= 4:
            candidate = {
                "name": cols[0],
                "party": cols[1],
                "votes": try_parse_int(cols[2]),
                "percentage": cols[3],
            }
            if len(cols) > 4:
                candidate["outcome"] = cols[4]
            candidates.append(candidate)

    summary = {}
    if summary_table:
        rows = summary_table.find_all("tr")[1:]
        for row in rows:
            cols = [td.get_text(strip=True) for td in row.find_all("td")]
            if len(cols) == 2:
                key = cols[0].lower().replace(" ", "_")
                summary[key] = try_parse_int(cols[1])

    return {
        "division": name,
        "election_date": election_date,
        "url": url,
        "status": "ok",
        "candidates": candidates,
        "summary": summary
    }

def scrape_byelection_results():
    print(f"📋 Scraping from: {START_URL}")
    res = requests.get(START_URL, headers=HEADERS)
    soup = BeautifulSoup(res.text, "html.parser")

    links = []
    for a in soup.select("a[href*='mgElectionResults.aspx?ID=']"):
        href = a.get("href")
        if not href:
            continue
        full_url = FINAL_HOST + "/" + href.lstrip("/").replace("mgElectionResults", "mgElectionAreaResults")
        text = a.get_text(strip=True)
        name, election_date = parse_name_and_date(text)
        links.append((name, election_date, full_url))

    print(f"🔗 Found {len(links)} byelection result links")

    results = []
    failed = []

    for i, (name, election_date, url) in enumerate(links):
        print(f"[{i+1}/{len(links)}] Scraping: {name} ({election_date})")
        try:
            result = parse_division_page(name, url, election_date)
            results.append(result)
            time.sleep(0.4)
        except Exception as e:
            print(f"⚠️ Failed: {name} — {e}")
            failed.append({"name": name, "url": url, "error": str(e)})

    # 🚫 Step: Remove junk division entries like "County Council, 01/05/2025"
    initial_count = len(results)
    results = [r for r in results if not r.get("division", "").strip().lower().startswith("county council")]
    removed = initial_count - len(results)

    print(f"\n🧹 Removed {removed} invalid 'County Council' rows from results.")

    # ✅ Save cleaned results
    with open("../data/elections/kent_byelection_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    # 🚨 Log failures if any
    if failed:
        with open("kent_byelection_failures.json", "w", encoding="utf-8") as f:
            json.dump(failed, f, indent=2)

    print(f"\n✅ Final saved: {len(results)} byelection results. {len(failed)} failed.")

    # Save successful results
    with open("../data/elections/kent_byelection_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    # Save failed logs
    if failed:
        with open("kent_byelection_failures.json", "w", encoding="utf-8") as f:
            json.dump(failed, f, indent=2)

    print(f"\n✅ Scraped {len(results)} byelection results. {len(failed)} failed.")

if __name__ == "__main__":
    scrape_byelection_results()


In [None]:
import pandas as pd
import json
from pathlib import Path

# Load byelection results
byelection_path = Path("../data/elections/kent_byelection_results.json")
with open(byelection_path, "r", encoding="utf-8") as f:
    raw_data = json.load(f)

# Flatten each result (one row per division)
df = pd.json_normalize(raw_data)

# Show basic shape and column names
print(f"📦 Shape: {df.shape[0]} rows × {df.shape[1]} columns\n")
print("🧾 Columns:", list(df.columns), "\n")

# Show missing value summary
print("🔍 Missing values per column:\n")
print(df.isna().sum())

# Optional: View sample of incomplete rows
print("\n🧩 Sample rows with missing values:")
display(df[df.isna().any(axis=1)].head(10))


### Consolidate all jsons into one

In [None]:
import json
import os
from glob import glob
from pathlib import Path

def merge_nested_jsonl_files(
    folder_path="../data/elections/scrape_results/",
    output_path="../data/elections/kent_merged_elections_all_nested.jsonl"
):
    merged = []
    files = sorted(glob(os.path.join(folder_path, "*_elections_nested*.jsonl")))

    for file_path in files:
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    record = json.loads(line)
                    merged.append(record)
                except json.JSONDecodeError as e:
                    print(f"⚠️ Skipping malformed line in {file_path}: {e}")

    # Ensure output folder exists
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Write output
    with open(output_path, "w", encoding="utf-8") as f_out:
        for record in merged:
            f_out.write(json.dumps(record, ensure_ascii=False) + "\n")

    print(f"✅ Merged {len(files)} files into '{output_path}' ({len(merged)} total elections)")

if __name__ == "__main__":
    merge_nested_jsonl_files()

### Open the consolidated file for cleaning

In [None]:
import json
import pandas as pd

# Load JSONL file
jsonl_path = "../data/elections/kent_merged_elections_all_nested.jsonl"
with open(jsonl_path, "r", encoding="utf-8") as f:
    data = [json.loads(line) for line in f]


### Election level data

In [None]:
# Convert to DataFrame
df = pd.DataFrame(data)

print(f"✅ Loaded {len(df)} election records")
df.head(20)

#### Assign urls for elections

In [None]:
# Define the BASE URLs per council
BASE_URLS = {
    "kent_cc": "https://democracy.kent.gov.uk",
    "rtw_bc": "https://democracy.tunbridgewells.gov.uk",
    "ashford_bc": "https://ashford.moderngov.co.uk",
    "dartford_bc": "https://dartford.moderngov.co.uk",
    "thanet_bc": "https://democracy.thanet.gov.uk",
    "maidstone_bc": "https://maidstone.gov.uk",
    "swale_bc": "https://services.swale.gov.uk",
    "tonbridge_bc": "https://democracy.tmbc.gov.uk",

    # Add more councils if needed
}

def construct_election_url(election_id):
    try:
        council_id, eid_raw = election_id.rsplit("_", 1)
        eid = str(int(eid_raw))  # strip leading zeroes
        base = BASE_URLS.get(council_id)
        if base:
            return f"{base}/mgElectionElectionAreaResults.aspx?EID={eid}"
    except Exception as e:
        return None

# Inject election_url into each record
for rec in data:
    rec["election_url"] = construct_election_url(rec["election_id"])

# Convert to DataFrame to check
df = pd.DataFrame(data)
df[["election_id", "election_url"]].head(2)

In [None]:
df['election_type'].value_counts()

In [None]:
df['election_level'].value_counts()

In [None]:
from IPython.display import display, HTML

# Step 1: Filter unknown election levels
unknowns = df[df["election_level"] == "unknown"].copy()

# Step 2: Count by council_id
print("🔍 Unknown election levels by council:")
print(unknowns["council_id"].value_counts(), "\n")

# Step 3: Review distinct election descriptions
print("📝 Sample election descriptions with unknown level:")
print(unknowns["election_description"].value_counts().head(30))

# Step 4: Add HTML link column
unknowns["link"] = unknowns["election_url"].apply(lambda u: f'<a href="{u}" target="_blank">view</a>')

# Step 5: Display as HTML table with clickable links
print("\n🔎 Example unknown elections:")
display(HTML(unknowns[["election_id", "council_id","election_date", "election_description", "link"]].head(10).to_html(escape=False, index=False)))

In [None]:
def classify_election_level(description):
    desc = description.lower()

    if any(word in desc for word in ["neighbourhood plan", "referendum", "parish", "town"]):
        return "parish"
    if "member of parliament" in desc or "mp" in desc:
        return "parliamentary"
    if "county" in desc or "kcc" in desc:
        return "county"
    if "ward" in desc:
        return "borough"
    if "district" in desc:
        return "district"
    if "pcc" in desc or "police" in desc:
        return "pcc"
    if "av referendum" in desc or "eu referendum" in desc:
        return "national"
    
    return "unknown"

# Strip whitespace and fill missing
df["election_level"] = df["election_level"].fillna("unknown").str.strip()

# Diagnose initial state
print("🔎 BEFORE:")
print(df["election_level"].value_counts())

# Apply reclassification
mask = df["election_level"] == "unknown"
df.loc[mask, "election_level"] = df.loc[mask, "election_description"].apply(classify_election_level)

# Diagnose result
print("\n✅ AFTER:")
print(df["election_level"].value_counts())

In [None]:
# Patch rule: if council_id contains 'cc' and election_level is still unknown → set to 'county'
mask = (df["election_level"] == "unknown") & (df["council_id"].str.contains("cc", case=False, na=False))
df.loc[mask, "election_level"] = "county"

# Confirm the fix
print("✅ Updated election_level using council_id 'cc' rule.")
print(df["election_level"].value_counts())

### Explore the divisions

In [None]:
# Explode divisions from nested structure
divisions = []

for election in data:
    for div in election.get("divisions", []):
        div_record = {
            "election_id": election.get("election_id"),
            "council_id": election.get("council_id"),
            "election_date": election.get("election_date"),
            "election_type": election.get("election_type"),
            "election_description": election.get("election_description"),
            "source_url": election.get("source_url"),
            "division_name": div.get("division"),
            "division_id": div.get("division_id"),
            "status": div.get("status"),
            "url": div.get("url"),
            "summary": div.get("summary"),
            "candidates": div.get("candidates")
        }
        divisions.append(div_record)

# Convert to DataFrame
df_divisions = pd.DataFrame(divisions)

# View the first few rows
df_divisions.head(2)

In [None]:
# === Basic Overview ===
print("📊 Shape of df_divisions:", df_divisions.shape)
print("\n🧱 Columns:\n", df_divisions.columns.tolist())
print("\n🧾 Sample Rows:")
display(df_divisions.head(10))

# === Nulls and Completeness ===
print("\n🔍 Missing Values Summary:")
missing_summary = df_divisions.isnull().sum().sort_values(ascending=False)
display(missing_summary[missing_summary > 0])

# === Election Types and Councils ===
print("\n📊 Election types per council:")
display(df_divisions.groupby(["council_id", "election_type"]).size().unstack(fill_value=0))

# === Division Name Frequency ===
print("\n📍 Most Frequent Division Names:")
display(df_divisions["division_name"].value_counts().head(20))

# === Summary stats: seats, votes, etc. ===
print("\n📈 Basic Summary Stats:")
df_divisions["seats"] = df_divisions["summary"].apply(lambda s: s.get("seats") if isinstance(s, dict) else None)
df_divisions["total_votes"] = df_divisions["summary"].apply(lambda s: s.get("total_votes") if isinstance(s, dict) else None)
df_divisions["turnout"] = df_divisions["summary"].apply(lambda s: s.get("turnout") if isinstance(s, dict) else None)

display(df_divisions[["seats", "total_votes"]].describe())

# === Turnout Parsing (if percentage strings like "45%") ===
df_divisions["turnout_pct"] = (
    df_divisions["turnout"]
    .astype(str)
    .str.replace("%", "")
    .str.extract(r"(\d+\.?\d*)")[0]
    .astype(float)
)

print("\n📉 Turnout Stats (where available):")
display(df_divisions["turnout_pct"].describe())

In [None]:
#This code below:
#* 	Looks for duplicate division names on the same date,
#* 	Flags them,
#* 	Outputs a diagnostic,
#* 	Removes them safely.

# Step 1: Sort with kent_cc on top, so we keep it if it exists
df_divisions_sorted = df_divisions.sort_values(
    by=["election_date", "division_name", "council_id"], 
    ascending=[True, True, False]  # False puts 'kent_cc' first if it exists
)

# Step 2: Drop duplicates keeping first (which will be kent_cc if present)
df_deduped = df_divisions_sorted.drop_duplicates(
    subset=["election_date", "division_name"], keep="first"
).reset_index(drop=True)

# Step 3: Diagnostics
removed = len(df_divisions) - len(df_deduped)
print(f"✅ Removed {removed} duplicate division records (kept 1 per date+division).")

# Optionally overwrite
df_divisions = df_deduped
df_divisions

In [None]:
df_divisions.drop(columns=["source_url"], inplace=True)

In [None]:
import matplotlib.pyplot as plt

# Extract total_votes into a flat list
df_divisions["total_votes"] = df_divisions["summary"].apply(lambda x: x.get("total_votes") if isinstance(x, dict) else None)
df_divisions = df_divisions.dropna(subset=["total_votes"])

# Plot histogram
plt.figure(figsize=(6.0, 4.0))
plt.style.use("ggplot")
plt.hist(df_divisions["total_votes"], bins=range(0, int(df_divisions["total_votes"].max()) + 100, 100), edgecolor="black")
plt.title("Distribution of Total Votes per Division")
plt.xlabel("Total Votes (binned by 100)")
plt.ylabel("Number of Divisions")
plt.tight_layout()
plt.show()

In [None]:
# Ensure total_votes is extracted
df_divisions["total_votes"] = df_divisions["summary"].apply(
    lambda x: x.get("total_votes") if isinstance(x, dict) else None
)

# Filter for high total_votes
high_turnout = df_divisions[df_divisions["total_votes"] > 15000]

# Display result
high_turnout[[
    "election_id", "council_id", "election_date", "division_name", "total_votes", 'seats', "status", "url"
]]

In [None]:
# Group by division_id and then by division_name
grouped = df_divisions.groupby(["division_id", "division_name"]).agg({
    "election_id": "count",
    "election_date": "min",
    "total_votes": "mean"
}).rename(columns={
    "election_id": "num_elections",
    "election_date": "first_election_date",
    "total_votes": "avg_votes"
}).reset_index()

# Sort by number of elections descending
grouped_sorted = grouped.sort_values(by="num_elections", ascending=False)

# Show top results
grouped_sorted.head(10)

In [None]:
grouped = (
    df_divisions
    .groupby(["council_id", "division_name"], dropna=False)
    .agg(
        num_elections=("election_id", "count"),
        first_election_date=("election_date", "min"),
        last_election_date=("election_date", "max"),
        avg_votes=("summary", lambda s: pd.Series([x.get("total_votes") for x in s if x]).mean()),
        example_url=("url", "first"),
    )
    .reset_index()
    .sort_values(["council_id", "division_name"])
)
grouped.tail(20)

#### Assigning proper division IDs

In [None]:
import re

def slugify(name):
    # Lowercase, replace & with "and", remove punctuation, replace spaces with underscores
    slug = name.lower()
    slug = re.sub(r"[&]", "and", slug)
    slug = re.sub(r"[^\w\s-]", "", slug)  # remove punctuation
    slug = re.sub(r"[\s]+", "_", slug)    # replace spaces with underscores
    return slug.strip("_")

# Apply to df_divisions
df_divisions["division_uid"] = df_divisions.apply(
    lambda row: f"{row['council_id']}__{slugify(row['division_name'])}",
    axis=1
)

#### Assigning division code

In [None]:
# Sort for stable ordering
df_divisions = df_divisions.sort_values(by=["council_id", "division_name"]).reset_index(drop=True)

# Create numeric codes grouped by council_id
df_divisions["division_code"] = (
    df_divisions.groupby("council_id").cumcount() + 1
).astype(str).str.zfill(3)  # e.g. 001, 002, ...

# Add council prefix to make it unique
df_divisions["division_code"] = df_divisions["council_id"].str.extract(r"^([a-z]+)")[0] + df_divisions["division_code"]

### Examining the candidate level of details

In [None]:
# === STEP: Extract candidates from each division ===
candidate_records = []

for _, row in df_divisions.iterrows():
    for cand in row.get("candidates", []):
        candidate_records.append({
            "election_id": row["election_id"],
            "council_id": row["council_id"],
            "election_type": row["election_type"],
            "election_date": row["election_date"],
            "division_name": row["division_name"],
            "division_uid": row["division_uid"],
            "division_code": row["division_code"],
            "division_url": row["url"],
            "candidate_name": cand.get("name"),
            "party": cand.get("party"),
            "votes": pd.to_numeric(cand.get("votes"), errors="coerce"),
            "percentage": pd.to_numeric(str(cand.get("percentage")).replace("%", ""), errors="coerce"),
            "outcome": cand.get("outcome")
        })

# === STEP: Convert to DataFrame ===
df_candidates = pd.DataFrame(candidate_records)

# === STEP: Preview ===
print(f"✅ Loaded {len(df_candidates)} candidate records")
df_candidates.head(20)

In [None]:
df_candidates["party"].value_counts()

In [None]:
# Map canonical name → list of keywords to search for
party_keywords = {
    "Labour": ["labour"],
    "Conservatives": ["conservative", "tory"],
    "Liberal Democrats": ["lib dem", "liberal democrat", "libdem"],
    "Greens": ["green"],
    "Reform UK": ["reform"],
    "UKIP": ["ukip", "independence", "independance"],
    "Women's Equality Party": ["women"],
    "English Democrats": ["english democrat"],
    "Tunbridge Wells Alliance": ["tunbridge wells", "t.w. alliance"],
    "Ashford Independents": ["ashford independents"],
    "Liberal": ["liberal"],
    "Monster Raving Loony Party": ["monster raving", "mrp", "monster raving loony party"],
    "National Front": ["national front"], 
    "People First": ["people first"]

}

In [None]:
# Aliases to catch less predictable patterns
fallback_aliases = {
    "Labour": ["labour and co-operative", "co-operative and labour"],
    "Liberal Democrats": ["the liberal democrats"],
    "Independent": ["independent alliance", "local independent", "independent group", "Other"],
    "Unknown": ["", "(no description)", "no description", "No description given", "No party specified",None],
}

# Build lookup
fallback_map = {
    alias.lower(): canonical
    for canonical, aliases in fallback_aliases.items()
    for alias in aliases
    if isinstance(alias, str)
}

In [None]:
def classify_party(raw_party):
    if not isinstance(raw_party, str):
        return "Unknown"
    
    raw_lower = raw_party.lower()

    # Keyword search (robust first pass)
    for canonical, keywords in party_keywords.items():
        if any(keyword in raw_lower for keyword in keywords):
            return canonical

    # Fallback to exact alias matches
    if raw_lower in fallback_map:
        return fallback_map[raw_lower]

    return raw_party.strip()  # return original if unclassified

In [None]:
df_candidates["party_canonical"] = df_candidates["party"].apply(classify_party)

In [None]:
collapse_minor = {
    "The Peace Party Non-Violence Jusitce, Environment": "The Peace Party",
    "The Peace Party Non-Violence Justice, Environment": "The Peace Party",
    "Trade Unionists and Socialists Against Cuts": "TUSC",
    "Trade Unionist and Socialist Coalition": "TUSC",
    "People First Party": "People First",
    "Party for a United Thanet": "Thanet First",
    "New England Party Caring for England": "New England Party"
}

df_candidates["party_canonical"] = df_candidates["party_canonical"].replace(collapse_minor)

In [None]:
df_candidates["party_canonical"].value_counts().head(30)

### Create party IDs

In [None]:
import pandas as pd
import re

# === STEP 1: Clean and normalize names ===
def clean_party_name(name):
    if not isinstance(name, str):
        return "unknown"
    name = name.lower().strip()
    name = re.sub(r"\bparty\b$", "", name).strip()  # Remove trailing "party"
    return name

def slugify(name):
    name = clean_party_name(name)
    name = re.sub(r"[^\w\s-]", "", name)       # Remove punctuation
    name = re.sub(r"\s+", "_", name)           # Replace whitespace with underscores
    return name.strip("_")

# STEP 2: Count occurrences per canonical party
party_counts = df_candidates["party_canonical"].value_counts()

# STEP 3: Reassign rare ones to 'Other'
df_candidates["party_canonical"] = df_candidates["party_canonical"].apply(
    lambda name: "Other" if party_counts.get(name, 0) < 2 else name
)

# STEP 4: Generate party_id
df_candidates["party_id"] = df_candidates["party_canonical"].apply(
    lambda x: f"party_{slugify(x)}"
)

In [None]:
df_candidates["party_id"].value_counts()

to view elections with independent candidates - to verify no mistake

In [None]:
from IPython.display import display, HTML

# ✅ Safe re-merge of election_url only
df_candidates = df_candidates.merge(
    df[["election_id", "election_url"]],
    on="election_id",
    how="left"
)

# 🎯 Filter for Independent candidates
indep = df_candidates[df_candidates["party_canonical"] == "Independent"]

# 📄 Create summary table
table = (
    indep[["election_id", "council_id", "election_url"]]
    .drop_duplicates()
    .sort_values(["council_id", "election_id"])
    .copy()
)

# 🔗 Format clickable links
table["link"] = table["election_url"].apply(
    lambda url: f'<a href="{url}" target="_blank">view</a>' if pd.notna(url) else ""
)

# 📊 Display the table as HTML (clickable)
#display(HTML(table[["election_id", "council_id", "link"]].to_html(escape=False, index=False)))

### Cleaning candidates names

### Titles stripped, first and last name separated

Strip titles and normalize names.

Extract and store first_name, middle_names, last_name, and canonical_name.

Identify potentially mergeable variants (e.g., “Steve Campkin” vs “Steven R Campkin”) — but only where first names differ but share the same initial.

applying all the transformations step-by-step to df_candidates, starting from candidate_name. This handles:
	1.	Stripping academic titles (MBE, OBE, etc)
	2.	Fixing commas and spacing
	3.	Fixing inverted names when the first token is ALL CAPS
	4.	Normalizing casing
	5.	Splitting into first_name, middle_names, last_name
	6.	Building canonical_name

In [None]:
import re

# STEP 0 — Sanitize obvious syntax issues
df_candidates["candidate_name"] = df_candidates["candidate_name"].str.replace(",,", ",", regex=False)

# STEP 1 — Start from raw names
df_candidates["cleaned_candidate_name"] = df_candidates["candidate_name"].fillna("").copy()

df_candidates["candidate_name"] = df_candidates["candidate_name"].str.replace(r"^(Mr|Mrs|Ms|Miss|Dr|Cllr|Sir|Dame)\s+", "", flags=re.IGNORECASE, regex=True)

# STEP 2 — Strip honorifics and degrees (MBE, PhD, etc)
title_pattern = r",?\s+(MBE|OBE|CBE|KBE|DBE|CH|QC|KC|JP|DL|FRSA|FRICS|BA|MA|PhD|BSc|MSc|LLB|LLM|TD)\b"
df_candidates["cleaned_candidate_name"] = (
    df_candidates["cleaned_candidate_name"]
    .str.replace(title_pattern, "", regex=True)
    .str.strip()
)

# STEP 3 — Ensure comma is followed by a space (for proper splitting)
df_candidates["cleaned_candidate_name"] = df_candidates["cleaned_candidate_name"].str.replace(r",(?=\S)", ", ", regex=True)

# STEP 4 — Invert names if comma present (e.g. "SMITH, John" → "John Smith")
inverted_mask = df_candidates["cleaned_candidate_name"].str.contains(",", na=False)
df_candidates.loc[inverted_mask, "cleaned_candidate_name"] = (
    df_candidates.loc[inverted_mask, "cleaned_candidate_name"]
    .str.split(",", n=1)
    .apply(lambda parts: f"{parts[1].strip()} {parts[0].strip()}" if len(parts) == 2 else parts[0])
)
# STEP 4b — Invert names where the first token is ALL CAPS and others are not
def conditional_invert(name):
    tokens = name.strip().split()
    if len(tokens) >= 2 and tokens[0].isupper() and not all(tok.isupper() for tok in tokens[1:]):
        return " ".join(tokens[1:] + [tokens[0].title()])
    return name

df_candidates["cleaned_candidate_name"] = df_candidates["cleaned_candidate_name"].apply(conditional_invert)

# STEP 5 — Convert all to title case
df_candidates["cleaned_candidate_name"] = df_candidates["cleaned_candidate_name"].str.title().str.strip()

# STEP 5b — Fix McX and MacX capitalisation (e.g., Mcdonald → McDonald, Macpherson → MacPherson)
df_candidates["cleaned_candidate_name"] = df_candidates["cleaned_candidate_name"].str.replace(
    r"\b(Mc|Mac)([a-z])",
    lambda m: m.group(1) + m.group(2).upper(),
    regex=True
)

# STEP 6 — Split into first / middle / last
split_parts = df_candidates["cleaned_candidate_name"].str.split()
df_candidates["first_name"] = split_parts.apply(lambda x: x[0] if len(x) > 0 else "")
df_candidates["last_name"] = split_parts.apply(lambda x: x[-1] if len(x) > 1 else "")
df_candidates["middle_names"] = split_parts.apply(lambda x: " ".join(x[1:-1]) if len(x) > 2 else "")

# STEP 7 — Construct canonical name: First + Last
df_candidates["canonical_name"] = (
    df_candidates["first_name"].str.strip() + " " + df_candidates["last_name"].str.strip()
).str.strip()

# STEP 8 — Final tidy-up: remove stray trailing commas
df_candidates["canonical_name"] = df_candidates["canonical_name"].str.rstrip(",").str.strip()
df_candidates["last_name"] = df_candidates["last_name"].str.rstrip(",").str.strip()

In [None]:
# Find rows where the first_name ends with a comma
mask = df_candidates["first_name"].str.endswith(",", na=False)

# Display problematic rows for inspection
df_candidates.loc[mask, ["candidate_name", "cleaned_candidate_name", "first_name", "middle_names", "last_name", "canonical_name"]]

In [None]:
mask = df_candidates["candidate_name"] == "R. John Pritchard"

# Override cleaned fields
df_candidates.loc[mask, "cleaned_candidate_name"] = "John Pritchard"
df_candidates.loc[mask, "first_name"] = "John"
df_candidates.loc[mask, "middle_names"] = "R"
df_candidates.loc[mask, "last_name"] = "Pritchard"
df_candidates.loc[mask, "canonical_name"] = "John Pritchard"

In [None]:
# STEP 8 — Final diagnostic view
df_candidates[["candidate_name", "cleaned_candidate_name", "first_name", "middle_names", "last_name", "canonical_name"]].sample(20)

In [None]:
df_candidates[df_candidates["first_name"].str.isupper() & df_candidates["first_name"].str.len() > 1][
    ["candidate_name", "cleaned_candidate_name", "first_name", "last_name", "canonical_name"]
]

In [None]:
df_candidates[df_candidates["last_name"].str.islower()][
    ["candidate_name", "cleaned_candidate_name", "first_name", "last_name", "canonical_name"]
]

In [None]:
df_candidates[df_candidates["canonical_name"].str.split().str.len() > 2][
    ["candidate_name", "canonical_name"]
]

In [None]:
df_candidates['canonical_name'].value_counts().head(5)

#### Assign councillor status

In [None]:
df_candidates["is_councillor"] = df_candidates["outcome"].str.lower() == "elected"

In [None]:
df_candidates.info()

### Canonical Name Mapping and Normalisation

This code below refines candidate name consistency across election records:

- **Exclusions**: A predefined list of merge-excluded indices (`excluded_merge_indices`) identifies ambiguous last names that should not be merged.
- **Group and Merge**: For each last name in `last_name_map`, candidate full names are grouped by first name. If all first names share the same initial, the longest variant is chosen as the canonical name.
- **Exception Handling**: If a last name is in the exclusion list, all its variants are kept separate.
- **Assignment**: The `canonical_name` field is assigned to each candidate in the dataset based on the mapping.
- **Diagnostics**: A list (`diffs`) shows where original names differ from canonical ones for inspection.

This helps unify inconsistent name formats (e.g. "J. Smith", "John Smith", "Jonathan Smith") while allowing manual override of ambiguous cases.


In [None]:
# === Rebuild Ambiguity Map ===
from collections import defaultdict

df_candidates["stripped_name"] = df_candidates["candidate_name"].fillna("").apply(strip_titles)

last_name_map = defaultdict(set)
for _, row in df_candidates.iterrows():
    first, middle, last = row["first_name"], row["middle_names"], row["last_name"]
    full = row["stripped_name"]
    if first and last:
        last_name_map[last].add((first, middle, full))

# === Detect ambiguous last names ===
ambiguous_last_names = {}

for last, entries in last_name_map.items():
    first_names = {first for first, _, _ in entries if first}
    first_initials = {first[0] for first in first_names}
    if len(first_names) > 1 and len(first_initials) == 1:
        ambiguous_last_names[last] = sorted(entries)

# === Convert to DataFrame for inspection ===
ambiguous_df = pd.DataFrame([
    {
        "last_name": last,
        "first_names": ", ".join(sorted({first for first, _, _ in entries})),
        "examples": "; ".join(sorted(full for _, _, full in entries))
    }
    for last, entries in ambiguous_last_names.items()
])

# === View result ===
ambiguous_df.tail(50)

In [None]:
# Show candidates where the name or last name contains "Winterbottom"
mask = (
    df_candidates["candidate_name"].str.contains("sue", case=False, na=False) |
    df_candidates["last_name"].str.contains("sue", case=False, na=False)
)

# Display relevant columns for review
df_candidates.loc[mask, ["candidate_name", "cleaned_candidate_name", "first_name", "middle_names", "last_name", "canonical_name"]]

In [None]:
mask = df_candidates["candidate_name"] == "Bell Sue"

# Override cleaned fields
df_candidates.loc[mask, "cleaned_candidate_name"] = "Sue Bell"
df_candidates.loc[mask, "first_name"] = "Sue"
df_candidates.loc[mask, "middle_names"] = ""
df_candidates.loc[mask, "last_name"] = "Bell"
df_candidates.loc[mask, "canonical_name"] = "Sue Bell"

In [None]:
mask = df_candidates["candidate_name"] == "Butterfield Sue"

# Override cleaned fields
df_candidates.loc[mask, "cleaned_candidate_name"] = "Sue Butterfill"
df_candidates.loc[mask, "first_name"] = "Sue"
df_candidates.loc[mask, "middle_names"] = ""
df_candidates.loc[mask, "last_name"] = "Butterfill"
df_candidates.loc[mask, "canonical_name"] = "Sue Butterfill"

In [None]:
mask = df_candidates["candidate_name"] == "Sue Butterfield"

# Override cleaned fields
df_candidates.loc[mask, "cleaned_candidate_name"] = "Sue Butterfield"
df_candidates.loc[mask, "first_name"] = "Sue"
df_candidates.loc[mask, "middle_names"] = ""
df_candidates.loc[mask, "last_name"] = "Butterfield"
df_candidates.loc[mask, "canonical_name"] = "Sue Butterfield"

In [None]:
mask = df_candidates["candidate_name"] == "Eykelenboom Katherine Ann"

# Override cleaned fields
df_candidates.loc[mask, "cleaned_candidate_name"] = "Katherine Ann Eykelenboom"
df_candidates.loc[mask, "first_name"] = "Katherine"
df_candidates.loc[mask, "middle_names"] = "Ann"
df_candidates.loc[mask, "last_name"] = "Eykelenboom"
df_candidates.loc[mask, "canonical_name"] = "Katherine Eykelenboom"

In [None]:
mask = df_candidates["candidate_name"] == "Winterbottom Kim"

# Override cleaned fields
df_candidates.loc[mask, "cleaned_candidate_name"] = "Kim Winterbottom"
df_candidates.loc[mask, "first_name"] = "Kim"
df_candidates.loc[mask, "middle_names"] = ""
df_candidates.loc[mask, "last_name"] = "Winterbottom"
df_candidates.loc[mask, "canonical_name"] = "Kim Winterbottom"

✅ Guidelines to Exclude

You should exclude any row where:

    The first names differ in gender (e.g., Teresa and Tom)

    The names are clearly distinct (e.g., James and John)

    There is no strong reason to assume they're variants of the same person

In [None]:
excluded_merge_indices = [
    1,   # Grayling → Chris vs Christopher — likely same but borderline
    2,   # Pickering → Alan vs Audrey — different gender
#    5,   # Chattenton → Jeremy vs Jerry — possible, but Jerry not a standard Jeremy nickname
    6,   # Benford → Margaret vs Marian — different names
#    10,  # Barnett → Rosa vs Rosetta — possibly different people
    12,  # Ozog → Jan vs Julie — different names and likely different gender
    13,  # Dunmall → Chris vs Christine — gender mismatch
    14,  # Lawson → David vs Derek — different names
#    15,  # Bobby → Les vs Leslie — possibly same, but could be male/female versions
    17,  # Lynch → Karen vs Katie — not clear they’re the same
    19,  # Withstandley → Barbara vs Brian — definitely different
    20,  # Binks → Roger vs Rosalind — definitely different
    21,  # Matterface → Jennifer vs Jenny vs John — gender and identity ambiguity
#    31,  # Shread → Nick vs Nicolas — might be same but if both appear, best to be safe
    32,  # Moss → Benjamin vs Brian — different names
    34,  # Meade → Jackie vs Jordan — very different names
    35,  # Hodge → Sarah vs Susan — clearly distinct
    36,  # Beer → James, Jon, Julia — mixed gender and unrelated names
    40,  # Slaughter → Rachel vs Ryan — different gender
    42,  # Lack → Maggie vs Martin — different gender
#    43,  # Summersgill → Michael vs Mick vs Mike — possibly same, but if all exist maybe split
    47,  # London → James vs John — commonly confused but different

    73,  # Brindle → Adrian vs Anne — different gender
    78,  # Underwood → Patricia vs Peter — gender mismatch
    85,  # Nuttall → Sue vs Susan — same root, but may be different people
    88,  # Fowle → Sandra vs Simon — gender mismatch
    96,  # Sue → Bell vs Butterfield — last name Sue is actually the first name
    97,  # O'Toole → Lee vs Liam — likely different people

]


This code block identifies ambiguous last names where multiple first names share the same initial (e.g. *Steve* and *Steven* Campkin), then applies a canonical naming rule:

* It **merges** those names only if not manually excluded,
* And assigns the **longest full version** of each name as canonical.

It ensures consistency in tracking people across elections without incorrectly combining distinct individuals.


In [None]:
import re

# Get last names from excluded rows in the previously built ambiguous_df
excluded_last_names = rechecked_strict_df.loc[excluded_merge_indices, "last_name"].tolist()

# Add special override for partial merging within ambiguous last names
partial_merge_overrides = {
    "Matterface": {
        # group these first names into one canonical group
        "Jennifer": ["Jenny", "Jennifer"]
        # leave "John" out — it will be handled separately
    }
}

# Rebuild canonical name map with partial merges
final_canonical_name_map = {}

for last, entries in last_name_map.items():
    grouped_by_first = defaultdict(list)
    first_initials = set()

    for first, middle, full in entries:
        if first:
            grouped_by_first[first].append(full)
            first_initials.add(first[0])

    if last in partial_merge_overrides:
        # Merge only within specified subgroups
        overrides = partial_merge_overrides[last]
        handled = set()

        for canonical, first_variants in overrides.items():
            merged_variants = []
            for first in first_variants:
                merged_variants.extend(grouped_by_first.pop(first, []))
                handled.add(first)

            # Pick the longest as representative
            if merged_variants:
                canonical_variant = max(merged_variants, key=len)
                for v in merged_variants:
                    final_canonical_name_map[(canonical, last)] = canonical_variant

        # All other unmerged first names (like 'John') → treat as separate
        for first, variants in grouped_by_first.items():
            for v in variants:
                final_canonical_name_map[(first.strip(), last.strip())] = v

    elif last in excluded_last_names:
        # Fully excluded → treat each first name separately
        for first, variants in grouped_by_first.items():
            for v in variants:
                final_canonical_name_map[(first.strip(), last.strip())] = v

    elif len(first_initials) == 1:
        # Safe to merge all
        for first, variants in grouped_by_first.items():
            canonical_variant = max(variants, key=len)
            for v in variants:
                final_canonical_name_map[(first.strip(), last.strip())] = canonical_variant

            
# ✅ Apply final canonical name mapping to df_candidates

def apply_canonical(row):
    key = (row["first_name"].strip(), row["last_name"].strip())
    return final_canonical_name_map.get(key, row["candidate_name"])

# Create new column with cleaned names
df_candidates["canonical_name"] = df_candidates.apply(apply_canonical, axis=1)

# 🔍 Show examples of standardised names
diffs = df_candidates[df_candidates["candidate_name"] != df_candidates["canonical_name"]][
    ["candidate_name", "canonical_name"]
].drop_duplicates().sort_values(by="candidate_name")

print(f"✅ {len(diffs)} candidate names were standardised.\n")
display(diffs.head(30))


This is a last-mile cleaning patch. It:

    Fixes known typos (e.g., "Joesph" → "Joseph")

    Removes double spaces and trailing whitespace

    Updates canonical_name fields if they differ from the cleaned version

In [None]:
# custom cleaning script for diffs 
 
import re

def clean_canonical_name(name):
    # Fix common typos
    name = name.replace("Joesph", "Joseph")

    # Collapse multiple spaces
    name = re.sub(r"\s{2,}", " ", name)

    # Strip and standardise spacing
    return name.strip()

# Apply cleanup
cleaned_diffs = sorted({
    (orig, clean_canonical_name(canon))
    for orig, canon in diffs
    if orig != clean_canonical_name(canon)  # Only include cleaned diffs
})

# Optional: update canonical names directly in your data
for record in data:
    if record.get("status") != "ok":
        continue
    for cand in record["candidates"]:
        orig = cand.get("name", "")
        canon = cand.get("canonical_name", "")
        cleaned = clean_canonical_name(canon)
        if orig != cleaned:
            cand["canonical_name"] = cleaned

# Preview cleaned diffs
cleaned_diffs[:100]


### Code: Flag Shared Last Names Across Different People

In [None]:
from collections import defaultdict

# Build a map of last name → set of full canonical names
last_name_to_people = defaultdict(set)

for record in data:
    if record.get("status") != "ok":
        continue
    for cand in record["candidates"]:
        last = cand.get("last_name", "").strip()
        canon = cand.get("canonical_name", "").strip()
        if last and canon:
            last_name_to_people[last].add(canon)

# Filter to only those last names that have >1 distinct person
ambiguous_last_names = {
    last: sorted(list(people))
    for last, people in last_name_to_people.items()
    if len(people) > 1
}

# Display
for last, names in sorted(ambiguous_last_names.items()):
    print(f"⚠️  Shared last name: {last}")
    for name in names:
        print(f"   - {name}")
    print()


### Create aliases commonly used in council documents

In [None]:
nicknames = {
    # Male names
    "Christopher": "Chris",
    "Jonathan": "Jon",
    "Stephen": "Steve",
    "Steven": "Steve",
    "Michael": "Mike",
    "Richard": "Rick",
    "Robert": "Rob",
    "Joseph": "Joe",
    "Timothy": "Tim",
    "Nicholas": "Nick",
    "James": "Jim",
    "Francis": "Frank",
    "Jeremy": "Jerry",

    # Female names
    "Margaret": "Maggie",
    "Elizabeth": "Liz",
    "Patricia": "Pat",
    "Penelope": "Penny",
    "Pamela": "Pam",
    "Karen": "Kate",  # or Katie
    "Katherine": "Kate",
    "Rebecca": "Becky",
    "Deborah": "Debbie",
    "Susan": "Sue",
    "Jacqueline": "Jackie",
    "Victoria": "Vicky"
}


In [None]:
import re

def extract_initials(full_name, last_name):
    """Extract initials from full name, skipping the last name."""
    parts = full_name.replace(last_name, "").strip().split()
    return [p[0] for p in parts if p and p[0].isalpha()]

def generate_aliases(first_name, last_name, canonical_name):
    aliases = set()

    # Title-based forms (all genders)
    for title in ["Mr", "Ms", "Mrs", "Miss", "Cllr", "Councillor"]:
        aliases.add(f"{title} {last_name}")

    # Add full first name + last name
    if first_name:
        aliases.add(f"{first_name} {last_name}")
        aliases.add(f"{first_name[0]} {last_name}")

    # Extract canonical first name (may be more formal)
    canonical_first = canonical_name.split()[0]
    if canonical_first.lower() != first_name.lower():
        aliases.add(f"{canonical_first} {last_name}")

    # Add nickname if available
    nick = nicknames.get(canonical_first)
    if nick and nick.lower() != first_name.lower():
        aliases.add(f"{nick} {last_name}")

    # Add initials
    initials = extract_initials(canonical_name, last_name)
    if initials:
        joined = ' '.join(initials)
        for title in ["Mr", "Ms", "Mrs", "Miss"]:
            aliases.add(f"{title} {joined} {last_name}")
            aliases.add(f"{title}\n{joined} {last_name}")
        aliases.add(f"{joined} {last_name}")

    return sorted(aliases)


# === Apply to your data ===

for record in data:
    if record.get("status") != "ok":
        continue
    for cand in record["candidates"]:
        first = cand.get("first_name", "").strip()
        last = cand.get("last_name", "").strip()
        canon = cand.get("canonical_name", cand.get("name", "")).strip()
        cand["aliases"] = generate_aliases(first, last, canon)


What code below does

    Keeps the preferred version of each (date, candidate) pair

    Drops any duplicate where the division name starts with "County"

    Leaves you with one clean record per candidate per date



In [None]:
from collections import defaultdict

# Keyed by (election_date, canonical_name)
deduped_records = {}
skipped_records = []

for record in data:
    if record.get("status") != "ok":
        continue
    election_date = record.get("election_date")
    division = record.get("division", "").strip()
    url = record.get("url", "")

    for cand in record.get("candidates", []):
        name = cand.get("canonical_name", "").strip()
        key = (election_date, name)

        # Prefer division NOT starting with 'County'
        keep_current = not division.lower().startswith("county council")

        if key not in deduped_records:
            deduped_records[key] = {
                "record": record,
                "candidate": cand,
                "division": division
            }
        else:
            existing_div = deduped_records[key]["division"]
            if keep_current and existing_div.lower().startswith("county council"):
                deduped_records[key] = {
                    "record": record,
                    "candidate": cand,
                    "division": division
                }
            else:
                skipped_records.append((name, election_date, division))

# Build deduplicated version
deduplicated_data = []
for (date, name), info in deduped_records.items():
    record = info["record"].copy()
    record["candidates"] = [info["candidate"]]
    deduplicated_data.append(record)

# Replace your working data
print(f"✅ Deduplicated dataset: {len(data)} → {len(deduplicated_data)} records")
data = deduplicated_data

# Optional: View what was skipped
if skipped_records:
    print(f"🗑️ Skipped {len(skipped_records)} duplicate entries (less preferred divisions):")
    for name, date, div in skipped_records[:10]:
        print(f"- {name} on {date} in '{div}'")


### Creating unique persons dataset

In [None]:
import pandas as pd

# Ensure `canonical_name` and election date exist
assert "canonical_name" in df_candidates.columns
assert "election_date" in df_candidates.columns

df_candidates["is_councillor"] = df_candidates["outcome"].str.lower() == "elected"

In [None]:
# Sort to prioritise most recent elections
df_c = df_candidates.sort_values("election_date")

# Group by canonical name and keep the most recent record (or earliest if you prefer)
df_people = (
    df_c.groupby("canonical_name")
    .agg({
        "election_date": lambda x: x[df_c.loc[x.index, "is_councillor"]].min(),  # first elected date
        "party_canonical": "last",
        "council_id": "last",
    })
    .reset_index()
)

In [None]:
df_people = df_people.sort_values("canonical_name").reset_index(drop=True)
df_people["person_id"] = df_people.index.map(lambda i: f"person_{i+1:04d}")

In [None]:
df_people = df_people[
    ["person_id", "canonical_name", "election_date", "party_canonical", "council_id"]
].rename(columns={"election_date": "first_elected_date", "party_canonical": "party"})

df_people

In [None]:
print(f"📊 Records remaining in data: {len(data)}")


In [None]:
output_path = "../data/elections/kent_results_all_years_cleaned.jsonl"

with open(output_path, "w", encoding="utf-8") as f:
    for record in data:
        json.dump(record, f, ensure_ascii=False)
        f.write("\n")

output_path

### Examine one person- Candidate Participation Lookup

In [None]:
from IPython.display import display, HTML

def get_candidate_history(data, candidate_name):
    records = []

    for record in data:
        if record.get("status") != "ok":
            continue
        date = record.get("election_date")
        div = record.get("division", "")
        election_type = record.get("election_type", "unknown")
        url = record.get("url", "")
        for cand in record.get("candidates", []):
            if cand.get("canonical_name", "").strip() == candidate_name:
                records.append({
                    "Election Date": date,
                    "Division": div,
                    "Party": cand.get("party", ""),
                    "Outcome": cand.get("outcome", ""),
                    "Election Type": election_type,
                    "URL": f'<a href="{url}" target="_blank">Link</a>'
                })

    if not records:
        print(f"⚠️ No records found for: {candidate_name}")
        return pd.DataFrame()

    df = pd.DataFrame(sorted(records, key=lambda x: x["Election Date"]))
    return df.to_html(escape=False, index=False)


In [None]:
# View Trevor Shonk's election history as clickable table
html_table = get_candidate_history(data, "Trevor Leslie Shonk")
display(HTML(html_table))


### Crosstab of election results - by party

In [None]:
import json
from datetime import datetime

# === Load and patch the data ===
data = []
with open("../data/elections/kent_results_all_years_cleaned.jsonl", "r", encoding="utf-8") as f:
    for line in f:
        entry = json.loads(line)

        # Patch: If 'election_date' is missing, extract from 'division'
        if entry.get("status") == "ok" and "election_date" not in entry:
            division = entry.get("division", "")
            if "," in division:
                try:
                    possible_date = division.split(",")[-1].strip()
                    parsed_date = datetime.strptime(possible_date, "%d/%m/%Y").date()
                    entry["election_date"] = str(parsed_date)
                except ValueError:
                    pass  # Skip if date can't be parsed

        data.append(entry)

In [None]:
# Check which 'ok' entries are missing 'election_date'
missing_dates = [entry for entry in data if entry.get("status") == "ok" and "election_date" not in entry]
print(f"⚠️ Entries missing 'election_date': {len(missing_dates)}")


### Reform candidates 

the table produced below:

The final df is a timeline matrix of candidates whose latest party is "Reform UK", showing:

| Row → each Reform UK candidate's canonical_name
| Columns → regular election dates (election_date)
| Cell value → election outcome at that date
| Plus one column: Latest Division

In [None]:

import json
import pandas as pd
from collections import defaultdict

# Step 0: Filter regular elections only for timeline columns
election_dates = sorted({
    entry["election_date"]
    for entry in data
    if entry.get("status") == "ok" and entry.get("election_type") == "regular"
})

# Step 1: Track latest party for each candidate by canonical name
latest_party_by_candidate = {}
for record in data:
    if record.get("status") != "ok":
        continue
    election_date = record["election_date"]
    for cand in record["candidates"]:
        name = cand["canonical_name"].strip()
        if name not in latest_party_by_candidate or election_date > latest_party_by_candidate[name]["election_date"]:
            latest_party_by_candidate[name] = {
                "election_date": election_date,
                "party": cand["party"]
            }

# Step 2: Build outcome timeline for candidates whose latest party is "Reform UK"
party_filter = "Reform UK"
candidate_results = defaultdict(lambda: {date: "NP" for date in election_dates})

for record in data:
    if record.get("status") != "ok":
        continue
    election_date = record["election_date"]
    if election_date not in election_dates:
        continue  # skip byelections
    for cand in record["candidates"]:
        name = cand["canonical_name"].strip()
        if latest_party_by_candidate.get(name, {}).get("party") == party_filter:
            candidate_results[name][election_date] = cand["outcome"]

# Step 3: Convert to DataFrame
df = pd.DataFrame.from_dict(candidate_results, orient="index")
df.index.name = "Candidate"

# Step 4: Add each candidate's latest division
latest_division_by_candidate = {}
for record in data:
    if record.get("status") != "ok":
        continue
    election_date = record["election_date"]
    division = record["division"]
    for cand in record["candidates"]:
        name = cand["canonical_name"].strip()
        if name not in latest_division_by_candidate or election_date > latest_division_by_candidate[name]["election_date"]:
            latest_division_by_candidate[name] = {
                "election_date": election_date,
                "division": division
            }

df["Latest Division"] = df.index.map(lambda name: latest_division_by_candidate.get(name, {}).get("division", ""))

# Reorder columns
df = df[["Latest Division"] + [col for col in df.columns if col != "Latest Division"]]

# View the result
df.head(20)


### Councillors elected in 2025

The final councillor_df is a comprehensive performance and history profile for all councillors elected in the most recent regular election. It captures their past results, experience, party affiliations, and division in one clean table.

In [None]:
# Step 1: Get the most recent election date
latest_election_date = max(election_dates)

# Step 2: Find all candidates elected in the latest election
elected_councillors = set()
latest_division_by_candidate = {}
latest_party_by_candidate = {}

for record in data:
    if record.get("status") != "ok":
        continue
    if record["election_date"] != latest_election_date:
        continue

    division = record["division"]
    for cand in record["candidates"]:
        if cand["outcome"] == "Elected":
            name = cand["canonical_name"].strip()
            elected_councillors.add(name)
            latest_division_by_candidate[name] = division
            latest_party_by_candidate[name] = cand["party"]

# Build party history for each candidate
party_affiliations_by_candidate = defaultdict(set)

for record in data:
    if record.get("status") != "ok":
        continue
    for cand in record["candidates"]:
        name = cand["canonical_name"].strip()
        party = cand["party"].strip()
        if name and party:
            party_affiliations_by_candidate[name].add(party)


# Step 3: Build the outcome history for each elected councillor
councillor_results = defaultdict(lambda: {date: "NP" for date in election_dates})

for record in data:
    if record.get("status") != "ok":
        continue
    election_date = record["election_date"]
    for cand in record["candidates"]:
        name = cand["canonical_name"].strip()
        if name in elected_councillors:
            councillor_results[name][election_date] = cand["outcome"]

# Step 4: Create DataFrame
councillor_df = pd.DataFrame.from_dict(councillor_results, orient="index")
councillor_df.index.name = "Councillor"

# Add division and latest party columns
councillor_df["Division"] = councillor_df.index.map(lambda name: latest_division_by_candidate.get(name, ""))
councillor_df["Latest Party"] = councillor_df.index.map(lambda name: latest_party_by_candidate.get(name, ""))

# Reorder columns
cols = ["Division", "Latest Party"] + [col for col in election_dates]
councillor_df = councillor_df[cols]

# Filter the DataFrame to include only councillors who were elected in the latest election
elected_df = councillor_df[councillor_df[latest_election_date] == "Elected"]
# Step 1: Format past parties (already done above)
councillor_df["Past Parties"] = councillor_df.index.map(
    lambda name: ", ".join(sorted(party_affiliations_by_candidate.get(name, [])))
)

# Step 2: Calculate experience (number of times elected)
councillor_df["Experience at KCC (terms)"] = councillor_df[election_dates].apply(
    lambda row: sum(1 for value in row if value == "Elected"), axis=1
)

# Step 3: Reorder columns
ordered_cols = ["Division", "Latest Party"] + election_dates + ["Experience at KCC (terms)", "Past Parties"]
councillor_df = councillor_df[ordered_cols]

councillor_df.head(30)

In [None]:
# Save the final dataframe as a CSV in the data/elections directory
output_path = "../data/elections/kent_councillors_elected_2025.csv"
councillor_df.to_csv(output_path)
output_path

### KCC 2021

In [None]:
# Re-run after code reset

import json
import pandas as pd

# Set target election date
target_date = "2021-05-06"

# Extract elected candidates from that date
elected_2021 = []
for record in data:
    if record.get("status") != "ok":
        continue
    if record["election_date"] != target_date:
        continue
    division = record["division"]
    for cand in record["candidates"]:
        if cand["outcome"] == "Elected":
            elected_2021.append({
                "name": cand["name"].strip(),
                "division": division,
                "first_name": cand.get("first_name", "").strip(),
                "last_name": cand.get("last_name", "").strip(),
                "middle_names": cand.get("middle_names", "").strip(),
                "party": cand["party"].strip()
            })

# Create DataFrame
elected_2021_df = pd.DataFrame(elected_2021)


# Save the final dataframe as a CSV in the data/elections directory
output_path = "../data/elections/kent_councillors_elected_2021_short.csv"
elected_2021_df.to_csv(output_path)
output_path

elected_2021_df.tail(20)

In [None]:
import json
import pandas as pd

# Target election date
target_date = "2025-05-01"
elected_2021 = []

for record in data:
    if record.get("status") != "ok" or record.get("election_date") != target_date:
        continue
    division = record.get("division", "")
    for cand in record["candidates"]:
        if cand.get("outcome") == "Elected":
            elected_2021.append({
                "name": cand.get("name", "").strip(),
                "division": division,
                "first_name": cand.get("first_name", "").strip(),
                "last_name": cand.get("last_name", "").strip(),
                "middle_names": cand.get("middle_names", "").strip(),
                "party": cand.get("party", "").strip()
            })

# Convert to DataFrame
elected_2025_df = pd.DataFrame(elected_2021)

# Save the final dataframe as a CSV in the data/elections directory
output_path = "../data/elections/kent_councillors_elected_2025_short.csv"
elected_2025_df.to_csv(output_path)
output_path

elected_2025_df


### Meetings mock up

## 🧠 What This Script Does: "Who Is Who" Councillor Classifier

This script builds a "Who Is Who" registry by linking **meeting attendance records** (from council minutes) to **elected councillor data** across two electoral terms (2021 and 2025). It classifies each name mentioned in meeting minutes into one of three categories:

### 🔍 Key Steps:

1. **Load and Standardize Councillor Data**
   - Reads two CSVs of elected councillors from 2021 and 2025.
   - Standardizes names (e.g., lowercasing, ASCII stripping) to enable fuzzy matching.

2. **Extract Attendee Names from JSONL Minutes**
   - Opens a `.jsonl` file of meeting metadata.
   - Pulls out unique names listed under 'present', 'absent', or 'virtual'.

3. **Standardize Attendee Names**
   - Removes titles like `Mr`, `Cllr`, `Dr`, etc.
   - Splits names into initials and last names for pattern matching.

4. **Flexible Matching Logic**
   - Matches attendees to current or former councillors using:
     - Exact last name
     - Fuzzy regex on first initials
   - Categorizes results into:
     - `current` councillor
     - `former` councillor
     - `civil_servant` (if no match found)
     - `needs_review` (if ambiguous matches found)

5. **Export Final Dataset**
   - Outputs a clean CSV (`who_is_who.csv`) with:
     - Raw name
     - Match info (first, last, division, party)
     - Status tag (`current`, `former`, `civil_servant`, `needs_review`)

### ✅ Result:
This enables downstream systems to recognize **who's who** in meeting minutes, distinguishing between elected representatives and council staff or visitors — essential for analytics, attendance stats, or knowledge graphs.

This is just an experiment.

In [None]:
import pandas as pd
import json
from pathlib import Path
from typing import List, Tuple, Dict
import re

# Configuration
MINUTES_PATH = Path("/Users/lgfolder/github/council-assistant/data/document_metadata/metadata_test.jsonl")
#MINUTES_PATH = Path("/Users/lgfolder/github/council-assistant/data/metadata/meetings.jsonl")
COUNCILLORS_2025_CSV = Path("/Users/lgfolder/github/council-assistant/data/elections/kent_councillors_elected_2025_short.csv")
COUNCILLORS_2021_CSV = Path("/Users/lgfolder/github/council-assistant/data/elections/kent_councillors_elected_2021_short.csv")
OUTPUT_PATH = Path("/Users/lgfolder/github/council-assistant/data/who_is_who.csv")

def load_and_standardize_councillors(current_csv: Path, previous_csv: Path) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Load both current and previous councillor data with standardized names"""
    def process_councillors(df: pd.DataFrame) -> pd.DataFrame:
        # Handle different column name variations
        first_name_col = next((col for col in df.columns if 'first' in col.lower()), None)
        last_name_col = next((col for col in df.columns if 'last' in col.lower()), None)
        
        if not first_name_col or not last_name_col:
            raise ValueError("Could not find first_name and last_name columns in councillor data")
        
        df['standard_first'] = (
            df[first_name_col]
            .astype(str)
            .str.lower()
            .str.strip()
            .str.normalize('NFKD')
            .str.encode('ascii', errors='ignore')
            .str.decode('utf-8')
        )
        df['standard_last'] = (
            df[last_name_col]
            .astype(str)
            .str.lower()
            .str.strip()
            .str.normalize('NFKD')
            .str.encode('ascii', errors='ignore')
            .str.decode('utf-8')
        )
        
        # Handle division/ward/department naming
        division_col = next((col for col in df.columns if any(x in col.lower() for x in ['division', 'ward', 'district'])), None)
        if division_col:
            df['division'] = df[division_col]
        else:
            df['division'] = ''
            
        # Handle party/group naming
        party_col = next((col for col in df.columns if any(x in col.lower() for x in ['party', 'group'])), None)
        if party_col:
            df['party'] = df[party_col]
        else:
            df['party'] = ''
            
        return df
    
    try:
        current = process_councillors(pd.read_csv(current_csv))
        previous = process_councillors(pd.read_csv(previous_csv))
        return current, previous
    except Exception as e:
        print(f"Error loading councillor data: {e}")
        raise

def parse_minute_names(minutes_path: Path) -> List[str]:
    """Extract all unique names from meeting minutes"""
    attendees = set()
    
    with open(minutes_path, 'r') as f:
        for line in f:
            try:
                meeting = json.loads(line)
                for status in ['present', 'absent', 'virtual']:
                    for name in meeting['attendance'].get(status, []):
                        attendees.add(name.strip())
            except json.JSONDecodeError:
                continue
    
    return list(attendees)

def standardize_minutes_name(name: str) -> Tuple[str, str]:
    """Convert council minutes names to standardized format"""
    # Remove honorifics and trailing periods
    clean_name = re.sub(
        r'^(Mr|Mrs|Ms|Miss|Sir|Dr|Cllr)\.?\s+', 
        '', 
        name, 
        flags=re.IGNORECASE
    ).strip()
    
    # Handle cases with multiple initials
    parts = [p.strip('. ') for p in clean_name.split() if p.strip()]
    
    if not parts:
        return ('', '')
    
    last_name = parts[-1]
    first_parts = parts[:-1]
    
    if not first_parts:
        return ('', last_name.lower())
    
    # Create first initial pattern (e.g., "R W" becomes "r.?w.?")
    first_initials = ''.join([f"{p[0].lower()}.*" for p in first_parts if p])
    
    return (first_initials, last_name.lower())

def find_councillor_match(first: str, last: str, councillors: pd.DataFrame) -> pd.DataFrame:
    """Find matching councillors using flexible matching"""
    # Exact last name match
    matches = councillors[councillors['standard_last'] == last]
    
    if not first:
        return matches
    
    # Flexible first initial matching (e.g., "r.?w.?" matches "Roger William")
    try:
        pattern = re.compile(f'^{first}')
        return matches[
            matches['standard_first'].str.contains(pattern, na=False)
        ]
    except:
        return matches

def classify_attendees(
    attendees: List[str], 
    current_councillors: pd.DataFrame,
    previous_councillors: pd.DataFrame
) -> pd.DataFrame:
    """Classify each attendee into categories"""
    records = []
    
    for raw_name in attendees:
        first, last = standardize_minutes_name(raw_name)
        
        # Check current councillors first
        current_matches = find_councillor_match(first, last, current_councillors)
        previous_matches = find_councillor_match(first, last, previous_councillors)
        
        if len(current_matches) == 1:
            # Current councillor match
            record = create_record(raw_name, current_matches.iloc[0], 'current')
        elif len(previous_matches) == 1:
            # Former councillor match
            record = create_record(raw_name, previous_matches.iloc[0], 'former')
        elif len(current_matches) > 1 or len(previous_matches) > 1:
            # Ambiguous match
            record = create_ambiguous_record(raw_name, first, last, current_matches, previous_matches)
        else:
            # No match - likely civil servant
            record = create_civil_servant_record(raw_name, first, last)
        
        records.append(record)
    
    return pd.DataFrame(records)

def create_record(raw_name: str, councillor: pd.Series, status: str) -> Dict:
    """Create standardized record for matched councillor"""
    return {
        'raw_name': raw_name,
        'first_name': councillor.get('first_name', ''),
        'last_name': councillor.get('last_name', ''),
        'position': 'Councillor',
        'division': councillor.get('division', ''),
        'party': councillor.get('party', ''),
        'status': status,
        'source': 'current' if status == 'current' else 'previous'
    }

def create_ambiguous_record(
    raw_name: str, 
    first: str, 
    last: str,
    current_matches: pd.DataFrame,
    previous_matches: pd.DataFrame
) -> Dict:
    """Create record for ambiguous matches"""
    all_matches = pd.concat([current_matches, previous_matches])
    return {
        'raw_name': raw_name,
        'first_name': '',
        'last_name': last.title(),
        'position': 'AMBIGUOUS',
        'division': '|'.join(all_matches.get('division', '').unique()),
        'party': '|'.join(all_matches.get('party', '').unique()),
        'status': 'needs_review',
        'source': 'multiple'
    }

def create_civil_servant_record(raw_name: str, first: str, last: str) -> Dict:
    """Create record for civil servants"""
    formatted_first = ' '.join([f"{c.upper()}." for c in first.split('.') if c]) if first else ''
    return {
        'raw_name': raw_name,
        'first_name': formatted_first,
        'last_name': last.title(),
        'position': 'Civil Servant',
        'division': '',
        'party': '',
        'status': 'civil_servant',
        'source': ''
    }

def main():
    try:
        # Load and standardize councillor data
        current_councillors, previous_councillors = load_and_standardize_councillors(
            COUNCILLORS_2025_CSV, 
            COUNCILLORS_2021_CSV
        )
        
        # Parse meeting attendees
        attendees = parse_minute_names(MINUTES_PATH)
        
        # Classify attendees
        who_is_who = classify_attendees(attendees, current_councillors, previous_councillors)
        
        # Save results
        who_is_who.to_csv(OUTPUT_PATH, index=False)
        print(f"Processed {len(who_is_who)} names. Saved to {OUTPUT_PATH}")
        print("\nSample output:")
        print(who_is_who.head().to_string())
        
    except Exception as e:
        print(f"Error in main execution: {e}")
        raise

if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
from pathlib import Path

# Load the processed data
who_is_who = pd.read_csv("../data/who_is_who.csv")

# 1. Current Councillors Table
current_councillors = who_is_who[who_is_who['status'] == 'current'].copy()
current_councillors['full_name'] = current_councillors['first_name'] + ' ' + current_councillors['last_name']
councillors_table = current_councillors[['full_name', 'division', 'party', 'last_name']].sort_values('last_name')
councillors_table = councillors_table[['full_name', 'division', 'party']]  # Drop last_name after sorting

# 2. Civil Servants Directory
civil_servants = who_is_who[who_is_who['status'] == 'civil_servant'].copy()
civil_servants['formatted_name'] = civil_servants['first_name'] + ' ' + civil_servants['last_name']
civil_servants_table = civil_servants[['formatted_name', 'raw_name', 'last_name']].sort_values('last_name')
civil_servants_table = civil_servants_table[['formatted_name', 'raw_name']]  # Drop last_name after sorting

# 3. Meeting Participation Heatmap
# First we need to count meeting appearances (this would be better done during initial processing)
def count_meetings(minutes_path):
    meeting_counts = {}
    with open(minutes_path, 'r') as f:
        for line in f:
            try:
                meeting = json.loads(line)
                for status in ['present', 'absent', 'virtual']:
                    for name in meeting['attendance'].get(status, []):
                        meeting_counts[name.strip()] = meeting_counts.get(name.strip(), 0) + 1
            except json.JSONDecodeError:
                continue
    return meeting_counts

meeting_counts = count_meetings(MINUTES_PATH)
who_is_who['meetings_attended'] = who_is_who['raw_name'].map(meeting_counts).fillna(0)

participation_table = who_is_who[
    ['first_name', 'last_name', 'position', 'meetings_attended']
].sort_values('meetings_attended', ascending=False)

# 4. Department Affiliations
department_table = who_is_who.groupby(['division', 'position']).size().unstack(fill_value=0)
department_table['Total'] = department_table.sum(axis=1)

# 5. Ambiguity Resolution Table
ambiguity_table = who_is_who[who_is_who['status'] == 'needs_review'].copy()
ambiguity_table['possible_matches'] = ambiguity_table.apply(
    lambda x: f"{x['division']} ({x['party']})", axis=1
)
ambiguity_table = ambiguity_table[['raw_name', 'possible_matches']]

# Save all tables
tables_path = Path("/Users/lgfolder/github/council-assistant/data/who_is_who_tables/")
tables_path.mkdir(exist_ok=True)

councillors_table.to_csv(tables_path / "councillors.csv", index=False)
civil_servants_table.to_csv(tables_path / "civil_servants.csv", index=False)
participation_table.to_csv(tables_path / "participation.csv", index=False)
department_table.to_csv(tables_path / "divisions.csv")
ambiguity_table.to_csv(tables_path / "ambiguities.csv", index=False)

print("All tables generated successfully!")

### Generate elections json from the elections results

## 🗳️ What This Script Does: `elections.jsonl` Metadata Generator

This script creates a machine-readable reference file (`elections.jsonl`) that describes each **election year** found in your cleaned Kent County Council results dataset.

### 📦 Input
- Loads the cleaned election results from:  
  `../data/elections/kent_results_all_years_cleaned.json`  
  This file contains detailed candidate-level results for all elections and by-elections.

### ⚙️ What It Builds
It generates one metadata entry per **election year**, each including:

| Field | Description |
|-------|-------------|
| `election_id` | Unique ID like `kent_cc_2025` |
| `council_id` | Fixed as `"kent_cc"` |
| `election_date` | ISO date of the election (from first result that year) |
| `election_type` | `"local"` |
| `scope` | `"county-wide"` (assumes all are full council elections) |
| `description` | Human-readable description like `"Kent County Council local elections 2025"` |
| `results_path` | File path to the full dataset |
| `results_filter` | A filter dictionary, e.g. `{"election_year": 2025}` to slice the dataset |
| `source_url` | Link to the council's official elections page |

### 🧾 Output
- Writes the data to a `.jsonl` file at:  
  `../data/references/elections.jsonl`  
  This can be used for indexing, display in UI menus, filtering APIs, or building dashboards.

### ✅ Example Use Cases
- Generating dropdowns like **"View Results for 2025"**
- Linking knowledge graph events to specific election cycles
- Running time-series analyses across elections

This metadata file acts as a lightweight index of all the **election cycles** your project has data for.


In [None]:
import json
from pathlib import Path
import pandas as pd

# Load the cleaned results
input_path = Path("../data/elections/kent_results_all_years_cleaned.jsonl")
results_data = pd.read_json(input_path, lines=True)

In [None]:
import pandas as pd

# Basic shape
print(f"📦 Shape: {results_data.shape[0]:,} rows × {results_data.shape[1]} columns")

# Basic info
print("\n🔍 Data Types & Non-Null Counts:")
print(results_data.info())

# Check for missing values
print("\n🚫 Missing Values per Column:")
missing_counts = results_data.isna().sum()
print(missing_counts[missing_counts > 0])

# Value counts of election types
if "election_type" in results_data.columns:
    print("\n🗳️ Election Type Breakdown:")
    print(results_data["election_type"].value_counts(dropna=False))

# Year coverage
if "election_date" in results_data.columns:
    results_data["election_date"] = pd.to_datetime(results_data["election_date"], errors="coerce")
    results_data["election_year"] = results_data["election_date"].dt.year
    print("\n📆 Election Years Available:")
    print(results_data["election_year"].value_counts().sort_index())

# Division name checks
if "division" in results_data.columns:
    print("\n🏷️ Top 10 Most Frequent Division Names:")
    print(results_data["division"].value_counts().head(10))

# Status field analysis
if "status" in results_data.columns:
    print("\n✅ Status Value Counts:")
    print(results_data["status"].value_counts())

# Candidate-level checks
candidate_counts = results_data["candidates"].apply(lambda x: len(x) if isinstance(x, list) else 0)
print(f"\n👥 Candidate counts per record:\n- Mean: {candidate_counts.mean():.2f}, Max: {candidate_counts.max()}, Min: {candidate_counts.min()}")
print(f"- Records with 0 candidates: {(candidate_counts == 0).sum()}")


In [None]:
# Show rows with missing election_date
missing_election_date = results_data[results_data["election_date"].isna()]

print(f"⚠️ Found {missing_election_date.shape[0]} rows without an election date.")

# Display them
missing_election_date.head(20)  # You can change 20 to see more


In [None]:


# Ensure output directory exists
output_path = Path("../data/references/elections.jsonl")
output_path.parent.mkdir(parents=True, exist_ok=True)

# Add election year column from date
results_data["election_year"] = pd.to_datetime(results_data["election_date"]).dt.year

# Define base fields
council_id = "kent_cc"
source_url = "https://www.kent.gov.uk/about-the-council/how-the-council-works/elections"
results_path = str(input_path)

# Generate one entry per unique election year
elections = []

for year in sorted(results_data["election_year"].unique(), reverse=True):
    # Filter records for this year that have a valid election_date
    subset = results_data[
        (results_data["election_year"] == year) & 
        (results_data["election_date"].notnull())
    ]

    if subset.empty:
        print(f"⚠️ Skipping year {year} — no valid election_date found.")
        continue

    # Use the first valid election_date
    election_date = pd.to_datetime(subset["election_date"].iloc[0]).date().isoformat()

    election = {
        "election_id": f"{council_id}_{int(year)}",
        "council_id": council_id,
        "election_date": election_date,
        "election_type": "local",
        "scope": "county-wide",
        "description": f"Kent County Council local elections {year}",
        "results_path": str(input_path),
        "results_filter": {"election_year": int(year)},
        "source_url": source_url
    }

    elections.append(election)


### Populate people.json from existing civil servants json

The civil servant json was already available - generated by ChatGPT from a pdf I found on the council's website

In [None]:
import json
from pathlib import Path
import re

# === CONFIGURATION ===
INPUT_FILE = Path("../data/jsons/civil_servants_all.json")
OUTPUT_FILE = Path("../data/entities/people.jsonl")
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)

# === UTILITY FUNCTIONS ===
def slugify(name):
    return re.sub(r"[^a-z0-9]+", "_", name.lower()).strip("_")

def generate_person_id(slug, counter):
    return f"{slug}_{counter:03d}"

# === LOAD EXISTING PEOPLE ===
existing_people = {}
slug_counter = {}

if OUTPUT_FILE.exists():
    with open(OUTPUT_FILE) as f:
        for line in f:
            person = json.loads(line)
            slug = slugify(person["full_name"])
            existing_people[slug] = person
            # update counter
            id_suffix = person["person_id"].split("_")[-1]
            try:
                slug_counter[slug] = max(slug_counter.get(slug, 0), int(id_suffix))
            except ValueError:
                pass

# === LOAD CIVIL SERVANTS DATA ===
with open(INPUT_FILE) as f:
    civil_servants = json.load(f)

new_people = []
flagged_people = []

for entry in civil_servants:
    full_name = entry.get("name", "").strip()
    if not full_name:
        continue

    parts = full_name.split()
    first_name = parts[0] if parts else ""
    last_name = parts[-1] if len(parts) > 1 else ""
    slug = slugify(full_name)

    if slug in existing_people:
        flagged_people.append(full_name)
        continue  # Skip known person

    # Assign new person_id
    slug_counter[slug] = slug_counter.get(slug, 0) + 1
    person_id = generate_person_id(slug, slug_counter[slug])

    person = {
        "person_id": person_id,
        "full_name": full_name,
        "first_name": first_name,
        "last_name": last_name,
        "aliases": list({full_name, last_name}),
        "roles": ["civil_servant"],
        "civil_service_roles": [{
            "role": entry.get("role", ""),
            "department": entry.get("department", ""),
            "division": entry.get("Division", ""),
            "service_unit": entry.get("Service Unit", ""),
            "grade": entry.get("Grade", ""),
            "contract_title": entry.get("Contract Title", ""),
            "manager_name": entry.get("Manager Name", ""),
            "start_date": "",
            "end_date": ""
        }],
        "committees": entry.get("committees", []),
        "elections": [],
        "profiles": {
            "council_url": "",
            "linkedin": "",
            "twitter": ""
        }
    }

    new_people.append(person)
    existing_people[slug] = person

# === APPEND TO people.jsonl ===
mode = "a" if OUTPUT_FILE.exists() else "w"
with open(OUTPUT_FILE, mode) as f:
    for person in new_people:
        f.write(json.dumps(person) + "\n")

print(f"✅ Added {len(new_people)} new civil servants to: {OUTPUT_FILE}")
if flagged_people:
    print(f"⚠️  Skipped {len(flagged_people)} possible duplicates:")
    for name in flagged_people:
        print(" -", name)

### Append Candidates to people.jsonl

In [None]:
import json
from pathlib import Path
import re

# === CONFIGURATION ===
ELECTION_FILE = Path("../data/elections/kent_results_all_years_cleaned.jsonl")
PEOPLE_FILE = Path("../data/metadata/people.jsonl")
PEOPLE_FILE.parent.mkdir(parents=True, exist_ok=True)

# === UTILITY FUNCTIONS ===
def slugify(name):
    return re.sub(r"[^a-z0-9]+", "_", name.lower()).strip("_")

def generate_person_id(base_slug, counter):
    return f"{base_slug}_{counter:03d}"

# === LOAD EXISTING PEOPLE ===
existing_people = {}
slug_counter = {}

if PEOPLE_FILE.exists():
    with open(PEOPLE_FILE) as f:
        for line in f:
            person = json.loads(line)
            slug = slugify(person["full_name"])
            existing_people[slug] = person
            slug_counter[slug] = int(person["person_id"].split("_")[-1])

# === LOAD ELECTION RESULTS ===
with open(ELECTION_FILE, "r", encoding="utf-8") as f:
    election_data = [json.loads(line) for line in f if line.strip()]

# === PROCESS NEW CANDIDATES ===
for record in election_data:
    year = int(record.get("election_date", "")[:4])
    division = record.get("division", "")
    
    for cand in record.get("candidates", []):
        canonical_name = cand.get("canonical_name", cand.get("name", "")).strip()
        if not canonical_name:
            continue

        slug = slugify(canonical_name)
        first, *rest = canonical_name.split()
        last = rest[-1] if rest else first

        # Prepare the election record
        election_info = {
            "year": year,
            "division": division,
            "party": cand.get("party", ""),
            "status": cand.get("status", "")
        }

        if slug in existing_people:
            # Append election to existing person if not a duplicate
            existing = existing_people[slug]
            if election_info not in existing["elections"]:
                existing["elections"].append(election_info)
        else:
            # New person: assign new ID
            slug_counter[slug] = slug_counter.get(slug, 0) + 1
            person_id = generate_person_id(slug, slug_counter[slug])

            new_person = {
                "person_id": person_id,
                "full_name": canonical_name,
                "first_name": first,
                "last_name": last,
                "aliases": list({canonical_name, last}),
                "roles": ["candidate"],
                "civil_service_roles": [],
                "committees": [],
                "elections": [election_info],
                "profiles": {
                    "council_url": "",
                    "linkedin": "",
                    "twitter": ""
                }
            }

            existing_people[slug] = new_person

# === WRITE UPDATED PEOPLE FILE ===
with open(PEOPLE_FILE, "w") as f:
    for person in existing_people.values():
        f.write(json.dumps(person) + "\n")

print(f"✅ people.jsonl updated with {len(existing_people)} unique individuals.")