In [3]:
import numpy as np
import os
import random
import scipy
from scipy import signal
from numpy.fft import fft
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.initializers import he_normal
from tensorflow.python.keras.utils.vis_utils import plot_model

from sklearn.model_selection import train_test_split

import pandas as pd
import pickle

import seaborn as sns
sns.set(font='Yu Gothic')
import matplotlib.pyplot as plt
%matplotlib inline

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [4]:
file_path = "../Data/センサ別npyファイル"

In [9]:
def norm(x):
    x = np.sqrt(np.square(x[:, :, 0]) + np.square(x[:, :, 1]) + np.square(x[:, :, 2]))
    return x.reshape([-1, 500])

def spectrogtam(x):
    nfft = 80
    overlap = nfft - 10
    
    x = scipy.signal.spectrogram(x, fs=100, nfft=nfft, noverlap=overlap, nperseg=nfft)[2]
    x = (x - x.mean(axis=1, keepdims=True)) / x.std(axis=1, keepdims=True)
    return x

In [6]:
train_label = np.load(file_path + "/train/train_Hips/train_Hips_Label.npy")[:, 0].reshape([-1])
train_label.shape

(195491,)

In [11]:
def load_npy(sensor="Acc", kind="train"):
    x = np.load(file_path + "/" + kind + "/" + kind + "_Hips/" + kind + "_Hips_Glo_" + sensor + "_ver3.npy")
    x = norm(x)
    x = np.apply_along_axis(spectrogtam, 1, x)
    return x

train_Hips_Acc = load_npy("Acc", "train")
train_Hips_Acc = train_Hips_Acc.reshape([-1, 1, train_Hips_Acc.shape[1], train_Hips_Acc.shape[2], 1])
train_Hips_Mag = load_npy("Mag", "train")
train_Hips_Mag = train_Hips_Mag.reshape([-1, 1, train_Hips_Mag.shape[1], train_Hips_Mag.shape[2], 1])
train_Hips_Gyr = load_npy("Gyr", "train")
train_Hips_Gyr = train_Hips_Gyr.reshape([-1, 1, train_Hips_Gyr.shape[1], train_Hips_Gyr.shape[2], 1])

train_Hips_Acc.shape

(195491, 1, 41, 43, 1)

In [None]:
X_train = np.concatenate([train_Hips_Acc, train_Hips_Mag, train_Hips_Gyr], axis=1)
X_train.shape

In [None]:
pattern_file = np.load("").reshape([-1]) #パターンファイルのパス入れてくれ
pattern_file.shape

In [None]:
val_label = np.load(file_path + "/validation/validation_Hips/validation_Hips_Label.npy")[:, 0].reshape([-1])
val_label.shape

In [None]:
validation_Hips_Acc = load_npy("Acc", "val").reshape([-1, 1, validation_Hips_Acc.shape[1], validation_Hips_Acc.shape[2], 1])
validation_Hips_Mag = load_npy("Mag", "val").reshape([-1, 1, validation_Hips_Mag.shape[1], validation_Hips_Mag.shape[2], 1])
validation_Hips_Gyr = load_npy("Gyr", "val").reshape([-1, 1, validation_Hips_Gyr.shape[1], validation_Hips_Gyr.shape[2], 1])

In [None]:
X_val = np.concatenate([val_Hips_Acc, val_Hips_Mag, val_Hips_Gyr], axis=1)
X_val.shape

# 前処理

In [None]:
X_train = np.round(X_train, 5)
X_val = np.round(X_val, 5)

X_train.shape, X_val.shape

# validationデータをパターンで分ける処理

In [None]:
X_val_pattern0 = X_val[pattern_file == 0]
X_val_pattern1 = X_val[pattern_file == 1]
X_val_pattern2 = X_val[pattern_file == 2]

X_val_pattern0.shape, X_val_pattern1.shape, X_val_pattern2.shape

# データを7:3にするパターン

