In [None]:
from scipy.io import loadmat
import math
import numpy as np
import pandas as pd
from datetime import datetime
from IPython.display import clear_output
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras import Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.model_selection import train_test_split
import plotly.express as px
import matplotlib.pyplot as plt
import plotly.graph_objs as go

### Display options

In [None]:
#How the display() function prints the dataframes in the notebook
pd.set_option('display.width', 2000)
pd.set_option('display.max_rows', 5)

### Battery dataset description

In [None]:
#MatLab File Data Structure:
loadmat("NASA_Battery_Data_Files/B0005.mat", simplify_cells=True)['B0005']['cycle'][1]['data'].keys()

#file['__header__', '__version__', '__globals__', 'B0005']
#file['B0005']['cycle']
#file['B0005']['cycle'][1]['type', 'ambient_temperature', 'time', 'data']
#file['B0005']['cycle'][1]['data']['Voltage_measured', 'Current_measured', 'Temperature_measured', 'Current_load', 'Voltage_load', 'Time', 'Capacity']

#cycle:	top level structure array containing the charge, discharge and impedance operations
#	type: 	operation  type, can be charge, discharge or impedance
#	ambient_temperature:	ambient temperature (degree C)
#	time: 	the date and time of the start of the cycle, in MATLAB  date vector format
#	data:	data structure containing the measurements
#	   for charge the fields are:
#		Voltage_measured: 	Battery terminal voltage (Volts)
#		Current_measured:	Battery output current (Amps)
#		Temperature_measured: 	Battery temperature (degree C)
#		Current_charge:		Current measured at charger (Amps)
#		Voltage_charge:		Voltage measured at charger (Volts)
#		Time:			Time vector for the cycle (secs)
#	   for discharge the fields are:
#		Voltage_measured: 	Battery terminal voltage (Volts)
#		Current_measured:	Battery output current (Amps)
#		Temperature_measured: 	Battery temperature (degree C)
#		Current_charge:		Current measured at load (Amps)
#		Voltage_charge:		Voltage measured at load (Volts)
#		Time:			Time vector for the cycle (secs)
#		Capacity:		Battery capacity (Ahr) for discharge till 2.7V 
#	   for impedance the fields are:
#		Sense_current:		Current in sense branch (Amps)
#		Battery_current:	Current in battery branch (Amps)
#		Current_ratio:		Ratio of the above currents 
#		Battery_impedance:	Battery impedance (Ohms) computed from raw data
#		Rectified_impedance:	Calibrated and smoothed battery impedance (Ohms) 
#		Re:			Estimated electrolyte resistance (Ohms)
#		Rct:			Estimated charge transfer resistance (Ohms)

### Battery data loading function, returns a Pandas dataframe

In [None]:
def loadAsPandas(filename):
    lst = []
    
    #scipy Matlab file loader func
    mat = loadmat("NASA_Battery_Data_Files/" + filename, simplify_cells=True)
    battery_id = filename.split(".")[0]

    #Number of cycles, All cycles: charge cycles + discharge cycles + impedance cycles
    cycle_count = len(mat[battery_id]['cycle'])
    
    #Loop through all cycles
    for i in range(0, cycle_count):
        #Print parsing progress to console
        clear_output(wait=True)
        print("parsing cycle:", (i+1), "/", cycle_count)
        
        #Skip Impedance cycles
        if mat[battery_id]['cycle'][i]['type'] == "impedance":
            continue

        #Number of readings / datapoints in the cycle
        readings_count = len(mat[battery_id]['cycle'][i]['data']['Time'])

        #type: 'charge', 'dischage'
        cycle_type = mat[battery_id]['cycle'][i]['type']

        #Loop through all readings for cycle
        for j in range(0, readings_count):
            time = mat[battery_id]['cycle'][i]['data']['Time'][j]
            voltage = mat[battery_id]['cycle'][i]['data']['Voltage_measured'][j]
            current = mat[battery_id]['cycle'][i]['data']['Current_measured'][j]
            temperature = mat[battery_id]['cycle'][i]['data']['Temperature_measured'][j]
            
            charge_voltage = None
            charge_current = None
            load_voltage = None
            load_current = None
            
            if cycle_type == "charge":
                charge_voltage = mat[battery_id]['cycle'][i]['data']['Voltage_charge'][j]
                charge_current = mat[battery_id]['cycle'][i]['data']['Current_charge'][j]
            elif cycle_type == "discharge":
                load_voltage = mat[battery_id]['cycle'][i]['data']['Voltage_load'][j]
                load_current = mat[battery_id]['cycle'][i]['data']['Current_load'][j]
            
            lst.append([i, time, voltage, current, temperature, charge_voltage, charge_current, load_voltage, load_current, cycle_type])
            
    #Convert list to Pandas dataframe
    pdf = pd.DataFrame(lst, columns=['cycle', 'time', 'voltage', 'current', 'temperature', 'charge_voltage', 'charge_current', 'load_voltage', 'load_current', 'type'])
    print("parsing complete!")
    return pdf

