# 轴承故障诊断结果演示

### 导入必要的外部库

In [22]:
import os
import numpy as np
import pandas as pd
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, random_split,Dataset
from scipy import io


### 定义CNN网络结构模型

In [23]:
class CNN(nn.Module):
    '''定义一维卷积神经网络模型'''

    def __init__(self, in_channel=3, out_channel=9):
        super(CNN, self).__init__()
        '''除输入层外，每个层级都包含了卷积、激活和池化三层'''
        '''输出层额外包含了BatchNorm层，提高网络收敛速度以及稳定性'''
        '''第一层卷积核大小为64，之后逐层递减'''
        self.layer1 = nn.Sequential(
            nn.Conv1d(in_channel, 16, kernel_size=64, stride=16, padding=24),
            nn.BatchNorm1d(16),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.layer2 = nn.Sequential(
            nn.Conv1d(16, 32, kernel_size=16, padding=8),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2))

        self.layer3 = nn.Sequential(
            nn.Conv1d(32, 48, kernel_size=8, padding=4),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        self.layer4 = nn.Sequential(
            nn.Conv1d(48, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.layer5 = nn.Sequential(
            nn.Conv1d(64, 64, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 全连接层定义 引入Dropout机制以提高泛化能力
        self.fc = nn.Sequential(
            nn.Linear(64, 128),
            nn.Dropout(0.3),
            nn.ReLU(inplace=True),
            nn.Linear(128, out_channel)
        )
        # 使用softmax函数以计算输出从属于每一类的概率
        self.softmax = nn.Softmax(1)

    def forward(self, x):
        '''前向传播'''
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        x = x.view(x.size(0), x.size(1), 1)
        x = self.softmax(x)
        return x

class LSTM_CNN(nn.Module):
    '''定义LSTM-CNN网络模型'''

    def __init__(self, DEVICE, in_channel=3, out_channel=9):
        super(LSTM_CNN, self).__init__()
        self.DEVICE = DEVICE
        '''LSTM相关神经元定义'''
        self.lstm_layer1 = nn.LSTM(in_channel, 32)
        self.lstm_layer2 = nn.LSTM(64, 1)
        self.lstm_fc1 = nn.Sequential(
            nn.Linear(32, 64),
            nn.ReLU(inplace=True),
            nn.Linear(64, 64)
        )

        self.lstm_fc2 = nn.Sequential(
            nn.Linear(1024, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 64)
        )

        '''CNN相关神经元定义'''
        self.cnn_layer1 = nn.Sequential(
            nn.Conv1d(in_channel, 16, kernel_size=64, stride=16, padding=24),
            nn.BatchNorm1d(16),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.cnn_layer2 = nn.Sequential(
            nn.Conv1d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2))

        self.cnn_layer3 = nn.Sequential(
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.cnn_layer4 = nn.Sequential(
            nn.Conv1d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.cnn_layer5 = nn.Sequential(
            nn.Conv1d(64, 64, kernel_size=3),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.fc = nn.Sequential(
            nn.Linear(128, 64),
            nn.Dropout(0.3),
            nn.ReLU(inplace=True),
            nn.Linear(64, out_channel)
        )
        # 使用softmax函数以计算输出从属于每一类的概率
        self.softmax = nn.Softmax(1)

    def forward(self, x):
        '''前向传播'''
        '''*******LSTM*******'''
        # 初始化隐藏神经元
        x_lstm = x.permute(0, 2, 1)
        x_lstm.to(self.DEVICE)
        h1 = torch.zeros(1, 128, 32).to(self.DEVICE)
        c1 = torch.zeros_like(h1).to(self.DEVICE)
        h2 = torch.zeros(1, 128, 1).to(self.DEVICE)
        c2 = torch.zeros_like(h2).to(self.DEVICE)
        y_lstm_ = []
        # 对原时序信号分段
        for i in range(8):
            x_lstm_ = x_lstm[:, i*128:(i+1)*128]
            y, (h1, c1) = self.lstm_layer1(x_lstm_, (h1, c1))
            y = self.lstm_fc1(y)
            y, (h2, c2) = self.lstm_layer2(y, (h2, c2))
            y.to(self.DEVICE)
            y_lstm_.append(y)
        # 合并每一段的结果
        y_lstm = torch.cat(y_lstm_, 1)
        y_lstm = y_lstm.view(y_lstm.size(0), -1)
        y_lstm = self.lstm_fc2(y_lstm)
        '''*******CNN*******'''
        x = self.cnn_layer1(x)
        x = self.cnn_layer2(x)
        x = self.cnn_layer3(x)
        x = self.cnn_layer4(x)
        x = self.cnn_layer5(x)
        x = x.view(x.size(0), -1)
        '''******LSTM+CNN******'''
        # 连接LSTM和CNN的输出，并通过全连接神经元
        x = torch.cat([x, y_lstm], 1)
        x = self.fc(x)
        x = x.view(x.size(0), x.size(1), 1)
        y = self.softmax(x)
        return y


### 定义数据集类以及必要的工具函数

In [24]:
class Data_set(Dataset):
    def __init__(self, data, label):
        super().__init__()
        self.x = torch.tensor(data)
        self.y = torch.tensor(label).view(-1, 1)

    def __getitem__(self, idx):
        assert idx < len(self.y)
        return self.x[idx], self.y[idx]

    def __len__(self):
        return len(self.y)


def file_read(filename: str):
    '''读取数据'''
    label = filename.split('.')[0].split('/')[-1].split("\\")[-1]
    data = pd.read_csv(filename)
    array = data.to_numpy(dtype=np.float32)
    return get_idx(label), array


def get_idx(label: str):
    '''根据标签获得其索引'''
    labels = ['BL07', 'BL14', 'BL21', 'IR07',
              'IR14', 'IR21', 'OR07', 'OR14', 'OR21']
    idx = labels.index(label)
    return idx


def get_label(idx):
    '''根据索引获得标签内容'''
    labels = ['BL07', 'BL14', 'BL21', 'IR07',
              'IR14', 'IR21', 'OR07', 'OR14', 'OR21']
    return labels[idx]


### 获取数据

In [25]:
global DEVICE, BATCH_SIZE, EPOCHS, LENGTH, PATH, LOSSES, ACCS
LENGTH = 1024
BATCH_SIZE = 64
EPOCHS = 20
STRIDE = 128
PATH = os.path.abspath('./../data')
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
LOSSES = []
ACCS = []
print(f"DEVICE:{DEVICE}")
files = os.listdir(PATH)
label = []
data = []
for file in files:
    if "csv" not in file:
        continue
    labeli, xi = file_read(PATH + "/" + file)
    for j in range(0, len(xi)-LENGTH, STRIDE):
        label.append(labeli)
        data.append(xi[j:j+LENGTH, :].T)
    label.append(labeli)
    data.append(xi[-LENGTH:, :].T)


DEVICE:cuda


In [26]:
ds = Data_set(data, label)
'''数据集分割'''
train_size = int(0.7*len(ds))
test_size = len(ds) - train_size
train_loader, test_loader = random_split(ds, [train_size, test_size])
train_loader = DataLoader(train_loader, BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_loader, BATCH_SIZE, shuffle=True)

### 定义训练及验证函数

In [27]:
def train(net: CNN, dataloader: DataLoader, loss_func, optimizer: optim.Adam, epoch: int):
    global DEVICE
    net.train()
    for i, (x, label) in enumerate(dataloader):
        x, label = x.to(DEVICE), label.to(DEVICE)
        optimizer.zero_grad()
        y = net(x).to(DEVICE)
        loss = loss_func(y, label)
        loss.backward()
        optimizer.step()
        if ((epoch+1) % 10 == 0) and ((i+1) % 8 == 0):
            print("Train Epoch: {}\t[{: >4}/{} ({:.0f}%)] Loss: {:.6f}".format(
                epoch+1, i*len(x), len(dataloader.dataset), 100*i/len(dataloader), loss.item()))


def test(net: CNN, dataloader: DataLoader, loss_func, datatype: str):
    global DEVICE
    net.eval()
    test_loss = 0
    cnt = 0
    with torch.no_grad():
        for x, label in dataloader:
            x, label = x.to(DEVICE), label.to(DEVICE)
            y = net(x).to(DEVICE)
            test_loss += loss_func(y, label)
            predict = y.max(1, keepdim=True)[1]
            cnt += predict.eq(label.view_as(predict)).sum().item()
    test_loss /= len(dataloader.dataset)
    print("{} Data:  Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)".format(datatype,
                                                                             test_loss, cnt, len(dataloader.dataset), 100*cnt/len(dataloader.dataset)))


### 训练卷积神经网络

训练一维卷积网络，输出其loss以及其在数据集上的准确率

In [28]:
net = CNN(3, 9).to(DEVICE)
optimizer = optim.Adam(net.parameters())
loss_func = nn.CrossEntropyLoss()

for epoch in range(EPOCHS):
    train(net, train_loader, loss_func, optimizer, epoch)
    if (epoch+1) % 10 == 0:
        print()
        test(net,  train_loader, loss_func, "训练")
        test(net, test_loader, loss_func, "验证")
        print()



训练 Data:  Average loss: 0.0218, Accuracy: 4157/4166 (100%)
验证 Data:  Average loss: 0.0216, Accuracy: 1779/1786 (100%)


训练 Data:  Average loss: 0.0220, Accuracy: 4094/4166 (98%)
验证 Data:  Average loss: 0.0219, Accuracy: 1742/1786 (98%)



In [29]:
net = LSTM_CNN(DEVICE).to(DEVICE)
optimizer = optim.Adam(net.parameters())
loss_func = nn.CrossEntropyLoss()

for epoch in range(EPOCHS):
    train(net, train_loader, loss_func, optimizer, epoch)
    if (epoch+1) % 10 == 0:
        print()
        test(net,  train_loader, loss_func, "训练")
        test(net, test_loader, loss_func, "验证")
        print()



训练 Data:  Average loss: 0.0217, Accuracy: 4166/4166 (100%)
验证 Data:  Average loss: 0.0215, Accuracy: 1786/1786 (100%)


训练 Data:  Average loss: 0.0217, Accuracy: 4166/4166 (100%)
验证 Data:  Average loss: 0.0215, Accuracy: 1786/1786 (100%)



### 生成测试集标签

In [33]:
data = io.loadmat(PATH+"\\Fault_Diag_Data.mat")
test_data = torch.tensor(
    data["TestDataArray"], dtype=torch.float32).permute(2, 1, 0)
net.eval()
predict = []
for x in test_data:
    x = x.reshape(1, x.shape[0], x.shape[1]).to(DEVICE)
    y = net(x)
    output = y.max(1, keepdim=True)[1]
    predict.append(get_label(output.item()))

df = {
    "index": [i for i in range(len(test_data))],
    "label": predict
}

df = pd.DataFrame(df)
# df.to_excel("results.xlsx", index=False)
df


Unnamed: 0,index,label
0,0,BL21
1,1,IR14
2,2,OR14
3,3,OR14
4,4,OR07
...,...,...
1795,1795,IR14
1796,1796,IR21
1797,1797,IR14
1798,1798,IR21
