# Datenvorbereitung

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType, DateType

# SparkSession ist bereits in deinem Jupyter Notebook initialisiert
df = spark.read.format("csv").option("header", "true").load("gs://dscb420_bucket/data4SuperDuperTableOfDoomV2.csv")

# Umwandlung der Spalten in entsprechende numerische Typen
df = df.withColumn("date", col("date").cast(DateType())) \
       .withColumn("ankuenfte_anzahl", col("ankuenfte_anzahl").cast(IntegerType())) \
       .withColumn("uebernachtungen_anzahl", col("uebernachtungen_anzahl").cast(IntegerType())) \
       .withColumn("campingplaetze_anzahl", col("campingplaetze_anzahl").cast(IntegerType())) \
       .withColumn("urlaubs_campingplaetze_anzahl", col("urlaubs_campingplaetze_anzahl").cast(IntegerType())) \
       .withColumn("urlaubs_campingplaetze_offen", col("urlaubs_campingplaetze_offen").cast(IntegerType())) \
       .withColumn("urlaubs_stellplaetze_anzahl", col("urlaubs_stellplaetze_anzahl").cast(IntegerType())) \
       .withColumn("urlaubs_stellplaetze_offen", col("urlaubs_stellplaetze_offen").cast(IntegerType()))

# Konvertierung von Float-Spalten
float_columns = [
    'ankuenfte_veraenderung_zum_vorjahreszeitraum_prozent',
    'uebernachtungen_veraenderung_zum_vorjahreszeitraum_prozent',
    'durchsch_aufenthaltsdauer_tage',
    'change_urlaubs_stellplaetze_offen_vorjahresmonat',
    'anteil_urlaubs_stellplaetze_offen_an_urlaubs_stellplaetze_anzah',
    'mean_air_temp_max', 'mean_air_temp_mean', 'mean_air_temp_min', 
    'mean_drought_index', 'mean_evapo_p', 'mean_evapo_r', 'mean_frost_depth', 
    'mean_precipitation', 'mean_soil_moist', 'mean_soil_temperature_5cm', 
    'mean_sunshine_duration', 'std_air_temp_max', 'std_air_temp_mean', 
    'std_air_temp_min', 'std_drought_index', 'std_evapo_p', 'std_evapo_r', 
    'std_frost_depth', 'std_precipitation', 'std_soil_moist', 
    'std_soil_temperature_5cm', 'std_sunshine_duration'
]

for column in float_columns:
    df = df.withColumn(column, col(column).cast(FloatType()))

# Entfernen aller Zeilen mit Nullwerten
cleaned_df = df.na.drop()

# One-Hot-Encoding für "land"
indexer = StringIndexer(inputCol="land", outputCol="land_indexed")
encoded_df = indexer.fit(cleaned_df).transform(cleaned_df)
encoder = OneHotEncoder(inputCols=["land_indexed"], outputCols=["land_encoded"])
encoded_df = encoder.fit(encoded_df).transform(encoded_df)

# Auswahl der numerischen Features und der kategorialen Features nach Encoding
numeric_features = ['durchsch_aufenthaltsdauer_tage', 'mean_air_temp_mean', 'mean_drought_index',
                    'mean_evapo_p', 'mean_evapo_r', 'mean_frost_depth', 'mean_precipitation', 
                    'mean_soil_moist', 'mean_soil_temperature_5cm', 'mean_sunshine_duration',
                    'urlaubs_campingplaetze_offen', 'urlaubs_stellplaetze_offen']

# Assembler für alle Features inklusive der one-hot-kodierten
assembler = VectorAssembler(inputCols=numeric_features + ["land_encoded"], outputCol="assembled_features")
assembled_df = assembler.transform(encoded_df)

# Anwenden des StandardScalers
scaler = StandardScaler(inputCol="assembled_features", outputCol="features", withStd=True, withMean=True)
scaled_df = scaler.fit(assembled_df).transform(assembled_df)

train_data, test_data = scaled_df.randomSplit([0.8, 0.2], seed=42)

24/07/01 12:23:06 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

# Simples Lineares Regressionsmodell

In [2]:
# Training des linearen Regressionsmodells
lr = LinearRegression(featuresCol="features", labelCol="ankuenfte_anzahl", regParam=0.1)
model = lr.fit(train_data)
predictions = model.transform(test_data)

# Vorhersagen und Bewertung
predictions.select("features", "ankuenfte_anzahl", "prediction").show()

[Stage 9:>                                                          (0 + 1) / 1]

