In [29]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import roc_curve, auc

file_path = "./Dummy_02.csv"

# 1. 데이터 로드
def load_data(file_path):
    df = pd.read_csv(file_path)
    return df

In [30]:
# 2. 시간 데이터 처리
def process_time_data(df):
    # event_time을 datetime 형식으로 변환
    df['event_time'] = pd.to_datetime(df['event_time'])
    
    # epc_code별로 시간 차이 계산
    df['time_diff'] = df.groupby('epc_code')['event_time'].diff().dt.total_seconds().fillna(0)
    
    # 시간의 순환적 특성 인코딩
    df['hour'] = df['event_time'].dt.hour
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    return df

In [31]:
# 3. Label Encoding
def encode_categorical_data(df):
    # hub_type, event_type 라벨 인코딩
    le_hub = LabelEncoder()
    le_event = LabelEncoder()
    df['hub_type_encoded'] = le_hub.fit_transform(df['hub_type'])
    df['event_type_encoded'] = le_event.fit_transform(df['event_type'])

    # hub_event_combined 생성 후 인코딩
    df['hub_event_combined'] = df['hub_type'] + '_' + df['event_type']
    le_combined = LabelEncoder()
    df['hub_event_encoded'] = le_combined.fit_transform(df['hub_event_combined'])

    return df

In [32]:
# 4. 시계열 데이터 구성
def create_sequences(df, seq_length):
    sequences = []
    for _, group in df.groupby('epc_code'):
        group_data = group[['time_diff', 'hub_type_encoded', 'event_type_encoded', 'hub_event_encoded']].values
        for i in range(len(group_data) - seq_length + 1):
            sequences.append(group_data[i:i + seq_length])
    return np.array(sequences)


In [33]:
# 5. 데이터 정규화
def normalize_data(sequences):
    scaler = StandardScaler()
    for feature_idx in range(sequences.shape[2]):
        sequences[:, :, feature_idx] = scaler.fit_transform(sequences[:, :, feature_idx])
    return sequences

