In [2]:
# prompt: import dataset from google drive

import os
import zipfile
import pandas as pd
import numpy as np
from tqdm import tqdm
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# Specify the path to your dataset zip file in Google Drive
zip_file_path = '/content/battery_dataset.zip'  # Replace with your actual path

# Ensure the directory exists
os.makedirs('/content/', exist_ok=True)



Mounted at /content/drive


In [3]:
# Extract files from the zip archive
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall('/content/')

In [27]:
import pandas as pd
import numpy as np
import os
from tqdm import tqdm

def split_batteries(battery_ids, n_val=6, random_state=None):
    """
    Randomly split battery IDs into training and validation sets

    Args:
        battery_ids: List of all battery IDs
        n_val: Number of batteries for validation
        random_state: Random seed for reproducibility

    Returns:
        train_battery_ids, val_battery_ids
    """
    if random_state is not None:
        np.random.seed(random_state)

    battery_ids = np.array(battery_ids)
    val_indices = np.random.choice(len(battery_ids), size=n_val, replace=False)
    val_battery_ids = battery_ids[val_indices]
    train_battery_ids = np.array([bid for bid in battery_ids if bid not in val_battery_ids])

    print(f"Training batteries ({len(train_battery_ids)}): {', '.join(train_battery_ids)}")
    print(f"Validation batteries ({len(val_battery_ids)}): {', '.join(val_battery_ids)}")

    return train_battery_ids, val_battery_ids

def parse_start_time(time_array):
    """Convert the time array to datetime"""
    try:
        return pd.Timestamp(year=int(time_array[0]),
                          month=int(time_array[1]),
                          day=int(time_array[2]),
                          hour=int(time_array[3]),
                          minute=int(time_array[4]),
                          second=int(time_array[5]))
    except:
        return pd.NaT

def load_metadata(metadata_path, battery_ids=None):
    """
    Load and process metadata file

    Args:
        metadata_path: Path to metadata file
        battery_ids: Optional list of battery IDs to filter
    """
    # Read metadata
    metadata = pd.read_csv(metadata_path)

    # Filter for specified batteries if provided
    if battery_ids is not None:
        metadata = metadata[metadata['battery_id'].isin(battery_ids)]

    # Convert start_time from string to list of numbers
    metadata['start_time'] = metadata['start_time'].apply(
        lambda x: [float(num) for num in x.strip('[]').split()]
    )

    # Convert to datetime
    metadata['datetime'] = metadata['start_time'].apply(parse_start_time)

    # Keep only discharge cycles
    discharge_data = metadata[metadata['type'] == 'discharge'].copy()

    # Sort by battery_id and datetime
    discharge_data = discharge_data.sort_values(['battery_id', 'datetime'])

    # Add cycle number for each battery
    discharge_data['cycle_number'] = discharge_data.groupby('battery_id').cumcount() + 1

    return discharge_data

def load_cycle_data(filename, data_folder):
    """Load individual cycle file and extract relevant features"""
    try:
        # Load the cycle file
        cycle_path = os.path.join(data_folder, filename)
        cycle_data = pd.read_csv(cycle_path)

        # Extract relevant features
        features = {
            'voltage_min': cycle_data['Voltage_measured'].min(),
            'voltage_max': cycle_data['Voltage_measured'].max(),
            'voltage_mean': cycle_data['Voltage_measured'].mean(),
            'current_mean': cycle_data['Current_measured'].mean(),
            'temperature_max': cycle_data['Temperature_measured'].max(),
            'temperature_mean': cycle_data['Temperature_measured'].mean(),
            'discharge_time': cycle_data['Time'].max()  # Total discharge time
        }

        return features

    except Exception as e:
        print(f"Error loading {filename}: {str(e)}")
        return None

def add_engineered_features(df):
    """Add engineered features for SOH prediction"""
    # Group by battery_id for calculations
    for battery_id in df['battery_id'].unique():
        battery_mask = df['battery_id'] == battery_id

        # Calculate rolling statistics (3, 5, 10 cycles)
        for window in [3, 5, 10]:
            # Capacity features
            df.loc[battery_mask, f'capacity_mean_{window}'] = df.loc[battery_mask, 'capacity'].rolling(window, min_periods=1).mean().shift(1)
            df.loc[battery_mask, f'capacity_std_{window}'] = df.loc[battery_mask, 'capacity'].rolling(window, min_periods=1).std().shift(1)

            # Voltage features
            df.loc[battery_mask, f'voltage_mean_{window}'] = df.loc[battery_mask, 'voltage_mean'].rolling(window, min_periods=1).mean().shift(1)

            # Temperature features
            df.loc[battery_mask, f'temperature_mean_{window}'] = df.loc[battery_mask, 'temperature_mean'].rolling(window, min_periods=1).mean().shift(1)

        # Calculate degradation rates
        df.loc[battery_mask, 'capacity_rate'] = df.loc[battery_mask, 'capacity'].pct_change()

        # Calculate difference from initial capacity
        initial_capacity = df.loc[battery_mask, 'capacity'].iloc[0]
        df.loc[battery_mask, 'capacity_pct_initial'] = (df.loc[battery_mask, 'capacity'] / initial_capacity) * 100

    return df




In [28]:
'''def create_battery_dataset(metadata_path, data_folder, battery_ids, max_files=None):
    """
    Create dataset for specified batteries only

    Args:
        metadata_path: Path to metadata file
        data_folder: Path to cycle data folder
        battery_ids: List of battery IDs to include
        max_files: Optional limit on number of files to process
    """
    print(f"Loading metadata for batteries: {', '.join(battery_ids)}")
    discharge_data = load_metadata(metadata_path, battery_ids)

    if max_files:
        discharge_data = discharge_data.head(max_files)

    print("Processing cycle files...")
    # Initialize lists to store data
    processed_data = []

    # Process each discharge cycle
    for _, row in tqdm(discharge_data.iterrows(), total=len(discharge_data)):
        cycle_features = load_cycle_data(row['filename'], data_folder)

        if cycle_features is not None:
            # Combine metadata with cycle features
            cycle_data = {
                'battery_id': row['battery_id'],
                'cycle_number': row['cycle_number'],
                'ambient_temperature': row['ambient_temperature'],
                'capacity': row['Capacity'],
                'datetime': row['datetime'],
                **cycle_features
            }
            processed_data.append(cycle_data)

    # Create DataFrame
    df = pd.DataFrame(processed_data)

    # Convert 'capacity' column to numeric
    df['capacity'] = pd.to_numeric(df['capacity'], errors='coerce')

    # Calculate SOH
    rated_capacity = 2.0  # Ah
    df['SOH'] = (df['capacity'] / rated_capacity) * 100

    # Add engineered features
    df = add_engineered_features(df)

    return df
'''


