In [1]:
import json
import os

import psycopg2
import requests
from psycopg2.extras import execute_values
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType, DateType

In [2]:
def fetch_and_process_data(timeout=30):
    """Download the Pegelonline station list and flatten it into row dicts.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.
            Passed straight to ``requests.get``.

    Returns:
        A ``(stations_data, timeseries_data)`` tuple of lists of flat dicts.
        ``stations_data`` holds one dict per distinct station ``uuid``;
        ``timeseries_data`` holds one dict per timeseries entry, tagged with
        the owning station's uuid under ``station_uuid``.

    Raises:
        requests.HTTPError: If the service answers with an error status.
        KeyError: If a station record has no ``uuid``.
    """
    api = "https://www.pegelonline.wsv.de/webservices/rest-api/v2/stations.json?includeTimeseries=true&hasTimeseries=WV&includeForecastTimeseries=true"
    # A missing timeout would let a stalled connection hang the notebook forever.
    response = requests.get(api, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to iterate over an error payload.
    response.raise_for_status()
    responses = response.json()

    stations_data = []
    timeseries_data = []
    # uuid becomes the primary key downstream, so it is the de-duplication key;
    # a set makes the membership test O(1) instead of scanning the list.
    seen_station_uuids = set()

    for station in responses:
        station_uuid = station["uuid"]

        if station_uuid not in seen_station_uuids:
            seen_station_uuids.add(station_uuid)
            water = station.get("water") or {}
            # Every attribute except the uuid is read with .get so that a
            # station lacking one of them yields None rather than KeyError.
            stations_data.append({
                "uuid": station_uuid,
                "number": station.get("number"),
                "shortname": station.get("shortname"),
                "longname": station.get("longname"),
                "km": station.get("km"),
                "agency": station.get("agency"),
                "longitude": station.get("longitude"),
                "latitude": station.get("latitude"),
                "water_shortname": water.get("shortname"),
                "water_longname": water.get("longname")
            })

        for ts in station.get("timeseries") or []:
            comment = ts.get("comment") or {}
            gauge_zero = ts.get("gaugeZero") or {}
            timeseries_data.append({
                "station_uuid": station_uuid,
                "shortname": ts.get("shortname"),
                "longname": ts.get("longname"),
                "unit": ts.get("unit"),
                "equidistance": ts.get("equidistance"),
                "timeseries_start": ts.get("start"),
                "timeseries_end": ts.get("end"),
                "comment_shortDescription": comment.get("shortDescription"),
                "comment_longDescription": comment.get("longDescription"),
                "gaugeZero_unit": gauge_zero.get("unit"),
                "gaugeZero_value": gauge_zero.get("value"),
                "gaugeZero_validFrom": gauge_zero.get("validFrom")
            })

    return stations_data, timeseries_data

In [3]:
def create_spark_dataframes(spark, stations_data, timeseries_data):
    """Turn the two lists of row dicts into typed Spark DataFrames.

    Each list is serialised to a single JSON string, wrapped in a
    one-element RDD and read back through ``spark.read.json`` with an
    explicit schema in which every field is nullable.

    Args:
        spark: Active SparkSession.
        stations_data: List of station dicts.
        timeseries_data: List of timeseries dicts.

    Returns:
        ``(stations_df, timeseries_df)``.
    """
    def nullable(name, dtype):
        # Every column in both schemas is declared nullable.
        return StructField(name, dtype, True)

    station_schema = StructType([
        nullable("uuid", StringType()),
        nullable("number", StringType()),
        nullable("shortname", StringType()),
        nullable("longname", StringType()),
        nullable("km", DoubleType()),
        nullable("agency", StringType()),
        nullable("longitude", DoubleType()),
        nullable("latitude", DoubleType()),
        nullable("water_shortname", StringType()),
        nullable("water_longname", StringType()),
    ])

    timeseries_schema = StructType([
        nullable("station_uuid", StringType()),
        nullable("shortname", StringType()),
        nullable("longname", StringType()),
        nullable("unit", StringType()),
        nullable("equidistance", IntegerType()),
        nullable("timeseries_start", TimestampType()),
        nullable("timeseries_end", TimestampType()),
        nullable("comment_shortDescription", StringType()),
        nullable("comment_longDescription", StringType()),
        nullable("gaugeZero_unit", StringType()),
        nullable("gaugeZero_value", DoubleType()),
        nullable("gaugeZero_validFrom", DateType()),
    ])

    # Serialise both payloads up front, then hand each to the JSON reader.
    stations_payload = json.dumps(stations_data)
    timeseries_payload = json.dumps(timeseries_data)

    sc = spark.sparkContext
    stations_df = spark.read.json(sc.parallelize([stations_payload]), schema=station_schema)
    timeseries_df = spark.read.json(sc.parallelize([timeseries_payload]), schema=timeseries_schema)

    return stations_df, timeseries_df

In [4]:
def filter_timeseries_data(timeseries_df):
    """Split the timeseries DataFrame into one DataFrame per measurement kind.

    Rows are selected by their ``longname`` value, and each result keeps
    the five shared columns plus the extra columns listed for that kind.

    Args:
        timeseries_df: DataFrame holding all timeseries rows.

    Returns:
        ``(water_level_raw_df, water_level_forecast_df, water_temperature_df,
        air_temperature_df, drain_df)``.
    """
    shared = ["station_uuid", "shortname", "longname", "unit", "equidistance"]
    gauge_zero = ["gaugeZero_unit", "gaugeZero_value", "gaugeZero_validFrom"]
    span = ["timeseries_start", "timeseries_end"]
    comments = ["comment_shortDescription", "comment_longDescription"]

    def pick(longnames, extra):
        # Filter on longname, then project the shared columns followed by the extras.
        wanted = [col(name) for name in shared + extra]
        matching = timeseries_df.filter(col("longname").isin(longnames))
        return matching.select(*wanted)

    water_level_raw_df = pick(["WASSERSTAND ROHDATEN"], gauge_zero)
    water_level_forecast_df = pick(["WASSERSTANDVORHERSAGE"], span + comments)
    water_temperature_df = pick(["WASSERTEMPERATUR", "WASSERTEMPERATUR ROHDATEN"], comments)
    air_temperature_df = pick(["LUFTTEMPERATUR"], [])
    drain_df = pick(["ABFLUSS", "ABFLUSS_ROHDATEN"], [])

    return (
        water_level_raw_df,
        water_level_forecast_df,
        water_temperature_df,
        air_temperature_df,
        drain_df,
    )

In [5]:
def create_tables(conn_params):
    """Create the six target tables in PostgreSQL if they do not exist yet.

    All statements run in one transaction: it is committed when every
    statement succeeds and rolled back if any of them raises. The cursor
    and the connection are always closed, including on failure.

    Args:
        conn_params: Keyword arguments forwarded to ``psycopg2.connect``.

    Raises:
        Whatever ``psycopg2.connect`` or ``cursor.execute`` raises; the
        exception propagates after the connection has been closed.
    """
    create_stations_table_query = """
    CREATE TABLE IF NOT EXISTS stations (
        uuid UUID PRIMARY KEY,
        number VARCHAR(255),
        shortname VARCHAR(255),
        longname VARCHAR(255),
        km DOUBLE PRECISION,
        agency VARCHAR(255),
        longitude DOUBLE PRECISION,
        latitude DOUBLE PRECISION,
        water_shortname VARCHAR(255),
        water_longname VARCHAR(255)
    );
    """

    create_water_level_raw_table_query = """
    CREATE TABLE IF NOT EXISTS water_level_raw (
        id SERIAL PRIMARY KEY,
        station_uuid UUID REFERENCES stations(uuid),
        shortname VARCHAR(255),
        longname VARCHAR(255),
        unit VARCHAR(50),
        equidistance INTEGER,
        gaugeZero_unit VARCHAR(50),
        gaugeZero_value DOUBLE PRECISION,
        gaugeZero_validFrom DATE
    );
    """

    create_water_level_forecast_table_query = """
    CREATE TABLE IF NOT EXISTS water_level_forecast (
        id SERIAL PRIMARY KEY,
        station_uuid UUID REFERENCES stations(uuid),
        shortname VARCHAR(255),
        longname VARCHAR(255),
        unit VARCHAR(50),
        equidistance INTEGER,
        timeseries_start TIMESTAMPTZ,
        timeseries_end TIMESTAMPTZ,
        comment_shortDescription TEXT,
        comment_longDescription TEXT
    );
    """

    create_water_temperature_table_query = """
    CREATE TABLE IF NOT EXISTS water_temperature (
        id SERIAL PRIMARY KEY,
        station_uuid UUID REFERENCES stations(uuid),
        shortname VARCHAR(255),
        longname VARCHAR(255),
        unit VARCHAR(50),
        equidistance INTEGER,
        comment_shortDescription TEXT,
        comment_longDescription TEXT
    );
    """

    create_air_temperature_table_query = """
    CREATE TABLE IF NOT EXISTS air_temperature (
        id SERIAL PRIMARY KEY,
        station_uuid UUID REFERENCES stations(uuid),
        shortname VARCHAR(255),
        longname VARCHAR(255),
        unit VARCHAR(50),
        equidistance INTEGER
    );
    """

    create_drain_table_query = """
    CREATE TABLE IF NOT EXISTS drain (
        id SERIAL PRIMARY KEY,
        station_uuid UUID REFERENCES stations(uuid),
        shortname VARCHAR(255),
        longname VARCHAR(255),
        unit VARCHAR(50),
        equidistance INTEGER
    );
    """

    # stations comes first: every other table declares a foreign key to it.
    ddl_statements = (
        create_stations_table_query,
        create_water_level_raw_table_query,
        create_water_level_forecast_table_query,
        create_water_temperature_table_query,
        create_air_temperature_table_query,
        create_drain_table_query,
    )

    conn = psycopg2.connect(**conn_params)
    try:
        # `with conn` commits on success and rolls back on error;
        # `with conn.cursor()` closes the cursor. Neither closes the
        # connection itself, hence the surrounding try/finally.
        with conn:
            with conn.cursor() as cursor:
                for statement in ddl_statements:
                    cursor.execute(statement)
    finally:
        conn.close()

In [6]:
def insert_data(table, data, columns, conn_params):
    """Bulk-insert rows into one table using ``execute_values``.

    The insert runs in its own connection and transaction: committed on
    success, rolled back on error. The cursor and connection are always
    closed, including when the insert raises.

    Args:
        table: Target table name. Interpolated into the SQL text, so it
            must be a trusted identifier, never user input.
        data: Sequence of row tuples whose values follow ``columns`` order.
        columns: Column names, in the order used by each row tuple.
            Interpolated into the SQL text like ``table``.
        conn_params: Keyword arguments forwarded to ``psycopg2.connect``.

    Raises:
        Whatever ``psycopg2.connect`` or ``execute_values`` raises; the
        exception propagates after the connection has been closed.
    """
    # Build the statement before connecting so a bad `columns` value
    # fails without opening a connection.
    insert_query = f"INSERT INTO {table} ({', '.join(columns)}) VALUES %s"

    conn = psycopg2.connect(**conn_params)
    try:
        # `with conn` handles commit/rollback and `with conn.cursor()` closes
        # the cursor; the connection is closed explicitly in `finally`.
        with conn:
            with conn.cursor() as cursor:
                execute_values(cursor, insert_query, data)
    finally:
        conn.close()

In [7]:
def main():
    """Run the whole pipeline: fetch, build DataFrames, split, create tables, load.

    Connection settings are read from the environment variables
    ``PGDATABASE``, ``PGUSER``, ``PGPASSWORD``, ``PGHOST`` and ``PGPORT``.
    Any variable that is unset falls back to the value that was previously
    hard-coded here, so an unconfigured environment behaves as before.

    Note: rows are inserted unconditionally, so this function performs a
    plain INSERT into every table each time it is called.
    """
    spark = SparkSession.builder \
        .appName("mini-project") \
        .getOrCreate()

    stations_data, timeseries_data = fetch_and_process_data()
    stations_df, timeseries_df = create_spark_dataframes(spark, stations_data, timeseries_data)
    (
        water_level_raw_df,
        water_level_forecast_df,
        water_temperature_df,
        air_temperature_df,
        drain_df,
    ) = filter_timeseries_data(timeseries_df)

    # Defaults are development values only; override them through the
    # environment rather than editing credentials into the notebook.
    conn_params = {
        "dbname": os.environ.get("PGDATABASE", "db"),
        "user": os.environ.get("PGUSER", "postgres"),
        "password": os.environ.get("PGPASSWORD", "12345"),
        "host": os.environ.get("PGHOST", "pg_container"),
        "port": int(os.environ.get("PGPORT", "5432"))
    }

    create_tables(conn_params)

    shared_columns = ["station_uuid", "shortname", "longname", "unit", "equidistance"]
    comment_columns = ["comment_shortDescription", "comment_longDescription"]

    # (table, DataFrame, column order). stations must stay first because
    # the other tables reference it through a foreign key.
    load_plan = [
        ("stations", stations_df, [
            "uuid", "number", "shortname", "longname", "km", "agency",
            "longitude", "latitude", "water_shortname", "water_longname"
        ]),
        ("water_level_raw", water_level_raw_df, shared_columns + [
            "gaugeZero_unit", "gaugeZero_value", "gaugeZero_validFrom"
        ]),
        ("water_level_forecast", water_level_forecast_df, shared_columns + [
            "timeseries_start", "timeseries_end"
        ] + comment_columns),
        ("water_temperature", water_temperature_df, shared_columns + comment_columns),
        ("air_temperature", air_temperature_df, list(shared_columns)),
        ("drain", drain_df, list(shared_columns)),
    ]

    # Collect everything before the first insert so that a Spark failure
    # cannot leave the database partially loaded. Each tuple is built by
    # looking up the listed column names, so the values always line up with
    # the INSERT column list regardless of the DataFrame's column order.
    payloads = [
        (table, [tuple(row[name] for name in columns) for row in frame.collect()], columns)
        for table, frame, columns in load_plan
    ]

    for table, rows, columns in payloads:
        insert_data(table, rows, columns, conn_params)

# Run the pipeline only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()