In [2]:
# Build a regression model to predict SOC using time, voltage, current, and max_temperature
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
import pandas as pd

df = pd.read_csv("..\\unibo-powertools-dataset\\unibo-powertools-dataset\\test_result_trial_end_cleaned_v1.0.csv")

# Select features and target
features = ['time', 'voltage', 'current', 'max_temperature']
target = 'SOC'

# Drop rows with missing values in selected columns
model_df = df.dropna(subset=features + [target])

X = model_df[features]
y = model_df[target]

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train model
model = RandomForestRegressor(n_estimators=20, random_state=42, n_jobs=-1)
model.fit(X_train, y_train)

# Predict and evaluate
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print(f"Test MSE: {mse:.2f}")
print(f"Test R^2: {r2:.2f}")

Test MSE: 1.14
Test R^2: 1.00


In [3]:
# Save the trained model with a unique name including version info
import joblib
import datetime

# Define model and dataset version info
model_version = "v1.0"
dataset_version = "test_result_trial_end_v1.0"
model_type = "RandomForestRegressor"
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

model_filename = f"../models/SOC_{model_type}_{model_version}_{dataset_version}_{timestamp}.joblib"
joblib.dump(model, model_filename)
print(f"Model saved as: {model_filename}")

Model saved as: ../models/SOC_RandomForestRegressor_v1.0_test_result_trial_end_v1.0_20250825_121230.joblib


In [4]:
# Create comprehensive results tracking system
import os
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import numpy as np

# Create results directory if it doesn't exist
results_dir = '../results'
os.makedirs(results_dir, exist_ok=True)

# Calculate comprehensive metrics
def calculate_metrics(y_true, y_pred, model_name, dataset_name, features_used):
    """Calculate comprehensive evaluation metrics"""
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    
    # Additional metrics
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100  # Mean Absolute Percentage Error
    
    return {
        'model_name': model_name,
        'dataset_name': dataset_name,
        'features_used': ', '.join(features_used),
        'num_features': len(features_used),
        'train_size': len(y_train),
        'test_size': len(y_true),
        'mse': mse,
        'rmse': rmse,
        'mae': mae,
        'r2_score': r2,
        'mape': mape,
        'timestamp': timestamp
    }

# Calculate metrics for current model
current_metrics = calculate_metrics(
    y_test, y_pred, 
    model_type, 
    dataset_version, 
    features
)

# Load existing results or create new DataFrame
results_file = os.path.join(results_dir, 'model_results.csv')
if os.path.exists(results_file):
    results_df = pd.read_csv(results_file)
else:
    results_df = pd.DataFrame()

# Add current results
new_result = pd.DataFrame([current_metrics])
results_df = pd.concat([results_df, new_result], ignore_index=True)

# Save updated results
results_df.to_csv(results_file, index=False)

print(f"Results saved to: {results_file}")
print("\nCurrent Model Performance:")
print(f"Model: {current_metrics['model_name']}")
print(f"Dataset: {current_metrics['dataset_name']}")
print(f"Features: {current_metrics['features_used']}")
print(f"Test Size: {current_metrics['test_size']}")
print(f"MSE: {current_metrics['mse']:.4f}")
print(f"RMSE: {current_metrics['rmse']:.4f}")
print(f"MAE: {current_metrics['mae']:.4f}")
print(f"R² Score: {current_metrics['r2_score']:.4f}")
print(f"MAPE: {current_metrics['mape']:.2f}%")

# Display all results
print("\n=== All Model Results ===")
if len(results_df) > 0:
    print(results_df[['model_name', 'dataset_name', 'r2_score', 'rmse', 'mae', 'mape']].round(4))
else:
    print("No previous results found.")

