In [None]:
import os
import numpy as np
import pandas as pd
import torch

from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data.sampler import SubsetRandomSampler
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Report whether PyTorch can see a CUDA device.
cuda_available = torch.cuda.is_available()

if not cuda_available:
    print("CUDA is not available.")
else:
    print("CUDA is available.")
    # Number of CUDA devices visible to this process.
    device_count = torch.cuda.device_count()
    print(f"Device count: {device_count}")
    # Index of the currently selected CUDA device.
    current_device = torch.cuda.current_device()
    print(f"Current device: {current_device}")

In [None]:
def import_df(file: str, data_dir=None):
    """
    Read the feature table ``HNEI_<file>_features.csv`` into a DataFrame.

    Parameters
    ----------
    file: letter(s) identifying the file ('a', 'b', 'c', etc.); inserted
        between ``HNEI_`` and ``_features.csv`` to build the file name.
    data_dir: directory containing the CSV files. Defaults to
        ``<current working directory>/Datasets/HNEI_Processed``.

    Returns
    -------
    The DataFrame returned by ``pd.read_csv`` for that file.
    """
    if data_dir is None:
        data_dir = os.path.join(os.getcwd(), 'Datasets', 'HNEI_Processed')

    name_file = 'HNEI_' + file + '_features.csv'
    df_features = pd.read_csv(os.path.join(data_dir, name_file))

    return df_features


# Letters identifying the per-file feature tables (HNEI_<letter>_features.csv).
files = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'j', 'l', 'n', 'o', 'p', 's', 't']

# Read every file first and concatenate once. Calling pd.concat inside the
# loop, starting from an empty DataFrame, re-copies all previously loaded rows
# on each iteration and depends on how concat treats empty frames.
feature_frames = []
for i in files:
    df_features = import_df(i)
    feature_frames.append(df_features)
final_df = pd.concat(feature_frames, axis=0)

# Drop the index column that was stored in the per-file CSVs.
final_df = final_df.drop(columns='Unnamed: 0')

save_path = os.getcwd() + '/Datasets/HNEI_Processed/'
os.makedirs(save_path, exist_ok=True)

# Save the merged table. The row index is written as the first CSV column;
# the loading cell further down drops it again as 'Unnamed: 0'.
name_database = 'Final Database.csv'
final_df.to_csv(save_path + name_database)

final_df


In [None]:
class BatteryDataSet(Dataset):
    """Map-style dataset over a 2-D NumPy array of scaled samples.

    Each row is one sample. Columns ``2:-2`` are exposed as the feature
    vector and the last column as the target (kept 2-D, shape ``(1,)`` per
    sample).

    Parameters
    ----------
    data : 2-D ``numpy.ndarray``, optional
        Array to wrap. When omitted, the notebook-level ``scaled_df_np`` is
        used, so the existing ``BatteryDataSet()`` call keeps working.
    """

    def __init__(self, data=None):
        # Data loading: prefer the explicit argument over the notebook global.
        xy = scaled_df_np if data is None else data
        self.x = torch.from_numpy(xy[:, 2:-2])
        self.y = torch.from_numpy(xy[:, [-1]])
        self.n_samples = xy.shape[0]

    def __getitem__(self, index):
        """Return the ``(features, target)`` pair stored at ``index``."""
        return self.x[index], self.y[index]

    def __len__(self):
        """Return the number of rows in the wrapped array."""
        return self.n_samples


def classifyer(dataset, batch_size, shuffle_dataset=False):
    """Split ``dataset`` into train/test DataLoaders by position.

    The first ~80 % of the indices form the training subset and the remaining
    ~20 % the test subset; the split itself is never randomised.

    Parameters
    ----------
    dataset : object supporting ``len()`` and integer indexing.
    batch_size : int
        Batch size used by both loaders.
    shuffle_dataset : bool, default False
        If True, each loader draws its subset in a new random order on every
        pass (``SubsetRandomSampler``). If False, each loader walks its subset
        in index order. Previously the order was random in both cases, so the
        flag had no effect.

    Returns
    -------
    tuple
        ``(train_loader, test_loader)``.
    """
    dataset_len = len(dataset)
    indices = list(range(dataset_len))

    # Train/test split
    # train:        ~ 80 %
    # test:         ~ 20 %
    # floor(0.8 * n), computed in exact integer arithmetic.
    first_split = (dataset_len * 4) // 5

    train_indices = indices[:first_split]
    test_indices = indices[first_split:]

    if shuffle_dataset:
        # SubsetRandomSampler already permutes the indices on each pass, so no
        # manual shuffle (and no reseeding of NumPy's global RNG) is needed.
        train_sampler = SubsetRandomSampler(train_indices)
        test_sampler = SubsetRandomSampler(test_indices)
    else:
        # A plain index list is a valid sampler and yields a fixed order.
        train_sampler = train_indices
        test_sampler = test_indices

    train_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
    test_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=test_sampler)

    return (train_loader, test_loader)


