<a href="https://colab.research.google.com/github/jhChoi1997/EE488_AI_Convergence_Capstone_Design_Anomaly_Detection_2022spring/blob/main/EE488_DCASE2020_ResNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
import os
drive.mount('/content/gdrive')
root_path = 'gdrive/MyDrive/EE488/'
os.chdir(root_path)

Mounted at /content/gdrive


In [2]:
import sys
import librosa
import librosa.core
import librosa.feature
import yaml
import glob
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
from sklearn import metrics

In [3]:
dataset_dir = './valve'
test_dir = './valve_test'
model_dir = './model'

n_fft = 2048
hop_length = 512
n_mels = 128
power = 2
n_mul = 6
kernel_size = 3

EPOCHS = 10
BATCH = 32

In [20]:
def file_load(wav_name):
  try:
    return librosa.load(wav_name, sr=None, mono=False)
  except:
    print('file_broken or not exists!! : {}'.format(wav_name))
    

def file_list_generator(target_dir):
  training_list_path = os.path.abspath('{dir}/*.wav'.format(dir=target_dir))
  files = sorted(glob.glob(training_list_path))
  if len(files) == 0:
    print('no_wav_file!!')
  return files


def file_to_log_mel(file_name, n_mels, n_fft, hop_length, power):
  y, sr = file_load(file_name)
  mel_spectrogram = librosa.feature.melspectrogram(y=y,
                                                   sr=sr,
                                                   n_fft=n_fft,
                                                   hop_length=hop_length,
                                                   n_mels=n_mels,
                                                   power=power)
  
  log_mel_spectrogram = 20.0 / power * np.log10(mel_spectrogram + sys.float_info.epsilon)

  return log_mel_spectrogram


def list_to_dataset(file_list, n_mels, n_fft, hop_length, power):
  for idx in tqdm(range(len(file_list))):
    log_mel = file_to_log_mel(file_list[idx],
                              n_mels=n_mels,
                              n_fft=n_fft,
                              hop_length=hop_length,
                              power=power)
    if idx == 0:
      dataset = np.zeros((len(file_list), 1, len(log_mel[:,0]), len(log_mel[0,:])), float)
    dataset[idx, 0, :, :] = log_mel
  
  return dataset

In [5]:
os.makedirs(model_dir, exist_ok=True)

dataset_dir = os.path.abspath(dataset_dir)
machine_type = os.path.split(dataset_dir)[1]
model_file_path = '{model}/model_{machine_type}'.format(model=model_dir, machine_type=machine_type)

files = file_list_generator(dataset_dir)
train_data = list_to_dataset(files,
                             n_mels=n_mels,
                             n_fft=n_fft,
                             hop_length=hop_length,
                             power=power)
train_data = torch.Tensor(train_data)

label_list = ['id_00', 'id_02', 'id_04', 'id_06']

train_label = torch.LongTensor([idx for file_name in files for idx, label_idx in enumerate(label_list) if label_idx in file_name])
train_label = nn.functional.one_hot(train_label, num_classes=len(label_list))

100%|██████████| 3291/3291 [02:31<00:00, 21.79it/s]


In [6]:
train_label

tensor([[1, 0, 0, 0],
        [1, 0, 0, 0],
        [1, 0, 0, 0],
        ...,
        [0, 0, 0, 1],
        [0, 0, 0, 1],
        [0, 0, 0, 1]])

In [7]:
train_dataset = TensorDataset(train_data, train_label)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH, shuffle=True)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Using {device} device')

Using cuda device


In [8]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channel, out_channel, projection=False):
        super(ResidualBlock, self).__init__()
        self.in_channel = in_channel
        self.out_channel = out_channel
        self.projection = projection
        if self.projection:
            self.conv1 = nn.Conv2d(self.in_channel, self.out_channel, kernel_size=3, stride=2, padding=(1, 1))
        else:
            self.conv1 = nn.Conv2d(self.in_channel, self.out_channel, kernel_size=3, padding=(1, 1))
        self.bn1 = nn.BatchNorm2d(self.out_channel)
        self.relu = nn.ReLU()

        self.conv2 = nn.Conv2d(self.out_channel, self.out_channel, kernel_size=3, padding='same')
        self.bn2 = nn.BatchNorm2d(self.out_channel)
        if self.projection:
            self.downsample = nn.Conv2d(self.in_channel, self.out_channel, stride=2, kernel_size=1)
        else:
            self.downsample = nn.Conv2d(self.in_channel, self.out_channel, kernel_size=1)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        if self.projection:
            skip = self.downsample(x)
        else:
            skip = x

        out += skip
        out = self.relu(out)

        return out


