In [2]:
# @title
import pandas as pd
import numpy as np


# Merge label 0 (normal) and label 2 (fake) into label 0 and return a dataset that only has labels 0 and 1
def load_dataset(file="./all_dataset.csv"):
    """Load the CSV dataset and return shuffled features with binary labels.

    Every label that is not exactly 1 is mapped to 0, the rows are shuffled,
    and the columns whose name starts with "v" are used as features.

    Args:
        file: Path (or file-like object) of the CSV; it must contain a
            "label" column.

    Returns:
        Tuple ``(X_all, y_all)``: the feature matrix and the 0/1 label vector
        as NumPy arrays, both in the same shuffled row order.
    """
    def _to_binary(raw_label):
        return 1 if raw_label == 1 else 0

    frame = pd.read_csv(file)
    frame["label"] = frame["label"].apply(_to_binary)

    # frac=1 draws every row exactly once, i.e. a full random permutation
    frame = frame.sample(frac=1).reset_index(drop=True)

    sensor_columns = [name for name in frame.columns if name.startswith("v")]

    return frame[sensor_columns].values, frame["label"].values

# Split the dataset into train, val and test sets. val and test are built with a 1:1 normal:falling ratio
def split_dataset(X_all, y_all, val_ratio=0.2, test_ratio=0.2):
    """Split features and labels into train / validation / test subsets.

    The validation and test sets each take the same number of samples from
    class 0 (normal) and class 1 (falling); everything that is left over goes
    to the training set. Class indices are shuffled with ``np.random`` before
    slicing, and each subset is shuffled once more afterwards.

    Note: if a class has fewer than ``val_each + test_each`` samples, the
    slices for that class come up short, so the validation/test sets are then
    smaller than requested and no longer balanced.

    Args:
        X_all: Feature rows (array-like, first axis = samples).
        y_all: Labels (array-like of 0/1), one per row of ``X_all``.
        val_ratio: Fraction of the whole dataset used for validation.
        test_ratio: Fraction of the whole dataset used for testing.

    Returns:
        ``(X_train, y_train, X_val, y_val, X_test, y_test)`` as NumPy arrays.

    Raises:
        ValueError: If ``X_all`` and ``y_all`` differ in length.
    """
    # Convert both inputs so plain lists can be fancy-indexed like arrays
    X_all = np.asarray(X_all)
    y_all = np.asarray(y_all)

    if len(X_all) != len(y_all):
        raise ValueError(
            f"X_all and y_all must have the same length, got {len(X_all)} and {len(y_all)}"
        )

    idx_normal = np.where(y_all == 0)[0]
    idx_falling = np.where(y_all == 1)[0]

    # Shuffle the indices of each class independently
    np.random.shuffle(idx_normal)
    np.random.shuffle(idx_falling)

    # Per-class sample count for val/test: half of the requested share each
    total_len = len(y_all)
    val_each = int(total_len * val_ratio) // 2
    test_each = int(total_len * test_ratio) // 2
    held_out = val_each + test_each

    # Validation set: first val_each indices of each class
    val_idx = np.concatenate([idx_normal[:val_each], idx_falling[:val_each]])

    # Test set: the next test_each indices of each class
    test_idx = np.concatenate([idx_normal[val_each:held_out], idx_falling[val_each:held_out]])

    # Training set: whatever remains of each class
    train_idx = np.concatenate([idx_normal[held_out:], idx_falling[held_out:]])

    # Shuffle once more inside each subset so the classes are interleaved
    np.random.shuffle(train_idx)
    np.random.shuffle(val_idx)
    np.random.shuffle(test_idx)

    X_train, y_train = X_all[train_idx], y_all[train_idx]
    X_val, y_val = X_all[val_idx], y_all[val_idx]
    X_test, y_test = X_all[test_idx], y_all[test_idx]

    print(f"[Split 완료] Train {len(X_train)}, Val {len(X_val)}, Test {len(X_test)}")
    print(f" > Val normal/falling : {np.sum(y_val == 0)} / {np.sum(y_val == 1)}")
    print(f" > Test normal/falling : {np.sum(y_test == 0)} / {np.sum(y_test == 1)}")

    return X_train, y_train, X_val, y_val, X_test, y_test