In [29]:
'''def main(metadata_path, data_folder, random_state=42):
    """
    Main function to create train and validation datasets

    Args:
        metadata_path: Path to metadata file
        data_folder: Path to cycle data folder
        random_state: Random seed for reproducibility

    Returns:
        train_df, val_df: Training and validation datasets
    """
    # All battery IDs
    all_battery_ids = ['B0005', 'B0006', 'B0007', 'B0018', 'B0025', 'B0026', 'B0027', 'B0028',
                      'B0029', 'B0030', 'B0031', 'B0032', 'B0033', 'B0034', 'B0036', 'B0038',
                      'B0039', 'B0040', 'B0041', 'B0042', 'B0043', 'B0044', 'B0045', 'B0046',
                      'B0047', 'B0048', 'B0049', 'B0050', 'B0051', 'B0052', 'B0053', 'B0054',
                      'B0055', 'B0056']

    # Split batteries into train and validation sets
    train_battery_ids, val_battery_ids = split_batteries(all_battery_ids, n_val=6, random_state=random_state)

    # Create training dataset
    print("\nCreating training dataset...")
    train_df = create_battery_dataset(metadata_path, data_folder, train_battery_ids)

    # Create validation dataset
    print("\nCreating validation dataset...")
    val_df = create_battery_dataset(metadata_path, data_folder, val_battery_ids)

    return train_df, val_df
'''



In [64]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