class ResNet(nn.Module):
    def __init__(self, n_class):
        super(ResNet, self).__init__()

        self.n_channel = 8
        self.n_class = n_class

        self.conv1 = nn.Conv2d(1, self.n_channel, kernel_size=7, stride=2, padding=(3, 2))
        self.bn1 = nn.BatchNorm2d(self.n_channel)
        self.relu = nn.ReLU()
        self.pooling1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=(1, 1))

        self.block1 = ResidualBlock(self.n_channel, self.n_channel)
        self.block2 = ResidualBlock(self.n_channel, self.n_channel)
        self.block3 = ResidualBlock(self.n_channel, self.n_channel * 2, True)
        self.block4 = ResidualBlock(self.n_channel * 2, self.n_channel * 2)
        self.block5 = ResidualBlock(self.n_channel * 2, self.n_channel * 4, True)
        self.block6 = ResidualBlock(self.n_channel * 4, self.n_channel * 4)
        self.block7 = ResidualBlock(self.n_channel * 4, self.n_channel * 8, True)
        self.block8 = ResidualBlock(self.n_channel * 8, self.n_channel * 8)

        self.gap1 = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(self.n_channel * 8, self.n_class)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.pooling1(x)

        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = self.block6(x)
        x = self.block7(x)
        x = self.block8(x)

        x = self.gap1(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

In [9]:
model = ResNet(len(label_list)).to(device)

In [10]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [11]:
def train(dataloader, mdoel, loss_fn, optimizer):
  size = len(dataloader.dataset)
  for batch, (X, y) in enumerate(dataloader):
    X = X.to(device)
    y = y.float().to(device)

    pred = model(X)
    loss = loss_fn(pred, y)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if batch % 30 == 0:
      loss, current = loss.item(), batch * len(X)
      print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [12]:
for t in range(EPOCHS):
  print(f"Epoch {t + 1}\n-------------------------------")
  train(train_dataloader, model, loss_fn, optimizer)

Epoch 1
-------------------------------
loss: 1.507398  [    0/ 3291]
loss: 0.360370  [  960/ 3291]
loss: 0.059299  [ 1920/ 3291]
loss: 0.024497  [ 2880/ 3291]
Epoch 2
-------------------------------
loss: 0.053262  [    0/ 3291]
loss: 0.015846  [  960/ 3291]
loss: 0.003955  [ 1920/ 3291]
loss: 0.003126  [ 2880/ 3291]
Epoch 3
-------------------------------
loss: 0.005282  [    0/ 3291]
loss: 0.001708  [  960/ 3291]
loss: 0.002747  [ 1920/ 3291]
loss: 0.001000  [ 2880/ 3291]
Epoch 4
-------------------------------
loss: 0.000549  [    0/ 3291]
loss: 0.001906  [  960/ 3291]
loss: 0.001021  [ 1920/ 3291]
loss: 0.000538  [ 2880/ 3291]
Epoch 5
-------------------------------
loss: 0.000322  [    0/ 3291]
loss: 0.000263  [  960/ 3291]
loss: 0.001160  [ 1920/ 3291]
loss: 0.000373  [ 2880/ 3291]
Epoch 6
-------------------------------
loss: 0.000622  [    0/ 3291]
loss: 0.003613  [  960/ 3291]
loss: 0.000312  [ 1920/ 3291]
loss: 0.000580  [ 2880/ 3291]
Epoch 7
-------------------------------


In [13]:
def get_anomaly_score(true, pred):
  anomaly_score = nn.CrossEntropyLoss()(pred, true)
  return anomaly_score

In [14]:
normal_files = sorted(glob.glob('{dir}/normal_*'.format(dir=test_dir)))
anomaly_files = sorted(glob.glob('{dir}/anomaly_*'.format(dir=test_dir)))

normal_labels = np.zeros(len(normal_files))
anomaly_labels = np.ones(len(anomaly_files))

test_files = np.concatenate((normal_files, anomaly_files), axis=0)
y_true = np.concatenate((normal_labels, anomaly_labels), axis=0)
y_pred = [0. for k in test_files]

In [21]:
test_dataset = list_to_dataset(test_files, n_mels, n_fft, hop_length, power)

test_label = torch.LongTensor([idx for file_name in test_files for idx, label_idx in enumerate(label_list) if label_idx in file_name])
test_label = nn.functional.one_hot(test_label, num_classes=len(label_list))

100%|██████████| 160/160 [00:05<00:00, 27.57it/s]


In [22]:
test_dataset.shape

(160, 1, 128, 313)

In [36]:
for file_idx in tqdm(range(len(test_files)), desc='test data '):
  data = torch.Tensor(test_dataset[file_idx]).unsqueeze(0).to(device)


  output = model(data)
  true = test_label[file_idx].float().unsqueeze(0).to(device)
  score = get_anomaly_score(true, output)

  y_pred[file_idx] = score.cpu().detach().numpy()

auc = metrics.roc_auc_score(y_true, y_pred)

test data : 100%|██████████| 160/160 [00:02<00:00, 72.02it/s]


In [37]:
auc

0.70765625