In [34]:
import numpy as np 
import matplotlib.pylab as plt 
import scipy.io
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

def to_df(mat_db):
    """Convert a loaded battery struct into one pd.DataFrame per cycle type.

    Parameters
    ----------
    mat_db : nested array-like
        Battery struct whose cycle records are reachable at
        ``mat_db[0][0][0][0]``. Every cycle is a sequence of four fields,
        each read through ``.flat[0]`` (so only the first element of a field
        is kept): cycle type, ambient temperature, start time and the
        per-cycle measurement series.

    Returns
    -------
    dict[str, pd.DataFrame]
        Keys ``'charge'``, ``'discharge'`` and ``'impedance'``. Each frame
        holds the measurement columns for that cycle type plus ``id_cycle``
        (position of the cycle in the file), ``type``,
        ``ambient_temperature`` and ``time``. A type with no cycles maps to
        an empty DataFrame.

    Raises
    ------
    KeyError
        If a cycle's type is not one of the three known types.
    """

    # Features common for every cycle
    cycles_cols = ['type', 'ambient_temperature', 'time']

    # Features monitored during the cycle
    features_cols = {
        'charge': ['Voltage_measured', 'Current_measured', 'Temperature_measured',
                   'Current_charge', 'Voltage_charge', 'Time'],
        'discharge': ['Voltage_measured', 'Current_measured', 'Temperature_measured',
                      'Current_charge', 'Voltage_charge', 'Time', 'Capacity'],
        'impedance': ['Sense_current', 'Battery_current', 'Current_ratio',
                      'Battery_impedance', 'Rectified_impedance', 'Re', 'Rct']
    }

    # Per-type list of per-cycle frames. They are concatenated once at the end
    # instead of re-copying the accumulated frame on every cycle.
    frames = {key: [] for key in features_cols}

    # Get every cycle
    cycles = [[row.flat[0] for row in line] for line in mat_db[0][0][0][0]]

    # Get measures for every cycle
    for cycle_id, cycle_data in enumerate(cycles):
        cycle_type = cycle_data[0]
        tmp = pd.DataFrame()

        # The last field holds the data series; pair them positionally with
        # the feature names of this cycle type.
        for feature, data in zip(features_cols[cycle_type], cycle_data[-1]):
            if len(data[0]) > 1:
                # Correct number of records
                tmp[feature] = data[0]
            else:
                # Single value, so assign it to all rows
                tmp[feature] = data[0][0]

        # Add columns common to the cycle measurements
        tmp['id_cycle'] = cycle_id
        for k, col in enumerate(cycles_cols):
            tmp[col] = cycle_data[k]

        frames[cycle_type].append(tmp)

    return {
        key: pd.concat(parts, ignore_index=True) if parts else pd.DataFrame()
        for key, parts in frames.items()
    }


def pad_to_target_length(array, target_length):
    """
    Extend a sequence to the requested length by repeating its last value.

    Parameters:
    - array (list or numpy.ndarray): input sequence
    - target_length (int): desired length of the result

    Returns:
    - numpy.ndarray: the input converted to an array. When it is shorter than
      ``target_length`` it is right-padded with its final element; otherwise
      it is returned at its original length (longer inputs are NOT truncated).

    Raises:
    - IndexError: if ``array`` is empty and ``target_length`` is positive,
      because there is no last value to repeat.
    """
    values = np.array(array)
    shortfall = target_length - len(values)

    # Already long enough: nothing to append.
    if shortfall <= 0:
        return values

    # Append `shortfall` copies of the final element on the right-hand side.
    return np.pad(values, (0, shortfall), mode='constant', constant_values=values[-1])