def split_batteries(battery_ids, n_val=6, random_state=42):
    """
    Stratified split of batteries ensuring representative validation set

    Args:
        battery_ids: List of all battery IDs
        n_val: Number of batteries for validation
        random_state: Random seed for reproducibility

    Returns:
        train_battery_ids, val_battery_ids
    """
    # First, load metadata to get initial information about batteries
    metadata = pd.read_csv(metadata_path)

    # Calculate initial metrics for each battery
    battery_metrics = []
    for battery_id in battery_ids:
        battery_data = metadata[metadata['battery_id'] == battery_id]

        metrics = {
            'battery_id': battery_id,
            'total_cycles': len(battery_data),
            'initial_capacity': battery_data['Capacity'].iloc[0],
            'final_capacity': battery_data['Capacity'].iloc[-1]
        }
        battery_metrics.append(metrics)

    # Convert to DataFrame for easier manipulation
    battery_metrics_df = pd.DataFrame(battery_metrics)

    # Sort batteries by total cycles to ensure spread
    battery_metrics_df = battery_metrics_df.sort_values('total_cycles')

    # Stratified split considering multiple factors
    np.random.seed(random_state)
    val_battery_ids = []

    # Ensure validation set captures variety
    while len(val_battery_ids) < n_val:
        # Select a battery with a mix of characteristics
        candidates = [
            bat for bat in battery_metrics_df['battery_id']
            if bat not in val_battery_ids
        ]

        if not candidates:
            break

        # Select battery with median characteristics
        selected_battery = candidates[len(candidates) // 2]
        val_battery_ids.append(selected_battery)

    # Remaining batteries go to training
    train_battery_ids = [
        bat for bat in battery_ids
        if bat not in val_battery_ids
    ]

    print("Validation Batteries:", val_battery_ids)
    print("Training Batteries:", train_battery_ids)

    return train_battery_ids, val_battery_ids

def create_battery_dataset(metadata_path, data_folder, battery_ids, max_files=None):
    """
    Create dataset for specified batteries with enhanced preprocessing

    Args:
        metadata_path: Path to metadata file
        data_folder: Path to cycle data folder
        battery_ids: List of battery IDs to include
        max_files: Optional limit on number of files to process

    Returns:
        Processed DataFrame with engineered features
    """
    print(f"Loading metadata for batteries: {', '.join(battery_ids)}")

    # Load metadata filtered for specified batteries
    discharge_data = load_metadata(metadata_path, battery_ids)

    # Optional file limit
    if max_files:
        discharge_data = discharge_data.head(max_files)

    print(f"Processing {len(discharge_data)} discharge cycles...")
    processed_data = []

    # Process each discharge cycle
    for _, row in tqdm(discharge_data.iterrows(), total=len(discharge_data)):
        cycle_features = load_cycle_data(row['filename'], data_folder)

        if cycle_features is not None:
            cycle_data = {
                'battery_id': row['battery_id'],
                'cycle_number': row['cycle_number'],
                'ambient_temperature': row['ambient_temperature'],
                'capacity': row['Capacity'],
                'datetime': row['datetime'],
                **cycle_features
            }
            processed_data.append(cycle_data)

    # Create DataFrame
    df = pd.DataFrame(processed_data)

    # Rigorous numeric conversion and cleaning
    df['capacity'] = pd.to_numeric(df['capacity'], errors='coerce')
    df.dropna(subset=['capacity'], inplace=True)

    # SOH calculation with more robust approach
    rated_capacities = {
        'B0005': 2.0,  # Example, replace with actual rated capacities
        'B0006': 2.0,
        'B0007': 2.0,
        'B0018': 2.0,
        'B0025': 2.0,
        'B0026': 2.0,
        'B0027': 2.0,
        'B0028': 2.0,
        'B0029': 2.0,
        'B0030': 2.0,
        'B0031': 2.0,
        'B0032': 2.0,
        'B0033': 2.0,
        'B0034': 2.0,
        'B0036': 2.0,
        'B0038': 2.0,
        'B0039': 2.0,
        'B0040': 2.0,
        'B0041': 2.0,
        'B0042': 2.0,
        'B0043': 2.0,
        'B0044': 2.0,
        'B0045': 2.0,
        'B0046': 2.0,
        'B0047': 2.0,
        'B0048': 2.0,
        'B0049': 2.0,
        'B0050': 2.0,
        'B0051': 2.0,
        'B0052': 2.0,
        'B0053': 2.0,
        'B0054': 2.0,
        'B0055': 2.0,
        'B0056': 2.0
        # Add more battery-specific rated capacities
    }

    def calculate_soh(row):
        rated_cap = rated_capacities.get(row['battery_id'], 2.0)  # Default 2.0 if not specified
        return (row['capacity'] / rated_cap) * 100

    df['SOH'] = df.apply(calculate_soh, axis=1)

    # Perform feature engineering
    df = add_engineered_features(df)

    # Sort by battery and cycle number
    df.sort_values(['battery_id', 'cycle_number'], inplace=True)

    return df

def main(metadata_path, data_folder, random_state=42):
    """
    Main function to create train and validation datasets

    Args:
        metadata_path: Path to metadata file
        data_folder: Path to cycle data folder
        random_state: Random seed for reproducibility

    Returns:
        train_df, val_df: Training and validation datasets
    """
    # All battery IDs
    all_battery_ids = ['B0005', 'B0006', 'B0007', 'B0018', 'B0025', 'B0026', 'B0027', 'B0028',
                      'B0029', 'B0030', 'B0031', 'B0032', 'B0033', 'B0034', 'B0036', 'B0038',
                      'B0039', 'B0040', 'B0041', 'B0042', 'B0043', 'B0044', 'B0045', 'B0046',
                      'B0047', 'B0048', 'B0049', 'B0050', 'B0051', 'B0052', 'B0053', 'B0054',
                      'B0055', 'B0056']  # Your complete list

    # Split batteries
    train_battery_ids, val_battery_ids = split_batteries(
        all_battery_ids,
        n_val=6,
        random_state=random_state
    )

    # Create datasets
    print("\nCreating training dataset...")
    train_df = create_battery_dataset(metadata_path, data_folder, train_battery_ids)

    print("\nCreating validation dataset...")
    val_df = create_battery_dataset(metadata_path, data_folder, val_battery_ids)

    return train_df, val_df

In [65]:
# Example usage
if __name__ == "__main__":
    metadata_path = "/content/cleaned_dataset/metadata.csv"
    data_folder = "/content/cleaned_dataset/data/"

    train_df, val_df = main(metadata_path, data_folder)

    print("\nDataset shapes:")
    print(f"Training set: {train_df.shape}")
    print(f"Validation set: {val_df.shape}")

Validation Batteries: ['B0046', 'B0041', 'B0047', 'B0053', 'B0048', 'B0039']
Training Batteries: ['B0005', 'B0006', 'B0007', 'B0018', 'B0025', 'B0026', 'B0027', 'B0028', 'B0029', 'B0030', 'B0031', 'B0032', 'B0033', 'B0034', 'B0036', 'B0038', 'B0040', 'B0042', 'B0043', 'B0044', 'B0045', 'B0049', 'B0050', 'B0051', 'B0052', 'B0054', 'B0055', 'B0056']

Creating training dataset...
Loading metadata for batteries: B0005, B0006, B0007, B0018, B0025, B0026, B0027, B0028, B0029, B0030, B0031, B0032, B0033, B0034, B0036, B0038, B0040, B0042, B0043, B0044, B0045, B0049, B0050, B0051, B0052, B0054, B0055, B0056
Processing 2408 discharge cycles...


100%|██████████| 2408/2408 [00:05<00:00, 414.41it/s]



Creating validation dataset...
Loading metadata for batteries: B0046, B0041, B0047, B0053, B0048, B0039
Processing 386 discharge cycles...


100%|██████████| 386/386 [00:00<00:00, 491.35it/s]



Dataset shapes:
Training set: (2383, 13)
Validation set: (386, 13)


In [66]:
train_df['battery_id'].nunique()

28

In [67]:
train_df.head()

Unnamed: 0,battery_id,cycle_number,ambient_temperature,capacity,datetime,voltage_min,voltage_max,voltage_mean,current_mean,temperature_max,temperature_mean,discharge_time,SOH
0,B0005,1,24,1.856487,2008-04-02 15:25:41,2.612467,4.191492,3.529829,-1.818702,38.982181,32.572328,3690.234,92.824371
1,B0005,2,24,1.846327,2008-04-02 19:43:48,2.587209,4.189773,3.53732,-1.81756,39.033398,32.725235,3672.344,92.316362
2,B0005,3,24,1.835349,2008-04-03 00:01:06,2.651917,4.188187,3.543737,-1.816487,38.818797,32.642862,3651.641,91.76746
3,B0005,4,24,1.835263,2008-04-03 04:16:37,2.592948,4.188461,3.543666,-1.825589,38.762305,32.514876,3631.563,91.763126
4,B0005,5,24,1.834646,2008-04-03 08:33:25,2.54742,4.188299,3.542343,-1.826114,38.665393,32.382349,3629.172,91.732275


In [68]:
train_df.columns

Index(['battery_id', 'cycle_number', 'ambient_temperature', 'capacity',
       'datetime', 'voltage_min', 'voltage_max', 'voltage_mean',
       'current_mean', 'temperature_max', 'temperature_mean', 'discharge_time',
       'SOH'],
      dtype='object')

In [69]:
train_df['battery_id'].nunique()

28

In [70]:
train_df['SOH']

Unnamed: 0,SOH
0,92.824371
1,92.316362
2,91.767460
3,91.763126
4,91.732275
...,...
2403,56.510957
2404,56.293597
2405,57.150541
2406,56.863659


In [71]:
train_df.describe().T

Unnamed: 0,count,mean,min,25%,50%,75%,max,std
cycle_number,2383.0,65.211918,1.0,22.0,54.0,98.5,197.0,50.074403
ambient_temperature,2383.0,20.161981,4.0,4.0,24.0,24.0,44.0,11.553357
capacity,2383.0,1.368017,0.0,1.209136,1.472248,1.695568,2.640149,0.458322
datetime,2383.0,2009-07-31 20:51:03.386487808,2008-04-02 15:25:41,2008-08-09 01:45:33.500000,2009-07-28 06:15:44,2010-06-19 05:42:11,2010-09-30 15:32:33,
voltage_min,2383.0,2.322313,0.192738,2.089385,2.381438,2.62664,3.838396,0.304967
voltage_max,2383.0,4.18622,3.677219,4.182461,4.187148,4.19796,4.542427,0.039418
voltage_mean,2383.0,3.431155,2.53001,3.370423,3.421761,3.493673,4.452558,0.190367
current_mean,2383.0,-1.83261,-3.976473,-1.960844,-1.81882,-1.675116,-0.029284,0.76961
temperature_max,2383.0,37.187487,6.383481,24.256223,38.67192,49.007244,69.869746,15.28778
temperature_mean,2383.0,29.165898,4.857764,14.170293,31.969088,37.265468,58.954811,12.90064


In [72]:
# prompt: 3456789123

train_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2383 entries, 0 to 2407
Data columns (total 13 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   battery_id           2383 non-null   object        
 1   cycle_number         2383 non-null   int64         
 2   ambient_temperature  2383 non-null   int64         
 3   capacity             2383 non-null   float64       
 4   datetime             2383 non-null   datetime64[ns]
 5   voltage_min          2383 non-null   float64       
 6   voltage_max          2383 non-null   float64       
 7   voltage_mean         2383 non-null   float64       
 8   current_mean         2383 non-null   float64       
 9   temperature_max      2383 non-null   float64       
 10  temperature_mean     2383 non-null   float64       
 11  discharge_time       2383 non-null   float64       
 12  SOH                  2383 non-null   float64       
dtypes: datetime64[ns](1), float64(9), int6

In [73]:
# prompt: Drop all null values in df

train_df = train_df.dropna()

In [74]:
train_df.isnull().sum()

Unnamed: 0,0
battery_id,0
cycle_number,0
ambient_temperature,0
capacity,0
datetime,0
voltage_min,0
voltage_max,0
voltage_mean,0
current_mean,0
temperature_max,0


In [75]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import learning_curve
import warnings
warnings.filterwarnings('ignore')



In [76]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import (
    train_test_split,
    GridSearchCV,
    KFold,
    cross_val_score
)
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.ensemble import ExtraTreesRegressor
import pickle


class TrainDataSOHPredictor:
    def __init__(self, train_df):
        self.train_df = train_df
        self.scaler = StandardScaler()
        self.best_models = {}
        self.cv_results = {}

    def prepare_features(self):
        """Prepare features for training with imputation and overfitting prevention."""
        exclude_cols = ['SOH', 'battery_id', 'datetime']
        feature_cols = [col for col in self.train_df.columns if col not in exclude_cols]

        X = self.train_df[feature_cols]
        y = self.train_df['SOH']

        # Replace infinite values with NaN
        X = X.replace([np.inf, -np.inf], np.nan)

        # Perform iterative imputation for missing values
        self.imputer = IterativeImputer(
            estimator=ExtraTreesRegressor(n_estimators=10),
            max_iter=10,
            random_state=42
        )
        X = pd.DataFrame(self.imputer.fit_transform(X), columns=X.columns)

        # Feature engineering to prevent overfitting
        def remove_correlated_features(X, threshold=0.95):
            corr_matrix = X.corr().abs()
            upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
            to_drop = [column for column in upper.columns if any(upper[column] > threshold)]
            return X.drop(columns=to_drop)

        def select_relevant_features(X, y, threshold=0.1):
            correlations = np.abs(pd.DataFrame(X).corrwith(pd.Series(y)))
            selected_features = correlations[correlations > threshold].index
            return X[selected_features]

        X = remove_correlated_features(X)
        X = select_relevant_features(X, y)

        self.selected_features = X.columns.tolist()

        # Save features to file
        with open('selected_features.pkl', 'wb') as f:
            pickle.dump(self.selected_features, f)


        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, shuffle=False
        )

        # Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # Convert back to DataFrame for easier interpretation
        X_train_scaled = pd.DataFrame(X_train_scaled, columns=X.columns)
        X_test_scaled = pd.DataFrame(X_test_scaled, columns=X.columns)

        return X_train_scaled, X_test_scaled, y_train, y_test

    def tune_models(self, X_train, y_train):
        """Tune models using grid search for optimal hyperparameters."""
        param_grids = {
            'GradientBoost': {
                'n_estimators': [100, 150],
                'learning_rate': [0.05, 0.1],
                'max_depth': [3, 4]
            },
            'RandomForest': {
                'n_estimators': [100, 150],
                'max_depth': [None, 10],
                'min_samples_split': [2, 5]
            }
        }

        models = {
            'GradientBoost': GradientBoostingRegressor(random_state=42),
            'RandomForest': RandomForestRegressor(random_state=42)
        }

        # Perform grid search with cross-validation
        for name, model in models.items():
            print(f"\nTuning {name}...")

            grid_search = GridSearchCV(
                estimator=model,
                param_grid=param_grids[name],
                cv=3,
                scoring='r2',
                n_jobs=-1,
                verbose=1
            )

            grid_search.fit(X_train, y_train)

            self.best_models[name] = grid_search.best_estimator_
            self.cv_results[name] = {
                'best_params': grid_search.best_params_,
                'best_score': grid_search.best_score_
            }

            print(f"Best parameters for {name}: {grid_search.best_params_}")
            print(f"Best cross-validation score: {grid_search.best_score_:.4f}")

    def evaluate_models(self, X_train, X_test, y_train, y_test):
        """Evaluate the performance of the tuned models."""
        results = {}

        for name, model in self.best_models.items():
            print(f"\n{name} Results:")

            # Cross-validation scores
            cv = KFold(n_splits=5, shuffle=True, random_state=42)
            cv_scores = cross_val_score(model, X_train, y_train, cv=cv, scoring='r2')

            print(f"Cross-validation R2 scores: {cv_scores}")
            print(f"Mean CV R2: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")

            # Predictions
            y_train_pred = model.predict(X_train)
            y_test_pred = model.predict(X_test)

            # Metrics
            train_mae = mean_absolute_error(y_train, y_train_pred)
            test_mae = mean_absolute_error(y_test, y_test_pred)
            train_rmse = np.sqrt(mean_squared_error(y_train, y_train_pred))
            test_rmse = np.sqrt(mean_squared_error(y_test, y_test_pred))
            train_r2 = r2_score(y_train, y_train_pred)
            test_r2 = r2_score(y_test, y_test_pred)

            metrics = {
                'Train MAE': train_mae,
                'Test MAE': test_mae,
                'Train RMSE': train_rmse,
                'Test RMSE': test_rmse,
                'Train R2': train_r2,
                'Test R2': test_r2,
                'CV R2 Mean': cv_scores.mean(),
                'CV R2 Std': cv_scores.std(),
            }

            for metric, value in metrics.items():
                print(f"{metric}: {value}")

            results[name] = metrics

        return results

    def plot_feature_importance(self, X_columns):
        """Plot feature importance for the tuned models."""
        plt.figure(figsize=(15, 6))

        for idx, (name, model) in enumerate(self.best_models.items(), 1):
            plt.subplot(1, 2, idx)

            if hasattr(model, 'feature_importances_'):
                importance = model.feature_importances_
                indices = np.argsort(importance)[-10:]
                plt.barh(range(10), importance[indices])
                plt.yticks(range(10), X_columns[indices])
                plt.xlabel('Feature Importance')
                plt.title(f'{name} - Top 10 Features')

        plt.tight_layout()
        plt.show()


In [77]:
# Initialize and prepare data
tuned_predictor = TrainDataSOHPredictor(train_df)
X_train_scaled, X_test_scaled, y_train, y_test = tuned_predictor.prepare_features()

# Perform hyperparameter tuning
tuned_predictor.tune_models(X_train_scaled, y_train)


Tuning GradientBoost...
Fitting 3 folds for each of 8 candidates, totalling 24 fits
Best parameters for GradientBoost: {'learning_rate': 0.05, 'max_depth': 3, 'n_estimators': 150}
Best cross-validation score: 0.9909

Tuning RandomForest...
Fitting 3 folds for each of 8 candidates, totalling 24 fits
Best parameters for RandomForest: {'max_depth': 10, 'min_samples_split': 2, 'n_estimators': 150}
Best cross-validation score: 0.9750


In [78]:
# Evaluate models with cross-validation
results = tuned_predictor.evaluate_models(
X_train_scaled, X_test_scaled, y_train, y_test)

# Plot feature importance
# tuned_predictor.plot_feature_importance(X_train.columns)


GradientBoost Results:
Cross-validation R2 scores: [0.9999113  0.99980359 0.99989354 0.99985835 0.99797525]
Mean CV R2: 0.9995 (+/- 0.0015)
Train MAE: 0.047332572052374684
Test MAE: 0.5090179758962423
Train RMSE: 0.06238808687978226
Test RMSE: 0.8952669099951583
Train R2: 0.9999922457510623
Test R2: 0.9958539229296276
CV R2 Mean: 0.9994884048938685
CV R2 Std: 0.0007574695448797356

RandomForest Results:
Cross-validation R2 scores: [0.99983175 0.99978314 0.99965398 0.99993673 0.99760883]
Mean CV R2: 0.9994 (+/- 0.0018)
Train MAE: 0.024550798578398723
Test MAE: 2.169697640919913
Train RMSE: 0.21185086670899433
Test RMSE: 2.9705117308648887
Train R2: 0.9999105877769439
Test R2: 0.9543548279094914
CV R2 Mean: 0.9993628877311925
CV R2 Std: 0.0008817266193148055


In [59]:
def remove_correlated_features(X, threshold=0.95):
    corr_matrix = X.corr().abs()
    upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
    to_drop = [column for column in upper.columns if any(upper[column] > threshold)]

    # Add logging to understand which features are being dropped
    if to_drop:
        print(f"Dropping correlated features: {to_drop}")

    return X.drop(columns=to_drop)

def select_relevant_features(X, y, threshold=0.1):
    # Use mutual information for feature selection
    from sklearn.feature_selection import mutual_info_regression

    # Calculate mutual information scores
    mi_scores = mutual_info_regression(X, y)

    # Create a series of scores
    mi_series = pd.Series(mi_scores, index=X.columns)

    # Select top features based on mutual information
    selected_features = mi_series[mi_series > threshold]

    print("Selected features based on mutual information:")
    print(selected_features)

    return X[selected_features.index]

In [60]:
from sklearn.model_selection import TimeSeriesSplit

def time_series_cross_validation(model, X, y):
    """Perform time series cross-validation"""
    tscv = TimeSeriesSplit(n_splits=5)

    cv_scores = []
    for train_index, test_index in tscv.split(X):
        X_train, X_test = X.iloc[train_index], X.iloc[test_index]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        model.fit(X_train, y_train)
        score = r2_score(y_test, model.predict(X_test))
        cv_scores.append(score)

    return cv_scores

In [61]:
def enhanced_model_evaluation(self, X_train, X_test, y_train, y_test):
    results = {}

    for name, model in self.best_models.items():
        # Standard cross-validation
        cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='r2')

        # Time series cross-validation
        ts_cv_scores = time_series_cross_validation(model, X_train, y_train)

        # Predictions
        y_train_pred = model.predict(X_train)
        y_test_pred = model.predict(X_test)

        # Metrics
        metrics = {
            'Standard CV R2 Mean': cv_scores.mean(),
            'Standard CV R2 Std': cv_scores.std(),
            'Time Series CV R2 Mean': np.mean(ts_cv_scores),
            'Time Series CV R2 Std': np.std(ts_cv_scores),
            'Train MAE': mean_absolute_error(y_train, y_train_pred),
            'Test MAE': mean_absolute_error(y_test, y_test_pred),
            'Train R2': r2_score(y_train, y_train_pred),
            'Test R2': r2_score(y_test, y_test_pred)
        }

        results[name] = metrics

        # Residual analysis
        self.plot_residuals(y_train, y_train_pred, name + ' Train')
        self.plot_residuals(y_test, y_test_pred, name + ' Test')

    return results

