<span style="color:gray">
Fraunhofer Institute for Integrated Circuits IIS, Division Engineering of Adaptive Systems EAS<br>
Münchner Straße 16, 01187 Dresden, Germany
</span>

---

## ESB - Energy Saving by Blockchain

---

## Detection of Electric Vehicles and Photovoltaic Systems in Smart Meter Data

---

# 5: Combined Classifiers Photovoltaic


In this notebook, the Combined Classifier for detecting power generation of a photovoltaic system is applied to the preprocessed datasets of different input time spans. Furthermore, the feature importances of the Multilayer Perceptron branch of the Combined Classifier are analyzed. Finally, the classification result of the Combined Classifier is shown in a confusion matrix.

###### The following classification is performed for the monthly input time span

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input, Conv1D, Flatten, MaxPooling1D, AveragePooling1D, concatenate, LeakyReLU
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import plot_model
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from sklearn.inspection import permutation_importance
from sklearn.metrics import ConfusionMatrixDisplay

In [None]:
path=''

X_train = pd.read_csv(path + 'solar/monthly/X_train.csv', header=0, sep=',',decimal=".").to_numpy()
X_test = pd.read_csv(path + 'solar/monthly/X_test.csv', header=0, sep=',',decimal=".").to_numpy()
X_val = pd.read_csv(path + 'solar/monthly/X_val.csv', header=0, sep=',',decimal=".").to_numpy()

X_train_KDE = pd.read_csv(path + 'solar/monthly/X_train_KDE.csv', header=0, sep=',',decimal=".").to_numpy()
X_test_KDE = pd.read_csv(path + 'solar/monthly/X_test_KDE.csv', header=0, sep=',',decimal=".").to_numpy()
X_val_KDE = pd.read_csv(path + 'solar/monthly/X_val_KDE.csv', header=0, sep=',',decimal=".").to_numpy()

Y_train = pd.read_csv(path + 'solar/monthly/Y_train.csv', header=0, sep=',',decimal=".").to_numpy()
Y_test = pd.read_csv(path + 'solar/monthly/Y_test.csv', header=0, sep=',',decimal=".").to_numpy()
Y_val = pd.read_csv(path + 'solar/monthly/Y_val.csv', header=0, sep=',',decimal=".").to_numpy()

In [None]:
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
X_val = X_val.reshape(X_val.shape[0], X_val.shape[1], 1)

X_train_KDE = X_train_KDE.reshape(X_train_KDE.shape[0], X_train_KDE.shape[1], 1)
X_test_KDE = X_test_KDE.reshape(X_test_KDE.shape[0], X_test_KDE.shape[1], 1)
X_val_KDE = X_val_KDE.reshape(X_val_KDE.shape[0], X_val_KDE.shape[1], 1)

Y_train = Y_train.reshape(len(Y_train), )
Y_test = Y_test.reshape(len(Y_test), )
Y_val = Y_val.reshape(len(Y_val), )

#### Multilayer Perceptron

In [None]:
best_model_filepath = ''

learning_rate=0.0005

X_in = Input(shape=(X_train_KDE.shape[1]))
x = Dense(units=400)(X_in)
x=LeakyReLU(alpha=0.05)(x)
x = Dense(units=400)(x)
x=LeakyReLU(alpha=0.05)(x)
x = Dense(units=400)(x)
x=LeakyReLU(alpha=0.05)(x)
x = Dense(units=400)(x)
x=LeakyReLU(alpha=0.05)(x)
X_out = Dense(units=1, activation='sigmoid')(x)

classifier_mlp = Model(X_in, X_out)
classifier_mlp.compile(optimizer = Adam(learning_rate=learning_rate), 
                   loss = 'binary_crossentropy',  
                   metrics = ['accuracy'])

classifier_mlp.summary()
plot_model(classifier_mlp)


