In [56]:
import csv
import io
import requests
import pandas as pd
import json

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

from retrieve_data import DataRetrieverOverAPI

DAWUM_API_URL = "https://api.dawum.de/"

In [57]:
spark = SparkSession.builder \
                    .appName("Spark Basics") \
                    .getOrCreate()

In [58]:
dawum_api = DataRetrieverOverAPI(DAWUM_API_URL)
data = dawum_api.data

with open("file.json", "w") as f:
    json.dump(data, f)

In [59]:
dawum_database = data['Database']
dawum_parliaments = data['Parliaments']
dawum_institutes = data['Institutes']
dawum_taskers = data['Taskers']
dawum_methods = data['Methods']
dawum_parties = data['Parties']
dawum_surveys = data['Surveys']

print(type(dawum_surveys))

<class 'dict'>


In [60]:
df_database = spark.read.json(spark.sparkContext.parallelize([dawum_database]))
df_database = df_database.withColumn("license_name", F.col("License.Name")) \
                    .withColumn("license_shortcut", F.col("License.Shortcut")) \
                    .withColumn("license_link", F.col("License.Link")) \
                    .drop("License") \
                    .withColumnRenamed("Author", "author") \
                    .withColumnRenamed("Last_Update", "last_update") \
                    .withColumnRenamed("Publisher", "publisher") \
                    
df_database.show()


+--------------------+--------------------+---------+--------------------+----------------+--------------------+
|              author|         last_update|publisher|        license_name|license_shortcut|        license_link|
+--------------------+--------------------+---------+--------------------+----------------+--------------------+
|Dipl.-Jur. Philip...|2025-02-22T12:45:...| dawum.de|ODC Open Database...|        ODC-ODbL|https://opendatac...|
+--------------------+--------------------+---------+--------------------+----------------+--------------------+



In [61]:
rdd = spark.sparkContext.parallelize(dawum_parliaments.items())  # Parallelize the dictionary items

df_parliaments = rdd.map(lambda x: (x[0], x[1]['Name'], x[1]['Shortcut'], x[1]['Election'])).toDF(["parliament_id", "parliament_name", "parliament_shortcut", "parliament_election"])

df_parliaments.show()

+-------------+--------------------+--------------------+--------------------+
|parliament_id|     parliament_name| parliament_shortcut| parliament_election|
+-------------+--------------------+--------------------+--------------------+
|            0|           Bundestag|           Bundestag|      Bundestagswahl|
|            6|Hamburgische Bürg...|             Hamburg|Bürgerschaftswahl...|
|            9|Niedersächsischer...|       Niedersachsen|Landtagswahl in N...|
|            1|Landtag von Baden...|   Baden-Württemberg|Landtagswahl in B...|
|            8|Landtag von Meckl...|Mecklenburg-Vorpo...|Landtagswahl in M...|
|           16|Thüringischer Lan...|           Thüringen|Landtagswahl in T...|
|            4|Brandenburgischer...|         Brandenburg|Landtagswahl in B...|
|           15|Landtag von Schle...|  Schleswig-Holstein|Landtagswahl in S...|
|           14|Landtag von Sachs...|      Sachsen-Anhalt|Landtagswahl in S...|
|           13| Sächsischer Landtag|             Sac

In [62]:
rdd = spark.sparkContext.parallelize(dawum_institutes.items())  # Parallelize the dictionary items
df_institutes = rdd.map(lambda x: (x[0], x[1]["Name"])).toDF(["institute_id", "institute_name"])
df_institutes.show()

+------------+--------------------+
|institute_id|      institute_name|
+------------+--------------------+
|           5|                INSA|
|          24|Institut Wahlkrei...|
|          17|               Ipsos|
|           9|          Allensbach|
|           2|               Forsa|
|          13|              YouGov|
|          16|               Civey|
|           6|Forschungsgruppe ...|
|           1|     Infratest dimap|
|           4|                 GMS|
|          22|            pollytix|
|           3|      Verian (Emnid)|
|           7|Trend Research Ha...|
|          25|          IFM Berlin|
|          21|      Policy Matters|
|          18| Universität Hamburg|
|          15|         Mentefactum|
|          20|            IM Field|
|          12|              uniQma|
|          23|           Conoscope|
+------------+--------------------+
only showing top 20 rows