In [None]:
def separateData(x, y):
    pattern = np.zeros(x.shape[0])
    for i in range(8):
        tmp = np.where(y == (i+1))[0]
        pattern[tmp[tmp.shape[0]//10*7:]] = 1
        
    x_train = x[pattern == 0]
    x_test = x[pattern == 1]
    y_train = y[pattern == 0]
    y_test = y[pattern == 1]
    return x_train, x_test, y_train, y_test

In [None]:
x_train, x_val, y_train, y_val = separateData(X_train, train_label)
x_train.shape, x_val.shape

# 学習

In [None]:
def BuildModel(input_shape):
    input1 = layers.Input(shape=input_shape)
    x = layers.Conv2D(64, (3, 3), padding='valid', activation='relu', kernel_initializer=he_normal())(input1)
    x = layers.Conv2D(32, (3, 3), padding='same', activation='relu')(x)
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(64, (3, 3), padding='same', activation='relu')(x)
    x = layers.Conv2D(32, (3, 3), padding='same', activation='relu')(x)
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.Dropout(0.2)(x)
    x = layers.Conv2D(64, (3, 3), padding='same', activation='relu')(x)
    x = layers.Conv2D(32, (3, 3), padding='same', activation='relu')(x)
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.BatchNormalization()(x)
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(16, activation='relu')(x)
    x = models.Model(inputs=input1, outputs=x)
    return x

In [None]:
x1 = BuildModel(x_train[0, 0].shape)
x2 = BuildModel(x_train[0, 1].shape)
x3 = BuildModel(x_train[0, 2].shape)

combined = layers.concatenate([x1.output, x2.output, x3.output])

z = layers.Dense(64, activation='relu')(combined)
z = layers.Dense(16, activation='relu')(z)
z = layers.Dense(5, activation='softmax')(z)

model = models.Model(inputs=[x1.input, x2.input, x3.input], outputs=z)
model.compile(optimizer=tf.keras.optimizers.Adam(0.01),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.summary()

In [None]:
save_folder = "/" #保存ディレクトリを指定(後ろにスラッシュ入れてね)

import os
if not os.path.exists(save_folder):
    os.makedirs(save_folder)

cp_cb = tf.keras.callbacks.ModelCheckpoint(filepath=save_folder + "model_{epoch:02d}-{loss:.2f}-{accuracy:.2f}-{val_loss:.2f}-{val_accuracy:.2f}.hdf5", 
                                           monitor='val_loss', verbose=0, save_best_only=True, mode='auto')
es_cb = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=30, verbose=0, mode='auto')
history = model.fit([x_train[:, 0], x_train[:, 1], x_train[:, 2]], y_train, epochs=256, batch_size=512, \
                    validation_data=([x_val[:, 0], x_val[:, 1], x_val[:, 2]], y_val), callbacks=[cp_cb, es_cb])

In [None]:
# Plot training & validation accuracy values
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

# Plot training & validation loss values
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
# plt.ylim((0, 3.5))
plt.show()

# 推定

In [None]:
def predict_func(x1, x2, x3):
    model = tf.keras.models.load_model("") #ベストモデルのパス入れて！
    predict = model.predict([x1, x2, x3])
    return predict

In [None]:
def save_npy(kind, hold_position, x):
    np.save(kind + "_" + hold_position + "_横山分類_pattern2", x)

In [None]:
def flow(kind, hold_position):
    label = np.load(file_path + kind + "/" + kind + "_Hips/" + kind + "_Hips_Label.npy")[:, 0].reshape([-1])
    x1, x2, x3 = load_npy("Acc", kind), load_npy("Mag", kind), load_npy("Gyr", kind)
    x1 = x1.reshape([-1, x1.shape[1], x1.shape[2], x1.shape[3], 1])
    x2 = x2.reshape([-1, x2.shape[1], x2.shape[2], x2.shape[3], 1])
    x3 = x3.reshape([-1, x3.shape[1], x3.shape[2], x3.shape[3], 1])
    x1 = np.round(x1, 5)
    x2 = np.round(x2, 5)
    x3 = np.round(x3, 5)
    predict = predict_func(x1, x2, x3)
    save_npy(kind, hold_position, predict.reshape([-1, 1]))

In [None]:
kinds = ["train", "validation"]
hold_positions = ["Hips"]

i = 0
for kind in kinds:
    for hold_position in tqdm(hold_positions):
        flow(kind, hold_position)

# testデータの推定

In [None]:
def test_load(sensor="Acc"):
    x = np.load(file_path + "/test/test_Glo_" sensor + "_ver3.npy")
    x = norm(x)
    x = spectrogtam(x)
    return x

In [None]:
test1, test2, test3 = test_load("Acc"), test_load("Mag"), test_load("Gyr")
test1 = np.round(test1, 5)
test2 = np.round(test2, 5)
test3 = np.round(test3, 5)


In [None]:
predict = model.predict([test1, test2, test3])
np.save("test_横山分類_pattern2.npy", predict)