In [None]:
# imports
import os
import json
import glob
from pathlib import Path
import xml.etree.ElementTree as et
import regex
import requests
import pandas as pd
import numpy as np
from tqdm import tqdm
from bs4 import BeautifulSoup
from datetime import datetime
import time
import zipfile
import io

# helper functions and constants
from dataGeneration.extract_contributions import extract
from dataGeneration.clean_text import clean_name_headers
from dataGeneration.match_names import insert_politician_id_into_contributions_extended
import paths as PATHS

# **3 Politicians**

## **3.1 Add unique Faction IDs to MPs**

Assigns a normalized faction ID to each member of parliament (MP) based on their associated faction name. The normalized mapping was created in the previous step (2.2). The result is a consistent representation of faction membership for every politician across all electoral terms.

The script:
- Loads the list of MPs (mps.pkl) and normalized faction data (factionsAbbreviations.pkl)
- Adds a new column faction_id to the MP DataFrame
- Assigns the appropriate ID by matching the original institution_name with the normalized faction_name
- Outputs the enriched MP data with faction references


### **Input:**
```
dataStage02/
├── dataPoliticiansStage02/
│   └── mps.pkl
dataStage03/
├── dataFactionsStage03/
│   └── factionsAbbreviations.pkl
```

### **Output:**
```
dataStage03/
├── dataPoliticiansStage03/
│   ├── mpsFactions.pkl
dataExcel/
└── mpsFactions_stage03.xlsx
```


**Columns (mpsFactions.pkl):**
| Column name       | Description                                                      |
|-------------------|------------------------------------------------------------------|
| `ui`              | Unique ID for the politician                                     |
| `electoral_term`  | Electoral term number                                            |
| `faction_id`      | Integer ID of matched faction (from `factionsAbbreviations.pkl`) |
| `first_name`      | First name(s) of the MP                                          |
| `last_name`       | Last name of the MP                                              |
| `birth_place`     | Place of birth                                                   |
| `birth_country`   | Country of birth                                                 |
| `birth_date`      | Date of birth (as string)                                        |
| `death_date`      | Date of death (or -1 if unknown)                                 |
| `gender`          | Gender                                                           |
| `profession`      | Profession                                                       |
| `constituency`    | Additional location info                                         |
| `aristocracy`     | Nobility title (if any)                                          |
| `academic_title`  | Academic title (e.g. Dr., Prof.)                                 |
| `institution_type`| Type of institution (e.g. "Fraktion/Gruppe")                     |
| `institution_name`| Name of the institution (used for faction matching)              |

In [None]:
# Resolve all input and output locations through the PATHS module
FACTIONS_INPUT = PATHS.FACTIONS_ABBR_STAGE03
POLITICIANS_INPUT = PATHS.POLITICIANS_STAGE02
POLITICIANS_OUTPUT_DIR = PATHS.MPS_FACTIONS_STAGE03.parent
POLITICIANS_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Load the normalized faction table and the MP records
factions = pd.read_pickle(FACTIONS_INPUT)
mps = pd.read_pickle(POLITICIANS_INPUT)

# New third column; -1 marks MPs whose institution matches no known faction
mps.insert(2, "faction_id", -1)

# Overwrite the placeholder wherever the institution name equals a faction name
for name, fid in zip(factions["faction_name"], factions["id"]):
    belongs_to_faction = mps["institution_name"] == name
    mps.loc[belongs_to_faction, "faction_id"] = fid

# Write the enriched table as pickle and as Excel sheet
mps.to_pickle(PATHS.MPS_FACTIONS_STAGE03)
mps.to_excel(PATHS.EXCEL_MPS_STAGE03, index=False)

print("Fraktionen zu den Parlamentsmitgliedern hinzugefügt.")

## **3.2 Scrape Government Membership Data from Wikipedia**


- Extracts structured data on German government members for all cabinets since 1949 from Wikipedia: https://de.wikipedia.org/wiki/Liste_der_deutschen_Regierungsmitglieder_seit_1949.
- This dataset complements the MP data by adding government-specific roles such as Ministers and Secretaries of State.


