In [1]:
pwd

'c:\\Users\\user\\KeepMeSafe\\AI_Project'

In [2]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import os

class Autoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers):
        super(Autoencoder, self).__init__()
        self.encoder = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.decoder = nn.LSTM(hidden_dim, input_dim, num_layers, batch_first=True)
        self.linear = nn.Linear(input_dim, input_dim)

    def forward(self, x):
        _, (hn, _) = self.encoder(x)
        hn = hn[-1].unsqueeze(0)
        repeated_hn = hn.repeat(1, x.size(1), 1)
        decoded, _ = self.decoder(repeated_hn)
        decoded = decoded.contiguous().view(-1, x.size(2))
        decoded = self.linear(decoded)
        return decoded.view(x.size(0), x.size(1), -1)

def train_model(file_path):
    df = pd.read_csv(file_path)
    df['RegisterDate'] = pd.to_datetime(df['RegisterDate'])
    df.sort_values(by='RegisterDate', inplace=True)
    df.fillna(method='ffill', inplace=True)

    heartbeat_data = df['Heartbeat'].values.reshape(-1, 1)
    scaler = MinMaxScaler(feature_range=(0, 1))
    heartbeat_data_scaled = scaler.fit_transform(heartbeat_data)

    X = torch.Tensor(heartbeat_data_scaled).unsqueeze(2)
    X_train, X_valid = train_test_split(X, test_size=0.2, random_state=42)

    train_dataset = TensorDataset(X_train, X_train)
    valid_dataset = TensorDataset(X_valid, X_valid)

    train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
    valid_loader = DataLoader(dataset=valid_dataset, batch_size=64, shuffle=False)

    model = Autoencoder(input_dim=1, hidden_dim=50, num_layers=2)
    
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    epochs = 100
    best_loss = float('inf')
    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        for inputs, _ in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, inputs)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        
        model.eval()
        valid_loss = 0.0
        with torch.no_grad():
            for inputs, _ in valid_loader:
                outputs = model(inputs)
                loss = criterion(outputs, inputs)
                valid_loss += loss.item()

        train_loss = train_loss / len(train_loader)
        valid_loss = valid_loss / len(valid_loader)
        
        if valid_loss < best_loss:
            best_loss = valid_loss
            user_code = os.path.basename(file_path).split('.')[0]
            model_save_path = f'best_model_{user_code}.pth'
            torch.save(model.state_dict(), model_save_path)
    
    print(f'Training complete for {file_path}.')

# 파일 경로
data_folder_path = '../data/user_code_total/'

# 지정된 폴더 내의 모든 CSV 파일을 반복 처리
for file_name in os.listdir(data_folder_path):
    if file_name.endswith('.csv'):
        file_path = os.path.join(data_folder_path, file_name)
        train_model(file_path)


  df.fillna(method='ffill', inplace=True)


Training complete for ../data/user_code_total/user_code_001.csv.


  df.fillna(method='ffill', inplace=True)


Training complete for ../data/user_code_total/user_code_006.csv.


  df.fillna(method='ffill', inplace=True)


Training complete for ../data/user_code_total/user_code_007.csv.


  df.fillna(method='ffill', inplace=True)


Training complete for ../data/user_code_total/user_code_009.csv.


  df.fillna(method='ffill', inplace=True)


Training complete for ../data/user_code_total/user_code_010.csv.


  df.fillna(method='ffill', inplace=True)


KeyboardInterrupt: 