PHASE 1: DATA LOADING & VISUALIZATION

In [None]:
# CELL 1: Setup

import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.metrics import mean_squared_error
from sklearn.svm import SVR

# Notebook-wide plotting defaults: seaborn grid theme and a wide figure size
sns.set_style("whitegrid")
plt.rcParams.update({'figure.figsize': (14, 6)})


In [None]:
# CELL 2: Load CSV Data

battery_id = 'B0005'  # Change as needed
base_path = './data/'  # Change this path to your local folder

# The three tables sit in <base_path>/<battery_id>/ and share one naming scheme
battery_dir = os.path.join(base_path, battery_id)
charge_df = pd.read_csv(os.path.join(battery_dir, f'{battery_id}_charge_data.csv'))
discharge_df = pd.read_csv(os.path.join(battery_dir, f'{battery_id}_discharge_data.csv'))
impedance_df = pd.read_csv(os.path.join(battery_dir, f'{battery_id}_impedance_data.csv'))

# Preview the first rows of each table
for frame in (charge_df, discharge_df, impedance_df):
    display(frame.head())


In [None]:
# CELL 3: Dataset Overview

# Row/column counts of the three tables, one per line
print(
    f"Charge Data Shape: {charge_df.shape}",
    f"Discharge Data Shape: {discharge_df.shape}",
    f"Impedance Data Shape: {impedance_df.shape}",
    sep="\n",
)

# Column names of each table (leading newline separates the two groups)
charge_columns = list(charge_df.columns)
discharge_columns = list(discharge_df.columns)
impedance_columns = list(impedance_df.columns)
print("\nCharge Columns:", charge_columns)
print("Discharge Columns:", discharge_columns)
print("Impedance Columns:", impedance_columns)


In [None]:
# CELL 4: Fix Time Column (Generate Relative Time)

# Number the rows of every cycle 0, 1, 2, ... in their stored order; this
# per-cycle sample index is used as the time axis in the plots and features below.
# Discharge first, then charge.
for frame in (discharge_df, charge_df):
    frame['relative_time'] = frame.groupby('id_cycle').cumcount()

# Verify fix
for frame in (discharge_df, charge_df):
    display(frame[['id_cycle', 'relative_time']].head(10))



In [None]:
# CELL 5: Plot Voltage vs Relative Time

fig, ax = plt.subplots(1, 2, figsize=(18,6))

# One panel per phase: (axis, table, legend label, line colour, panel title)
voltage_panels = (
    (ax[0], discharge_df, 'Discharge Voltage', 'red', f'{battery_id} Discharge Voltage Curve'),
    (ax[1], charge_df, 'Charge Voltage', 'green', f'{battery_id} Charge Voltage Curve'),
)

# NOTE(review): every cycle is drawn as part of one continuous line, and
# relative_time restarts at 0 for each cycle, so consecutive cycles are joined
# by a connecting segment. Plot per cycle if that is unwanted.
for axis, frame, label, color, title in voltage_panels:
    axis.plot(frame['relative_time'], frame['Voltage_measured'], label=label, color=color)
    axis.set_xlabel('Relative Time (steps)')
    axis.set_ylabel('Voltage (V)')
    axis.set_title(title)
    axis.legend()

# Render once (the cell previously called plt.show() twice)
plt.show()


In [None]:
# CELL 6: Plot Current vs Relative Time

fig, ax = plt.subplots(1, 2, figsize=(18,6))

# One panel per phase: (axis, table, legend label, line colour, panel title)
current_panels = (
    (ax[0], discharge_df, 'Discharge Current', 'red', f'{battery_id} Discharge Current Curve'),
    (ax[1], charge_df, 'Charge Current', 'green', f'{battery_id} Charge Current Curve'),
)

for axis, frame, label, color, title in current_panels:
    axis.plot(frame['relative_time'], frame['Current_measured'], label=label, color=color)
    axis.set_xlabel('Relative Time (steps)')
    axis.set_ylabel('Current (A)')
    axis.set_title(title)
    axis.legend()

plt.show()



In [None]:
# CELL 7: Plot Temperature vs Relative Time

fig, ax = plt.subplots(1, 2, figsize=(18,6))

# One panel per phase: (axis, table, legend label, line colour, panel title)
temperature_panels = (
    (ax[0], discharge_df, 'Discharge Temperature', 'blue', f'{battery_id} Discharge Temperature Curve'),
    (ax[1], charge_df, 'Charge Temperature', 'purple', f'{battery_id} Charge Temperature Curve'),
)

