In [None]:
import numpy as np
np.random.seed(42)
import scipy.io as sio
import os
import datetime
import keras.backend as K
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import Dense, LSTM, TimeDistributed, Conv1D, Dropout
from keras.layers.wrappers import Bidirectional
from keras.preprocessing import sequence
from sklearn.preprocessing import MinMaxScaler
%matplotlib inline

In [None]:
# Data preparation

train_num = 5
test_num = 4
sampling_interval = 10
max_capacity = 2.9
timesteps = 90
features = 3
epochs = 2000
batch_size = 128
filters = 256
kernel_size = 3
dropout = 0.2
hidden_units = 64
search_epoch = 500
os.environ["CUDA_VISIBLE_DEVICES"] = "1"


data_path = '/Desktop/18650 panasonic'
set_name = ['degC_Cycle_1_Pan18650PF', 'degC_Cycle_2_Pan18650PF', 'degC_Cycle_3_Pan18650PF',\
            'degC_Cycle_4_Pan18650PF', 'degC_NN_Pan18650PF', 'degC_US06_Pan18650PF',\
            'degC_HWFET_Pan18650PF', 'degC_LA92_Pan18650PF', 'degC_UDDS_Pan18650PF']
test_name = ['US06', 'HWFET', 'LA92', 'UDDS']
temp = ['25', '10', '0', 'n10', 'n20']
pre_cut = [[0, 0, 0, 0, 0, 0, 0, 0, 0],
           [0, 0, 0, 0, 61, 0, 61, 61, 61],
           [0, 0, 0, 0, 121, 0, 0, 121, 0],
           [0, 0, 0, 0, 0, 121, 121, 121, 121],
           [0, 0, 0, 0, 121, 0, 121, 121, 121]]
post_cut = [[106640, 108267, 99440, 117862, 113981, 45060, 72954, 137873, 221186],
            [90793, 78095, 97795, 95986, 102134, 39051, 67429, 123575, 207519],
            [85000, 83213, 59465, 74051, 60281, 33631, 56852, 79666, 125521],
            [57227, 56711, 53856, 58098, 49521, 28198, 48386, 66608, 106715],
            [47710, 47374, 47145, 47343, 42361, 23556, 39327, 54194, 86404]]

In [None]:
# Data plot

def plot_data(time, voltage, current, ah, battery_temp, train_name):
    fig, axes = plt.subplots(2, 2, figsize = (16,8))
    fig.subplots_adjust(hspace = 0.5)
    suptitle = train_name
    axes[0,0].plot(time, voltage)
    axes[0,0].set_xlabel('Time(s)')
    axes[0,0].set_ylabel('Voltage(V)')
    axes[0,0].set_yticks(np.arange(2.3, 4.5, 0.2))
    axes[0,0].grid(True, linestyle = '-')
    axes[0,1].plot(time, current)
    axes[0,1].set_xlabel('Time(s)')
    axes[0,1].set_ylabel('Current(A)')
    axes[0,1].set_yticks(np.arange(-20, 15, 3))
    axes[0,1].grid(True, linestyle = '-')
    axes[1,0].plot(time, ah)
    axes[1,0].set_xlabel('Time(s)')
    axes[1,0].set_ylabel('Capacity(Ah)')
    axes[1,0].set_yticks(np.arange(-2.7, 0, 0.3))
    axes[1,0].grid(True, linestyle = '-')
    axes[1,1].plot(time, battery_temp)
    axes[1,1].set_xlabel('Time(s)')
    axes[1,1].set_ylabel('Battery_Temp_degC(℃)')
    axes[1,1].grid(True, linestyle = '-')
    plt.show()
    
def plot_history(history):
    plt.figure(figsize = (16, 5))
    plt.plot(history.history['mean_absolute_error'])
    plt.title('Train History')
    plt.xlabel('epoch')
    plt.ylabel('mae')
    plt.yticks(np.arange(0.0, 0.55, 0.05))
    plt.grid(True, linestyle = '-')
    plt.show()
    
def plot_result(Y_test, predict, test_name):
    plt.figure(figsize = (18,5))
    plt.title(test_name)
    plt.plot(Y_test, label = 'Y_test')
    plt.plot(predict, label = 'predict')
    plt.xlabel('Time(s)')
    plt.ylabel('Capacity(Ah)')
    plt.yticks(np.arange(0.0, 1.1, 0.1))
    plt.grid(True,linestyle = '-')
    plt.legend()
    plt.show()
    
def plot_error(error, test_name):
    plt.figure(figsize=(18,5))
    plt.title(test_name)
    plt.plot(error)
    plt.xlabel('Time(s)')
    plt.ylabel('Error')
    plt.grid(True,linestyle='-')
    plt.show()

    
def get_data(temp, num, start):
    time = []
    voltage = []
    current = []
    ah = []
    battery_temp = []
    for i in range(num):
        data = sio.loadmat(data_path + '/' + temp + '/' + temp + set_name[i + start])
        time.append(data['meas']['Time'][0][0].flatten())
        voltage.append(data['meas']['Voltage'][0][0].flatten())
        current.append(data['meas']['Current'][0][0].flatten())
        ah.append(data['meas']['Ah'][0][0].flatten())
        battery_temp.append(data['meas']['Battery_Temp_degC'][0][0].flatten())
        #plot_data(time[i], voltage[i], current[i], ah[i], battery_temp[i], train_name[i])
    return time, voltage, current, ah, battery_temp
    
