Importieren aller benötigten Packages und Einlesen der Trainings-, Test- und Überprüfungsdaten

In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import pandas as pd
import numpy as np

# Load the raw training data (path is relative to the current working directory)
datensatz = pd.read_csv(filepath_or_buffer="./foot-traffic-wue/train.csv")


Trainingsdaten von Fehlern und irrelevanten Daten bereinigen

In [None]:
# Drop every row whose 'incidents' value is 'laser_failure'.
# The explicit .copy() makes the result an independent DataFrame: the cells below
# add and overwrite columns on `datensatz_f`, which would otherwise trigger
# pandas' SettingWithCopyWarning on a frame derived by boolean indexing.
datensatz_f = datensatz[
    ~datensatz['incidents'].isin(['laser_failure'])
    ].copy()

Markierung der Feiertage im Datensatz

In [None]:
# List of public holidays (ISO 'YYYY-MM-DD' strings)
feiertage = [
    '2019-04-19', '2019-04-22', '2019-05-01', '2019-05-30', '2019-06-10', '2019-06-20', '2019-08-15',
    '2019-10-03', '2019-11-01', '2019-12-25', '2019-12-26', '2020-01-01', '2020-01-06', '2020-04-10',
    '2020-04-13', '2020-05-01', '2020-05-21', '2020-06-01', '2020-06-11', '2020-08-15', '2020-10-03',
    '2020-11-01', '2020-12-25', '2020-12-26', '2021-01-01', '2021-01-06', '2021-04-02', '2021-04-05',
    '2021-05-01', '2021-05-13', '2021-05-24', '2021-06-03', '2021-08-15', '2021-10-03', '2021-11-01',
    '2021-12-25', '2021-12-26', '2022-01-01', '2022-01-06', '2022-04-15', '2022-04-18', '2022-05-01',
    '2022-05-26', '2022-06-06', '2022-06-16', '2022-08-15', '2022-10-03', '2022-11-01', '2022-12-25',
    '2022-12-26', '2023-01-01', '2023-01-06', '2023-04-07', '2023-04-10', '2023-05-01', '2023-05-18',
    '2023-05-29', '2023-06-08', '2023-08-15', '2023-10-03', '2023-11-01', '2023-12-25', '2023-12-26',
    '2024-01-01', '2024-01-06', '2024-03-29', '2024-04-01', '2024-05-01', '2024-05-09', '2024-05-20',
    '2024-05-30', '2024-08-15'
]

# Convert the holidays to a set for faster membership checks
feiertage_set = set(feiertage)

# Create the new column "is_feiertag" (1 = holiday, 0 = any other day).
# The date is normalised to a 'YYYY-MM-DD' string first, so the check also works
# when 'date' already holds datetimes (e.g. this cell is re-run after the
# date-splitting cell) -- a Timestamp never equals a plain string, which would
# silently mark every row as 0.
datum_iso = pd.to_datetime(datensatz_f['date']).dt.strftime('%Y-%m-%d')
datensatz_f['is_feiertag'] = datum_iso.isin(feiertage_set).astype(int)


Aufteilung der Spalte date in year, month und day 

In [None]:
# Parse the 'date' column into pandas datetimes and store it back
datum = pd.to_datetime(datensatz_f['date'])
datensatz_f['date'] = datum

# Derive one column per calendar component: 'year', 'month' and 'day'
for teil in ('year', 'month', 'day'):
    datensatz_f[teil] = getattr(datum.dt, teil)


Zusammenführen der Wetterbedingungen, welche in Tag und Nacht unterteilt sind 

In [None]:
# Map the day/night variants of a weather condition onto one shared label
wetter_zusammenfassung = {
    'partly-cloudy-day': 'partly-cloudy',
    'partly-cloudy-night': 'partly-cloudy',
    'clear-day': 'clear',
    'clear-night': 'clear',
}
datensatz_f['weather_condition'] = datensatz_f['weather_condition'].replace(wetter_zusammenfassung)

Aufstellen des Modells mithilfe eines neuronalen Netzes (Standardkategorien)

