# 1) Daten laden

In [2]:
## Spark Session starten

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Wetter- und Luftqualitätsdaten") \
    .getOrCreate()

25/06/15 17:54:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## 1.1) Woher kommen die Daten und in welcher Form liegen sie vor?

#### Wetterdaten Zürich und St. Gallen
Die Wetterdaten von Zürich und St. Gallen stammen vom Bundesamt für Meteorologie und Kimatologie - MeteoSchweiz. Abrufbar unter folgendem Link: xxx

Es handelt sich hierbei um tägliche Messwerte, die als CSV-Dateien im UFT-8 Format vorliegen. Das Datumsfeld trägt den Namen *reference_timestamp* und ist im Format DD.MM.YYYY HH:MM als Zeichenkette gespeichert. Die Dateien umfassen Messwerte aus zwei Wetterstationen: 

* Zürich Fluntern (SMA)
* St. Gallen (STG)

#### Luftqualitätsdaten 

NEU SCHREIBEN


https://www.ostluft.ch/messwerte/datenabfrage


Die Luftqualitätsdaten stammen vom Amt für Abfall, Wasser, Energie udn Luft (AWEL) des Kantons Zürich sowie dem Amt für Umwelt des Kantons St. Gallen. Abrufbar unter folgendem Link: xxx

Es handelt sich um tägliche Mittelwerte der Luftschadstoffe, die als CSV-Dataeien im UFT-8 Format vorliegen. Das Datumsfeld trägt den Namen *Datum* und ist im Format JJJ-MM-TT HH:MM+ZZ+ZZ als Zeichenkette gespeichert. Die Dateien umfassen Messwerte für Zwei Luftmessstationen:

* Zürich (Zch_Stampfenbachstr)
* st. Gallen (StG_St.Leonhard-Str)

In [3]:
## Schritt 1: Wetterdaten einlesen

# Zürich
df_zurich = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("sep", ";") \
    .csv("data/weather/zurich/ogd-smn_sma_d_historical.csv")

# St. Gallen
df_stgallen = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("sep", ";") \
    .csv("data/weather/stgallen/ogd-smn_stg_d_historical.csv")

---------------------------------------------------------

## 1.2) Pivotierung der Luftqualitätsdaten

Das ursprüngliche Format der Luftqualitätsdaten lag im "Long Format" vor. In dieser Struktur entsprach jede Zeile einen einzelnen Messswert, der durch Datum, Standort sowie Schadstoffparameter bestimmt war. Um die Daten jedoch effizient analysieren und mit den Wetterdaten veknüpfen zu könenn, wird eine Pivotierung vorgenommen. Das Ergebnis ist ein "Wide Format", bei dem jeder schadstoff als eigene Spalte dargestellt ist. So enthält jede Zeile alle relevanten Messwerte eines Tages für einen bestimmten Standort.

In [8]:
from pyspark.sql.functions import to_date, col, first, when

# Bereinigung
df_air_clean = df_air

# Pivotierung
from pyspark.sql.functions import to_date, col

df_air_clean = df_air_clean.withColumn(
    "date", to_date(col("Startzeit"), "dd.MM.yyyy HH:mm")
)

In [None]:
from pyspark.sql.functions import first

df_luft_pivot = df_air.groupBy("Startzeit", "location") \
    .agg(
        first("PM10").alias("PM10"),
        first("PM2.5").alias("PM2.5"),
        first("Ozon").alias("Ozon"),
        first("CO").alias("CO"),
        first("NO2").alias("NO2"),
        first("NO").alias("NO")
    )

In [11]:
from pyspark.sql.functions import first, col

df_luft_pivot = df_air.groupBy("Startzeit", "location") \
    .agg(
        first(col("PM10")).alias("PM10"),
        first(col("`PM2.5`")).alias("PM2.5"),
        first(col("Ozon")).alias("Ozon"),
        first(col("CO")).alias("CO"),
        first(col("NO2")).alias("NO2"),
        first(col("NO")).alias("NO")
    )

---------------------------------------------------------

## 1.3) Datumsformat vereinheitlichen

Um die Verarbeitung sowie Verknüpfung der Wetter- und Luftqualitätsdaten zu ermöglichen, wird das Datumsformat in beiden Datsätzen vereinheitlicht. Die ursprüngliche Datums- und Zeitangabe liegen in unterschiedlcihen Formate vor und zwar:

* **Wetterdaten**: DD.MM.YYYY HH:mm (reference_timestamp)
* **Luftqualitätsdaten**: DD.MM.YYYY (Startzeit)

Durch die Konvertierung in ein einheitliches Datums- und Zeitangabeformat wird sichergestellt, dass beide Quellen korrekt über das Datum gejoint werden können.

In [13]:
from pyspark.sql.functions import to_date, col

# Wetterdaten: Datum umwandeln
df_zurich = df_zurich.withColumn(
    "date", to_date(col("reference_timestamp"), "dd.MM.yyyy HH:mm")
)
df_stgallen = df_stgallen.withColumn(
    "date", to_date(col("reference_timestamp"), "dd.MM.yyyy HH:mm")
)

