In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import joblib
import time
from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.metrics import confusion_matrix, f1_score, classification_report
from sklearn.model_selection import cross_val_predict, GridSearchCV
from sklearn.decomposition import PCA
from sklearn.base import clone
import pickle
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin, TransformerMixin

# to avoid (hopefully unimportant) warnings
from silence_tensorflow import silence_tensorflow
from absl import logging

silence_tensorflow()
logging.set_verbosity(logging.ERROR)


class MyDynamicKerasRegressor(BaseEstimator, RegressorMixin):
    """
    Custom regressor that wraps a dynamic Keras model for use with scikit-learn pipelines.
    This wrapper is necessary because sometimes (e.g. when using pca in pipeline) the input shape is not clear apriori
    """

    def __init__(self, model, epochs=None, batch_size=None, verbose=0, optimizer="adam", loss=None, metrics=None):
        self.model = model
        self.epochs = epochs
        self.batch_size = batch_size
        self.verbose = verbose
        self.optimizer = optimizer
        self.loss = loss
        self.metrics = metrics
        self.keras_model = None
        self.history = None

    def fit(self, x, y):
        """fits the parameter"""

        if self.keras_model is None:
            self.keras_model = self.model(input_shape=(x.shape[1],))
            self.keras_model.compile(optimizer=self.optimizer, loss=self.loss, metrics=self.metrics)
            self.history = self.keras_model.fit(x, y, epochs=self.epochs, batch_size=self.batch_size,
                                                verbose=self.verbose)

        return self

    def predict(self, x):
        """Predict using the Keras model."""

        if self.keras_model is None:
            raise Exception("The model has not been fitted yet!")
        else:
            predictions = self.keras_model.predict(x, verbose=self.verbose)
            return np.ravel(predictions)

    def score(self, x, y, sample_weight=None):
        """
        Returns the mean squared error on the given test data and labels.
        """
        predictions = self.predict(x)
        mse = np.mean((predictions - y) ** 2)
        return -mse  # Negative MSE because scikit-learn's convention is that higher return values are better

    def get_params(self, deep=True):
        """Get parameters for this estimator."""

        return {
            "model": self.model,
            "epochs": self.epochs,
            "batch_size": self.batch_size,
            "verbose": self.verbose,
            "optimizer": self.optimizer,
            "loss": self.loss,
            "metrics": self.metrics,
        }

    def set_params(self, **params):
        """Set parameters for this estimator."""
        valid_params = self.get_params(deep=True)
        for parameter, value in params.items():
            if parameter in valid_params:
                setattr(self, parameter, value)
        return self


class MyDynamicKerasClassifier(BaseEstimator, ClassifierMixin):
    """
    Custom classifier that wraps a dynamic Keras model for use with scikit-learn pipelines.
    This wrapper is necessary because sometimes (e.g. when using pca in pipeline) the input shape is not clear apriori
    """

    def __init__(self, model, epochs=None, batch_size=None, verbose=0, optimizer="adam", loss=None, metrics=None,
                 cnn=False, cnn_shape_list=None):
        self.model = model
        self.epochs = epochs
        self.batch_size = batch_size
        self.verbose = verbose
        self.loss = loss
        self.optimizer = optimizer
        self.metrics = metrics
        self.cnn = cnn
        self.cnn_shape_list = cnn_shape_list
        self.keras_model = None
        self.history = None

    def fit(self, x, y):
        """fits the parameter"""

        # convert y to numpy array and set classes_
        y = np.array(y)
        self.classes_ = np.unique(y)

        if self.keras_model is None:
            if self.cnn:
                x_new = x.reshape(self.cnn_shape_list)
                self.keras_model = self.model(input_shape=x_new.shape[1:])
                self.keras_model.compile(optimizer=self.optimizer, loss=self.loss, metrics=self.metrics)
                self.history = self.keras_model.fit(x_new, y, epochs=self.epochs, batch_size=self.batch_size,
                                                    verbose=self.verbose)
            else:
                self.keras_model = self.model(input_shape=(x.shape[1],))
                self.keras_model.compile(optimizer=self.optimizer, loss=self.loss, metrics=self.metrics)
                self.history = self.keras_model.fit(x, y, epochs=self.epochs, batch_size=self.batch_size,
                                                    verbose=self.verbose)

        return self

    def predict(self, x):
        """Predict using the Keras model."""

        if self.keras_model is None:
            raise Exception("The model has not been fitted yet!")
        elif self.cnn:
            x_new = x.reshape(self.cnn_shape_list)
            probabilities = self.keras_model.predict(x_new, verbose=self.verbose)
        else:
            probabilities = self.keras_model.predict(x, verbose=self.verbose)

        if probabilities.shape[1] == 1:
            # Use 0.5 as the threshold to convert probabilities to binary labels
            return (probabilities > 0.5).astype('int32')
        else:
            # Use argmax for multi-class problems
            return probabilities.argmax(axis=1)

    def get_params(self, deep=True):
        """Get parameters for this estimator."""

        return {
            "model": self.model,
            "epochs": self.epochs,
            "batch_size": self.batch_size,
            "verbose": self.verbose,
            "optimizer": self.optimizer,
            "loss": self.loss,
            "metrics": self.metrics,
            "cnn": self.cnn,
            "cnn_shape_list": self.cnn_shape_list,
        }

    def set_params(self, **params):
        """Set parameters for this estimator."""
        valid_params = self.get_params(deep=True)
        for parameter, value in params.items():
            if parameter in valid_params:
                setattr(self, parameter, value)
        return self


