In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset

# PCA 적용 함수 정의
def apply_pca(X_train, X_test, n_components):
    pca = PCA(n_components=n_components)
    X_train_pca = pca.fit_transform(X_train.reshape(X_train.shape[0], -1))  # 차원 축소
    X_test_pca = pca.transform(X_test.reshape(X_test.shape[0], -1))
    
    # RNN 모델의 입력 형태로 다시 변환 (배치 크기, 시퀀스 길이, 차원)
    X_train_pca = X_train_pca.reshape(X_train_pca.shape[0], -1, n_components)
    X_test_pca = X_test_pca.reshape(X_test_pca.shape[0], -1, n_components)
    
    return X_train_pca, X_test_pca

# 데이터 불러오기 및 전처리 (생략된 부분은 기존 코드 참조)
data = pd.read_csv('merged_data_final2.csv')  # 예시 파일 이름
data['Timestamp'] = pd.to_datetime(data['Timestamp'])
data['Timestamp'] = data['Timestamp'].astype('int64') // 10**9
data = data[['Timestamp', 'DI_uiSpeed', 'DI_vehicleSpeed','Physical_value','acc_value']]

# 기존 전처리 과정 그대로
scaler = MinMaxScaler()
data_normalized = scaler.fit_transform(data)

# 시퀀스 생성 (150 시퀀스 길이로 설정)
sequence_length = 150
X = []
y = []

for i in range(len(data_normalized) - sequence_length):
    X.append(data_normalized[i:i+sequence_length, :-1])
    y.append(data_normalized[i+sequence_length, -1])

X = np.array(X)
y = np.array(y)

# train-test split
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# PCA 적용 (축소된 차원 수 설정: 예시로 2)
n_components = 2
X_train_pca, X_test_pca = apply_pca(X_train, X_test, n_components)

# Tensor 변환
X_train_tensor = torch.tensor(X_train_pca, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)

X_test_tensor = torch.tensor(X_test_pca, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

# 데이터셋 및 데이터로더 생성
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)



In [5]:


# 데이터 불러오기 및 가공
data = pd.read_csv('test_data.csv')
data['Timestamp'] = pd.to_datetime(data['Timestamp'])
data['Timestamp'] = data['Timestamp'].astype('int64') // 10**9
data = data[['Timestamp', 'uiSpeed', 'vehicleSpeed', 'acc_value','Physical_value']]

# 데이터 정규화
scaler = MinMaxScaler()
data_normalized = scaler.fit_transform(data)
print(f"data_normalized.shape: {data_normalized.shape}")

# 시퀀스 생성
sequence_length = 150  # 시퀀스 길이
X = []
y = []

# acc_value를 타겟으로 설정 (data_normalized의 마지막 열)
for i in range(len(data_normalized) - sequence_length):
    X.append(data_normalized[i:i+sequence_length, :-1])
    y.append(data_normalized[i+sequence_length, -1])  # acc_value를 타겟으로 사용

X = np.array(X)
y = np.array(y)



data_normalized.shape: (3192, 5)


In [6]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# PCA 적용 (축소된 차원 수 설정: 예시로 2)
n_components = 2
X_train_pca, X_test_pca = apply_pca(X_train, X_test, n_components)

# Tensor 변환
X_train_tensor = torch.tensor(X_train_pca, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)

X_test_tensor = torch.tensor(X_test_pca, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

# 데이터셋 및 데이터로더 생성
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [7]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.rnn(x, h0)
        out = self.fc(out[:, -1, :])  # 마지막 타임스텝의 출력을 가져옴
        out = self.sigmoid(out)  # Binary classification을 위해 Sigmoid 적용
        return out

input_size = n_components  # PCA로 축소한 차원을 사용
hidden_size = 64
num_layers = 4
output_size = 1
dropout = 0.5

model = RNN(input_size, hidden_size, num_layers, output_size, dropout)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)



In [10]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Confusion Matrix 그리기 함수
def plot_confusion_matrix(y_true, y_pred_label):
    cm = confusion_matrix(y_true, y_pred_label)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["Negative", "Positive"], yticklabels=["Negative", "Positive"])
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title("Confusion Matrix")
    plt.show()

# 학습 및 평가 함수 정의
def train_model(model, train_dataloader, test_dataloader, criterion, optimizer, num_epochs=10):
    model.train()
    global train_losses
    train_losses = [] # Train loss 저장 리스트
    global test_losses
    test_losses = []   # Test loss 저장 리스트
    for epoch in range(num_epochs):
        #start_time = time.time()
        model.train()
        running_loss = 0.0
        for X_batch, y_batch in train_dataloader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * X_batch.size(0)
        
        epoch_loss = running_loss / len(train_dataloader.dataset)
        train_losses.append(epoch_loss)

        #train_time = time.time() - start_time

        test_loss, test_accuracy, test_precision, test_recall, test_f1, y_true, y_pred_label = evaluate_model(model, test_dataloader, criterion)
        test_losses.append(test_loss)
        

        print(f'Epoch [{epoch+1}/{num_epochs}], '
              f'Train Loss: {epoch_loss:.4f}, '
              f'Test Loss: {test_loss:.4f}, '

            )
        
        print("-------------------------------------------------------")
    return y_true, y_pred_label  # 최종적으로 y_true와 y_pred_label 반환

def evaluate_model(model, test_dataloader, criterion):
    model.eval()
    global accuracy
    global precision
    global recall
    global f1
    with torch.no_grad():
        test_loss = 0.0
        y_pred = []
        y_true = []
        for X_batch, y_batch in test_dataloader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item() * X_batch.size(0)
            
            # 예측값과 실제값 저장
            y_pred.extend(outputs.squeeze().cpu().numpy())
            y_true.extend(y_batch.squeeze().cpu().numpy())
        
        test_loss = test_loss / len(test_dataloader.dataset)
        
        # 예측값을 0.5 기준으로 이진분류
        y_pred = np.array(y_pred)
        y_pred_label = (y_pred >= 0.5).astype(int)
        
        # 평가 지표 계산
        accuracy = accuracy_score(y_true, y_pred_label)
        precision = precision_score(y_true, y_pred_label)
        recall = recall_score(y_true, y_pred_label)
        f1 = f1_score(y_true, y_pred_label)

        return test_loss, accuracy, precision, recall, f1, y_true, y_pred_label

def plot_losses(train_losses, test_losses):
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(test_losses, label='Test Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Test Loss Over Epochs')
    plt.legend()
    plt.show()


# 모델 학습 및 평가
num_epochs = 30  # 에포크 수 조정 가능
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Train the model and get the true and predicted labels
y_true, y_pred_label = train_model(model, train_dataloader, test_dataloader, criterion, optimizer, num_epochs)

# Plot the confusion matrix
plot_confusion_matrix(y_true, y_pred_label)

plot_losses(train_losses, test_losses)

# 최종 평가 지표 출력
evaluate_model(model, test_dataloader, criterion)
print(
    f'Test Accuracy: {accuracy:.4f}, '
    f'Test Precision: {precision:.4f}, '
    f'Test Recall: {recall:.4f}, '
    f'Test F1-Score: {f1:.4f}, '
)


TypeError: iteration over a 0-d array