# Analyse ECAD Daten mit Spark

## Einleitung

Auf der Seite European Climate Assessment & Dataset https://www.ecad.eu/ findet man tägliche Beobachtungen an meteorologischen Stationen in ganz Europa und im Mittelmeerraum aus der Vergangehiet bis fast in die aktuelle Gegenwart. Insgesamt handelt es sich um 81366 Beobachtungsreihen für 13 Elemente an 22359 meteorologischen Stationen. Es sind sowohl gemischte als auch nicht gemischte ECA-Reihen verfügbar. Bei gemischten Reihen handelt es sich um Reihen, die durch Auffüllen von benachbarten Stationen nahezu vollständig sind. Die Reihen werden einer Qualitätskontrolle unterzogen

Für unsere Analyse haben wir uns für Daten von 4 Wetterstationen aus Österreich interessiert:<Br>
Krems: Daten von 1936 bis 2021. <Br>
Poysdorf: Daten von 1965 bis 2021. <Br>
Neusiedl am See: Daten von 1936 bis 2021<Br>
Güssing: Daten von 1947 bis 2021.

Für diese Standorte haben wir die jeweiligen Dateien für mittlere Temperatur/Tag und Regenmenge/Tag heruntergeladen. <Br>
Dateiformat: .txt – wobei die Daten mit dem Wert -9999 fehlende Werte darstellen <Br>
STAID: gibt die Wetterstation an <Br>
SOUID: gibt den Standort der Station an zB Krems <Br>
Datum: wird im Format YYYYMMDD ausgegeben 

ECAD Temperatur <Br>
TG: Durchschnittstemperatur wird in 0.1 °C angegeben<Br>
Q_TG: gibt die Qualität der Temperaturdaten an (0= valid, 1=suspect, 9= missing) 

ECAD Regen<Br>
RR: Durchschnittsniederschlag wird in 0.1 mm angegeben<Br>
Q_RR: gibt die Qualität der Niederschlagsdaten an (0=valid, 1=suspect, 9=missing)

## Regendaten

### Datenmanagement

Zunächst werden die notwenigen Libraries geladen.

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import pandas as pd
import glob, os

#edac_raw = '../data/ecad/RR_SOUID234717.txt'

Da es nicht möglich ist, mit SparkSession Headerzeilen zu skippen, lesen wir die Daten zuerst als Panda ein, entfernen die unnötigen Zeilen und konvertieren dann in ein Spark-Dataframe.

Im ersten Schritt wurden die Regendaten/Tag der Stationen Krems, Poysdorf-Ost, Neusiedl/See sowie Güssing heruntergeladen.

In [17]:
path = '../data/ecad/'
all_files = glob.glob(path + 'RR_*.txt')
all_files

['../data/ecad/RR_SOUID234717.txt',
 '../data/ecad/RR_SOUID234938.txt',
 '../data/ecad/RR_SOUID235003.txt',
 '../data/ecad/RR_SOUID236251.txt']

Speichern aller Daten in einem Panda-Dataframe.

In [18]:
li = []

for filename in all_files:
    df = pd.read_csv(filename
                     , index_col=None
                     , header=0
                     , sep=","
                   , skiprows=18)
    li.append(df)

data = pd.concat(li, axis=0, ignore_index=True)
data

Unnamed: 0,STAID,SOUID,DATE,RR,Q_RR
0,24702,234717,19360101,-9999,9
1,24702,234717,19360102,1,0
2,24702,234717,19360103,3,0
3,24702,234717,19360104,44,0
4,24702,234717,19360105,11,0
...,...,...,...,...,...
110843,24820,236251,20220426,-9999,9
110844,24820,236251,20220427,-9999,9
110845,24820,236251,20220428,-9999,9
110846,24820,236251,20220429,-9999,9


In [19]:
# edac ######### Krems Regen
#data = pd.read_csv(edac_raw
#                   , sep=","
#                   , skiprows=18
#                   , header=0)
#data.head(5)
#len(data)

Einlesen der Panda-Dataframes mit Spark:

In [20]:
spark = SparkSession \
    .builder \
    .appName("Read Data") \
    .getOrCreate()
#Create PySpark DataFrame from Pandas
sparkDF=spark.createDataFrame(data) 
sparkDF.printSchema()
sparkDF.show()

root
 |--  STAID: long (nullable = true)
 |--     SOUID: long (nullable = true)
 |--     DATE: long (nullable = true)
 |--    RR: long (nullable = true)
 |--  Q_RR: long (nullable = true)