class Selector(BaseEstimator, TransformerMixin):
    """
    Selcects the features (numerical, categorical or all)
    """

    def __init__(self, select):
        """
        select has to be "num features", "cat features" or "all features"
        """

        if select not in ["num features", "cat features", "all features"]:
            raise TypeError("for select only num features, cat features or all features")

        self.select = select
        self.num_attr = None
        self.cat_attr = None

    def fit(self, x: pd.DataFrame, _y=None):
        """fits the parameter"""

        if not isinstance(x, pd.DataFrame):
            raise TypeError("Selector needs Pandas Dataframe!")

        self.num_attr = list(x.select_dtypes(include=[np.number]).columns)
        self.cat_attr = list(x.select_dtypes(exclude=[np.number]).columns)

        return self

    def transform(self, x: pd.DataFrame, _y=None):
        """does the transformation"""

        if not isinstance(x, pd.DataFrame):
            raise TypeError("Selector needs Pandas Dataframe!")

        if self.select == "num features":
            x_new = x[self.num_attr].copy()
        elif self.select == "cat features":
            x_new = x[self.cat_attr].copy()
        elif self.select == "all features":
            x_new = x[self.num_attr + self.cat_attr].copy()
        else:
            raise TypeError("for select only num features, cat features or all features")

        return x_new

    def get_feature_names_out(self):
        """this method is needed, otherwise we cannot use set_ouput"""
        pass


data = pd.read_csv(r'./../projekt2_train.csv')

data.head()

Unnamed: 0,23,0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,...,0.406,0.407,0.408,0.409,0.410,0.411,0.412,0.413,0.414,0.415
0,7,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,16,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,15,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,23,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,17,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [2]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Labels und Features trennen
X = data.drop(columns=data.columns[0]).values
y = data[data.columns[0]].values

# Normalisierung der Features
X = X / 255.0

# Umwandeln der Labels in kategorische Daten
y = to_categorical(y)

# Aufteilung in Trainings- und Testdaten
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [8]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from kerastuner.tuners import RandomSearch

def build_model(hp):
    model = Sequential()
    model.add(Dense(units=hp.Int('units', min_value=32, max_value=512, step=32),
                    activation='relu', input_shape=(X_train.shape[1],)))
    model.add(Dropout(hp.Float('dropout', min_value=0, max_value=0.5, step=0.1)))

    for i in range(hp.Int('num_layers', 1, 3)):
        model.add(Dense(units=hp.Int('units_' + str(i), min_value=32, max_value=512, step=32),
                        activation='relu'))
        model.add(Dropout(hp.Float('dropout_' + str(i), min_value=0, max_value=0.5, step=0.1)))

    model.add(Dense(y_train.shape[1], activation='softmax'))  # Output layer
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    return model

tuner = RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=5,
    executions_per_trial=3,
    directory='my_dir',
    project_name='dnn_classification'
)

tuner.search(X_train, y_train, epochs=20, validation_data=(X_test, y_test), callbacks=[EarlyStopping(monitor='val_loss', patience=5)])

# Beste Modellkonfiguration abrufen
best_model = tuner.get_best_models(num_models=1)[0]


Trial 5 Complete [00h 11m 34s]
val_accuracy: 0.9040728211402893

Best val_accuracy So Far: 0.9063438375790914
Total elapsed time: 00h 43m 30s


In [9]:
best_model = tuner.get_best_models(num_models=1)[0]

In [11]:
best_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 384)               301440    
                                                                 
 dropout (Dropout)           (None, 384)               0         
                                                                 
 dense_1 (Dense)             (None, 96)                36960     
                                                                 
 dropout_1 (Dropout)         (None, 96)                0         
                                                                 
 dense_2 (Dense)             (None, 32)                3104      
                                                                 
 dropout_2 (Dropout)         (None, 32)                0         
                                                                 
 dense_3 (Dense)             (None, 27)                8

