In [None]:
import os
import shutil
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import timedelta
from scipy.signal import stft, resample
from typing import Dict, List, Tuple, Any

#####################################
# 1. 설정 클래스: 모든 파라미터를 중앙에서 관리
#####################################

class Config:
    """
    Single place that holds every tunable setting of the preprocessing pipeline.
    """
    def __init__(self):
        def csv_map(folder, prefix, indices):
            # Maps "<prefix>_<i>" -> "raw_data/<folder>/<prefix>_<i>.csv" for each index.
            return {f"{prefix}_{i}": f"raw_data/{folder}/{prefix}_{i}.csv" for i in indices}

        # --- STFT parameters ---
        self.fs = 80000
        self.nperseg = 1024
        self.hop_length = self.nperseg // 4
        self.freq_limit = 1000
        self.target_time_bins = 16

        # --- Sliding-window parameters ---
        self.window_size_atoms = 5
        self.step = 1
        # 0.0102 is presumably the duration of one atom in seconds -- TODO confirm.
        self.window_duration_seconds = self.window_size_atoms * 0.0102

        # --- Train/val/test split parameters ---
        self.block_size_seconds = 20
        self.train_ratio = 0.6
        self.val_ratio = 0.2

        # --- Output directories ---
        self.output_stft_dir = "processed_data_stft"
        self.output_raw_dir = "processed_data_raw"

        # --- Input file paths, keyed by file id ---
        self.normal_files = csv_map("normal", "normal", range(1, 21))
        self.abnormal_files = {
            "abnormal_1": csv_map("abnormal", "abnormal_1", range(1, 4)),
            "abnormal_2": csv_map("abnormal", "abnormal_2", range(1, 3)),
            "abnormal_3": csv_map("abnormal", "abnormal_3", range(1, 3)),
            "abnormal_4": csv_map("abnormal", "abnormal_4", range(3, 6)),
        }
        # class name -> {file id -> path}; "normal" comes first, then the abnormal classes.
        self.all_files = dict(normal=self.normal_files, **self.abnormal_files)

        # --- Augmentation settings ---
        self.classes_to_augment = ['abnormal_1', 'abnormal_2']

#####################################
# 2. 기본 유틸리티 함수: 데이터 로드, 원자 추출, 윈도우 생성 등
#####################################

def load_data(file_path: str) -> pd.DataFrame:
    """Read a CSV file and return it indexed by its parsed 'time' column.

    Values in 'time' that cannot be parsed become NaT (errors='coerce')
    instead of raising.
    """
    parsed = pd.read_csv(file_path).assign(
        time=lambda frame: pd.to_datetime(frame['time'], errors='coerce')
    )
    return parsed.set_index("time")

def group_atoms(data: pd.DataFrame) -> List[pd.DataFrame]:
    """Extract "atoms" (valid vibration bursts) from a time-indexed frame.

    Works on a sorted copy, so the caller's frame is not modified. A row
    whose gap to the previous row is within [0.7, 0.75] seconds starts a
    new group; rows whose gap is below 0.1 seconds are the vibration
    samples. An atom is the set of vibration samples of a group that was
    opened by such a gap, kept only when it has more than one row.

    The returned frames also carry the helper columns 'time_interval',
    'is_interval', 'is_vibration' and 'group'.
    """
    ordered = data.copy().sort_index()
    # Seconds since the previous row; the first row has no predecessor -> 0.
    gap_seconds = ordered.index.to_series().diff().dt.total_seconds().fillna(0)
    ordered['time_interval'] = gap_seconds
    ordered['is_interval'] = gap_seconds.between(0.7, 0.75)
    ordered['is_vibration'] = gap_seconds.lt(0.1)
    # Every inter-burst gap bumps the running group id.
    ordered['group'] = ordered['is_interval'].cumsum()

    atoms = []
    for _, burst in ordered.groupby('group'):
        # Groups that were not opened by an inter-burst gap (e.g. the very
        # first rows) are discarded.
        if not burst['is_interval'].any():
            continue
        samples = burst[burst['is_vibration']]
        # Requiring more than one row also covers "has any vibration row".
        if len(samples) > 1:
            atoms.append(samples)
    return atoms

