# 工場設備の予知保全

# [1] バルブ

## 1.音声データの読み込み

### 1-1.音声データの読み込み

**音声データを扱うライブラリをインポート**

```
import librosa
```
**音声データを読み込む**

```
y, sr = librosa.load('ファイルパス', sr=None)
```




In [None]:
# ライブラリのインポート
import librosa

# ファイルパス
file_path = 'dataset/valve/train/normal/00.wav'

# データを読み込む
y, sr = librosa.load(file_path, sr=None)

# 読み込んだデータの確認
print(y)
# yはnumpy.array型なのでshapeが使えます
print(y.shape)
print(sr)

### 1-2.秒数の確認

**1秒辺りのデータ数**

```
秒数 = データ数/サンプリングレート
```



In [None]:
# ライブラリのインポート
import librosa

# ファイルパス
file_path = 'dataset/valve/train/normal/00.wav'

# データを読み込む
y, sr = librosa.load(file_path, sr=None)

# 読み込んだデータの確認
print(y)
print(y.shape)
print(sr)

### 1-3.複数データの読み込み

**以下の引数を指定**

```
*   category：機械の種類
*   train_test：学習データか評価データか
*   label：正常音か異常音か
```
**対象ディレクトリのファイル名を全て取得**

```
import glob
glob.glob('dataset/valve/train/normal/*.wav')
```
**文字列の前にfを付けることで、文字列内の一部を変数で置換**

```
f'dataset/{変数1}/{変数2}/{変数3}'
```





In [None]:
import glob
import numpy as np

category = 'valve'
train_test = 'train'
label = 'normal'

def read_data(category, train_test, label):
    # ファイル名一覧を取得
    files = sorted(glob.glob(f"dataset/{category}/{train_test}/{label}/*.wav"))
    dataset = []
    # それぞれのファイルを読み込み
    for file_name in files:
        y, sr = librosa.load(file_name, sr = None)
        dataset.append(y)
    # np.array型に変換
    return np.array(dataset)

valve = read_data('valve', 'train', 'normal')

## 2.教師あり学習による異常検知

### 2-1.音声データの可視化



```
正常音と異常音の違いを波形から確認する

音声波形は横軸に時間、縦軸に振幅値をとる
```
**音声波形の描画**

```
import matplotlib.pyplot as plt
y, sr = librosa.load('ファイルパス', sr=None)
plt.plot(y)
plt.show()
```




In [None]:
import matplotlib.pyplot as plt

# 正常音の描画
normal, sr = librosa.load('dataset/valve/train/normal/00.wav', sr=None)
plt.plot(normal)
plt.show()

# 異常音の描画
anomaly, sr = librosa.load('dataset/valve/train/anomaly/00.wav', sr=None)
plt.plot(anomaly, color='orange')
plt.show()

### 2-2.平均振幅の算出

**各データごとに振幅の平均値を計算し、データごとの平均値のヒストグラムを作成**

```
※音声データでは、振幅を2乗した値の平均値の平方根を平均振幅として扱う
```



In [None]:
import numpy as np

normal = read_data('valve','train','normal')
# 正常音の平均振幅の算出
normal_mean = np.sqrt(np.mean(normal**2,axis=1))

anomaly = read_data('valve','train','anomaly')
# 異常音の平均振幅の算出
anomaly_mean = np.sqrt(np.mean(anomaly**2,axis=1))

# ヒストグラムの描画
plt.hist(normal_mean,alpha=0.5,label='normal')
plt.hist(anomaly_mean,alpha=0.5,label='anomaly')
plt.title('平均振幅のヒストグラム')
plt.legend()
plt.show()

### 2-3.ゼロクロス数の算出

**ゼロクロス数とはどのくらい正負が切り替わっているかを表すもの**

```
np.sum(librosa.zero_crossings(y))
```



In [None]:
import numpy as np

normal = read_data('valve','train','normal')
# 正常音の平均振幅の算出
normal_mean = np.sqrt(np.mean(normal**2,axis=1))

# 正常音のゼロクロス数の算出
normal_zc = np.sum(librosa.zero_crossings(normal),axis=1)
print(normal_zc)

### 2-4.データフレームの作成

In [None]:
import pandas as pd

# 学習データの読み込み
train_normal = read_data('valve','train','normal')
train_anomaly = read_data('valve','train','anomaly')
train = np.concatenate([train_normal,train_anomaly])