### **Input:**
```
None.
```

### **Output:**
```
rawData/
├── politiciansRawData/
│   └── mgs.pkl
dataExcel/
└── mgs_wiki_rawData.xlsx
```


**Columns (mgs.pkl):**
| Column name        | Description                                                   |
|--------------------|---------------------------------------------------------------|
| `ui`               | Unique ID assigned by the script (e.g. "gov_123")             |
| `last_name`        | Last name of the politician                                   |
| `first_name`       | First name(s) (stored as list)                                |
| `position`         | Government position held (e.g., "Bundeskanzler")              |
| `position_from`    | Year the office began                                         |
| `position_until`   | Year the office ended (or -1 if open-ended)                   |
| `birth_date`       | Year of birth                                                 |
| `death_date`       | Year of death or -1 if still alive                            |
| `faction`          | Main political affiliation (e.g., "CDU", "SPD", "parteilos")  |
| `additional_faction` | Additional party affiliation (e.g., coalition)              |

In [None]:
# Output directory
output_dir = PATHS.RAW_POLITICIANS
output_dir.mkdir(parents=True, exist_ok=True)

URL = "https://de.wikipedia.org/wiki/Liste_der_deutschen_Regierungsmitglieder_seit_1949"

# Fail fast on a hanging connection or an HTTP error page instead of trying
# to parse whatever came back.
page = requests.get(URL, timeout=30)
page.raise_for_status()
soup = BeautifulSoup(page.text, "html.parser")
main_section = soup.find("div", {"id": "mw-content-text"}).find("div")

# Column-wise accumulator; one entry per (politician, government position).
mgs = {
    "ui": [],
    "last_name": [],
    "first_name": [],
    "position": [],
    "position_from": [],
    "position_until": [],
    "birth_date": [],
    "death_date": [],
    "faction": [],
    "additional_faction": [],
}


def _parse_position_entry(entry_text, politician_name):
    """Split one position bullet into ``(position, from_year, until_year)`` tuples.

    Recognised layouts (selected by the number of four-digit years found):
      * two years   "1974–1980 Verkehr und Post- und Fernmeldewesen"
      * one year    "seit 2018 Arbeit und Soziales" (open-ended, until = -1)
                    "1969 Justiz" (from = until = that year)
      * four years  "1969–1982, 1982–1983 Keks Beauftragter" (two tuples)

    ``politician_name`` is only printed as a debugging aid before the
    ``ValueError`` raised for a two-year entry without a position text.
    Any other number of years raises ``ValueError`` as well.
    """
    years = [int(year) for year in regex.findall(r"(\d{4})", entry_text)]

    if len(years) == 2:
        parts = entry_text.split(" ", 1)
        if len(parts) != 2:
            print(politician_name)
            raise ValueError("What is the position?")
        return [(parts[1], years[0], years[1])]

    if len(years) == 1:
        if "seit" in entry_text:
            # Drop the leading "seit" token and the year token.
            position = entry_text.split(" ", 1)[1].split(" ", 1)[1]
            return [(position, years[0], -1)]
        # Office held within a single year: it starts and ends in the year
        # that was parsed (this used to be hard-coded to 2018).
        position = entry_text.split(" ", 1)[1]
        return [(position, years[0], years[0])]

    if len(years) == 4:
        # Drop both leading period tokens; the same position is recorded once
        # per period.
        position = entry_text.split(" ", 1)[1].split(" ", 1)[1]
        return [
            (position, years[0], years[1]),
            (position, years[2], years[3]),
        ]

    raise ValueError("Still something wrong")


def _append_position_record(records, person, position, position_from, position_until):
    """Append one row to the column-wise ``records`` accumulator."""
    for column, value in person.items():
        records[column].append(value)
    records["position"].append(position)
    records["position_from"].append(position_from)
    records["position_until"].append(position_until)


