In [None]:
#Library 
import pandas as pd
from datetime import timedelta
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error

import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from keras.layers import LSTM
from tensorflow.keras.layers import LSTM, Dense

# Training drive cycles: file stems relative to the dataset folder, without ".csv".
# In each temperature folder Mixed1-2 and Mixed3-8 carry different numeric prefixes.
train_names = (
    # 10 degC
    [f'10degC/752_Mixed{n}' for n in (1, 2)]
    + [f'10degC/756_Mixed{n}' for n in range(3, 9)]
    # 25 degC
    + [f'25degC/734_Mixed{n}' for n in (1, 2)]
    + [f'25degC/740_Mixed{n}' for n in range(3, 9)]
    # 40 degC
    + [f'40degC/710_Mixed{n}' for n in (1, 2)]
    + [f'40degC/722_Mixed{n}' for n in range(3, 9)]
)

# Held-out drive cycles used only for evaluation (same naming scheme as train_names).
# Every entry needs its trailing comma: Python silently concatenates adjacent string
# literals, which would turn two file names into one non-existent name.
test_names = [
    '10degC/752_LA92',
    '10degC/752_UDDS',

    '25degC/734_LA92',
    '25degC/734_UDDS',

    '40degC/710_LA92',
    '40degC/710_UDDS',
]

# Root folder of the dataset. _get_data builds each file name as
# path + name + '.csv', so the trailing slash is required.
# NOTE(review): absolute, machine-specific location -- adjust before running elsewhere.
path = 'C:/Kuliah/Skripsi/Dataset/Samsung INR21700 30T/'

In [None]:
# Entry point of the data pipeline: load both file lists, then normalise the inputs
def get_discharge_whole_cycle(train_names, test_names, scale_test=False):
    """Load the training and test cycles and scale their input features.

    Parameters
    ----------
    train_names, test_names : list of str
        File stems passed unchanged to ``_get_data``.
    scale_test : bool, default False
        Forwarded to ``_scale_x``; when true the test inputs are scaled with the
        same per-feature minimum/maximum as the training inputs.

    Returns
    -------
    tuple
        ``(train, test)`` exactly as returned by ``_scale_x``.
    """
    loaded_train = _get_data(train_names)
    loaded_test = _get_data(test_names)
    scaled_train, scaled_test = _scale_x(loaded_train, loaded_test, scale_test=scale_test)
    return scaled_train, scaled_test

# Load each named cycle and turn it into per-second (inputs, SoC target) arrays
def _get_data(names):
    """Read the listed cycle files and build one ``(x, y)`` array pair per file.

    For every ``name`` the file ``path + name + '.csv'`` is read with its first
    30 lines skipped, only rows whose ``Status`` is ``"TABLE"`` or ``"DCH"`` are
    kept, a state-of-charge target is derived from the ``Capacity`` column, and
    the samples are averaged into one row per rounded elapsed second.

    Parameters
    ----------
    names : iterable of str
        File stems relative to the module-level ``path``, without ``.csv``.

    Returns
    -------
    list of tuple(numpy.ndarray, numpy.ndarray)
        One ``(x, y)`` pair per file. ``x`` has the columns Voltage, Current,
        Temperature (in that order); ``y`` has the single column
        "SoC Percentage", which is scaled so that its largest value in the
        cycle is 1.0 (a fraction, not 0-100).

    Raises
    ------
    ValueError
        If a file contains no ``TABLE``/``DCH`` rows.
    """
    column_names = ['Time Stamp', 'Step', 'Status', 'Prog Time', 'Step Time', 'Cycle',
                    'Cycle Level', 'Procedure', 'Voltage', 'Current', 'Temperature',
                    'Capacity', 'WhAccu', 'Cnt', 'Empty']

    cycles = []
    for name in names:
        file_path = path + name + '.csv'
        raw = pd.read_csv(file_path, skiprows=30)
        raw.columns = column_names

        # .copy() gives an independent frame, so the column assignments below do not
        # write into a slice of `raw` (pandas SettingWithCopyWarning).
        cycle = raw[raw["Status"].isin(["TABLE", "DCH"])].copy()
        if cycle.empty:
            raise ValueError("no TABLE/DCH rows found in " + file_path)

        # Shift Capacity by the magnitude of its minimum, then normalise by the maximum.
        # NOTE(review): this yields a 0..1 range only if Capacity never goes above 0
        # within the kept rows -- confirm against the data.
        max_discharge = abs(cycle["Capacity"].min())
        cycle["SoC Capacity"] = max_discharge + cycle["Capacity"]
        cycle["SoC Percentage"] = cycle["SoC Capacity"] / cycle["SoC Capacity"].max()

        cycle['Prog Time'] = cycle['Prog Time'].apply(_time_string_to_seconds)
        # Positional .iloc[0]: after filtering, the label 0 may no longer exist, so a
        # label lookup ([0]) would raise KeyError whenever the file's first row is dropped.
        start_time = cycle['Prog Time'].iloc[0]
        cycle['Time in Seconds'] = (cycle['Prog Time'] - start_time).round()

        # Average all samples that fall into the same rounded second.
        cycle_per_second = cycle.groupby('Time in Seconds').agg({
            'Voltage': 'mean',
            'Current': 'mean',
            'Temperature': 'mean',
            'SoC Percentage': 'mean',
        }).reset_index()

        x = cycle_per_second[["Voltage", "Current", "Temperature"]].to_numpy()
        y = cycle_per_second[["SoC Percentage"]].to_numpy()

        cycles.append((x, y))

    return cycles