In [3]:
# @title
import numpy as np
import random as pyrandom


# Spike-noise augmentation
def make_spike_noise(v_data):
    """Return an integer copy of ``v_data`` with one short spike written into it.

    A run of 1 to 3 consecutive samples at a random position is overwritten
    with a single value: with probability 0.5 a low spike (1..10), otherwise a
    high spike (2900..3000). Randomness comes from the ``random`` module.

    Signals shorter than the drawn spike length are handled by capping the
    spike to the signal length; an empty signal is returned unchanged.

    Args:
        v_data: 1-D sequence of sensor values.

    Returns:
        New ``int`` NumPy array of the same length as ``v_data``.
    """
    v_new = np.array(v_data, dtype=int)
    data_len = len(v_new)

    # Nothing can be overwritten in an empty signal
    if data_len == 0:
        return v_new

    # Pick a random position; cap the spike length so it always fits
    # (without the cap, randint(0, negative) raises for very short signals)
    duration = min(pyrandom.randint(1, 3), data_len)
    start_idx = pyrandom.randint(0, data_len - duration)
    end_idx = start_idx + duration

    if pyrandom.random() < 0.5:  # low spike
        value = pyrandom.randint(1, 10)
    else:  # high spike
        value = pyrandom.randint(2900, 3000)

    v_new[start_idx:end_idx] = value

    return v_new

# Gaussian-noise augmentation
def make_gaussian_noise(v_data):
    """Return ``v_data`` with additive Gaussian noise, as integers.

    Zero-mean noise with standard deviation 10 is drawn from ``np.random``
    for every sample; the result is clipped to [1, 3000] and truncated to
    ``int``.

    Args:
        v_data: Array-like of sensor values.

    Returns:
        New ``int`` NumPy array with the same shape as ``v_data``.
    """
    signal = np.array(v_data, dtype=float)

    # One independent noise draw per sample
    jitter = np.random.normal(scale=10.0, size=signal.shape)

    # Keep the values inside the allowed range, then go back to integers
    noisy = np.clip(signal + jitter, 1.0, 3000.0)
    return noisy.astype(int)

# Time-warping augmentation
def make_time_warping(v_data):
    """Stretch or compress the signal in time while keeping its length.

    The signal is linearly resampled to a random length between 80% and 120%
    of the original (factor drawn from the ``random`` module). A longer result
    is centre-cropped back to the original length; a shorter one is centred
    and padded on both sides with its first / last value. The output is
    truncated to ``int``.

    Args:
        v_data: 1-D sequence of sensor values.

    Returns:
        New ``int`` NumPy array with the same length as ``v_data``.
    """
    signal = np.array(v_data, dtype=float)
    n_samples = len(signal)

    # Random playback speed and the resulting resampled length
    speed = pyrandom.uniform(0.8, 1.2)
    n_warped = int(n_samples * speed)

    # Linear interpolation of the whole signal onto n_warped evenly spaced points
    source_pos = np.arange(n_samples)
    target_pos = np.linspace(0, n_samples - 1, n_warped)
    warped = np.interp(target_pos, source_pos, signal)

    if n_warped > n_samples:
        # Stretched: keep the centred n_samples-long section
        offset = (n_warped - n_samples) // 2
        result = warped[offset:offset + n_samples]
    else:
        # Compressed (or unchanged): centre it and repeat the edge values outward
        missing = n_samples - n_warped
        lead = missing // 2
        trail = missing - lead
        result = np.concatenate([
            np.full(lead, warped[0]),
            warped,
            np.full(trail, warped[-1]),
        ])

    return result.astype(int)