# 平均振幅、ゼロクロス数の算出
train_mean = np.sqrt(np.mean(train**2,axis=1))
train_zc = np.sum(librosa.zero_crossings(train),axis=1)

# 平均振幅(mean)、ゼロクロス数(zc)、正常or異常(label)をカラムにもつDataFrameの作成
train_df = pd.DataFrame()
train_df['mean'] = train_mean
train_df['zc'] = train_zc
train_df['label'] = np.concatenate([np.zeros(len(train_normal)),np.ones(len(train_anomaly))])
print(train_df.head())

### 2-5.2値分類モデルの作成・予測

**モデルの作成から予測まで**

```
from sklearn.ensemble import RandomForestClassifier

model = RandomForestClassifier()
model.fit(X,y)
model.predict(X)
```



In [None]:
from sklearn.ensemble import RandomForestClassifier

# 学習データのDataFrameの作成
train_normal = read_data('valve','train','normal')
train_anomaly = read_data('valve','train','anomaly')
train = np.concatenate([train_normal,train_anomaly])
train_df = pd.DataFrame()
train_df['mean']= np.sqrt(np.mean(train**2,axis=1))
train_df['zc'] = np.sum(librosa.zero_crossings(train),axis=1)
train_df['label'] = np.concatenate([np.zeros(len(train_normal)),np.ones(len(train_anomaly))])

# 評価データのDataFrameの作成
test_normal = read_data('valve','test','normal')
test_anomaly = read_data('valve','test','anomaly')
test = np.concatenate([test_normal,test_anomaly])
test_df = pd.DataFrame()
test_df['mean'] = np.sqrt(np.mean(test**2,axis=1))
test_df['zc'] = np.sum(librosa.zero_crossings(test),axis=1)
test_df['label'] = np.concatenate([np.zeros(len(test_normal)),np.ones(len(test_anomaly))])

train_X, train_y = train_df[['mean','zc']], train_df['label']
test_X, test_y = test_df[['mean','zc']], test_df['label']
# モデルの作成
model = RandomForestClassifier(random_state=42)
# 学習
model.fit(train_X,train_y)
# 予測
pred = model.predict(test_X)
print(pred)

### 2-6.予測精度の評価

**混同行列をpythonで作成する**

```
from sklearn.metrics import confusion_matrix
confusion_matrix(y_true=実際の値, y_pred=予測値)
```



In [None]:
from sklearn.metrics import confusion_matrix

# 学習データのDataFrameの作成
train_normal = read_data('valve','train','normal')
train_anomaly = read_data('valve','train','anomaly')
train = np.concatenate([train_normal,train_anomaly])
train_df = pd.DataFrame()
train_df['mean']= np.sqrt(np.mean(train**2,axis=1))
train_df['zc'] = np.sum(librosa.zero_crossings(train),axis=1)
train_df['label'] = np.concatenate([np.zeros(len(train_normal)),np.ones(len(train_anomaly))])

# 評価データのDataFrameの作成
test_normal = read_data('valve','test','normal')
test_anomaly = read_data('valve','test','anomaly')
test = np.concatenate([test_normal,test_anomaly])
test_df = pd.DataFrame()
test_df['mean'] = np.sqrt(np.mean(test**2,axis=1))
test_df['zc'] = np.sum(librosa.zero_crossings(test),axis=1)
test_df['label'] = np.concatenate([np.zeros(len(test_normal)),np.ones(len(test_anomaly))])

train_X, train_y = train_df[['mean','zc']], train_df['label']
test_X, test_y = test_df[['mean','zc']], test_df['label']
# モデルの作成
model = RandomForestClassifier(random_state=42)
# 学習
model.fit(train_X,train_y)
# 予測
pred = model.predict(test_X)
# 混同行列の作成
print(confusion_matrix(test_y, pred))

# [2] スライダー

## 1.教師あり学習による異常検知

### 1-1.教師あり学習による異常検知

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix

# 学習データのDataFrameの作成
train_normal = read_data('slider','train','normal')
train_anomaly = read_data('slider','train','anomaly')
train = np.concatenate([train_normal,train_anomaly])
train_df = pd.DataFrame()
train_df['mean']= np.sqrt(np.mean(train**2,axis=1))
train_df['zc'] = np.sum(librosa.zero_crossings(train),axis=1)
train_df['label'] = np.concatenate([np.zeros(len(train_normal)),np.ones(len(train_anomaly))])

