In [1]:
import os
import sys
import random
import openpyxl
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from math import sqrt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
from torch.nn import functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import KFold, GroupShuffleSplit, GridSearchCV
from sklearn.linear_model import ElasticNet, LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error
from sklearn.utils.validation import check_array, check_random_state, _deprecate_positional_args
from sklearn.model_selection._split import _BaseKFold, _RepeatedSplits
from scipy.interpolate import make_interp_spline
from functions import *

In [None]:
# Pick the compute device: CUDA when torch reports it as available, otherwise CPU.

use_gpu = torch.cuda.is_available()
device = torch.device("cuda" if use_gpu else "cpu")

if not use_gpu:
    print("GPU is not available, using CPU instead.")
else:
    # Report which GPU (index 0) and which CUDA build torch is using.
    print(f"GPU is available: {torch.cuda.get_device_name(0)}")
    print(f"CUDA Version: {torch.version.cuda}")

In [None]:
# cycle vs capacity retention
# Draws one curve per row of the capacity workbook: the measured points plus a
# cubic-spline interpolation through them, coloured by the row's value in the
# last column.

file_path = "capacity list.xlsx"
data = pd.read_excel(file_path)
# Columns from position 2 onward are labelled with the cycle number; the first
# two columns are skipped here.
cycle_numbers = data.columns[2:].astype(int)

last_cycle_values = data.iloc[:, -1].values # set colors based on final capacity
norm = plt.Normalize(vmin=last_cycle_values.min(), vmax=last_cycle_values.max())
cmap = plt.get_cmap('viridis').reversed()

fig, ax = plt.subplots(figsize=(8, 5))
y_min, y_max = 2.4, 3.25

# The dense evaluation grid is the same for every row, so build it once.
x_new = np.linspace(cycle_numbers.min(), cycle_numbers.max(), 300)

for index, row in data.iterrows():
    # A row taken from a mixed-dtype frame is object-typed; slice positionally
    # and cast so the spline and scatter receive plain floats.
    y = row.iloc[2:].values.astype(float)
    spline = make_interp_spline(cycle_numbers, y, k=3)
    y_smooth = spline(x_new)
    # Colour by the LAST column, the same column the normalisation (and hence
    # the colourbar) was built from. The previous code used the second-to-last
    # column, so curve colours did not match the colourbar scale.
    cell_color = cmap(norm(row.iloc[-1]))
    ax.plot(x_new, y_smooth, color=cell_color, linewidth=1.5)
    ax.scatter(cycle_numbers, y, color=cell_color, s=25, marker='o', zorder=3)

sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
cbar = fig.colorbar(sm, ax=ax, fraction=0.03, pad=0.04)
cbar.set_label('Capacity (Ah)')
ax.set_ylim([y_min, y_max])
ax.set_xlabel('Cycle Number')
ax.set_ylabel('Capacity (Ah)')
ax.set_title('Cycle Number vs Capacity for Each Cell')
ax.grid(True)

plt.show()

In [None]:
# Future degradation path prediction
# Loads the model inputs and targets, fixes the hyperparameters, and runs the
# cross-validated training/evaluation helper from the `functions` module.

# input data
diagcap_final = current_health_state('diagnosis', 'capacity list.xlsx')
sequence = torch.tensor(pd.read_excel('sequence list_index.xlsx', header=None).values, dtype=torch.long)

# Select mode
mode = 'full' # 'full', 'diagnosis-only', 'sequence-only'

# Fail fast on a mistyped mode: without this check an unknown value would run
# the 'full' configuration while writing results to a folder named after the typo.
valid_modes = ('full', 'diagnosis-only', 'sequence-only')
if mode not in valid_modes:
    raise ValueError(f"Unknown mode {mode!r}; expected one of {valid_modes}")

# Zero out the input that the selected mode does not use.
# NOTE(review): torch.zeros_like assumes current_health_state returns a tensor - confirm.
if mode == 'diagnosis-only':
    sequence = torch.zeros_like(sequence)
elif mode == 'sequence-only':
    diagcap_final = torch.zeros_like(diagcap_final)

# target data
futureVR = process_folder_futureDI('diagnosis')

# Hyperparameters
input_dim_encoder = 11
n_cat = 4
hidden_dim = 64
output_dim = 10
lr = 0.0004
batch_size = 6
randomst = 0
num_folds = 6
num_encoder_fc_layers = 2
num_gru_layers = 2
num_decoder_fc_layers = 0
num_epochs = 10000
early_stopping_patience = 1500
# 24 group ids, each repeated 3 times in a row (72 labels in total).
groups = np.repeat(np.arange(24), 3).tolist()
date = 241009
# 'refs' specifies the number of points excluded from the last cycle point for
# model evaluation and can be extended up to a range(1,11).
refs = 2