# Time-shift augmentation
def make_time_shift(v_data):
    """Shift the signal by a random number of samples, padding with edge values.

    The shift is drawn uniformly from -10..10 with the ``random`` module.
    A positive shift moves the signal to the right and fills the vacated
    start with the first sample; a negative shift moves it to the left and
    fills the vacated end with the last sample. A shift of 0 returns an
    unchanged integer copy.

    Args:
        v_data: 1-D sequence of sensor values.

    Returns:
        New ``int`` NumPy array with the same length as ``v_data``.
    """
    signal = np.array(v_data, dtype=int)

    offset = pyrandom.randint(-10, 10)
    if offset == 0:
        return signal

    # Rotate first, then overwrite the wrapped-around part with the edge value
    shifted = np.roll(signal, offset, axis=0)
    if offset > 0:
        shifted[:offset] = signal[0]
    else:
        shifted[offset:] = signal[-1]

    return shifted

# Scale-change augmentation
def make_scale_different(v_data):
    """Rescale the signal's deviation from its mean by a random gain.

    The mean is removed, the remainder is multiplied by ``(1.5 / d) ** 2``
    with ``d`` drawn uniformly from [1, 2.5], and the mean is added back.
    The result is clipped to [1, upper], where ``upper`` is drawn uniformly
    from [2950, 3000], and truncated to ``int``.

    Args:
        v_data: Array-like of sensor values.

    Returns:
        New ``int`` NumPy array with the same shape as ``v_data``.
    """
    baseline = np.mean(v_data)
    centered = np.array(v_data, dtype=float) - baseline

    # Simulate a new random distance
    # NOTE(review): the squared ratio looks like an inverse-square law with a
    # 1.5 reference distance — confirm against the sensor model
    simulated_distance = pyrandom.uniform(1, 2.5)
    gain = (1.5 / simulated_distance) ** 2

    rescaled = centered * gain + baseline

    # The upper clipping bound is itself randomised
    upper_bound = pyrandom.uniform(2950, 3000)
    return np.clip(rescaled, 1.0, upper_bound).astype(int)

# Apply 1 to 3 randomly chosen augmentations to randomly drawn samples (v0..v299)
def augment(X, factor, label):
    """Generate augmented samples so a class grows to ``factor`` times its size.

    ``len(X) * (factor - 1)`` new samples are produced. Each one starts from a
    randomly drawn row of ``X`` (with replacement) and is passed through 1 to
    3 distinct augmentation functions chosen at random.

    When nothing has to be generated (``factor <= 1`` or ``X`` is empty) the
    returned feature array keeps the row shape of ``X`` — e.g. ``(0, 300)`` —
    so it can still be stacked with the original data via ``np.vstack``.

    Args:
        X: Samples of a single class, first axis = samples.
        factor: Target multiple of the class size; a fractional value is
            truncated to a whole number of new samples.
        label: Class label assigned to every generated sample.

    Returns:
        Tuple ``(X_augmented, y_augmented)`` holding only the newly generated
        samples and their labels (the originals are not included).
    """
    X = np.asarray(X)
    n_to_add = max(0, int(len(X) * (factor - 1)))

    # Return a correctly shaped empty result instead of a shape-(0,) array
    if n_to_add == 0:
        return np.empty((0,) + X.shape[1:], dtype=int), np.empty(0, dtype=int)

    aug_functions = [make_spike_noise, make_gaussian_noise, make_time_warping, make_time_shift, make_scale_different]

    X_augmented = []
    for _ in range(n_to_add):
        sample_data = X[pyrandom.randint(0, len(X) - 1)].copy()

        # Chain 1-3 different augmentations, applied in the sampled order
        n_to_augment = pyrandom.randint(1, 3)
        for func in pyrandom.sample(aug_functions, n_to_augment):
            sample_data = func(sample_data)

        X_augmented.append(sample_data)

    X_augmented = np.array(X_augmented)
    y_augmented = np.full(len(X_augmented), label, dtype=int)

    return X_augmented, y_augmented

