# Autoencoder UNIBO Powertools Dataset

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "3"

import numpy as np
import pandas as pd
import scipy.io
import math
import ntpath
import sys
import logging
import time
import sys
import random

from importlib import reload
import plotly.graph_objects as go


import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, regularizers
from tensorflow.keras.models import Model

IS_COLAB = False
IS_TRAINING = False
RESULT_NAME = ""
IS_OFFLINE = True

if IS_OFFLINE:
    import plotly.offline as pyo
    pyo.init_notebook_mode()

if IS_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')
    data_path = "/content/drive/My Drive/battery-state-estimation/battery-state-estimation/"
else:
    data_path = "../../"

sys.path.append(data_path)
from data_processing.unibo_powertools_data import UniboPowertoolsData, CycleCols
from data_processing.model_data_handler import ModelDataHandler
from data_processing.prepare_rul_data import RulHandler

### Config logging

In [None]:
reload(logging)
logging.basicConfig(format='%(asctime)s [%(levelname)s]: %(message)s', level=logging.DEBUG, datefmt='%Y/%m/%d %H:%M:%S')

# Load Data

In [None]:
dataset = UniboPowertoolsData(
    test_types=[],
    chunk_size=1000000,
    lines=[37, 40],
    charge_line=37,
    discharge_line=40,
    base_path=data_path
)

In [None]:
train_names = [
    '000-DM-3.0-4019-S',#minimum capacity 1.48
    '001-DM-3.0-4019-S',#minimum capacity 1.81
    '002-DM-3.0-4019-S',#minimum capacity 2.06

    '009-DM-3.0-4019-H',#minimum capacity 1.41
    '010-DM-3.0-4019-H',#minimum capacity 1.44

    '014-DM-3.0-4019-P',#minimum capacity 1.7
    '015-DM-3.0-4019-P',#minimum capacity 1.76
    '016-DM-3.0-4019-P',#minimum capacity 1.56
    '017-DM-3.0-4019-P',#minimum capacity 1.29
    #'047-DM-3.0-4019-P',#new 1.98
    #'049-DM-3.0-4019-P',#new 2.19



    '007-EE-2.85-0820-S',#2.5
    '008-EE-2.85-0820-S',#2.49
    '042-EE-2.85-0820-S',#2.51

    '043-EE-2.85-0820-H',#2.31


    '040-DM-4.00-2320-S',#minimum capacity 3.75, cycles 188


    '018-DP-2.00-1320-S',#minimum capacity 1.82
    #'019-DP-2.00-1320-S',#minimum capacity 1.61
    '036-DP-2.00-1720-S',#minimum capacity 1.91
    '037-DP-2.00-1720-S',#minimum capacity 1.84
    '038-DP-2.00-2420-S',#minimum capacity 1.854 (to 0)
    '050-DP-2.00-4020-S',#new 1.81
    '051-DP-2.00-4020-S',#new 1.866    
    
]

test_names = [
    '003-DM-3.0-4019-S',#minimum capacity 1.84

    '011-DM-3.0-4019-H',#minimum capacity 1.36

    '013-DM-3.0-4019-P',#minimum capacity 1.6



    '006-EE-2.85-0820-S',# 2.621
    
    '044-EE-2.85-0820-H',# 2.43



    '039-DP-2.00-2420-S',#minimum capacity 1.93



    '041-DM-4.00-2320-S',#minimum capacity 3.76, cycles 190
]

In [None]:
dataset.prepare_data(train_names, test_names)
dataset_handler = ModelDataHandler(dataset, [
    CycleCols.VOLTAGE,
    CycleCols.CURRENT,
    CycleCols.TEMPERATURE
])

rul_handler = RulHandler()

## Data preparation

In [None]:
(train_x, train_y_soh, test_x, test_y_soh,
  train_battery_range, test_battery_range,
  time_train, time_test, current_train, current_test) = dataset_handler.get_discharge_whole_cycle_future(
    train_names, 
    test_names, 
    # match decoder output
    min_cycle_length=300)