In [None]:
from typing import List
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold, GridSearchCV
from sklearn.preprocessing import StandardScaler, OrdinalEncoder, OneHotEncoder
from scikeras.wrappers import KerasRegressor
import keras
from keras import layers
from keras import initializers

# Input columns fed to the model
feature_columns = [
    "streetname",
    "hour",
    "weekday",
    "incidents",
    "weather_condition",
    "temperature",
    "year",
    "month",
    "day",
]
# Target columns the model predicts
target_columns = [
    "n_pedestrians",
    "n_pedestrians_towards",
    "n_pedestrians_away",
]


# Encoder anlegen und an Daten anpassen
def fit_preprocess(X: pd.DataFrame, ordinal_features: List[str] | None, onehot_features: List[str] | None, numerical_features: List[str] | None):
	X = X.copy()

	ordinal_encoder = None
	if ordinal_features is not None and ordinal_features:
		ordinal_encoder = OrdinalEncoder()
		X[ordinal_features] = ordinal_encoder.fit_transform(X[ordinal_features])

	onehot_encoder = None
	if onehot_features is not None and onehot_features:
		onehot_encoder = OneHotEncoder(sparse_output=False)
		encoded = onehot_encoder.fit_transform(X[onehot_features])
		encoded_onehot_columns = onehot_encoder.get_feature_names_out()
		X[encoded_onehot_columns] = pd.DataFrame(encoded, columns=encoded_onehot_columns)

	numerical_encoder = None
	if numerical_features is not None and numerical_features:
		numerical_encoder = StandardScaler()
		X[numerical_features] = numerical_encoder.fit_transform(X[numerical_features])

	return ordinal_encoder, onehot_encoder, numerical_encoder


# Codiert die Messkategorien
def preprocess(X: pd.DataFrame, 
			   ordinal_features, ordinal_encoder: OrdinalEncoder, 
			   onehot_features, onehot_encoder: OneHotEncoder, 
			   numerical_features: List[str] | None, numerical_encoder: StandardScaler):
	X = X.copy()
	features = []

	if ordinal_encoder is not None:
		X[ordinal_features] = ordinal_encoder.transform(X[ordinal_features])
		features.extend(ordinal_features)

	if onehot_encoder is not None:
		encoded = onehot_encoder.transform(X[onehot_features])
		encoded_onehot_columns = onehot_encoder.get_feature_names_out()
		X[encoded_onehot_columns] = pd.DataFrame(encoded, columns=encoded_onehot_columns)
		features.extend(encoded_onehot_columns)

	if numerical_features is not None:
		X[numerical_features] = numerical_encoder.transform(X[numerical_features])
		features.extend(numerical_features)

	return X[features]


# Assign every feature to the encoding scheme it goes through
onehot_features = ["streetname", "weekday", "month", "hour",  "day"]
ordinal_features = ["weather_condition", "incidents"]
numerical_features = ["year", "temperature"]


# Fit the encoders on the cleaned data set
ordinal_encoder, onehot_encoder, numerical_encoder = fit_preprocess(
    datensatz_f,
    ordinal_features=ordinal_features,
    onehot_features=onehot_features,
    numerical_features=numerical_features,
)


# Encode the model inputs; the target columns are taken over unchanged
X = preprocess(
    datensatz_f[feature_columns],
    ordinal_features=ordinal_features,
    ordinal_encoder=ordinal_encoder,
    onehot_features=onehot_features,
    onehot_encoder=onehot_encoder,
    numerical_features=numerical_features,
    numerical_encoder=numerical_encoder,
)
y = datensatz_f[target_columns]



