In [1]:
import os
os.environ['SPARK_HOME'] = "/home/mario/spark-3.3.1-bin-hadoop3"

In [2]:
from datetime import datetime
from pyspark.sql.functions import col, udf, to_timestamp, year, month, concat_ws
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, DateType, StringType, FloatType

In [3]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("myApp").setMaster("spark://192.168.144.80:7077").set('spark.executor.memory', '8g').set('spark.driver.memory', '8g')
sc = SparkContext(conf=conf)

23/02/03 09:07:10 WARN Utils: Your hostname, mario-hpprobook450g5 resolves to a loopback address: 127.0.1.1; using 192.168.144.80 instead (on interface wlp3s0)
23/02/03 09:07:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/03 09:07:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)

In [5]:
sc.setJobGroup("job group id", "read all files from HDFS and print the schema")
folder_Temperature = 'hdfs://192.168.144.80:9000/user/mario/input_files/temperatur_only_10_stations/'
folder_Solar = 'hdfs://192.168.144.80:9000/user/mario/input_files/solar_only_10_stations/'

#Einlesen der Daten in einen DataFrame

#Lufttemperatur
#Daniel
df_Temperatur = spark.read.option("header", "true").option("delimiter", ";").option("inferSchema", "false").csv(folder_Temperature+'*.txt')
#Sonnenscheindauer
#Daniel
df_Solar = spark.read.option("header", "true").option("delimiter", ";").option("inferSchema", "false").csv(folder_Solar+'*.txt')

                                                                                

In [6]:
#df_Temperatur.count()

In [7]:
#df_Solar.count()

Nächste Schritte: 
1. Datenstruktur verstehen 
2. Daten bearbeiten und in einen finalen DF schreiben 
-> Zusammenfassen der Daten auf eine Station und einen TimeStamp

In [8]:
df_Temperatur.printSchema()

root
 |-- STATIONS_ID: string (nullable = true)
 |-- MESS_DATUM: string (nullable = true)
 |--   QN: string (nullable = true)
 |-- PP_10: string (nullable = true)
 |-- TT_10: string (nullable = true)
 |-- TM5_10: string (nullable = true)
 |-- RF_10: string (nullable = true)
 |-- TD_10: string (nullable = true)
 |-- eor: string (nullable = true)



Aus diesem Datensatz benötigen wir die Spalten StationsID und MESS_Datum (Zeitstempel) TT_10 (Luftemperatur in 2m Höhe) und TM5_10 (Luftemperatur in 2cm Höhe) und PP_10(Luftdruck)
Ändern der Datentypen von TT_10, TM5_10 und PP_10 in float, MESS_Datum in Timestamp ändern

In [9]:
#Bearbeiten der Struktur in Gewünschtes Format 
#Datentypen bearbeiten 
sc.setJobGroup("job group id", "select only columns that we need, change column type, rename columns")
df_Temperatur = df_Temperatur\
    .withColumn('STATIONS_ID',df_Temperatur.STATIONS_ID.cast(IntegerType()))\
    .withColumn('TM5_10',df_Temperatur.TM5_10.cast(FloatType()))\
    .withColumn('PP_10',df_Temperatur.PP_10.cast(FloatType()))\
    .withColumn('TT_10',df_Temperatur.TT_10.cast(FloatType()))\
    .withColumn("MESS_DATUM",to_timestamp("MESS_DATUM", "yyyyMMddHHmm"))

#Spalten umbennen
df_Temperatur = df_Temperatur\
    .withColumnRenamed('TT_10','Temperatur_2m')\
    .withColumnRenamed('TM5_10','Temperatur_5cm')\
    .withColumnRenamed('PP_10','Luftdruck')

#Daten droppen
df_Temperatur = df_Temperatur.drop('  QN','RF_10','TD_10','eor')

sc.setJobGroup("job group id", "remove -999 values")
df_Temperatur = df_Temperatur.filter((df_Temperatur.Luftdruck != -999.0))
df_Temperatur = df_Temperatur.filter((df_Temperatur.Temperatur_2m != -999.0))
df_Temperatur = df_Temperatur.filter((df_Temperatur.Temperatur_5cm != -999.0))
df_Temperatur.show(3,False)