# Augment the dataset, growing each class by its own factor
def augment_dataset(X, y, normal_aug, falling_aug):
    """Return the dataset extended with augmented samples, then shuffled.

    The original samples are kept. Augmented samples are generated with
    ``augment`` separately for class 0 (normal) and class 1 (falling), each
    with its own factor, and the combined set is shuffled with
    ``np.random.permutation``.

    Args:
        X: Feature rows, first axis = samples.
        y: Labels (0/1), one per row of ``X``.
        normal_aug: Growth factor passed to ``augment`` for class 0.
        falling_aug: Growth factor passed to ``augment`` for class 1.

    Returns:
        Tuple ``(X_final, y_final)`` with originals plus augmented samples in
        a common random order.
    """
    X_parts = [X]
    y_parts = [y]

    # Augment class 0 first, then class 1, each with its own factor
    for class_label, class_factor in ((0, normal_aug), (1, falling_aug)):
        X_extra, y_extra = augment(X[y == class_label], class_factor, class_label)
        X_parts.append(X_extra)
        y_parts.append(y_extra)

    # Merge everything and shuffle features and labels with the same permutation
    X_merged = np.vstack(X_parts)
    y_merged = np.concatenate(y_parts)

    order = np.random.permutation(len(X_merged))
    return X_merged[order], y_merged[order]

In [4]:
# @title
import librosa
import numpy as np


# Return the `size` sensor values that end at (and include) position `end`
def get_window(v, size, end):
    """Return the trailing window ``v[end - size + 1 : end + 1]``.

    If fewer than ``size`` values exist up to ``end``, the window is truncated
    at the start of the sequence instead of letting a negative start index
    wrap around to the end. A negative ``end`` yields an empty window.

    Args:
        v: Sliceable sequence of sensor values.
        size: Number of values in a full window.
        end: Index of the last value in the window (inclusive).

    Returns:
        The slice of ``v`` covering the window (same type as ``v[a:b]``).
    """
    if end < 0:
        return v[0:0]

    # Clamp at 0: a negative start would be read as "count from the end"
    start = max(end - size + 1, 0)
    return v[start : end + 1]

# F0: V(t) - median (window: 50)
def calculate_Detrend(v):
    """Return each sample minus the median of its trailing 50-sample window.

    Output position ``t`` (0..249) corresponds to input position ``t + 50``;
    the window ends at, and includes, that input sample.

    Args:
        v: Indexable sequence with at least 300 sensor values.

    Returns:
        Float NumPy array of shape ``(250,)``.
    """
    detrended = np.zeros(250)

    for out_pos, src_pos in enumerate(range(50, 300)):
        recent = get_window(v, 50, src_pos)
        detrended[out_pos] = v[src_pos] - np.median(recent)

    return detrended

# F1: MovingMAD (window: 50)
def calculate_MovingMAD(v):
    """Return the median absolute deviation of each trailing 50-sample window.

    Output position ``t`` (0..249) corresponds to the window that ends at, and
    includes, input position ``t + 50``.

    Args:
        v: Sequence with at least 300 sensor values; ``get_window`` must
            return something that supports array arithmetic (e.g. an ndarray).

    Returns:
        Float NumPy array of shape ``(250,)``.
    """
    mad_values = np.zeros(250)

    for out_pos, src_pos in enumerate(range(50, 300)):
        recent = get_window(v, 50, src_pos)

        # MAD = median of the absolute distances to the window median
        centre = np.median(recent)
        mad_values[out_pos] = np.median(np.abs(recent - centre))

    return mad_values