def plot_residuals(self, y_true, y_pred, title):
    residuals = y_true - y_pred
    plt.figure(figsize=(10, 5))

    plt.subplot(1, 2, 1)
    plt.scatter(y_pred, residuals)
    plt.title(f'Residual Plot - {title}')
    plt.xlabel('Predicted Values')
    plt.ylabel('Residuals')
    plt.axhline(y=0, color='r', linestyle='--')

    plt.subplot(1, 2, 2)
    sns.histplot(residuals, kde=True)
    plt.title(f'Residual Distribution - {title}')
    plt.xlabel('Residuals')

    plt.tight_layout()
    plt.show()

In [62]:
def predict_with_uncertainty(self, X, model_name='GradientBoost'):
    """
    Estimate prediction uncertainty using ensemble methods
    """
    model = self.best_models[model_name]

    if hasattr(model, 'estimators_'):
        # For ensemble methods like Random Forest
        predictions = np.array([tree.predict(X) for tree in model.estimators_])
        mean_prediction = predictions.mean(axis=0)
        prediction_std = predictions.std(axis=0)

        return mean_prediction, prediction_std
    else:
        return model.predict(X), None

In [21]:
# prompt: save the iterative imputer scaler

import joblib

# Assuming 'imputer' and 'scaler' are your fitted objects
# Replace with your actual variable names
# Example: imputer = IterativeImputer(...)
# Example: scaler = StandardScaler(...)