for axis, frame, label, color, title in temperature_panels:
    axis.plot(frame['relative_time'], frame['Temperature_measured'], label=label, color=color)
    axis.set_xlabel('Relative Time (steps)')
    axis.set_ylabel('Temperature (°C)')
    axis.set_title(title)
    axis.legend()

plt.show()


In [None]:
# CELL 8: Plot Capacity vs Cycle ID (SOH Proxy)

# One point per discharge row: the Capacity value against the cycle it belongs to
fig, ax = plt.subplots(figsize=(12, 6))
ax.scatter(discharge_df['id_cycle'], discharge_df['Capacity'], color='orange', alpha=0.6)
ax.set_xlabel('Cycle ID')
ax.set_ylabel('Capacity (Ah)')
ax.set_title(f'{battery_id} Capacity Trend Over Cycles (SOH Proxy)')
plt.show()


PHASE 2: HEALTH INDICATOR EXTRACTION 

In [None]:
# CELL 1: Function to Extract Health Indicators per Cycle

def extract_hi_features(cycle_df, cycle_type='charge'):
    """Summarise one cycle's measurements into a flat dict of health indicators.

    Parameters
    ----------
    cycle_df : pandas.DataFrame
        Rows of a single cycle. Must provide the columns
        'Temperature_measured', 'Voltage_measured', 'Current_measured',
        'relative_time' and 'id_cycle', and contain at least one row.
    cycle_type : str, optional
        Accepted for the caller's convenience; it is not used in the computation.

    Returns
    -------
    dict
        Keys 'HI1_peak_temp_time' ... 'HI9_peak_current' plus 'cycle_id'
        (the 'id_cycle' value of the first row).
    """
    temps = cycle_df['Temperature_measured'].values
    volts = cycle_df['Voltage_measured'].values
    amps = cycle_df['Current_measured'].values
    rel_time = cycle_df['relative_time'].values

    # First and last samples are taken by position, i.e. in the row order given
    v_start, v_end = volts[0], volts[-1]

    return {
        # Temperature: when the (first) maximum occurs, its value, and the mean
        'HI1_peak_temp_time': rel_time[np.argmax(temps)],
        'HI2_peak_temp': np.max(temps),
        'HI3_avg_temp': np.mean(temps),
        # Voltage: endpoints, their difference (initial - final), and the mean
        'HI4_initial_voltage': v_start,
        'HI5_final_voltage': v_end,
        'HI6_voltage_drop': v_start - v_end,
        'HI7_avg_voltage': np.mean(volts),
        # Current: signed mean and largest magnitude
        'HI8_avg_current': np.mean(amps),
        'HI9_peak_current': np.max(np.abs(amps)),
        # Cycle metadata
        'cycle_id': cycle_df['id_cycle'].iloc[0],
    }


In [None]:
# CELL 2: Extract Features for All Discharge Cycles

# One record per discharge cycle: its health indicators plus the largest
# Capacity value seen in that cycle (capacity as SOH proxy).
discharge_hi_list = [
    {
        **extract_hi_features(group, cycle_type='discharge'),
        'capacity': group['Capacity'].max(),
    }
    for _, group in discharge_df.groupby('id_cycle')
]

# Build DataFrame
discharge_hi_df = pd.DataFrame(discharge_hi_list)

# Show extracted features
display(discharge_hi_df.head())


In [None]:
# CELL 3: Plot Example Health Indicator over Cycles

# Average discharge temperature (HI3) per cycle
fig, ax = plt.subplots(figsize=(12,6))
ax.plot(discharge_hi_df['cycle_id'], discharge_hi_df['HI3_avg_temp'], label='HI3 - Avg Temperature')
ax.set_xlabel('Cycle ID')
ax.set_ylabel('Average Temperature (°C)')
ax.set_title(f'{battery_id} HI3 Trend Over Cycles (Discharge)')
ax.legend()
plt.show()


In [None]:
# CELL 4: Extract Features for All Charge Cycles

# One record of health indicators per charge cycle (no capacity column here)
charge_hi_list = [
    extract_hi_features(group, cycle_type='charge')
    for _, group in charge_df.groupby('id_cycle')
]

# Build DataFrame
charge_hi_df = pd.DataFrame(charge_hi_list)

# Show extracted features
display(charge_hi_df.head())


In [None]:
# CELL 5: Plot Example Health Indicator over Charge Cycles