ui = 0

for div in main_section.find_all("div", recursive=False):
    for ul in div.find_all("ul", recursive=False):
        for li in ul.find_all("li", recursive=False):
            find_all_a = li.find_all("a", recursive=False)
            name = find_all_a[0].text

            # A link to a list or cabinet page ends the processing of this <ul>.
            if "Liste" in name or "Kabinett" in name:
                break

            # These lines exclude Kristina Schröder: because of her name
            # change due to marriage her HTML entry is structured differently
            # and "CDU" ends up being matched as the name.
            # TODO: add her as an extra entry at the end; not that important
            # as she is a member of StammdatenXML.
            if "CDU" in name:
                continue

            name = name.split(" ")
            first_name = name[:-1]
            last_name = name[-1]

            # The links after the name carry the party affiliation(s).
            if len(find_all_a) > 2:
                faction = find_all_a[1].text
                additional_faction = find_all_a[2].text
            elif len(find_all_a) == 2:
                faction = find_all_a[1].text
                additional_faction = ""
            else:
                faction = "parteilos"
                additional_faction = ""

            # The text right after the name link holds the birth/death years.
            birth_death = li.a.next_sibling.strip()

            match_years = regex.findall(r"(\d{4})", birth_death)
            if len(match_years) == 1:
                birth_date = int(match_years[0])
                death_date = -1
            elif len(match_years) == 2:
                birth_date = int(match_years[0])
                death_date = int(match_years[1])
            else:
                birth_date = -1
                death_date = -1

            person = {
                "ui": "gov_" + str(ui),
                "first_name": first_name,
                "last_name": last_name,
                "birth_date": birth_date,
                "death_date": death_date,
                "faction": faction,
                "additional_faction": additional_faction,
            }

            # Iterate over the government positions of the current politician.
            for pos in li.find_all("li"):
                for position, position_from, position_until in _parse_position_entry(
                    pos.text, name
                ):
                    _append_position_record(
                        mgs, person, position, position_from, position_until
                    )

            ui += 1

# Convert and save
mgs = pd.DataFrame(mgs)
save_path = output_dir / "mgs.pkl"
mgs.to_pickle(save_path)

# Excel export
PATHS.DATA_EXCEL_DIR.mkdir(parents=True, exist_ok=True)
mgs.to_excel(PATHS.EXCEL_MGS, index=False)

print(f"Saved {len(mgs)} politicians to {save_path}")

## **3.3 Merge Parliament and Government Members**

Merges the dataset of MPs mpsFactions.pkl with the dataset of MGs mgs.pkl into a unified dataframe politicians.csv that includes all individuals with legislative or executive roles in the Bundestag:
- **For each government member:**
    - The algorithm attempts to match by last name, first name, and birth date.
    - If a match is found in the MP dataset, the government position (e.g. Minister, Chancellor) is appended to that MP across all electoral terms the position covers.
    - If no match is found, a new entry with a generated ui (unique identifier) is created manually.
    - The electoral term(s) are inferred from the time period of the government position using a mapping table of legislative periods.
- **The output:**
    - Contains full biographical and role-specific information
    - Covers all electoral periods from 1 to 20


### **Input:**
```
rawData/
├── politiciansRawData/
│   └── mgs.pkl
dataStage03/
├── dataPoliticiansStage03/
│   ├── mpsFactions.pkl
├── dataFactionsStage03/
│   └── factionsAbbreviations.pkl
```

### **Output:**
```
dataStage03/
├── dataPoliticiansStage03/
│   └── politicians.csv
dataExcel/
└── politicians_stage03.xlsx
```