# Save the imputer
joblib.dump(tuned_predictor.imputer, 'imputer.pkl')

# Save the scaler
joblib.dump(tuned_predictor.scaler, 'scaler.pkl')

['scaler.pkl']

In [22]:
# prompt: save the Gradient Boosting model.

import joblib

# Assuming 'tuned_predictor' is your initialized and trained BatterySOHPredictorTuned object

# Save the best Gradient Boosting model
#joblib.dump(tuned_predictor.best_models['GradientBoost'], '/content/drive/My Drive/gradient_boosting_model_v5_01.pkl')
joblib.dump(tuned_predictor.best_models['GradientBoost'], 'gradient_boosting_model_v5_01.pkl')

['gradient_boosting_model_v5_01.pkl']

In [23]:
val_df.head()

Unnamed: 0,battery_id,cycle_number,ambient_temperature,capacity,datetime,voltage_min,voltage_max,voltage_mean,current_mean,temperature_max,...,capacity_mean_5,capacity_std_5,voltage_mean_5,temperature_mean_5,capacity_mean_10,capacity_std_10,voltage_mean_10,temperature_mean_10,capacity_rate,capacity_pct_initial
0,B0029,1,43,1.697507,2009-04-07 16:31:01,1.999936,4.122759,3.365186,-3.975396,58.726269,...,,,,,,,,,,100.0
1,B0029,2,43,1.844701,2009-04-07 19:44:26,1.852059,4.201346,3.390593,-3.934249,59.767964,...,1.697507,,3.365186,51.631546,1.697507,,3.365186,51.631546,0.086712,108.671178
2,B0029,3,43,1.825438,2009-04-07 22:58:18,1.885726,4.200751,3.394458,-3.933295,59.844347,...,1.771104,0.104082,3.377889,52.240494,1.771104,0.104082,3.377889,52.240494,-0.010443,107.536364
3,B0029,4,43,1.81575,2009-04-08 02:11:30,1.853478,4.201219,3.394696,-3.932113,59.919947,...,1.789215,0.080003,3.383412,52.462285,1.789215,0.080003,3.383412,52.462285,-0.005307,106.96569
4,B0029,5,43,1.813299,2009-04-08 05:23:26,1.913444,4.201517,3.398579,-3.931581,59.894754,...,1.795849,0.066656,3.386233,52.57281,1.795849,0.066656,3.386233,52.57281,-0.00135,106.821296


In [24]:
val_df.columns == train_df.columns

array([ True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True])

In [25]:
# working ValDataPreparer
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import pickle

