In [1]:
import os
from bs4 import BeautifulSoup

html_dir = "html"
offset = 0
amount = 309

# List to store BeautifulSoup objects
soups = dict()

# Loop through each file in the specified directory
for filename in sorted(os.listdir(html_dir))[offset:offset+amount]:
    # Check if the file is an HTML file
    if filename.endswith('.html'):
        # Construct full file path
        file_path = os.path.join(html_dir, filename)
        # Open and read the HTML file
        with open(file_path, 'r', encoding="latin-1") as file:
            # Create a BeautifulSoup object and append it to the list
            soup = BeautifulSoup(file, 'html.parser')
            soups[filename] = soup

# Capture the vote subjects

In [2]:
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Union
import re
from copy import copy

def is_subject(tag):
    return (tag.name == "p" and "Titre2NL" in tag.get('class', [])) or \
        (tag.name == "h2" and tag.get('class', []) == [] and not tag.find('span', attrs={'lang': 'FR'}))

def is_section(tag):
    return (tag.name == "p" and ("Titre1NL" in tag.get('class', []) or "Titre1FR" in tag.get('class', []))) or \
        (tag.name == "h1" and tag.get('class', []))

def is_votecounts_table(tag):
    "should return True for beginning of a table"
    return (tag.name == "table") and (len(tag) == 11)  

def consistent_votekeys(s1, s2):
    key = '/'.join([s1, s2])
    if ('Ja' in key) or ('Oui' in key):
        key = 'yay'
    elif ('Nee' in key) or ('Non' in key):
        key = 'nay'
    elif ('Onthoud' in key) or ('Abstentions' in key):
        key = 'dunno'
    elif ('Total' in key) or ('Totaal' in key):
        key = 'total'
    else:
        print(f'unexpected row contains {s1} and {s2}')
    return key

def parse_votes_table(tag, debug=True):
    # feed this the tag that made is_count return True
    # Initialize an empty list to store the parsed data
    parsed_data = {}

    # Iterate through each row of the table
    taalgroepen=False
    for i, row in enumerate(tag.find_all('tr')):
        if i == 0:
            continue  # skip the first row which states the vote id within the subject

        row_data = []
        for cell in row.find_all('td'):
            # Extract the text from the cell and strip any leading/trailing whitespace
            cell_text = cell.get_text(strip=True)
            row_data.append(cell_text)
            print(cell_text)

        if i == 1:
            print('_'.join(row_data))
            # Check whether we are counting taalgroepen separately:
            taalgroepen = ('F' in "_".join(row_data)) and ('N' in "_".join(row_data))
            print(taalgroepen)
            if taalgroepen:
                lankeys = row_data[1:]
                print(lankeys)
                continue

        if debug:   # todo make check an assertion once all cases covered by is_votecounts_table
            check = (len(row_data) == 3) and (row_data[0] in ['Ja', "Nee", "Totaal", "Onthoudingen"])
            if not check:
                Warning(f"Possibly invalid table passed is_votecounts_table (row_data: {row_data})")

        # Parse data as ints if possible, else parse anyway but throw a warning.
        try:
            key = consistent_votekeys(row_data[0], row_data[-1])
            if taalgroepen:
                parsed_data[key] = {lan: int(lvote) for lan, lvote in zip(lankeys, row_data[1:-1])}
            else:
                parsed_data[key] = int(row_data[1])  # key: yay/nay/dunno; row_data[1]: n of votes
        except:
            key = consistent_votekeys(row_data[0], row_data[-1])
            if taalgroepen:
                parsed_data[key] = {lan: lvote for lan, lvote in zip(lankeys, row_data[1:-1])}
            else:
                parsed_data[key] = row_data[1]
            Warning("Non-int votecount, possibly invalid format passing is_votecounts_table")
    return parsed_data



@dataclass
class Vote:
    session_id: str
    nr_within_session: int = -1
    subject: str = ""
    summarized_vote: dict = field(default_factory=lambda: {})
    yay: List[str] = field(default_factory=lambda: [])
    nay: List[str] = field(default_factory=lambda: [])
    dunno: List[str] = field(default_factory=lambda: [])
    srcfile: Union[str, os.PathLike[str]] = ""

