# 実験概要
## CMNによる特徴量抽出
**ケプストラム平均正規化** (Cepstrum Mean Normalization : CMN) を使用し、特徴量を抽出する。
切り出した波形に対して**離散フーリエ変換** (Discrete Fourier Transform : DFT) を行い、絶対値を取ることで**振幅スペクトル**を得る。
これに対して対数を取って**対数振幅スペクトル**に変換し、逆離散フーリエ変換 (Inverse Discrete Fourier Transform : IDFT) を行うことで、ケプストラム領域へと変換し、先頭50要素を切り出し、結合した100要素の配列を前処理として返す。
この前処理は
```
left, right, posture = pp.slicer("raw\\" + tester.**.**.value)
cepstrum = pp.cmn_denoise(left, right)
```
によって行われる。

## DNNによる寝姿勢分類
DNNによって前処理したデータから学習・分類を行う。

# モジュールのインポート

In [134]:
import numpy as np
import glob

# 自作モジュール
import datapath as dpath
import preprocess as pp

# DataLoaderの定義

In [135]:
# tester以外のrawを読み込む
for p in glob.glob(".\\raw\\LMH\\*\\", recursive=True):
    print(p)

In [136]:
e = dpath.LMH.H002.value.fl_center
print(e.value)

type, tester, mattress = dpath.getattributes(e, position=False)
print(f"type : {type},  tester : {tester},  mattress : {mattress}")
train_paths = eval(f"dpath.{type}.serch('{mattress}')")
print(train_paths)