# train_x = train_x[:,:284,:]
# test_x = test_x[:,:284,:]
# print("cut train shape {}".format(train_x.shape))
# print("cut test shape {}".format(test_x.shape))


x_norm = rul_handler.Normalization()
train_x, test_x = x_norm.fit_and_normalize(train_x, test_x)

In [None]:
train_x.shape

# Model training

In [None]:
if IS_TRAINING:
    EXPERIMENT = "autoencoder_gl_unibo_powertools"

    experiment_name = time.strftime("%Y-%m-%d-%H-%M-%S") + '_' + EXPERIMENT
    print(experiment_name)

# Model definition

opt = tf.keras.optimizers.Adam(learning_rate=0.0002)
LATENT_DIM = 10

class Autoencoder(Model):
    def __init__(self, latent_dim):
        super(Autoencoder, self).__init__()
        self.latent_dim = latent_dim

        encoder_inputs = layers.Input(shape=(train_x.shape[1], train_x.shape[2]))
        encoder_conv1 = layers.Conv1D(filters=8, kernel_size=10, strides=2, activation='relu', padding='same')(encoder_inputs)
        encoder_pool1 = layers.MaxPooling1D(5, padding='same')(encoder_conv1)
        encoder_conv2 = layers.Conv1D(filters=8, kernel_size=4, strides=1, activation='relu', padding='same')(encoder_pool1)
        encoder_pool2 = layers.MaxPooling1D(3, padding='same')(encoder_conv2)
        encoder_flat1 = layers.Flatten()(encoder_pool1)
        encoder_flat2 = layers.Flatten()(encoder_pool2)
        encoder_concat = layers.concatenate([encoder_flat1, encoder_flat2])
        encoder_outputs = layers.Dense(self.latent_dim, activation='relu')(encoder_concat)
        self.encoder = Model(inputs=encoder_inputs, outputs=encoder_outputs)

        decoder_inputs = layers.Input(shape=(self.latent_dim,))
        decoder_dense1 = layers.Dense(10*8, activation='relu')(decoder_inputs)
        decoder_reshape1 = layers.Reshape((10, 8))(decoder_dense1)
        decoder_upsample1 = layers.UpSampling1D(3)(decoder_reshape1)
        decoder_convT1 = layers.Conv1DTranspose(filters=8, kernel_size=4, strides=1, activation='relu', padding='same')(decoder_upsample1)
        decoder_upsample2 = layers.UpSampling1D(5)(decoder_convT1)
        decoder_convT2 = layers.Conv1DTranspose(filters=8, kernel_size=10, strides=2, activation='relu', padding='same')(decoder_upsample2)
        decoder_outputs = layers.Conv1D(3, kernel_size=3, activation='relu', padding='same')(decoder_convT2)
        self.decoder = Model(inputs=decoder_inputs, outputs=decoder_outputs)



    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

autoencoder = Autoencoder(LATENT_DIM)
autoencoder.compile(optimizer=opt, loss='mse', metrics=['mse', 'mae', 'mape', tf.keras.metrics.RootMeanSquaredError(name='rmse')])
autoencoder.encoder.summary()
autoencoder.decoder.summary()

In [None]:
if IS_TRAINING:
    history = autoencoder.fit(train_x, train_x,
                                epochs=500, 
                                batch_size=32, 
                                verbose=1,
                                validation_split=0.1
                               )

In [None]:
if IS_TRAINING:
    autoencoder.save_weights(data_path + 'results/trained_model/%s/model' % experiment_name)

    hist_df = pd.DataFrame(history.history)
    hist_csv_file = data_path + 'results/trained_model/%s/history.csv' % experiment_name
    with open(hist_csv_file, mode='w') as f:
        hist_df.to_csv(f)
    history = history.history

In [None]:
if not IS_TRAINING:
    history = pd.read_csv(data_path + 'results/trained_model/%s/history.csv' % RESULT_NAME)
    autoencoder.load_weights(data_path + 'results/trained_model/%s/model' % RESULT_NAME)
    autoencoder.encoder.summary()
    autoencoder.decoder.summary()

