In [2]:
import ast
from collections import Counter, defaultdict
from random import choice

import pandas as pd
from bs4 import BeautifulSoup
from pyppeteer import launch
from pyppeteer_stealth import stealth
from pyppeteer_useragents import USERAGENTS
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from tqdm.auto import tqdm

  from .autonotebook import tqdm as notebook_tqdm


# Links extraction

In [3]:
async def start_driver():
    driver = await launch(headless = False, defaultViewport = {'height':1080, 'width':1920}, headers = {
        'useragent': choice(USERAGENTS)
    })
    return driver


async def destroy_driver(driver):
    await driver.close()


async def get_urls(page, url:str):
    """" returns a list of links to the pages with the results of voting in multi-mandate constituencies """
    await page.goto(url)
    await page.waitFor(2500)
    dt = await page.querySelectorAll('#root > div.res > div.row > div.stats.col-xs-12.col-md-6.col-lg-7 > div > ul > li > ul > li > a')
    dt = [await page.evaluate('(d) => d.getAttribute("href")', d) for d in dt]
    return dt


async def get_powiat_urls(page, url:str):
    """" returns a list of links to the pages with the results of voting in districts (powiats) """
    await page.goto(f'https://wybory.gov.pl{url}')
    await page.waitFor(2500)
    dt = await page.querySelectorAll('#root > div.res > div.row > div.stats.col-xs-12.col-md-6.col-lg-7 > div > ul > li > ul > li > ul > li > a')
    dt = [await page.evaluate('(d) => d.getAttribute("href")', d) for d in dt]
    return dt


async def get_gmina_urls(page, url: str):
    """ returns a list of links to the pages with the results of voting in municipalities (gminas) """
    try:
        await page.goto(f'https://wybory.gov.pl{url}', timeout=50000)
        await page.waitFor(2500)
        dt = await page.querySelectorAll(
            '#root > div.res > div.row > div.stats.col-xs-12.col-md-6.col-lg-7 > div > ul > li > ul > li > ul > li > ul > li > a'
        )
        gmina_urls = [await (await d.getProperty('href')).jsonValue() for d in dt]
        return gmina_urls
    except Exception as e:
        print(f"Error loading {url}: {e}")
        return []
    

async def main(url):
    driver = await start_driver()
    page = await driver.newPage()
    await stealth(page)
    urls = await get_urls(page, url)
    powiat_urls = []
    gmina_urls = []
    
    for url in tqdm(urls):
        powiat_urls.extend(await get_powiat_urls(page, url))
        
    for url in tqdm(powiat_urls):
        gmina_urls.extend(await get_gmina_urls(page, url))
        
    await destroy_driver(driver)
    
    #return powiat_urls
    return gmina_urls

In [4]:
gmina_urls = await main('https://wybory.gov.pl/sejmsenat2023/en/sejm/wynik/pl')

100%|██████████| 41/41 [02:01<00:00,  2.97s/it]
100%|██████████| 382/382 [19:18<00:00,  3.03s/it]


In [5]:
len(gmina_urls)

2520

In [6]:
#remove the links that do not contain the election results tables (Iraq and Lebanon, for some reason)
links_to_remove = {
    'https://sejmsenat2023.pkw.gov.pl/sejmsenat2023/en/sejm/wynik/gm/368',
    'https://sejmsenat2023.pkw.gov.pl/sejmsenat2023/en/sejm/wynik/gm/422'
}

gmina_urls_set = set(gmina_urls)
missing = links_to_remove - gmina_urls_set

links = list(gmina_urls_set - links_to_remove)

if missing:
    print("Not found in list:", *missing)

In [7]:
#load
with open('scraped_poland_local.txt', 'w') as file:
    file.write(str(links))
    
#read
with open('scraped_poland_local.txt', 'r') as file:
    file_contents = file.read()

try:
    links_list = ast.literal_eval(file_contents)
    if not isinstance(links_list, list):
        raise ValueError("The file contents are not a list")