+-----------+-------------------+---------+-------------+--------------+
|STATIONS_ID|MESS_DATUM         |Luftdruck|Temperatur_2m|Temperatur_5cm|
+-----------+-------------------+---------+-------------+--------------+
|1766       |2010-01-01 00:00:00|989.7    |-1.3         |-1.8          |
|1766       |2010-01-01 00:10:00|989.7    |-1.3         |-1.8          |
|1766       |2010-01-01 00:20:00|989.8    |-1.3         |-1.8          |
+-----------+-------------------+---------+-------------+--------------+
only showing top 3 rows



In [10]:
#df_Temperatur.count()

In [11]:
df_Solar.printSchema()

root
 |-- STATIONS_ID: string (nullable = true)
 |-- MESS_DATUM: string (nullable = true)
 |--   QN: string (nullable = true)
 |-- DS_10: string (nullable = true)
 |-- GS_10: string (nullable = true)
 |-- SD_10: string (nullable = true)
 |-- LS_10: string (nullable = true)
 |-- eor: string (nullable = true)



Aus diesem Datensatz benötigen wir die Spalten StationsID und MESS_Datum (Zeitstempel), SD_10 (10 min-Summe der Sonnenscheindauer)
Ändern der Datentypen von SD_10 in float

In [12]:
#Datentypen bearbeiten
sc.setJobGroup("job group id", "select only columns that we need, change column type, rename columns")
df_Solar = df_Solar\
    .withColumn('SD_10',df_Solar.SD_10.cast(FloatType()))\
    .withColumn("MESS_DATUM",to_timestamp("MESS_DATUM", "yyyyMMddHHmm"))
#Daten droppen 
df_Solar = df_Solar.drop('  QN','DS_10','GS_10','LS_10','eor')
#Spalte umbennen
df_Solar = df_Solar.withColumnRenamed('SD_10','Sonnenscheindauer')
df_Solar = df_Solar.filter((df_Solar.Sonnenscheindauer != -999.0))
#df_Solar.show(3,False)

In [13]:
#df_Solar.count()

Nachdem die beiden Datensätze in die passende Struktur gebracht wurden, können Sie mit einem Join verbunden werden. 
Welchen Join? 
Da für die Auswertung des Spruches, für jede Station und jeden TimeStamp Daten von Solar und Temperature benötigt werden, müssen diese auch von beiden vorhanden sein. 
Daher wird der Inner Join gewählt. 

In [14]:
#Join der DF
sc.setJobGroup("job group id", "join solar and temperature")
df_Final = df_Temperatur.join(df_Solar, (df_Temperatur.STATIONS_ID == df_Solar.STATIONS_ID) & (df_Temperatur.MESS_DATUM == df_Solar.MESS_DATUM),"inner").select(df_Temperatur['*'],df_Solar.Sonnenscheindauer)

In [15]:
df_Final.printSchema()

root
 |-- STATIONS_ID: integer (nullable = true)
 |-- MESS_DATUM: timestamp (nullable = true)
 |-- Luftdruck: float (nullable = true)
 |-- Temperatur_2m: float (nullable = true)
 |-- Temperatur_5cm: float (nullable = true)
 |-- Sonnenscheindauer: float (nullable = true)



In [16]:
#df_Final.show(3,False)

In [17]:
#df_Final.count()

Auf der Basis dieser Datentabelle kann nun die Auswertung zur Bauernregel erfolgen. 
Diese Tabelle kann natürlich noch um weitere Wetterdaten erweitert werden, damit andere Bauernregeln oder andere Wettersimulationen durchgeführt werden können.

Bauernregel 1:

"Ist der Januar hell und weiß, wird der Sommer sicher heiß"
1. DF anlegen, welcher alle Kombinationen von Station ID und Jahre besitzt
2. Daten für Januar und Sommer erstellen
3. Auswertung

In [18]:
#DF aus dem df_Final erzeugen welcher alle Kombinationen der StationsIDs, Monate und Jahre enthält
#Neue Spalte Jahr hinzufügen
df = df_Final
df  = df.withColumn("Jahr", year("MESS_DATUM"))
df  = df.withColumn("Monat", month("MESS_DATUM"))
df.printSchema()