In [23]:
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import cross_val_predict

# Funktion, die das Keras-Modell erstellt
def create_model():
    model = Sequential([
        Dense(384, activation='relu', input_shape=(X_train.shape[1],)),
        Dropout(0.5),
        Dense(96, activation='relu'),
        Dropout(0.5),
        Dense(32, activation='relu'),
        Dropout(0.5),
        Dense(27, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Erstellen des KerasClassifier
model = KerasClassifier(build_fn=create_model, epochs=10, batch_size=10, verbose=1)

# Cross-validation Vorhersagen
predictions = cross_val_predict(model, X_train, y_train, cv=3)


  model = KerasClassifier(build_fn=create_model, epochs=10, batch_size=10, verbose=1)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [25]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np

# Konvertieren der One-Hot-codierten Vorhersagen in Klassenindizes, falls erforderlich
if predictions.ndim > 1:
    predictions_labels = np.argmax(predictions, axis=1)
else:
    predictions_labels = predictions

# Konvertieren der One-Hot-codierten tatsächlichen Labels in Klassenindizes
y_true_labels = np.argmax(y_train, axis=1) if y_train.ndim > 1 else y_train

# Berechnung verschiedener Metriken
accuracy = accuracy_score(y_true_labels, predictions_labels)
precision = precision_score(y_true_labels, predictions_labels, average='macro')
recall = recall_score(y_true_labels, predictions_labels, average='macro')
f1 = f1_score(y_true_labels, predictions_labels, average='macro')

print("Genauigkeit:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-Score:", f1)


Genauigkeit: 0.8106955334393784
Precision: 0.815390010620604
Recall: 0.8099783830230405
F1-Score: 0.8086725745140663


In [26]:
import joblib
import pickle
from tensorflow.keras.models import load_model

# Speichern des optimierten Keras-Modells
best_model.save("DNN_optimized_model.h5")

# Speichern der Kreuzvalidierungs-Vorhersagen
joblib.dump(predictions, "DNN_cross_val_predictions.pkl")

# Speichern der besten Hyperparameter des Keras-Tuners
best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0].values
with open("DNN_best_hyperparameters.pkl", "wb") as f:
    pickle.dump(best_hyperparameters, f)

In [15]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import cross_val_predict


# CNN-Modell erstellen
def create_cnn_model():
    model = Sequential([
        Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.25),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(27, activation='softmax')  # 27 Klassen
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# KerasClassifier Wrapper
cnn_model = KerasClassifier(build_fn=create_cnn_model, epochs=10, batch_size=10, verbose=1)

# Umwandlung der Eingabedaten in das benötigte Format für CNNs
X_train_cnn = X_train.reshape(-1, 28, 28, 1)
X_test_cnn = X_test.reshape(-1, 28, 28, 1)

# Cross-validation Vorhersagen
predictions = cross_val_predict(cnn_model, X_train_cnn, y_train, cv=3)


  cnn_model = KerasClassifier(build_fn=create_cnn_model, epochs=10, batch_size=10, verbose=1)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [17]:
predictions

array([23, 24,  1, ...,  7,  4, 13])

In [21]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Wenn y_train One-Hot-encoded ist, konvertieren Sie es zurück in Label-Indizes
if y_train.ndim > 1:
    y_true_labels = np.argmax(y_train, axis=1)
else:
    y_true_labels = y_train

# Berechnung verschiedener Metriken
accuracy = accuracy_score(y_true_labels, predictions)
precision = precision_score(y_true_labels, predictions, average='macro')
recall = recall_score(y_true_labels, predictions, average='macro')
f1 = f1_score(y_true_labels, predictions, average='macro')

print("Genauigkeit:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-Score:", f1)


Genauigkeit: 0.9056293022142766
Precision: 0.9062898676562765
Recall: 0.9052862564150287
F1-Score: 0.9052812653893703


In [22]:
# Training des Keras-Modells
cnn_model.fit(X_train_cnn, y_train)

# Zugriff auf das trainierte Keras-Modell
trained_keras_model = cnn_model.model

# Speichern des trainierten Keras-Modells
trained_keras_model.save("CNN_model_trained.h5")

# Kreuzvalidierungs-Vorhersagen waren bereits gemacht
# predictions = cross_val_predict(cnn_model, X_train_cnn, y_train, cv=3)

# Speichern der Kreuzvalidierungs-Vorhersagen
joblib.dump(predictions, "CNN_cross_val_predictions.pkl")

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


['CNN_cross_val_predictions.pkl']