**Columns (politicians.csv):**
| Column name       | Description                                                                 |
|-------------------|-----------------------------------------------------------------------------|
| `ui`              | Unique identifier (integer)                                                 |
| `electoral_term`  | Electoral period (1-based index from 1949)                                  |
| `faction_id`      | Normalized integer ID of party affiliation (linked to faction lookup table) |
| `first_name`      | First name of the person (as string or joined from list)                    |
| `last_name`       | Last name of the person                                                     |
| `birth_place`     | Place of birth (may be empty)                                               |
| `birth_country`   | Country of birth (default: "Deutschland")                                   |
| `birth_date`      | Year of birth (as string)                                                   |
| `death_date`      | Year of death (or "-1" if still alive)                                      |
| `gender`          | Gender ("m" / "w")                                                           |
| `profession`      | Stated profession                                                           |
| `constituency`    | Place of political representation                                           |
| `aristocracy`     | Aristocratic prefix (e.g., "von", "Freiherr")                               |
| `academic_title`  | Academic title (e.g., "Dr.")                                                |
| `institution_type`| Type of institution (e.g., "Fraktion/Gruppe", "Regierungsmitglied")         |
| `institution_name`| Name of institution or office held                                          |

In [None]:
# Locations of the three input tables
factions_path = PATHS.FACTIONS_ABBR_STAGE03
mgs_path = PATHS.RAW_POLITICIANS / "mgs.pkl"
mps_path = PATHS.MPS_FACTIONS_STAGE03

# Make sure the directory for the merged output exists
output_path = PATHS.SPEAKER_LOOKUP_STAGE03.parent
output_path.mkdir(parents=True, exist_ok=True)

# Load MPs, government members and the faction table (in that order)
mps, mgs, factions = (
    pd.read_pickle(source) for source in (mps_path, mgs_path, factions_path)
)
# helper functions
# Start year of every electoral term, in chronological order.
_term_start_years = [
    1949, 1953, 1957, 1961, 1965, 1969, 1972, 1976, 1980, 1983,
    1987, 1990, 1994, 1998, 2002, 2005, 2009, 2013, 2017, 2021,
]

# Parallel "from"/"until" year lists, one position per electoral term.
electoral_terms_dict = {
    "from": list(_term_start_years),
    # Each term ends in the year the next one starts; the last term has no
    # end year yet and carries -1 instead.
    "until": _term_start_years[1:] + [-1],
}

# Regular expressions mapping a raw party/faction label to a canonical
# abbreviation (the dictionary key).
# get_faction_abbrev() walks this dict in insertion order and returns the key
# of the first pattern that matches, so earlier entries take precedence.
# NOTE(review): because of that ordering, "^DP" also matches labels starting
# with "DPB", and the "|SRP" alternative of the "DRP" pattern matches "SRP";
# the later "DPB" and "SRP" entries therefore look unreachable through
# get_faction_abbrev() -- confirm whether that is intended.
faction_patterns = {
    "Bündnis 90/Die Grünen": r"(?:BÜNDNIS\s*(?:90)?/?(?:\s*D[1I]E)?|Bündnis\s*90/(?:\s*D[1I]E)?)?\s*[GC]R[UÜ].?\s*[ÑN]EN?(?:/Bündnis 90)?|Bündnis 90/Die Grünen",  # noqa: E501
    "CDU/CSU": r"(?:Gast|-)?(?:\s*C\s*[DSMU]\s*S?[DU]\s*(?:\s*[/,':!.-]?)*\s*(?:\s*C+\s*[DSs]?\s*[UÙ]?\s*)?)(?:-?Hosp\.|-Gast|1)?",  # noqa: E501
    "BP": r"^BP",
    "DA": r"^DA",
    "DP": r"^DP",
    "DIE LINKE.": r"DIE LINKE",
    "DPB": r"(?:^DPB)",
    "DRP": r"DRP(\-Hosp\.)?|SRP",
    "DSU": r"^DSU",
    "FDP": r"\s*F\.?\s*[PDO][.']?[DP]\.?",
    "Fraktionslos": r"(?:fraktionslos|Parteilos|parteilos)",
    "FU": r"^FU",
    "FVP": r"^FVP",
    "Gast": r"Gast",
    "GB/BHE": r"(?:GB[/-]\s*)?BHE(?:-DG)?",
    "KPD": r"^KPD",
    "PDS": r"(?:Gruppe\s*der\s*)?PDS(?:/(?:LL|Linke Liste))?",
    "SPD": r"\s*'?S(?:PD|DP)(?:\.|-Gast)?",
    "SSW": r"^SSW",
    "SRP": r"^SRP",
    "WAV": r"^WAV",
    "Z": r"^Z$",
    "DBP": r"^DBP$",
    "NR": r"^NR$",
}


