First, we obtain additional demographic data to improve our predictions and to get a more reliable estimate of the feature importance of the spending data. This is not a foolproof method, but it is better than using the spending data alone. Further demographic datasets can be added by modifying the script.

Currently, we use demographic data on nationality, age (taken from the census table broken down by sex and age), and highest completed education.

In [1]:
import os
import urllib.request

"""
URLs for the data.gv.at pages:
Date accessed: 2/10/2025 5:41PM

Bevölkerung - Staatsangehörigkeit (population by nationality)
https://www.data.gv.at/katalog/dataset/2a654cd4-60e2-4dc9-ba8d-f7d48343ecf6
Bevölkerung - Geschlecht und Alter laut Volkszählung (population by sex and age, census)
https://www.data.gv.at/katalog/dataset/076e497b-e2e2-409f-a32c-009e05c5f957
Bevölkerung - Höchste abgeschlossene Ausbildung (population by highest completed education)
https://www.data.gv.at/katalog/dataset/b80ecd53-cfe8-4c9f-9c47-ee567ce94f45

The direct download URLs are listed on each page's resources tab (page URL + #resources).
"""

urls = [
    "https://e-gov.ooe.gv.at/at.gv.ooe.ogd2-citi/api/file/VAQ5Lkz1NttM9P4OQBxqaw/OOE_Bev_Staatsangehoerigkeit.csv",
    "https://e-gov.ooe.gv.at/at.gv.ooe.ogd2-citi/api/file/tT_GXC5wGdPvjNfN3GD1ow/OOE_Bev_laut_Volkszaehlung_Geschl_Alt5J.csv",
    "https://e-gov.ooe.gv.at/at.gv.ooe.ogd2-citi/api/file/E6urCdCJP4L-ZRbWAhzkiA/OOE_Bev_Hoechste_abgeschl_Ausbildung.csv"
]

os.makedirs("data", exist_ok=True)
for url in urls:
    try:
        filename = os.path.join("data", url.split("/")[-1])
        urllib.request.urlretrieve(url, filename)
        print(f"Downloaded: {filename}")
    except Exception as e:
        print(f"Error downloading {url}: {e}")


Downloaded: data/OOE_Bev_Staatsangehoerigkeit.csv
Downloaded: data/OOE_Bev_laut_Volkszaehlung_Geschl_Alt5J.csv
Downloaded: data/OOE_Bev_Hoechste_abgeschl_Ausbildung.csv


Now we write a scalable pipeline for processing election and demographic data using Apache Spark. We will use Spark DataFrames to handle large datasets efficiently and train a Random Forest classifier to predict the winning party in elections.

First, we initialize Spark.

In [2]:
#!/usr/bin/env python3

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast, udf
from pyspark.sql.types import StringType
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from unidecode import unidecode # https://pypi.org/project/Unidecode/

spark = SparkSession.builder.appName("MunicipalSpendingAndElectionAnalysis").getOrCreate()

We load the existing merged data that we created in `etl.py`. 

In [3]:
merged_data_path = "data/merged_data.csv"
merged_df = (
    spark.read
         .option("header", True)
         .option("inferSchema", True)
         .option("sep", ",")
         .option("enforceSchema", True)
         .csv(merged_data_path)
         .drop("_c0")  # drop the `_c0` index column if present (no-op otherwise)
)

print(f"Loaded {merged_data_path}")

# Add a unique ID so that demographic data can be joined back to each row later.
merged_df = merged_df.withColumn("id", F.monotonically_increasing_id()) \
                     .withColumn("Year", F.col("Year").cast("integer")) # ensure Year is an integer

print("First 5 rows now after adding id (and casting Year):")
merged_df.show(5, truncate=False)

Loaded data/merged_data.csv
First 5 rows now after adding id (and casting Year):
+----------------------+----+---------------+-----------------+---------------+------------------+----------------+---------------+-------------+-------------------+---+
|Municipality_Lowercase|Year|Municipality_ID|Municipality_Name|Wahlberechtigte|abgegebene_Stimmen|gueltige_Stimmen|Wahlbeteiligung|Winning_Party|Spending_Summe     |id |
+----------------------+----+---------------+-----------------+---------------+------------------+----------------+---------------+-------------+-------------------+---+
|linz                  |2008|40101          |Linz             |142125         |96209             |94496           |67.69          |SPO          |1.203534975E8      |0  |
|steyr                 |2008|40201          |Steyr            |28962          |20765             |20335           |71.7           |SPO          |3.377888836328125E7|1  |
|wels                  |2008|40301          |Wels             |40994 

Then we load our demographics data.

We calculate the share of Austrian citizens in the total population, the share of university (incl. Fachhochschule) graduates among all persons in the education statistics (`EDU_TOTAL`), and the share of people aged 65+ in the total population. Other ratios can be used by adjusting the respective part of the script.
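The 65+ computation in the next cell can be modelled in plain Python to make the aggregation explicit: sum the six 65+ age buckets per row, add up all rows that share a municipality and year, then divide by the summed total. This is an illustrative sketch, not the Spark code; the function name and the sample rows (municipality "musterdorf" and its counts) are hypothetical. It returns `None` when the total is zero, assuming Spark's behaviour with ANSI mode disabled, where division by zero yields null.

```python
from collections import defaultdict

# the same 65+ bucket columns the Spark code uses
AGE_COLS_65PLUS = [
    "AGE_65_TO_69", "AGE_70_TO_74", "AGE_75_TO_79",
    "AGE_80_TO_84", "AGE_85_TO_89", "AGE_90_PLUS",
]

def pop_65plus_ratio(rows):
    """Return {(municipality, year): 65+ share}, summing over all rows per key."""
    pop_65 = defaultdict(float)
    total = defaultdict(float)
    for row in rows:
        key = (row["Municipality_Lowercase"], row["YEAR"])
        # missing counts are treated as 0, like the F.when(isNull, 0) step
        pop_65[key] += sum(row.get(c) or 0 for c in AGE_COLS_65PLUS)
        total[key] += row.get("AGE_TOTAL") or 0
    # None instead of an error on a zero total (assumed Spark null semantics)
    return {k: (pop_65[k] / total[k] if total[k] else None) for k in total}

# hypothetical sample: two rows (e.g. one per sex) for the same municipality/year
sample = [
    {"Municipality_Lowercase": "musterdorf", "YEAR": 2021, "AGE_65_TO_69": 30,
     "AGE_70_TO_74": 20, "AGE_90_PLUS": 5, "AGE_TOTAL": 500},
    {"Municipality_Lowercase": "musterdorf", "YEAR": 2021, "AGE_65_TO_69": 25,
     "AGE_80_TO_84": 20, "AGE_TOTAL": 500},
]
print(pop_65plus_ratio(sample))  # {('musterdorf', 2021): 0.1}
```

Summing numerators and denominators before dividing (rather than averaging per-row ratios) is what the `groupBy(...).agg(F.sum(...))` step achieves, so rows with larger populations carry proportionally more weight.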

In [4]:
def add_muni_col(df, source_col):
    """
    helper function to create a "Municipality_Lowercase" col from a source col
    """
    return df.withColumn(
        "Municipality_Lowercase",
        F.regexp_replace(F.lower(F.col(source_col)), " ", "-")
    )

# --- A) NATIONALITY (Staatsangehoerigkeit) ---
nation_path = "data/OOE_Bev_Staatsangehoerigkeit.csv"
df_nation = (
    spark.read
         .option("header", True)
         .option("sep", ";")
         .option("encoding", "latin1")
         .option("inferSchema", True)
         .csv(nation_path)
)
df_nation = add_muni_col(df_nation, "LAU2_NAME") # using helper. LAU2 = municipality name
# rename cols for clarity
df_nation = df_nation.withColumnRenamed("YEAR", "orig_year") \
                     .withColumnRenamed("NATION_AUSTRIA", "Nation_Austria") \
                     .withColumnRenamed("NATION_TOTAL", "Nation_Total")
# cast to numeric (float) as needed
df_nation = df_nation.withColumn("Nation_Austria", F.col("Nation_Austria").cast("float")) \
                     .withColumn("Nation_Total", F.col("Nation_Total").cast("float")) \
                     .withColumn("orig_year", F.col("orig_year").cast("integer"))

# calculate the share of Austrian citizens in the population per municipality
df_nation = df_nation.withColumn("Austria_Ratio", F.col("Nation_Austria") / F.col("Nation_Total"))
df_nation = df_nation.select("Municipality_Lowercase", "orig_year", "Austria_Ratio")

print("Sample rows from Nationality data:")
df_nation.show(3, truncate=False)

# --- B) EDUCATION LEVEL (Höchste_abgeschl_Ausbildung) ---
edu_path = "data/OOE_Bev_Hoechste_abgeschl_Ausbildung.csv"
df_edu = (
    spark.read
         .option("header", True)
         .option("sep", ";")
         .option("encoding", "latin1")
         .option("inferSchema", True)
         .csv(edu_path)
)
df_edu = add_muni_col(df_edu, "COMMUNE_NAME") # using helper
# rename cols for clarity
df_edu = df_edu.withColumnRenamed("YEAR", "orig_year") \
               .withColumnRenamed("EDU_UNIVERSITY_FACHHOCHSCHULE", "Uni_Grads") \
               .withColumnRenamed("EDU_TOTAL", "Edu_Total")
# cast to numeric (float) as needed
df_edu = df_edu.withColumn("Uni_Grads", F.col("Uni_Grads").cast("float")) \
               .withColumn("Edu_Total", F.col("Edu_Total").cast("float")) \
               .withColumn("orig_year", F.col("orig_year").cast("integer"))

# calculate ratio of uni grads per population per municipality
df_edu = df_edu.withColumn("Uni_Grad_Ratio", F.col("Uni_Grads") / F.col("Edu_Total"))
df_edu = df_edu.select("Municipality_Lowercase", "orig_year", "Uni_Grad_Ratio")

print("Sample rows from Education Level data:")
df_edu.show(3, truncate=False)

# --- C) AGE / POPULATION (Bevölkerung laut Volkszählung) ---
age_path = "data/OOE_Bev_laut_Volkszaehlung_Geschl_Alt5J.csv"
df_age = (
    spark.read
         .option("header", True)
         .option("sep", ";")
         .option("encoding", "latin1")
         .option("inferSchema", True)
         .csv(age_path)
)
df_age = add_muni_col(df_age, "LAU2_NAME") # using helper

# For our analysis, we will calculate the ratio of 65+ / total population.
# If we want to do some other ratio, adjust this code.

# columns for 65+
age_cols_65plus = [
    "AGE_65_TO_69", "AGE_70_TO_74", "AGE_75_TO_79",
    "AGE_80_TO_84", "AGE_85_TO_89", "AGE_90_PLUS"
]

# ensure correct types
for c in age_cols_65plus:
    df_age = df_age.withColumn(c, F.when(F.col(c).isNull(), 0)
                                       .otherwise(F.col(c).cast("float")))
df_age = df_age.withColumn("AGE_TOTAL", F.when(F.col("AGE_TOTAL").isNull(), 0)
                                         .otherwise(F.col("AGE_TOTAL").cast("float")))
# create row-wise sum of the 65+ population
df_age = df_age.withColumn("POP_65plus", sum([F.col(c) for c in age_cols_65plus]))
df_age = df_age.withColumnRenamed("YEAR", "orig_year") \
               .withColumn("orig_year", F.col("orig_year").cast("integer"))

# group by municipality and year to collapse multiple rows per key (e.g. one row per sex)
df_age = df_age.groupBy("Municipality_Lowercase", "orig_year") \
               .agg(
                   F.sum("POP_65plus").alias("POP_65plus"),
                   F.sum("AGE_TOTAL").alias("AGE_TOTAL")
               )
df_age = df_age.withColumn("Pop_65plus_Ratio", F.col("POP_65plus") / F.col("AGE_TOTAL"))
df_age = df_age.select("Municipality_Lowercase", "orig_year", "Pop_65plus_Ratio")

print("Sample rows from Age data:")
df_age.show(3, truncate=False)

print("Done")

Sample rows from Nationality data:
+----------------------+---------+------------------+
|Municipality_Lowercase|orig_year|Austria_Ratio     |
+----------------------+---------+------------------+
|linz                  |2023     |0.7072245498811006|
|steyr                 |2023     |0.7656833359625598|
|wels                  |2023     |0.6647265152327416|
+----------------------+---------+------------------+
only showing top 3 rows

Sample rows from Education Level data:
+----------------------+---------+-------------------+
|Municipality_Lowercase|orig_year|Uni_Grad_Ratio     |
+----------------------+---------+-------------------+
|linz                  |2022     |0.15113934789550695|
|steyr                 |2022     |0.08382775624291328|
|wels                  |2022     |0.08342151126602763|
+----------------------+---------+-------------------+
only showing top 3 rows

Sample rows from Age data:
+----------------------------+---------+-------------------+
|Municipality_Lowercase  

In [5]:
# (rows, columns) of each DataFrame
print((merged_df.count(), len(merged_df.columns)))
print((df_nation.count(), len(df_nation.columns)))
print((df_edu.count(), len(df_edu.columns)))
print((df_age.count(), len(df_age.columns)))

(1240, 11)
(4818, 3)
(3066, 3)
(2628, 3)



Since demographic data is not available for every election year, we match each election row with the demographic data from the closest available year.

This is not a perfect solution, but it is good enough for our purposes. More accurate approaches exist (e.g. interpolating or extrapolating between census years), but to keep the pipeline scalable and avoid unnecessary complexity, we do not use them. The initial join costs roughly O(N · k), where N is the number of rows in the main merged dataset and k is the small, roughly constant number of demographic years per municipality, i.e. effectively O(N).
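The matching rule itself can be sketched in plain Python: for one municipality, pick the demographic year with the smallest absolute distance to the election year and, on a tie, prefer the more recent year (the same ordering the Spark window function uses below). The function name and the sample years and ratios are hypothetical.

```python
def closest_year_value(election_year, demo_by_year):
    """Return the value whose year is closest to election_year.

    demo_by_year maps year -> ratio for one municipality (hypothetical input).
    Ties are broken by preferring the more recent year, mirroring
    orderBy(year_diff asc, orig_year desc). Returns None if there is no data,
    like the null produced by the left join.
    """
    if not demo_by_year:
        return None
    # sort key: (distance, -year) so that a larger year wins on equal distance
    best_year = min(demo_by_year, key=lambda y: (abs(election_year - y), -y))
    return demo_by_year[best_year]

# hypothetical demographic years for one municipality
austria_ratio = {2011: 0.93, 2021: 0.91, 2023: 0.90}
print(closest_year_value(2008, austria_ratio))  # 0.93 (2011 is 3 years away)
print(closest_year_value(2016, austria_ratio))  # 0.91 (2011 and 2021 tie; 2021 is more recent)
print(closest_year_value(2022, austria_ratio))  # 0.9 (2021 and 2023 tie; 2023 is more recent)
```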

This efficiency comes from broadcasting the relatively small demographic datasets, which we use to join the closest nationality, education, and age ratios onto the election data. We assume that the nationality, education, and age datasets each fit into memory, so we use a broadcast join: the large election dataset does not need to be shuffled for the join, which therefore scales essentially linearly with N. The subsequent window function (which ranks the demographic records per unique `id`) does require a shuffle on `id` and a sort within each partition, so strictly speaking the overall cost is O(N log N); with only a handful of candidate years per `id`, this step is cheap in practice. If the demographic data does not fit into memory, the code can be modified to use a standard left join (typically a sort-merge join, O(N log N) due to sorting) or a shuffle hash join (roughly O(N) when each partition's hash table fits into memory).

Examples of alternatives if the demographic data does not fit into memory:
```python
# standard left join: Spark typically picks a sort-merge join for large tables
# (a small demo_df may still be auto-broadcast unless disabled as below)
joined = merged.join(demo_df, on="Municipality_Lowercase", how="left")

# disable auto-broadcasting and request a shuffle hash join instead
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
joined = merged.join(demo_df.hint("shuffle_hash"), on="Municipality_Lowercase", how="left")
```
Broadcast join docs: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.broadcast.html
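Conceptually, a broadcast join ships the small table to every executor and builds a hash table from it, so each row of the large table is matched by a lookup instead of a scan. The sketch below is a simplified single-machine model of that idea in plain Python, not Spark's actual implementation; the function name and sample rows are hypothetical.

```python
from collections import defaultdict

def broadcast_hash_left_join(large_rows, small_rows, key):
    """Left-join large_rows with small_rows on `key` via an in-memory hash table.

    Build phase: O(M) for M small rows. Probe phase: O(N * k), where k is
    the number of small rows per key (here: demographic years per municipality).
    Returns (large_row, small_row) pairs; unmatched rows get small_row=None.
    """
    table = defaultdict(list)
    for row in small_rows:                 # "broadcast" side: build the hash table
        table[row[key]].append(row)
    out = []
    for row in large_rows:                 # large side: probe the hash table
        matches = table.get(row[key])      # .get() does not insert missing keys
        if matches:
            out.extend((row, m) for m in matches)
        else:
            out.append((row, None))        # keep unmatched rows, like a left join
    return out

# hypothetical data: one municipality with two demographic years, one without data
elections = [
    {"Municipality_Lowercase": "linz", "Year": 2008},
    {"Municipality_Lowercase": "musterdorf", "Year": 2008},
]
demographics = [
    {"Municipality_Lowercase": "linz", "orig_year": 2011},
    {"Municipality_Lowercase": "linz", "orig_year": 2021},
]
pairs = broadcast_hash_left_join(elections, demographics, "Municipality_Lowercase")
print(len(pairs))  # 3: two candidate years for linz, one unmatched row
```

The per-key fan-out k is why the window step afterwards is needed: the join produces one candidate row per demographic year, and the ranking keeps only the closest one.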

In [6]:
def closest_demographic_join(merged, demo_df, ratio_col):
    """
    Joins demographic data to the main (merged) dataset by selecting the closest
    available year for each municipality.

    - Broadcasts the demographic dataset to optimize join performance.
    - Performs a left join on `Municipality_Lowercase` to merge the demographic data.
    - Computes the absolute difference between `Year` (from `merged`) and
      `orig_year` (from `demo_df`).
    - Uses a window function to rank rows by that year difference, resolving ties
      by preferring the most recent `orig_year`.
    - Keeps the best match (rank 1) and returns a DataFrame containing `id` and
      the requested demographic ratio column.

    :param merged: pyspark.sql.DataFrame, the election/spending dataset with
        "id", "Municipality_Lowercase", and "Year" columns.
    :param demo_df: pyspark.sql.DataFrame, the demographic dataset with
        "Municipality_Lowercase", "orig_year", and the ratio column.
    :param ratio_col: str, name of the column in `demo_df` that holds the ratio to join.
    :return: pyspark.sql.DataFrame with columns "id" and `ratio_col`; rows without
        a demographic match get a null ratio.

    Note:
    - Assumes both `merged` and `demo_df` contain "Municipality_Lowercase".
    """
    # broadcast demographics
    demo_df = broadcast(demo_df)

    # calculate year diff
    joined = merged.join(demo_df, on="Municipality_Lowercase", how="left") \
                   .withColumn("year_diff", F.abs(F.col("Year") - F.col("orig_year")))

    # rank candidate years per election row: smallest gap first, then most recent year
    window_spec = Window.partitionBy("id").orderBy(F.col("year_diff").asc(), F.col("orig_year").desc())

    # select closest match per id
    closest = joined.withColumn("rank", F.row_number().over(window_spec)) \
                    .filter(F.col("rank") == 1) \
                    .select("id", ratio_col)
    return closest

# before joining, transliterate names to ASCII (e.g. "ö" -> "o") so municipality names match across sources
unidecode_udf = udf(lambda x: unidecode(x) if x else x, StringType())

merged_df = (
    merged_df
    .withColumn("Municipality_Lowercase", unidecode_udf(F.col("Municipality_Lowercase")))
    .withColumn("Municipality_Name", unidecode_udf(F.col("Municipality_Name")))
)
df_nation = df_nation.withColumn("Municipality_Lowercase", unidecode_udf(F.col("Municipality_Lowercase")))
df_edu = df_edu.withColumn("Municipality_Lowercase", unidecode_udf(F.col("Municipality_Lowercase")))
df_age = df_age.withColumn("Municipality_Lowercase", unidecode_udf(F.col("Municipality_Lowercase")))

# apply
nation_closest = closest_demographic_join(merged_df, df_nation, "Austria_Ratio")
edu_closest = closest_demographic_join(merged_df, df_edu, "Uni_Grad_Ratio")
age_closest = closest_demographic_join(merged_df, df_age, "Pop_65plus_Ratio")

print("Sample of nation_closest join results:")
nation_closest.show(3, truncate=False)
print("Sample of edu_closest join results:")
edu_closest.show(3, truncate=False)
print("Sample of age_closest join results:")
age_closest.show(3, truncate=False)

# join
merged_df = (
    merged_df
    .join(nation_closest, on="id", how="left")
    .join(edu_closest, on="id", how="left")
    .join(age_closest, on="id", how="left")
)
print(f"Row count after joining demographics: {merged_df.count()}")

print("Sample merged_df after joining demographics:")
merged_df.show(5, truncate=False)

Sample of nation_closest join results:
+---+------------------+
|id |Austria_Ratio     |
+---+------------------+
|26 |0.9342235410484668|
|29 |0.9445953286257469|
|474|0.9523026315789473|
+---+------------------+
only showing top 3 rows

Sample of edu_closest join results:
+---+--------------------+
|id |Uni_Grad_Ratio      |
+---+--------------------+
|26 |0.025025025025025027|
|29 |0.05830849478390462 |
|474|0.03536977491961415 |
+---+--------------------+
only showing top 3 rows

Sample of age_closest join results:
+---+-------------------+
|id |Pop_65plus_Ratio   |
+---+-------------------+
|26 |0.15015015015015015|
|29 |0.17343517138599107|
|474|0.19614147909967847|
+---+-------------------+
only showing top 3 rows

Row count after joining demographics: 1240
Sample merged_df after joining demographics:
+---+----------------------+----+---------------+-----------------+---------------+------------------+----------------+---------------+-------------+------------------+------------

Afterwards, we clean the data and prepare it for machine learning.
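The cleaning step keeps a row only if the label is present and every numeric feature is a finite number; Spark's `dropna` removes both null and NaN values, and the preceding `replace` turns infinities into nulls. A plain-Python sketch of that filter, assuming rows are dictionaries (a hypothetical representation, not the Spark API; the sample values are made up):

```python
import math

NUMERIC_COLS = ["Spending_Summe", "Wahlbeteiligung",
                "Austria_Ratio", "Uni_Grad_Ratio", "Pop_65plus_Ratio"]

def is_clean(row):
    """True if the label is present and all numeric features are finite."""
    if row.get("Winning_Party") is None:
        return False
    for col in NUMERIC_COLS:
        value = row.get(col)
        # None (missing), +/-inf, and NaN are all rejected
        if value is None or not math.isfinite(value):
            return False
    return True

# hypothetical rows: the second one has an infinite spending value
rows = [
    {"Winning_Party": "OVP", "Spending_Summe": 1.0e6, "Wahlbeteiligung": 70.1,
     "Austria_Ratio": 0.93, "Uni_Grad_Ratio": 0.05, "Pop_65plus_Ratio": 0.18},
    {"Winning_Party": "SPO", "Spending_Summe": float("inf"), "Wahlbeteiligung": 65.0,
     "Austria_Ratio": 0.90, "Uni_Grad_Ratio": 0.08, "Pop_65plus_Ratio": 0.20},
]
print([is_clean(r) for r in rows])  # [True, False]
```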

In [7]:
# Replace +/-inf with null so that dropna() below also removes these rows
merged_df = merged_df.replace(float("inf"), None).replace(float("-inf"), None)

# Drop rows with missing key columns
cols_required = ["Spending_Summe", "Wahlbeteiligung", "Winning_Party",
                 "Austria_Ratio", "Uni_Grad_Ratio", "Pop_65plus_Ratio"]
merged_df = merged_df.dropna(subset=cols_required)


# ensure correct types
merged_df = merged_df.withColumn("Spending_Summe", F.col("Spending_Summe").cast("float")) \
                     .withColumn("Wahlbeteiligung", F.col("Wahlbeteiligung").cast("float")) \
                     .withColumn("Austria_Ratio", F.col("Austria_Ratio").cast("float")) \
                     .withColumn("Uni_Grad_Ratio", F.col("Uni_Grad_Ratio").cast("float")) \
                     .withColumn("Pop_65plus_Ratio", F.col("Pop_65plus_Ratio").cast("float"))

print(f"Number of rows after cleaning: {merged_df.count()}")
print("Spark: DataFrame Schema:")
merged_df.printSchema()


Number of rows after cleaning: 1051
Spark: DataFrame Schema:
root
 |-- id: long (nullable = true)
 |-- Municipality_Lowercase: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Municipality_ID: integer (nullable = true)
 |-- Municipality_Name: string (nullable = true)
 |-- Wahlberechtigte: integer (nullable = true)
 |-- abgegebene_Stimmen: integer (nullable = true)
 |-- gueltige_Stimmen: integer (nullable = true)
 |-- Wahlbeteiligung: float (nullable = true)
 |-- Winning_Party: string (nullable = true)
 |-- Spending_Summe: float (nullable = true)
 |-- Austria_Ratio: float (nullable = true)
 |-- Uni_Grad_Ratio: float (nullable = true)
 |-- Pop_65plus_Ratio: float (nullable = true)



Then, we build the model pipeline. First, we collect the winning parties, i.e. the parties that won at least one of the considered elections. Only these parties can be predicted.

It is worth noting that this is fine in closed/stable party systems (like the US), where the number and names of the parties stay roughly constant. In more open/fluid party systems, like many European ones, where parties frequently emerge, disappear, or rebrand, this may yield inaccurate results.

However, accurately predicting such events (e.g. a party dissolving, a new party being founded, or a previously unpopular party suddenly gaining support) is extremely complex, if not impossible, and requires far more social, cultural, political, and historical data, so it is out of scope for this project.

Therefore, we assume such events do not occur and that the currently dominant parties (in our data ÖVP, SPÖ, and FPÖ) continue to dominate the predicted future election.
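Because the label set is fixed to the parties that appear in the data, a party that never won has no label index. The sketch below models, in plain Python, how `StringIndexer` with `stringOrderType="alphabetDesc"` assigns indices and how `handleInvalid="skip"` drops unseen labels; it is an illustration of that behaviour, not Spark code, and the function names and the unseen label `NEW_PARTY` are hypothetical. With the ASCII party names used in this notebook it reproduces the SPO/OVP/FPO mapping printed in the training output.

```python
def fit_label_index(labels):
    """Map each distinct label to an index, ordered alphabetically descending."""
    return {label: i for i, label in enumerate(sorted(set(labels), reverse=True))}

def encode(labels, index):
    """Encode labels as float indices; unseen labels are dropped (like handleInvalid="skip")."""
    return [float(index[label]) for label in labels if label in index]

index = fit_label_index(["OVP", "SPO", "OVP", "FPO"])
print(index)                                       # {'SPO': 0, 'OVP': 1, 'FPO': 2}
# "NEW_PARTY" is a hypothetical party that never won, so it is skipped
print(encode(["FPO", "NEW_PARTY", "SPO"], index))  # [2.0, 0.0]
```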

In [8]:
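# distinct winning parties in the data (for inspection only; the StringIndexer below builds its own label list)
party_labels = merged_df.select("Winning_Party").distinct().rdd.flatMap(lambda x: x).collect()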

# convert party names into numeric labels.
label_indexer = StringIndexer(
    inputCol="Winning_Party",
    outputCol="label",
    handleInvalid="skip"
).setStringOrderType("alphabetDesc")

# we assemble a single feature vector from selected columns
assembler = VectorAssembler(
    inputCols=[
        "Spending_Summe",
        "Wahlbeteiligung",
        "Austria_Ratio",
        "Uni_Grad_Ratio",
        "Pop_65plus_Ratio",
    ],
    outputCol="features",
    handleInvalid="skip"
)

# We train a random forest classifier with 50 trees and a maximum depth of 5.
# If we run out of memory, we can decrease the number of trees (possibly at some cost in accuracy).
# We assume the available memory can handle 50 trees.
rf_classifier = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=50,
    maxDepth=5,
    seed=42
)

# we create the spark pipeline for ML training
pipeline = Pipeline(stages=[label_indexer, assembler, rf_classifier])

Next, we train and evaluate the model: we split the data into training and test sets, predict the winning party for the held-out test rows, evaluate the accuracy, and extract the feature importances.
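The `prediction` column holds label indices rather than party names, and the evaluator's `accuracy` metric is the fraction of rows whose predicted index equals the true index. A plain-Python sketch of both steps, assuming the label order `["SPO", "OVP", "FPO"]` shown in the output of the next cell; the function names and the index lists are hypothetical. In Spark itself, `IndexToString` from `pyspark.ml.feature` can map indices back to names using `model.stages[0].labels`.

```python
def indices_to_labels(predictions, labels):
    """Map float label indices (as in Spark's prediction column) back to names."""
    return [labels[int(p)] for p in predictions]

def accuracy(true_idx, pred_idx):
    """Share of rows where the predicted index equals the true label index."""
    if len(true_idx) != len(pred_idx) or not true_idx:
        raise ValueError("need two non-empty sequences of equal length")
    correct = sum(t == p for t, p in zip(true_idx, pred_idx))
    return correct / len(true_idx)

labels = ["SPO", "OVP", "FPO"]   # order as in model.stages[0].labels
true_idx = [1.0, 0.0, 1.0, 1.0]  # hypothetical test labels
pred_idx = [1.0, 1.0, 1.0, 1.0]  # hypothetical predictions
print(indices_to_labels(pred_idx, labels))  # ['OVP', 'OVP', 'OVP', 'OVP']
print(accuracy(true_idx, pred_idx))         # 0.75
```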

In [9]:
# split 0.8/0.2
train_df, test_df = merged_df.randomSplit([0.8, 0.2], seed=1)
model = pipeline.fit(train_df)

# for debug, display winning party - label index pairs.
party_label_list = [(party, str(index)) for index, party in enumerate(model.stages[0].labels)] # model.stages[0].labels = label names
party_label_df = spark.createDataFrame(party_label_list, ["Winning_Party", "Label"])
party_label_df.show(truncate=False)

# get predictions on the test set
predictions = model.transform(test_df)
predictions.select("Municipality_Name", "Year", "Winning_Party", "prediction") \
           .show(5, truncate=False)

# evaluate classification accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
print(f"\nTest Accuracy = {evaluator.evaluate(predictions) * 100:.2f}%\n")

# get the feature importances
importances = model.stages[-1].featureImportances
feature_cols = assembler.getInputCols()

# print out the "importances" in a readable format. (for raw data: print(importances))
for idx, feature_name in enumerate(feature_cols):
    print(f"{feature_name}: {importances[idx]:.4f}")

+-------------+-----+
|Winning_Party|Label|
+-------------+-----+
|SPO          |0    |
|OVP          |1    |
|FPO          |2    |
+-------------+-----+

+-----------------------+----+-------------+----------+
|Municipality_Name      |Year|Winning_Party|prediction|
+-----------------------+----+-------------+----------+
|Bachmanning            |2019|OVP          |1.0       |
|Edt bei Lambach        |2008|SPO          |1.0       |
|Dietach                |2019|OVP          |1.0       |
|Schenkenfelden         |2008|OVP          |1.0       |
|Eggendorf im Traunkreis|2013|OVP          |1.0       |
+-----------------------+----+-------------+----------+
only showing top 5 rows


Test Accuracy = 75.36%

Spending_Summe: 0.3874
Wahlbeteiligung: 0.2048
Austria_Ratio: 0.1713
Uni_Grad_Ratio: 0.1333
Pop_65plus_Ratio: 0.1032


**Discussion:**

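Our model predicted the winning party with about 75% accuracy on the test set. We consider this good given how little data on previous elections we have, although the figure should be compared against a majority-class baseline (always predicting the most frequent winning party).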
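A majority-class baseline, i.e. the accuracy of always predicting the most frequent winner in the training data, can be sketched in plain Python as follows. The function name and label lists are hypothetical; on the real data one would pass the `Winning_Party` values of `train_df` and `test_df`.

```python
from collections import Counter

def majority_baseline_accuracy(train_labels, test_labels):
    """Accuracy of always predicting the most frequent training label."""
    if not train_labels or not test_labels:
        raise ValueError("label lists must be non-empty")
    majority, _ = Counter(train_labels).most_common(1)[0]
    return sum(label == majority for label in test_labels) / len(test_labels)

train = ["OVP"] * 6 + ["SPO"] * 3 + ["FPO"]  # hypothetical training winners
test = ["OVP", "OVP", "SPO", "OVP"]          # hypothetical test winners
print(majority_baseline_accuracy(train, test))  # 0.75
```

In this made-up example, the baseline alone already reaches 0.75, which shows why a model's accuracy is only informative relative to the class balance.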

We can also see that the sum of education spending (`Spending_Summe`) is the most important of the five features we considered. This raises the question of which sub-category of education spending matters most for predicting election results; we analyze this in a separate notebook (`predict_winner_subcategories.ipynb`).

Voter turnout (Wahlbeteiligung) also appears relatively important. This raises the question of how voter turnout influences election results.

In [10]:
spark.stop()