# Results folder (exist_ok avoids the separate exists-check and its race)
result_folder = f'degradation path prediction_{mode}_{date}'
os.makedirs(result_folder, exist_ok=True)

# perform prediction
train_and_evaluate(
    diagcap_final, sequence, futureVR, groups, num_folds, randomst,
    input_dim_encoder, hidden_dim, num_encoder_fc_layers,
    n_cat, num_gru_layers, num_decoder_fc_layers,
    output_dim, batch_size, lr, num_epochs, early_stopping_patience,
    result_folder, device, refs,
)


In [None]:
# future capacity trajectory prediction
# Builds per-fold capacity targets, fits the unified regression model, and then
# evaluates it; all three steps are helpers from the `functions` module.
# NOTE: relies on `mode` and `date` set in the degradation-path cell.

# Every column from position 3 onward of the capacity workbook, as float32.
capacity = torch.tensor(pd.read_excel('capacity list.xlsx').iloc[:, 3:].values, dtype=torch.float32)

capacity_folder = 'capacity_per_fold'
os.makedirs(capacity_folder, exist_ok=True)

# Hyperparameters
randomst = 0
num_folds = 6
# 24 group ids, each repeated 3 times in a row (72 labels in total).
groups = np.repeat(np.arange(24), 3).tolist()

# future capacity for various forecast length and folds
target_capacity(capacity, groups, num_folds, randomst, capacity_folder)

# Unified regression model with true DI and capacity
result_folder = f'degradation path prediction_{mode}_{date}'
ref_true = 1
n_true = 1
# Keep one fold count for the split, the regression fit and the evaluation,
# instead of repeating the literal 6 in three places.
n_folds = num_folds
model = unified_regression(result_folder, capacity_folder, n_folds, n_true, ref_true)

# Prediction using the unified regression model with predicted DI as input
evaluate_predictions(result_folder, capacity_folder, model, n_folds=n_folds)

In [None]:
# Data analysis (observed vs predicted VR, capacity at n = 1)
# Summarises the saved per-fold error metrics for both prediction stages.
# NOTE: relies on `mode` and `date` set in the degradation-path cell.

result_folder = f'degradation path prediction_{mode}_{date}'

file_prefix = ['fold_1', 'fold_2', 'fold_3', 'fold_4', 'fold_5', 'fold_6']
file_suffix_performance = 'seq2seq_avg_rmse_mape_results_n1_ref1.csv'
file_paths = [os.path.join(result_folder, f"{prefix}_{file_suffix_performance}") for prefix in file_prefix]

# Read each fold's CSV once and take both metric columns from the same frame
# (previously every file was parsed twice, once per metric).
fold_performance = [pd.read_csv(f) for f in file_paths]
rmse_values = [perf['Best RMSE'].dropna().values for perf in fold_performance]
mape_values = [perf['Best MAPE'].dropna().values for perf in fold_performance]

# Pooled mean over all non-NaN entries of all folds (sum of values / count),
# so a fold with more rows carries proportionally more weight.
average_rmse = round(sum(r.sum() for r in rmse_values) / sum(len(r) for r in rmse_values), 6)
average_mape = round(sum(m.sum() for m in mape_values) / sum(len(m) for m in mape_values), 2)
print(f'RMSE : {average_rmse}, MAPE : {average_mape} for future degradation path prediction')

# Capacity-stage metrics: plain column means of the fold-performance table.
file_path = os.path.join(result_folder, 'fold_performance_pred_rf_n1_ref1.csv')
df = pd.read_csv(file_path)
rmse_mean = round(df['RMSE'].mean(), 6)
mape_mean = round(df['MAPE'].mean(), 2)
print(f'RMSE : {rmse_mean}, MAPE : {mape_mean} for future capacity trajectory prediction')

# Load every sheet of the per-fold prediction/target workbooks and stack them
# row-wise into the frames used by the scatter plots.
file_suffix_predictions = 'seq2seq_predictions_sheet_n1_ref1.xlsx'
file_suffix_targets = 'seq2seq_targets_sheet_n1_ref1.xlsx'
predictions = {}
targets = {}
for prefix in file_prefix:
    predictions_file = os.path.join(result_folder, f"{prefix}_{file_suffix_predictions}")
    targets_file = os.path.join(result_folder, f"{prefix}_{file_suffix_targets}")
    # sheet_name=None returns a dict mapping sheet name -> DataFrame.
    predictions[prefix] = pd.read_excel(predictions_file, sheet_name=None)
    targets[prefix] = pd.read_excel(targets_file, sheet_name=None)