### Select a battery to load data from

In [None]:
B0005 = "B0005.mat" #Battery 5
B0006 = "B0006.mat" #Battery 6
B0007 = "B0007.mat" #Battery 7
B0018 = "B0018.mat" #Battery 18

pdf = loadAsPandas(B0005)
display(pdf)

### Print number of cycles in the set

In [None]:
print("charge cycles: ", pdf[pdf['type'] == 'charge']['cycle'].nunique())
print("discharge cycles: ", pdf[pdf['type'] == 'discharge']['cycle'].nunique())

### Filter to keep charge cycles only


In [None]:
#Extract only charge cycles
pdf = (pdf[pdf['type'] == 'charge'])

### Calculate Ampere-hours for each charge cycle

In [None]:
#Calculate capacity function
def compute_capacity(df):
    df['cycle'] = df['cycle']
    df['delta_time'] = df['time'].diff().fillna(0)
    df['delta_capacity_As'] = df['current'] * df['delta_time']
    df['capacity_Ah'] = df['delta_capacity_As'].cumsum() / 3600
    return df.drop(columns=['delta_time', 'delta_capacity_As'])

#Apply the function grouped by cycle
pdf = pdf.sort_values(['cycle', 'time']).reset_index(drop=True)
pdf = pdf.groupby('cycle', group_keys=False)[pdf.columns].apply(compute_capacity)

display(pdf)


### Calculate dQ/dV, highest measured dQ/dV, and timing for the highest measured dQ/dV for each charge cycle

In [None]:
#Calculate capacity function
def compute_dQdV(df):
    df['delta_t'] = df['time'].diff().fillna(0)  # first value will be 0
    df['dQ'] = df['current'] * df['delta_t']
    df['Q'] = df['dQ'].cumsum()
    df['delta_Q'] = df['Q'].diff()
    df['delta_V'] = df['voltage'].diff()
    
    df['dQdV'] = df['delta_Q'] / df['delta_V']
    return df.drop(columns=['delta_Q', 'delta_V'])

#Apply the function per cycle
pdf = pdf.sort_values(['cycle', 'time']).reset_index(drop=True)
pdf = pdf.groupby('cycle', group_keys=False)[pdf.columns].apply(compute_dQdV)

#Calculate highest measured dQ/dV for each cycle
pdf['peak_dQdV'] = pdf.groupby('cycle')['dQdV'].transform('max')

#Find the rows when the dQdV peaked for each cycle and extract as a separate dataframe 
peak_times = pdf.loc[pdf.groupby('cycle')['dQdV'].idxmax(), ['cycle', 'time']]

#Rename time column so we don't merge 'time' into 'time'
peak_times = peak_times.rename(columns={'time': 'peak_dQdV_time'})

#Merge back to original DataFrame
pdf = pdf.merge(peak_times, on='cycle')

display(pdf)

### Extact the last row of each cycle, because capacity increases as the battery is charging we want to grab the last reading to get the total capacity

In [None]:
#Select last entry in each group (Has the sumsum Ah for cycle)
pdf = pdf.groupby('cycle', group_keys=False).tail(1)