In [34]:
# 6. 데이터 분할
def split_data(sequences, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
    X_train, X_temp = train_test_split(sequences, test_size=val_ratio + test_ratio, random_state=42)
    X_val, X_test = train_test_split(X_temp, test_size=test_ratio / (val_ratio + test_ratio), random_state=42)
    return X_train, X_val, X_test

In [35]:
# 7. Autoencoder 모델 생성
def build_autoencoder(seq_length, feature_dim):
    input_seq = layers.Input(shape=(seq_length, feature_dim))
    x = layers.LSTM(256, activation='relu', return_sequences=True)(input_seq)
    x = layers.LSTM(128, activation='relu', return_sequences=True)(x)
    x = layers.LSTM(64, activation='relu', return_sequences=False)(x)
    latent = layers.Dense(32, activation='relu')(x)
    x = layers.RepeatVector(seq_length)(latent)
    x = layers.LSTM(64, activation='relu', return_sequences=True)(x)
    x = layers.LSTM(128, activation='relu', return_sequences=True)(x)
    x = layers.LSTM(256, activation='relu', return_sequences=True)(x)
    output_seq = layers.TimeDistributed(layers.Dense(feature_dim))(x)
    autoencoder = models.Model(input_seq, output_seq)
    return autoencoder

In [36]:
# 8. 재구성 결과 시각화
def visualize_reconstruction(X_test, reconstructed, n_samples=5):
    fig, axes = plt.subplots(n_samples, 2, figsize=(10, 10))
    for i in range(n_samples):
        axes[i, 0].plot(np.mean(X_test[i], axis=1), label="Original", alpha=0.7)
        axes[i, 0].set_title(f"Original Sequence {i+1}")
        axes[i, 0].legend()
        axes[i, 1].plot(np.mean(reconstructed[i], axis=1), label="Reconstructed", alpha=0.7, linestyle="--")
        axes[i, 1].set_title(f"Reconstructed Sequence {i+1}")
        axes[i, 1].legend()
    plt.tight_layout()
    plt.show()

In [37]:
# 9. 재구성 오류 분포 시각화
def plot_reconstruction_error(X_test, reconstructed):
    errors = np.mean(np.abs(X_test - reconstructed), axis=(1, 2)) #mse -> abs로 교체 후 성능 개선
    plt.figure(figsize=(8, 5))
    plt.hist(errors, bins=50, alpha=0.75, color='blue', label='Reconstruction Error')
    plt.axvline(np.mean(errors) + 2 * np.std(errors), color='red', linestyle='--', label='Threshold (Mean + 2*Std)')
    plt.title("Reconstruction Error Distribution")
    plt.xlabel("Reconstruction Error")
    plt.ylabel("Frequency")
    plt.legend()
    plt.show()
    return errors

In [38]:
# 10. 이상치 탐지 및 시각화
def detect_anomalies(errors, threshold):
    total_samples = len(errors)
    anomalies = errors > threshold
    anomaly_count = np.sum(anomalies)
    anomaly_ratio = anomaly_count / total_samples * 100
    print(f"Anomaly ratio: {anomaly_ratio:.2f}%")
    print(f"Number of anomalies detected: {np.sum(anomalies)}")
    return anomalies

def visualize_anomalies(X_test, reconstructed, anomalies, n_samples=5):
    # Ensure data is numpy arrays
    X_test = np.array(X_test)
    reconstructed = np.array(reconstructed)
    
    anomaly_indices = np.where(anomalies)[0]
    
    if len(anomaly_indices) == 0:
        print("No anomalies detected.")
        return
    
    # Ensure anomaly indices do not exceed available samples
    n_samples = min(n_samples, len(anomaly_indices))
    selected_indices = anomaly_indices[:n_samples]
    
    fig, axes = plt.subplots(n_samples, 2, figsize=(10, 10))
    for i, idx in enumerate(selected_indices):
        axes[i, 0].plot(np.mean(X_test[idx], axis=1), label="Original", alpha=0.7)
        axes[i, 0].set_title(f"Anomalous Sequence {idx+1}: Original")
        axes[i, 0].legend()
        
        axes[i, 1].plot(np.mean(reconstructed[idx], axis=1), label="Reconstructed", alpha=0.7, linestyle="--")
        axes[i, 1].set_title(f"Anomalous Sequence {idx+1}: Reconstructed")
        axes[i, 1].legend()
    
    plt.tight_layout()
    plt.show()


In [39]:
# 11. 파이프라인 실행
def preprocess_pipeline(file_path, seq_length=10):
    df = load_data(file_path)
    df = process_time_data(df)
    df = encode_categorical_data(df)
    df = df.drop(columns=['product_serial', 'product_name'])
    sequences = create_sequences(df, seq_length)
    sequences = normalize_data(sequences)
    X_train, X_val, X_test = split_data(sequences)
    return X_train, X_val, X_test

loss는 계속 감소하지만 val_loss가 증가하기 시작하면 과적합 신호입니다.

In [40]:
USE_CUDA = torch.cuda.is_available()
device = torch.device("cuda" if USE_CUDA else 'cpu')
print(f'다음 기기로 학습 : {device}')

다음 기기로 학습 : cpu


In [None]:
# 12. 실행
seq_length = 10
X_train, X_val, X_test = preprocess_pipeline(file_path, seq_length)

# Autoencoder 생성 및 학습
feature_dim = X_train.shape[2]
autoencoder = build_autoencoder(seq_length, feature_dim)
autoencoder.compile(optimizer='adam', loss='mse', metrics=['accuracy'])
history = autoencoder.fit(X_train, X_train, validation_data=(X_val, X_val), epochs=20, batch_size=32, shuffle=True)

# 학습 과정 시각화 추가
def plot_training_history(history):
    plt.figure(figsize=(8, 5))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title("Training and Validation Loss Over Epochs")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

plot_training_history(history)

# 테스트 데이터 평가 및 결과 분석
reconstructed = autoencoder.predict(X_test)
visualize_reconstruction(X_test, reconstructed)
reconstruction_errors = plot_reconstruction_error(X_test, reconstructed)

# 이상치 탐지
threshold = np.mean(reconstruction_errors) + 2 * np.std(reconstruction_errors)
anomalies = detect_anomalies(reconstruction_errors, threshold)
visualize_anomalies(X_test, reconstructed, anomalies)

In [42]:
#13. 모델 저장
autoencoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError(), metrics=['accuracy'])
autoencoder.save('autoencoder_model.h5')

