In [1]:
import tkinter as tk
from tkinter import filedialog
import os
import numpy as np
import re
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, UpSampling1D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

In [3]:
# 符号付に変換する関数
def unsigned2signed(x):
    if x > 32767:
        return x - 65536
    return x

In [4]:
# 学習データの読み込み
def load_binary_data(folder_path):
    data = {}  # ファイルの拡張子と数字を除いたフルパスをキーとしたディクショナリ
    file_names = []  # ファイル名を格納するリスト

    for root, dirs, files in os.walk(folder_path):
        for file in files:
            match = re.match(r'(.+?)\.d(\d+)', file)  # 正規表現で.d○○の形式のファイル名を捉える
            if match:
                file_name, data_index = match.groups()
                data_index = int(data_index)
                full_path = os.path.join(root, file)  # フルパス
                full_path_key = full_path[:-4]  # 拡張子と数字を除いたフルパス
                if full_path_key not in data or data_index > data[full_path_key][8]:
                    with open(full_path, 'rb') as in_file:
                        buf = in_file.read()
                    # 8つのセンサーデータをグループ化
                    sensor_data = [buf[i:i+8*2] for i in range(408, len(buf), 8*2)]
                    sensor_values = []  # センサーデータを格納するリスト
                    for i in range(8):
                        sensor_values.append([unsigned2signed(int.from_bytes(sensor[i*2:i*2+2], 'little')) for sensor in sensor_data])
                    # 拡張子の番号を追加してデータをタプルに格納
                    sensor_values.append(int(data_index))
                    data[full_path_key] = sensor_values  # センサーデータと拡張子の番号をタプルに格納
                    file_names.append(file_name)  # ファイル名をリストに追加

    return data, file_names

# Tkinterを初期化
root = tk.Tk()
root.withdraw()  # メインウィンドウを非表示にする

# フォルダを選択するためのダイアログを表示
folder_selected = filedialog.askdirectory()

if folder_selected:
    print("選択したフォルダ:", folder_selected)
    # フォルダ内のデータを読み込む
    data, data_filenames = load_binary_data(folder_selected)
    if data:
        print("データの読み込みが完了しました。")
    else:
        print("データが見つかりませんでした。")
else:
    print("フォルダが選択されませんでした。")

# Tkinterを終了
root.quit()

sensor_data_dict = {}  # センサーデータを格納するディクショナリ
for key, sensor_data in data.items():
    extension_number = sensor_data[-1]  # 拡張子の番号
    for i, sensor_value in enumerate(sensor_data[:-1]):  # 拡張子の番号を除くセンサーデータをループ
        sensor_key = f"{key}_{extension_number}_{i+1}"  # キーを生成
        if sensor_key not in sensor_data_dict:
            sensor_data_dict[sensor_key] = []  # キーが存在しない場合、空のリストを作成
        sensor_data_dict[sensor_key].append(sensor_value)  # キーに対応するリストにセンサーデータを追加

# データの形状を決定
num_samples = len(sensor_data_dict)
num_sensors = 8  # センサーの数

# 2次元のリストを初期化
data_matrix = [[None] * num_sensors for _ in range(num_samples)]

# センサーデータのみを保存する新しい変数を作成
sensor_data = []

# データを2次元リストと新しい変数に配置
for i, (key, sensor_values) in enumerate(sensor_data_dict.items()):
    data_matrix[i] = [key] + sensor_values  # キーとセンサーデータのリストを1つのリストに格納
    sensor_data.append(sensor_values)  # センサーデータのリストを新しい変数に格納

# data_matrix には各キー（インデックス）に対応するセンサーデータが格納されています
# sensor_data にはセンサーデータのリストのみが格納されています

選択したフォルダ: C:/Users/r-fujita/Desktop/FCC_E-2316B
データの読み込みが完了しました。


