In [None]:
import pandas as pd
import numpy as np
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, UpSampling1D, BatchNormalization, LeakyReLU, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
# Load dataset
mitbih_df = pd.read_csv('../datasets/mitbih_100_signals.csv')
nstdb_em_df = pd.read_csv('../datasets/nstdb_em_signal.csv')
nstdb_ma_df = pd.read_csv('../datasets/nstdb_ma_signal.csv')
nstdb_bw_df = pd.read_csv('../datasets/nstdb_bw_signal.csv')

## Preprocessing

In [None]:
# Normalisasi ke [-1, 1]
def normalize(signal):
    min_val = np.min(signal)
    max_val = np.max(signal)
    range_val = max_val - min_val
    if range_val > 0:
        return 2 * (signal - min_val) / range_val - 1
    else:
        return signal

# Terapkan normalisasi [-1, 1]
mitbih_df['signal_norm'] = mitbih_df['MLII'].apply(normalize)
nstdb_bw_df['signal_norm'] = nstdb_bw_df['noise1'].apply(normalize)
nstdb_ma_df['signal_norm'] = nstdb_ma_df['noise1'].apply(normalize)
nstdb_em_df['signal_norm'] = nstdb_em_df['noise1'].apply(normalize)

In [None]:
def segment_signal(signal, window_size):
    segments = []
    for i in range(0, len(signal) - window_size, window_size):
        segments.append(signal[i:i + window_size])
    return np.array(segments).reshape(-1, window_size, 1)

window_size = 256
min_len = min(len(mitbih_df), len(nstdb_bw_df), len(nstdb_ma_df), len(nstdb_em_df))

clean_signals = mitbih_df['signal_norm'].values[:min_len]
bw = nstdb_bw_df['signal_norm'].values[:min_len]
ma = nstdb_ma_df['signal_norm'].values[:min_len]
em = nstdb_em_df['signal_norm'].values[:min_len]

# Gabungkan noise
combined_noise = (bw + ma + em) / 3.0

# Segmentasi
clean_segments = segment_signal(clean_signals, window_size)
noisy_segments = segment_signal(combined_noise, window_size)

print("Shape of clean segments:", clean_segments.shape)
print("Shape of noisy segments:", noisy_segments.shape)

Shape of clean segments: (2539, 256, 1)
Shape of noisy segments: (2539, 256, 1)


## Model Autoencoder

In [None]:
def build_autoencoder(input_shape):
    input_layer = Input(shape=input_shape)

    # ENCODER
    # Blok 1
    x = Conv1D(filters=32, kernel_size=7, padding="same")(input_layer)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Dropout(0.2)(x)

    # Blok 2
    x = Conv1D(filters=64, kernel_size=7, padding="same")(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Dropout(0.2)(x)

    # DECODER
    # Blok 3
    x = Conv1D(filters=32, kernel_size=7, padding="same")(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Dropout(0.2)(x)

    # Layer Output
    # Merekonstruksi sinyal kembali ke 1 channel
    decoded = Conv1D(filters=1, kernel_size=7, activation="tanh", padding="same")(x)

    # Buat dan kompilasi model
    model = Model(input_layer, decoded)
    optimizer = Adam(learning_rate=0.0005)
    model.compile(optimizer=optimizer, loss='mean_squared_error')

    return model

autoencoder = build_autoencoder(input_shape=(window_size, 1))
autoencoder.summary()

In [None]:
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1
)

history = autoencoder.fit(
    noisy_segments,
    clean_segments,
    epochs=1000,
    batch_size=32,
    validation_split=0.2,
    callbacks=[early_stopping]
)

Epoch 1/1000
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 55ms/step - loss: 0.2125 - val_loss: 0.0578
Epoch 2/1000
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.1252 - val_loss: 0.0463
Epoch 3/1000
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.1055 - val_loss: 0.0447
Epoch 4/1000
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0925 - val_loss: 0.0452
Epoch 5/1000
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0818 - val_loss: 0.0454
Epoch 6/1000
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0731 - val_loss: 0.0462
Epoch 7/1000
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0657 - val_loss: 0.0450
Epoch 8/1000
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0584 - val_loss: 0.0436
Epoch 9/1000
[1m64/64[0m [32m━━━━━━━

## Denoising dan Evaluasi

In [None]:
noisy_datasets = {
    'Baseline Wander': bw,
    'Muscle Artifact': ma,
    'Electrode Motion': em
}

def calculate_rmse(clean, denoised):
    return np.sqrt(np.mean((denoised - clean) ** 2))
def calculate_prd(clean, denoised):
    return 100 * np.sqrt(np.sum((denoised - clean) ** 2) / np.sum(clean ** 2))

for noise_type, signal in noisy_datasets.items():
    noisy_eval_segments = segment_signal(signal, window_size)
    clean_eval_segments = segment_signal(clean_signals, window_size)

    denoised = autoencoder.predict(noisy_eval_segments).reshape(-1, window_size)
    original = clean_eval_segments.reshape(-1, window_size)

    rmses = [calculate_rmse(o, d) for o, d in zip(original, denoised)]
    prds = [calculate_prd(o, d) for o, d in zip(original, denoised)]

    print(f"\nHasil untuk: {noise_type}")
    print(f"Rata-rata RMSE: {np.mean(rmses):.4f}")
    print(f"Rata-rata PRD : {np.mean(prds):.2f}%")


[1m80/80[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step

Hasil untuk: Baseline Wander
Rata-rata RMSE: 0.1880
Rata-rata PRD : 52.76%
[1m80/80[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step

Hasil untuk: Muscle Artifact
Rata-rata RMSE: 0.1880
Rata-rata PRD : 52.72%
[1m80/80[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step

Hasil untuk: Electrode Motion
Rata-rata RMSE: 0.1880
Rata-rata PRD : 52.81%