except (SyntaxError, ValueError) as e:
    print(e)
    links_list = []

len(links_list)

2518

# Tables extraction

In [8]:
def get_tables(driver, url: str):
    """ returns a list of <td> rows for the party results table from the given URL """
    driver.get(url)

    #wait until the results table wrapper is visible
    WebDriverWait(driver, 60).until(
        EC.visibility_of_element_located(
            (
                By.XPATH,
                "//*[@class='dataTables_wrapper dt-bootstrap5 no-footer' and @id='DataTables_Table_0_wrapper']",
            )
        )
    )

    soup = BeautifulSoup(driver.page_source, "html.parser")

    table = soup.find(
        "table",
        {
            "class": "table table-bordered table-striped table-hover dataTable no-footer clickable right2 right4"
        },
    )
    if table is None:
        raise RuntimeError("results table not found")

    tableX = []

    for row in table.find_all("tr"):
        tableX.append(row.find_all("td"))

    return tableX


def get_name(driver, url: str) -> str:
    """ returns the name (powiat/gmina) as plain text """
    driver.get(url)

    WebDriverWait(driver, 60).until(
        EC.visibility_of_element_located(
            (
                By.XPATH,
                "//*[@class='stats col-xs-12 col-md-6 col-lg-7']",
            )
        )
    )

    soup = BeautifulSoup(driver.page_source, "html.parser")
    h3_tag = soup.find("div", {"class": "stats col-xs-12 col-md-6 col-lg-7"}).find("h3")

    if h3_tag is None:
        raise RuntimeError("name <h3> not found")

    return h3_tag.text.strip()


def tidy_df(df: pd.DataFrame) -> pd.DataFrame:
    """ converts the raw scraped cells into a tidy DataFrame """
    df.columns = [
        "election_comitee",
        "num_voters",
        "voters_percentage",
        "num_mandates",
        "mandates_percentage",
        "link",
    ]

    df = df.iloc[:-1].copy()

    df["election_comitee"] = df["election_comitee"].apply(
        lambda x: x.text if hasattr(x, "text") else str(x)
    )

    df["num_voters"] = df["num_voters"].apply(
        lambda x: int(x.contents[1].replace("\xa0", "")) if hasattr(x, "contents") else None
    )

    df["voters_percentage"] = df["voters_percentage"].apply(
        lambda x: float(
            x.contents[1].text.rstrip("%").replace(",", ".")
        )
        if hasattr(x, "contents")
        else None
    )

    df["num_mandates"] = df["num_mandates"].apply(
        lambda x: int(x.contents[1]) if hasattr(x, "contents") else None
    )

    df["mandates_percentage"] = df["mandates_percentage"].apply(
        lambda x: x.contents[1].text if hasattr(x, "contents") else None
    )

    df["link"] = df["link"].apply(
        lambda x: (
            f"https://wybory.gov.pl{x.find('a').get('href')}"
            if x and x.find("a")
            else None
        )
    )

    return df


def handle_duplicates(raw_named_frames: dict[str, pd.DataFrame]) -> dict[str, pd.DataFrame]:
    """ returns a flat dict with unique keys """
    unique_map = {}
    for name, df_list in raw_named_frames.items():
        if len(df_list) == 1:
            unique_map[name] = df_list[0]
        else:
            for idx, frame in enumerate(df_list, start=1):
                if idx == 1:
                    unique_map[name] = frame
                else:
                    unique_map[f"{name} {idx}"] = frame
    return unique_map


def calc_absolute_votes(named_df_map: dict[str, pd.DataFrame]) -> dict[str, pd.DataFrame]:
    """
    for each district DataFrame:
      - transpose
      - set first row as header
      - keep only first data row (absolute votes, not percentages etc.)
      - attach district name
    returns dict[name] = 1-row DataFrame with absolute votes
    """
    votes_abs = {}

    for district_name, df in named_df_map.items():
        dftr = df.transpose()

        #first row becomes header
        dftr.columns = dftr.iloc[0]
        dftr = dftr[1:]  # drop header row

        #add district name
        dftr["District"] = district_name

        #reorder so 'District' is the first column
        cols = ["District"] + [c for c in dftr.columns if c != "District"]
        dftr = dftr[cols]

        #keep only the first row (absolute votes row)
        votes_abs[district_name] = dftr.iloc[0:1]

    return votes_abs