votes: Dict[Tuple[str, int], Vote] = dict()
def close_vote_and_start_new(Vote, votes, session_id, srcfile, current_vote, tag_text):
    if current_vote.subject and current_vote.nr_within_session != -1:
        votes[(session_id, current_vote.nr_within_session)] = copy(current_vote)
    current_vote = Vote(session_id=session_id, srcfile=srcfile, subject=tag_text + "\n")
    return current_vote


for i, (filename, soup) in enumerate(soups.items()):
    # Find the sections in the html file (contained in soup) which contain "naamstemmingen" (case insensitive) as a h1-header or the Titre1Nl-format:
    naamstemmingen = soup.find_all(
        lambda tag: ((tag.name == "p" and tag.get("class") == ["Titre1NL"]) or (tag.name == "h1")) and "naamstemmingen" in tag.text.lower().strip()
        )
    for naamstemming in naamstemmingen:
        session_id = f"{i+1+offset:04d}"
        print(f"{session_id}: {naamstemming}")
        print("-"*100)

        current_vote: Vote = Vote(session_id=session_id, srcfile=filename)
        last_tag_was_subject = False
        # .next_siblings looks at the tag ahead of naamstemming in the corresponding soup.
        for j, tag in enumerate(naamstemming.next_siblings):
            tag_text = " ".join(tag.strings).lower()

            # Next siblings also produces a nonsense string (like \n) after every real tag
            # So skip all even next tags, which will be nonsense (but annoyingly, a different nonsense every document)
            if j % 2 == 0:
                continue

            # A bold (titre2) sentence in dutch signifies a subject of a vote. It can be a new subject
            # or a new line in the current subject. Depends on whether the previous sentence was also a subject
            # or not
            if is_subject(tag):
                # Every subtitle in between "Naamstemmingen" and the next H1 title is considered part of a vote
                # If the previous tag was a subject, this new line is part of the previous one
                print(f"\nFound SUBTITLE: {tag_text}")
                if last_tag_was_subject:
                    current_vote.subject += tag_text + "\n"
                else:
                    # If not: consider this to be a new vote, close off the old one and reset
                    current_vote = close_vote_and_start_new(Vote, votes, session_id, filename, current_vote, tag_text)
                last_tag_was_subject = True
            elif is_section(tag):
                # If we hit the next Title, the voting section is done
                # This should never happen right after a subtitle, otherwise we won't have the vote nr anyway
                # However, we're searching for "naamstemmingen" so it might happen the french version is next immediately below
                # it's also a title, so skip that
                if "votes nominatifs" in tag_text:
                    last_tag_was_subject = False
                    continue
                # If this is truly a next title, close off the current vote and break the loop
                current_vote = close_vote_and_start_new(Vote, votes, session_id, filename, current_vote, tag_text)
                break

            else:
                # In this case it's normal text
                # First, check if it contains the vote nr (aka we're in between vote headers)
                vote_match = re.search(r'\(stemming\/vote(?:\s|&nbsp;)* *\d+\)', tag_text, re.IGNORECASE)
                if vote_match:
                    # Extract the matched text
                    matched_text = vote_match.group()
                    print(f"Found VOTE: {matched_text}")
                    # Extract the number, assuming it's the last part of the matched string
                    number = int(re.search(r'\d+', matched_text).group())
                    # Set the current vote to that number. This overwrites the number inhereted from the previous voting session in this section.
                    current_vote.nr_within_session = number

                    # Parse the table counting the votes
                    if not is_votecounts_table(tag):  # matching stemming/vote should only find tables:
                        Warning(f"Tag containing stemming/vote is not a votecountstable: {tag}")
                    # Overwrite the votecountstable inhereted from the previous voting session in this section.
                    current_vote.summarized_vote = parse_votes_table(tag)
                    tabletag = tag
                   
                last_tag_was_subject = False
            
        # If this is truly a next title, close off the current vote and break the loop
        close_vote_and_start_new(Vote, votes, session_id, filename, current_vote, tag_text)