def create_sliding_windows(atoms: List[pd.DataFrame], window_size: int, step: int) -> List[pd.DataFrame]:
    """Build sliding windows by concatenating `window_size` consecutive atoms.

    Window starts advance by `step` atoms; only full windows are produced,
    so fewer than `window_size` atoms yields an empty list.
    """
    windows = []
    last_start = len(atoms) - window_size
    for start in range(0, last_start + 1, step):
        members = atoms[start:start + window_size]
        windows.append(pd.concat(members))
    return windows

def augment_window(window: pd.DataFrame, augment_factor: int, methods=('noise', 'shift', 'scaling')) -> List[pd.DataFrame]:
    """Create augmented copies of a window's 'x', 'y', 'z' channels.

    For each of `augment_factor` rounds, one augmented frame is produced per
    entry in `methods`:
      - 'noise':   adds Gaussian noise (mean 0, std 0.01) to every sample;
      - 'shift':   circularly shifts the rows by a random amount between 1
                   and 20% of the window length (at least 1 row);
      - 'scaling': multiplies all samples by one random factor in [0.9, 1.1);
      - any other name yields an unmodified copy.

    Args:
        window: frame containing at least the columns 'x', 'y', 'z'.
        augment_factor: number of rounds; 0 returns an empty list.
        methods: iterable of method names applied in order on every round.

    Returns:
        A list of `augment_factor * len(methods)` frames with columns
        'x', 'y', 'z' only and the same index as `window`.
    """
    signal = window[['x', 'y', 'z']].values
    # np.random.randint(1, high) requires high > 1. For windows shorter than
    # 5 rows, int(n * 0.2) is 0, so clamp the upper bound to at least 1 to
    # avoid a ValueError.
    max_shift = max(1, int(signal.shape[0] * 0.2))

    augmented_windows = []
    for _ in range(augment_factor):
        for method in methods:
            if method == 'noise':
                aug_data = signal + np.random.normal(0, 0.01, signal.shape)
            elif method == 'shift':
                shift = np.random.randint(1, max_shift + 1)
                aug_data = np.roll(signal, shift, axis=0)
            elif method == 'scaling':
                aug_data = signal * np.random.uniform(0.9, 1.1)
            else:
                aug_data = signal.copy()

            augmented_windows.append(
                pd.DataFrame(aug_data, columns=['x', 'y', 'z'], index=window.index)
            )
    return augmented_windows

def get_time_segments_by_blocks(data: pd.DataFrame, config: Config) -> Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]]:
    """Cut the recording into fixed-length blocks and split each block by row count.

    Blocks are `config.block_size_seconds` long and start at the first index
    value. Inside every non-empty block the first `train_ratio` share of the
    rows becomes the train segment, the next `val_ratio` share the val
    segment, and whatever is left the test segment.

    Args:
        data: frame whose index holds timestamps.
            NOTE(review): the first/last index values are used as the time
            span, so the index is assumed to be in chronological order.
        config: provides `block_size_seconds`, `train_ratio`, `val_ratio`.

    Returns:
        {'train': [...], 'val': [...], 'test': [...]} where every entry is an
        inclusive (first_timestamp, last_timestamp) pair. All lists are empty
        when `data` has no rows.
    """
    segments = {'train': [], 'val': [], 'test': []}
    # An empty frame has no first/last timestamp; indexing it would raise.
    if len(data.index) == 0:
        return segments

    first_ts = data.index[0]
    total_seconds = (data.index[-1] - first_ts).total_seconds()
    n_blocks = int(total_seconds / config.block_size_seconds) + 1

    for i in range(n_blocks):
        block_start = first_ts + timedelta(seconds=i * config.block_size_seconds)
        block_end = block_start + timedelta(seconds=config.block_size_seconds)
        # Half-open interval [block_start, block_end).
        block_data = data[(data.index >= block_start) & (data.index < block_end)]

        n = len(block_data)
        if n == 0:
            continue

        train_end_idx = int(n * config.train_ratio)
        val_end_idx = train_end_idx + int(n * config.val_ratio)

        if train_end_idx > 0:
            segments['train'].append((block_data.index[0], block_data.index[train_end_idx - 1]))
        if val_end_idx > train_end_idx:
            segments['val'].append((block_data.index[train_end_idx], block_data.index[val_end_idx - 1]))
        if n > val_end_idx:
            segments['test'].append((block_data.index[val_end_idx], block_data.index[-1]))

    return segments

#####################################
# 3. 데이터 변환 및 저장 함수
#####################################