# Pair each prediction sheet with the target sheet of the same name, in fold
# order then sheet order, so both sides are stacked in the same row order.
sheet_pairs = [
    (predictions[prefix][sheet_name], targets[prefix][sheet_name])
    for prefix in file_prefix
    for sheet_name in predictions[prefix]
]

# Concatenate once per output instead of growing a DataFrame inside the loop
# (repeated pd.concat is quadratic and starts from an empty frame).
# Columns 5-9 feed the "SOC 10~50" frames and columns 0-4 the "SOC 60~100"
# frames. NOTE(review): that column-to-SOC mapping comes from the variable
# names only - confirm against the sheet layout.
soc_10_50_combined_targets = pd.concat([targ_df.iloc[:, 5:10] for pred_df, targ_df in sheet_pairs], axis=0)
soc_10_50_combined_predictions = pd.concat([pred_df.iloc[:, 5:10] for pred_df, targ_df in sheet_pairs], axis=0)
soc_60_100_combined_targets = pd.concat([targ_df.iloc[:, 0:5] for pred_df, targ_df in sheet_pairs], axis=0)
soc_60_100_combined_predictions = pd.concat([pred_df.iloc[:, 0:5] for pred_df, targ_df in sheet_pairs], axis=0)

predicted_results_file = os.path.join(result_folder, "predicted_results_pred_rf_n1_ref1.xlsx")
predicted_results = pd.read_excel(predicted_results_file, sheet_name=None)

# Double brackets keep these as one-column DataFrames, as before; the column is
# now always named 'Actual' / 'Predicted' regardless of pandas version.
actual_combined = pd.concat([sheet_df[['Actual']] for sheet_df in predicted_results.values()], axis=0)
predicted_combined = pd.concat([sheet_df[['Predicted']] for sheet_df in predicted_results.values()], axis=0)

# One 18x4 figure with three observed-vs-predicted panels, each with a dashed
# y = x reference line.
parity_fig, (ax_low_soc, ax_high_soc, ax_capacity) = plt.subplots(1, 3, figsize=(18, 4))
colors = ['red', 'blue', 'green', 'purple', 'orange']

# SOC 10~50 scatter plot: one colour per column, labelled 50 down to 10
for col_idx, marker_color in enumerate(colors):
    ax_low_soc.scatter(
        soc_10_50_combined_targets.iloc[:, col_idx],
        soc_10_50_combined_predictions.iloc[:, col_idx],
        label=f'SOC {50 - col_idx * 10}',
        c=marker_color,
        alpha=0.6,
    )
ax_low_soc.set_xlabel('Observed ΔVR (V)')
ax_low_soc.set_ylabel('Predicted ΔVR (V)')
ax_low_soc.set_xlim([0.02, 0.13])
ax_low_soc.set_ylim([0.02, 0.13])
ax_low_soc.plot([0.02, 0.13], [0.02, 0.13], 'k--', linewidth=2)
ax_low_soc.set_title('Target vs Predictions (SOC 10~50)')
ax_low_soc.legend()
ax_low_soc.grid(True)

# SOC 60~100 scatter plot: one colour per column, labelled 60 up to 100
for col_idx, marker_color in enumerate(colors):
    ax_high_soc.scatter(
        soc_60_100_combined_targets.iloc[:, col_idx],
        soc_60_100_combined_predictions.iloc[:, col_idx],
        label=f'SOC {60 + col_idx * 10}',
        c=marker_color,
        alpha=0.6,
    )
ax_high_soc.set_xlabel('Observed ΔVR (V)')
ax_high_soc.set_ylabel('Predicted ΔVR (V)')
ax_high_soc.set_xlim([-0.10, -0.02])
ax_high_soc.set_ylim([-0.10, -0.02])
ax_high_soc.plot([-0.10, -0.02], [-0.10, -0.02], 'k--', linewidth=2)
ax_high_soc.set_title('Target vs Predictions (SOC 60~100)')
ax_high_soc.legend()
ax_high_soc.grid(True)

# capacity scatter plot
ax_capacity.scatter(actual_combined, predicted_combined, label='Actual vs Predicted', c='teal', alpha=0.6)
ax_capacity.set_xlabel('Observed capacity (Ah)')
ax_capacity.set_ylabel('Predicted capacity (Ah)')
ax_capacity.set_xlim([2.4,3.25])
ax_capacity.set_ylim([2.4,3.25])
ax_capacity.plot([2.4,3.25], [2.4,3.25], 'k--', linewidth=2)
ax_capacity.set_title('Target vs Predictions (Capacity)')
ax_capacity.legend()
ax_capacity.grid(True)

plt.tight_layout()
plt.show()