0004: <p class="Titre1NL"><a name="_Toc21073224"><span lang="FR-BE" style="mso-ansi-language:
FR-BE">Naamstemmingen</span></a><span lang="FR-BE" style="mso-ansi-language:FR-BE"><o:p></o:p></span></p>
----------------------------------------------------------------------------------------------------

Found SUBTITLE: 09 aangehouden amendementen en
artikelen van het wetsontwerp tot opening van voorlopige kredieten voor de
maanden augustus, september en oktober 2019 (25/1-5)
Found VOTE: (stemming/vote 1)
Ja
16
Oui
Ja_16_Oui
False
Nee
91
Non
Onthoudingen
19
Abstentions
Totaal
126
Total
Found VOTE: (stemming/vote 2)
Ja
15
Oui
Ja_15_Oui
False
Nee
92
Non
Onthoudingen
19
Abstentions
Totaal
126
Total
Found VOTE: (stemming/vote 3)
Ja
34
Oui
Ja_34_Oui
False
Nee
92
Non
Onthoudingen
0
Abstentions
Totaal
126
Total
Found VOTE: (stemming/vote 4)
Ja
33
Oui
Ja_33_Oui
False
Nee
83
Non
Onthoudingen
8
Abstentions
Totaal
124
Total
Found VOTE: (stemming/vote 5)
Ja
34
Oui
Ja_34_Oui
False
Nee
91
Non
Onthouding

In [5]:
def parse_votes_table(tag, debug=True):
    # feed this the tag that made is_count return True
    # Initialize an empty list to store the parsed data
    parsed_data = {}

    # Iterate through each row of the table
    taalgroepen=False
    for i, row in enumerate(tag.find_all('tr')):
        if i == 0:
            continue  # skip the first row which states the vote id within the subject

        row_data = []
        for cell in row.find_all('td'):
            # Extract the text from the cell and strip any leading/trailing whitespace
            cell_text = cell.get_text(strip=True)
            row_data.append(cell_text)
            print(cell_text)

        if i == 1:
            print('_'.join(row_data))
            # Check whether we are counting taalgroepen separately:
            taalgroepen = ('F' in "_".join(row_data)) and ('N' in "_".join(row_data))
            print(taalgroepen)
            if taalgroepen:
                lankeys = row_data[1:]
                print(lankeys)
                continue

        if debug:   # todo make check an assertion once all cases covered by is_votecounts_table
            check = (len(row_data) == 3) and (row_data[0] in ['Ja', "Nee", "Totaal", "Onthoudingen"])
            if not check:
                Warning(f"Possibly invalid table passed is_votecounts_table (row_data: {row_data})")

        # Parse data as ints if possible, else parse anyway but throw a warning.
        try:
            key = consistent_votekeys(row_data[0], row_data[-1])
            if taalgroepen:
                parsed_data[key] = {lan: int(lvote) for lan, lvote in zip(lankeys, row_data[1:-1])}
            else:
                parsed_data[key] = int(row_data[1])  # key: yay/nay/dunno; row_data[1]: n of votes
        except:
            key = consistent_votekeys(row_data[0], row_data[-1])
            if taalgroepen:
                parsed_data[key] = {lan: lvote for lan, lvote in zip(lankeys, row_data[1:-1])}
            else:
                parsed_data[key] = row_data[1]
            Warning("Non-int votecount, possibly invalid format passing is_votecounts_table")
    return parsed_data


parse_votes_table(tabletag)

Oui
121
Ja
Oui_121_Ja
False
Non
15
Nee
Abstentions
1
Onthoudingen
Total
137
Totaal


{'yay': 121, 'nay': 15, 'dunno': 1, 'total': 137}

# Capture the date of the session