+------+---------+--------+-----+-----+
| STAID|    SOUID|    DATE|   RR| Q_RR|
+------+---------+--------+-----+-----+
| 24702|   234717|19360101|-9999|    9|
| 24702|   234717|19360102|    1|    0|
| 24702|   234717|19360103|    3|    0|
| 24702|   234717|19360104|   44|    0|
| 24702|   234717|19360105|   11|    0|
| 24702|   234717|19360106|    0|    0|
| 24702|   234717|19360107|    0|    0|
| 24702|   234717|19360108|    0|    0|
| 24702|   234717|19360109|    1|    0|
| 24702|   234717|19360110|   53|    0|
| 24702|   234717|19360111|   51|    0|
| 24702|   234717|19360112|   13|    0|
| 24702|   234717|19360113|    0|    0|
| 24702|   234717|19360114|    0|    0|
| 24702|   234717|19360115|    0|    0|
| 24702|   234717|19360116|    0|    0|
| 24702|   234717|19360117|    0|    0|
| 24702|   

### Bereinigung

In [21]:
sparkDF.schema.names

[' STAID', '    SOUID', '    DATE', '   RR', ' Q_RR']

Bei Ansicht des Schemas sieht man, dass die Spalten mit Leerzeichen beginnen, deshalb speichern wir diese um:

In [22]:
column_names=['STAID', 'SOUID', 'DATE', 'RR', 'Q_RR']
sparkDF = sparkDF.toDF(*column_names)
sparkDF.schema.names

['STAID', 'SOUID', 'DATE', 'RR', 'Q_RR']

Ausgabe der Anzahl der Datensätze / Station:

In [23]:
sparkDF.groupby('SOUID').count().show()

+------+-----+
| SOUID|count|
+------+-----+
|234717|31532|
|234938|31532|
|235003|20635|
|236251|27149|
+------+-----+



Da in den Metadaten die Info zu finden war, dass Datensätze in der Spalte "Q_RR" mit den Werten 1 oder 9 ungültige Daten enthalten, entfernen wir diese Zeilen:

In [24]:
sparkDF = sparkDF.filter(col("Q_RR") == '0')

Man sieht nun, dass einige Datensätze entfernt wurden, vor allem bei der Station 236251 (Güssing):

In [25]:
sparkDF.groupby('SOUID').count().show()

+------+-----+
| SOUID|count|
+------+-----+
|234717|31411|
|234938|30334|
|235003|20515|
|236251|17655|
+------+-----+



In [26]:
sparkDF.printSchema()

root
 |-- STAID: long (nullable = true)
 |-- SOUID: long (nullable = true)
 |-- DATE: long (nullable = true)
 |-- RR: long (nullable = true)
 |-- Q_RR: long (nullable = true)



Nun werden noch die unnötigen Spalten entfernt: es stehen nun nur noch die interessierenden Werte Station, Datum und Regenmenge zur Verfügung:

In [27]:
sparkDF = sparkDF.drop("STAID", "Q_RR")
sparkDF.show()

+------+--------+---+
| SOUID|    DATE| RR|
+------+--------+---+
|234717|19360102|  1|
|234717|19360103|  3|
|234717|19360104| 44|
|234717|19360105| 11|
|234717|19360106|  0|
|234717|19360107|  0|
|234717|19360108|  0|
|234717|19360109|  1|
|234717|19360110| 53|
|234717|19360111| 51|
|234717|19360112| 13|
|234717|19360113|  0|
|234717|19360114|  0|
|234717|19360115|  0|
|234717|19360116|  0|
|234717|19360117|  0|
|234717|19360118|  0|
|234717|19360119|  0|
|234717|19360120|  0|
|234717|19360121|  0|
+------+--------+---+
only showing top 20 rows



In [28]:
sparkDF.printSchema()

root
 |-- SOUID: long (nullable = true)
 |-- DATE: long (nullable = true)
 |-- RR: long (nullable = true)



Erneut speichern wir das File für die Regendaten lokal im hdf5-Format. 

In [29]:
rr = sparkDF.toPandas()
rr.to_hdf('../data/ecad/RR.h5', key='all', mode='w')

## Temperaturdaten

### Datenmanagement

Im ersten Schritt wurden die mittleren Temperaturdaten/Tage der Stationen Krems, Poysdorf-Ost, Neusiedl/See sowie Güssing heruntergeladen.

In [2]:
path = '../data/ecad/'
all_files = glob.glob(path + 'TG_*.txt')
all_files


['../data/ecad/TG_SOUID234722.txt',
 '../data/ecad/TG_SOUID234943.txt',
 '../data/ecad/TG_SOUID235008.txt',
 '../data/ecad/TG_SOUID236256.txt']

Speichern aller Daten in einem Panda-Dataframe.

In [3]:
li = []

for filename in all_files:
    df = pd.read_csv(filename
                     , index_col=None
                     , header=0
                     , sep=","
                   , skiprows=18)
    li.append(df)