class ValDataSOHPredictor:
    def __init__(self, val_df, scaler_path='scaler.pkl', imputer_path='imputer.pkl', features_path='selected_features.pkl'):
        """
        Initialize the validator with validation dataset and paths to saved preprocessors

        Args:
            val_df: Validation dataframe
            scaler_path: Path to saved StandardScaler
            imputer_path: Path to saved IterativeImputer
        """
        self.val_df = val_df

        # Load pre-fitted scaler and imputer
        #with open(scaler_path, 'rb') as f:
        #    self.scaler = pickle.load(f)
        #with open(imputer_path, 'rb') as f:
        #    self.imputer = pickle.load(f)

        self.scaler = joblib.load(scaler_path)
        self.imputer = joblib.load(imputer_path)
        self.selected_features = joblib.load('selected_features.pkl')

    def prepare_features(self):

        self.val_df = self.val_df.dropna()
        """
        Prepare features for validation using pre-fitted imputer and scaler
        """
        exclude_cols = ['SOH', 'battery_id', 'datetime']
        feature_cols = [col for col in self.val_df.columns if col not in exclude_cols]

        X = self.val_df[feature_cols]
        y = self.val_df['SOH']

        # Replace infinite values with NaN
        X = X.replace([np.inf, -np.inf], np.nan)

        # Use pre-fitted imputer
        X = pd.DataFrame(
            self.imputer.transform(X),
            columns=feature_cols
        )

        X = X[self.selected_features]

        # Feature selection (using the same correlation thresholds as training)
        #def remove_correlated_features(X, threshold=0.95):
        #    corr_matrix = X.corr().abs()
        #    upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
        #    to_drop = [column for column in upper.columns if any(upper[column] > threshold)]
        #    return X.drop(columns=to_drop)

        #def select_relevant_features(X, y, threshold=0.1):
        #    correlations = np.abs(pd.DataFrame(X).corrwith(pd.Series(y)))
        #    selected_features = correlations[correlations > threshold].index
        #    return X[selected_features]

        #X = remove_correlated_features(X)
        #X = select_relevant_features(X, y)

        # Use pre-fitted scaler
        X_scaled = self.scaler.transform(X)
        X_scaled = pd.DataFrame(X_scaled, columns=self.selected_features)

        return X_scaled, y

    def evaluate_model(self, model, X_val, y_val):
        """
        Evaluate a trained model on validation data

        Args:
            model: Trained model (GradientBoostingRegressor or RandomForestRegressor)
            X_val: Prepared validation features
            y_val: Validation target values
        """
        # Make predictions
        y_val_pred = model.predict(X_val)

        # Calculate metrics
        val_mae = mean_absolute_error(y_val, y_val_pred)
        val_rmse = np.sqrt(mean_squared_error(y_val, y_val_pred))
        val_r2 = r2_score(y_val, y_val_pred)

        metrics = {
            'Validation MAE': val_mae,
            'Validation RMSE': val_rmse,
            'Validation R2': val_r2
        }

        # Print metrics
        print("\nValidation Metrics:")
        for metric, value in metrics.items():
            print(f"{metric}: {value:.4f}")

        return metrics, y_val_pred

    def plot_predictions(self, y_val, y_val_pred, model_name):
        """
        Plot actual vs predicted values and residuals

        Args:
            y_val: True validation values
            y_val_pred: Predicted validation values
            model_name: Name of the model used
        """
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

        # Actual vs Predicted plot
        ax1.scatter(y_val, y_val_pred, alpha=0.5)
        ax1.plot([y_val.min(), y_val.max()], [y_val.min(), y_val.max()], 'r--', lw=2)
        ax1.set_xlabel('Actual SOH')
        ax1.set_ylabel('Predicted SOH')
        ax1.set_title(f'{model_name} - Actual vs Predicted SOH')

        # Residuals plot
        residuals = y_val_pred - y_val
        ax2.scatter(y_val_pred, residuals, alpha=0.5)
        ax2.axhline(y=0, color='r', linestyle='--')
        ax2.set_xlabel('Predicted SOH')
        ax2.set_ylabel('Residuals')
        ax2.set_title('Residuals Plot')

        plt.tight_layout()
        plt.show()

    def plot_battery_predictions(self, model, X_val, y_val):
        """
        Plot predictions for each battery separately

        Args:
            model: Trained model
            X_val: Prepared validation features
            y_val: Validation target values
        """
        y_val_pred = model.predict(X_val)

        plt.figure(figsize=(15, 8))

        for battery in self.val_df['battery_id'].unique():
            battery_mask = self.val_df['battery_id'] == battery
            cycles = self.val_df[battery_mask]['cycle_number']

            actual = y_val[self.val_df['battery_id'] == battery]
            predicted = y_val_pred[self.val_df['battery_id'] == battery]

            plt.plot(cycles, actual, 'o-', label=f'{battery} (Actual)', alpha=0.5)
            plt.plot(cycles, predicted, 's--', label=f'{battery} (Predicted)', alpha=0.5)

        plt.xlabel('Cycle Number')
        plt.ylabel('SOH')
        plt.title('SOH Degradation: Actual vs Predicted by Battery')
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

In [26]:
predictor = ValDataSOHPredictor(val_df)
X_train, X_test, y_train, y_test = predictor.prepare_features()
results = predictor.train_and_evaluate_models(X_train, X_test, y_train, y_test)
predictor.visualize_best_model(X_test, y_test)

ValueError: not enough values to unpack (expected 4, got 2)

In [None]:
class ValDataSOHPredictor:
    def __init__(self, val_df, scaler_path='scaler.pkl', imputer_path='imputer.pkl', features_path='selected_features.pkl'):
        """
        Initialize the validator with validation dataset and paths to saved preprocessors

        Args:
            val_df: Validation dataframe
            scaler_path: Path to saved StandardScaler
            imputer_path: Path to saved IterativeImputer
        """
        self.val_df = val_df

        # Load pre-fitted scaler, imputer, and selected features
        self.scaler = joblib.load(scaler_path)
        self.imputer = joblib.load(imputer_path)
        self.selected_features = joblib.load(features_path)

    def prepare_features(self):
        """
        Prepare features for validation using pre-fitted imputer and scaler, with added noise.
        """

        self.val_df = self.val_df.dropna()

        exclude_cols = ['SOH', 'battery_id', 'datetime']
        feature_cols = [col for col in self.val_df.columns if col not in exclude_cols]

        X = self.val_df[feature_cols]
        y = self.val_df['SOH']

        # Replace infinite values with NaN
        X = X.replace([np.inf, -np.inf], np.nan)

        # Use pre-fitted imputer
        X = pd.DataFrame(
            self.imputer.transform(X),
            columns=feature_cols
        )

        # Select features
        X = X[self.selected_features]

        # Use pre-fitted scaler
        X_scaled = self.scaler.transform(X)
        X_scaled = pd.DataFrame(X_scaled, columns=self.selected_features)

        # Add noise to scaled features
        '''def add_noise(X, noise_range=(0.2, 0.3)):
            """
            Add random noise to features within a specified percentage range.

            Args:
                X: Feature dataframe
                noise_range: Tuple indicating the min and max percentage noise to add

            Returns:
                Noisy feature dataframe
            """
            noise = np.random.uniform(
                low=1 - noise_range[1],
                high=1 + noise_range[1],
                size=X.shape
            )
            return X * noise

        X_noisy = add_noise(X_scaled)

        return X_noisy, y'''
        return X_scaled, y

    def evaluate_model(self, model, X_val, y_val):
        """
        Evaluate a trained model on validation data

        Args:
            model: Trained model (GradientBoostingRegressor or RandomForestRegressor)
            X_val: Prepared validation features
            y_val: Validation target values
        """
        # Make predictions
        y_val_pred = model.predict(X_val)

        # Calculate metrics
        val_mae = mean_absolute_error(y_val, y_val_pred)
        val_rmse = np.sqrt(mean_squared_error(y_val, y_val_pred))
        val_r2 = r2_score(y_val, y_val_pred)

        metrics = {
            'Validation MAE': val_mae,
            'Validation RMSE': val_rmse,
            'Validation R2': val_r2
        }

        # Print metrics
        print("\nValidation Metrics:")
        for metric, value in metrics.items():
            print(f"{metric}: {value:.4f}")

        return metrics, y_val_pred


In [None]:
# Example usage:
if __name__ == "__main__":
    # Assuming val_df and trained_model are available
    val_predictor = ValDataSOHPredictor(val_df)

    # Prepare validation features
    X_val, y_val = val_predictor.prepare_features()

    # Prepare model
    trained_model = joblib.load('gradient_boosting_model_v5_01.pkl')

    # Evaluate model
    metrics, predictions = val_predictor.evaluate_model(trained_model, X_val, y_val)

    # Plot results
   # val_predictor.plot_predictions(y_val, predictions, "GradientBoost")
   # val_predictor.plot_battery_predictions(trained_model, X_val, y_val)

In [None]:
# prompt: convert X_val to dataframe and display head()

X_val_df = pd.DataFrame(X_val)
X_val_df.head()

X_val_df.describe().T

In [None]:
# Load your validation dataframe
val_predictor = ValDataSOHPredictor(val_df)

# Prepare features
X_val, y_val = val_predictor.prepare_features()



# For each trained model you want to evaluate:
metrics, predictions = val_predictor.evaluate_model(trained_model, X_val, y_val)

# Generate visualizations
# val_predictor.plot_predictions(y_val, predictions, "Model Name")
val_predictor.plot_battery_predictions(trained_model, X_val, y_val)

# Misc

In [None]:
# Save the feature list after preprocessing
feature_list = list(X_train_scaled.columns)
joblib.dump(feature_list, 'feature_list.pkl')


In [None]:
# prompt: save out.df to a csv

tuned_predictor.out_df.to_csv('out.csv', index=False)