def Mat2List(dfs_mat, target_length=500, step=10):
    """Turn one battery struct into a list of (charge signals, capacity label) samples.

    For every charge cycle the measured temperature, current and voltage are
    downsampled (every ``step``-th record) and right-padded with their last
    value up to ``target_length``. The label is the mean ``Capacity`` of the
    discharge cycle whose ``id_cycle`` is the charge cycle's id + 1 (falling
    back to id + 2), divided by the ``Capacity`` of the first discharge row.

    Parameters
    ----------
    dfs_mat : nested array-like
        Battery struct accepted by ``to_df``.
    target_length : int, optional
        Minimum length of each signal after padding (default 500). Signals
        that are still longer after downsampling are not truncated.
    step : int, optional
        Downsampling stride, must be a positive integer (default 10).

    Returns
    -------
    list[tuple[list[numpy.ndarray], float]]
        One ``([temperature, current, voltage], label)`` tuple per charge
        cycle that has a matching discharge cycle and a label that is not
        ``<= 0``. A NaN label is not filtered out here, because
        ``NaN <= 0`` is False.
    """
    frames = to_df(dfs_mat)
    charge_df = frames['charge']
    discharge_df = frames['discharge']

    # Capacity of the very first discharge record is the normalization base.
    init_cap = float(discharge_df.iloc[0, :]['Capacity'])

    # Channel order of every returned sample.
    signal_cols = ['Temperature_measured', 'Current_measured', 'Voltage_measured']

    total_result = []
    for cycle_id in charge_df['id_cycle'].unique():
        # Find the corresponding discharge data, falling back to the cycle
        # after it when the immediate successor is not a discharge cycle.
        dis = discharge_df[discharge_df['id_cycle'] == cycle_id + 1]
        if dis.empty:
            dis = discharge_df[discharge_df['id_cycle'] == cycle_id + 2]
        if dis.empty:
            continue

        label = dis['Capacity'].mean() / init_cap
        if label <= 0:
            continue

        cycle = charge_df[charge_df['id_cycle'] == cycle_id]
        channels = [
            pad_to_target_length(cycle[col].tolist()[::step], target_length)
            for col in signal_cols
        ]
        total_result.append((channels, float(label)))

    return total_result

# Load the four battery recordings, keeping only the struct stored under the
# variable that carries the same name as the file.
B0005 = scipy.io.loadmat('./DATA/1. BatteryAgingARC-FY08Q4/B0005.mat')['B0005']
B0006 = scipy.io.loadmat('./DATA/1. BatteryAgingARC-FY08Q4/B0006.mat')['B0006']
B0007 = scipy.io.loadmat('./DATA/1. BatteryAgingARC-FY08Q4/B0007.mat')['B0007']
B0018 = scipy.io.loadmat('./DATA/1. BatteryAgingARC-FY08Q4/B0018.mat')['B0018']

# Per-cycle-type DataFrames for every battery
dfs_B0005, dfs_B0006, dfs_B0007, dfs_B0018 = map(to_df, (B0005, B0006, B0007, B0018))

# Training samples come from B0005-B0007; B0018 is used as the test battery.
batt_list = [B0005, B0006, B0007]
df_train = []
for batt in batt_list:
    df_train.extend(Mat2List(batt))

df_test = Mat2List(B0018)


In [35]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn

# Dataset 정의
class SequenceDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sequence, label = self.data[idx]
        sequence = torch.tensor(sequence, dtype=torch.float32).unsqueeze(0)  # Channel dimension 추가
        label = torch.tensor(label, dtype=torch.float32)
        return sequence, label




# Samples per batch for both loaders
batch_size = 2

# Wrap the raw sample lists in Dataset objects
train_dataset, test_dataset = SequenceDataset(df_train), SequenceDataset(df_test)

# Build the DataLoaders: training batches are shuffled, evaluation order is fixed
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn

# Dataset 정의
class SequenceDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sequence, label = self.data[idx]
        sequence = torch.tensor(sequence, dtype=torch.float32).unsqueeze(0)  # Channel dimension 추가
        label = torch.tensor(label, dtype=torch.float32)
        return sequence, label
    
def collate_fn(batch):
    """Collate ``(sequence, label)`` samples whose sequences differ in length.

    Every sequence is zero-padded on the right of its last dimension up to
    the longest sequence in the batch, then all sequences are stacked along a
    new leading batch axis.

    Parameters:
    - batch: iterable of ``(sequence_tensor, label)`` pairs

    Returns:
    - tuple ``(padded_sequences, labels)``: the stacked sequence tensor and a
      1-D float32 tensor holding the labels
    """
    sequences, labels = zip(*batch)

    # Longest sequence, measured along the last dimension
    longest = max(seq.shape[-1] for seq in sequences)

    padded = []
    for seq in sequences:
        deficit = longest - seq.shape[-1]
        # (0, deficit): nothing on the left, `deficit` zeros on the right
        padded.append(torch.nn.functional.pad(seq, (0, deficit)))

    return torch.stack(padded), torch.tensor(labels, dtype=torch.float32)

# Data cleaning helper
def clean_data(dataset):
    """Return the samples of ``dataset`` that contain no NaN.

    Parameters:
    - dataset: iterable yielding ``(sequence_tensor, label_tensor)`` pairs

    Returns:
    - list of ``(numpy.ndarray, float)`` tuples, one per sample whose
      sequence and label are both free of NaN, in the original order
    """
    kept = []
    for seq_tensor, label_tensor in dataset:
        has_nan = bool(torch.isnan(seq_tensor).any()) or bool(torch.isnan(label_tensor).any())
        if has_nan:
            continue
        # Store plain numpy / Python values rather than tensors
        kept.append((seq_tensor.numpy(), label_tensor.item()))
    return kept