# 評価データのDataFrameの作成
test_normal = read_data('slider','test','normal')
test_anomaly = read_data('slider','test','anomaly')
test = np.concatenate([test_normal,test_anomaly])
test_df = pd.DataFrame()
test_df['mean'] = np.sqrt(np.mean(test**2,axis=1))
test_df['zc'] = np.sum(librosa.zero_crossings(test),axis=1)
test_df['label'] = np.concatenate([np.zeros(len(test_normal)),np.ones(len(test_anomaly))])

train_X, train_y = train_df[['mean','zc']], train_df['label']
test_X, test_y = test_df[['mean','zc']], test_df['label']
# モデルの作成
model = RandomForestClassifier(random_state=42)
# 学習
model.fit(train_X,train_y)
# 予測
pred = model.predict(test_X)
# 混同行列の作成
print(confusion_matrix(test_y, pred))

### 1-2.評価データの確認

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix

# 学習データのDataFrameの作成
train_normal = read_data('slider','train','normal')
train_anomaly = read_data('slider','train','anomaly')
train = np.concatenate([train_normal,train_anomaly])
train_df = pd.DataFrame()
train_df['mean']= np.sqrt(np.mean(train**2,axis=1))
train_df['zc'] = np.sum(librosa.zero_crossings(train),axis=1)
train_df['label'] = np.concatenate([np.zeros(len(train_normal)),np.ones(len(train_anomaly))])

# 評価データのDataFrameの作成
test_normal = read_data('slider','test','normal')
test_anomaly = read_data('slider','test','anomaly')
test = np.concatenate([test_normal,test_anomaly])
test_df = pd.DataFrame()
test_df['mean'] = np.sqrt(np.mean(test**2,axis=1))
test_df['zc'] = np.sum(librosa.zero_crossings(test),axis=1)
test_df['label'] = np.concatenate([np.zeros(len(test_normal)),np.ones(len(test_anomaly))])

train_df[train_df['label']==0]['mean'].plot.hist(alpha=0.5, label='正常音')
train_df[train_df['label']==1]['mean'].plot.hist(alpha=0.5, label='異常音(学習用)')
test_df[test_df['label']==1]['mean'].plot.hist(alpha=0.5, label='異常音(評価用)')
plt.legend()
plt.show()

## 2.教師なし学習による異常検知

### 2-1.教師なし学習による異常検知①

**正常と異常パターンを学習させていた教師あり学習から発想を変えて、正常パターンのみを学習させ、正常パターンと異なるものは異常と予測するモデルを作成**

```
1.   未知の異常にも対応できる
2.   大量の異常データがなくても良い
```



### 2-2.教師なし学習による異常検知②

### 2-3.One Class SVM

**One Class SVM**

```
*   教師あり分類モデルであるSVMを、教師なしの1クラス分類に応用したモデル
*   SVMなので標準化が必要
*   識別境界を基準に正常か異常かを2値で返す
```



### 2-4.標準化

**sklearnの標準化ライブラリ**

```
from sklearn import preprocessing

sc = preprocessing.StandardScaler()
sc.fit(X)
sc.transform(X)
```



In [None]:
# 学習データのDataFrameの作成
train = read_data('slider','train','normal')
train_df = pd.DataFrame()
train_df['mean']= np.sqrt(np.mean(train**2,axis=1))
train_df['zc'] = np.sum(librosa.zero_crossings(train),axis=1)
train_df['label'] = 0

# 評価データのDataFrameの作成
test_normal = read_data('slider','test','normal')
test_anomaly = read_data('slider','test','anomaly')
test = np.concatenate([test_normal,test_anomaly])
test_df = pd.DataFrame()
test_df['mean'] = np.sqrt(np.mean(test**2,axis=1))
test_df['zc'] = np.sum(librosa.zero_crossings(test),axis=1)
test_df['label'] = np.concatenate([np.zeros(len(test_normal)),np.ones(len(test_anomaly))])

train_X, train_y = train_df[['mean','zc']], train_df['label']
test_X, test_y = test_df[['mean','zc']], test_df['label']

# 標準化
from sklearn import preprocessing
sc = preprocessing.StandardScaler()
sc.fit(train_X)
train_X = sc.transform(train_X)
test_X = sc.transform(test_X)
print(train_X)

### 2-5.One Class SVMの作成・予測

**One Class SVMの学習と予測**