def get_faction_abbrev(faction, faction_patterns):
    """Return the abbreviation whose pattern is the first to match ``faction``.

    The patterns are tried in the iteration order of ``faction_patterns``
    (abbreviation -> regular expression). The key of the first pattern found
    anywhere in ``faction`` is returned; ``None`` means nothing matched.
    """
    return next(
        (
            abbrev
            for abbrev, pattern in faction_patterns.items()
            if regex.search(pattern, faction)
        ),
        None,
    )


def get_electoral_term(from_year=None, to_year=None, terms=None):
    """Map a start year, an end year, or both to 1-based electoral term numbers.

    Args:
        from_year: Year an office started. Falsy values (``None``, ``0``)
            count as "not given".
        to_year: Year an office ended; ``-1`` (open-ended) is resolved through
            the ``-1`` entry of the "until" list. Falsy values count as
            "not given".
        terms: Optional dict with parallel ``"from"`` and ``"until"`` year
            lists, one position per term. Defaults to the module-level
            ``electoral_terms_dict``.

    Returns:
        * only ``from_year`` given: the term (int) that contains that year;
        * only ``to_year`` given: the term (int) that ends in, or contains,
          that year;
        * both given: the list of all terms from the start term to the end
          term, inclusive (empty if the end term precedes the start term).

        Years later than every listed boundary are assigned to the last term
        of the table. (Previously such years were pinned to term 19 by a
        hard-coded ``> 2017`` shortcut, which became wrong once term 20 was
        added to the table.) A ``from_year`` before the first listed start
        year yields ``0``, as before.

    Raises:
        AttributeError: If neither ``from_year`` nor ``to_year`` is given.
    """
    if terms is None:
        terms = electoral_terms_dict

    if not from_year and not to_year:
        raise AttributeError()

    if from_year and to_year:
        first_term = get_electoral_term(from_year=from_year, terms=terms)
        last_term = get_electoral_term(to_year=to_year, terms=terms)
        return list(range(first_term, last_term + 1))

    if not from_year:
        until_years = terms["until"]
        if to_year in until_years:
            return until_years.index(to_year) + 1
        # First term whose end year lies after to_year.
        for counter, year in enumerate(until_years):
            if year > to_year:
                return counter + 1
        # Later than every recorded end year: belongs to the last term.
        return len(until_years)

    start_years = terms["from"]
    if from_year in start_years:
        return start_years.index(from_year) + 1
    # The term in progress is the one just before the first later start year.
    for counter, year in enumerate(start_years):
        if year > from_year:
            return counter
    # Later than every recorded start year: belongs to the last term.
    return len(start_years)


# Work on a copy so the MP table loaded above stays untouched.
politicians = mps.copy()
politicians["first_name"] = politicians["first_name"].str.replace("-", " ", regex=False)

# Hard-coded special cases: (last name, first scraped first name) -> first-name
# list used for matching instead (presumably the spelling found in the MP
# data -- confirm).
FIRST_NAME_OVERRIDES = {
    ("Fischer", "Joschka"): ["Joseph"],
    ("Waigel", "Theo"): ["Theodor"],
    ("Baum", "Gerhart"): ["Gerhart Rudolf"],
    ("Heinemann", "Gustav"): ["Gustav W."],
    ("Lehr", "Ursula"): ["Ursula Maria"],
    ("Möllemann", "Jürgen"): ["Jürgen W."],
}

