**MC-LSTM**

In [None]:
## Run this so that the Original Code can pull and clone the Github repository and install it
!git clone https://github.com/zexyang/RUL_prediction_ME599.git
%cd RUL_prediction_ME599
!cat README.md
!pip install keras-tuner

Cloning into 'RUL_prediction_ME599'...
remote: Enumerating objects: 73, done.[K
remote: Counting objects: 100% (55/55), done.[K
remote: Compressing objects: 100% (46/46), done.[K
remote: Total 73 (delta 22), reused 26 (delta 8), pack-reused 18[K
Receiving objects: 100% (73/73), 63.94 MiB | 7.89 MiB/s, done.
Resolving deltas: 100% (22/22), done.
Updating files: 100% (34/34), done.
/content/RUL_prediction_ME599
# A Hybrid CNN-LSTM for Battery Remaining Useful Life Prediction with Charging Profiles Data
[![DOI:10.1145/3575882.3575903](http://img.shields.io/badge/DOI-10.1145/3575882.3575903-1481c1.svg)](https://doi.org/10.1145/3575882.3575903)

+ Battery RUL prediction using data-driven method based on a hybrid deep model of Convolutional Neural Networks (CNN) and Long-Short Term Memory (LSTM).
+ CNN and LSTM are used to extract features from multiple measurable data (Voltage, Current, Temperature, Capacity) in parallel.
+ CNN extracts features of multi-channel charging profiles, where

In [None]:
import os
import re
import json
import numpy as np
import random as rn
import tensorflow as tf
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error
import matplotlib.pyplot as plt

from tensorflow.keras.layers import Dense, LSTM, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam

# Set seeds for reproducibility
SEED = 12345
os.environ["CUDA_VISIBLE_DEVICES"] = str(SEED)
os.environ['PYTHONHASHSEED'] = str(SEED)
np.random.seed(SEED)
rn.seed(SEED)
tf.compat.v1.random.set_random_seed(SEED)
tf.random.set_seed(SEED)
session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
tf.compat.v1.keras.backend.set_session(sess)

# Define preprocessing function
def preprocess(dataset):
    from sklearn.preprocessing import MinMaxScaler
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled = scaler.fit_transform(dataset)
    return scaled, scaler

# Define function to extract VIT capacity
def extract_VIT_capacity(x_datasets, y_datasets, seq_len, hop, sample):
    from pandas import read_csv
    V, I, T, C = [], [], [], []
    x, y = [], []

    for x_data, y_data in zip(x_datasets, y_datasets):
        x_df = read_csv(x_data).dropna()[['cycle', 'voltage_battery', 'current_battery', 'temp_battery']]
        x_df = x_df[x_df['cycle'] != 0].reset_index(drop=True)

        y_df = read_csv(y_data).dropna()[['capacity']]
        y_df['cycle_idx'] = y_df.index + 1
        y_df = y_df.astype('float32').values

        data_len = int(np.floor((len(y_df) - seq_len - 1) / hop)) + 1

        for i in range(len(y_df)):
            cy = x_df.cycle.unique()[i]
            df = x_df[x_df['cycle'] == cy]
            cap = np.array([y_df[i, 0]])
            C.append(cap)
            scaled_C, scaler_C = preprocess(np.array(C))
            le = len(df['voltage_battery']) % sample

            vTemp = df['voltage_battery'].to_numpy()
            if le != 0:
                vTemp = vTemp[0:-le]
            vTemp = np.reshape(vTemp, (len(vTemp) // sample, -1), order="F").mean(axis=0)
            V.append(vTemp)
            scaled_V, _ = preprocess(np.array(V))

            iTemp = df['current_battery'].to_numpy()
            if le != 0:
                iTemp = iTemp[0:-le]
            iTemp = np.reshape(iTemp, (len(iTemp) // sample, -1), order="F").mean(axis=0)
            I.append(iTemp)
            scaled_I, _ = preprocess(np.array(I))

            tTemp = df['temp_battery'].to_numpy()
            if le != 0:
                tTemp = tTemp[0:-le]
            tTemp = np.reshape(tTemp, (len(tTemp) // sample, -1), order="F").mean(axis=0)
            T.append(tTemp)
            scaled_T, _ = preprocess(np.array(T))

            VIT_temp = np.concatenate([scaled_V, scaled_I, scaled_T, scaled_C], axis=1)

        for i in range(data_len):
            x.append(VIT_temp[(hop * i):(hop * i + seq_len)])
            y.append(scaled_C[hop * i + seq_len])

    return np.array(x), np.array(y), scaler_C

# Define main function
def main():
    import RUL_prediction_ME599.train.param_VITC_C as pr
    import RUL_prediction_ME599.train.utils as utils

    pth = pr.pth
    train_x_files = [os.path.join(pth, 'charge/train', f) for f in os.listdir(os.path.join(pth, 'charge/train'))]
    train_x_files.sort(key=lambda f: int(re.sub('\D', '', f)))
    train_y_files = [os.path.join(pth, 'discharge/train', f) for f in os.listdir(os.path.join(pth, 'discharge/train'))]
    train_y_files.sort(key=lambda f: int(re.sub('\D', '', f)))
    test_x_data = [os.path.join(pth, 'charge/test', f) for f in os.listdir(os.path.join(pth, 'charge/test'))]
    test_y_data = [os.path.join(pth, 'discharge/test', f) for f in os.listdir(os.path.join(pth, 'discharge/test'))]

    accuracy_results = {}

    learning_rate_space = np.logspace(np.log10(1e-6), np.log10(1e-2), 10000)
    dropout_space = np.linspace(.25, .75, 10000)
    activation_space = ["relu", "tanh"]

    lowest_error = np.inf

    for _ in range(1000):

        learning_rate = np.random.choice(learning_rate_space)
        dropout_rate = np.random.choice(dropout_space)
        lstm_activation = np.random.choice(activation_space)
        dense_activation = np.random.choice(activation_space)


        folds = list(KFold(n_splits=pr.k, shuffle=True, random_state=pr.random).split(train_x_files))

        fold_accuracies = []

        for j, (train_idx, val_idx) in enumerate(folds):
            # print('\nFold', j + 1)
            train_x_data = [train_x_files[i] for i in train_idx]
            train_y_data = [train_y_files[i] for i in train_idx]
            val_x_data = [train_x_files[i] for i in val_idx]
            val_y_data = [train_y_files[i] for i in val_idx]

            trainX, trainY, SS_tr = extract_VIT_capacity(train_x_data, train_y_data, pr.seq_len, pr.hop, pr.sample)
            valX, valY, SS_val = extract_VIT_capacity(val_x_data, val_y_data, pr.seq_len, pr.hop, pr.sample)
            testX, testY, SS_tt = extract_VIT_capacity(test_x_data, test_y_data, pr.seq_len, pr.hop, pr.sample)

            # print('Input shape: {}'.format(trainX.shape))

            model = Sequential([
                    LSTM(400, input_shape=(pr.seq_len, trainX.shape[-1]), activation=lstm_activation, return_sequences=True),
                    Dropout(dropout_rate),
                    LSTM(400, return_sequences=False),
                    Dense(128, activation=dense_activation),
                    Dense(1)
            ])

            model.compile(optimizer=Adam(learning_rate=learning_rate), loss='mse')

            model.summary()

            history = model.fit(trainX, trainY, validation_data=(valX, valY), batch_size=pr.batch_size, epochs=pr.epochs)

            val_results = model.evaluate(valX, valY)
            # print('Val loss:', val_results)

            valPredict = model.predict(valX)
            inv_valY = SS_val.inverse_transform(valY)
            inv_valPredict = SS_val.inverse_transform(valPredict)

            val_mae = mean_absolute_error(inv_valY, inv_valPredict)
            val_rmse = np.sqrt(mean_squared_error(inv_valY, inv_valPredict))
            fold_accuracies.append(val_rmse)

            # model_dir = pr.model_dir + f'_k{j + 1}_lstm{lstm_units}'
            # save_dir = pr.save_dir
            # os.makedirs(os.path.join(save_dir, model_dir), exist_ok=True)

            # model.save(os.path.join(save_dir, model_dir, "saved_model_and_weight"))
            # print("Model and weights saved.")

        accuracy_results = np.mean(fold_accuracies)
        if accuracy_results < lowest_error:
            lowest_error = accuracy_results
            best_learning_rate = learning_rate
            best_dropout_rate = dropout_rate
            best_lstm_activation = lstm_activation
            best_dense_activation = dense_activation

        # print(f'Average RMSE for {lstm_units} LSTM units: {accuracy_results[lstm_units]}')
    print(f"New lowest error achieved: {lowest_error}")
    print(f"Learning Rate: {best_learning_rate}")
    print(f"Dropout Rate: {best_dropout_rate}")
    print(f"LSTM Activation: {best_lstm_activation}")
    print(f"Dense Activation: {best_dense_activation}")
    # Plotting LSTM units vs. accuracy
    # plt.figure(figsize=(10, 6))
    # lstm_units = list(accuracy_results.keys())
    # accuracies = list(accuracy_results.values())
    # plt.plot(lstm_units, accuracies, marker='o')
    # plt.xlabel('Number of LSTM units')
    # plt.ylabel('Validation RMSE')
    # plt.title('LSTM units vs. Validation RMSE')
    # plt.grid(True)
    # plt.show()

if __name__ == "__main__":
    main()


Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 5, 400)            691200    
                                                                 
 dropout (Dropout)           (None, 5, 400)            0         
                                                                 
 lstm_1 (LSTM)               (None, 400)               1281600   
                                                                 
 dense (Dense)               (None, 128)               51328     
                                                                 
 dense_1 (Dense)             (None, 1)                 129       
                                                                 
Total params: 2024257 (7.72 MB)
Trainable params: 2024257 (7.72 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
Epoch 1/100
Epoch 