# Sample data: define the train and validation datasets

# Wrap the raw samples, which may still contain NaN values
train_dataset_with_nan = SequenceDataset(df_train)
val_dataset_with_nan = SequenceDataset(df_test)

# Remove every sample that contains a NaN
cleaned_train_data, cleaned_val_data = (
    clean_data(ds) for ds in (train_dataset_with_nan, val_dataset_with_nan)
)

# Wrap the NaN-free samples in Dataset objects again
cleaned_train_dataset = SequenceDataset(cleaned_train_data)
cleaned_val_dataset = SequenceDataset(cleaned_val_data)

# Build the DataLoaders (one sample per batch, padded by collate_fn)
loader_options = dict(batch_size=1, collate_fn=collate_fn)
train_loader = DataLoader(cleaned_train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(cleaned_val_dataset, shuffle=False, **loader_options)

In [37]:
import torch
import torch.nn as nn

# CNN model definition
class CNNModel(nn.Module):
    """Three 1x1-convolution stages followed by a single linear output unit.

    Each stage is Conv2d(kernel 1x1) -> LeakyReLU(0.1) -> max pooling. The
    first pooling halves the height; the second and third are adaptive and
    reduce every feature map to 1x1.

    Parameters
    ----------
    input_shape : sequence of int
        Shape of one sample without the batch axis, ``(channels, height,
        width)``. ``input_shape[0]`` sets the input channels of the first
        convolution; the full shape is used once to measure the flattened
        feature size.
    """

    def __init__(self, input_shape):
        super().__init__()
        unit = (1, 1)

        # Stage 1
        self.conv1 = nn.Conv2d(in_channels=input_shape[0], out_channels=32, kernel_size=unit, stride=unit)
        self.leaky_relu1 = nn.LeakyReLU(0.1)
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1))

        # Stage 2
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=16, kernel_size=unit, stride=unit)
        self.leaky_relu2 = nn.LeakyReLU(0.1)
        self.pool2 = nn.AdaptiveMaxPool2d(unit)

        # Stage 3
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=unit, stride=unit)
        self.leaky_relu3 = nn.LeakyReLU(0.1)
        self.pool3 = nn.AdaptiveMaxPool2d(unit)

        # Measure the flattened feature size with a dummy forward pass
        self.flatten_size = self._get_flatten_size(input_shape)

        self.fc = nn.Linear(self.flatten_size, 1)

    def _extract_features(self, x):
        """Run the three conv / activation / pooling stages in order."""
        stages = (
            (self.conv1, self.leaky_relu1, self.pool1),
            (self.conv2, self.leaky_relu2, self.pool2),
            (self.conv3, self.leaky_relu3, self.pool3),
        )
        for conv, activation, pool in stages:
            x = pool(activation(conv(x)))
        return x

    def _get_flatten_size(self, input_shape):
        """Return the number of features produced for one sample of ``input_shape``."""
        dummy = torch.zeros(1, *input_shape)  # simulated batch of size 1
        return self._extract_features(dummy).numel()

    def forward(self, x):
        """Map a ``(batch, channels, height, width)`` tensor to ``(batch, 1)``."""
        features = self._extract_features(x)
        return self.fc(torch.flatten(features, start_dim=1))

# Check for GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Example usage
input_shape = (1, 1, 3, 500)  # Example input shape from train_loader
model = CNNModel(input_shape[1:]).to(device)
print(model)

# Training parameters
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


def evaluate_mean_loss(model, loader, criterion, device):
    """Return the mean per-batch loss of ``model`` over ``loader``.

    The model is switched to eval mode and no gradients are recorded. The
    batches get the same preprocessing as in training: moved to ``device``
    and the axis at position 2 squeezed.
    """
    model.eval()
    total = 0.0
    with torch.no_grad():
        for batch_features, batch_labels in loader:
            batch_features = batch_features.to(device).squeeze(2)
            batch_labels = batch_labels.to(device)
            batch_outputs = model(batch_features)
            total += criterion(batch_outputs, batch_labels.unsqueeze(1)).item()
    return total / len(loader)


