In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import os

data_folder = "/content/drive/Othercomputers/マイ iMac/2024_中島/visual studio/波形/mnt/data/extend_data"
label_file = "/content/drive/Othercomputers/マイ iMac/2024_中島/visual studio/波形/mnt/data/label_data/label_data(ex).csv"

label_df = pd.read_csv(label_file)
#データの最大値、最小値の取得
global_min = {"x_data": float("inf"), "y_data": float("inf"), "x_data_fft": float("inf"), "y_data_fft": float("inf")}
global_max = {"x_data": float("-inf"), "y_data": float("-inf"), "x_data_fft": float("-inf"), "y_data_fft": float("-inf")}

for _, row in label_df.iterrows():
    file_name = row["file_name"]
    file_path = os.path.join(data_folder, file_name)

    if os.path.exists(file_path):
        df = pd.read_csv(file_path)

        for key in global_min.keys():
            global_min[key] = min(global_min[key], df[key].min())
            global_max[key] = max(global_max[key], df[key].max())

print("取得した最大・最小値")
print(global_min)
print(global_max)


取得した最大・最小値
{'x_data': -0.15785985886834, 'y_data': -0.2151365352777189, 'x_data_fft': 0.0, 'y_data_fft': 0.0}
{'x_data': 0.1554234849246505, 'y_data': 0.2118111809023308, 'x_data_fft': 33.977, 'y_data_fft': 40.005}


In [None]:
#データの正規化
def normalize(data, min_val, max_val):
    return (data - min_val) / (max_val - min_val)

all_data = []
all_labels = []

for _, row in label_df.iterrows():
    file_name, label = row["file_name"], row["label"]
    file_path = os.path.join(data_folder, file_name)

    if os.path.exists(file_path):
        df = pd.read_csv(file_path)

        x_data_norm = normalize(df["x_data"].values, global_min["x_data"], global_max["x_data"])
        y_data_norm = normalize(df["y_data"].values, global_min["y_data"], global_max["y_data"])
        x_data_fft_norm = normalize(df["x_data_fft"].values, global_min["x_data_fft"], global_max["x_data_fft"])
        y_data_fft_norm = normalize(df["y_data_fft"].values, global_min["y_data_fft"], global_max["y_data_fft"])

        normalized_data = np.stack([x_data_norm, y_data_norm, x_data_fft_norm, y_data_fft_norm], axis=0)
        all_data.append(normalized_data)
        all_labels.append(label)

X = np.array(all_data)
y = np.array(all_labels)

print(" 正規化後のデータ形状:", X.shape)
print(" ラベルの形状:", y.shape)


 正規化後のデータ形状: (0,)
 ラベルの形状: (0,)


In [None]:
#  データの分割 (80% を訓練, 20% をテスト)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)




In [None]:
# CNN-LSTMモデルの定義
class CNN_LSTM_Waveform(nn.Module):
    def __init__(self):
        super(CNN_LSTM_Waveform, self).__init__()

        self.layer1 = nn.Sequential(
            nn.Conv1d(4, 16, kernel_size=5, padding=2),
            nn.BatchNorm1d(16),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv1d(16, 32, kernel_size=5, padding=2),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        self.layer3 = nn.Sequential(
            nn.Conv1d(32, 64, kernel_size=5, padding=2),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.lstm = nn.LSTM(input_size=64, hidden_size=256, num_layers=3, batch_first=True)

        self.fc1 = nn.Linear(256, 128)
        self.fc2 = nn.Linear(128, 2)

    def forward(self, x):

        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)

        out = out.permute(0, 2, 1)
        lstm_out, _ = self.lstm(out)
        lstm_out = lstm_out[:, -1, :]

        # 全結合層
        out = self.fc1(lstm_out)
        out = self.fc2(out)
        return out




In [None]:
#学習
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN_LSTM_Waveform().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

import matplotlib.pyplot as plt

loss_history = []

def train_model():
    model.train()
    num_epochs = 30
    for epoch in range(num_epochs):
        running_loss = 0.0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            # 順伝播
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)

            # 逆伝播と最適化
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        epoch_loss = running_loss / len(train_loader)
        loss_history.append(epoch_loss)
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}")

    plt.plot(loss_history, label="Training Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training Loss Over Epochs")
    plt.legend()
    plt.show()



In [None]:
#学習
def evaluate_model():
    model.eval()
    correct = 0
    total = 0
    batch_size = test_loader.batch_size
    num_batches = len(train_loader)

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            _, predicted = torch.max(outputs, 1)

            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()

    accuracy = 100 * correct / total

    print(f"学習データ数: {len(train_dataset)}")
    print(f"未学習データ数: {len(test_dataset)}")
    print(f"バッチサイズ: {batch_size}")
    print(f"バッチ数: {num_batches}")
    print(f":Accurcy {correct}/{total} ({accuracy:.2f}%)")


train_model()
evaluate_model()


In [None]:
torch.save(model.state_dict(), "cnn_lstm.pth")


In [None]:
model.load_state_dict(torch.load("cnn_lstm.pth"))
model.eval()


In [None]:
import torch

class CNN_LSTM_Waveform(nn.Module):
    def __init__(self):
        super(CNN_LSTM_Waveform, self).__init__()

        self.layer1 = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=5, padding=2),
            nn.BatchNorm1d(16),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv1d(16, 32, kernel_size=5, padding=2),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        self.layer3 = nn.Sequential(
            nn.Conv1d(32, 64, kernel_size=5, padding=2),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.lstm = nn.LSTM(input_size=64, hidden_size=256, num_layers=3, batch_first=True)

        self.fc1 = nn.Linear(256, 128)
        self.fc2 = nn.Linear(128, 2)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)

        out = out.permute(0, 2, 1)
        lstm_out, _ = self.lstm(out)
        lstm_out = lstm_out[:, -1, :]

        out = self.fc1(lstm_out)
        out = self.fc2(out)
        return out

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_path = "/mnt/data/trained_model.pth"
model = CNN_LSTM_Waveform().to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()