# Average charge temperature (HI3) per cycle
fig, ax = plt.subplots(figsize=(12,6))
ax.plot(charge_hi_df['cycle_id'], charge_hi_df['HI3_avg_temp'], label='HI3 - Avg Temperature (Charge)')
ax.set_xlabel('Cycle ID')
ax.set_ylabel('Average Temperature (°C)')
ax.set_title(f'{battery_id} HI3 Trend Over Cycles (Charge)')
ax.legend()
plt.show()


In [None]:
# CELL 6: Correlation Analysis (Discharge Health Indicators vs Capacity)

# Keep only the health-indicator columns: cycle_id is bookkeeping and
# capacity is the target itself.
features_only = discharge_hi_df.drop(columns=['cycle_id', 'capacity'])

# Pearson correlation of each health indicator with capacity.
# Computed on features_only so that cycle_id cannot appear in the series and
# later be picked up as a model feature by the threshold selection.
correlation_series = features_only.corrwith(discharge_hi_df['capacity'])

# Display correlations, strongest positive first
display(correlation_series.sort_values(ascending=False))


In [None]:
# CELL 7: Correlation Heatmap

import seaborn as sns

# Pairwise correlations between every column of the discharge feature table
# (health indicators, cycle_id and capacity), annotated to two decimals.
corr_matrix = discharge_hi_df.corr()

plt.figure(figsize=(10,8))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f")
plt.title(f'{battery_id} Feature Correlation Heatmap (Discharge)')
plt.show()


In [None]:
# CELL 8: Select Features with High Correlation to Capacity

# Define threshold (as per paper: 0.75)
correlation_threshold = 0.75

# Keep every entry whose correlation magnitude reaches the threshold,
# regardless of sign
is_strong = correlation_series.abs() >= correlation_threshold
selected_features = correlation_series.loc[is_strong].index.tolist()

print("Selected Features for Modeling:")
print(selected_features)


In [None]:
# CELL 9: Prepare X (features) and y (target)

# Independent copies, so later edits to X or y leave discharge_hi_df untouched
X = discharge_hi_df.loc[:, selected_features].copy()   # feature matrix
y = discharge_hi_df.loc[:, 'capacity'].copy()          # target vector

print("X shape:", X.shape)
print("y shape:", y.shape)


In [None]:
# CELL 10: Normalize Features

from sklearn.preprocessing import MinMaxScaler

# Rescale every selected feature using its column-wise minimum and maximum.
# NOTE(review): the scaler is fit on ALL rows of X here. If these scaled values
# feed a model that is scored on a held-out split, fit on the training rows only.
scaler = MinMaxScaler()
scaler.fit(X)
X_scaled = scaler.transform(X)

# Convert back to DataFrame
X_scaled_df = pd.DataFrame(X_scaled, columns=selected_features)

display(X_scaled_df.head())


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# Split the RAW features first, then learn the min/max scaling from the
# training rows only. Splitting data that was scaled with statistics from all
# rows would let test-set information leak into training.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Rebinding `scaler` keeps it in sync with the models trained below
scaler = MinMaxScaler().fit(X_train_raw)

# NumPy arrays, as the later cells call .reshape on them; test values may fall
# slightly outside [0, 1] because the range comes from the training rows.
X_train = scaler.transform(X_train_raw)
X_test = scaler.transform(X_test_raw)


PSO-SVR Implementation 

In [None]:
def svr_fitness(params, X_train, y_train, X_test, y_test):
    """PSO objective: fit an SVR with the candidate hyperparameters and return its RMSE.

    Parameters
    ----------
    params : sequence of 3 floats
        Candidate (C, gamma, epsilon), in that order.
    X_train, y_train
        Data the SVR is fitted on.
    X_test, y_test
        Data the fitted SVR is scored on.

    Returns
    -------
    float
        Root-mean-squared error of the predictions on X_test; lower is better.

    Notes
    -----
    Relies on `SVR` and `mean_squared_error` being imported in the notebook's
    setup cell, before the optimiser calls this function.

    NOTE(review): the PSO cell passes the held-out test split as
    X_test / y_test, so hyperparameters are tuned on the same data the final
    metrics are reported on. Consider scoring on a separate validation split.
    """
    C, gamma, epsilon = params

    model = SVR(C=C, gamma=gamma, epsilon=epsilon)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    return np.sqrt(mean_squared_error(y_test, y_pred))


In [None]:
from pyswarm import pso