# Luftqualitätsdaten: Datum umwandeln
df_luft_pivot = df_luft_pivot.withColumn(
    "date", to_date(col("Startzeit"), "dd.MM.yyyy HH:mm")
)

---------------------------------------------------------

## 1.4) Standortbezeichnung vereinheitlichen

Um die Wetter- und Luftqualitätsdaten sinnvoll miteinander verknüpfen zu können, muss die Bezeichnung der jeweiligen Standorte gleich sein. In den vorhanden Dateien jedoch gibt es unterschiedliche Bezeichnungen der Standorte, was ein Join erschwert. Beispielsweise steht in den Wetterdaten *SMA* für Zürich und in den Luftqualitätsdaten finden wir *Zch_...* für Zürich. 

Diese ungleiche Benennung würde einen Join der beiden Datensätze nicht ermöglichen. Somit wird die Standortbezeichnung vereinheitlicht, so dass wir sichergestellen können, dass die Werte für Zürich und St. Gallen korrekt zusammengeführt werden können.

In [15]:
from pyspark.sql.functions import when, col

# Standort Wetterdaten standardisieren
if "station_abbr" in df_zurich.columns:
    df_weather_std = df_zurich.withColumn(
        "location",
        when(col("station_abbr") == "SMA", "Zürich")
        .when(col("station_abbr") == "STG", "St. Gallen")
        .otherwise("Andere")
    )
else:
    print("Spalte 'station_abbr' nicht gefunden in df_zurich!")
    df_weather_std = df_zurich

# Standort Luftqualitätsdaten standardisieren
df_air_std = df_luft_pivot.withColumn(
    "location",
    when(col("location").startswith("Zch_"), "Zürich")
    .when(col("location").contains("St. Gallen"), "St. Gallen")
    .otherwise("Andere")
)

---------------------------------------------------------

## 1.5) Wetter- und Luftqualitätsdaten anschauen bevor sie gejoint werden

Bevor die bereinigten und standardisierten Wetter- und Luftqualitätsdaten auf Tagesbasis zusammengeführt werden, erfolgt eine vorläufige Analyse. Ziel ist es, sicherzustellen, dass der Join reibungslos funktioniert und keine Inkonsistenzen in den Datums- oder Standortangaben bestehen.

-------------------------------------------------------------------------

## 1.5) Join Wetter- und Luftqualitätsdaten

Die Zusammenführung der Wetter- und Luftqualitätsdaten brauchen wir, damit wir später mit einem einheitlichen Datensatz weiter arbeiten können, der alle relevanten Merkmale für eine Modellierung beinhaltet. Die Wetter- und Luftqualitätsdaten sind grundsätzlich örtlich und zeitlich aufeinander abgestimtm worden und können nun miteinander verknüpft werden.

Ziel ist es mögliche Wettereinflüsse auf die Luftqualität zu analysieren und so ein prädiktives Modell zu trainieren. Durch den Join stehen uns pro Zeile sowohl meterologische als auch umweltrelevante Parameter pro Tag und Standort zur Verfügung.

In [None]:
df_joined = df_air_std.join(
    df_weather_std,
    on=["date", "location"],
    how="inner"
)

# Anzahl Zeilen und Vorschau auf kombinierte Einträge
print("Anzahl Zeilen nach dem Join", df_joined.count())
df_joined.select("date", "location").distinct().show(5)

# Optional: Schema und erste Einträge anzeigen
df_joined.printSchema()
df_joined.show(5)

---------------------------------------------------------

## 1.6) Deskriptive Analyse Wetter- und Luftqualitätsdaten nach dem Join



In [25]:
## Schritt 1: Anzahl Zeilen in DataFrames vor und nach Join

print("Anzahl Zeilen Wetterdaten:", df_zurich.count())
print("Anzahl Zeilen Luftqualitätsdaten:", df_luft_pivot.count())

Anzahl Zeilen Wetterdaten: 58805
Anzahl Zeilen Luftqualitätsdaten: 7308


In [None]:
## Schritt 2: 

Spalten in df_luft_pivot:
['Datum', 'Standort', 'CO', 'NO', 'NO2', 'NOx', 'O3', 'O3_max_h1', 'O3_nb_h1>120', 'PM10', 'PM2.5', 'PN', 'SO2', 'date']

Spalten in df_zurich:
['station_abbr', 'reference_timestamp', 'tre200d0', 'tre200dx', 'tre200dn', 'tre005d0', 'tre005dx', 'tre005dn', 'ure200d0', 'pva200d0', 'prestad0', 'pp0qffd0', 'ppz850d0', 'ppz700d0', 'pp0qnhd0', 'fkl010d0', 'fkl010d1', 'fu3010d0', 'fu3010d1', 'fkl010d3', 'fu3010d3', 'wcc006d0', 'rre150d0', 'rka150d0', 'htoautd0', 'gre000d0', 'oli000d0', 'olo000d0', 'osr000d0', 'ods000d0', 'sre000d0', 'sremaxdv', 'erefaod0', 'xcd000d0', 'dkl010d0', 'xno000d0', 'xno012d0', 'rreetsd0', 'date']