def main(links):
    driver = None
    try:
        driver = webdriver.Chrome()
        raw_named_frames = defaultdict(list)

        for url in tqdm(links):
            try:
                district_name = get_name(driver, url)
                table_rows = get_tables(driver, url)  #list[list<td>]

                df_raw = pd.DataFrame(table_rows[1:])  #skip header row
                df_clean = tidy_df(df_raw)

                raw_named_frames[district_name].append(df_clean)

            except Exception as e:
                print(f"[ERROR] while scraping {url}: {e}")
                continue

        #resolve duplicates
        named_df_map = handle_duplicates(raw_named_frames)

        #get per-district 1-row tables of absolute votes
        abs_votes_map = calc_absolute_votes(named_df_map)

        #stack into one big df
        absolute_votes_by_district = pd.concat(
            abs_votes_map.values(), ignore_index=False
        )
        absolute_votes_by_district.reset_index(drop=True, inplace=True)

        return absolute_votes_by_district, raw_named_frames

    finally:
        if driver is not None:
            driver.close()

In [9]:
absolute_votes, raw_named_frames = main(links)

100%|██████████| 2518/2518 [59:06<00:00,  1.41s/it] 


In [10]:
absolute_votes

election_comitee,District,KOMITET WYBORCZY PRAWO I SPRAWIEDLIWOŚĆ,KOALICYJNY KOMITET WYBORCZY KOALICJA OBYWATELSKA PO .N IPL ZIELONI,KOALICYJNY KOMITET WYBORCZY TRZECIA DROGA POLSKA 2050 SZYMONA HOŁOWNI - POLSKIE STRONNICTWO LUDOWE,KOMITET WYBORCZY NOWA LEWICA,KOMITET WYBORCZY KONFEDERACJA WOLNOŚĆ I NIEPODLEGŁOŚĆ,KOMITET WYBORCZY POLSKA JEST JEDNA,KOMITET WYBORCZY BEZPARTYJNI SAMORZĄDOWCY,KOMITET WYBORCZY WYBORCÓW RUCHU DOBROBYTU I POKOJU,KOMITET WYBORCZY NORMALNY KRAJ,KOMITET WYBORCZY RUCH NAPRAWY POLSKI,KOMITET WYBORCZY WYBORCÓW MNIEJSZOŚĆ NIEMIECKA,KOMITET WYBORCZY ANTYPARTIA
0,m. Szczytno,4038,3861,1746,1241,777,195,159,,,,,
1,m. Podkowa Leśna,636,1228,357,258,157,28,37,,,,,
2,gm. Jeżewo,1534,935,600,251,278,55,88,,,,,
3,gm. Raciechowice,1906,574,420,107,325,92,58,,,,,
4,gm. Rogowo,1153,313,400,156,170,40,37,4,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2513,m. Miasteczko Śląskie,1609,964,551,248,259,129,62,,,,,
2514,gm. Gnojno,1541,184,279,112,138,32,44,2,6,,,
2515,gm. Łomazy,1361,259,376,106,244,90,53,13,,,,
2516,gm. Tyczyn,3166,1069,699,295,659,209,128,,,,,


In [11]:
threshold_abs = absolute_votes.drop(labels = ['KOMITET WYBORCZY BEZPARTYJNI SAMORZĄDOWCY',
                                              'KOMITET WYBORCZY POLSKA JEST JEDNA',
                                              'KOMITET WYBORCZY WYBORCÓW RUCHU DOBROBYTU I POKOJU',
                                              'KOMITET WYBORCZY ANTYPARTIA',
                                              'KOMITET WYBORCZY NORMALNY KRAJ',
                                              'KOMITET WYBORCZY RUCH NAPRAWY POLSKI',
                                              'KOMITET WYBORCZY WYBORCÓW MNIEJSZOŚĆ NIEMIECKA'],
                                              axis = 1)