# Define the neural network
def build_NN(X, y, n_nodes, n_hidden_layers, dropout):
	"""Build and compile the feed-forward network with three outputs.

	Args:
		X: Encoded features; only ``X.shape[1]`` is read to size the input layer.
		y: Targets; accepted but not used inside this function.
		n_nodes: Number of units in every hidden layer.
		n_hidden_layers: Number of hidden ``Dense`` layers.
		dropout: Dropout rate added after every hidden layer, or ``None`` for none.

	Returns:
		Keras model compiled with the Adam optimizer and mean-squared-error loss.
		The output concatenates the summed count with the two directional counts.
	"""
	# Input layer sized to the number of feature columns
	net_input = layers.Input(shape=(X.shape[1],), name="input")

	# Stack of hidden layers, each optionally followed by dropout
	hidden = net_input
	use_dropout = dropout is not None
	for layer_idx in range(n_hidden_layers):
		hidden = layers.Dense(n_nodes, activation="relu", name=f"hidden_{layer_idx}")(hidden)
		if use_dropout:
			hidden = layers.Dropout(dropout, name=f"dropout_{layer_idx}")(hidden)

	# Intermediate two-unit layer for n_pedestrians_towards & n_pedestrians_away
	towards_away = layers.Dense(2, activation="relu", name="towards_away")(hidden)

	# Frozen layer (weights fixed to 1, bias 0) that adds both directional
	# outputs to obtain n_pedestrians
	total = layers.Dense(
		1,
		activation="relu",
		name="sum",
		kernel_initializer=initializers.Constant(1.0),
		bias_initializer=initializers.Zeros(),
		trainable=False,
	)(towards_away)

	# Output layer: n_pedestrians, n_pedestrians_towards, n_pedestrians_away
	output = layers.Concatenate(name="output")([total, towards_away])

	# Create and compile the model
	model = keras.Model(inputs=net_input, outputs=output)
	model.compile(optimizer="adam", loss="mean_squared_error")
	return model

# 10-fold cross validation with shuffling.
# The fixed random_state makes the fold assignment reproducible across runs;
# without it every execution scores the models on different splits.
kf = KFold(n_splits=10, shuffle=True, random_state=42)

# Model factory used inside the cross validation
def model_build_fn(n_nodes, n_hidden_layers, dropout):
	"""Build the network for one hyperparameter combination.

	Reads the notebook-level ``X`` and ``y`` and forwards them to ``build_NN``
	together with the given hyperparameters.
	"""
	hyperparameters = dict(n_nodes=n_nodes, n_hidden_layers=n_hidden_layers, dropout=dropout)
	return build_NN(X=X, y=y, **hyperparameters)



# Hyperparameter grid with the value ranges to search
param_grid = {
	"model__n_nodes": [300, 400],
	"model__n_hidden_layers": [2, 3, 4],
	"model__dropout": [None, 0.2],
}

# Set up and run the grid search over the cross-validation folds
modelCV = KerasRegressor(
	model=model_build_fn,
	batch_size=100,
	epochs=100,
	verbose=0,
)
gs = GridSearchCV(
	estimator=modelCV,
	param_grid=param_grid,
	n_jobs=32,
	cv=kf,
	scoring="neg_mean_squared_error",
)
gs_result = gs.fit(X=X, y=y, verbose=1)


# Report the best parameters and the matching error values.
# The scorer is the negated MSE, hence the abs().
best_mse = abs(gs.best_score_)
print(
	"Best parameters set found on validation set:",
	"",
	gs.best_params_,
	"",
	"Score:",
	"",
	sep="\n",
)
print(f"mse: {best_mse}")
print(f"rmse: {np.sqrt(best_mse)}")






Abbildung der Vorhersagegenauigkeit für eine Woche

In [None]:
import matplotlib.pyplot as plt

def score(model, X, y, n_points=24 * 7):
	"""Plot predictions against the true values and return the mean squared error.

	One subplot is drawn per target column (total, towards, away), showing the
	first ``n_points`` rows of ``y`` and of the model's prediction.

	Args:
		model: Fitted model offering ``predict(X)`` with three output columns.
		X: Encoded features to predict on.
		y: True target values with three columns, in the same row order as ``X``.
		n_points: Number of leading rows to plot. Defaults to ``24 * 7``
			(one week, assuming one row per hour).

	Returns:
		Mean squared error between ``y`` and the prediction over all rows.
	"""
	y_true = np.asarray(y)
	y_pred = model.predict(X)
	mse = mean_squared_error(y_true, y_pred)

	shown = slice(0, n_points)
	x = np.arange(0, y_pred.shape[0])
	for (i, name) in enumerate(["ped", "ped_towards", "ped_away"]):
		ax = plt.subplot(3, 1, i+1)
		ax.plot(x[shown], y_true[shown, i], label=f"realität_{name}")
		ax.plot(x[shown], y_pred[shown, i], label=f"vorhersage_{name}")
		ax.legend()
	plt.show()
	return mse