In [63]:
rdd = spark.sparkContext.parallelize(dawum_taskers.items())
df_taskers = rdd.map(lambda x: (x[0], x[1]["Name"])).toDF(["tasker_id", "tasker_name"])
df_taskers.show()

+---------+--------------------+
|tasker_id|         tasker_name|
+---------+--------------------+
|        4|                BILD|
|       97|Institut Wahlkrei...|
|       13|               Ipsos|
|        6|Frankfurter Allge...|
|       63|          RTL / n-tv|
|       43|              YouGov|
|      122|           Seven.One|
|        5|  ZDF-Politbarometer|
|       11|                 ARD|
|        7|                 GMS|
|       56| Schwäbische Zeitung|
|        3|     BILD am Sonntag|
|       10|     ARD-Tagesthemen|
|       67|            pollytix|
|       38|      Ostsee-Zeitung|
|       80|FUNKE Medien Thür...|
|       39|                 NDR|
|       64|               FOCUS|
|      112|Märkische Allgeme...|
|      120|Radio Hamburg / D...|
+---------+--------------------+
only showing top 20 rows



In [64]:
rdd = spark.sparkContext.parallelize(dawum_methods.items())
df_methods = rdd.map(lambda x: (x[0], x[1]["Name"])).toDF(["method_id", "method_name"])
df_methods.show()

+---------+----------------+
|method_id|     method_name|
+---------+----------------+
|        3|          Online|
|        2|      Persönlich|
|        1|     Telefonisch|
|        4|Telefon & Online|
|        0|       Unbekannt|
+---------+----------------+



In [65]:
rdd = spark.sparkContext.parallelize(dawum_parties.items())
df_parties = rdd.map(lambda x: (x[0], x[1]["Name"], x[1]["Shortcut"])).toDF(["party_id", "party_name", "party_shortcut"])
df_parties.show(df_parties.count())


+--------+--------------------+----------------+
|party_id|          party_name|  party_shortcut|
+--------+--------------------+----------------+
|       7|Alternative für D...|             AfD|
|      11|   Bayernpartei e.V.|    Bayernpartei|
|      14|Brandenburger Ver...|          BVB/FW|
|       4|Bündnis 90/Die Gr...|           Grüne|
|      23|Bündnis Sahra Wag...|             BSW|
|      21|bunt.saar – sozia...|       bunt.saar|
|      22|Bürger für Thüringen|            BfTh|
|      16|       Bürger in Wut|             BIW|
|       1|Christlich Demokr...|         CDU/CSU|
|     101|Christlich Demokr...|             CDU|
|     102|Christlich-Sozial...|             CSU|
|       5|           Die Linke|           Linke|
|      17|Familienpartei De...|         Familie|
|       3|Freie Demokratisc...|             FDP|
|       8|        Freie Wähler|    Freie Wähler|
|       9|Nationaldemokrati...|             NPD|
|      12|Ökologisch-Demokr...|             ÖDP|
|      13|Partei für

In [112]:
df = []
for key, value in dawum_surveys.items():
    row = {'survey_id': key}
    for k, v in value.items():
        if k == "Results":
            for party_id, result in v.items():
                row['party_id'] = party_id
                row['survey_result_by_percent'] = float(result)
                row['survey_publish_date'] = value['Date']
                row['institute_id'] = int(value['Institute_ID'])
                row['parliament_id'] = int(value['Parliament_ID'])
                row['method_id'] = int(value['Method_ID'])
                row['survey_start_date'] = value['Survey_Period']['Date_Start']
                row['survey_end_date'] = value['Survey_Period']['Date_End']
                row['total_surveyees'] = int(value['Surveyed_Persons'])
                row['tasker_id'] = int(value['Tasker_ID'])
                
                df.append(row)
                row = {'survey_id': key}

df_surveys = spark.createDataFrame(spark.sparkContext.parallelize(df))

df_surveys = df_surveys.select(F.col("survey_id"), 
                               F.col("party_id"), 
                               F.col("institute_id"),
                               F.col("parliament_id"),
                               F.col("method_id"),
                               F.col("tasker_id"),
                               F.col("survey_result_by_percent"),
                               F.col("survey_publish_date"),
                               F.col("survey_start_date"), 
                               F.col("survey_end_date"),
                               F.col("total_surveyees") 
                               )