+--------------------+----------------+------------------+
|            features|ankuenfte_anzahl|        prediction|
+--------------------+----------------+------------------+
|[0.10171832282235...|           14672|51107.526405051845|
|[0.58646406012829...|          193660| 133934.6176779811|
|[0.82883712140194...|           29347|  68618.9428446633|
|[0.58646406012829...|           10284| 41081.70389784705|
|[0.66725495213905...|           60341| 88552.97596692781|
|[1.15200107468636...|            4698|10611.749021976808|
|[-0.2214456304620...|          171536|122275.72334380273|
|[1.63674681199230...|            4858|26680.280190481313|
|[1.31358285870788...|            5176|25466.336978484505|
|[0.50567316811753...|            4471|15701.574214047487|
|[0.58646406012829...|            6451| 22609.25398642103|
|[0.02092743081159...|           61950| 97709.79456093952|
|[-0.1406545458306...|          186413|145173.43209580128|
|[1.23279196669712...|            7203| 49520.9981588583

                                                                                

In [3]:
from pyspark.ml.evaluation import RegressionEvaluator
# Evaluation des Modells
evaluator = RegressionEvaluator(labelCol="ankuenfte_anzahl", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) auf Testdaten = {rmse}")

r2_evaluator = RegressionEvaluator(labelCol="ankuenfte_anzahl", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)
print(f"R2 auf Testdaten = {r2}")

[Stage 10:>                                                         (0 + 1) / 1]                                                                                

Root Mean Squared Error (RMSE) auf Testdaten = 41450.33198538461


[Stage 11:>                                                         (0 + 1) / 1]

R2 auf Testdaten = 0.5934726998951146


                                                                                

# Random Forest

In [None]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Beispiel mit Random Forest
rf = RandomForestRegressor(featuresCol="features", labelCol="ankuenfte_anzahl")

# Erstelle das Parametergitter
rf_paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20]) \
    .addGrid(rf.maxDepth, [5, 7]) \
    .build()

# Evaluator
evaluator = RegressionEvaluator(labelCol="ankuenfte_anzahl", predictionCol="prediction", metricName="rmse")

# Konfiguriere den TrainValidationSplit
rf_tvs = TrainValidationSplit(estimator=rf,
                              estimatorParamMaps=rf_paramGrid,
                              evaluator=evaluator,
                              trainRatio=0.8)  # 80% der Daten für Training, 20% für Validierung

# Trainiere das Modell
rf_tvsModel = rf_tvs.fit(train_data)
best_rf_model = rf_tvsModel.bestModel

# Vorhersagen treffen und Modell bewerten
rf_predictions = best_rf_model.transform(test_data)
rf_best_rmse = evaluator.evaluate(rf_predictions)
print(f"Best Random Forest RMSE: {rf_best_rmse}")

# Optional: Ausgabe weiterer Metriken wie R2
rf_r2 = evaluator.evaluate(rf_predictions, {evaluator.metricName: "r2"})
print(f"Random Forest R2: {gbt_r2}")

# Gradient Boosted Trees

In [None]:
# Beispiel mit Gradient Boosting Trees
gbt = GBTRegressor(featuresCol="features", labelCol="ankuenfte_anzahl")
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [10, 20, 50]) \
    .addGrid(gbt.maxDepth, [5, 10, 20]) \
    .build()

gbt_tvs = TrainValidationSplit(estimator=gbt,
                               estimatorParamMaps=gbt_paramGrid,
                               evaluator=evaluator,
                               trainRatio=0.8)

# Trainiere das Modell
gbt_tvsModel = gbt_tvs.fit(train_data)
best_gbt_model = gbt_tvsModel.bestModel

# Vorhersagen treffen und Modell bewerten
gbt_predictions = best_gbt_model.transform(test_data)
gbt_best_rmse = evaluator.evaluate(gbt_predictions)
print(f"Best Gradient Boosted Trees RMSE: {gbt_best_rmse}")

# Bei Bedarf: Manuell nochmal beste Modelle trainieren:

In [4]:
from pyspark.ml.regression import GBTRegressor
# Gradient Boosting Trees Regressor
gbt = GBTRegressor(featuresCol="features", labelCol="ankuenfte_anzahl", maxIter=50, maxDepth=5)

# Trainieren des Gradient Boosting Trees Modells
gbt_model = gbt.fit(train_data)

# Vorhersagen treffen
gbt_predictions = gbt_model.transform(test_data)

# Evaluation
gbt_evaluator = RegressionEvaluator(labelCol="ankuenfte_anzahl", predictionCol="prediction", metricName="rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
print(f"Gradient Boosted Trees RMSE: {gbt_rmse}")

# Optional: Ausgabe weiterer Metriken wie R2
gbt_r2 = gbt_evaluator.evaluate(gbt_predictions, {gbt_evaluator.metricName: "r2"})
print(f"Gradient Boosted Trees R2: {gbt_r2}")


                                                                                

Gradient Boosted Trees RMSE: 23061.83834447464
Gradient Boosted Trees R2: 0.8741591809480382


In [5]:
from pyspark.ml.regression import RandomForestRegressor
# Random Forest Regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="ankuenfte_anzahl")

# Trainieren des Random Forest Modells
rf_model = rf.fit(train_data)

# Vorhersagen treffen
rf_predictions = rf_model.transform(test_data)

# Evaluation
rf_evaluator = RegressionEvaluator(labelCol="ankuenfte_anzahl", predictionCol="prediction", metricName="rmse")
rf_rmse = rf_evaluator.evaluate(rf_predictions)
print(f"Random Forest RMSE: {rf_rmse}")

# Optional: Ausgabe weiterer Metriken wie R2
rf_r2 = rf_evaluator.evaluate(rf_predictions, {rf_evaluator.metricName: "r2"})
print(f"Random Forest R2: {rf_r2}")


Random Forest RMSE: 35138.448729572905
Random Forest R2: 0.7078547305490359


# Modelle in das Bucket speichern

In [7]:
model_path = "gs://dscb420_bucket/lr_model"

model.write().overwrite().save(model_path)

                                                                                

In [8]:
rf_model_path = "gs://dscb420_bucket/rf_model"

# Modell im GCS Bucket speichern
rf_model.write().overwrite().save(rf_model_path)

                                                                                

In [9]:
gbt_model_path = "gs://dscb420_bucket/gbt_model"

gbt_model.write().overwrite().save(gbt_model_path)

                                                                                