# Hold out the last 10 % of the rows; shuffle=False keeps the row order intact
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, shuffle=False
)

# Train one network with fixed hyperparameters and plot its predictions
model = model_build_fn(n_nodes=400, n_hidden_layers=4, dropout=None)
model.fit(
    x=X_train,
    y=y_train,
    batch_size=100,
    epochs=100,
    validation_data=(X_test, y_test),
)
score(model, X_test, y_test)

Aufstellen des Modells mithilfe eines neuronalen Netzes (Standardkategorien + Berücksichtigung der Feiertage)

In [None]:
from typing import List
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold, GridSearchCV
from sklearn.preprocessing import StandardScaler, OrdinalEncoder, OneHotEncoder
from scikeras.wrappers import KerasRegressor
import keras
from keras import layers
from keras import initializers

# Input columns fed to the model (now including the holiday flag)
feature_columns = [
    "streetname",
    "hour",
    "weekday",
    "incidents",
    "weather_condition",
    "temperature",
    "year",
    "month",
    "day",
    "is_feiertag",
]
# Target columns the model predicts
target_columns = [
    "n_pedestrians",
    "n_pedestrians_towards",
    "n_pedestrians_away",
]



# Encoder anlegen und an Daten anpassen
def fit_preprocess(X: pd.DataFrame, ordinal_features, onehot_features, numerical_features):
	X = X.copy()

	ordinal_encoder = None
	if ordinal_features is not None and ordinal_features:
		ordinal_encoder = OrdinalEncoder()
		X[ordinal_features] = ordinal_encoder.fit_transform(X[ordinal_features])

	onehot_encoder = None
	if onehot_features is not None and onehot_features:
		onehot_encoder = OneHotEncoder(sparse_output=False)
		encoded = onehot_encoder.fit_transform(X[onehot_features])
		encoded_onehot_columns = onehot_encoder.get_feature_names_out()
		X[encoded_onehot_columns] = pd.DataFrame(encoded, columns=encoded_onehot_columns)

	numerical_encoder = None
	if numerical_features is not None and numerical_features:
		numerical_encoder = StandardScaler()
		X[numerical_features] = numerical_encoder.fit_transform(X[numerical_features])

	return ordinal_encoder, onehot_encoder, numerical_encoder


# Codiert die Messkategorien
def preprocess(X: pd.DataFrame, 
			   ordinal_features, ordinal_encoder: OrdinalEncoder, 
			   onehot_features, onehot_encoder: OneHotEncoder, 
			   numerical_features, numerical_encoder):
	X = X.copy()
	features = []

	if ordinal_encoder is not None:
		X[ordinal_features] = ordinal_encoder.transform(X[ordinal_features])
		features.extend(ordinal_features)

	if onehot_encoder is not None:
		encoded = onehot_encoder.transform(X[onehot_features])
		encoded_onehot_columns = onehot_encoder.get_feature_names_out()
		X[encoded_onehot_columns] = pd.DataFrame(encoded, columns=encoded_onehot_columns)
		features.extend(encoded_onehot_columns)

	if numerical_features is not None:
		X[numerical_features] = numerical_encoder.transform(X[numerical_features])
		features.extend(numerical_features)

	return X[features]


# Assign every feature to the encoding scheme it goes through
onehot_features = ["streetname", "weekday", "month", "hour", "day"]
ordinal_features = ["weather_condition", "incidents"]
numerical_features = ["year", "temperature", "is_feiertag"]


# Fit the encoders on the cleaned data set
ordinal_encoder, onehot_encoder, numerical_encoder = fit_preprocess(
    datensatz_f,
    ordinal_features=ordinal_features,
    onehot_features=onehot_features,
    numerical_features=numerical_features,
)