# Save detailed results with feature importance if available
if hasattr(model, 'feature_importances_'):
    feature_importance = pd.DataFrame({
        'feature': features,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)
    
    print("\n=== Feature Importance ===")
    print(feature_importance)
    
    # Save feature importance
    importance_file = os.path.join(results_dir, f'feature_importance_{model_type}_{timestamp}.csv')
    feature_importance.to_csv(importance_file, index=False)

Results saved to: ../results\model_results.csv

Current Model Performance:
Model: RandomForestRegressor
Dataset: test_result_trial_end_v1.0
Features: time, voltage, current, max_temperature
Test Size: 81153
MSE: 1.1418
RMSE: 1.0686
MAE: 0.2392
R² Score: 0.9968
MAPE: 12.05%

=== All Model Results ===
                  model_name                dataset_name  r2_score    rmse  \
0      RandomForestRegressor  test_result_trial_end_v1.0    0.9968  1.0714   
1           LinearRegression  test_result_trial_end_v1.0    0.8599  7.0773   
2      DecisionTreeRegressor  test_result_trial_end_v1.0    0.9948  1.3635   
3  GradientBoostingRegressor  test_result_trial_end_v1.0    0.9480  4.3139   
4      RandomForestRegressor  test_result_trial_end_v1.0    0.9968  1.0686   

      mae       mape  
0  0.2397    12.0480  
1  5.4789  1467.9599  
2  0.2473    12.4251  
3  3.2723   191.8751  
4  0.2392    12.0457  

=== Feature Importance ===
           feature  importance
1          voltage    0.893860
2 

In [5]:
# Example: Train different models for comparison
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import GradientBoostingRegressor
import datetime

# Test different models
models_to_test = {
    'LinearRegression': LinearRegression(),
    'DecisionTreeRegressor': DecisionTreeRegressor(random_state=42),
    'GradientBoostingRegressor': GradientBoostingRegressor(random_state=42, n_estimators=50)
}

# Train and evaluate each model
for model_name, model_instance in models_to_test.items():
    print(f"\nTraining {model_name}...")
    
    # Train the model
    model_instance.fit(X_train, y_train)
    
    # Make predictions
    y_pred_new = model_instance.predict(X_test)
    
    # Calculate metrics
    timestamp_new = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    metrics = calculate_metrics(
        y_test, y_pred_new, 
        model_name, 
        dataset_version, 
        features
    )
    
    # Update timestamp for this model
    metrics['timestamp'] = timestamp_new
    
    # Add to results
    new_result = pd.DataFrame([metrics])
    results_df = pd.concat([results_df, new_result], ignore_index=True)
    
    # Save the model
    model_filename_new = f"../models/SOC_{model_name}_v1.0_{dataset_version}_{timestamp_new}.joblib"
    joblib.dump(model_instance, model_filename_new)
    
    print(f"Model saved: {model_filename_new}")
    print(f"R² Score: {metrics['r2_score']:.4f}")
    print(f"RMSE: {metrics['rmse']:.4f}")

# Save updated results
results_df.to_csv(results_file, index=False)
print(f"\nAll results updated in: {results_file}")

# Display final comparison
print("\n=== Final Model Comparison ===")
comparison_cols = ['model_name', 'r2_score', 'rmse', 'mae', 'mape']
print(results_df[comparison_cols].round(4).to_string(index=False))


Training LinearRegression...
Model saved: ../models/SOC_LinearRegression_v1.0_test_result_trial_end_v1.0_20250825_121232.joblib
R² Score: 0.8599
RMSE: 7.0773

Training DecisionTreeRegressor...
Model saved: ../models/SOC_DecisionTreeRegressor_v1.0_test_result_trial_end_v1.0_20250825_121238.joblib
R² Score: 0.9948
RMSE: 1.3638

Training GradientBoostingRegressor...
Model saved: ../models/SOC_DecisionTreeRegressor_v1.0_test_result_trial_end_v1.0_20250825_121238.joblib
R² Score: 0.9948
RMSE: 1.3638

Training GradientBoostingRegressor...
Model saved: ../models/SOC_GradientBoostingRegressor_v1.0_test_result_trial_end_v1.0_20250825_121331.joblib
R² Score: 0.9480
RMSE: 4.3139

All results updated in: ../results\model_results.csv

=== Final Model Comparison ===
               model_name  r2_score   rmse    mae      mape
    RandomForestRegressor    0.9968 1.0714 0.2397   12.0480
         LinearRegression    0.8599 7.0773 5.4789 1467.9599
    DecisionTreeRegressor    0.9948 1.3635 0.2473   12.4

# Predicting SoH

In [8]:
df = pd.read_csv("..\\unibo-powertools-dataset\\unibo-powertools-dataset\\test_result_trial_end_cleaned_v1.0.csv")
df.head()

Unnamed: 0,max_temperature,time,SOC,voltage,current
0,34.4451,3.113563,67.824884,4.200306,0.148718
1,26.8112,15.357586,70.284306,4.200306,0.149368
2,25.6903,15.610456,70.251433,4.162334,-0.500706
3,25.6872,15.610733,70.218551,4.04861,-5.000565
4,25.6872,15.613511,70.185608,4.15543,-0.500706


In [9]:
soh_data = df[['time', 'voltage', 'current', 'max_temperature']]
soh_data.head()

Unnamed: 0,time,voltage,current,max_temperature
0,3.113563,4.200306,0.148718,34.4451
1,15.357586,4.200306,0.149368,26.8112
2,15.610456,4.162334,-0.500706,25.6903
3,15.610733,4.04861,-5.000565,25.6872
4,15.613511,4.15543,-0.500706,25.6872


In [None]:
# Convert time to hours
df['time_h'] = df['time'] / 3600.0

# ---- Step 1: Estimate capacity (Ah) ----
# Integrate current over time (assumes full discharge cycle in dataset)
capacity_ah = np.trapz(df['current'], df['time_h'])  # Ah

# Replace with your battery's rated capacity (nominal)
C_nom = 2.5  # Example: 2.5 Ah battery

SoH_capacity = (capacity_ah / C_nom) * 100

# ---- Step 2: Estimate internal resistance ----
# Find places where current changes significantly
df['dI'] = df['current'].diff()
df['dV'] = df['voltage'].diff()

resistances = []
for i in range(1, len(df)):
    if abs(df.loc[i, 'dI']) > 0.1:  # current step threshold
        R = abs(df.loc[i, 'dV'] / df.loc[i, 'dI'])
        resistances.append(R)

if resistances:
    R_current = np.mean(resistances)
    R_initial = 0.05  # ohms (example, replace with new-battery value)
    SoH_resistance = (R_initial / R_current) * 100
else:
    SoH_resistance = None

print("Estimated SoH (Capacity) = ", round(SoH_capacity, 2), "%")
if SoH_resistance:
    print("Estimated SoH (Resistance) = ", round(SoH_resistance, 2), "%")

# ---- Step 3: Calculate SoH for each data point ----
# For modeling purposes, we need SoH values for each row
# We'll use a sliding window approach to calculate SoH over time

def calculate_soh_per_sample(df, window_size=100):
    """Calculate SoH for each sample using sliding window"""
    soh_values = []
    
    for i in range(len(df)):
        # Define window boundaries
        start_idx = max(0, i - window_size//2)
        end_idx = min(len(df), i + window_size//2)
        
        # Get window data
        window_df = df.iloc[start_idx:end_idx].copy()
        
        if len(window_df) < 10:  # Skip if window too small
            soh_values.append(np.nan)
            continue
            
        # Calculate capacity for this window
        window_capacity = np.trapz(window_df['current'], window_df['time_h'])
        window_soh_capacity = (abs(window_capacity) / C_nom) * 100
        
        # Calculate resistance for this window
        window_df['dI_local'] = window_df['current'].diff()
        window_df['dV_local'] = window_df['voltage'].diff()
        
        local_resistances = []
        for j in range(1, len(window_df)):
            if abs(window_df.iloc[j]['dI_local']) > 0.1:
                R_local = abs(window_df.iloc[j]['dV_local'] / window_df.iloc[j]['dI_local'])
                if 0.01 < R_local < 1.0:  # Filter unrealistic values
                    local_resistances.append(R_local)
        
        if local_resistances:
            R_window = np.mean(local_resistances)
            window_soh_resistance = (R_initial / R_window) * 100
            # Combine capacity and resistance SoH (weighted average)
            combined_soh = 0.7 * window_soh_capacity + 0.3 * window_soh_resistance
        else:
            combined_soh = window_soh_capacity
        
        # Clip to reasonable range
        combined_soh = np.clip(combined_soh, 0, 120)
        soh_values.append(combined_soh)
    
    return soh_values

# Calculate SoH for each sample
print("\nCalculating SoH for each data point...")
df['SoH'] = calculate_soh_per_sample(df, window_size=200)

# Remove NaN values
df_clean = df.dropna(subset=['SoH'])
print(f"Data points with valid SoH: {len(df_clean)}")
print(f"SoH range: {df_clean['SoH'].min():.2f}% to {df_clean['SoH'].max():.2f}%")

# ---- Step 4: Build same 4 models as SOC ----
# Use same features as SOC prediction
features = ['time', 'voltage', 'current', 'max_temperature']
target = 'SoH'

print(f"\nUsing same features as SOC: {features}")

# Prepare data for modeling
model_df = df_clean.dropna(subset=features + [target])

X = model_df[features]
y = model_df[target]

print(f"Dataset shape for SoH modeling: {X.shape}")
print(f"SoH statistics:")
print(f"  Mean: {y.mean():.2f}%")
print(f"  Std: {y.std():.2f}%")
print(f"  Min: {y.min():.2f}%")
print(f"  Max: {y.max():.2f}%")

# Split data (same random state as SOC for consistency)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Define SAME 4 models as SOC prediction
soh_models = {
    'RandomForestRegressor': RandomForestRegressor(n_estimators=20, random_state=42, n_jobs=-1),
    'LinearRegression': LinearRegression(),
    'DecisionTreeRegressor': DecisionTreeRegressor(random_state=42),
    'GradientBoostingRegressor': GradientBoostingRegressor(random_state=42, n_estimators=50)
}

# Store SoH results
soh_results = []

# Create results directory if it doesn't exist
results_dir = '../results'
os.makedirs(results_dir, exist_ok=True)

# Calculate comprehensive metrics (same function as SOC)
def calculate_metrics(y_true, y_pred, model_name, dataset_name, features_used, timestamp):
    """Calculate comprehensive evaluation metrics"""
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    
    return {
        'model_name': model_name,
        'target': 'SoH',
        'dataset_name': dataset_name,
        'features_used': ', '.join(features_used),
        'num_features': len(features_used),
        'train_size': len(y_train),
        'test_size': len(y_true),
        'mse': mse,
        'rmse': rmse,
        'mae': mae,
        'r2_score': r2,
        'timestamp': timestamp
    }

# Train and evaluate each model for SoH prediction
for model_name, model_instance in soh_models.items():
    print(f"\n=== Training {model_name} for SoH Prediction ===")
    
    # Train the model
    model_instance.fit(X_train, y_train)
    
    # Make predictions
    y_pred = model_instance.predict(X_test)
    
    # Calculate metrics
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    metrics = calculate_metrics(
        y_test, y_pred, 
        model_name, 
        'test_result_trial_end_v1.0',
        features,
        timestamp
    )
    
    soh_results.append(metrics)
    
    # Save the model with same naming convention as SOC
    model_filename = f"../models/SoH_{model_name}_v1.0_test_result_trial_end_v1.0_{timestamp}.joblib"
    joblib.dump(model_instance, model_filename)
    
    print(f"Model Performance:")
    print(f"  MSE: {metrics['mse']:.4f}")
    print(f"  RMSE: {metrics['rmse']:.4f}%")
    print(f"  MAE: {metrics['mae']:.4f}%")
    print(f"  R² Score: {metrics['r2_score']:.4f}")
    print(f"Model saved: {model_filename}")
    
    # Feature importance for tree-based models
    if hasattr(model_instance, 'feature_importances_'):
        feature_importance = pd.DataFrame({
            'feature': features,
            'importance': model_instance.feature_importances_
        }).sort_values('importance', ascending=False)
        
        print(f"\nFeature Importance for {model_name}:")
        print(feature_importance)
        
        # Save feature importance
        importance_file = os.path.join(results_dir, f'feature_importance_SoH_{model_name}_{timestamp}.csv')
        feature_importance.to_csv(importance_file, index=False)

# Save SoH results
soh_results_df = pd.DataFrame(soh_results)

# Save SoH results to CSV
soh_results_file = os.path.join(results_dir, 'soh_model_results.csv')
soh_results_df.to_csv(soh_results_file, index=False)

print(f"\n=== SoH Model Comparison ===")
comparison_cols = ['model_name', 'r2_score', 'rmse', 'mae']
print(soh_results_df[comparison_cols].round(4).to_string(index=False))

print(f"\nSoH results saved to: {soh_results_file}")

# Find best model
best_model_idx = soh_results_df['r2_score'].idxmax()
best_model_name = soh_results_df.loc[best_model_idx, 'model_name']
best_r2 = soh_results_df.loc[best_model_idx, 'r2_score']

print(f"\nBest SoH model: {best_model_name} (R² = {best_r2:.4f})")

# Example SoH prediction using same input format as SOC
print(f"\n=== Example SoH Prediction ===")
best_model = soh_models[best_model_name]

# Example input using same 4 features as SOC
example_input = pd.DataFrame({
    'time': [2000.0],
    'voltage': [3.6],
    'current': [1.0],
    'max_temperature': [30.0]
})

predicted_soh = best_model.predict(example_input)[0]

print(f"Input features:")
print(f"  Time: {example_input['time'].iloc[0]} seconds")
print(f"  Voltage: {example_input['voltage'].iloc[0]} V")
print(f"  Current: {example_input['current'].iloc[0]} A")
print(f"  Max Temperature: {example_input['max_temperature'].iloc[0]}°C")

print(f"\nPredicted SoH: {predicted_soh:.2f}%")

# SoH interpretation (same as SOC for consistency)
if predicted_soh >= 80:
    health_status = "Excellent"
    color = "🟢"
elif predicted_soh >= 60:
    health_status = "Good"
    color = "🟡"
elif predicted_soh >= 40:
    health_status = "Fair"
    color = "🟠"
elif predicted_soh >= 20:
    health_status = "Poor"
    color = "🔴"
else:
    health_status = "Critical"
    color = "❌"

print(f"Battery Health Status: {color} {health_status}")

print(f"\n=== Summary ===")
print(f"✅ Used exact formula provided for SoH calculation")
print(f"✅ Built same 4 models as SOC: RandomForest, Linear, DecisionTree, GradientBoosting")
print(f"✅ Used same 4 features: time, voltage, current, max_temperature")
print(f"✅ Same evaluation metrics and model saving format")
print(f"✅ Models saved in ../models/ directory")
print(f"✅ Results saved in ../results/ directory")

  capacity_ah = np.trapz(df['current'], df['time_h'])  # Ah