In [None]:
tuned_predictor.out_df.info()

In [None]:
['temperature_mean', 'voltage_mean', 'current_mean', 'capacity']

In [None]:
!pip freeze | grep scikit-learn
!pip freeze | grep joblib

In [None]:
# prompt: check versions of all these libraries:
# flask==2.0.1
# werkzeug==2.0.3
# pandas==1.3.3
# scikit-learn==1.5.2
# numpy==1.21.2
# gunicorn==20.1.0
# joblib==1.5.2

import subprocess

def get_package_version(package_name):
    try:
        result = subprocess.run(['pip', 'show', package_name], capture_output=True, text=True, check=True)
        for line in result.stdout.splitlines():
            if line.startswith('Version:'):
                return line.split(':')[1].strip()
        return None  # Version not found
    except subprocess.CalledProcessError:
        return None  # Package not found

packages = {
    'flask': '2.0.1',
    'werkzeug': '2.0.3',
    'pandas': '1.3.3',
    'scikit-learn': '1.5.2',
    'numpy': '1.21.2',
    'gunicorn': '20.1.0',
    'joblib': '1.5.2'
}

for package, expected_version in packages.items():
    version = get_package_version(package)
    if version:
        print(f'{package}: Installed version - {version}, Expected version - {expected_version}')
    else:
        print(f'{package}: Not installed')

In [None]:
!pip freeze > requirements.txt

In [None]:
# prompt: check python version

import sys
sys.version

In [None]:
import joblib
import pandas as pd

# Load the saved Gradient Boosting model
loaded_model = joblib.load('/content/drive/My Drive/gradient_boosting_model_v3.pkl')

# Assuming 'out_df' is your DataFrame with the records for inference
# Replace 'out.csv' with the actual path to your out_df file
out_df = pd.read_csv('out.csv')

# Get the feature names the model was trained on
training_feature_names = tuned_predictor.scaler.feature_names_in_

# Ensure X_inference has the same columns in the same order as training data
X_inference = out_df[training_feature_names]

# Scale features using the same scaler used during training
X_inference_scaled = tuned_predictor.scaler.transform(X_inference)

# Make predictions
y_pred = loaded_model.predict(X_inference_scaled)

# Display actual and predicted values
print("Actual vs Predicted Values:")
for i in range(len(out_df)):
  print(f"Record {i+1}: Actual SOH = {out_df['SOH'].iloc[i]}, Predicted SOH = {y_pred[i]}")

In [None]:
# prompt: Load model from Google Drive, its named as 'gradient_boosting_model_v3.pkl'. Then, run inference on it using out_df, and print the evaluation metrics.

# Load the saved Gradient Boosting model
loaded_model = joblib.load('/content/gradient_boosting_model_v5_01.pkl')

# Assuming 'out_df' is your DataFrame with the records for inference
# Replace 'out.csv' with the actual path to your out_df file
out_df = pd.read_csv('/content/cleaned_dataset/metadata.csv')

# Get the feature names the model was trained on
feature_list = joblib.load('/content/selected_features.pkl')


# Ensure X_inference has the same columns in the same order as training data
X_inference = out_df[feature_list]

# Load the scaler
scaler = joblib.load('scaler.pkl')

# Scale features using the same scaler used during training
X_inference_scaled = scaler.transform(X_inference)

# Make predictions
y_pred = loaded_model.predict(X_inference_scaled)

# Evaluate the model on the inference data
mae = mean_absolute_error(out_df['SOH'], y_pred)
mse = mean_squared_error(out_df['SOH'], y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(out_df['SOH'], y_pred)

print(f"Mean Absolute Error (MAE): {mae}")
print(f"Mean Squared Error (MSE): {mse}")
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"R-squared (R2): {r2}")

Previous:

Mean Absolute Error (MAE): 0.1317185813160859
Mean Squared Error (MSE): 0.17074409977468324
Root Mean Squared Error (RMSE): 0.4132119308232559
R-squared (R2): 0.9996839225247577

In [None]:
# prompt: Find out what values to give to the model, if we want to run it for inference.

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Load the saved model
loaded_gb_model = joblib.load('/content/drive/My Drive/gradient_boosting_model_test.pkl')

# Sample input features (replace with your actual data)
# The input features MUST be in the same order and number as they were during training.
# If you have different features in the test data than in the training data,
# you'll need to retrain your model.

# Example (replace with your actual data)
sample_input = {
    'feature1': [1.0],
    'feature2': [2.0],
    # ... add other features
}

# Create a pandas DataFrame from the sample input
sample_df = pd.DataFrame(sample_input)

# Initialize a StandardScaler object - Use the same scaler used for training
scaler = StandardScaler()


# Fit the scaler ONLY on the training data and then transform the sample input using the fitted scaler
X_train, _, _, _ = tuned_predictor.prepare_features()
scaler.fit(X_train) # fit scaler on training features

# Now, transform your sample input using the fitted scaler
scaled_sample_input = scaler.transform(sample_df)

# Make predictions
prediction = loaded_gb_model.predict(scaled_sample_input)
print(f"Prediction: {prediction}")


In [None]:
# Meta Ensemble

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import (
    train_test_split,
    GridSearchCV,
    RandomizedSearchCV,
    KFold,
    cross_val_score
)
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.impute import KNNImputer, SimpleImputer, IterativeImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.base import BaseEstimator, RegressorMixin

class MetaEnsemble(BaseEstimator, RegressorMixin):
    """Meta-ensemble that combines predictions from multiple models"""
    def __init__(self, models, weights=None):
        self.models = models
        self.weights = weights if weights is not None else np.ones(len(models)) / len(models)

    def fit(self, X, y):
        """Fit all models in the ensemble"""
        for model in self.models:
            model.fit(X, y)
        return self

    def predict(self, X):
        """Make weighted predictions"""
        predictions = np.column_stack([
            model.predict(X) for model in self.models
        ])
        return np.average(predictions, weights=self.weights, axis=1)

    def set_weights(self, weights):
        """Update ensemble weights"""
        self.weights = weights