In [5]:
# テストデータの読み込み
def load_test_data(test_folder_path):
    test_data = {}  # ファイルの拡張子と数字を除いたフルパスをキーとしたディクショナリ
    test_file_names = []  # ファイル名を格納するリスト

    for root, dirs, files in os.walk(test_folder_path):
        for file in files:
            match = re.match(r'(.+?)\.d(\d+)', file)  # 正規表現で.d○○の形式のファイル名を捉える
            if match:
                file_name, data_index = match.groups()
                data_index = int(data_index)
                full_path = os.path.join(root, file)  # フルパス
                full_path_key = full_path[:-4]  # 拡張子と数字を除いたフルパス
                if full_path_key not in test_data or data_index > test_data[full_path_key][8]:
                    with open(full_path, 'rb') as in_file:
                        buf = in_file.read()
                    # 8つのセンサーデータをグループ化
                    sensor_data = [buf[i:i+8*2] for i in range(408, len(buf), 8*2)]
                    sensor_values = []  # センサーデータを格納するリスト
                    for i in range(8):
                        sensor_values.append([unsigned2signed(int.from_bytes(sensor[i*2:i*2+2], 'little')) for sensor in sensor_data])
                    # 拡張子の番号を追加してデータをタプルに格納
                    sensor_values.append(int(data_index))
                    test_data[full_path_key] = sensor_values  # センサーデータと拡張子の番号をタプルに格納
                    test_file_names.append(file_name)  # ファイル名をリストに追加

    return test_data, test_file_names

# Tkinterを初期化
root = tk.Tk()
root.withdraw()  # メインウィンドウを非表示にする

# テストデータのフォルダを選択するためのダイアログを表示
test_folder_path = filedialog.askdirectory()

if test_folder_path:
    print("選択したテストデータのフォルダ:", test_folder_path)
    # テストデータを読み込む
    test_data, test_data_filenames = load_test_data(test_folder_path)
    if test_data:
        print("テストデータの読み込みが完了しました。")
    else:
        print("テストデータが見つかりませんでした.")
else:
    print("テストデータのフォルダが選択されませんでした.")

test_sensor_data_dict = {}  # テストデータのセンサーデータを格納するディクショナリ
for key, test_sensor_data in test_data.items():
    extension_number = test_sensor_data[-1]  # 拡張子の番号
    for i, test_sensor_value in enumerate(test_sensor_data[:-1]):  # 拡張子の番号を除くセンサーデータをループ
        test_sensor_key = f"{key}_{extension_number}_{i+1}"  # キーを生成
        if test_sensor_key not in test_sensor_data_dict:
            test_sensor_data_dict[test_sensor_key] = []  # キーが存在しない場合、空のリストを作成
        test_sensor_data_dict[test_sensor_key].append(test_sensor_value)  # キーに対応するリストにテストデータのセンサーデータを追加

# データの形状を決定
num_test_samples = len(test_sensor_data_dict)
num_sensors = 8  # センサーの数

# テストデータの2次元のリストを初期化
test_data_matrix = [[None] * num_sensors for _ in range(num_test_samples)]

# テストデータのセンサーデータのみを保存する新しい変数を作成
test_sensor_data = []

# テストデータを2次元リストと新しい変数に配置
for i, (key, test_sensor_values) in enumerate(test_sensor_data_dict.items()):
    test_data_matrix[i] = [key] + test_sensor_values  # キーとセンサーデータのリストを1つのリストに格納
    test_sensor_data.append(test_sensor_values)  # テストデータのセンサーデータのリストを新しい変数に格納

# test_data_matrix には各キー（インデックス）に対応するテストデータのセンサーデータが格納されています
# test_sensor_data にはテストデータのセンサーデータのリストのみが格納されています


選択したテストデータのフォルダ: C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A
テストデータの読み込みが完了しました。


In [6]:
#不要な変数を削除する
del data, data_filenames, extension_number, folder_selected, key, num_samples, num_test_samples, sensor_key, sensor_value, sensor_values, test_data, test_data_filenames, test_folder_path, test_sensor_key, test_sensor_value, test_sensor_values, i, root

In [7]:
# データ長を揃える
# 最大の波形データ長を見つける
max_length = max(max(len(waveform) for waveform in sensor) for sensor in sensor_data + test_sensor_data)