class NeuralNet(nn.Module):
    """Fully connected network with ReLU activations.

    Layer widths: ``input_size -> hidden_size -> 50 -> 20 -> 5 -> num_classes``.
    A single shared ReLU module follows each of the first four linear layers;
    the output of the last linear layer is returned as-is.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.l1 = nn.Linear(input_size, hidden_size)
        # ReLU is stateless, so one instance serves every hidden layer.
        self.relu = nn.ReLU()
        self.l2 = nn.Linear(hidden_size, 50)
        self.l3 = nn.Linear(50, 20)
        self.l4 = nn.Linear(20, 5)
        self.l5 = nn.Linear(5, num_classes)

    def forward(self, x):
        """Apply the four hidden layers (each followed by ReLU), then ``l5``."""
        activations = x
        for hidden_layer in (self.l1, self.l2, self.l3, self.l4):
            activations = self.relu(hidden_layer(activations))
        return self.l5(activations)


In [None]:
# Hyperparameters
# Optimisation settings.
learning_rate = 1e-4
batch_size = 6
# Number of training epochs; adjust as needed.
epochs = 50
# Layer sizes handed to NeuralNet: width of the input, width of the first
# hidden layer, and width of the output.
input_size = 6
hidden_size = 10
num_classes = 1

In [None]:
# Training
def train_loop(train_loader, model, loss_fn, optimizer):
    """Run one pass over ``train_loader``, taking an optimiser step per batch.

    For every ``(inputs, targets)`` batch the loss between ``model(inputs)``
    and ``targets`` is computed, gradients are reset and back-propagated, and
    ``optimizer.step()`` is applied. Nothing is returned.
    """
    for inputs, targets in train_loader:
        # Forward pass
        batch_loss = loss_fn(model(inputs), targets)

        # Backward pass and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

# Testing
import warnings
import math

# Ignore DeprecationWarning messages from this point on.
warnings.filterwarnings('ignore', category=DeprecationWarning)

def test_loop(t, dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader``, print metrics and plot predictions.

    Parameters
    ----------
    t : int
        Zero-based epoch index; reported and stored as ``t + 1``.
    dataloader : iterable of ``(features, target)`` batches supporting ``len()``.
    model : callable mapping a feature batch to a prediction batch.
    loss_fn : callable returning a scalar tensor for ``(pred, target)``.

    Side effects
    ------------
    * Stores this epoch's mean relative difference (in percent) in the
      notebook-level dict ``min_diff_dict`` under key ``t + 1``.
    * Prints the average difference, the average loss and the best epoch so far.
    * Shows a target-vs-prediction scatter plot and closes the figure.
    """
    num_batches = len(dataloader)
    test_loss = 0

    diff_list = []
    targets_list = []
    pred_list = []

    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()

            # Relative difference between prediction and target. A zero target
            # makes the ratio inf (or NaN for 0/0). Only those elements are
            # dropped, so one zero target no longer zeroes the whole batch and
            # a NaN no longer poisons the epoch average.
            diff = torch.abs(y - pred) / y
            finite = torch.isfinite(diff)
            if finite.any():
                diff_list.append(diff[finite].mean().item())

            # Flatten explicitly: squeeze() on a batch of size 1 yields a
            # scalar, which previously caused that sample to be dropped.
            pred_list.extend(pred.reshape(-1).tolist())
            targets_list.extend(y.reshape(-1).tolist())

    # Average loss per batch (guarded against an empty loader).
    test_loss /= max(num_batches, 1)

    # Average of the per-batch relative differences; 0.0 if none was defined.
    difference_mean = float(np.mean(diff_list)) if diff_list else 0.0

    # Print the average difference and the average loss
    print(f"Test: \n Avg Difference: {(100*difference_mean):>0.2f}%, Avg loss: {test_loss:>8f} \n")

    # Lowest difference so far and the epoch it occurred in
    min_diff_dict[t+1] = (difference_mean*100)
    min_diff_value = min(min_diff_dict.items(), key=lambda x:x[1])
    print("LOWEST DIFFERENCE AND EPOCH:")
    print(f"Epoch: {min_diff_value[0]}, diff: {min_diff_value[1]:>0.2f}%")

    # Set the DPI before creating the figure so it also applies to the figure
    # of the first epoch.
    plt.rcParams["figure.dpi"] = 600
    fig = plt.figure()

    # Scatter plot of target vs. prediction
    plt.scatter(targets_list, pred_list)
    plt.xlabel('Target', fontsize=10)
    plt.ylabel('Prediction', fontsize=10)
    plt.ylim(0, 1300)
    plt.title(f"Epoch {t+1}", fontsize=13)
    plt.show()
    # One figure is created per epoch; release it once it has been shown.
    plt.close(fig)


In [None]:
# Load the merged table; the first CSV column is the saved row index.
dataset_raw = pd.read_csv(os.getcwd() + '/Datasets/HNEI_Processed/Final Database.csv')
dataset_raw = dataset_raw.drop(columns='Unnamed: 0')

# Feature scaling: min-max scale every column except the last one
# (presumably 'RUL' -- verify against the CSV), then re-attach the unscaled
# 'RUL' column.
# NOTE(review): the scaler is fitted on all rows, including the ones that end
# up in the test split; consider fitting it on the training rows only.
data = dataset_raw.values[:, :-1]
trans = MinMaxScaler()
data = trans.fit_transform(data)
features_scaled = pd.DataFrame(data)
dataset_scaled = features_scaled.join(dataset_raw['RUL'])
scaled_df_np = dataset_scaled.to_numpy(dtype=np.float32)

# Fix the PyTorch RNG so weight initialisation and batch order are
# reproducible across kernel restarts.
torch.manual_seed(0)

# Build the dataset (reads scaled_df_np)
dataset = BatteryDataSet()

# Build the train and test loaders
train_loader, test_loader = classifyer(dataset=dataset, batch_size=batch_size,
                                       shuffle_dataset=True)
# Model initialisation
model = NeuralNet(input_size, hidden_size, num_classes)

# L1 loss
loss_fn = nn.L1Loss()

# Optimiser
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Helper dict filled by test_loop: epoch number -> mean difference in percent
min_diff_dict = {}

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")

    train_loop(train_loader, model, loss_fn, optimizer)

    test_loop(t, test_loader, model, loss_fn)

print("Fertig!")


# Save the model (state_dict must be taken from the instance, not the class)
# torch.save(model.state_dict(), os.getcwd() + '/Datasets/FF_Net_1.pth')