In [None]:
if not IS_TRAINING:
    with pd.option_context('display.max_rows', None, 'display.max_columns', None):
        print(history)

### Testing

In [None]:
results = autoencoder.evaluate(test_x, test_x, return_dict = True)
print(results)
max_rmse = 0
for index in range(test_x.shape[0]):
    result = autoencoder.evaluate(np.array([test_x[index, :, :]]), np.array([test_x[index, :, :]]), return_dict = True, verbose=0)
    max_rmse = max(max_rmse, result['rmse'])
print("Max rmse: {}".format(max_rmse))

# Results Visualization

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(y=history['loss'],
                    mode='lines', name='train'))
if 'val_loss' in history:
    fig.add_trace(go.Scatter(y=history['val_loss'],
                    mode='lines', name='validation'))
fig.update_layout(title='Loss trend',
                  xaxis_title='epoch',
                  yaxis_title='loss',
                  width=1400,
                  height=600)
fig.show()

In [None]:
train_predictions = autoencoder.predict(train_x)
labels = ['Voltage', 'Current', 'Temperature']

In [None]:
for i in range(train_x.shape[2]):
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=train_predictions[0,:,i],
                        mode='lines', name='predicted'))
    fig.add_trace(go.Scatter(y=train_x[0,:,i],
                        mode='lines', name='actual'))
    fig.update_layout(title='Results on training - battery new',
                    xaxis_title='Step',
                    yaxis_title=labels[i],
                    width=1400,
                    height=600)
    fig.show()

In [None]:
for i in range(train_x.shape[2]):
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=train_predictions[int(train_battery_range[0]/2),:,i],
                        mode='lines', name='predicted'))
    fig.add_trace(go.Scatter(y=train_x[int(train_battery_range[0]/2),:,i],
                        mode='lines', name='actual'))
    fig.update_layout(title='Results on training - middle life',
                    xaxis_title='Step',
                    yaxis_title=labels[i],
                    width=1400,
                    height=600)
    fig.show()

In [None]:
for i in range(train_x.shape[2]):
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=train_predictions[train_battery_range[0]-1,:,i],
                        mode='lines', name='predicted'))
    fig.add_trace(go.Scatter(y=train_x[train_battery_range[0]-1,:,i],
                        mode='lines', name='actual'))
    fig.update_layout(title='Results on training - end of life',
                    xaxis_title='Step',
                    yaxis_title=labels[i],
                    width=1400,
                    height=600)
    fig.show()

In [None]:
test_predictions = autoencoder.predict(test_x)
labels = ['Voltage', 'Current', 'Temperature']

In [None]:
for i in range(train_x.shape[2]):
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=test_predictions[0,:,i],
                        mode='lines', name='predicted'))
    fig.add_trace(go.Scatter(y=test_x[0,:,i],
                        mode='lines', name='actual'))
    fig.update_layout(title='Results on testing - battery new',
                    xaxis_title='Step',
                    yaxis_title=labels[i],
                    width=1400,
                    height=600)
    fig.show()

In [None]:
for i in range(train_x.shape[2]):
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=test_predictions[int(test_battery_range[0]/2),:,i],
                        mode='lines', name='predicted'))
    fig.add_trace(go.Scatter(y=test_x[0,:,i],
                        mode='lines', name='actual'))
    fig.update_layout(title='Results on testing - middle life',
                    xaxis_title='Step',
                    yaxis_title=labels[i],
                    width=1400,
                    height=600)
    fig.show()

In [None]:
for i in range(train_x.shape[2]):
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=test_predictions[test_battery_range[0]-1,:,i],
                        mode='lines', name='predicted'))
    fig.add_trace(go.Scatter(y=test_x[0,:,i],
                        mode='lines', name='actual'))
    fig.update_layout(title='Results on testing - end of life',
                    xaxis_title='Step',
                    yaxis_title=labels[i],
                    width=1400,
                    height=600)
    fig.show()