In [1]:
import random

# scp -i ~/Desktop/www/wolanx-note/__cicd__/ssh-106006/KEY-GIMC-BIGDATA-D.pem vb.ipynb root@10.231.9.124:/www/test/vb-test/
import h5py
import lightning.pytorch as pl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from h5py import File
from scipy import signal
from sklearn.preprocessing import MinMaxScaler
from torch import nn, optim
from torch import utils
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torchmetrics import F1Score, Accuracy
from torchvision import transforms

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

In [2]:
# Open the fault dataset read-only. The handle stays open on purpose: the
# datasets pulled out below are indexed again by later cells.
# Alternative location: "/www/test/vb-test/data/fault-data/fault_data.h5"
f: File = h5py.File("./data/fault-data-2280.h5", "r")

codeArr, waveArr, fftAbsArr = f['label'], f['wave_data'], f['wave_fft_abs']
dataSize = len(codeArr)

# Copy the waveform stored at index 1 into an in-memory NumPy array.
waveWeb = np.array(waveArr[1])

In [3]:
cnt = len(waveWeb)

# FFT of the sample waveform and the matching frequency axis (sample spacing
# 1/5120 s); only the first half of the bins is kept, which holds the
# non-negative frequencies.
fftArr = np.fft.fft(waveWeb)[:cnt // 2]
hzArr = np.fft.fftfreq(cnt, d=1 / 5120)[:cnt // 2]

# Plot the frequency-domain data
plt.figure(figsize=(20, 3))
plt.plot(hzArr, np.abs(fftArr))
plt.xlabel('Hz')
plt.ylabel('nm/s')
plt.xlim(0)
plt.ylim(0)
plt.grid()
plt.show()

In [4]:
# Spectrogram of the waveform stored at index 219 (sampling frequency 5120).
hzArr, times, sxx = signal.spectrogram(waveArr[219], fs=5120)

print(sxx.shape)
print(sxx)

# Draw the spectrogram with the power converted to decibels.
plt.pcolormesh(times, hzArr, 10 * np.log10(sxx))
plt.title('Spectrogram of the Waveform')
plt.xlabel('Time [s]')
plt.ylabel('Frequency [Hz]')
plt.colorbar(label='Power Spectral Density [dB/Hz]')
plt.show()

In [5]:
# 画图 tool
def drawWave(ids: list = None, df=None, log=False):
    """Plot the waveform and FFT magnitude of each sample in ``ids``.

    One figure row is drawn per entry of ``ids``: the left panel shows
    ``waveArr[idx]`` and the right panel shows ``fftAbsArr[idx]`` (both are
    module-level datasets). The right panel is titled ``"<y> - <y_>"`` using
    row ``i`` of ``df``, in green when that row's ``ok`` is truthy and red
    otherwise.

    Args:
        ids: Sample indices into ``waveArr`` / ``fftAbsArr``. ``None`` or an
            empty sequence draws nothing.
        df: DataFrame with columns ``y``, ``y_`` and ``ok``; the row labelled
            ``i`` must describe ``ids[i]``.
        log: When true, the FFT panel uses a ``symlog`` y-axis.

    Raises:
        ValueError: If ``ids`` is non-empty but ``df`` is ``None``.
    """
    ids = list(ids) if ids is not None else []
    if not ids:
        # plt.subplots() rejects nrows=0, so there is nothing sensible to draw.
        return
    if df is None:
        raise ValueError("drawWave needs `df` with columns 'y', 'y_' and 'ok'")

    # squeeze=False keeps `ax` two-dimensional even when only one row is
    # requested, so the ax[i, j] indexing below is always valid.
    fig, ax = plt.subplots(nrows=len(ids), ncols=2, figsize=(20, 10), dpi=100, squeeze=False)
    for i, idx in enumerate(ids):
        row = df.loc[i]
        y, y_, ok = row['y'], row['y_'], row['ok']

        wave_ax, fft_ax = ax[i, 0], ax[i, 1]
        wave_ax.set_ylabel(f"{idx}    ", rotation=0, fontdict={'size': 16})
        wave_ax.plot(waveArr[idx])

        color_ = '#0f0' if ok else '#f00'
        fft_ax.set_title(f"{y} - {y_}", fontdict={'size': 16, 'color': color_}, x=0.5, y=0.5)
        if log:
            fft_ax.set_yscale("symlog")
        fft_ax.plot(fftAbsArr[idx])


def drawHis(ids: list = None, df=None, log=False):
    """Plot a grid of spectrograms, five per row, for the samples in ``ids``.

    Each panel shows the dB-scaled spectrogram of ``waveArr[id_]`` (a
    module-level dataset), labelled with the sample id and titled
    ``"<y> - <y_>"`` from row ``i`` of ``df`` (green when ``ok`` is truthy,
    red otherwise).

    Args:
        ids: Sample indices into ``waveArr``. ``None`` or an empty sequence
            draws nothing. Any length is accepted; unused panels in the last
            row are hidden.
        df: DataFrame with columns ``y``, ``y_`` and ``ok``; the row labelled
            ``i`` must describe ``ids[i]``.
        log: Accepted for signature parity with ``drawWave``; not used here.

    Raises:
        ValueError: If ``ids`` is non-empty but ``df`` is ``None``.
    """
    ids = list(ids) if ids is not None else []
    if not ids:
        return
    if df is None:
        raise ValueError("drawHis needs `df` with columns 'y', 'y_' and 'ok'")

    ncols = 5
    # Ceiling division: a partially filled last row still needs its own row.
    nrows = (len(ids) + ncols - 1) // ncols
    # squeeze=False keeps `ax` two-dimensional even for a single row.
    fig, ax = plt.subplots(nrows=nrows, ncols=ncols, figsize=(20, 20), dpi=100, squeeze=False)
    for i, id_ in enumerate(ids):
        row = df.loc[i]
        y, y_, ok = row['y'], row['y_'], row['ok']
        panel = ax[i // ncols, i % ncols]

        freqs, times, Sxx = signal.spectrogram(np.array(waveArr[id_], dtype=np.float32), 5120)
        panel.set_ylabel(f"{id_}", rotation=0, fontdict={'size': 16}, x=1, y=0.5)
        color_ = '#0f0' if ok else '#f00'
        panel.set_title(f"{y} - {y_}", fontdict={'size': 24, 'color': color_}, x=0.5, y=0.5)
        panel.pcolormesh(times, freqs, 10 * np.log10(Sxx))

    # Hide the panels of the last row that have no sample assigned.
    for j in range(len(ids), nrows * ncols):
        ax[j // ncols, j % ncols].set_visible(False)

In [7]:
class MyDataModule(pl.LightningDataModule):
    """Lightning data module that splits one dataset 70/20/10 into
    train / validation / test subsets and serves them through DataLoaders.

    Args:
        trainSet: The full dataset to split (despite the name, it is the
            source of all three subsets).
        batchSize: Batch size used by every DataLoader.
        transform: Optional callable stored on the module. It is only used by
            ``apply_transform``; the DataLoaders below do not apply it.
    """

    def __init__(self, trainSet, batchSize: int, transform=None):
        super().__init__()
        self.batchSize = batchSize
        self.transform = transform
        # Fractional lengths: random_split assigns 70% / 20% / 10% of the samples.
        self.trainSet, self.validSet, self.testsSet = utils.data.random_split(trainSet, [0.7, 0.2, 0.1])
        print('dataset num', len(self.trainSet), len(self.validSet), len(self.testsSet))

    def train_dataloader(self):
        """Return the training loader; reshuffled every epoch so that batch
        composition is not identical from one epoch to the next."""
        return DataLoader(self.trainSet, batch_size=self.batchSize, shuffle=True)

    def val_dataloader(self):
        """Return the validation loader (fixed order)."""
        return DataLoader(self.validSet, batch_size=self.batchSize, shuffle=False)

    def test_dataloader(self):
        """Return the test loader (fixed order)."""
        return DataLoader(self.testsSet, batch_size=self.batchSize, shuffle=False)

    def apply_transform(self, x):
        """Return ``transform(x)`` when a transform was configured, else ``x``."""
        if self.transform:
            x = self.transform(x)
        return x


def min_max_scale(x):
    """Rescale a tensor linearly so its minimum maps to 0 and its maximum to 1.

    The minimum and maximum are taken over the whole tensor (not per row or
    per channel).

    Args:
        x: Non-empty tensor to rescale.

    Returns:
        A tensor of the same shape as ``x`` with values in ``[0, 1]``. A
        constant input (max == min) yields all zeros instead of NaN.
    """
    min_val = torch.min(x)
    span = torch.max(x) - min_val
    # Guard against division by zero for constant input: dividing the
    # (all-zero) numerator by 1 gives zeros rather than NaN.
    safe_span = torch.where(span == 0, torch.ones_like(span), span)
    return (x - min_val) / safe_span


# Composition of torchvision transforms (whole-tensor min-max scaling).
custom_transform = transforms.Compose([
    transforms.Lambda(lambda x: min_max_scale(x)),
])
scaler = MinMaxScaler(feature_range=(0, 1))
fftAbsArr2 = scaler.fit_transform(fftAbsArr)

# One spectrogram per waveform; index 2 of the result is the power array.
sxxArr = [signal.spectrogram(waveOne, 5120)[2] for waveOne in waveArr]

# Stack into a single contiguous array before handing it to torch:
# torch.tensor() on a Python list of ndarrays converts element by element,
# which is very slow and triggers a PyTorch warning. The explicit float32
# cast matches the dtype of the model's convolution weights.
features = torch.from_numpy(np.stack(sxxArr).astype(np.float32))
labels = torch.as_tensor(codeArr[:], dtype=torch.int64)

dataset = TensorDataset(features, labels)
dm = MyDataModule(dataset, batchSize=128, transform=None)  # custom_transform

In [11]:
class MyModel(pl.LightningModule):
    """Small 2-D CNN classifier for spectrogram inputs.

    Layers: Conv2d(1->8) -> ReLU -> Conv2d(8->16) -> ReLU -> flatten ->
    Linear(->256) -> ReLU -> Linear(->num_classes). Both convolutions use a
    3x3 kernel with stride 1 and padding 1, so the spatial size of the input
    is preserved up to the flatten step.

    Args:
        lr: Learning rate for the Adam optimizer (saved as a hyperparameter).
        num_classes: Number of output logits and metric classes.
        n_freq: Height each sample is reshaped to in ``forward``.
        n_time: Width each sample is reshaped to in ``forward``.
    """

    def __init__(self, lr: float, num_classes: int = 6, n_freq: int = 129, n_time: int = 9):
        super().__init__()
        self.save_hyperparameters("lr")
        self.n_freq = n_freq
        self.n_time = n_time

        # These two metrics are shared by the validation and test loops; both
        # loops reset them at the end of their epoch.
        self.val_acc = Accuracy(task="multiclass", num_classes=num_classes)
        # Macro averaging matches the f1_score(average='macro') used by the
        # evaluation helpers in this notebook; micro-averaged multiclass F1
        # would simply repeat the accuracy value.
        self.val_f1 = F1Score(task="multiclass", num_classes=num_classes, average="macro")

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.relu = nn.ReLU()
        # pool and dropout1 are defined but not used in forward().
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        self.dropout1 = nn.Dropout(p=.25)
        # 16 output channels times the unchanged spatial size
        # (16 * 129 * 9 = 18576 with the defaults).
        self.fc1 = nn.Linear(16 * n_freq * n_time, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes).

        ``x`` may be any tensor whose elements reshape to
        (batch, 1, n_freq, n_time).
        """
        x = x.reshape(-1, 1, self.n_freq, self.n_time)  # (batch, 1, n_freq, n_time)
        x = self.relu(self.conv1(x))  # (batch, 8, n_freq, n_time)
        x = self.relu(self.conv2(x))  # (batch, 16, n_freq, n_time)
        x = torch.flatten(x, 1)  # (batch, 16 * n_freq * n_time)

        x = F.relu(self.fc1(x))  # (batch, 256)
        # No softmax here: F.cross_entropy expects unnormalised logits.
        return self.fc2(x)  # (batch, num_classes)

    def configure_optimizers(self):
        """Adam over all parameters with the saved learning rate."""
        return optim.Adam(self.parameters(), lr=self.hparams.lr)

    def training_step(self, batch, batch_idx):
        """Cross-entropy loss for one training batch."""
        inputs, labels = batch
        outputs = self(inputs)
        return F.cross_entropy(outputs, labels)  # , reduction='sum'

    def _eval_step(self, batch, stage: str):
        """Shared validation/test logic: compute the loss, update the metrics
        and log ``"<stage>_loss"``. Returns ``(loss, labels, predictions)``."""
        inputs, labels = batch
        outputs = self(inputs)

        loss = F.cross_entropy(outputs, labels)
        y_ = torch.argmax(outputs, dim=1)
        self.val_acc.update(y_, labels)
        self.val_f1.update(y_, labels)

        self.log(f"{stage}_loss", loss)
        return loss, labels, y_

    def validation_step(self, batch, batch_idx):
        """Evaluate one validation batch and return its loss."""
        loss, _, _ = self._eval_step(batch, "val")
        return loss

    def on_validation_epoch_end(self) -> None:
        """Print and reset the metrics accumulated over the validation epoch."""
        print("{} val acc={:.3f} f1={:.3f}".format(self.current_epoch, self.val_acc.compute(), self.val_f1.compute()))
        self.val_acc.reset()
        self.val_f1.reset()

    def test_step(self, batch, batch_idx):
        """Evaluate one test batch, printing its labels and predictions."""
        _, labels, y_ = self._eval_step(batch, "test")
        print('yt.shape', labels.shape, labels)
        print('y_.shape', y_.shape, y_)

    def on_test_epoch_end(self) -> None:
        """Print and reset the metrics accumulated over the test epoch."""
        print("{} test acc={:.3f} f1={:.3f}".format(self.current_epoch, self.val_acc.compute(), self.val_f1.compute()))
        self.val_acc.reset()
        self.val_f1.reset()


# Build the model, train it for 10 epochs without a logger or checkpoints,
# then run the test loop of the data module.
model = MyModel(lr=1e-3)

logger = None
trainer = pl.Trainer(
    logger=logger,
    max_epochs=10,
    enable_progress_bar=True,
    enable_checkpointing=False,
)
trainer.fit(model, datamodule=dm)
trainer.test(model, datamodule=dm)

In [13]:
"""
BaseplateUnbalance: 1
Misaligned: 2
Bearing: 3
ImpellerUnbalance: 4
Cavitation: 5
"""
model.eval()

_page = 0
# ids = range(10 * _page, 10 * _page + 25, 1)
# Draw 20 random sample indices from the whole dataset. random.randint is
# inclusive on both ends, hence dataSize - 1.
ids = [random.randint(0, dataSize - 1) for _ in range(20)]

# Run on whatever device the model currently lives on.
model_device = next(model.parameters()).device

records = []
with torch.no_grad():  # inference only, no autograd graph needed
    for id_ in ids:
        # The model is trained on the spectrogram features held in `dataset`
        # and reshapes its input to (-1, 1, 129, 9), so it is fed the same
        # features here rather than the FFT magnitudes.
        x = dataset[id_][0].unsqueeze(0).float().to(model_device)
        ret = model(x)

        y = int(codeArr[id_])
        # Softmax turns the logits into probabilities; keep the top one.
        p_, y_ = torch.max(torch.softmax(ret[0], dim=-1), dim=-1)
        records.append({'id': id_, 'y': y, "y_": y_.item(), 'ratio': p_.item()})

# Build the frame once from the collected rows instead of growing it per row.
df = pd.DataFrame(records, columns=['id', "y", "y_", 'ratio'])
df["ok"] = df["y"] == df["y_"]
print(df)

# drawWave(ids=ids, df=df, log=False)
# drawHis(ids=ids, df=df, log=True)

In [14]:
from sklearn.metrics import accuracy_score, f1_score
import onnxruntime as ort

# ONNX Runtime session used by get_onnx() below to score the exported model.
# NOTE(review): this file name differs from the one written by the export cell
# ('vb-2024-05-14.onnx') — confirm this is the export that should be evaluated.
sess = ort.InferenceSession("vb-2023-11-22.onnx")


def get_score(dataSet, label: str = ''):
    """Print accuracy and macro-F1 of the module-level ``model`` on ``dataSet``.

    Args:
        dataSet: Dataset yielding ``(features, label)`` pairs that also
            supports ``dataSet[:]`` returning ``(features, labels)`` tensors.
        label: Free-text tag appended to the printed line.
    """
    model_device = next(model.parameters()).device
    was_training = model.training
    # Force inference mode regardless of the state other cells left the
    # model in, and restore that state afterwards.
    model.eval()
    y_pred = []
    try:
        with torch.no_grad():  # no autograd bookkeeping during evaluation
            # Batched forward passes instead of one call per sample.
            for features, _ in DataLoader(dataSet, batch_size=256, shuffle=False):
                logits = model(features.to(model_device))
                y_pred.extend(torch.argmax(logits, dim=1).cpu().tolist())
    finally:
        model.train(was_training)

    y_true = dataSet[:][1].cpu().numpy()
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='macro')
    print('acc {:.3f} , f1 {:.3f} {}'.format(acc, f1, label))


def get_onnx(dataSet, label: str = ''):
    """Print accuracy and macro-F1 of the ONNX session ``sess`` on ``dataSet``.

    Every sample is sent to the session on its own, as a float32 array under
    the input name ``"input"``; the prediction is the argmax over the first
    output.

    NOTE(review): samples are passed without an added batch dimension, unlike
    ``get_score`` — confirm this matches the exported model's input shape.
    """
    y_pred = [
        np.argmax(sess.run(None, {"input": np.array(sample[0], np.float32)})[0])
        for sample in dataSet
    ]

    y_true = dataSet[:][1].numpy().tolist()
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='macro')
    print('acc {:.3f} , f1 {:.3f} {}'.format(acc, f1, label))


# Score the PyTorch model on every split, then on the full dataset.
for split, tag in ((dm.trainSet, 'train'), (dm.validSet, 'valid'), (dm.testsSet, 'tests')):
    get_score(split, tag)
print('------ model')
get_score(dataset)

print()
print()

# Same evaluation through the ONNX Runtime session.
for split, tag in ((dm.trainSet, 'train'), (dm.validSet, 'valid'), (dm.testsSet, 'tests')):
    get_onnx(split, tag)
print('------ onnx')
get_onnx(dataset)
# dm.trainSet[1][0].numpy()

In [99]:
# Dummy input used to trace the model for export. MyModel.forward reshapes its
# input to (-1, 1, 129, 9), so the sample must hold a multiple of 129 * 9
# values; a 1024-element vector cannot be reshaped that way and the export
# would fail.
inputs = torch.randn(1, 129, 9)  # sample
print(inputs.shape)

# Axis 0 of both input and output is exported as a dynamic "batch" axis.
model.to_onnx(
    'vb-2024-05-14.onnx',
    input_sample=inputs,
    export_params=True,
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}},
)