checkpoint = ModelCheckpoint(best_model_filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')

weight_for_0 = len(Y_train)/(2*len(Y_train[Y_train==0]))
weight_for_1 = len(Y_train)/(2*len(Y_train[Y_train==1]))
class_weight = {0: weight_for_0, 1: weight_for_1}


classifier_mlp.fit(X_train_KDE[:,:,0], Y_train,
                    epochs = 200,
                    batch_size = 64,
                    validation_data=(X_test_KDE[:,:,0], Y_test),  #X_test_KDE
                    callbacks=[checkpoint], 
                    class_weight=class_weight)

classifier_mlp = load_model(best_model_filepath)
score = classifier_mlp.evaluate(X_val_KDE[:,:,0], Y_val)

print(f"Solar Systems can be detected with an accuracy of {100*classifier_mlp.evaluate(X_val_KDE[:,:,0], Y_val)[1]:.2f} % on unseen data.")

#### Calculate and Plot the Feature Importance of the Multilayer Perceptron branch

In [None]:
def nn_build_fn():
    return classifier_mlp
classifier_sklearn_format = KerasClassifier(build_fn=nn_build_fn)
classifier_sklearn_format.fit(X_train_KDE, Y_train,
                    epochs = 1,
                    batch_size = 64,
                    class_weight=class_weight)
classifier_mlp = load_model(best_model_filepath)
classifier_sklearn_format.model = classifier_mlp
classifier_sklearn_format.score(X_val_KDE, Y_val)
r = permutation_importance(classifier_sklearn_format, X_train_KDE[:,:,0], Y_train, n_repeats=30, random_state=0)

In [None]:
discr = np.linspace(-10, 20, 100).reshape(-1, 1)

fig = plt.figure()
plt.plot(discr, r['importances_mean'])
plt.xlabel("Energy [kWh]")
plt.ylabel("Feature Importance")
plt.ylim(-0.01, 0.1)
plt.title("Feature Importances for Solar System")
plt.grid(True)
plt.show()

#### Convolutional Neural Network

In [None]:
best_model_filepath = ''

X_in = Input(shape=(X_train.shape[1],1))
x2 = Conv1D(32, 5, activation="relu")(X_in)
x2 = MaxPooling1D(strides=2)(x2)
x2 = Conv1D(32, 5, activation="relu")(x2)
x2 = MaxPooling1D(strides=2)(x2)
x2 = Conv1D(64, 5, activation="relu")(x2)
x2 = MaxPooling1D(strides=2)(x2)
x2 = Conv1D(64, 5, activation="relu")(x2)
x2 = MaxPooling1D(strides=2)(x2)
x2_inter_b = Flatten()(x2)
x = Dense(units=400)(x2_inter_b)
x=LeakyReLU(alpha=0.05)(x)
X_out = Dense(units=1, activation='sigmoid', name="cnn_output")(x)

classifier_cnn = Model(X_in, X_out)

classifier_cnn = Model(X_in, X_out)
classifier_cnn.compile(optimizer = Adam(learning_rate=learning_rate),
                   loss = 'binary_crossentropy',
                   metrics = ['accuracy'])

classifier_cnn.summary()
plot_model(classifier_cnn)


checkpoint = ModelCheckpoint(best_model_filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')


n_epochs=100
classifier_cnn.fit(X_train, Y_train,
                    epochs = n_epochs,
                    batch_size = 64,
                    validation_data=(X_test, Y_test), 
                    callbacks=[checkpoint], 
                    class_weight=class_weight)
classifier_cnn = load_model(best_model_filepath)
score = classifier_cnn.evaluate(X_val, Y_val)


print(f"Solar Systems can be detected with an accuracy of {100*classifier_cnn.evaluate(X_val, Y_val)[1]:.2f} % on unseen data.")

#### Combined Model

In [None]:
# MLP
X1_in = classifier_mlp.input
x1_inter = classifier_mlp.layers[-2].output
part_mlp = Model(X1_in, x1_inter)
part_mlp.trainable=False

# CNN
X2_in = classifier_cnn.input
x2_inter = classifier_cnn.layers[-2].output
part_cnn = Model(X2_in, x2_inter)
part_cnn.trainable=False


# Combined
x_combined = concatenate([x1_inter, x2_inter])
x = Dense(units=200, activation='relu')(x_combined)
X_out = Dense(units=1, activation='sigmoid', name="combined_output")(x)

classifier_combined = Model([X1_in,X2_in], X_out)
classifier_combined.compile(optimizer = Adam(learning_rate=learning_rate),
                   loss = 'binary_crossentropy',
                   metrics = ['accuracy'])

classifier_combined.summary()
plot_model(classifier_combined)



weight_for_0 = len(Y_train)/(2*len(Y_train[Y_train==0]))
weight_for_1 = len(Y_train)/(2*len(Y_train[Y_train==1]))
class_weight = {0: weight_for_0, 1: weight_for_1}


n_epochs=100
classifier_combined.fit([X_test_KDE,X_test], Y_test,
                    epochs = n_epochs,
                    batch_size = 64,
                    validation_data=([X_test_KDE,X_test], Y_test), 
                    class_weight=class_weight)
score = classifier_combined.evaluate([X_val_KDE,X_val], Y_val)

print(f"Solar Systems can be detected with an accuracy of {100*classifier_combined.evaluate([X_val_KDE,X_val], Y_val)[1]:.2f} % on unseen data.")

#### Plot Confusion Matrix

In [None]:
pred_label = np.where((classifier_combined.predict([X_val_KDE,X_val]) >= 0.5),"with Solar","without Solar")

Y_val_label = np.where((Y_val >= 0.5),"with Solar","without Solar").reshape(len(Y_val), 1)

fig, ax = plt.subplots(figsize=(8, 6))
ConfusionMatrixDisplay.from_predictions(Y_val_label, pred_label,
                                        labels=["without Solar","with Solar"],
                                        ax=ax,colorbar=False)
plt.title("Confusion Matrix for Monthly Input Period")
plt.tight_layout()
plt.show()