# ゼロパディングを行う
padded_sensor_data = []
for sensor in sensor_data:
    padded_sensor = []
    for waveform in sensor:
        # ゼロパディング
        padding = [0] * (max_length - len(waveform))
        padded_waveform = waveform + padding
        padded_sensor.append(padded_waveform)
    padded_sensor_data.append(padded_sensor)

# 同じことをtest_sensor_dataにも適用
padded_test_sensor_data = []
for sensor in test_sensor_data:
    padded_sensor = []
    for waveform in sensor:
        # ゼロパディング
        padding = [0] * (max_length - len(waveform))
        padded_waveform = waveform + padding
        padded_sensor.append(padded_waveform)
    padded_test_sensor_data.append(padded_sensor)

# 学習データとテストデータをNumPyの配列に変換
padded_sensor_data = np.array(padded_sensor_data)  # リストをNumPy配列に変換
padded_test_sensor_data = np.array(padded_test_sensor_data)  # リストをNumPy配列に変換
# 中間の次元（次元1）を削除して2次元に reshape
padded_sensor_data = np.squeeze(padded_sensor_data, axis=1)
padded_test_sensor_data = np.squeeze(padded_test_sensor_data, axis=1)


# データの標準化
scaler = StandardScaler()
padded_sensor_data = scaler.fit_transform(padded_sensor_data)
padded_test_sensor_data = scaler.transform(padded_test_sensor_data)

# データの変形
#padded_sensor_data = np.expand_dims(padded_sensor_data, axis=-1)
#padded_test_sensor_data = np.expand_dims(padded_test_sensor_data, axis=-1)
#padded_sensor_data = padded_sensor_data.reshape(int(len(sensor_data)/2),max_length,2)
#padded_test_sensor_data = padded_test_sensor_data.reshape(int(len(test_sensor_data)/2),max_length,2)
padded_sensor_data = padded_sensor_data.reshape(int(len(sensor_data)/num_sensors),max_length,num_sensors)
padded_test_sensor_data = padded_test_sensor_data.reshape(int(len(test_sensor_data)/num_sensors),max_length,num_sensors)
#padded_sensor_data = np.expand_dims(padded_sensor_data, axis=1)
#padded_test_sensor_data = np.expand_dims(padded_test_sensor_data, axis=1)

In [7]:
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(max_length, 8)),  # 入力層のshapeを修正
    tf.keras.layers.Conv1D(filters=8, kernel_size=3, activation='relu', padding='same'),
    tf.keras.layers.MaxPooling1D(pool_size=2),
    tf.keras.layers.Conv1D(filters=8, kernel_size=3, activation='relu', padding='same'),
    tf.keras.layers.MaxPooling1D(pool_size=2),
    tf.keras.layers.Flatten(),
    #tf.keras.layers.Dense(64, activation='relu'),  
    tf.keras.layers.Dense(1)  # 1つの出力ニューロン
])

In [8]:
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d (Conv1D)             (None, 17728, 8)          200       
                                                                 
 max_pooling1d (MaxPooling1D  (None, 8864, 8)          0         
 )                                                               
                                                                 
 conv1d_1 (Conv1D)           (None, 8864, 8)           200       
                                                                 
 max_pooling1d_1 (MaxPooling  (None, 4432, 8)          0         
 1D)                                                             
                                                                 
 flatten (Flatten)           (None, 35456)             0         
                                                                 
 dense (Dense)               (None, 1)                 3

In [10]:
# モデルのコンパイル
model.compile(optimizer='adam', loss='mse')  # 平均二乗誤差を損失関数とする

In [11]:
# モデルの訓練
model.fit(padded_sensor_data, padded_sensor_data, epochs=10, batch_size=1)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x2a29fa6c5e0>

In [21]:
# テストデータでの異常スコアの計算
anomaly_scores = model.predict(padded_test_sensor_data)

# 閾値を設定して異常と正常を分離
threshold = 0.05
anomaly_indices = np.where(anomaly_scores > threshold)[0]
anomalies = [padded_test_sensor_data[i] for i in anomaly_indices]
print("検出された異常データ数:", len(anomaly_indices))
print("検出された異常データのインデックス:", anomaly_indices)

