## Serververbindung konfigurieren

Wir verbinden uns zur SQL Server Datenbank, die als Docker Container läuft. Zuvor wurde mit
einem SQL Editor die Datenbank *Weatherdata* und das Schema aus [database.sql](database.sql) angelegt.
Für das Beladen ist die Einstellung *fast_executemany* wichtig. 

> The cursor.executemany() function internally performs a loop and sends rows one by one, unless 
> “fast_executemany” flag is specified. If the flag is specified and if the drivers supports it, 
> the database will perform the operation (insert or update) on the entire array in a single operation.
> (https://dbwhisperer.wordpress.com/2020/11/21/pyodbc-fast_executemany-and-oracle-rdbms/)

In [1]:
import sqlalchemy

connection_url = sqlalchemy.engine.URL.create(
    "mssql+pyodbc",
    username="sa",
    password="SqlServer2019",      # oder ein anderes Passwort
    host="127.0.0.1",              # oder .\SERVERNAME
    database="Weatherdata",
    query={
        "driver": "ODBC Driver 17 for SQL Server"
    },
)

engine = sqlalchemy.create_engine(connection_url, fast_executemany=True)

### Leeren der Datenbank

**Dies machen wir nur, damit alle den gleichen Ausgangszustand haben. Es kommt natürlich in echten
Loaderskripts nicht vor dass die ganze Datenbank geleert wird!**

In [2]:
with engine.connect() as conn:
    conn.execute("TRUNCATE TABLE MeasurementDaily")
    conn.execute("TRUNCATE TABLE MeasurementHourly")
    conn.execute("DELETE FROM Station")

## Extract (1)

### Laden der originalen Meldungen

Damit wir einen inkrementellen Import testen können, definieren wir eine Variable *max_year*. Die
Messdaten umfassen die Jahre 2000 bis inklusive 2021. Zuerst importieren wir eine Teilmenge in die
leere Datenbank. Danach setzen wir das Jahr auf 9999 und prüfen, ob wir nicht versehentlich doppelte
Datensätze importieren. Das würde zu einem Fehler (UNIQUE Contraint) führen.

In [3]:
max_year = 2019

Wie in den vorigen Beispielen laden wir die Stationsdaten aus den originalen Meldungen mit dem
Synop Parser. Wir laden nur die Daten bis inklusive *max_year*.

In [4]:
import pandas as pd
import parseSynop as ps    # Datei parseSynop.py laden
data_vienna_city = ps.readFile("synop_11034.txt.bz2")  # Wien Innere Stadt
data_vienna_hohewarte = ps.readFile("synop_11035.txt.bz2")  # Wien Hohe Warte
data_gump = ps.readFile("synop_11082.txt.bz2")    # Gumpoldskirchen
data_rax = ps.readFile("synop_11180.txt.bz2")     # Rax Bergstation
data = pd.concat([data_gump, data_vienna_city, data_vienna_hohewarte, data_rax])
data = data[data.year <= max_year]
print(f"{len(data)} Datensätze geladen.")
data.head(3)

465962 Datensätze geladen.


Unnamed: 0,station,date,datetime,year,month,day,hour,minute,temp,dewp,pressure,prec_amount,prec_duration,cloud_octas,wind_dir,wind_speed,max_temp,min_temp,sunshine
0,11082,2000-01-01,2000-01-01 00:00:00,2000,1,1,0,0,-3.4,-5.3,997.7,,,,9.0,1.0,,,
1,11082,2000-01-01,2000-01-01 03:00:00,2000,1,1,3,0,-3.7,-5.3,998.4,,,,3.0,1.0,,,
2,11082,2000-01-01,2000-01-01 06:00:00,2000,1,1,6,0,-2.7,-4.1,998.8,,,,0.0,0.0,,-5.5,


### Laden des Stationsverzeichnisses

Auf [www.zamg.ac.at](https://www.zamg.ac.at/cms/de/klima/messnetze/wetterstationen) kann eine CSV
Datei mit den Wetterstationen geladen werden. Es ist die Basis für die Tabelle Station in unserer
Datenbank. Daher laden wir diese Datei aus dem Web.

In [5]:
stations = pd.read_csv("https://www.zamg.ac.at/cms/de/dokumente/klima/dok_messnetze/Stationsliste_20220101.csv", sep=";",
    encoding="cp1252", decimal=",", index_col="SYNNR")

stations.head(3)

Unnamed: 0_level_0,NAME,BUNDESLAND,LÄNGE,BREITE,STATIONSHÖHE,BEGINNDATUM,ORDNUNG,LÄNGE DEZI,BREITE DEZI
SYNNR,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
8989072,ANDAU,BGL,170200,474621,118,19950701,TAWES,17.033333,47.772499
11395,ANDAU NEU,BGL,170208,474612,117,20210616,TAWES/VAMES,17.035555,47.77
11193,BAD TATZMANNSDORF,BGL,161330,472017,347,20040917,Ö3,16.224998,47.338055


## Transform und Filterung (2)

Der 2. Schritt besteht aus zwei Teilen. Zuerst müssen wir die Daten in das Format der
Zieltabelle transformieren. Dabei sind mehrere Schritte nötig:
- Transformation von Fremdschlüssel
- Umbenennung von Spalten

Der zweite Teil filtert Daten aus:
- Können alle Daten aufgrund der Datentypen und Constraints importiert werden?
- Sind schon Werte in der Datenbank vorhanden, die ausgefiltert werden müssen?

### Daten für die Tabelle Station

Wir wollen nur neu hinzugekommene Stationen in die Tabelle *Station* importieren. Dafür müssen
wir uns eine Liste aller Stations-IDs von der Datenbank holen (*station_ids*). Die Bundesländer
sind in der Datei der ZAMG mit eigenen Kürzeln angeführt:

In [6]:
stations.BUNDESLAND.unique()

array(['BGL', 'KNT', 'NOE', 'OOE', 'SAL', 'STMK', 'TIR', 'VBG', 'WIE'],
      dtype=object)

In unserer Station Tabelle verweisen wir allerdings auf die Tabelle Bundesland mit einem eigenen
(numerischen) Schlüssel. Die Tabelle *Bundesland* in der Datenbank hat folgende Werte:

In [7]:
with engine.connect() as conn: display(pd.read_sql("Bundesland", conn))

Unnamed: 0,Id,Shortname,Name
0,1,BGL,Burgenland
1,2,KNT,Kärnten
2,3,NOE,Niederösterreich
3,4,OOE,Oberösterreich
4,5,SAL,Salzburg
5,6,STMK,Steiermark
6,7,TIR,Tirol
7,8,VBG,Vorarlberg
8,9,WIE,Wien


Wir müssen also im Transform Prozess für die Stationen konkret 2 Schritte erledigen:
- Die Stationen ausfiltern, dessen ID (*SYNNR* in der Datei) schon in der Datenbank ist.
- Die Fremdschlüssel müssen wir von den Kurznamen auf die internen IDs der Datenbank ändern.

In [8]:
with engine.connect() as conn:
    station_ids = conn.execute("SELECT Id FROM Station").all()
    bundeslaender_db = conn.execute("SELECT Shortname, Id FROM Bundesland").all()

bundeslaender_db = {b[0]: b[1] for b in  bundeslaender_db} # Liste von Tupel mit 2 Elementen in ein Dictionary umwandeln
station_ids = [s[0] for s in station_ids]                  # Liste von Tupel mit 1 Element in eine normale Liste umwandeln

# Mit der relace() Methode des Dataframes können wir sehr einfach den Wert von Bundesland suchen
# und den Wert im Dictionary bundeslaender_db eintragen:
stations["BL_ID"] = stations.BUNDESLAND.replace(bundeslaender_db)
col_mapping={"SYNNR": "Id", "NAME": "Name", "LÄNGE DEZI": "Lng", "BREITE DEZI": "Lat", "STATIONSHÖHE": "Alt", "BL_ID": "BundeslandId"}
# Nun filtern wir alle Datensätze aus stations, dessen indexwert (der Index von Stations ist die SYNNR)
# nicht in der Liste station_ids ist. ~ bedeutet Negierung.
stations_export = stations \
    .reset_index() \
    .loc[~stations.index.isin(station_ids), col_mapping.keys()].rename(columns=col_mapping)
stations_export.head(3)


Unnamed: 0,Id,Name,Lng,Lat,Alt,BundeslandId
0,8989072,ANDAU,17.033333,47.772499,118,1
1,11395,ANDAU NEU,17.035555,47.77,117,1
2,11193,BAD TATZMANNSDORF,16.224998,47.338055,347,1


### Daten für die Tabelle MeasurementHourly

Bei den Daten der Tabelle *MeasurementHourly* müssen wir auf die Performace acht geben. Es sind
ca. 500 000 Werte in der Tabelle. Wir dürfen Messwerte nicht erneut importieren, denn das würde
zu einem Fehler beim Import führen. Daher müssen wir die in der Datenbank vorhandenen Werte
ausschließen. 

Bei den Stationen haben wir einfach die Liste der ID Werte gelesen. Bei den Messwerten gibt es
für StationId und Datetime ein UNIQUE Constraint. Wir könnten also diese beiden Werte laden und
für jeden Wert im Dataframe prüfen, ob er schon vorhanden ist.

Bei dieser Menge an Datensätzen würde das allerdings sehr lange dauern. Wir entscheiden uns daher
für eine andere Lösung: Wir laden pro Station das Datum des letzten Eintrages. Das ist eine einfache
MAX(Datetime) Abfrage und liefert pro Station einen Datensatz. Danach markieren wir im Dataframe
die Daten, sodass pro Station nur Werte ab dem letzten Eintrag in der Datenbank geladen werden.

In [9]:
import datetime as dt
with engine.connect() as conn:
    last_measurement_hourly = conn \
        .execute("SELECT StationId AS STATION_ID, MAX(Datetime) AS LAST_DATETIME FROM MeasurementHourly GROUP BY StationId") \
        .all()
# Ein Dictionary mit StationId: Letzter Messwert aus der Liste der Tupel, die uns all() lieferte, erzeugen.
last_measurement_hourly = {m[0]: m[1] for m in  last_measurement_hourly}
# Standardwert, falls eine Station noch gar nicht vorkommt.
begin = dt.datetime(1900,1,1)
# In der Tabelle sind Temperaturen zweistellig. Das ist auch ausreichend, es gibt aber durch Fehlwerte
# Fälle, wo ein Komma nicht übertragen wurde. So wird aus 20.1° 201° im Datenbestand. Das filtern
# wir aus.
valid_values = (data.temp.abs() < 100) & (data.dewp.abs() < 100)
# Wir holen uns mit get() die Station der Zeile und prüfen, ob das Datum nachher ist. Wird der key
# nicht gefunden, wird begin als Defaultwert zurückgeliedert.
new_values = data.apply(lambda row: row.datetime > last_measurement_hourly.get(row.station, begin), axis=1)
# Mapping der Spalten Dataframe -> Tabelle
col_mapping = {"station": "StationId", "datetime": "Datetime", "temp": "Temp", "dewp": "Dewp"}
measurement_hourly = data.loc[valid_values & new_values, col_mapping.keys()].rename(columns=col_mapping)
print(f"{len(measurement_hourly)} Datensätze zu importieren:")
measurement_hourly.groupby("StationId").size()

465961 Datensätze zu importieren:


StationId
11034    113526
11035    117232
11082    122105
11180    113098
dtype: int64

## Load (3)

### Laden in die Tabelle Station

In [10]:
with engine.connect() as conn:
    stations_export.to_sql("Station", conn, if_exists="append", index=False)

### Laden in die Tabelle MeasurementHourly

In [11]:
with engine.connect() as conn:
    measurement_hourly.to_sql("MeasurementHourly", conn, if_exists="append", index=False)

Mit SQL können wir die Anzahl der Werte aus der Datenbank lesen. Die Aggregierung macht die
Datenbank. Für eine Ausgabe der Häufigkeiten die ganze Tabelle zu lesen und diese dann im DataFrame
zu verarbeiten wäre Unsinn. SQL ist daher immer eine wichtige Technologie, die auch im DataScrience
beherrscht werden muss.

In [12]:
with engine.connect() as conn: display(pd.read_sql("""
SELECT StationId, DATEPART(year, datetime) AS Year, COUNT(*) AS Count
FROM MeasurementHourly
GROUP BY StationId, DATEPART(year, datetime)
""", conn).pivot(index="Year", columns="StationId"))

Unnamed: 0_level_0,Count,Count,Count,Count
StationId,11034,11035,11082,11180
Year,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
2000,2843,3533,3342,3359
2001,2599,2642,2597,2482
2002,3934,3936,3934,3877
2003,8429,8518,8385,8071
2004,7375,8410,7348,7603
2005,6872,7895,6868,6307
2006,2984,2952,2984,2944
2007,2821,2919,2814,2825
2008,2849,2911,2856,2853
2009,2825,2918,2836,2816


Jetzt können wir zum Testen den Filter *max_year* auf 9999 stellen und prüfen, ob das Laden von
neu hinzugekommenen Daten auch funktioniert.