# Search range of each SVR hyperparameter, in the order [C, gamma, epsilon]
svr_search_space = {
    'C': (1, 100),
    'gamma': (0.0001, 0.1),
    'epsilon': (0.0001, 0.1),
}
lower_bounds = [low for low, _ in svr_search_space.values()]
upper_bounds = [high for _, high in svr_search_space.values()]

# Run PSO; the data splits reach svr_fitness through args=()
svr_data = (X_train, y_train, X_test, y_test)
best_params, best_rmse = pso(
    svr_fitness,
    lower_bounds,
    upper_bounds,
    args=svr_data,
    swarmsize=15,
    maxiter=30,
)

print("Best Parameters Found by PSO (C, gamma, epsilon):", best_params)
print("Best RMSE Found by PSO:", best_rmse)


In [None]:
# Train final SVR model using optimized parameters.
# `SVR` comes from the setup cell's imports; it was previously never imported.
# best_params must still be the SVR search result (C, gamma, epsilon); the
# CNN/LSTM sections reuse the same variable name.
C_opt, gamma_opt, epsilon_opt = best_params

final_model = SVR(C=C_opt, gamma=gamma_opt, epsilon=epsilon_opt)
final_model.fit(X_train, y_train)

# Predict on test data
y_pred = final_model.predict(X_test)


In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error, r2_score

# Score the SVR predictions against the held-out targets
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
mae = mean_absolute_error(y_test, y_pred)
mape = mean_absolute_percentage_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

# One metric per line, four decimals each
print(
    "Final SVR Performance:",
    f"RMSE: {rmse:.4f}",
    f"MAE: {mae:.4f}",
    f"MAPE: {mape:.4f}",
    f"R2 Score: {r2:.4f}",
    sep="\n",
)


In [None]:
import matplotlib.pyplot as plt

# Actual vs predicted capacity, in the row order of the test split
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(y_test.values, label='Actual Capacity (SOH)')
ax.plot(y_pred, linestyle='--', label='Predicted Capacity (SOH)')
ax.set_xlabel('Sample Index')
ax.set_ylabel('Capacity (Ah)')
ax.set_title('PSO-Optimized SVR: Actual vs Predicted SOH')
ax.legend()
plt.show()


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import mean_squared_error
from pyswarm import pso


In [None]:
def cnn_fitness(params, X_train, y_train, X_test, y_test):
    """PSO objective: train a small 1-D CNN with the candidate hyperparameters
    and return its RMSE.

    Parameters
    ----------
    params : sequence of 4 numbers
        Candidate (filters, kernel_size, dense_units, learning_rate). The first
        three are truncated to int; the learning rate is used as given.
    X_train, X_test : numpy.ndarray
        2-D arrays (samples, features); each feature column is fed to Conv1D
        as one step of a single-channel sequence.
    y_train, y_test
        Targets for fitting and scoring respectively.

    Returns
    -------
    float
        Root-mean-squared error on X_test; lower is better.

    Notes
    -----
    NOTE(review): the PSO cell passes the held-out test split as
    X_test / y_test, so hyperparameters are tuned on the data the final metrics
    are reported on. Consider scoring on a separate validation split.

    NOTE(review): the convolution uses the default padding followed by
    MaxPooling1D(pool_size=2); confirm the number of feature columns is large
    enough for the largest kernel_size in the search bounds.
    """
    # The optimiser builds a fresh model on every evaluation. Drop Keras'
    # global state from earlier candidates so memory does not keep growing.
    tf.keras.backend.clear_session()

    filters = int(params[0])
    kernel_size = int(params[1])
    dense_units = int(params[2])
    learning_rate = params[3]

    # Reshape for Conv1D (samples, timesteps, features)
    n_steps = X_train.shape[1]
    X_train_cnn = X_train.reshape((X_train.shape[0], n_steps, 1))
    X_test_cnn = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

    # Build CNN Model
    model = Sequential([
        Conv1D(filters=filters, kernel_size=kernel_size, activation='relu', input_shape=(n_steps, 1)),
        MaxPooling1D(pool_size=2),
        Flatten(),
        Dense(dense_units, activation='relu'),
        Dense(1)
    ])

    model.compile(optimizer=Adam(learning_rate=learning_rate), loss='mse')

    # Train Model
    model.fit(X_train_cnn, y_train, epochs=20, batch_size=16, verbose=0)

    # Predict silently: this runs once per candidate, so a progress bar per
    # call would flood the cell output.
    y_pred = model.predict(X_test_cnn, verbose=0).flatten()

    return np.sqrt(mean_squared_error(y_test, y_pred))