# F2: MovingKurtosis (window: 50)
def calculate_MovingKurtosis(v):
    """Return the excess kurtosis of each trailing 50-sample window.

    Output position ``t`` (0..249) corresponds to the window
    ``v[t + 1 : t + 51]``, i.e. the same window as
    ``get_window(v, 50, t + 50)``. The kurtosis is the mean of the fourth
    power of the standardised values (population standard deviation) minus 3.

    A window with zero standard deviation (all values equal, e.g. a saturated
    or idle sensor) has no defined kurtosis; dividing by the zero deviation
    would put NaN into the feature map. Such windows are reported as 0.0.

    Args:
        v: Array-like with at least 300 sensor values.

    Returns:
        Float NumPy array of shape ``(250,)`` without NaN/inf for finite input.
    """
    samples = np.asarray(v, dtype=float)
    output = np.zeros(250)

    for t in range(250):
        t_in = t + 50
        window = samples[t_in - 49 : t_in + 1]

        spread = np.std(window)
        if spread == 0:
            # Constant window: leave the neutral value 0.0 instead of 0/0 = NaN
            continue

        # Kurtosis: mean of z**4, made "excess" by subtracting 3
        standardised = (window - np.mean(window)) / spread
        output[t] = np.sum((standardised**4) / len(window)) - 3

    return output

# F3: first difference (slope)
def calculate_Gradient(v):
    """Return the difference between each sample and the one before it.

    Output position ``t`` (0..249) is ``v[t + 50] - v[t + 49]``.

    Args:
        v: Array-like with at least 300 sensor values.

    Returns:
        Float NumPy array of shape ``(250,)``.
    """
    samples = np.asarray(v, dtype=float)

    current = samples[50:300]
    previous = samples[49:299]

    return current - previous

# F4: windowed integral (window: 15)
def calculate_Integral(v):
    """Return the sum of each trailing 15-sample window.

    Output position ``t`` (0..249) is the sum of the 15 samples ending at, and
    including, input position ``t + 50``, computed as a difference of
    cumulative sums.

    Args:
        v: Array-like with at least 300 sensor values.

    Returns:
        Float NumPy array of shape ``(250,)``.
    """
    samples = np.asarray(v, dtype=float)
    running_total = np.cumsum(samples)

    # running_total[k] - running_total[k - 15] == sum of samples[k - 14 .. k]
    window_sums = running_total[50:300] - running_total[35:285]

    return window_sums

# F5~F12: STFT (window: 50)
def calculate_STFT(v):
    """Return the STFT magnitude summed over 8 frequency bands per frame.

    The signal is transformed with ``librosa.stft`` (50-sample frames, hop 1,
    no centring). Frames 1..250 are kept and, for each band, the magnitudes of
    all FFT bins whose frequency lies in ``[f_min, f_max)`` are summed. Bin
    frequencies are computed for a sampling rate of 50.

    Args:
        v: NumPy array of sensor values (``.astype`` is called on it).
            Assumes 300 samples so that at least 251 frames exist — TODO confirm.

    Returns:
        ``float32`` NumPy array of shape ``(250, 8)``; a band that matches no
        FFT bin stays 0.
    """
    frame_len = 50
    bands_hz = ((1, 2), (2, 3), (3, 4), (4, 5),
                (5, 6), (6, 7), (7, 8), (8, 15))

    spectrum = librosa.stft(y=v.astype(float), n_fft=frame_len, win_length=50, hop_length=1, center=False)

    # Skip frame 0 and keep frames 1..250
    magnitude = np.abs(spectrum)[:, 1:251]

    # Frequency of every FFT bin, used to map bins onto the output bands
    bin_freqs = librosa.fft_frequencies(sr=50, n_fft=frame_len)

    band_energy = np.zeros((250, len(bands_hz)), dtype=np.float32)

    for column, (low_hz, high_hz) in enumerate(bands_hz):
        bins_in_band = np.where((bin_freqs >= low_hz) & (bin_freqs < high_hz))[0]
        if len(bins_in_band) == 0:
            continue
        band_energy[:, column] = magnitude[bins_in_band, :].sum(axis=0)

    return band_energy

