In [None]:
# Battery Degradation Estimation - Hybrid Model Pipeline

import scipy.io
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from scipy.optimize import differential_evolution

# Load and preprocess data
def load_battery_data(file_path):
    mat_data = scipy.io.loadmat(file_path)
    cycles = mat_data['B0005'][0][0]['cycle'][0]
    records = []
    for i, cycle in enumerate(cycles):
        if cycle['type'][0] == 'discharge':
            data = cycle['data'][0, 0]
            time = data['Time'].flatten()
            voltage = data['Voltage_measured'].flatten()
            current = data['Current_measured'].flatten()
            temp = data['Temperature_measured'].flatten()
            capacity = data['Capacity'][0, 0] if 'Capacity' in data.dtype.names else None
            for t, v, c, temp_val in zip(time, voltage, current, temp):
                records.append({
                    'Cycle_Index': i + 1,
                    'Time (s)': t,
                    'Voltage (V)': v,
                    'Current (A)': c,
                    'Temperature (C)': temp_val,
                    'Capacity (Ah)': capacity
                })
    df = pd.DataFrame(records)
    df = df.sort_values(by=['Cycle_Index', 'Time (s)'])
    return df

# Feature engineering and SOH calculation
def add_features_and_soh(df):
    agg = df.groupby('Cycle_Index').agg({
        'Voltage (V)': ['min', 'max'],
        'Temperature (C)': 'mean',
        'Capacity (Ah)': 'first'
    })
    agg.columns = ['Voltage_Min', 'Voltage_Max', 'Temperature_Avg', 'Capacity']
    agg = agg.reset_index()
    max_capacity = agg['Capacity'].max()
    agg['SOH (%)'] = 100 * agg['Capacity'] / max_capacity
    return agg

# Train ML models and predict SOH
def train_ml_models(df):
    X = df[['Voltage_Min', 'Voltage_Max', 'Temperature_Avg']]
    y = df['SOH (%)']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    rf = RandomForestRegressor(random_state=42)
    gb = GradientBoostingRegressor(random_state=42)
    nn = MLPRegressor(random_state=42, max_iter=1000)

    rf.fit(X_train, y_train)
    gb.fit(X_train, y_train)
    nn.fit(X_train, y_train)

    df['RandomForest_SOH_Pred'] = rf.predict(X)
    df['GradientBoosting_SOH_Pred'] = gb.predict(X)
    df['NeuralNetwork_SOH_Pred'] = nn.predict(X)

    df['SOH_ML'] = df[['RandomForest_SOH_Pred', 'GradientBoosting_SOH_Pred', 'NeuralNetwork_SOH_Pred']].mean(axis=1)
    return df

# Kalman Filter
class SimpleKalman:
    def __init__(self, Q, R, P0):
        self.Q = Q
        self.R = R
        self.P = P0
        self.x = None

    def filter(self, z_values):
        result = []
        for z in z_values:
            if self.x is None:
                self.x = z
            # Prediction step
            self.P = self.P + self.Q
            # Update step
            K = self.P / (self.P + self.R)
            self.x = self.x + K * (z - self.x)
            self.P = (1 - K) * self.P
            result.append(self.x)
        return np.array(result)

# Genetic Algorithm Optimization
def fitness_function(params, true_soh, ml_soh):
    Q, R, P0 = params
    kf = SimpleKalman(Q, R, P0)
    predicted = kf.filter(ml_soh)
    return mean_squared_error(true_soh, predicted)

def optimize_kalman(true_soh, ml_soh):
    bounds = [(1e-6, 1e-1), (1e-6, 1e-1), (0.1, 10.0)]
    result = differential_evolution(fitness_function, bounds, args=(true_soh, ml_soh), seed=42)
    return result.x

# RMSE Calculation
def calculate_rmse(true, predicted):
    return np.sqrt(mean_squared_error(true, predicted))

# Main pipeline
mat_file_path = "Battery Data Set/1. BatteryAgingARC-FY08Q4/B0005.mat"
df_raw = load_battery_data(mat_file_path)
df_features = add_features_and_soh(df_raw)
df_predicted = train_ml_models(df_features)

true_soh = df_predicted['SOH (%)'].values
ml_soh = df_predicted['SOH_ML'].values

best_params = optimize_kalman(true_soh, ml_soh)
Q_opt, R_opt, P0_opt = best_params
print(f"\n🧬 Best Parameters Found: Q = {Q_opt:.8f}, R = {R_opt:.8f}, P0 = {P0_opt:.2f}")

kf = SimpleKalman(Q_opt, R_opt, P0_opt)
df_predicted['SOH_Kalman_Optimized'] = kf.filter(ml_soh)

# RMSE Comparison
rmse_ml = calculate_rmse(true_soh, ml_soh)
rmse_kalman = calculate_rmse(true_soh, df_predicted['SOH_Kalman_Optimized'].values)

print("\n📉 RMSE Comparison:")
print(f"🔹 ML-only RMSE        : {rmse_ml:.4f}")
print(f"🔸 Hybrid (ML+Kalman) RMSE : {rmse_kalman:.4f}")
print(f"✅ Improvement from Kalman: {(rmse_ml - rmse_kalman) / rmse_ml * 100:.2f}%")

# Optional: Save final DataFrame to CSV
df_predicted.to_csv("B0005_ML_Kalman_Optimized.csv", index=False)

# Plotting
plt.figure(figsize=(12, 6))
plt.plot(true_soh, label='True SOH', linewidth=2)
plt.plot(ml_soh, label='ML Predicted SOH', linestyle='--')
plt.plot(df_predicted['SOH_Kalman_Optimized'], label='Kalman Smoothed SOH', linestyle='-.')
plt.xlabel('Cycle Index')
plt.ylabel('SOH (%)')
plt.title('Battery SOH Estimation: ML vs Hybrid (Kalman)')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()