# Function to Transform Time Sampling into Seconds
def _time_string_to_seconds(input_string):
    time_parts = input_string.split(':')
    second_parts = time_parts[2].split('.')
    return timedelta(hours=int(time_parts[0]), 
        minutes=int(time_parts[1]), 
        seconds=int(second_parts[0]), 
        microseconds=int(second_parts[1])).total_seconds()

# Function to Normalize Dataset
def _scale_x(train, test, scale_test=False):
    for index_feature in range(len(train[0][0][0])):
        feature_min = min([min(cycle[0][:,index_feature]) for cycle in train])
        feature_max = max([max(cycle[0][:,index_feature]) for cycle in train])
        for i in range(len(train)):
            train[i][0][:,index_feature] = (train[i][0][:,index_feature]-feature_min)/(feature_max-feature_min)
        if scale_test:
            for i in range(len(test)):
                test[i][0][:,index_feature] = (test[i][0][:,index_feature]-feature_min)/(feature_max-feature_min)

    return train, test

In [None]:
# Load and scale everything once. `cycles` is the (train, test) tuple returned by
# get_discharge_whole_cycle: cycles[0] holds the training cycles and cycles[1] the
# test cycles, each a list of (x, y) array pairs. scale_test=True makes _scale_x
# apply the training minimum/maximum to the test inputs as well.
cycles = get_discharge_whole_cycle(train_names, test_names, scale_test=True)

In [None]:
# Number of consecutive rows given to the model for each prediction.
# _get_data aggregates to one row per rounded second, so this is roughly 20 s of
# history (assuming the recording has no gaps).
window_size = 20