def resample_window(window: pd.DataFrame, config: Config) -> pd.DataFrame:
    """Put a window onto a uniform time grid at `config.fs` samples per second.

    The original timestamps are not used: the 'x', 'y', 'z' samples are passed
    through `scipy.signal.resample` and re-indexed on a synthetic time axis
    that starts at the Unix epoch with a spacing of 1 / fs seconds.

    Args:
        window: frame containing the columns 'x', 'y', 'z'.
        config: provides `fs`.

    Returns:
        A frame with columns 'x', 'y', 'z', indexed by 'time', with exactly
        as many rows as `window`; an empty DataFrame when `window` has fewer
        than 2 rows.
    """
    num_samples = len(window)
    if num_samples < 2:
        return pd.DataFrame()  # too short to resample

    # Build the axis from an integer sample count. np.arange with a float
    # stop/step (0, n / fs, 1 / fs) can come out one element too long due to
    # rounding, which would stretch the signal by one sample.
    new_time = np.arange(num_samples) / config.fs

    resampled_signal = resample(window[['x', 'y', 'z']].values, num_samples, axis=0)
    resampled_window = pd.DataFrame(resampled_signal, columns=['x', 'y', 'z'])
    resampled_window['time'] = pd.to_datetime(new_time, unit='s')
    return resampled_window.set_index('time')

def window_to_stft_image(window: pd.DataFrame, config: Config) -> np.ndarray:
    """Turn a window into a 3-channel STFT magnitude image.

    For each of 'x', 'y', 'z' the STFT magnitude is computed, restricted to
    frequencies at or below `config.freq_limit`, and then zero-padded or
    cropped along the time axis to `config.target_time_bins` columns. The
    three channel images are stacked along a new leading axis.
    """
    overlap = config.nperseg - config.hop_length
    wanted_bins = config.target_time_bins

    def channel_image(axis_name):
        freqs, _, spectrum = stft(
            window[axis_name].values,
            fs=config.fs,
            nperseg=config.nperseg,
            noverlap=overlap,
        )
        magnitude = np.abs(spectrum[freqs <= config.freq_limit, :])

        # Fit the time axis: pad with zeros on the right, or crop.
        missing = wanted_bins - magnitude.shape[1]
        if missing > 0:
            filler = np.zeros((magnitude.shape[0], missing))
            return np.concatenate((magnitude, filler), axis=1)
        return magnitude[:, :wanted_bins]

    return np.stack([channel_image(name) for name in ('x', 'y', 'z')], axis=0)

def window_to_raw_signal(window: pd.DataFrame, config: Config) -> np.ndarray:
    """Return the window's 'x', 'y', 'z' samples as a channel-first array.

    The result has shape (3, L) with
    L = int(config.window_duration_seconds * config.fs). Windows that already
    have L samples are returned as-is (transposed); any other length is
    resampled to L along the sample axis.
    """
    channels = np.transpose(window[['x', 'y', 'z']].values)  # (3, n_samples)
    wanted_length = int(config.window_duration_seconds * config.fs)
    if channels.shape[1] == wanted_length:
        return channels
    return resample(channels, wanted_length, axis=1)

