# Laser Machine Listener

The following code implements machine learning with Chainer to classify sounds emitted by a laser machine during processing such as cutting and marking. The assumed states are as follows. As learning data, prepare folders as many as the number of states in the **data** folder, and place one or more sound files corresponding to each state in each folder.

- Background
- Cutting (in focus)
- Cutting (not in focus)
- Marking
- Waiting
- Sleeping

以下のコードは、レーザー加工機がカットやマーキングなどの加工中に発する音を分類するための機械学習をChainerで実装したものです。想定している状態は以下の通りです。学習用のデータとして、**data**フォルダの中に状態の数だけフォルダを用意し、それぞれのフォルダの中にそれぞれの状態に対応する1つまたはそれ以上の音声ファイルを配置してください。

- 背景音
- カット中（焦点が合っている）
- カット中（焦点が合っていない）
- マーキング中
- 待機中
- スリープ中

| State | Laser Machine | Dust collector | Ventilator |
| --: | :-- | :-- | :-- |
| Background | OFF | OFF | OFF |
| Sleeping | OFF | OFF | ON |
| Waiting | ON | ON | ON |
| Cutting (in focus) | ON | ON | ON |
| Cutting (not in focus) | ON | ON | ON |
| Marking | ON | ON | ON |

First, we initialise an array for containing state names and a matrix for storing voice data and correct labels. Next, read sound files matching the pattern of the path for 30 seconds in order, normalize, obtain STFT, and register together with the same number of correct answer labels.

まず、状態名を収めるための配列と、音声データと正解ラベルを収めるための行列を初期化する。次に、パスのパターンに合致する音声ファイルを順に30秒間ずつ読み込み、正規化したのちにSTFTを求め、同数の正解ラベルと共に登録する。

In [None]:
import numpy as np
import glob
import librosa

SAMPLING_RATE = 16000
FFT_SIZE = 256
STFT_MATRIX_SIZE = 1 + FFT_SIZE // 2

state_names = []
data = np.empty((0, STFT_MATRIX_SIZE), dtype=np.float32)
index = np.empty(0, dtype=np.int32)

for path_name in sorted(glob.glob('data/*/*.wav')):
    state_name = path_name.split('/')[1]

    if state_name not in state_names:
        state_names.append(state_name)

    audio, sr = librosa.load(path_name, sr=SAMPLING_RATE, duration=30)
    print('{}: {} ({} Hz) '.format(state_name, path_name, sr))
    d = np.abs(librosa.stft(librosa.util.normalize(audio),
                            n_fft=FFT_SIZE, window='hamming'))
    data = np.vstack((data, d.transpose()))
    index = np.hstack([index, [state_names.index(state_name)] * d.shape[1]])

To check what kind of difference is seen when viewing each state on the frequency axis, we calculate the average of STFT for each state and draw with matplotlib.

それぞれの状態を周波数軸で見たときにどのような違いがあるのかを確認するため、状態ごとにSTFTの平均を求め、matplotlibを用いて描画します。

In [None]:
import matplotlib.pyplot as plt

n_states = len(state_names)

fig = plt.figure(figsize=(15, 5 * n_states))

for i, state_name in enumerate(state_names):
    plt.subplot(n_states, 1, 1 + i)
    plt.plot(librosa.fft_frequencies(sr=SAMPLING_RATE, n_fft=FFT_SIZE),
             np.mean(data[np.where(index == i)], axis=0))
    plt.title(state_name)

plt.show()

Now that data is ready, we define a neural network with Chainer, generate a model, and prepare a dataset with data and correct index as a set. Of the data set, we will use 80% for learning and the remaining 20% for verification.

以上でデータが用意できましたので、Chainerでニューラルネットワークを定義してモデルを生成し、データと正解インデックスをセットにしたデータセットを用意します。データセットのうち、8割を学習用、残りの2割を検証用に用います。

In [None]:
import chainer
import chainer.links as L
import chainer.functions as F


class LaserMachineListener(chainer.Chain):

    def __init__(self, n_mid_units, n_out):
        super(LaserMachineListener, self).__init__()

        with self.init_scope():
            self.l1 = L.Linear(None, n_mid_units)
            self.l2 = L.Linear(None, n_mid_units // 2)
            self.l3 = L.Linear(None, n_out)

    def __call__(self, x):
        h1 = F.relu(self.l1(x))
        h2 = F.relu(self.l2(h1))
        return self.l3(h2)


net = LaserMachineListener(n_mid_units=128, n_out=n_states)
model = L.Classifier(net)

dataset = chainer.datasets.TupleDataset(data, index)
n_train = int(len(dataset) * 0.8)
train, test = chainer.datasets.split_dataset_random(dataset, n_train, seed=0)

Select Adam as the optimisation method set the number of epochs to 10, configure to report progress in progress during learning, and execute learning. When using files bundled as samples, the correct answer rate is about 95%. If you do not get the proper answer rate for the samples prepared by yourself, try tuning parameters such as `n_mid_units` and `BATCH_SIZE`.

最適化手法としてAdamを選択し、エポック数は10に設定し、学習中の途中経過をレポートするように設定した上で学習を実行します。サンプルとして同梱しているファイルを用いた場合、正解率は約95%程度になります。もし、自分で用意したサンプルに対する正解率が上がらない場合には、`n_mid_units`や`BATCH_SIZE`などのパラメータをチューニングしてみましょう。

In [None]:
from chainer import optimizers
from chainer import training

optimizer = optimizers.Adam(alpha=0.01)
optimizer.setup(model)

GPU_ID = -1

# Specify the seed value of the random number
# so that the same result will be obtained each time
np.random.seed(1)

BATCH_SIZE = 32

train_iter = chainer.iterators.SerialIterator(train, BATCH_SIZE)
test_iter = chainer.iterators.SerialIterator(test, BATCH_SIZE,
                                             repeat=False, shuffle=False)
updater = training.StandardUpdater(train_iter, optimizer, device=GPU_ID)

MAX_EPOCH = 20

trainer = training.Trainer(updater, (MAX_EPOCH, 'epoch'), out='result')

from chainer.training import extensions

trainer.extend(extensions.LogReport())
trainer.extend(extensions.Evaluator(
    test_iter, model, device=GPU_ID), name='val')
trainer.extend(extensions.PrintReport(
    ['epoch',
     'main/loss', 'main/accuracy', 'val/main/loss', 'val/main/accuracy',
     'elapsed_time']))
trainer.extend(extensions.PlotReport(
    ['main/loss', 'val/main/loss'],
    x_key='epoch', file_name='loss.png'))
trainer.extend(extensions.PlotReport(
    ['main/accuracy', 'val/main/accuracy'],
    x_key='epoch', file_name='accuracy.png'))

trainer.run()

When learning is over, save the array containing the state name and the learned model as separate files.

学習が終わったら、状態の名前を収めた配列と学習済みのモデルを個別のファイルとして保存します。

In [None]:
import json

with open('state_names.json', 'w') as file_to_save:
    json.dump(state_names, file_to_save)

model.to_cpu()
chainer.serializers.save_npz('laser_machine_listener.model', model)