In [1]:
import numpy as np
import random
import math
import time
import os
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import pandas as pd
import glob
import scipy.io
import scipy.stats
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from datetime import datetime, timedelta
%matplotlib inline

from math import sqrt
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from datetime import datetime

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd
  from .autonotebook import tqdm as notebook_tqdm


In [2]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')


In [5]:
class BatteryDataPreprocessor:
    count = 0
    def __init__(self, dir_path,battery_list, dataset_type):
        self.dir_path = dir_path
        self.battery_list = battery_list
        self.dataset_type = dataset_type
        self.battery_data = {}

    def load_datasets(self):
        if self.dataset_type == 'CALCE':
            self.load_calce_datasets()
        elif self.dataset_type == 'NASA':
            self.load_nasa_datasets()
        else:
            raise ValueError("Unsupported dataset type.")

###########################################################################################################################################################################################
################################################################ CALCE ####################################################################################################################
    def load_calce_datasets(self):
        print("Loading datasets ...")
        for name in self.battery_list:
            print('Load Dataset ' + name + ' ...')
            
            discharge_capacities, health_indicator, internal_resistance, CCCT, CVCT = [], [], [], [], []
            path = glob.glob(self.dir_path + name + '/*.xlsx')
            dates = [pd.read_excel(p, sheet_name=1)['Date_Time'][0] for p in path]
            idx = np.argsort(dates)
            path_sorted = np.array(path)[idx]

            for p in path_sorted:
                df = pd.read_excel(p, sheet_name=1)
                self.process_battery_data(df,discharge_capacities, health_indicator, internal_resistance, CCCT, CVCT)
            
            self.battery_data[name] = self.aggregate_data(discharge_capacities, health_indicator, internal_resistance, CCCT, CVCT)

        print("Datasets loaded successfully.")

    def process_battery_data(self, df, discharge_capacities, health_indicator, internal_resistance, CCCT, CVCT):
        cycles = list(set(df['Cycle_Index']))
        for c in cycles:
            df_lim = df[df['Cycle_Index'] == c]
            df_cc = df_lim[df_lim['Step_Index'] == 2] # Constant current
            df_cv = df_lim[df_lim['Step_Index'] == 4] # Constant Voltage
            CCCT.append(np.max(df_cc['Test_Time(s)'])-np.min(df_cc['Test_Time(s)'])) # Time spent in CC
            CVCT.append(np.max(df_cv['Test_Time(s)'])-np.min(df_cv['Test_Time(s)'])) # Time spent in CV
            
            # Discharging
            df_d = df_lim[df_lim['Step_Index'] == 7]
            if not df_d.empty:
                d_v = df_d['Voltage(V)'].to_numpy()
                d_c = df_d['Current(A)'].to_numpy()
                d_t = df_d['Test_Time(s)'].to_numpy()
                # Calculate discharge capacity
                time_diff = np.diff(d_t)
                discharge_capacity = np.cumsum(time_diff * d_c[1:] / 3600)  # Convert to Ah
                discharge_capacities.append(-discharge_capacity[-1])
                start_capacity = discharge_capacity[np.abs(d_v[1:] - 3.8).argmin()]
                end_capacity = discharge_capacity[np.abs(d_v[1:] - 3.4).argmin()]
                health_indicator.append(-start_capacity + end_capacity)
                d_im = df_d['Internal_Resistance(Ohm)'].to_numpy()
                internal_resistance.append(np.mean(d_im))
                BatteryDataPreprocessor.count += 1

    def aggregate_data(self, discharge_capacities, health_indicator, internal_resistance, CCCT, CVCT):
        discharge_capacities = np.array(discharge_capacities)
        health_indicator = np.array(health_indicator)
        internal_resistance = np.array(internal_resistance)
        CCCT = np.array(CCCT)
        CVCT = np.array(CVCT)
    
        idx = drop_outlier(discharge_capacities, BatteryDataPreprocessor.count, 40)
        data = pd.DataFrame({'cycle':np.linspace(1,idx.shape[0],idx.shape[0]),
                              'capacity':discharge_capacities[idx],
                              'SoH':health_indicator[idx],
                              'resistance':internal_resistance[idx],
                              'CCCT':CCCT[idx],
                              'CVCT':CVCT[idx]})
        df = pd.DataFrame(data)
        return df
    def drop_outlier(self, array,count,bins):
        index = []
        range_ = np.arange(1,count,bins)
        for i in range_[:-1]:
            array_lim = array[i:i+bins]
            sigma = np.std(array_lim)
            mean = np.mean(array_lim)
            th_max,th_min = mean + sigma*2, mean - sigma*2
            idx = np.where((array_lim < th_max) & (array_lim > th_min))
            idx = idx[0] + i
            index.extend(list(idx))
        return np.array(index)
    
    
    ###########################################################################################################################################################################################
    ################################################################ NASA #####################################################################################################################
    
    def load_nasa_datasets(self):
        print("Loading NASA datasets...")
        for name in self.battery_list:
            print(f'Load Dataset {name}.mat ...')
            path = self.dir_path + name + '.mat'
            data = self.loadMat(path)
            self.battery_data[name] = self.getBatteryCapacity(data)
        print("Datasets loaded successfully.")

    # convert str to datatime 
    def convert_to_time(self, hmm):
        year, month, day, hour, minute, second = int(hmm[0]), int(hmm[1]), int(hmm[2]), int(hmm[3]), int(hmm[4]), int(hmm[5])
        return datetime(year=year, month=month, day=day, hour=hour, minute=minute, second=second)


    # load .mat data
    def loadMat(self, matfile):
        data = scipy.io.loadmat(matfile)
        filename = matfile.split("/")[-1].split(".")[0]
        col = data[filename]
        col = col[0][0][0][0]
        size = col.shape[0]

        data = []
        for i in range(size):
            k = list(col[i][3][0].dtype.fields.keys())
            d1, d2 = {}, {}
            if str(col[i][0][0]) != 'impedance':
                for j in range(len(k)):
                    t = col[i][3][0][0][j][0]
                    l = [t[m] for m in range(len(t))]
                    d2[k[j]] = l
            d1['type'], d1['ambient_temperature'], d1['time'], d1['data'] = str(col[i][0][0]), int(col[i][1][0]), str(self.convert_to_time(col[i][2][0])), d2
            data.append(d1)

        return data


    # get capacity data
    def getBatteryCapacity(self, Battery):
        cycle, capacity = [], []
        i = 1
        for Bat in Battery:
            if Bat['type'] == 'discharge':
                capacity.append(Bat['data']['Capacity'][0])
                cycle.append(i)
                i += 1
        return [cycle, capacity]


    # get the charge data of a battery
    def getBatteryValues(self, Battery, Type='charge'):
        data=[]
        for Bat in Battery:
            if Bat['type'] == Type:
                data.append(Bat['data'])
        return data