raw\LMH\H002\fl_center
type : LMH,  tester : H002,  mattress : fl
[WindowsPath('raw/LMH/H002/fl_center'), WindowsPath('raw/LMH/H003/fl_center'), WindowsPath('raw/LMH/L001/fl_center'), WindowsPath('raw/LMH/L003/fl_center'), WindowsPath('raw/LMH/M001/fls_center'), WindowsPath('raw/LMH/M001/fld_center'), WindowsPath('raw/LMH/M001/fls_center'), WindowsPath('raw/LMH/M002/fls_center'), WindowsPath('raw/LMH/M002/fld_center'), WindowsPath('raw/LMH/M002/fls_center'), WindowsPath('raw/LMH/M003/fls_center'), WindowsPath('raw/LMH/M003/fld_center'), WindowsPath('raw/LMH/M003/fls_center'), WindowsPath('raw/LMH/M004/fls_center'), WindowsPath('raw/LMH/M004/fld_center'), WindowsPath('raw/LMH/M004/fls_center'), WindowsPath('raw/LMH/H002/fl_center'), WindowsPath('raw/LMH/H003/fl_center'), WindowsPath('raw/LMH/L001/fl_center'), WindowsPath('raw/LMH/L003/fl_center'), WindowsPath('raw/LMH/M001/fls_center'), WindowsPath('raw/LMH/M001/fld_center'), WindowsPath('raw/LMH/M001/fls_center'), WindowsPath('raw/LMH/

In [137]:
import torch
from typing import Tuple
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# 学習用データセットを読み込むためのDataLoaderを定義
class train_datasets(Dataset):
    def __init__(self, identity, transforms=None):
        self.transform = transforms
        self.type, self.tester, self.mattress = dpath.getattributes(identity)
        print(self.type, self.tester, self.mattress)
        print("train")

        # テスト以外のrawを読み込む
        self.train_cepstrum = np.empty((0, 100))  # 100要素の配列
        self.train_posture = np.empty(0)  # 姿勢データ配列

        # mattressに該当するrawのpathを読み込む
        train_paths = eval(f"dpath.{self.type}.serch('{self.mattress}')")
        for p in train_paths:
            if identity.value == p:
                continue
            left, right, posture = pp.slicer(p)
            cepstrum = pp.cmn_denoise(left, right)
            for c in cepstrum:
                self.train_cepstrum = np.vstack((self.train_cepstrum, c)) if self.train_cepstrum.size else c
            self.train_posture = np.append(self.train_posture, posture-1) if self.train_posture.size else posture

    def __len__(self):
        return len(self.train_posture)

    def __getitem__(self, idx) -> Tuple[torch.tensor, torch.tensor]:
        cepstrum = torch.tensor(self.train_cepstrum[idx], dtype=torch.float32)
        posture = torch.tensor(self.train_posture[idx], dtype=torch.long)
        if self.transform:
            cepstrum = self.transform(cepstrum)
        return cepstrum, posture


# test用データセット
class test_datasets(Dataset):
    def __init__(self, identity, transform=None):
        self.transform = transform
        self.type, self.tester, self.mattress = dpath.getattributes(identity)
        print("test")

        # 前処理したrawを読み込む
        self.left, self.right, self.test_posture = pp.slicer(identity.value)
        self.test_posture -= 1
        self.test_cepstrum = pp.cmn_denoise(self.left, self.right)

    def __len__(self):
        return len(self.test_posture)

    def __getitem__(self, idx) -> Tuple[torch.tensor, torch.tensor]:
        cepstrum = torch.tensor(self.test_cepstrum[idx], dtype=torch.float32)
        posture = torch.tensor(self.test_posture[idx], dtype=torch.long)
        if self.transform:
            cepstrum = self.transform(cepstrum)
        return cepstrum, posture

# 学習＆検証

In [138]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


## モデル定義
1次元ResNetを定義する。nn.Conv1dを用いて残差ブロックを定義する。

In [139]:
import torch.nn as nn

class Bottlrneck(torch.nn.Module):
    def __init__(self,In_channel,Med_channel,Out_channel,downsample=False):
        super(Bottlrneck, self).__init__()
        self.stride = 1
        if downsample == True:
            self.stride = 2

        self.layer = torch.nn.Sequential(
            nn.Conv1d(In_channel, Med_channel, 1, self.stride),
            nn.BatchNorm1d(Med_channel),
            nn.ReLU(),
            nn.Conv1d(Med_channel, Med_channel, 3, padding=1),
            nn.BatchNorm1d(Med_channel),
            nn.ReLU(),
            nn.Conv1d(Med_channel, Out_channel, 1),
            nn.BatchNorm1d(Out_channel),
            nn.ReLU(),
        )

        if In_channel != Out_channel:
            self.res_layer = torch.nn.Conv1d(In_channel, Out_channel,1,self.stride)
        else:
            self.res_layer = None

    def forward(self,x):
        if self.res_layer is not None:
            residual = self.res_layer(x)
        else:
            residual = x
        return self.layer(x)+residual


class ResNet50(torch.nn.Module):
    def __init__(self, in_channels, classes):
        super(ResNet50, self).__init__()
        self.features = torch.nn.Sequential(
            torch.nn.Conv1d(in_channels,64,kernel_size=7,stride=2,padding=3),
            torch.nn.MaxPool1d(3,2,1),

            Bottlrneck(64,64,256,False),
            Bottlrneck(256,64,256,False),
            Bottlrneck(256,64,256,False),
            #
            Bottlrneck(256,128,512, True),
            Bottlrneck(512,128,512, False),
            Bottlrneck(512,128,512, False),
            Bottlrneck(512,128,512, False),
            #
            Bottlrneck(512,256,1024, True),
            Bottlrneck(1024,256,1024, False),
            Bottlrneck(1024,256,1024, False),
            Bottlrneck(1024,256,1024, False),
            Bottlrneck(1024,256,1024, False),
            Bottlrneck(1024,256,1024, False),
            #
            Bottlrneck(1024,512,2048, True),
            Bottlrneck(2048,512,2048, False),
            Bottlrneck(2048,512,2048, False),

            torch.nn.AdaptiveAvgPool1d(1)
        )
        self.classifer = torch.nn.Sequential(
            torch.nn.Linear(2048,classes)
        )

    def forward(self,x):
        x = self.features(x)
        x = x.view(-1,2048)
        x = self.classifer(x)
        return x

if __name__ == '__main__':
    x = torch.randn(size=(1,1,224))
    # x = torch.randn(size=(1,64,224))
    # model = Bottlrneck(64,64,256,True)
    model = ResNet50(in_channels=1,classes=5)

    output = model(x)
    print(output.shape)



torch.Size([1, 5])


ResNetを定義する

In [140]:
identity = dpath.LMH.H002.value.st_center

train = train_datasets(identity)
test = test_datasets(identity)
batch_size = 64

train_loader = DataLoader(
    train,
    batch_size=batch_size,
    shuffle=True,
)

test_loader = DataLoader(
    test,
    batch_size=batch_size,
    shuffle=True,
)

LMH H002 st
train
data[340 / 403]
data[186 / 208]
data[173 / 208]
data[192 / 324]
data[174 / 438]
data[220 / 481]
data[87 / 281]
data[340 / 403]
data[186 / 208]
data[173 / 208]
data[192 / 324]
data[174 / 438]
data[220 / 481]
data[87 / 281]
test
data[194 / 230]


## 学習

In [141]:
import torch.optim as optim

# モデルのインスタンス化
net = ResNet50(in_channels=100, classes=4).to(device)

# 誤差関数を交差エントロピーで計算
criterion = nn.CrossEntropyLoss()

# 最適化アルゴリズム
optimizer = optim.Adam(net.parameters(), lr = 0.1)

# 学習
n_epoch = 100
for epoch in range(n_epoch):
    # 精度と損失の初期化
    train_acc, train_loss = 0, 0
    val_acc, val_loss = 0, 0
    n_train, n_test = 0, 0

    # 学習
    for train_input, train_label in train_loader:
        n_train += len(train_label)

        # 入力と正解ラベルをGPU上に移動
        input = train_input.to(device)
        label = train_label.to(device)
        print(f'input : {input.shape}, label : {label.shape}')

        optimizer.zero_grad()
        output = net(input)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

        predicted = torch.max(output, 1)[1]

        train_loss += loss.item()
        train_acc += (predicted == label).sum().item()

    # 検証
    for test_input, test_label in test_loader:
        n_test += len(test_label)

        test_input = test_input.to(device)
        test_label = test_label.to(device)

        test_output = net(test_input)
        test_loss = criterion(test_output, test_label)

        test_predicted = torch.max(test_output, 1)[1]

        test_loss += test_loss.item()
        val_acc += (test_predicted == test_label).sum().item()

    # 精度を確率に変換
    train_acc /= n_train
    val_acc /= n_test
    train_loss = train_loss * batch_size / n_train
    val_loss = val_loss * batch_size / n_test

    print(f"Epoch[{epoch+1}/{n_epoch}], | loss: {train_loss:.5f} | acc: {train_acc:.5f} | val_loss: {val_loss:.5f} | val_acc: {val_acc:.5f}")


input : torch.Size([64, 100]), label : torch.Size([64])


RuntimeError: Given groups=1, weight of size [64, 100, 7], expected input[1, 64, 100] to have 100 channels, but got 64 channels instead