検出された異常データ数: 26
検出された異常データ数: [ 12  78 111 216 226 240 308 377 438 448 462 495 521 576 591 599 616 646
 657 697 737 738 804 822 884 900]


In [13]:
anomaly_keys = [list(test_sensor_data_dict.keys())[i] for i in anomaly_indices]
print("検出された異常データのインデックス:", anomaly_keys)

検出された異常データのインデックス: ['C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A\\2019\\#1\\ASSORT\\E-2316A\\APD000\\001-002_0_5', 'C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A\\2019\\#1\\ASSORT\\E-2316A\\APD000\\001-010_1_7', 'C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A\\2019\\#1\\ASSORT\\E-2316A\\APD000\\001-014_0_8', 'C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A\\2019\\#1\\ASSORT\\E-2316A\\APD000\\002-006_0_1', 'C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A\\2019\\#1\\ASSORT\\E-2316A\\APD000\\002-007_0_3', 'C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A\\2019\\#1\\ASSORT\\E-2316A\\APD000\\002-009_0_1', 'C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A\\2019\\#1\\ASSORT\\E-2316A\\APD000\\002-017_0_5', 'C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A\\2019\\#1\\ASSORT\\E-2316A\\APD000\\002-026_0_2', 'C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A\\2019\\#1\\ASSORT\\E-2316A\\APD000\\003-007_0_7', 'C:/Users/r-fujita/Desktop/テストデータ_FCC_E-2316A\\2019\\#1\\ASSORT\\E-2316A\\APD000\\003-009_0_1', 'C:/Users/r-fujita/D

In [None]:
# Stacked What-Where Autoencoderの構築
def stacked_what_where_autoencoder(input_shape):
    # エンコーダー
    inputs = Input(shape=input_shape)
    conv1 = Conv1D(filters=32, kernel_size=3, activation='relu', padding='same')(inputs)
    pool1 = MaxPooling1D(pool_size=2)(conv1)
    conv2 = Conv1D(filters=64, kernel_size=3, activation='relu', padding='same')(pool1)
    pool2 = MaxPooling1D(pool_size=2)(conv2)
    # デコーダー
    conv3 = Conv1D(filters=64, kernel_size=3, activation='relu', padding='same')(pool2)
    up1 = UpSampling1D(size=2)(conv3)
    conv4 = Conv1D(filters=32, kernel_size=3, activation='relu', padding='same')(up1)
    up2 = UpSampling1D(size=2)(conv4)
    decoded = Conv1D(filters=input_shape[-1], kernel_size=3, activation='sigmoid', padding='same')(up2)


    # モデルの定義
    autoencoder = Model(inputs, decoded)
    return autoencoder



Epoch 1/10


In [None]:
# モデルの構築
input_shape = padded_sensor_data.shape[1:]  # データの形状
autoencoder = stacked_what_where_autoencoder(input_shape)

# モデルのコンパイル
optimizer = Adam(learning_rate=0.001)  # 学習率を0.001に設定
autoencoder.compile(optimizer=optimizer, loss='mean_squared_error')

# モデルの学習
autoencoder.fit(padded_sensor_data, padded_sensor_data, epochs=10, batch_size=2, shuffle=True, verbose=1)

In [8]:
whos

Variable                  Type              Data/Info
-----------------------------------------------------
Adam                      type              <class 'keras.optimizers.optimizer_v2.adam.Adam'>
Conv1D                    type              <class 'keras.layers.conv<...>olutional.conv1d.Conv1D'>
Input                     function          <function Input at 0x000002637F4B0AF0>
MaxPooling1D              type              <class 'keras.layers.pool<...>_pooling1d.MaxPooling1D'>
Model                     type              <class 'keras.engine.training.Model'>
StandardScaler            type              <class 'sklearn.preproces<...>ng._data.StandardScaler'>
UpSampling1D              type              <class 'keras.layers.resh<...>sampling1d.UpSampling1D'>
data_matrix               list              n=6752
filedialog                module            <module 'tkinter.filedial<...>\tkinter\\filedialog.py'>
load_binary_data          function          <function load_binary_data at 0x000002