data = pd.concat(li, axis=0, ignore_index=True)
data

Unnamed: 0,STAID,SOUID,DATE,TG,Q_TG
0,24702,234722,19360101,38,0
1,24702,234722,19360102,33,0
2,24702,234722,19360103,34,0
3,24702,234722,19360104,51,0
4,24702,234722,19360105,36,0
...,...,...,...,...,...
110843,24820,236256,20220426,-9999,9
110844,24820,236256,20220427,-9999,9
110845,24820,236256,20220428,-9999,9
110846,24820,236256,20220429,-9999,9


Einlesen der Panda-Dataframes mit Spark:

In [34]:
spark = SparkSession \
    .builder \
    .appName("Read Data TG") \
    .getOrCreate()
#Create PySpark DataFrame from Pandas
sparkDF=spark.createDataFrame(data)
sparkDF.printSchema()
sparkDF.show()

root
 |--  STAID: long (nullable = true)
 |--     SOUID: long (nullable = true)
 |--     DATE: long (nullable = true)
 |--    TG: long (nullable = true)
 |--  Q_TG: long (nullable = true)

+------+---------+--------+-----+-----+
| STAID|    SOUID|    DATE|   TG| Q_TG|
+------+---------+--------+-----+-----+
| 24702|   234722|19360101|   38|    0|
| 24702|   234722|19360102|   33|    0|
| 24702|   234722|19360103|   34|    0|
| 24702|   234722|19360104|   51|    0|
| 24702|   234722|19360105|   36|    0|
| 24702|   234722|19360106|    9|    0|
| 24702|   234722|19360107|   -3|    0|
| 24702|   234722|19360108|   30|    0|
| 24702|   234722|19360109|    2|    0|
| 24702|   234722|19360110|   56|    0|
| 24702|   234722|19360111|  102|    0|
| 24702|   234722|19360112|   56|    0|
| 24702|   234722|19360113|   17|    0|
| 24702|   234722|19360114|   17|    0|
| 24702|   234722|19360115|    2|    0|
| 24702|   234722|19360116|  -15|    0|
| 24702|   234722|19360117|   -4|    0|
| 24702|   

### Bereinigung

In [35]:
sparkDF.schema.names

[' STAID', '    SOUID', '    DATE', '   TG', ' Q_TG']

Bei Ansicht des Schemas sieht man, dass die Spalten mit Leerzeichen beginnen, deshalb speichern wir diese um:

In [36]:
column_names=['STAID', 'SOUID', 'DATE', 'TG', 'Q_TG']
sparkDF = sparkDF.toDF(*column_names)
sparkDF.schema.names

['STAID', 'SOUID', 'DATE', 'TG', 'Q_TG']

Ausgabe der Anzahl der Datensätze / Station:

In [37]:
sparkDF.groupby('SOUID').count().show()

+------+-----+
| SOUID|count|
+------+-----+
|234722|31532|
|234943|31532|
|235008|20635|
|236256|27149|
+------+-----+



Da in den Metadaten die Info zu finden war, dass Datensätze in der Spalte "Q_TG" mit den Werten 1 oder 9 ungültige Daten enthalten, entfernen wir diese Zeilen:

In [38]:
sparkDF = sparkDF.filter(col("Q_TG") == '0')

+------+-----+
| SOUID|count|
+------+-----+
|234722|30621|
|234943|29673|
|235008|20511|
|236256|17655|
+------+-----+



Man sieht nun, dass einige Datensätze entfernt wurden, vor allem bei der Station 236256 Güssing):

In [None]:
sparkDF.groupby('SOUID').count().show()

In [40]:
sparkDF.printSchema()

root
 |-- STAID: long (nullable = true)
 |-- SOUID: long (nullable = true)
 |-- DATE: long (nullable = true)
 |-- TG: long (nullable = true)
 |-- Q_TG: long (nullable = true)



Nun werden noch die unnötigen Spalten entfernt und die Temperatur auf Grad skaliert: es stehen nun nur noch die interessierenden Werte Station, Datum und mittlere Temperatur zur Verfügung:

In [41]:
sparkDF = sparkDF.drop("STAID", "Q_TG")
sparkDF = sparkDF.withColumn("TG", round(sparkDF.TG / 10, 2))
sparkDF.printSchema()

root
 |-- SOUID: long (nullable = true)
 |-- DATE: long (nullable = true)
 |-- TG: double (nullable = true)



Erneut speichern wir das File für die Temperaturdatenn lokal im hdf5-Format. 

In [42]:
tg = sparkDF.toPandas()
tg.to_hdf('../data/ecad/TG.h5', key='all', mode='w')