In [4]:
for soup in soups.values():
    print(re.search(
        r"([1-9]|[1-2][0-9]|3[0-1])[\s\\n]*(januari|februari|maart|april|mei|juni|juli|augustus|september|oktober|november|december)[\s\\n]*(\d{4})",
        "\n".join(list(soup.strings)),
        re.IGNORECASE
    ).group().replace("\n", ""))

20 juni 2019
27 juni 2019
10 juli 2019
18 juli 2019
19 september 2019
26 september 2019
3 oktober 2019
8 oktober 2019
17 oktober 2019
24 oktober 2019
31 oktober 2019
7 november 2019
14 november 2019
21 november 2019
28 november 2019
5 december 2019
12 december 2019
19 december 2019
9 januari 2020
16 januari 2020
23 januari 2020
30 januari 2020
6 februari 2020
13 februari 2020
20 februari 2020
5 maart 2020
12 maart 2020
17 maart 2020
17 maart 2020
19 maart 2020
19 maart 2020
23 maart 2020
26 maart 2020
2 april 2020
9 april 2020
16 april 2020
23 april 2020
30 april 2020
23 oktober 2019
14 mei 2020
20 mei 2020
28 mei 2020
4 juni 2020
11 juni 2020
18 juni 2020
18 juni 2020
25 juni 2020
25 juni 2020
2 juli 2020
9 juli 2020
9 juli 2020
15 juli 2020
15 juli 2020
16 juli 2020
17 september 2020
24 september 2020
1 oktober 2020
2 oktober 2020
2 oktober 2020
2 oktober 2020
3 oktober 2020
8 oktober 2020
13 oktober 2020
15 oktober 2020
22 oktober 2020
29 oktober 2020
29 oktober 2020
5 november 2020

# Capture the names from the vote details

In [6]:
import logging

from common.text_corrections import fix_name


for i, (filename, soup) in enumerate(soups.items()):
    full_text = soup.text
    session_id = f"{i+1+offset:04d}"

    # Look for a line in the HTML where "details" and "naamstemmingen" occur, below which is listed who voted Y/N/eh
    vote_details = soup.find_all(lambda tag: ((tag.name == "p" and tag.get("class") == ["Titre1NL"]) or (tag.name == "h1")) and "detail" in tag.text.lower() and "naamstemmingen" in tag.text.lower())
    if vote_details:
        if len(vote_details) > 1:
            logging.warning(f"Found more than 1 occurence of Details of Naamstemmingen in document nr {session_id}")
            vote_details = vote_details[-1]
        else:
            vote_details = vote_details[0]
        
        naamstemming_index = full_text.find(vote_details.text)
        detailed_text = full_text[naamstemming_index:]

        split_on_vote = re.split(r"Vote[\s| ]*nominatif[\s| ]*-[\s| ]*Naamstemming:[\s| ]*(\d*)", detailed_text)

        for i in range(1, len(split_on_vote), 2):
            nr_within_session = int(split_on_vote[i])
            print(nr_within_session)
            vote_body = split_on_vote[i+1].strip()
            
            split_body = re.split(r"(Oui|Ja|Non|Nee|Abstentions|Onthoudingen)[\s| ]*(\d*)[\s| ]*(?:Oui|Ja|Non|Nee|Abstentions|Onthoudingen)", vote_body)
            
            for j in range(1, len(split_body), 3):
                vote = split_body[j].lower()
                amount_of_votes = split_body[j+1]
                names = split_body[j+2].replace("\n", " ")
                names = [n.strip() for n in names.split(",") if n.strip()]
                names = [' '.join(n.split()) for n in names]
                names = [fix_name(name, swap_first_last_name=False) for name in names]

                print(f"{amount_of_votes} votes {vote}: {names}")

                if vote in ("ja", "oui"):
                    if (session_id, nr_within_session) in votes:
                        votes[(session_id, nr_within_session)].yay = names
                elif vote in ("nee", "non"):
                    if (session_id, nr_within_session) in votes:
                        votes[(session_id, nr_within_session)].nay = names
                elif vote in ("abstentions", "onthoudingen"):
                    if (session_id, nr_within_session) in votes:
                        votes[(session_id, nr_within_session)].dunno = names
                else:
                    raise ValueError(f"Vote is not a valid value: {vote} for session: {session_id}")
        print("\n")
        