###########################################################################################################################################################################################
############################################################ TRIPLET GENERATION ###########################################################################################################    
    def generate_triplets(self, input_size=100, feature='Capacity'):
        all_triplets = []
        if self.dataset_type == 'CALCE':
            for name in self.battery_list:
                df_result = self.battery_data[name]
                cycles = df_result['cycle'].to_numpy()
                capacities = df_result['capacity'].to_numpy()
                mask_bits = np.ones(len(cycles))
                triplets = [(feature, cycle, capacity, mask) for cycle, capacity, mask in zip(cycles, capacities, mask_bits)]
                if len(triplets) > input_size:
                    selected_indices = np.random.choice(len(triplets), size=input_size, replace=False)
                    triplets = [triplets[i] for i in selected_indices]
                all_triplets.extend(triplets)
        elif self.dataset_type == 'NASA':
            for name in self.battery_list:
                cycles, capacities = self.battery_data[name]
                triplets = [(feature, cycle, capacity, 1) for cycle, capacity in zip(cycles, capacities)]
                if len(triplets) > input_size:
                    selected_indices = np.random.choice(len(triplets), size=input_size, replace=False)
                    triplets = [triplets[i] for i in selected_indices]
                all_triplets.extend(triplets)
            return all_triplets


###########################################################################################################################################################################################
################################################################ PLOT #####################################################################################################################
    def plot_capacity_degradation(self, title='Capacity degradation at ambient temperature of 24°C'):
        fig, ax = plt.subplots(1, figsize=(12, 8))
        color_list = ['b:', 'g--', 'r-.', 'c.']

        if self.dataset_type == 'NASA':
            # Assuming NASA dataset 'Battery' structure: {name: [cycles, capacities]}
            for name, color in zip(self.battery_list, color_list):
                cycles, capacities = self.battery_data[name]
                ax.plot(cycles, capacities, color, label=name)
        else:
            # Assuming CALCE dataset structure or similar where data is in DataFrame
            for name, color in zip(self.battery_list, color_list):
                df_result = self.battery_data[name]
                # Update this line if CALCE dataset structure is different
                ax.plot(df_result['cycle'], df_result['capacity'], color, label=f'Battery_{name}')
        
        ax.set(xlabel='Discharge cycles', ylabel='Capacity (Ah)', title=title)
        plt.legend()
        plt.show()
