In [4]:
import os
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
import librosa
from scipy.fftpack import dct
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd
from sklearn.multiclass import OneVsRestClassifier
from sklearn.svm import SVC
import matplotlib.pyplot as plt
from sklearn.model_selection import GridSearchCV

## Ekstraksi fitur menggunakan MFCC

In [18]:
# Fungsi ekstraksi MFCC
def dc_removal(signal):
    mean = np.mean(signal)
    return signal - mean

def pre_emphasis(signal):
    pre_emphasis = 0.97
    signal_emphasis = np.append(signal[0], signal[1:] - pre_emphasis * signal[:-1])
    return signal_emphasis

def frame_blocking(signal_emphasis, sample_rate):
    frame_size = 0.025
    frame_stride = 0.01
    frame_length = int(frame_size * sample_rate)
    frame_step = int(frame_stride * sample_rate)
    signal_length = len(signal_emphasis)
    frames_overlap = frame_length - frame_step

    num_frames = (np.abs(signal_length - frames_overlap) // np.abs(frame_length - frame_step)).astype(int)
    rest_samples = np.abs(signal_length - frames_overlap) % np.abs(frame_length - frame_step)

    pad_signal_length = int(frame_length - rest_samples)
    z = np.zeros((pad_signal_length))
    pad_signal = np.append(signal_emphasis, z)

    indices = np.tile(np.arange(0, frame_length), (num_frames, 1)) + np.tile(
        np.arange(0, num_frames * frame_step, frame_step), (frame_length, 1)
    ).T

    frames = pad_signal[indices.astype(np.int32, copy=False)]
    return frames, frame_length

def windowing(frames, frame_length):
    frames = frames * (np.hamming(frame_length))
    return frames

def fft(frames):
    NFFT = 512
    mag_frames = np.absolute(np.fft.rfft(frames, NFFT))
    pow_frames = ((1.0 / NFFT) * ((mag_frames) ** 2))
    return pow_frames, NFFT

def filter_bank(pow_frames, sample_rate, NFFT):
    nfilt = 40
    low_freq_mel = 0
    high_freq_mel = (2595 * np.log10(1 + (sample_rate / 2) / 700))  # Convert Hz to Mel
    mel_points = np.linspace(low_freq_mel, high_freq_mel, nfilt + 2)  # Equally spaced in Mel scale
    hz_points = (700 * (10 ** (mel_points / 2595) - 1))  # Convert Mel to Hz
    bin = np.floor((NFFT + 1) * hz_points / sample_rate)

    fbank = np.zeros((nfilt, int(np.floor(NFFT / 2 + 1))))
    for m in range(1, nfilt + 1):
        f_m_minus = int(bin[m - 1])  # left
        f_m = int(bin[m])  # center
        f_m_plus = int(bin[m + 1])  # right

        for k in range(f_m_minus, f_m):
            fbank[m - 1, k] = (k - bin[m - 1]) / (bin[m] - bin[m - 1])
        for k in range(f_m, f_m_plus):
            fbank[m - 1, k] = (bin[m + 1] - k) / (bin[m + 1] - bin[m])

    filter_banks = np.dot(pow_frames, fbank.T)
    filter_banks = np.where(filter_banks == 0, np.finfo(float).eps, filter_banks)  # Numerical Stability
    filter_banks = 20 * np.log10(filter_banks)  # dB

    return (filter_banks / np.amax(filter_banks)) * 255

def cepstral_liftering(filter_banks):
    num_ceps = 12
    cep_lifter = 11
    mfcc = dct(filter_banks, type=2, axis=1, norm='ortho')[:, :(num_ceps)]
    (nframes, ncoeff) = mfcc.shape
    n = np.arange(ncoeff)
    lift = 1 + (cep_lifter / 2) * np.sin(np.pi * n / cep_lifter)
    mfcc = (np.mean(mfcc, axis=0) + 1e-8)
    return mfcc

def extract_mfcc(file_path):
    try:
        # Baca file audio WAV
        audio_data, sample_rate = librosa.load(file_path, sr=None)

        # DC Removal
        dc_removed_signal = dc_removal(audio_data)

        # Pre-Emphasis
        pre_emphasized_signal = pre_emphasis(dc_removed_signal)

        # Frame Blocking
        frames, frame_length = frame_blocking(pre_emphasized_signal, sample_rate)

        # Windowing
        windowed_frames = windowing(frames, frame_length)

        # FFT, Mel Frequency Wrapping, dan DCT
        pow_frames, NFFT = fft(windowed_frames)
        filter_banks = filter_bank(pow_frames, sample_rate, NFFT)
        mfcc_features = cepstral_liftering(filter_banks)

        return mfcc_features
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None

def pad_mfcc_features(features, max_length):
    num_features = len(features)
    feature_shape = features[0].shape[1] if len(features[0].shape) > 1 else 1

    # Menyesuaikan panjang fitur tanpa DTW
    padded_features = np.zeros((num_features, max_length, feature_shape))

    for i, feature in enumerate(features):
        feature_length = len(feature)

        if feature_length < max_length:
            # Membuat pad dengan menggunakan nilai nol
            padded_feature = np.zeros((max_length, feature_shape))
            padded_feature[:feature_length, :] = feature  # Mengisi dengan fitur yang sebenarnya
            padded_features[i] = padded_feature
        else:
            # Jika panjang fitur sudah cukup, gunakan fitur tanpa modifikasi
            padded_features[i, :feature_length, :] = feature.reshape((feature_length, feature_shape))  # Reshape here

    return padded_features

## Dataset di representasikan dan dilakukan pelatihan model terhadap data latih dengan svm. Pada tahap ini data latih melewati tahap ekstraksi ciri lalu hasil tersebut akan dilakukan pelatihan model dengan SVM


In [33]:
def get_dataset(base_folder, labels):
    # Mendapatkan dataset
    dataset = {"features": [], "labels": []}

    for label in labels:
        folder_path = os.path.join(base_folder, label)
        files = os.listdir(folder_path)

        for file in files:
            file_path = os.path.join(folder_path, file)
            features = extract_mfcc(file_path)

            if features is not None:
                dataset["features"].append(features)
                dataset["labels"].append(label)

    return dataset


if __name__ == "__main__":
    base_folder = "/content/drive/MyDrive/Dataset EKG /Data Latih EKG"

    labels = ["extrastole", "normal", "murmur"]
    print(f"Analisis Kinerja MFCC dan SVM untuk Klasifikasi Aritmia Jantung")
    # Mendapatkan dataset
    dataset = get_dataset(base_folder, labels)

    # Normalisasi panjang fitur MFCC tanpa DTW
    max_length = max(len(feature) for feature in dataset["features"])

    # Sisipkan padding pada setiap sampel
    padded_features = pad_mfcc_features(dataset["features"], max_length)

    # Ubah matriks dua dimensi
    num_samples, max_length, num_features = padded_features.shape
    reshaped_features = padded_features.reshape((num_samples, max_length * num_features))

    # Normalisasi fitur MFCC menggunakan StandardScaler
    scaler = StandardScaler()
    reshaped_features_scaled = scaler.fit_transform(reshaped_features)

    # Menggunakan pipeline untuk menggabungkan normalisasi dan klasifikasi
    model = make_pipeline(StandardScaler(), OneVsRestClassifier(SVC(kernel='rbf', C=20, gamma=0.01)))

    # Melatih model
    model.fit(reshaped_features_scaled, dataset["labels"])

    print("\n----------------Model Training Information----------------")
    print(f"Kernel: {model.named_steps['onevsrestclassifier'].estimators_[0].kernel}")
    print(f"C: {model.named_steps['onevsrestclassifier'].estimators_[0].C}")
    print(f"Gamma: {model.named_steps['onevsrestclassifier'].estimators_[0].gamma}")

Analisis Kinerja MFCC dan SVM untuk Klasifikasi Aritmia Jantung

----------------Model Training Information----------------
Kernel: rbf
C: 20
Gamma: 0.01


## Setelah model SVM dari data latih didapat maka dilanjutkan dengan menguji kinerja model pada data uji

In [34]:
    # Evaluasi model menggunakan data uji
    correct_predictions = 0
    total_samples = 0
    true_labels = []
    predicted_labels = []

    unique_samples = set()

    for label in labels:
        test_folder = os.path.join("/content/drive/MyDrive/Dataset EKG /Data Uji EKG", label)
        files = os.listdir(test_folder)

        for file in files:
            test_audio_file = os.path.join(test_folder, file)
            # print(f"Test Audio File: {test_audio_file}")

            # Ekstraksi fitur MFCC
            mfcc_features_test = extract_mfcc(test_audio_file)

            if mfcc_features_test is not None:
                # Menampilkan audio waveform
                sample_rate, audio_data_test = librosa.load(test_audio_file, sr=None)

                # Normalisasi panjang fitur MFCC pada data uji
                mfcc_features_test = pad_mfcc_features([mfcc_features_test], max_length)[0]

                # Ratakan dimensi kedua dan ketiga dari fitur MFCC
                mfcc_features_test_flat = mfcc_features_test.reshape((1, -1))

                # Normalisasi fitur MFCC uji menggunakan StandardScaler
                mfcc_features_test_flat_scaled = scaler.transform(mfcc_features_test_flat)

                # Prediksi label menggunakan model yang telah dilatih
                recognized_label = model.predict(mfcc_features_test_flat_scaled)

                # Uji akurasi
                total_samples += 1
                true_labels.append(label)
                predicted_labels.append(recognized_label[0])
                unique_samples.add(file)

                # Perbarui correct_predictions
                if recognized_label[0] == label:
                    correct_predictions += 1

    # Hitung total sampel unik
    total_samples = len(unique_samples)

## Evaluasi dan optimasi


In [35]:
    # Menghitung metrik evaluasi
    accuracy = accuracy_score(true_labels, predicted_labels)
    precision = precision_score(true_labels, predicted_labels, average='weighted')
    recall = recall_score(true_labels, predicted_labels, average='weighted')
    f1 = f1_score(true_labels, predicted_labels, average='weighted')
    print(f"\n----------------Model Evaluation----------------")
    print(f"Total Samples: {total_samples}")
    print(f"Correct Predictions: {correct_predictions}")
    print(f"Accuracy: {accuracy * 100:.2f}%")
    print(f"Precision: {precision:.2f}")
    print(f"Recall: {recall:.2f}")
    print(f"F1-Score: {f1:.2f}")


----------------Model Evaluation----------------
Total Samples: 45
Correct Predictions: 33
Accuracy: 73.33%
Precision: 0.76
Recall: 0.73
F1-Score: 0.74


## Simpan dataframe dalam bentuk csv

In [39]:
    df = pd.DataFrame({'True_Label': true_labels, 'Predicted_Label': predicted_labels})
    df_new = pd.concat([df, pd.DataFrame(reshaped_features_scaled)], axis=1)
    df_new.to_csv("/content/drive/MyDrive/Dataset EKG /df_result.csv", index=False)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
pip install python_speech_features

Collecting python_speech_features
  Downloading python_speech_features-0.6.tar.gz (5.6 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: python_speech_features
  Building wheel for python_speech_features (setup.py) ... [?25l[?25hdone
  Created wheel for python_speech_features: filename=python_speech_features-0.6-py3-none-any.whl size=5869 sha256=37941ad97c4bcbf88afd7afdeaecd510a2e0938022614f2001d7af89954cc579
  Stored in directory: /root/.cache/pip/wheels/5a/9e/68/30bad9462b3926c29e315df16b562216d12bdc215f4d240294
Successfully built python_speech_features
Installing collected packages: python_speech_features
Successfully installed python_speech_features-0.6