# Training loop
epochs = 100
for epoch in range(epochs):
    model.train()
    running_loss = 0.0

    for features, labels in train_loader:
        # Move the batch to the training device
        features, labels = features.to(device), labels.to(device)

        # Drop the redundant axis at position 2 (squeeze is a no-op unless
        # that axis has size 1).
        # NOTE(review): presumably the loader yields (batch, 1, 1, 3, L) here - confirm.
        features = features.squeeze(2)

        # Forward pass; labels get a trailing axis to match the (batch, 1) output
        outputs = model(features)
        loss = criterion(outputs, labels.unsqueeze(1))

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Calculate training loss
    train_loss = running_loss / len(train_loader)

    # Validation pass (also puts the model into eval mode; model.train() at
    # the top of the next epoch switches it back)
    val_loss = evaluate_mean_loss(model, val_loader, criterion, device)

    # Print losses for this epoch
    print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

# Testing pass. It runs on val_loader, i.e. the same data as the per-epoch
# validation. The helper sets eval mode explicitly instead of relying on the
# state left behind by the last epoch.
test_loss = evaluate_mean_loss(model, val_loader, criterion, device)
print(f"Test Loss: {test_loss:.4f}")


Using device: cuda
CNNModel(
  (conv1): Conv2d(1, 32, kernel_size=(1, 1), stride=(1, 1))
  (leaky_relu1): LeakyReLU(negative_slope=0.1)
  (pool1): MaxPool2d(kernel_size=(2, 1), stride=(2, 1), padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1))
  (leaky_relu2): LeakyReLU(negative_slope=0.1)
  (pool2): AdaptiveMaxPool2d(output_size=(1, 1))
  (conv3): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1))
  (leaky_relu3): LeakyReLU(negative_slope=0.1)
  (pool3): AdaptiveMaxPool2d(output_size=(1, 1))
  (fc): Linear(in_features=16, out_features=1, bias=True)
)
Epoch 1/100, Train Loss: 0.0389, Val Loss: 0.0338
Epoch 2/100, Train Loss: 0.0149, Val Loss: 0.0259
Epoch 3/100, Train Loss: 0.0140, Val Loss: 0.0164
Epoch 4/100, Train Loss: 0.0143, Val Loss: 0.0153
Epoch 5/100, Train Loss: 0.0150, Val Loss: 0.0281
Epoch 6/100, Train Loss: 0.0133, Val Loss: 0.0254
Epoch 7/100, Train Loss: 0.0144, Val Loss: 0.0133
Epoch 8/100, Train Loss: 0.0136, Val Loss: 0

In [38]:
import math
model.eval()
test_loss = 0.0
predictions = []
actuals = []

with torch.no_grad():
    for features, labels in val_loader:
        # Move data to the same device as the model
        features, labels = features.to(device), labels.to(device)
        features = features.squeeze(2)  # Remove unnecessary dimensions
        # Forward pass
        outputs = model(features)

        # Collect predictions and actual values
        predictions.extend(outputs.view(-1).tolist())
        actuals.extend(labels.view(-1).tolist())

        # Compute batch loss
        test_loss += criterion(outputs, labels.unsqueeze(1)).item()

# Mean of the per-batch MSE values
test_loss /= len(val_loader)

# RMSE over all individual samples
rmse = math.sqrt(sum((p - a) ** 2 for p, a in zip(predictions, actuals)) / len(actuals))

# MAPE is undefined for a zero target, so those samples are skipped. The
# average is taken over the samples actually used; dividing by len(actuals)
# would understate the error whenever a zero target was skipped.
pct_errors = [abs((p - a) / a) for p, a in zip(predictions, actuals) if a != 0]
mape = sum(pct_errors) / len(pct_errors) * 100 if pct_errors else float("nan")

# Print results
print(f"Test Loss (MSE): {test_loss:.4f}")
print(f"Test RMSE: {rmse:.4f}")
print(f"Test MAPE: {mape:.2f}%")

Test Loss (MSE): 0.0697
Test RMSE: 0.2640
Test MAPE: 29.35%


In [39]:
# Save the model weights
torch.save(model.state_dict(), "cnn_model_weights.pth")

# Load the model: build a fresh instance, then restore the weights.
# The sequence length here (4000) differs from the 500 used for the trained
# model. The state dict still loads because the layer shapes depend only on
# the channel count: the adaptive pooling fixes the flattened size at 16.
input_shape = (1, 1, 3, 4000)  # Example input shape
loaded_model = CNNModel(input_shape[1:])
# weights_only=True restricts unpickling to tensors and plain containers
# (avoids arbitrary code execution and the torch.load FutureWarning).
# map_location lets weights saved on a GPU load on a CPU-only machine.
loaded_model.load_state_dict(
    torch.load("cnn_model_weights.pth", map_location=device, weights_only=True)
)
loaded_model.to(device)

print("Model weights loaded successfully!")


Model weights loaded successfully!


  loaded_model.load_state_dict(torch.load("cnn_model_weights.pth"))