#TARGET = self.battery_list

In [6]:
def convert_to_triplets(data, target_feature, max_triplets):
    #features = ['capacity', 'SoH', 'resistance', 'CCCT', 'CVCT']
    triplets = []
    
    for index, row in data.iterrows():
        cycle = row['cycle']

        for feature in data.columns.difference(['cycle', target_feature]):
            value = row[feature]
            mask = 1 if pd.notnull(value) else 0 # Set mask to 1 if data is present, 0 otherwise

            triplets.append({
                            'Feature': feature,
                            'Cycle': cycle,
                            'Value': value,
                            'Mask': mask
            })
    if len(triplets) > max_triplets:
        correlated_features = data.corr()[target_feature].sort_values(ascending=False).index[1:]
        selected_features = [target_feature] + list(correlated_features)
        triplets = [triplet for triplet in triplets if triplet['Feature'] in selected_features]

        while len(triplets) < max_triplets:
            random_index = np.random.choice(data.index)
            cycle = data.loc[random_index, 'cycle']

            avaiable_features = data.columns[data.loc[random_index].notnull().difference(['cycle',target_feature]).tolist()]
            feature = np.random.choice(avaiable_features)
            value = data.loc[random_index, feature]
            mask = 1

            triplets.append({
                'Feature': feature,
                'cycle': cycle,
                'Value': value,
                'Mask': mask
            })
    return triplets

In [7]:
def generate_triplets(data, target_feature, max_triplets):
    triplets_x = []
    target_values = []

    #correlation
    correlation = data.corr()[target_feature].abs().sort_values(ascending=False)
    correlation.drop(target_feature, inplace=True) #avoid self-correlation
    
    for index, row in data.iterrows():
        cycle = row['cycle']
        for feature in data.columns.difference(['cycle']):
            if feature == target_feature:
                target_values.append({
                    'Cycle':cycle,
                    'Value': row[target_feature] if pd.notnull(row[target_feature]) else 0,
                    'Mask': 1 if pd.notnull(row[target_feature]) else 0
                })
    
            value = row[feature]
            mask = 1 if pd.notnull(value) else 0  # Set mask to 1 if data is present, 0 otherwise

            triplets_x.append({
                'Feature': feature,
                'Cycle': cycle,
                'Value': value,
                'Mask': mask
            })

    if len(triplets_x) > max_triplets:
        max_features = max_triplets // len(data['cycle'].unique())
        selected_features = correlation.index[:max_features].tolist()
        triplets_x = [t for t in triplets_x if t['Feature'] in selected_features]

    while len(triplets_x) < max_triplets:
        random_index = np.random.choice(data.index)
        cycle = data.loc[random_index, 'cycle']

        available_features = data.columns[data.loc[random_index].notnull()].difference(['cycle', target_feature])
        feature = np.random.choice(available_features)
        value = data.loc[random_index, feature]
        mask = 1 

        triplets_x.append({
            'Feature': feature,
            'Cycle': cycle,
            'Value': value,
            'Mask': mask
        })
    target_list = [list(d.values()) for d in target_values]
    target_values_array = np.array(target_list)
    return triplets_x, target_values_array