# Turn one (300,) sample into a (250, 13) feature map
def preprocess_data(v):
    """Build the per-sample feature map from one raw sensor sequence.

    Columns 0..4 hold the detrended signal, moving MAD, moving kurtosis,
    gradient and windowed integral; columns 5..12 hold the STFT band energies.

    Args:
        v: One raw sample (the header comment specifies shape ``(300,)``).

    Returns:
        Float NumPy array of shape ``(250, 13)``.
    """
    features = np.zeros((250, 13))

    # One output column per scalar feature, in column order
    scalar_features = (
        calculate_Detrend,
        calculate_MovingMAD,
        calculate_MovingKurtosis,
        calculate_Gradient,
        calculate_Integral,
    )
    for column, compute in enumerate(scalar_features):
        features[:, column] = compute(v)

    # The remaining columns are filled by the STFT bands
    features[:, 5:] = calculate_STFT(v)

    return features

def create_feature(dataset):
    """Convert a batch of raw samples into a 3-D feature tensor.

    Args:
        dataset: 2-D NumPy array with one raw sample per row.

    Returns:
        Float NumPy array of shape ``(N, 250, 13)``, where ``N`` is the number
        of rows in ``dataset``; entry ``i`` is ``preprocess_data`` of row ``i``.
    """
    # Pre-allocate the (N, 250, 13) tensor and fill it one sample at a time
    processed = np.zeros((dataset.shape[0], 250, 13))

    for row in range(len(processed)):
        processed[row] = preprocess_data(dataset[row, :])

    return processed


In [7]:
# @title
import numpy as np
import tensorflow as tf
import joblib
from tensorflow.keras import layers, models
from tensorflow.keras.metrics import AUC
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import RobustScaler

def build_model(input_shape):
    """Build the (uncompiled) Conv1D + BiLSTM binary classifier.

    Architecture: two Conv1D / MaxPooling1D / Dropout stages, a bidirectional
    LSTM that returns only its final output, and a dense head that ends in a
    single sigmoid unit.

    Args:
        input_shape: Shape of one input sample without the batch axis
            (the training script passes ``X_train.shape[1:]``).

    Returns:
        A ``Sequential`` Keras model.
    """
    entry = [layers.Input(shape=input_shape)]

    # Convolutional feature extractor; each stage pools by a factor of 2
    conv_stages = [
        layers.Conv1D(20, 10, padding="same", activation="relu"),
        layers.MaxPooling1D(2),
        layers.Dropout(0.2),
        layers.Conv1D(64, 12, padding="same", activation="relu"),
        layers.MaxPooling1D(2),
        layers.Dropout(0.1),
    ]

    # Sequence summariser: only the last LSTM output is passed on
    recurrent_stage = [
        layers.Bidirectional(layers.LSTM(64, return_sequences=False)),
        layers.Dropout(0.1),
    ]

    # Dense classifier head with a single sigmoid output
    dense_head = [
        layers.Dense(64, activation="relu"),
        layers.Dropout(0.3),
        layers.Dense(32, activation="relu"),
        layers.Dropout(0.2),
        layers.Dense(16, activation="relu"),
        layers.Dense(1, activation="sigmoid"),
    ]

    return models.Sequential(entry + conv_stages + recurrent_stage + dense_head)


# Main script: load -> split -> augment (train only) -> features -> scale -> train -> evaluate
BATCH_SIZE = 64
EPOCHS = 100
LEARNING_RATE = 1e-4
MODEL_PATH = "best_model.keras"

# Load the dataset
print(">> 데이터 로드 중...")
X_all, y_all = load_dataset("all_dataset.csv")

# 20% validation and 20% test; the rest is used for training
X_train_raw, y_train, X_val_raw, y_val, X_test_raw, y_test = split_dataset(X_all, y_all, 0.2, 0.2)

# Data augmentation (training split only; val/test stay un-augmented)
print(">> 데이터 증강 중...")
X_train_aug_raw, y_train_aug = augment_dataset(X_train_raw, y_train, normal_aug=4, falling_aug=16)

# Feature extraction
print(">> feature 생성 중...")
X_train_feat = create_feature(X_train_aug_raw)
X_val_feat   = create_feature(X_val_raw)
X_test_feat  = create_feature(X_test_raw)

# Apply RobustScaler
print(">> RobustScaler 적용 중...")
scaler = RobustScaler()