#Update cycle numbers 1, 2, 3, ..., n.
pdf.loc[:, 'cycle'] = range(1, len(pdf) + 1)

display(pdf)

### Plot raw capacity degradation

In [None]:
fig = px.line(pdf, x='cycle', y='capacity_Ah', markers=True,
              title='Ampere-hours over Cycle',
              labels={'cycle': 'Cycle', 'capacity_Ah': 'Capacity (Ampere-hours)'})

fig.show()

### Filter outliers < 1 Ah

In [None]:
pdf = pdf[pdf['capacity_Ah'] > 1.0]
pdf.loc[:, 'cycle'] = range(1, len(pdf) + 1)

fig = px.line(pdf, x='cycle', y='capacity_Ah', markers=True,
              title='Ampere-hours over Cycle without outliers',
              labels={'cycle': 'Cycle', 'capacity_Ah': 'Capacity (Ampere-hours)'})

fig.show()

### Long Short-Term Memory (LSTM) capacity prediction

In [None]:
#Features and target feature
features = ['capacity_Ah', 'peak_dQdV', 'peak_dQdV_time']
target = 'capacity_Ah'
look_back = 10

#Normalize features into range [0, 1]
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(pdf[features])

#Create sequences
def create_sequences(data, look_back):
    X, y = [], []
    for i in range(len(data) - look_back):
        X.append(data[i:i+look_back])
        y.append(data[i+look_back][0])  # target is capacity_Ah
    return np.array(X), np.array(y)

X, y = create_sequences(scaled_data, look_back)

#Split data into train and test subsets
split = int(len(X) * 0.7)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

#LSTM model
model = Sequential()
model.add(Input(shape=(look_back, X.shape[2])))
model.add(LSTM(units=128, return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(units=128))
model.add(Dropout(0.2))
model.add(Dense(1))

model.compile(optimizer='adam', loss='mean_absolute_error')

results = model.fit(X_train, y_train, epochs=30, batch_size=16, validation_data=(X_test, y_test))

#Generatre predictions
y_pred = model.predict(X_test)

#Inverse transform predictions (becyase we normalize every datapoint to range [0,1] to work with LSTM we want them back to regular value ranges)
dummy = np.zeros((len(y_test), len(features)))
dummy[:, 0] = y_pred[:, 0]
predicted = scaler.inverse_transform(dummy)[:, 0]


### Plot the LSTM results, showing how the predicted capacity compared to the actual measured capacity

In [None]:
real_list = []
pred_list = []
len_actual = len(pdf['capacity_Ah'].values)
len_predicted = len(predicted)

#Fill list for index and actual values
for i in range(1, len_actual + 1):
    real_list.append([i, pdf['capacity_Ah'].values[i-1]])

#Fill list for index and predicted values
for i in range((1 + len_actual - len_predicted), len_actual + 1):
    pred_list.append([i, predicted[i - (len_actual - len_predicted) - 1]])

#Convert lists to Pandas dataframes
real_pdf = pd.DataFrame(real_list, columns=['cycle', 'capacity']) 
pred_pdf = pd.DataFrame(pred_list, columns=['cycle', 'capacity'])

#Instantiate figure
fig = go.Figure()

#Add plot line for actual capacity
fig.add_trace(go.Scatter(x=real_pdf['cycle'], y=real_pdf['capacity'], mode='lines', name='Actual Capacity'))

#Add plot line for predicted capacity
fig.add_trace(go.Scatter(x=pred_pdf['cycle'], y=pred_pdf['capacity'], mode='lines', name='Predicted Capacity'))

#Add title and axis text
fig.update_layout(
    width=1000,
    title="Actual vs Predicted capacity (Ah)",
    xaxis_title="charge cycle",
    yaxis_title="capacity (Ah)"
)

fig.show()

### Plot the training loss and validation loss for each epoch

In [None]:
print(results.history.keys())
plt.plot(results.history['loss'])
plt.plot(results.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['training loss', 'validation loss'], loc='upper center')
plt.show()