# Encode the model inputs; the target columns are taken over unchanged
X = preprocess(
    datensatz_f[feature_columns],
    ordinal_features=ordinal_features,
    ordinal_encoder=ordinal_encoder,
    onehot_features=onehot_features,
    onehot_encoder=onehot_encoder,
    numerical_features=numerical_features,
    numerical_encoder=numerical_encoder,
)
y = datensatz_f[target_columns]



# Define the neural network
def build_NN(X, y, n_nodes, n_hidden_layers, dropout):
	"""Build and compile the feed-forward network with three outputs.

	Args:
		X: Encoded features; only ``X.shape[1]`` is read to size the input layer.
		y: Targets; accepted but not used inside this function.
		n_nodes: Number of units in every hidden layer.
		n_hidden_layers: Number of hidden ``Dense`` layers.
		dropout: Dropout rate added after every hidden layer, or ``None`` for none.

	Returns:
		Keras model compiled with the Adam optimizer and mean-squared-error loss.
		The output concatenates the summed count with the two directional counts.
	"""
	# Input layer sized to the number of feature columns
	net_input = layers.Input(shape=(X.shape[1],), name="input")

	# Stack of hidden layers, each optionally followed by dropout
	hidden = net_input
	use_dropout = dropout is not None
	for layer_idx in range(n_hidden_layers):
		hidden = layers.Dense(n_nodes, activation="relu", name=f"hidden_{layer_idx}")(hidden)
		if use_dropout:
			hidden = layers.Dropout(dropout, name=f"dropout_{layer_idx}")(hidden)

	# Intermediate two-unit layer for n_pedestrians_towards & n_pedestrians_away
	towards_away = layers.Dense(2, activation="relu", name="towards_away")(hidden)

	# Frozen layer (weights fixed to 1, bias 0) that adds both directional
	# outputs to obtain n_pedestrians
	total = layers.Dense(
		1,
		activation="relu",
		name="sum",
		kernel_initializer=initializers.Constant(1.0),
		bias_initializer=initializers.Zeros(),
		trainable=False,
	)(towards_away)

	# Output layer: n_pedestrians, n_pedestrians_towards, n_pedestrians_away
	output = layers.Concatenate(name="output")([total, towards_away])

	# Create and compile the model
	model = keras.Model(inputs=net_input, outputs=output)
	model.compile(optimizer="adam", loss="mean_squared_error")
	return model

# 10-fold cross validation with shuffling.
# The fixed random_state makes the fold assignment reproducible across runs;
# without it every execution scores the models on different splits.
kf = KFold(n_splits=10, shuffle=True, random_state=42)

# Model factory used inside the cross validation
def model_build_fn(n_nodes, n_hidden_layers, dropout):
	"""Build the network for one hyperparameter combination.

	Reads the notebook-level ``X`` and ``y`` and forwards them to ``build_NN``
	together with the given hyperparameters.
	"""
	hyperparameters = dict(n_nodes=n_nodes, n_hidden_layers=n_hidden_layers, dropout=dropout)
	return build_NN(X=X, y=y, **hyperparameters)



# Hyperparameter grid (a single combination is evaluated here)
param_grid = {
	"model__n_nodes": [400],
	"model__n_hidden_layers": [4],
	"model__dropout": [None],
}


# Set up and run the grid search over the cross-validation folds
modelCV = KerasRegressor(
	model=model_build_fn,
	batch_size=100,
	epochs=100,
	verbose=0,
)
gs = GridSearchCV(
	estimator=modelCV,
	param_grid=param_grid,
	n_jobs=32,
	cv=kf,
	scoring="neg_mean_squared_error",
)
gs_result = gs.fit(X=X, y=y, verbose=1)


# Report the best parameters and the matching error values.
# The scorer is the negated MSE, hence the abs().
best_mse = abs(gs.best_score_)
print(
	"Best parameters set found on validation set:",
	"",
	gs.best_params_,
	"",
	"Score:",
	"",
	sep="\n",
)
print(f"mse: {best_mse}")
print(f"rmse: {np.sqrt(best_mse)}")