In [27]:
def generate_triplets_v2(data, target_feature, max_triplets):
    triplets_data = []
    target_data = []
    correlation = data.corr()[target_feature].abs().sort_values(ascending=False)
    correlation.drop(target_feature, inplace=True)  # Avoid self-correlation
    
    for index, row in data.iterrows():
        cycle = row['cycle']
        for feature in data.columns.difference(['cycle']):
            if feature == target_feature:
                target_data.append((cycle, row[target_feature] if pd.notnull(row[target_feature]) else 0, 1 if pd.notnull(row[target_feature]) else 0))
            else:
                value = row[feature]
                mask = 1 if pd.notnull(value) else 0  # Set mask to 1 if data is present, 0 otherwise
                triplets_data.append((feature, cycle, value if mask else 0, mask))

    if len(triplets_data) > max_triplets:
        max_features = max_triplets // len(data['cycle'].unique())
        selected_features = correlation.index[:max_features].tolist()
        triplets_data = [t for t in triplets_data if t[0] in selected_features]

    while len(triplets_data) < max_triplets:
        random_index = np.random.choice(data.index)
        cycle = data.loc[random_index, 'cycle']
        available_features = data.columns[data.loc[random_index].notnull()].difference(['cycle', target_feature])
        feature = np.random.choice(available_features)
        value = data.loc[random_index, feature]
        mask = 1 
        triplets_data.append((feature, cycle, value, mask))
    
    dtypes_triplets = np.dtype([
        ('Feature', 'U50'),  
        ('Cycle', np.int_),   
        ('Value', np.float_), 
        ('Mask', np.int_)     
    ])
    dtypes_target = np.dtype([
        ('Cycle', np.int_),   
        ('Value', np.float_), 
        ('Mask', np.int_)   
    ])
    
    triplets_x_array = np.array(triplets_data, dtype=dtypes_triplets)
    target_values_array = np.array(target_data, dtype=dtypes_target)

    return triplets_x_array, target_values_array

In [24]:
dir_path = 'datasets/NASA/'
battery_list = ['B0005', 'B0006', 'B0007', 'B0018']
preprocessor = BatteryDataPreprocessor(dir_path, battery_list, dataset_type='NASA')
preprocessor.load_datasets()



Loading NASA datasets...
Load Dataset B0005.mat ...


  d1['type'], d1['ambient_temperature'], d1['time'], d1['data'] = str(col[i][0][0]), int(col[i][1][0]), str(self.convert_to_time(col[i][2][0])), d2


Load Dataset B0006.mat ...
Load Dataset B0007.mat ...
Load Dataset B0018.mat ...
Datasets loaded successfully.


In [31]:
dir_path = 'datasets/CALCE/'
battery_list = ['CS2_35', 'CS2_36', 'CS2_37', 'CS2_38']
preprocessor = BatteryDataPreprocessor(dir_path, battery_list, dataset_type='CALCE')
preprocessor.load_datasets()