1
016 votes oui: ['Briers Jan', 'Bury Katleen', 'Creyelman Steven', 'Dedecker Jean-Marie', 'Depoortere Ortwin', 'De Spiegeleer Pieter', 'Dewulf Nathalie', 'Gilissen Erik', 'Pas Barbara', 'Ravyts Kurt', 'Sneppe Dominiek', 'Troosters Frank', 'Van Grieken Tom', 'Van Langenhove Dries', 'Vermeersch Wouter', 'Verreyt Hans']
091 votes non: ['Arens Josy', 'Bacquelaine Daniel', 'Bayet Hugues', 'Beke Wouter', 'Ben Achour Malik', 'Bertels Jan', 'Bogaert Hendrik', 'Burton Emmanuel', 'Buyst Kim', 'Calvo Kristof', 'Chanson Julie', 'Clarinval David', 'Cogolati Samuel', 'Colebunders Gaby', 'Creemers Barbara', 'Crombez John', 'Daems Greet', 'Daerden Frederic', 'Dallemagne Georges', 'DAmico Roberto', 'De Caluwe Robby', 'De Croo Alexander', 'Dedonder Ludivine', 'de Laveleye Severine', 'Delizee Jean-Marc', 'De Maegd Michel', 'Demon Franky', 'Depraetere Melissa', 'De Smet Francois', 'De Vriendt Wouter', 'Dierick Leen', 'Ducarme Denis', 'Flahaut Andre', 'Fonck Catherine', 'Gabriels Katja', 'Geens Koen', 'Gi

# Check the vote counts against the detailed naamstemmingen


In [17]:
votes_with_mistakes = {}
votes_with_error = {}

for (session, num), v in votes.items():
    n_yay = len(v.yay)
    n_nay = len(v.nay)
    n_dunno = len(v.dunno)

    # try:
    if not isinstance(v.summarized_vote['yay'], dict):
        wrong_yay = int(v.summarized_vote['yay']) != n_yay
        wrong_nay = int(v.summarized_vote['nay']) != n_nay
        wrong_dunno = int(v.summarized_vote['dunno']) != n_dunno

        if any([wrong_yay, wrong_nay, wrong_nay]):
            votes_with_mistakes[(session, num)] = v
            print(f"Mistakes found for session {session} ({num}):")
            if wrong_yay:
                print(f"    Ja/Oui: Namenlijst {n_yay}; Tabel: {v.summarized_vote['yay']}")
            if wrong_nay:
                print(f"    Nee/Non: Namenlijst: {n_nay}; Tabel: {v.summarized_vote['nay']}")
            if wrong_dunno:
                print(f"    Onthoudingen/Abstentions: Namenlijst: {n_dunno}; Tabel: {v.summarized_vote['dunno']}")
    # except Exception as e:
    #     print(e)
    #     votes_with_error[(session, num)] = v
    #     print(f"{session} ({num}: something wrong with summarized votes parsing!)")
        
print()
if len(votes_with_mistakes) == 0:
    print("All counts check out.")
else:
    print(f"Parsed names do not correspond to parsed summarized votes in table for {len(votes_with_mistakes)} votes ({len(votes_with_mistakes)/len(votes)*100:0.2f}% of all votes).")
    print(f"Wrongly parsed tables which could not be checked: {len(votes_with_error)} out of {len(votes)}")

KeyError: 'yay'

Convenience investigate specific files where the counts do not match:

In [None]:
# take a look at the mistakes
for (sid, vnr), v in votes_with_mistakes.items():
    print()
    print(sid, vnr)
    print(v.srcfile)
    print(v.subject)
    print(v.summarized_vote)


Afwijkend formaat wegens meerderheid nodig in elke taalgroep (f/tot/n) (our bad):
* sessie 0237 stemmingen 14 en 15
* sessie 0236 stemming 28
* sessie 219 stemmingen 17 tem 21
* sessie 0181 stemming 10
* 
Unaniem, 1 te weinig geteld in tabel:
* sessie 0230 stemming 6
* 

# Some initial stats

In [None]:
import json

with open("../../members.json", "r") as fp:
    name_metadata = json.load(fp)
name_metadata

In [None]:
from typing import Counter

from common.text_corrections import fix_name


all_names = []
for vote in votes.values():
    all_names.extend([f"{name}: {[m['party'] for m in name_metadata if fix_name(m['name'], swap_first_last_name=True) == name]}" for name in vote.yay])

Counter(all_names)

In [None]:
from typing import Counter

from common.text_corrections import fix_name


all_names = []
for vote in votes.values():
    all_names.extend([f"{name}: {[m['party'] for m in name_metadata if fix_name(m['name'], swap_first_last_name=True) == name]}" for name in vote.nay])

Counter(all_names)

In [None]:
from typing import Counter

from common.text_corrections import fix_name


all_names = []
for vote in votes.values():
    all_names.extend([f"{name}: {[m['party'] for m in name_metadata if fix_name(m['name'], swap_first_last_name=True) == name]}" for name in vote.dunno])

Counter(all_names)

In [None]:
# Get all unique parties
from common.text_corrections import fix_name, fix_party


name_to_party = dict()
for member in name_metadata:
    party = fix_party(member["party"])
    name = fix_name(member["name"])
    name_to_party[name] = party

set(name_to_party.values())


In [None]:
def fix_name(name):
    name = name.replace("Flahaux André", "Flahaut André")
    name = name.replace(".", "")
    name = name.replace("'", "")
    name = unidecode(name)
    return name

In [None]:
# Mapping from person ID to party ID
from collections import defaultdict

INCLUDE_DUNNO = False

person_to_party = {fix_name(m['name'], swap_first_last_name=True): m["party"] for m in name_metadata}
parties = set(person_to_party.values())

dissenting_party_votes = {party: 0 for party in parties}
party_votes = {party: 0 for party in parties}
lone_wolfs = defaultdict(int)
dissenters = defaultdict(int)

# For each vote, find the majority per party
for vote in votes.values():
    for party in parties:
        yay = [n for n in vote.yay if n and person_to_party[n] == party]
        nay = [n for n in vote.nay if n and person_to_party[n] == party]
        dunno = [n for n in vote.dunno if n and person_to_party[n] == party]

        votecounts = [len(yay), len(nay), len(dunno)]
        party_votes[party] += sum(votecounts)
        if votecounts.count(0) == 2:
            # No dissenting vote, skipping
            continue
        # This is where it gets interesting, at least one person voted against their party!
        # print(party, votecounts, vote.subject)
        # Let's keep track of which people dissent and in which party they are
        majority_idx = votecounts.index(max(votecounts))

        vote_sets_to_count = [yay, nay]
        if INCLUDE_DUNNO:
            vote_sets_to_count.append(dunno)
        
        for i, votelist in enumerate(vote_sets_to_count):
            if i == majority_idx:
                continue
            # We're adding the amount of non-majority votes to the dissenting party votes here
            dissenting_party_votes[party] += len(votelist)
            if len(votelist) == 1:
                lone_wolfs[votelist[0]] += 1
            for name in votelist:
                dissenters[name] += 1

print("Dissenting Parties:")
print("=" * 25)
print(dissenting_party_votes)
for party in parties:
    percentage_dissenting = round(100 * (dissenting_party_votes[party] / party_votes[party]), 2)
    print(f"{party}: {percentage_dissenting}%")
print("\n")

print("Dissenters:")
print("=" * 25)
for name, amount in sorted(dissenters.items(), key=lambda x: x[1], reverse=True):
    print(f"{name} ({person_to_party[name]}): {amount}")
print("\n")

print("Lone Wolfs (they were alone in their dissenting vote):")
print("=" * 25)
for name, amount in sorted(lone_wolfs.items(), key=lambda x: x[1], reverse=True):
    print(f"{name} ({person_to_party[name]}): {amount}")

In [None]:
party_votes