# Flatten (samples, time, features) to (samples * time, features) so the
# scaler sees one column per feature
N_train, T, F = X_train_feat.shape
X_train_flat = X_train_feat.reshape(-1, F)
X_val_flat   = X_val_feat.reshape(-1, F)
X_test_flat  = X_test_feat.reshape(-1, F)

# Fit on the training data only and save the fitted scaler to disk
scaler.fit(X_train_flat)
joblib.dump(scaler, "robust_scaler.pkl")

# Scale every split with the training statistics and restore the 3-D shape
X_train = scaler.transform(X_train_flat).reshape(N_train, T, F)
X_val   = scaler.transform(X_val_flat).reshape(X_val_feat.shape)
X_test  = scaler.transform(X_test_flat).reshape(X_test_feat.shape)

# Model
input_shape = X_train.shape[1:]
model = build_model(input_shape)
model.summary()

model.compile(
    optimizer=tf.keras.optimizers.Adam(LEARNING_RATE),
    loss="binary_crossentropy",
    metrics=["accuracy", AUC(name="auc")]
)

# Both callbacks track the validation AUC (higher is better)
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor="val_auc", mode="max",
        patience=5, restore_best_weights=True
    ),
    tf.keras.callbacks.ModelCheckpoint(
        MODEL_PATH,
        monitor="val_auc", mode="max",
        save_best_only=True
    )
]

# Training
print(">> 모델 학습 시작...")
model.fit(
    X_train, y_train_aug,
    validation_data=(X_val, y_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=callbacks,
    shuffle=True,
    verbose=1
)

# Evaluation: evaluate() yields the loss followed by the compiled metrics (accuracy, auc)
val_loss, val_acc, val_auc = model.evaluate(X_val, y_val, verbose=0)
test_loss, test_acc, test_auc = model.evaluate(X_test, y_test, verbose=0)

print("[VAL]", f"loss={val_loss:.4f} acc={val_acc:.4f} auc={val_auc:.4f}")
print("[TEST]", f"loss={test_loss:.4f} acc={test_acc:.4f} auc={test_auc:.4f}")

# Convert probabilities to class predictions with a 0.4 decision threshold
# NOTE(review): presumably set below 0.5 to favour recall on the falling class — confirm
y_prob = model.predict(X_test, batch_size=BATCH_SIZE, verbose=0).ravel()
y_pred = (y_prob >= 0.4).astype(int)

print(classification_report(y_test, y_pred, digits=4))
print(confusion_matrix(y_test, y_pred))

print("\n베스트 모델 저장:", MODEL_PATH)

>> 데이터 로드 중...
[Split 완료] Train 366, Val 122, Test 122
 > Val normal/falling : 61 / 61
 > Test normal/falling : 61 / 61
>> 데이터 증강 중...
>> feature 생성 중...
>> RobustScaler 적용 중...


>> 모델 학습 시작...
Epoch 1/100
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 30ms/step - accuracy: 0.5099 - auc: 0.5644 - loss: 0.6955 - val_accuracy: 0.7787 - val_auc: 0.8584 - val_loss: 0.6534
Epoch 2/100
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 17ms/step - accuracy: 0.6056 - auc: 0.6836 - loss: 0.6696 - val_accuracy: 0.7951 - val_auc: 0.8875 - val_loss: 0.5900
Epoch 3/100
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 17ms/step - accuracy: 0.6842 - auc: 0.7743 - loss: 0.6209 - val_accuracy: 0.8361 - val_auc: 0.8904 - val_loss: 0.4978
Epoch 4/100
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 17ms/step - accuracy: 0.7403 - auc: 0.8157 - loss: 0.5585 - val_accuracy: 0.8525 - val_auc: 0.8956 - val_loss: 0.4212
Epoch 5/100
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 16ms/step - accuracy: 0.7939 - auc: 0.8579 - loss: 0.4848 - val_accuracy: 0.8279 - val_auc: 0.9145 - val_loss: 0.3883
Epoch 6/100
[1