Loading datasets ...
Load Dataset CS2_35 ...


  ret = _var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  arrmean = um.true_divide(arrmean, div, out=arrmean,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


Load Dataset CS2_36 ...


  ret = _var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  arrmean = um.true_divide(arrmean, div, out=arrmean,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


Load Dataset CS2_37 ...


  ret = _var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  arrmean = um.true_divide(arrmean, div, out=arrmean,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


Load Dataset CS2_38 ...
Datasets loaded successfully.


  ret = _var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  arrmean = um.true_divide(arrmean, div, out=arrmean,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


In [32]:
battery1 = preprocessor.battery_data['CS2_35']

In [33]:
triplets_x, target_values = generate_triplets(battery1, 'capacity', 300)
triplets_x2, target_values2 = generate_triplets_v2(battery1, 'capacity', 300)

In [36]:
triplets_x2

array([('resistance', 880,  1.22263789e-01, 1),
       ('resistance', 608,  9.72511247e-02, 1),
       ('CCCT', 551,  5.01027210e+03, 1),
       ('CCCT', 272,  5.48836530e+03, 1),
       ('CCCT', 148,  5.84816055e+03, 1),
       ('CCCT', 808,  2.11935483e+03, 1),
       ('CVCT', 581,  2.71926204e+03, 1),
       ('SoH', 685, -5.31722120e-01, 1),
       ('resistance', 390,  9.31989923e-02, 1),
       ('resistance', 217,  9.52856615e-02, 1),
       ('SoH', 504, -6.87609292e-01, 1),
       ('CCCT', 890,  1.01420156e+03, 1),
       ('resistance', 293,  9.79729742e-02, 1),
       ('CCCT', 402,  5.69143984e+03, 1),
       ('SoH', 606, -6.32615889e-01, 1), ('SoH', 649, -5.59220004e-01, 1),
       ('CCCT', 539,  4.94073377e+03, 1),
       ('CVCT', 127,  2.10872840e+03, 1),
       ('SoH', 415, -7.15183585e-01, 1),
       ('resistance', 206,  8.98760334e-02, 1),
       ('SoH', 874, -1.46691451e-01, 1), ('SoH', 456, -6.87598111e-01, 1),
       ('CVCT', 543,  2.72285646e+03, 1),
       ('resistance

In [14]:
triplets_x

[{'Feature': 'resistance',
  'Cycle': 827.0,
  'Value': 0.12075348198413849,
  'Mask': 1},
 {'Feature': 'CCCT', 'Cycle': 94.0, 'Value': 5772.6566787153715, 'Mask': 1},
 {'Feature': 'resistance',
  'Cycle': 160.0,
  'Value': 0.09157814085483551,
  'Mask': 1},
 {'Feature': 'CCCT', 'Cycle': 799.0, 'Value': 2105.729702876415, 'Mask': 1},
 {'Feature': 'resistance',
  'Cycle': 768.0,
  'Value': 0.11264921724796295,
  'Mask': 1},
 {'Feature': 'resistance',
  'Cycle': 795.0,
  'Value': 0.11951082944869995,
  'Mask': 1},
 {'Feature': 'CVCT', 'Cycle': 834.0, 'Value': 3007.26218802901, 'Mask': 1},
 {'Feature': 'CCCT', 'Cycle': 95.0, 'Value': 5758.641031324631, 'Mask': 1},
 {'Feature': 'resistance',
  'Cycle': 97.0,
  'Value': 0.09230511635541916,
  'Mask': 1},
 {'Feature': 'resistance',
  'Cycle': 20.0,
  'Value': 0.08663725852966309,
  'Mask': 1},
 {'Feature': 'CVCT', 'Cycle': 473.0, 'Value': 2748.631390368333, 'Mask': 1},
 {'Feature': 'SoH', 'Cycle': 526.0, 'Value': -0.6783778131328368, 'Mask':

In [9]:
def drop_outlier(array,count,bins):
    index = []
    range_ = np.arange(1,count,bins)
    for i in range_[:-1]:
        array_lim = array[i:i+bins]
        sigma = np.std(array_lim)
        mean = np.mean(array_lim)
        th_max,th_min = mean + sigma*2, mean - sigma*2
        idx = np.where((array_lim < th_max) & (array_lim > th_min))
        idx = idx[0] + i
        index.extend(list(idx))
    return np.array(index)


def build_sequences(text, window_size):
    #text:list of capacity
    x, y = [],[]
    for i in range(len(text) - window_size):
        sequence = text[i:i+window_size]
        target = text[i+1:i+1+window_size]

        x.append(sequence)
        y.append(target)

    return np.array(x), np.array(y)


# leave-one-out evaluation: one battery is sampled randomly; the remainder are used for training.
def get_train_test(data_dict, name, window_size=8):
    data_sequence=data_dict[name]['capacity']
    train_data, test_data = data_sequence[:window_size+1], data_sequence[window_size+1:]
    train_x, train_y = build_sequences(text=train_data, window_size=window_size)
    for k, v in data_dict.items():
        if k != name:
            data_x, data_y = build_sequences(text=v['capacity'], window_size=window_size)
            train_x, train_y = np.r_[train_x, data_x], np.r_[train_y, data_y]
            
    return train_x, train_y, list(train_data), list(test_data)


def relative_error(y_test, y_predict, threshold):
    true_re, pred_re = len(y_test), 0
    for i in range(len(y_test)-1):
        if y_test[i] <= threshold >= y_test[i+1]:
            true_re = i - 1
            break
    for i in range(len(y_predict)-1):
        if y_predict[i] <= threshold:
            pred_re = i - 1
            break
    return abs(true_re - pred_re)/true_re if abs(true_re - pred_re)/true_re<=1 else 1


def evaluation(y_test, y_predict):
    mse = mean_squared_error(y_test, y_predict)
    rmse = sqrt(mean_squared_error(y_test, y_predict))
    return rmse
    
    
def setup_seed(seed):
    np.random.seed(seed)  # Numpy module.
    random.seed(seed)  # Python random module.
    os.environ['PYTHONHASHSEED'] = str(seed)
    torch.manual_seed(seed) 
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.benchmark = False
        torch.backends.cudnn.deterministic = True