In [None]:
# Search range of each CNN hyperparameter, in the order
# [filters, kernel size, dense units, learning rate]
cnn_search_space = {
    'filters': (16, 64),
    'kernel_size': (2, 5),
    'dense_units': (8, 64),
    'learning_rate': (0.0001, 0.01),
}
lower_bounds = [low for low, _ in cnn_search_space.values()]
upper_bounds = [high for _, high in cnn_search_space.values()]

# Run PSO
cnn_data = (X_train, y_train, X_test, y_test)
best_params, best_rmse = pso(
    cnn_fitness,
    lower_bounds,
    upper_bounds,
    args=cnn_data,
    swarmsize=10,
    maxiter=20,
)

print("Best CNN Hyperparameters Found:", best_params)
print("Best CNN RMSE Found by PSO:", best_rmse)


In [None]:
# Unpack the PSO result: the three size parameters are truncated to int,
# the learning rate is used as found
filters_opt, kernel_size_opt, dense_units_opt = (int(value) for value in best_params[:3])
learning_rate_opt = best_params[3]

# Prepare data: add a trailing channel axis -> (samples, timesteps, 1)
n_steps = X_train.shape[1]
X_train_cnn = X_train.reshape((len(X_train), n_steps, 1))
X_test_cnn = X_test.reshape((len(X_test), X_test.shape[1], 1))

# Build final CNN layer by layer
model_cnn = Sequential()
model_cnn.add(Conv1D(filters=filters_opt, kernel_size=kernel_size_opt, activation='relu', input_shape=(n_steps, 1)))
model_cnn.add(MaxPooling1D(pool_size=2))
model_cnn.add(Flatten())
model_cnn.add(Dense(dense_units_opt, activation='relu'))
model_cnn.add(Dense(1))

model_cnn.compile(optimizer=Adam(learning_rate=learning_rate_opt), loss='mse')

# Train final model
model_cnn.fit(X_train_cnn, y_train, epochs=30, batch_size=16, verbose=1)

# Predict; flatten the (samples, 1) output into a 1-D vector
y_pred_cnn = model_cnn.predict(X_test_cnn).flatten()


In [None]:
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score

# Score the CNN predictions against the held-out targets
rmse = np.sqrt(mean_squared_error(y_test, y_pred_cnn))
mae = mean_absolute_error(y_test, y_pred_cnn)
mape = mean_absolute_percentage_error(y_test, y_pred_cnn)
r2 = r2_score(y_test, y_pred_cnn)

# One metric per line, four decimals each
print(
    "Final CNN Performance:",
    f"RMSE: {rmse:.4f}",
    f"MAE: {mae:.4f}",
    f"MAPE: {mape:.4f}",
    f"R2 Score: {r2:.4f}",
    sep="\n",
)


In [None]:
import matplotlib.pyplot as plt

# Actual vs predicted capacity, in the row order of the test split
fig, ax = plt.subplots(figsize=(12,6))
ax.plot(y_test.values, label='Actual Capacity (SOH)')
ax.plot(y_pred_cnn, linestyle='--', label='Predicted Capacity (SOH)')
ax.set_xlabel('Sample Index')
ax.set_ylabel('Capacity (Ah)')
ax.set_title('PSO-Optimized CNN: Actual vs Predicted SOH')
ax.legend()
plt.show()


FULL PSO-LSTM IMPLEMENTATION

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import mean_squared_error
from pyswarm import pso


In [None]:
def lstm_fitness(params, X_train, y_train, X_test, y_test):
    """PSO objective: train a small LSTM with the candidate hyperparameters
    and return its RMSE.

    Parameters
    ----------
    params : sequence of 3 numbers
        Candidate (lstm_units, dense_units, learning_rate). The first two are
        truncated to int; the learning rate is used as given.
    X_train, X_test : numpy.ndarray
        2-D arrays (samples, features); each feature column is fed to the LSTM
        as one step of a single-channel sequence.
    y_train, y_test
        Targets for fitting and scoring respectively.

    Returns
    -------
    float
        Root-mean-squared error on X_test; lower is better.

    Notes
    -----
    NOTE(review): the PSO cell passes the held-out test split as
    X_test / y_test, so hyperparameters are tuned on the data the final metrics
    are reported on. Consider scoring on a separate validation split.
    """
    # The optimiser builds a fresh model on every evaluation. Drop Keras'
    # global state from earlier candidates so memory does not keep growing.
    tf.keras.backend.clear_session()

    lstm_units = int(params[0])
    dense_units = int(params[1])
    learning_rate = params[2]

    # Reshape for LSTM (samples, timesteps, features)
    n_steps = X_train.shape[1]
    X_train_lstm = X_train.reshape((X_train.shape[0], n_steps, 1))
    X_test_lstm = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

    # Build Model
    model = Sequential([
        LSTM(lstm_units, input_shape=(n_steps, 1)),
        Dense(dense_units, activation='relu'),
        Dense(1)
    ])

    model.compile(optimizer=Adam(learning_rate=learning_rate), loss='mse')

    model.fit(X_train_lstm, y_train, epochs=20, batch_size=16, verbose=0)

    # Predict silently: this runs once per candidate, so a progress bar per
    # call would flood the cell output.
    y_pred = model.predict(X_test_lstm, verbose=0).flatten()

    return np.sqrt(mean_squared_error(y_test, y_pred))