# Columns copied unchanged from a matched MP row into the government rows.
PERSON_COLUMNS = [
    "ui",
    "first_name",
    "last_name",
    "birth_place",
    "birth_country",
    "birth_date",
    "death_date",
    "gender",
    "profession",
    "constituency",
    "aristocracy",
    "academic_title",
]


def _find_unique_candidates(table, last_name, first_name_mask, birth_date):
    """Rows of ``table`` matching last name, first-name mask and birth year.

    At most one row per ``ui`` is kept, so the length of the result equals
    the number of distinct persons that matched.
    """
    hits = table.loc[
        (table["last_name"] == last_name)
        & first_name_mask
        & (table["birth_date"].str.contains(str(birth_date)))
    ]
    return hits.drop_duplicates(subset="ui", keep="first")


def _append_government_rows(table, person, electoral_terms, faction_id, position):
    """Return ``table`` extended by one "Regierungsmitglied" row per term."""
    if not electoral_terms:
        return table
    rows = [
        {
            "ui": person["ui"],
            "electoral_term": electoral_term,
            "faction_id": faction_id,
            "first_name": person["first_name"],
            "last_name": person["last_name"],
            "birth_place": person["birth_place"],
            "birth_country": person["birth_country"],
            "birth_date": person["birth_date"],
            "death_date": person["death_date"],
            "gender": person["gender"],
            "profession": person["profession"],
            "constituency": person["constituency"],
            "aristocracy": person["aristocracy"],
            "academic_title": person["academic_title"],
            "institution_type": "Regierungsmitglied",
            "institution_name": position,
        }
        for electoral_term in electoral_terms
    ]
    return pd.concat([table, pd.DataFrame(rows)], ignore_index=True)


# merging for mgs
mgs_iter = zip(
    mgs["last_name"],
    mgs["first_name"],
    mgs["birth_date"],
    mgs["death_date"],
    mgs["position"],
    mgs["position_from"],
    mgs["position_until"],
    mgs["faction"],
)
for (
    last_name,
    first_name,
    birth_date,
    death_date,
    position,
    position_from,
    position_until,
    faction,
) in tqdm(list(mgs_iter), desc="Merging mp-data..."):

    # Apply the hard-coded special cases.
    override = FIRST_NAME_OVERRIDES.get((last_name, first_name[0]))
    if override is not None:
        first_name = list(override)
    elif last_name == "Kinkel" and first_name[0] == "Klaus":
        faction = "FDP"

    # Translate the scraped party label into the normalized faction id
    # (-1 if no pattern matches).
    faction_abbrev = get_faction_abbrev(faction, faction_patterns)
    if faction_abbrev:
        faction_match = int(
            factions.loc[factions["abbreviation"] == faction_abbrev, "id"].iloc[0]
        )
    else:
        faction_match = -1

    # Same hyphen normalization as applied to the MP first names above.
    first_name = [regex.sub("-", " ", name) for name in first_name]

    electoral_terms = get_electoral_term(
        from_year=int(position_from), to_year=int(position_until)
    )

    # First attempt: last name, first given name contained in the MP's first
    # name, and birth year contained in the MP's birth date.
    possible_matches = _find_unique_candidates(
        politicians,
        last_name,
        politicians["first_name"].str.contains(first_name[0]),
        birth_date,
    )

    # Second attempt (only if nothing was found): exact match on the first
    # two given names.
    if len(possible_matches) == 0 and len(first_name) > 1:
        possible_matches = _find_unique_candidates(
            politicians,
            last_name,
            politicians["first_name"] == " ".join(first_name[:2]),
            birth_date,
        )

    if len(possible_matches) > 1:
        # This doesn't get reached
        raise RuntimeError("What happened?")

    if len(possible_matches) == 1:
        # Known person: reuse the biographical data of the matched row.
        person = {
            column: possible_matches[column].iloc[0] for column in PERSON_COLUMNS
        }
    else:
        # Unknown person: create a new entry with the next free ui. Rows added
        # here are part of `politicians`, so a later position of the same
        # person is matched against them in the next iteration.
        person = {
            "ui": max(politicians["ui"].tolist()) + 1,
            "first_name": " ".join(first_name),
            "last_name": last_name,
            "birth_place": "",
            "birth_country": "",
            "birth_date": str(birth_date),
            "death_date": str(death_date),
            "gender": "",
            "profession": "",
            "constituency": "",
            "aristocracy": "",
            "academic_title": "",
        }

    politicians = _append_government_rows(
        politicians, person, electoral_terms, faction_match, position
    )