root
 |-- STATIONS_ID: integer (nullable = true)
 |-- MESS_DATUM: timestamp (nullable = true)
 |-- Luftdruck: float (nullable = true)
 |-- Temperatur_2m: float (nullable = true)
 |-- Temperatur_5cm: float (nullable = true)
 |-- Sonnenscheindauer: float (nullable = true)
 |-- Jahr: integer (nullable = true)
 |-- Monat: integer (nullable = true)



In [19]:
sc.setJobGroup("job group id", "merge 2 columns into 1")
df_Kombinationen =  df.withColumn("ID",concat_ws('','Jahr','STATIONS_ID'))
df_Kombinationen = df_Kombinationen.select("ID",'STATIONS_ID','Jahr')
df_Kombinationen = df_Kombinationen.dropDuplicates(["ID"])
df_Kombinationen.printSchema()

root
 |-- ID: string (nullable = false)
 |-- STATIONS_ID: integer (nullable = true)
 |-- Jahr: integer (nullable = true)



In [20]:
#df_Kombinationen.show(3,False)

Pro Jahr sind nur die Temperaturen von 21. Juni bis 23. September und die Sonnenscheindauer im Januar wichtig 

In [21]:
#Zu DF df_Kombinationen eine neue Spalte hinzufügen, welche die Summe an Sonnenstunden pro StationsID und Jahr enthält
sc.setJobGroup("job group id", "filter,groupBy,mean")
df_Final_Solar_Grouped = df.filter((df.Monat == 12)).groupBy('STATIONS_ID','Jahr','Monat').mean('Sonnenscheindauer')
df_Final_Solar_Grouped = df_Final_Solar_Grouped.withColumnRenamed('avg(Sonnenscheindauer)','Sonnenscheindauer')
sc.setJobGroup("job group id", "join 2 dfs")
df_Kombinationen = df_Kombinationen.join(df_Final_Solar_Grouped,(df_Kombinationen.STATIONS_ID == df_Final_Solar_Grouped.STATIONS_ID)&(df_Kombinationen.Jahr == df_Final_Solar_Grouped.Jahr )).select(df_Kombinationen['*'],df_Final_Solar_Grouped.Sonnenscheindauer)

In [22]:
sc.setJobGroup("job group id", "filter, groupBy, mean")
df_Final_Temperatur_Grouped = df.filter((df.Monat == 7)|(df.Monat == 8)).groupBy('STATIONS_ID','Jahr').mean('Temperatur_2m')
df_Final_Temperatur_Grouped = df_Final_Temperatur_Grouped.withColumnRenamed('avg(Temperatur_2m)','Temperatur_2m')
sc.setJobGroup("job group id", "join 2 dfs")
df_Kombinationen = df_Kombinationen.join(df_Final_Temperatur_Grouped,(df_Kombinationen.STATIONS_ID == df_Final_Temperatur_Grouped.STATIONS_ID)&(df_Kombinationen.Jahr == df_Final_Temperatur_Grouped.Jahr )).select(df_Kombinationen['*'],df_Final_Temperatur_Grouped.Temperatur_2m)
df_Kombinationen = df_Kombinationen.sort('Jahr')
df_Kombinationen = df_Kombinationen.sort('STATIONS_ID')

In [23]:
df_Kombinationen.printSchema()

root
 |-- ID: string (nullable = false)
 |-- STATIONS_ID: integer (nullable = true)
 |-- Jahr: integer (nullable = true)
 |-- Sonnenscheindauer: double (nullable = true)
 |-- Temperatur_2m: double (nullable = true)



In [24]:
#df_Kombinationen.show(3,False)

In [25]:
#Korrelation zwischen Sonnenschein und Durschsnittstemperatur berechnen
sc.setJobGroup("job group id", "calculate correlation")
corr_Spruch = df_Kombinationen.stat.corr('Sonnenscheindauer','Temperatur_2m')

[Stage 7:==>(17 + 8) / 25][Stage 8:==>(21 + 3) / 24][Stage 9:>   (0 + 9) / 25]4]