```
from sklearn.svm import OneClassSVM

model = OneClassSVM()
model.fit(X)
model.predict(X)
```



In [None]:
train_X, train_y = train_df[['mean','zc']], train_df['label']
test_X, test_y = test_df[['mean','zc']], test_df['label']

# 標準化
sc = preprocessing.StandardScaler()
sc.fit(train_X)
train_X = sc.transform(train_X)
test_X = sc.transform(test_X)

# モデルの作成・学習・予測
from sklearn.svm import OneClassSVM

model = OneClassSVM()
model.fit(train_X)
pred = model.predict(test_X)
print(pred)

### 2-6.One Class SVMの精度評価

**評価データの形式(0が正常、1が異常)に修正**

```
np.where(条件式, 真の場合の値, 偽の場合の値)
```



In [None]:
train_X, train_y = train_df[['mean','zc']], train_df['label']
test_X, test_y = test_df[['mean','zc']], test_df['label']

# 標準化
sc = preprocessing.StandardScaler()
sc.fit(train_X)
train_X = sc.transform(train_X)
test_X = sc.transform(test_X)

# モデルの作成・学習・予測
model = OneClassSVM()
model.fit(train_X)
pred = model.predict(test_X)
pred = np.where(pred==-1, 1, 0)
print(confusion_matrix(test_y,pred))

# [3] ポンプ

## 1.音声特徴量

### 1-1.教師あり学習による異常検知

In [None]:
# train_df,test_dfにはポンプデータが代入されています。
train_X, train_y = train_df[['mean','zc']], train_df['label']
test_X, test_y = test_df[['mean','zc']], test_df['label']

# 標準化
sc = preprocessing.StandardScaler()
sc.fit(train_X)
train_X = sc.transform(train_X)
test_X = sc.transform(test_X)

# モデルの作成・学習・予測
model = OneClassSVM(nu=0.01)
model.fit(train_X)
pred = model.predict(test_X)
pred = np.where(pred==-1, 1, 0)
print(confusion_matrix(test_y,pred))

### 1-2.パワースペクトル

**音声の特徴量としてよく使われるもの**

```
1.   パワースペクトル
2.   スペクトログラム
3.   メルスペクトログラム
```

**パワースペクトルとは**

```
音声波形は通常複数のsin波で構成されています。即ち、音声波形の特徴=各sin波の特徴、と言い換えることができます。構成しているsin波に分解し、各sin波の周波数と振幅を記載したものをパワースペクトルと言います。
```



### 1-3.パワースペクトルの作成

In [None]:
def create_power_spectral(data):
    N = data.shape[1]
    dt = 10/N
    F = np.abs(np.fft.fft(data)/(N/2))
    fq = np.linspace(0,1/dt,N)
    return F[:,:int(N/2)+1], fq[:int(N/2)+1]

F, freq = create_power_spectral(train)
plt.plot(freq,F[0])
plt.show()

### 1-4.スペクトログラム

**スペクトログラムとは**

```
パワースペクトルには時間軸の情報が失われるという欠点がありました、そこでパワースペクトルに時間情報も持たせるために、一定時間ごとにパワースペクトルをとったものをスペクトログラムと言います。
```



### 1-5.メルスペクトログラム

**メル尺度とメルスペクトグラム**

```
周波数の間隔を人間が聞く感覚に近いように、低いところは細かく、高いところは粗く変換したものをメル尺度といいます。スペクトログラムの周波数をメル尺度化したものを、メルスペクトログラムといいます。
```



### 1-6.メルスペクトログラムの作成

**メルスペクトログラムを作成するための関数**

```
librosa.feature.melspectrogram(X)
```
**可視化するための関数**

```
librosa.diaplay.spechshow(X, x_axis='time', y_axis='mel')
```



In [None]:
# メルスペクトログラムの作成
melspec = librosa.feature.melspectrogram(train[0])
# 可視化
librosa.display.specshow(melspec, x_axis='time', y_axis='mel')
plt.show()

### 1-7.デシベル変換

**デジベルとは**

```
人間が感じる感覚に音量を変換したものをデシベル(dB)といいます。
```



In [None]:
# メルスペクトログラムの作成
melspec = librosa.feature.melspectrogram(train[0])
# デシベル変換
melspec_db = librosa.amplitude_to_db(melspec)
# 可視化
librosa.display.specshow(melspec_db, x_axis='time', y_axis='mel')
plt.show()