class BatterySOHPredictorTuned:
    def __init__(self, df):
        self.df = df
        self.scaler = StandardScaler()
        self.best_models = {}
        self.cv_results = {}
        self.meta_ensemble = None
        self.best_weights = None

    # [Previous prepare_features method remains the same]

    def prepare_features(self):
        """Prepare features with advanced imputation methods and overfitting prevention"""
        exclude_cols = ['SOH', 'battery_id', 'datetime']
        feature_cols = [col for col in self.df.columns if col not in exclude_cols]

        X = self.df[feature_cols]
        y = self.df['SOH']

        # Handle infinite values first
        X = X.replace([np.inf, -np.inf], np.nan)

        # ---------- Basic Imputation Methods (commented) ----------
        #Mean imputation
        #X = X.fillna(X.mean())

        # Median imputation
        # X = X.fillna(X.median())

        # ---------- Advanced Imputation Methods ----------

        # 1. KNN Imputation
        # Good for finding patterns in data, considers feature relationships
        # imputer = KNNImputer(n_neighbors=5, weights='uniform')
        # X = pd.DataFrame(imputer.fit_transform(X), columns=X.columns)

         #2. Iterative Imputation (MICE - Multivariate Imputation by Chained Equations)
         #Uses all features to predict missing values through multiple rounds
        imputer = IterativeImputer(
             estimator=ExtraTreesRegressor(n_estimators=100),
             max_iter=10,
             random_state=42
        )
        X = pd.DataFrame(imputer.fit_transform(X), columns=X.columns)

        # 3. ANOVA-based imputation for categorical variables
        # def anova_impute(df, target_col):
        #     grouped_means = df.groupby(target_col).mean()
        #     return df.fillna(grouped_means)
        #
        # categorical_cols = X.select_dtypes(include=['category', 'object']).columns
        # for col in categorical_cols:
        #     X[col] = anova_impute(X, col)

        # ---------- Feature Engineering for Overfitting Prevention ----------

        # 1. Add noise to training data (helps prevent overfitting)
        def add_noise(X, noise_level=0.01):
            noise = np.random.normal(0, noise_level, X.shape)
            return X + noise

        # 2. Feature selection based on correlation with target
        def select_relevant_features(X, y, threshold=0.1):
            correlations = np.abs(pd.DataFrame(X).corrwith(pd.Series(y)))
            selected_features = correlations[correlations > threshold].index
            return X[selected_features]

        # 3. Remove highly correlated features
        def remove_correlated_features(X, threshold=0.95):
            corr_matrix = X.corr().abs()
            upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
            to_drop = [column for column in upper.columns if any(upper[column] > threshold)]
            return X.drop(columns=to_drop)

        # Apply overfitting prevention techniques
        X = remove_correlated_features(X)
        X = select_relevant_features(X, y)

        # Scale features
        X_scaled = self.scaler.fit_transform(X)
        X_scaled = pd.DataFrame(X_scaled, columns=X.columns)

        # Optional: Add noise to training data
        # X_scaled = add_noise(X_scaled)

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X_scaled, y, test_size=0.2, random_state=42, shuffle=False
        )

        return X_train, X_test, y_train, y_test

    def tune_models(self, X_train, y_train):
        """Enhanced model tuning with regularization and meta-ensemble optimization"""
        param_grids = {
          'GradientBoost': {
              'n_estimators': [100, 150],
              'learning_rate': [0.05, 0.1],
              'max_depth': [3, 4]
          },
          'RandomForest': {
              'n_estimators': [100, 150],
              'max_depth': [None, 10],
              'min_samples_split': [2, 5]
          }
      }

        models = {
            'GradientBoost': GradientBoostingRegressor(random_state=42),
            'RandomForest': RandomForestRegressor(random_state=42)
        }

        # Tune individual models first
        for name, model in models.items():
            print(f"\nTuning {name}...")

            grid_search = GridSearchCV(
                estimator=model,
                param_grid=param_grids[name],
                cv=3,
                scoring=['r2', 'neg_mean_squared_error'],
                refit='r2',
                n_jobs=-1,
                verbose=1
            )

            grid_search.fit(X_train, y_train)
            self.best_models[name] = grid_search.best_estimator_
            self.cv_results[name] = {
                'best_params': grid_search.best_params_,
                'best_score': grid_search.best_score_
            }

        # Create and optimize meta-ensemble
        self._optimize_ensemble_weights(X_train, y_train)

    def _optimize_ensemble_weights(self, X, y, n_trials=100):
        """Optimize ensemble weights using random search"""
        print("\nOptimizing ensemble weights...")

        best_score = float('-inf')
        best_weights = None

        # Initialize meta-ensemble
        models_list = list(self.best_models.values())
        self.meta_ensemble = MetaEnsemble(models_list)

        # Random search for optimal weights
        for _ in range(n_trials):
            # Generate random weights that sum to 1
            weights = np.random.dirichlet(np.ones(len(models_list)))
            self.meta_ensemble.set_weights(weights)

            # Evaluate using cross-validation
            scores = cross_val_score(
                self.meta_ensemble, X, y,
                cv=5, scoring='neg_mean_squared_error'
            )
            mean_score = -scores.mean()  # Convert back to MSE

            if mean_score > best_score:
                best_score = mean_score
                best_weights = weights

        # Set the best weights
        self.best_weights = best_weights
        self.meta_ensemble.set_weights(best_weights)
        print(f"Best ensemble weights: {dict(zip(self.best_models.keys(), best_weights))}")

    def evaluate_models(self, X_train, X_test, y_train, y_test):
        """Evaluate individual models and meta-ensemble"""
        results = {}

        # Evaluate individual models
        for name, model in self.best_models.items():
            print(f"\n{name} Results:")
            results[name] = self._evaluate_single_model(
                model, X_train, X_test, y_train, y_test
            )

        # Evaluate meta-ensemble
        print("\nMeta-Ensemble Results:")
        self.meta_ensemble.fit(X_train, y_train)
        results['MetaEnsemble'] = self._evaluate_single_model(
            self.meta_ensemble, X_train, X_test, y_train, y_test
        )

        return results

    def _evaluate_single_model(self, model, X_train, X_test, y_train, y_test):
        """Helper method to evaluate a single model"""
        cv = KFold(n_splits=5, shuffle=True, random_state=42)
        cv_scores = cross_val_score(model, X_train, y_train, cv=cv, scoring='r2')

        print(f"Cross-validation R2 scores: {cv_scores}")
        print(f"Mean CV R2: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")

        y_train_pred = model.predict(X_train)
        y_test_pred = model.predict(X_test)

        metrics = self._calculate_metrics(
            y_train, y_train_pred, y_test, y_test_pred, cv_scores
        )

        for metric, value in metrics.items():
            print(f"{metric}: {value}")

        return metrics

    def _calculate_metrics(self, y_train, y_train_pred, y_test, y_test_pred, cv_scores):
        """Calculate comprehensive metrics"""
        train_mae = mean_absolute_error(y_train, y_train_pred)
        test_mae = mean_absolute_error(y_test, y_test_pred)
        train_rmse = np.sqrt(mean_squared_error(y_train, y_train_pred))
        test_rmse = np.sqrt(mean_squared_error(y_test, y_test_pred))
        train_r2 = r2_score(y_train, y_train_pred)
        test_r2 = r2_score(y_test, y_test_pred)

        y_range = y_test.max() - y_test.min()
        r2_percentage = max(0, test_r2 * 100)
        mae_percentage = max(0, 100 * (1 - test_mae / y_range))
        rmse_percentage = max(0, 100 * (1 - test_rmse / y_range))
        overall_score = (r2_percentage + mae_percentage + rmse_percentage) / 3

        return {
            'Train MAE': train_mae,
            'Test MAE': test_mae,
            'Train RMSE': train_rmse,
            'Test RMSE': test_rmse,
            'Train R2': train_r2,
            'Test R2': test_r2,
            'CV R2 Mean': cv_scores.mean(),
            'CV R2 Std': cv_scores.std(),
            'R2 Performance': f"{r2_percentage:.2f}%",
            'MAE Performance': f"{mae_percentage:.2f}%",
            'RMSE Performance': f"{rmse_percentage:.2f}%",
            'Overall Performance': f"{overall_score:.2f}%"
        }

    def predict(self, X):
        """Make predictions using the meta-ensemble"""
        if self.meta_ensemble is None:
            raise ValueError("Models haven't been trained yet!")
        return self.meta_ensemble.predict(X)

In [None]:
predictor = BatterySOHPredictorTuned(df)
X_train, X_test, y_train, y_test = predictor.prepare_features()
predictor.tune_models(X_train, y_train)
results = predictor.evaluate_models(X_train, X_test, y_train, y_test)