23/02/03 09:07:50 ERROR TaskSchedulerImpl: Lost executor 1 on 192.168.144.80: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/02/03 09:07:50 WARN TaskSetManager: Lost task 11.0 in stage 7.0 (TID 158) (192.168.144.80 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/02/03 09:07:50 WARN TaskSetManager: Lost task 13.0 in stage 7.0 (TID 160) (192.168.144.80 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/02/03 09:07:50 WARN TaskSetManager: Lost task 7.0 in stage 7.0 (TID 154) (192.168.144.80 executor 1): ExecutorLostFailure (executor 1

[Stage 9:==>(17 + 0) / 25][Stage 10:=>(20 + 4) / 24][Stage 11:>  (0 + 8) / 25]

23/02/03 09:08:03 ERROR TaskSchedulerImpl: Lost executor 2 on 192.168.144.80: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/02/03 09:08:03 WARN TaskSetManager: Lost task 22.0 in stage 9.0 (TID 226) (192.168.144.80 executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/02/03 09:08:03 WARN TaskSetManager: Lost task 16.0 in stage 9.0 (TID 220) (192.168.144.80 executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/02/03 09:08:03 WARN TaskSetManager: Lost task 19.0 in stage 9.0 (TID 223) (192.168.144.80 executor 2): ExecutorLostFailure (executor 

                                                                                

In [26]:
print(corr_Spruch)

0.07188381758199237


Bauernregel 2:

„Wenn's im Dezember nicht wintert, sommert's im Juni auch nicht.“

Hierbei handelt es sich im Prinzip um die entgegengesetzte Regel zu der vorherigen:
Im Juni wird es nicht warm, wenn im Dezember kein richtiger Winter war.
Es stimmt, dass die These zu 65 Prozent eintrifft. Andersrum, wenn es also im Juni nicht „sommert“, wird es im Dezember nicht „wintern“,
stimmt die Bauernregel aber nicht. Warum das so ist, weiß keiner so genau.


In [27]:
# we only need months June and Decemeber
sc.setJobGroup("job group id", "filter only june and december")
df = df_Final
df = df.filter((month("MESS_DATUM") == 6) | (month("MESS_DATUM") == 12))
#df.show()

In [28]:
sc.setJobGroup("job group id", "convert timestamp into month and year")
df = df.select("STATIONS_ID", year("MESS_DATUM").alias("year"),month("MESS_DATUM").alias("month"), "Temperatur_2m")
#df.show()

In [29]:
# calculate average temperature for each year+month+stations_id
sc.setJobGroup("job group id", "calculate average temperature for each year+month+stations_id")
df = df.groupBy("year", "month", "STATIONS_ID").avg("Temperatur_2m")
#df.show()

In [30]:
sc.setJobGroup("job group id", "new dataframe with only december")
df2 = df.filter(df.month == 12)

In [31]:
#df2.show()

In [32]:
from pyspark.sql.functions import lit

sc.setJobGroup("job group id", "adding additional month and year columns")
df2 = df2.withColumnRenamed('avg(Temperatur_2m)', 'temperature_december')
df2 = df2.withColumnRenamed('year', 'year_december')
df2 = df2.withColumnRenamed('month', 'month_december')
df2 = df2.withColumn('year_june', df2.year_december + 1)
df2 = df2.withColumn('month_june', lit(6))

In [33]:
#df2.show()

In [34]:
sc.setJobGroup("job group id", "join two dataframes")
df2 = df2.join(df, (df2.year_june == df.year) & (df2.month_june == df.month) & (df2.STATIONS_ID == df.STATIONS_ID)).select(df2["*"], df["avg(Temperatur_2m)"])
df2 = df2.withColumnRenamed('avg(Temperatur_2m)', 'temperature_june')

In [35]:
#df2.show()

In [36]:
sc.setJobGroup("job group id", "calculate correlation")
df2.corr("temperature_december", "temperature_june", method="pearson")

[Stage 52:=> (5 + 8) / 13][Stage 53:>  (0 + 4) / 13][Stage 56:>  (0 + 0) / 12]3]

23/02/03 09:08:54 WARN StandaloneAppClient$ClientEndpoint: Connection to 192.168.144.80:7077 failed; waiting for master to reconnect...
23/02/03 09:08:54 WARN StandaloneSchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
23/02/03 09:08:54 WARN StandaloneAppClient$ClientEndpoint: Connection to 192.168.144.80:7077 failed; waiting for master to reconnect...


                                                                                

0.2159190300045076

In [37]:
#df2.coalesce(1).write.mode("overwrite").option("header", "true").csv("/hdfs://192.168.199.80:9000/user/mario/output_files/10minutenwerte_only_100_result")