def create_dataset(voltage, current, temp, ah):
    voltage = voltage[::sampling_interval]
    current = current[::sampling_interval]
    temp = temp[::sampling_interval]
    ah = ah[::sampling_interval]
    examples = len(voltage)
    data = []
    X, Y = [], []
    for i in range(examples):
        data.append(current[i])
        data.append(voltage[i])
        data.append(temp[i])
        data.append((ah[i] + max_capacity) / max_capacity)
    data = np.array(data)
    data = np.reshape(data, (examples, features + 1))
    data = data[:examples // timesteps * timesteps][:]
    for i in range(data.shape[0]):
        X.append(data[i][:features])
        Y.append(data[i][features:])
    X = np.array(X)
    Y = np.array(Y)
    return X, Y

def MAX(y_true, y_pred):
    return K.max(K.abs(y_true - y_pred))
                       
def text_save(filename, data):
    file = open(filename, 'w')
    for i in range(len(data)):
        file.write(str(data[i]) + '\n')
    file.close()

In [None]:
# Model construction, training, and testing

for i in range(len(temp)):
    print(temp[i] + 'degree centigrade')
    time, voltage, current, ah, battery_temp = get_data(temp[i], train_num, 0)
    X_train = []
    Y_train = []
    for j in range(train_num):
        X_train_single, Y_train_single = create_dataset(voltage[j][pre_cut[i][j] : post_cut[i][j]],\
                                                        current[j][pre_cut[i][j] : post_cut[i][j]],\
                                                        battery_temp[j][pre_cut[i][j] : post_cut[i][j]],\
                                                        ah[j][pre_cut[i][j] : post_cut[i][j]])
        X_train.append(X_train_single)
        Y_train.append(Y_train_single)
        
    X_train = np.concatenate(X_train, axis = 0)
    Y_train = np.concatenate(Y_train, axis = 0)    
    scaler = MinMaxScaler()
    X_train = scaler.fit_transform(X_train)
    X_train = np.reshape(X_train, (X_train.shape[0] // timesteps, timesteps, features))
    Y_train = np.reshape(Y_train, (Y_train.shape[0] // timesteps, timesteps, 1))
    
    time_test, voltage_test, current_test, ah_test, battery_temp_test = get_data(temp[i], test_num, train_num)
    X_test = []
    Y_test = []
    for j in range(test_num):
        X_test_single, Y_test_single = create_dataset(voltage_test[j][pre_cut[i][j + train_num] : post_cut[i][j + train_num]],\
                                                      current_test[j][pre_cut[i][j + train_num] : post_cut[i][j + train_num]],\
                                                      battery_temp_test[j][pre_cut[i][j + train_num] : post_cut[i][j + train_num]],\
                                                      ah_test[j][pre_cut[i][j + train_num] : post_cut[i][j + train_num]])
        X_test.append(X_test_single)
        Y_test.append(Y_test_single)
        X_test[j] = scaler.transform(X_test[j])
        X_test[j] = np.reshape(X_test[j],(X_test[j].shape[0] // timesteps, timesteps, features))
        Y_test[j] = np.reshape(Y_test[j],(Y_test[j].shape[0] // timesteps, timesteps, 1))
    
    model = Sequential()
    model.add(Conv1D(filters = filters, kernel_size = kernel_size, input_shape = (timesteps, features),\
                     padding = 'same', activation = 'relu'))
    model.add(Dropout(dropout))
    model.add(Conv1D(filters = filters, kernel_size = kernel_size, padding = 'same', activation = 'relu'))
    model.add(Dropout(dropout))
    model.add(Bidirectional(LSTM(units = hidden_units, return_sequences = True)))
    model.add(Bidirectional(LSTM(units = hidden_units, return_sequences = True)))
    model.add(TimeDistributed(Dense(1, activation = 'relu')))
    model.summary()
    model.compile(loss = 'mse', optimizer = 'adam', metrics = ['mae', MAX, 'mape'])
    
    start = datetime.datetime.now()
    history = model.fit(X_train, Y_train, epochs = epochs, batch_size = batch_size, verbose = 0)
    end = datetime.datetime.now()
    print(end - start)
    plot_history(history)
    text_save(temp[i] + ' ' + 'history', history.history['mean_absolute_error'])
    
    predict = []
    min_mae = []
    min_max = []
    min_mape = []
    for j in range(test_num):
        score = model.evaluate(X_test[j], Y_test[j], verbose = 0)
        min_mae.append(score[1])
        min_max.append(score[2])
        min_mape.append(score[3])
        predict.append(model.predict(X_test[j]))
    for j in range(search_epoch):
        #print('No.' + str(j + 1) + 'th search')
        model.fit(X_train, Y_train, epochs = 1, batch_size = batch_size, verbose = 0)
        for k in range(test_num):
            score = model.evaluate(X_test[k], Y_test[k], verbose = 0)
            if score[1] < min_mae[k]:
                min_mae[k] = score[1]
                min_max[k] = score[2]
                min_mape[k] = score[3]
                predict[k] = model.predict(X_test[k])
                
    for j in range(test_num):
        print('mae = ', min_mae[j], 'max = ', min_max[j], 'mape = ', min_mape[j])

    error = []
    for j in range(test_num):
        Y_test[j] = Y_test[j].flatten()
        predict[j] = predict[j].flatten()
        error.append(100 * (abs(Y_test[j] - predict[j])))
        plot_result(Y_test[j], predict[j], test_name[j])
        plot_error(error[j], test_name[j])
    
    for j in range(test_num):
        text_save(temp[i] + ' ' + test_name[j] + ' ' + 'predict', predict[j])
        text_save(temp[i] + ' ' + test_name[j] + ' ' + 'label', Y_test[j])
        text_save(temp[i] + ' ' + test_name[j] + ' ' + 'error', error[j])
    
    del model