In [None]:
# Search range of each LSTM hyperparameter, in the order
# [LSTM units, Dense units, learning rate]
lstm_search_space = {
    'lstm_units': (8, 64),
    'dense_units': (8, 64),
    'learning_rate': (0.0001, 0.01),
}
lower_bounds = [low for low, _ in lstm_search_space.values()]
upper_bounds = [high for _, high in lstm_search_space.values()]

# Run PSO
lstm_data = (X_train, y_train, X_test, y_test)
best_params, best_rmse = pso(
    lstm_fitness,
    lower_bounds,
    upper_bounds,
    args=lstm_data,
    swarmsize=10,
    maxiter=20,
)

print("Best LSTM Hyperparameters Found:", best_params)
print("Best LSTM RMSE Found by PSO:", best_rmse)


In [None]:
# Unpack the PSO result: the two size parameters are truncated to int,
# the learning rate is used as found
lstm_units_opt, dense_units_opt = (int(value) for value in best_params[:2])
learning_rate_opt = best_params[2]

# Prepare data: add a trailing channel axis -> (samples, timesteps, 1)
n_steps = X_train.shape[1]
X_train_lstm = X_train.reshape((len(X_train), n_steps, 1))
X_test_lstm = X_test.reshape((len(X_test), X_test.shape[1], 1))

# Build Final Model layer by layer
model_lstm = Sequential()
model_lstm.add(LSTM(lstm_units_opt, input_shape=(n_steps, 1)))
model_lstm.add(Dense(dense_units_opt, activation='relu'))
model_lstm.add(Dense(1))

model_lstm.compile(optimizer=Adam(learning_rate=learning_rate_opt), loss='mse')

# Train Final LSTM Model
model_lstm.fit(X_train_lstm, y_train, epochs=30, batch_size=16, verbose=1)

# Predict; flatten the (samples, 1) output into a 1-D vector
y_pred_lstm = model_lstm.predict(X_test_lstm).flatten()


In [None]:
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score

# Score the LSTM predictions against the held-out targets
rmse = np.sqrt(mean_squared_error(y_test, y_pred_lstm))
mae = mean_absolute_error(y_test, y_pred_lstm)
mape = mean_absolute_percentage_error(y_test, y_pred_lstm)
r2 = r2_score(y_test, y_pred_lstm)

# One metric per line, four decimals each
print(
    "Final LSTM Performance:",
    f"RMSE: {rmse:.4f}",
    f"MAE: {mae:.4f}",
    f"MAPE: {mape:.4f}",
    f"R2 Score: {r2:.4f}",
    sep="\n",
)


In [None]:
import matplotlib.pyplot as plt

# Actual vs predicted capacity, in the row order of the test split
fig, ax = plt.subplots(figsize=(12,6))
ax.plot(y_test.values, label='Actual Capacity (SOH)')
ax.plot(y_pred_lstm, linestyle='--', label='Predicted Capacity (SOH)')
ax.set_xlabel('Sample Index')
ax.set_ylabel('Capacity (Ah)')
ax.set_title('PSO-Optimized LSTM: Actual vs Predicted SOH')
ax.legend()
plt.show()


In [1]:
import joblib

# Persist the tuned SVR to the current working directory
svr_model_path = 'final_svr_model.pkl'
joblib.dump(final_model, svr_model_path)
print("SVR model saved as final_svr_model.pkl")

# Optional: persist the feature scaler as well (needed for deployment, so new
# inputs can be scaled the same way before prediction)
scaler_path = 'scaler.pkl'
joblib.dump(scaler, scaler_path)
print("Scaler saved as scaler.pkl")


ModuleNotFoundError: No module named 'joblib'