# Function to create windowed dataset
def create_windowed_dataset(input_data, target_data, window_size):
    """Cut a sequence into overlapping windows, each paired with the next target.

    Window ``i`` is ``input_data[i : i + window_size]`` and its label is
    ``target_data[i + window_size]``, i.e. the target one step after the window
    ends. A sequence of length ``n`` therefore yields ``n - window_size`` samples.

    Parameters
    ----------
    input_data : array-like, shape (n, ...)
        Sequence of input rows.
    target_data : array-like, shape (n, ...)
        Sequence of targets aligned with ``input_data``.
    window_size : int
        Number of rows per window; must be at least 1.

    Returns
    -------
    X : numpy.ndarray, shape (n - window_size, window_size, ...)
    y : numpy.ndarray, shape (n - window_size, ...)
        When the sequence is not longer than ``window_size`` both arrays have
        zero samples but keep their trailing dimensions, so they can still be
        concatenated with the windows of other sequences.

    Raises
    ------
    ValueError
        If ``window_size`` is smaller than 1.
    IndexError
        If ``target_data`` is too short for the windows built from ``input_data``.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    input_data = np.asarray(input_data)
    target_data = np.asarray(target_data)

    n_windows = max(len(input_data) - window_size, 0)
    window_starts = np.arange(n_windows)
    # Row i of this index matrix is [i, i+1, ..., i+window_size-1]; indexing with it
    # gathers every window in one vectorised step (and returns a fresh copy).
    window_index = window_starts[:, None] + np.arange(window_size)[None, :]

    X = input_data[window_index]
    y = target_data[window_starts + window_size]
    return X, y

# Build the windowed training and test arrays with one shared helper instead of two
# copy-pasted loops.
def _window_cycles(cycle_list, window_size):
    """Window every (x, y) cycle on its own and stack the results.

    Each cycle is windowed separately, so no window ever spans two different
    recordings. Returns ``(X, y)`` with all windows concatenated in the order of
    ``cycle_list``.
    """
    windowed = [create_windowed_dataset(x, y, window_size) for x, y in cycle_list]
    X_parts, y_parts = zip(*windowed)
    return np.concatenate(X_parts), np.concatenate(y_parts)


# `cycles` is the (train, test) tuple produced by get_discharge_whole_cycle.
train_cycles, test_cycles = cycles

X_train, y_train = _window_cycles(train_cycles, window_size)
X_test, y_test = _window_cycles(test_cycles, window_size)

# The arrays already have the (samples, time steps, features) layout the LSTM layer
# is given via input_shape, so no reshape is needed; just verify it.
assert X_train.ndim == 3 and X_train.shape[1] == window_size
assert X_test.ndim == 3 and X_test.shape[1] == window_size


In [None]:
# Build LSTM Model: a 64-unit LSTM that returns only its last output, followed by
# Dense(32) and a single-unit output layer for the SoC estimate.
# NOTE(review): neither Dense layer is given an activation argument -- confirm that
# the Keras default is what is wanted for Dense(32).

opt = tf.keras.optimizers.Adam(learning_rate=0.00001)

model = Sequential([
    # input_shape is (time steps, features), i.e. everything but the sample axis
    LSTM(64, return_sequences=False, input_shape=X_train.shape[1:]),
    Dense(32),
    Dense(1),
])
model.compile(
    optimizer=opt,
    loss='huber',
    # the name 'rmse' is what the history plots look up ('rmse' / 'val_rmse')
    metrics=['mae', tf.keras.metrics.RootMeanSquaredError(name='rmse')],
)

model.summary()

In [None]:
# Train model for 50 epochs with batches of 32 windows. The returned History object
# is kept because the plotting cell reads history.history.
# NOTE(review): Keras documents validation_split as taking the *last* fraction of the
# arrays, before shuffling. X_train is concatenated in train_names order, so the
# validation set would come from the final training cycles rather than a random
# sample -- confirm this is intended.
history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, verbose=1)

In [None]:
# Plot training history: one figure per tracked quantity, training vs validation.
# The keys are the names Keras records in history.history ('loss' plus the metric
# names given to model.compile); the validation series use the same key prefixed
# with 'val_'.
for history_key, display_name in (('loss', 'Loss'), ('mae', 'MAE'), ('rmse', 'RMSE')):
    plt.plot(history.history[history_key], label=f'Training {display_name}')
    plt.plot(history.history[f'val_{history_key}'], label=f'Validation {display_name}')
    plt.title(f'Training and Validation {display_name}')
    plt.xlabel('Epoch')
    plt.ylabel(display_name)
    plt.legend()
    plt.show()

In [None]:
# Save the trained model to an HDF5 (".h5") file; the relative name is resolved
# against the current working directory.
# NOTE(review): recent Keras releases describe HDF5 as a legacy format and recommend
# the native ".keras" format -- consider switching if nothing depends on this name.
model.save("lstm_model2.h5")

In [None]:
# Make predictions for every test window. X_test holds the windows of all test
# cycles concatenated in test_names order.
y_pred = model.predict(X_test)

# Plotting
plt.figure(figsize=(10, 6))
plt.plot(y_pred.flatten(), label='SoC Predicted', color='blue')
plt.plot(y_test.flatten(), label='SoC Actual', color='red')
# The curves cover every cycle listed in test_names (LA92 and UDDS at each
# temperature), so the title must not name a single cycle that is not in the test set.
plt.title('LSTM Predictions vs Actual Values on Test Cycles')
plt.xlabel('Time')
plt.ylabel('SoC Percentage')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Test-set error metrics, computed over all test windows at once.
# RMSE is the square root of the mean squared error; MAE is the mean absolute error.
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
mae = mean_absolute_error(y_test, y_pred)

# Report both values
print("RMSE:", rmse)
print("MAE:", mae)