df_surveys.coalesce(1).write.mode("overwrite").option("header", "true").csv("df_surveys.csv")


dfs = df_surveys.join(df_parliaments, on="parliament_id", how="full").join(df_parties, on="party_id", how="full")
#dfs.show()
surveys_by_parliament = []

unique_parliament_ids = df_parliaments.select("parliament_id").distinct().rdd.flatMap(lambda x: x).collect()
unique_parliament_names = df_parliaments.select("parliament_name").distinct().rdd.flatMap(lambda x: x).collect()

for i, e in enumerate(surveys_by_parliament):
    print(unique_parliament_ids[i], df_surveys.select("parliament_id").distinct().count())

for parliament_id in unique_parliament_ids:
    sur_df = dfs.filter(dfs.parliament_id == parliament_id)
    surveys_by_parliament.append(sur_df)

for i, e in enumerate(surveys_by_parliament):
    print(unique_parliament_ids[i], unique_parliament_names[i], e.orderBy("survey_publish_date", ascending=True).count())

#df_surveys = df_surveys.fillna({"survey_result_by_percent": 0})
#df_surveys.show(90)


25/02/25 01:14:43 WARN TaskSetManager: Stage 4137 contains a task of very large size (1715 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

0 Hamburgische Bürgerschaft 16487
6 Bundestag 294
9 Niedersächsischer Landtag 477
1 Landtag von Baden-Württemberg 447
8 Landtag von Mecklenburg-Vorpommern 337
16 Thüringischer Landtag 634
15 Brandenburgischer Landtag 327
4 Landtag von Schleswig-Holstein 569
13 Sächsischer Landtag 505
14 Landtag von Sachsen-Anhalt 222
7 Landtag von Nordrhein-Westfalen 522
10 Hessischer Landtag 626
11 Landtag von Rheinland-Pfalz 397


                                                                                

3 Berliner Abgeordnetenhaus 872
5 Europäisches Parlament 232


                                                                                

17 Bremische Bürgerschaft 462
12 Saarländischer Landtag 214
2 Bayerischer Landtag 1283


In [67]:
import pandas as pd

# Your input dictionary
data = {
    '1000': {'Date': '2018-08-31',
             'Institute_ID': '16',
             'Method_ID': '0',
             'Parliament_ID': '7',
             'Results': {'0': 3.9,
                         '101': 31.1,
                         '2': 23.9,
                         '3': 7.1,
                         '4': 13.4,
                         '5': 7.9,
                         '7': 12.7},
             'Survey_Period': {'Date_End': '2018-08-30',
                               'Date_Start': '2018-08-14'},
             'Surveyed_Persons': '4532',
             'Tasker_ID': '76'},
    '1001': {'Date': '2018-08-31',
             'Institute_ID': '5',
             'Method_ID': '0',
             'Parliament_ID': '13',
             'Results': {'0': 4,
                         '101': 28,
                         '2': 11,
                         '3': 7,
                         '4': 7,
                         '5': 18,
                         '7': 25},
             'Survey_Period': {'Date_End': '2018-08-30',
                               'Date_Start': '2018-08-27'},
             'Surveyed_Persons': '1040',
             'Tasker_ID': '4'}
}

survey_df = []
for key, value in data.items():
    row = {'survey_id': key}
    for k, v in value.items():
        if k == "Results":
            for party_id, result in v.items():
                row['party_id'] = party_id
                row['survey_result_by_percent'] = result
                row['survey_publish_date'] = value['Date']
                row['institute_id'] = value['Institute_ID']
                row['parliament_id'] = value['Parliament_ID']
                row['method_id'] = value['Method_ID']
                row['survey_start_date'] = value['Survey_Period']['Date_Start']
                row['survey_end_date'] = value['Survey_Period']['Date_End']
                row['total_surveyees'] = value['Surveyed_Persons']
                row['tasker_id'] = value['Tasker_ID']
                
                survey_df.append(row)
                row = {'survey_id': key}
            break


survey_df = pd.DataFrame(survey_df)

# Show the resulting DataFrame
print(survey_df)

   survey_id party_id  survey_result_by_percent survey_publish_date  \
0       1000        0                       3.9          2018-08-31   
1       1000      101                      31.1          2018-08-31   
2       1000        2                      23.9          2018-08-31   
3       1000        3                       7.1          2018-08-31   
4       1000        4                      13.4          2018-08-31   
5       1000        5                       7.9          2018-08-31   
6       1000        7                      12.7          2018-08-31   
7       1001        0                       4.0          2018-08-31   
8       1001      101                      28.0          2018-08-31   
9       1001        2                      11.0          2018-08-31   
10      1001        3                       7.0          2018-08-31   
11      1001        4                       7.0          2018-08-31   
12      1001        5                      18.0          2018-08-31   
13    

In [68]:
df_surveys.join(df_parliaments, on="parliament_id" , how="full").join(df_parties, on="party_id", how="full").show()


+--------+-------------+---------+------------+---------+---------+------------------------+-------------------+-----------------+---------------+---------------+---------------+-------------------+-------------------+-----------------+--------------+
|party_id|parliament_id|survey_id|institute_id|method_id|tasker_id|survey_result_by_percent|survey_publish_date|survey_start_date|survey_end_date|total_surveyees|parliament_name|parliament_shortcut|parliament_election|       party_name|party_shortcut|
+--------+-------------+---------+------------+---------+---------+------------------------+-------------------+-----------------+---------------+---------------+---------------+-------------------+-------------------+-----------------+--------------+
|       0|            0|     3774|           5|        3|        4|                     5.0|         2025-02-22|       2025-02-20|     2025-02-21|           2005|      Bundestag|          Bundestag|     Bundestagswahl|sonstige Parteien|      So

In [69]:
print(df_database.columns)
print(df_parliaments.columns)
print(df_institutes.columns)
print(df_taskers.columns)
print(df_methods.columns)
print(df_parties.columns)
print(df_surveys.columns)

column_database = ['author', 'last_update', 'publisher', 'license_name', 'license_shortcut', 'license_link']
column_parliaments = ['parliament_id', 'parliament_name', 'parliament_shortcut', 'parliament_election']
column_institutes = ['institute_id', 'institute_name']
column_taskers = ['tasker_id', 'tasker_name']
column_methods = ['method_id', 'method_name']
column_parties = ['party_id', 'party_name', 'party_shortcut']
column_surveys = ['survey_id', 'parliament_id', 'institute_id', 'tasker_id', 'method_id', 'party_id', 'survey_result_by_percent', 'survey_publish_date', 'survey_start_date', 'survey_end_date', 'total_surveyees']

['author', 'last_update', 'publisher', 'license_name', 'license_shortcut', 'license_link']
['parliament_id', 'parliament_name', 'parliament_shortcut', 'parliament_election']
['institute_id', 'institute_name']
['tasker_id', 'tasker_name']
['method_id', 'method_name']
['party_id', 'party_name', 'party_shortcut']
['survey_id', 'party_id', 'institute_id', 'parliament_id', 'method_id', 'tasker_id', 'survey_result_by_percent', 'survey_publish_date', 'survey_start_date', 'survey_end_date', 'total_surveyees']


In [70]:
import sqlite3
conn = sqlite3.connect('./database/fun.db')

dfp_database = df_database.toPandas()
dfp_parliaments = df_parliaments.toPandas()
dfp_institutes = df_institutes.toPandas()
dfp_taskers = df_taskers.toPandas()
dfp_methods = df_methods.toPandas()
dfp_parties = df_parties.toPandas()
dfp_surveys = df_surveys.toPandas()

dfp_database.to_sql("", con=conn, if_exists="replace", index=False)
dfp_parliaments.to_sql("", con=conn, if_exists="replace", index=False)
dfp_institutes.to_sql("", con=conn, if_exists="replace", index=False)
dfp_taskers.to_sql("", con=conn, if_exists="replace", index=False)
dfp_methods.to_sql("", con=conn, if_exists="replace", index=False)
dfp_parties.to_sql("", con=conn, if_exists="replace", index=False)
dfp_surveys.to_sql("", con=conn, if_exists="replace", index=False)

OperationalError: unable to open database file

In [None]:
pd.read_sql("select * from database_information;", con=conn)

Unnamed: 0,author,last_update,publisher,license_name,license_shortcut,license_link
0,Dipl.-Jur. Philipp Guttmann,2025-02-18T13:00:31+01:00,dawum.de,ODC Open Database License,ODC-ODbL,https://opendatacommons.org/licenses/odbl/1-0/