threshold_abs

election_comitee,District,KOMITET WYBORCZY PRAWO I SPRAWIEDLIWOŚĆ,KOALICYJNY KOMITET WYBORCZY KOALICJA OBYWATELSKA PO .N IPL ZIELONI,KOALICYJNY KOMITET WYBORCZY TRZECIA DROGA POLSKA 2050 SZYMONA HOŁOWNI - POLSKIE STRONNICTWO LUDOWE,KOMITET WYBORCZY NOWA LEWICA,KOMITET WYBORCZY KONFEDERACJA WOLNOŚĆ I NIEPODLEGŁOŚĆ
0,m. Szczytno,4038,3861,1746,1241,777
1,m. Podkowa Leśna,636,1228,357,258,157
2,gm. Jeżewo,1534,935,600,251,278
3,gm. Raciechowice,1906,574,420,107,325
4,gm. Rogowo,1153,313,400,156,170
...,...,...,...,...,...,...
2513,m. Miasteczko Śląskie,1609,964,551,248,259
2514,gm. Gnojno,1541,184,279,112,138
2515,gm. Łomazy,1361,259,376,106,244
2516,gm. Tyczyn,3166,1069,699,295,659


In [12]:
threshold_perc = threshold_abs.copy()

cols = [
    'KOMITET WYBORCZY PRAWO I SPRAWIEDLIWOŚĆ',
    'KOALICYJNY KOMITET WYBORCZY KOALICJA OBYWATELSKA PO .N IPL ZIELONI',
    'KOALICYJNY KOMITET WYBORCZY TRZECIA DROGA POLSKA 2050 SZYMONA HOŁOWNI - POLSKIE STRONNICTWO LUDOWE',
    'KOMITET WYBORCZY NOWA LEWICA',
    'KOMITET WYBORCZY KONFEDERACJA WOLNOŚĆ I NIEPODLEGŁOŚĆ'
]

total = threshold_abs[cols].sum(axis=1)

for c in cols:
    threshold_perc[c] = (threshold_abs[c] / total)

threshold_perc

election_comitee,District,KOMITET WYBORCZY PRAWO I SPRAWIEDLIWOŚĆ,KOALICYJNY KOMITET WYBORCZY KOALICJA OBYWATELSKA PO .N IPL ZIELONI,KOALICYJNY KOMITET WYBORCZY TRZECIA DROGA POLSKA 2050 SZYMONA HOŁOWNI - POLSKIE STRONNICTWO LUDOWE,KOMITET WYBORCZY NOWA LEWICA,KOMITET WYBORCZY KONFEDERACJA WOLNOŚĆ I NIEPODLEGŁOŚĆ
0,m. Szczytno,0.346223,0.331047,0.149704,0.106405,0.066621
1,m. Podkowa Leśna,0.241275,0.465857,0.135432,0.097876,0.05956
2,gm. Jeżewo,0.426348,0.259867,0.166759,0.069761,0.077265
3,gm. Raciechowice,0.572029,0.172269,0.12605,0.032113,0.097539
4,gm. Rogowo,0.526004,0.142792,0.182482,0.071168,0.077555
...,...,...,...,...,...,...
2513,m. Miasteczko Śląskie,0.443129,0.265492,0.151749,0.068301,0.07133
2514,gm. Gnojno,0.683673,0.081633,0.12378,0.049689,0.061224
2515,gm. Łomazy,0.580136,0.110401,0.160273,0.045183,0.104007
2516,gm. Tyczyn,0.537704,0.181556,0.118716,0.050102,0.111923


In [None]:
threshold_perc.to_excel("Votes (%) by gminas, Sejm parties.xlsx", index=False)
threshold_abs.to_excel("Votes (total) by gminas, Sejm parties.xlsx", index=False)