### 1-8.One Class SVMの精度評価

**メルスペクトログラムは2次元配列になっているため、flatten()で1次元にする**

```
# 学習データのDataFrameの作成
train = read_data('pump','train','normal')
melspec_dbs = []
for i in range(len(train)):
    # メルスペクトログラムの作成
    melspec = librosa.feature.melspectrogram(train[i])
    # dB化
    melspec_db = librosa.amplitude_to_db(melspec).flatten()
    melspec_dbs.append(melspec_db.astype(np.float16))
train_df = pd.DataFrame(melspec_dbs)

# 評価データのDataFrameの作成
test_normal = read_data('pump','test','normal')
test_anomaly = read_data('pump','test','anomaly')
test = np.concatenate([test_normal,test_anomaly])
melspec_dbs = []
for i in range(len(test)):
    # メルスペクトログラムの作成
    melspec = librosa.feature.melspectrogram(test[i])
    # dB化
    melspec_db = librosa.amplitude_to_db(melspec).flatten()
    melspec_dbs.append(melspec_db.astype(np.float16))
test_df = pd.DataFrame(melspec_dbs)
test_df['label'] = np.concatenate([np.zeros(len(test_normal)),np.ones(len(test_anomaly))])
```



In [None]:
train_X = train_df
test_X, test_y = test_df.drop(columns=['label']), test_df['label']

# # 標準化
sc = preprocessing.StandardScaler()
sc.fit(train_X)
train_X = sc.transform(train_X)
test_X = sc.transform(test_X)

# モデルの作成・学習・予測
model = OneClassSVM()
model.fit(train_X)
pred = model.predict(test_X)
pred = np.where(pred==-1, 1, 0)
print(confusion_matrix(test_y,pred))

# [4] ファン

## 1.Autoencoderによる異常検知

### 1-1.Autoencoderによる異常検知

```
正常音と異常音で大きく異なる音の特徴が音量なら平均振幅を、音色ならばスペクトログラムやMFCC(メル周波数ケプストラム係数)を、ノイズならばゼロクロス数を特徴量に使用する必要があり、特徴量を間違えると精度の良いモデルは作成できない。
```
**Autoencoder**

```
*   DeepLearningの一種であるため、特徴量を手動で決める必要がない
*   入力データを圧縮し、再度入力データを復元するネットワーク
```



### 1-2.Autoencoderの作成

In [None]:
import keras.models
from keras.models import Model
from keras.layers import Input, Dense, BatchNormalization, Activation
from keras.models import Sequential

inputDim = 40064
model = Sequential()

model.add(Dense(64,input_shape=(inputDim,)))
model.add(BatchNormalization())
model.add(Activation('relu'))

model.add(Dense(8))
model.add(BatchNormalization())
model.add(Activation('relu'))

model.add(Dense(64))
model.add(BatchNormalization())
model.add(Activation('relu'))

model.add(Dense(inputDim))

### 1-3.モデルの学習

**最適化アルゴリズムにはadamを、損失関数には平均二乗誤差を使用**

```
model.compile(optimizer='adam', loss='mean_squared_error')
```
**Autoencoderの場合は入力データが復元できるように学習させるので、x,yともに同じデータを与える**

```
history = model.fit(x=学習データ, y=学習データ, epochs=10, batch_size=50, validation_split=0.1)
```




### 1-4.異常度の算出

In [None]:
train_X = train_df
test_X, test_y = test_df.drop(columns=['label']), test_df['label']

model = keras.models.load_model('model.h5', compile=False)
train_pred = model.predict(train_X)
# 異常度の算出
train_score = np.mean(np.square(train_X- train_pred), axis=1)

# 学習データの異常度のヒストグラム
train_score.plot.hist()
plt.show()

### 1-5.閾値の決定

In [None]:
train_X = train_df
test_X, test_y = test_df.drop(columns=['label']), test_df['label']

model = keras.models.load_model('model.h5', compile=False)
train_pred = model.predict(train_X)
train_score = np.mean(np.square(train_X- train_pred), axis=1)

test_pred = model.predict(test_X)
test_score = np.mean(np.square(test_X - test_pred), axis=1)

# 閾値の設定
threshold = train_score.quantile(0.8)

# 異常度が閾値より大きければ1、小さければ0に
pred = np.where(test_score > threshold, 1, 0)
print(confusion_matrix(test_y,pred))