def save_data_for_split(class_name: str, split_name: str, windows: List[pd.DataFrame], metadata: List[Dict], config: Config):
    """Convert the windows of one class/split to STFT and raw tensors and save them.

    Files written:
      - <output_stft_dir>/<split_name>/<class_name>_stft_tensors.npy
      - <output_raw_dir>/<split_name>/<class_name>_raw_tensors.npy
      - <output_raw_dir>/<class_name>_window_metadata_<split_name>.csv

    Windows for which `resample_window` returns an empty frame are skipped.
    When nothing is left to save, a message is printed and no file is written.

    Args:
        class_name: class label used in the file names.
        split_name: split label ('train', 'val', 'test') used for the
            sub-directory and the metadata file name.
        windows: window frames to convert.
        metadata: metadata records for this split, written as CSV rows.
        config: provides output directories and conversion parameters.
    """
    if not windows:
        print(f"  ⏩ No windows to save for class '{class_name}', split '{split_name}'.")
        return

    stft_tensors, raw_tensors, kept_positions = [], [], []
    for position, window in enumerate(windows):
        resampled = resample_window(window, config)
        if resampled.empty:
            continue
        stft_tensors.append(window_to_stft_image(resampled, config))
        raw_tensors.append(window_to_raw_signal(resampled, config))
        kept_positions.append(position)

    # np.stack raises on an empty list, so bail out explicitly when every
    # window was rejected by the resampler.
    if not stft_tensors:
        print(f"  ⏩ All {len(windows)} windows for class '{class_name}', split '{split_name}' "
              f"were too short to resample; nothing saved.")
        return

    # Keep metadata rows aligned with the saved tensors. This is only possible
    # when the caller supplied exactly one record per window.
    if len(kept_positions) != len(windows) and len(metadata) == len(windows):
        metadata = [metadata[position] for position in kept_positions]

    # Save STFT tensors
    stft_dir = os.path.join(config.output_stft_dir, split_name)
    os.makedirs(stft_dir, exist_ok=True)
    stft_path = os.path.join(stft_dir, f"{class_name}_stft_tensors.npy")
    np.save(stft_path, np.stack(stft_tensors))

    # Save raw-signal tensors
    raw_dir = os.path.join(config.output_raw_dir, split_name)
    os.makedirs(raw_dir, exist_ok=True)
    raw_path = os.path.join(raw_dir, f"{class_name}_raw_tensors.npy")
    np.save(raw_path, np.stack(raw_tensors))

    print(f"  💾 Saved {class_name}/{split_name}: {len(windows)} windows ({len(stft_tensors)} STFT, {len(raw_tensors)} Raw)")

    # Save metadata (placed directly under the raw output directory)
    meta_df = pd.DataFrame(metadata)
    meta_filename = f"{class_name}_window_metadata_{split_name}.csv"
    meta_df_path = os.path.join(config.output_raw_dir, meta_filename)
    meta_df.to_csv(meta_df_path, index=False)
    print(f"  📝 Saved metadata: {meta_df_path}")

#####################################
# 4. 데이터 처리 워크플로우 함수
#####################################

def generate_windows_for_class(class_name: str, files: Dict[str, str], config: Config,
                               augment_splits: Dict[str, int] = None) -> Tuple[Dict[str, list], List[Dict]]:
    """
    Build sliding windows for every file of a class, with optional augmentation.

    Each file is loaded, cut into train/val/test time segments, and every
    segment is turned into atoms and then sliding windows. For splits listed
    in `augment_splits`, augmented copies of each window are appended after
    the segment's original windows.

    Args:
        class_name: class label stored in the metadata.
        files: mapping of file id -> CSV path.
        config: provides window and split parameters.
        augment_splits: split name -> augmentation factor, e.g.
            {'train': 5, 'val': 1}. None or an empty dict disables
            augmentation.

    Returns:
        (class_windows, class_metadata):
          - class_windows maps 'train'/'val'/'test' to the list of windows;
          - class_metadata holds one record per window, in the same
            per-split order as class_windows. Augmented windows are recorded
            with "is_augmented": True and the "window_idx" of the window
            they were derived from.
    """
    # None instead of a mutable {} default, which would be shared across calls.
    augment_splits = augment_splits or {}

    class_windows = {'train': [], 'val': [], 'test': []}
    class_metadata = []

    for file_id, file_path in files.items():
        print(f"    📄 Processing file: {file_id}")
        data = load_data(file_path)
        time_segments = get_time_segments_by_blocks(data, config)

        for split, segments in time_segments.items():
            augment_factor = augment_splits.get(split, 0)

            for seg_idx, (start_time, end_time) in enumerate(segments):
                block_data = data.loc[start_time:end_time]
                if block_data.empty:
                    continue

                atoms = group_atoms(block_data)
                windows = create_sliding_windows(atoms, config.window_size_atoms, config.step)

                # Original windows
                class_windows[split].extend(windows)

                # Metadata for the original windows
                for win_idx, window in enumerate(windows):
                    class_metadata.append({
                        "class": class_name, "file": file_id, "split": split, "block_index": seg_idx,
                        "window_idx": win_idx, "start_time": window.index[0], "end_time": window.index[-1],
                        "is_augmented": False
                    })

                # Augmentation
                if augment_factor > 0:
                    augmented_wins = []
                    for win_idx, window in enumerate(windows):
                        for aug_window in augment_window(window, augment_factor):
                            augmented_wins.append(aug_window)
                            # One record per augmented window so the metadata
                            # stays aligned with the windows that get saved.
                            class_metadata.append({
                                "class": class_name, "file": file_id, "split": split, "block_index": seg_idx,
                                "window_idx": win_idx, "start_time": aug_window.index[0],
                                "end_time": aug_window.index[-1], "is_augmented": True
                            })
                    class_windows[split].extend(augmented_wins)
                    print(f"      ➕ Augmented {split} with {len(augmented_wins)} windows (factor={augment_factor})")

    return class_windows, class_metadata