# Save output
# NOTE(review): the merged table is written to the SPEAKER_LOOKUP_STAGE03 path
# with its suffix swapped to ".csv". If that constant already ends in ".csv",
# this is the very file the lookup step (3.4) overwrites afterwards, and it is
# not named politicians.csv as the message below says -- confirm in paths.py.
politicians.to_csv(PATHS.SPEAKER_LOOKUP_STAGE03.with_suffix(".csv"), index=False)
PATHS.DATA_EXCEL_DIR.mkdir(parents=True, exist_ok=True)
politicians.to_excel(PATHS.EXCEL_POLITICIANS_STAGE03, index=False)

print(f"Saved politicians.csv to {PATHS.SPEAKER_LOOKUP_STAGE03.parent}")

## **3.4 Create Speaker-to-Faction Lookup Table**


Creates a lookup table mapping each speaker (by speaker_id) and electoral term to their most recent valid faction (faction_id). This is necessary for resolving speaker affiliations in cases where the original speech data does not contain accurate or complete faction information.
- **Process overview:**
    - Iterates over all combinations of speaker_id (ui) and electoral_term.
    - For each group, scans the politician records in reverse order to find the last valid (≠ -1) faction ID.
    - If a valid faction is found, adds a row to the lookup table with: speaker_id, electoral_term, faction_id
- **This lookup table can later be used to:**
    - Fill in missing faction IDs in speech datasets
    - Support faction-based analyses when direct affiliation is missing or ambiguous


### **Input:**
```
dataStage03/
├── dataPoliticiansStage03/
│   └── politicians.csv
```

### **Output:**
```
dataStage03/
├── dataPoliticiansStage03/
│   └── speaker_faction_lookup.csv
```


**Columns (speaker_faction_lookup.csv):**
| Column name      | Description                                             |
|------------------|---------------------------------------------------------|
| `speaker_id`     | Unique ID of the speaker (`ui` from `politicians.csv`) |
| `electoral_term` | Electoral period ID (integer)                          |
| `faction_id`     | Final party ID assigned in that term                   |

In [None]:
# input path
POLITICIANS = PATHS.SPEAKER_LOOKUP_STAGE03.parent
politicians = pd.read_csv(PATHS.SPEAKER_LOOKUP_STAGE03.with_suffix(".csv"))

# Fixed column layout, so that even an empty result is written with a header
LOOKUP_COLUMNS = ["speaker_id", "electoral_term", "faction_id"]
lookup_rows = []

# One group per speaker (ui) and electoral term
for (uid, term), group in politicians.groupby(["ui", "electoral_term"]):
    # Keep only valid faction ids (anything other than -1), in row order
    faction_ids = group["faction_id"].values
    valid_ids = faction_ids[faction_ids != -1]

    # Groups without any valid faction id get no lookup entry
    if len(valid_ids) == 0:
        continue

    # The last valid id within the group wins
    lookup_rows.append({
        "speaker_id": uid,
        "electoral_term": term,
        "faction_id": valid_ids[-1],
    })

# Convert to a DataFrame
lookup_df = pd.DataFrame(lookup_rows, columns=LOOKUP_COLUMNS)

# Save
lookup_df.to_csv(PATHS.SPEAKER_LOOKUP_STAGE03, index=False)

print(f"Lookup-Tabelle mit {len(lookup_df)} Einträgen erstellt unter {PATHS.SPEAKER_LOOKUP_STAGE03}")