def run_processing(config: Config):
    """Build and save the train/val/test datasets for every class.

    Classes listed in `config.classes_to_augment` get their 'train' split
    augmented with factor 5; all other classes are processed without
    augmentation.
    """
    print("\n🚀 Starting Data Processing (Train/Val/Test)")
    for class_name, files in config.all_files.items():
        print(f"  🔹 Processing class: {class_name}")

        # Only the train split of the selected classes is augmented.
        wants_augmentation = class_name in config.classes_to_augment
        augment_config = {'train': 5} if wants_augmentation else {}
        if wants_augmentation:
            print(f"    ✨ Augmentation will be applied to 'train' split.")

        all_windows, all_metadata = generate_windows_for_class(
            class_name, files, config, augment_splits=augment_config
        )

        for split in ('train', 'val', 'test'):
            records_for_split = list(filter(lambda record: record['split'] == split, all_metadata))
            save_data_for_split(class_name, split, all_windows[split], records_for_split, config)

#####################################
# 5. 결과 확인 및 메인 실행 함수
#####################################

def print_sample_counts(config: Config):
    """Print a table with the number of saved STFT and raw samples per split and class.

    Counts are read from
    <output_stft_dir>/<split>/<class>_stft_tensors.npy and
    <output_raw_dir>/<split>/<class>_raw_tensors.npy. A missing file counts
    as 0, and rows where both counts are 0 are not printed.
    """
    def count_samples(path):
        # Number of entries along the first axis of a saved array (0 if absent).
        if not os.path.exists(path):
            return 0
        # Memory-map instead of loading: only the length is needed, so there
        # is no reason to read the whole tensor file into memory.
        return len(np.load(path, mmap_mode='r'))

    splits_to_check = ['train', 'val', 'test']
    class_names = list(config.all_files.keys())
    rule = "─────────────────────────────────────────────────────────────"

    print("\n📊 Sample Count Summary")
    print(rule)
    print(f"{'Split':<10} | {'Class':<12} | {'STFT':>6} | {'Raw':>6}")
    print(rule)

    for split in splits_to_check:
        for cls in class_names:
            stft_count = count_samples(
                os.path.join(config.output_stft_dir, split, f"{cls}_stft_tensors.npy"))
            raw_count = count_samples(
                os.path.join(config.output_raw_dir, split, f"{cls}_raw_tensors.npy"))

            if stft_count > 0 or raw_count > 0:
                print(f"{split:<10} | {cls:<12} | {stft_count:6d} | {raw_count:6d}")
    print(rule + "\n")


In [2]:

if __name__ == "__main__":
    # 1. Create the configuration object
    config = Config()

    # 2. Build the Train/Val/Test datasets
    # (the train split of abnormal_1 and abnormal_2 is augmented internally)
    run_processing(config)

    # 3. Print the final per-split sample counts
    print_sample_counts(config)


🚀 Starting Data Processing (Train/Val/Test)
  🔹 Processing class: normal
    📄 Processing file: normal_1
    📄 Processing file: normal_2
    📄 Processing file: normal_3
    📄 Processing file: normal_4
    📄 Processing file: normal_5
    📄 Processing file: normal_6
    📄 Processing file: normal_7
    📄 Processing file: normal_8
    📄 Processing file: normal_9
    📄 Processing file: normal_10
    📄 Processing file: normal_11
    📄 Processing file: normal_12
    📄 Processing file: normal_13
    📄 Processing file: normal_14
    📄 Processing file: normal_15
    📄 Processing file: normal_16
    📄 Processing file: normal_17
    📄 Processing file: normal_18
    📄 Processing file: normal_19
    📄 Processing file: normal_20
  💾 Saved normal/train: 1904 windows (1904 STFT, 1904 Raw)
  📝 Saved metadata: processed_data_raw/normal_window_metadata_train.csv
  💾 Saved normal/val: 156 windows (156 STFT, 156 Raw)
  📝 Saved metadata: processed_data_raw/normal_window_metadata_val.csv
  💾 Saved normal/tes