von Likith https://www.kaggle.com/code/likith012/ecgnet-ptb-xl


In [77]:
import ast
import math

import keras
import learn
import numpy as np
import pandas as pd
import wfdb
from scipy.signal import spectrogram
from sklearn.preprocessing import MultiLabelBinarizer, StandardScaler

In [2]:
### Preprocessing
#   Superclass-based labelling: load the PTB-XL metadata, the low-rate waveforms
#   and the SCP-statement table used to map codes to diagnostic superclasses.

path = '../'
Y = pd.read_csv(path + 'ptbxl_database.csv', index_col='ecg_id')

# wfdb.rdsamp returns a (signal, meta) tuple; only the signal array is kept.
# Stacking all records yields one array with the record index as first axis.
records = []
for rel_name in Y['filename_lr']:
    record_signal, _record_meta = wfdb.rdsamp(path + rel_name)
    records.append(record_signal)
data = np.array(records)

# scp_codes is stored as text (e.g. "{'NORM': 100}"); parse it into real dicts.
Y['scp_codes'] = Y['scp_codes'].apply(ast.literal_eval)

# agg_df maps SCP codes to their diagnostic_class; keep only rows flagged as diagnostic.
agg_df = pd.read_csv(path + 'scp_statements.csv', index_col=0)
agg_df = agg_df.loc[agg_df['diagnostic'] == 1]


# Für jeden Record nimmt agg die Keys aus scp_codes und sammelt die zugehörigen diagnostic_class-Namen
# list(set()) entfernt Duplikate → Ergebnis: Liste mit 0, 1 oder mehreren Superklassen pro Record
def agg(y_dic, mapping=None):
    """Map the SCP codes of one record to its diagnostic superclasses.

    Parameters
    ----------
    y_dic : dict
        SCP codes of one record (only the keys are used).
    mapping : pandas.DataFrame, optional
        Table indexed by SCP code with a ``diagnostic_class`` column.
        Defaults to the module-level ``agg_df``.

    Returns
    -------
    list
        Unique superclass names in sorted order (empty if no code maps to a
        class). Sorting makes the result reproducible between runs, which a
        plain ``list(set(...))`` is not.
    """
    table = agg_df if mapping is None else mapping
    found = set()

    for key in y_dic:
        if key not in table.index:  # code is not in the mapping table
            continue
        c = table.loc[key, "diagnostic_class"]
        if pd.notna(c):  # skip missing classes (NaN / None)
            found.add(c)
    return sorted(found)


# diagnostic_superclass: list of superclasses per record, e.g. ['NORM'] or ['NORM', 'STTC'].
# superdiagnostic_len: how many superclasses the record has.
Y['diagnostic_superclass'] = Y['scp_codes'].apply(agg)
Y['superdiagnostic_len'] = Y['diagnostic_superclass'].apply(len)

# Keep only records with exactly one superclass, and only if it is one of these
# three classes (this turns the multi-label task into a single-label one).
keep_classes = {'NORM', 'STTC', 'CD'}
mask = Y['diagnostic_superclass'].apply(
    lambda xs: len(xs) == 1 and xs[0] in keep_classes
)

# Filtered signals and the matching metadata rows
X_data = data[mask.to_numpy()]
Y_data = Y.loc[mask].copy()

# One-hot encode the labels with a fixed column order
mlb = MultiLabelBinarizer(classes=['CD', 'NORM', 'STTC'])
y = mlb.fit_transform(Y_data['diagnostic_superclass'])
print("Klassen:", list(mlb.classes_))  # ['CD','NORM','STTC']

########

## Stratified split

# The strat_fold column assigns every record to a fold:
#   strat_fold < 9   -> training data
#   strat_fold == 9  -> validation
#   strat_fold == 10 -> test

fold = Y_data['strat_fold'].to_numpy()
in_train, in_val, in_test = fold < 9, fold == 9, fold == 10

X_train, y_train = X_data[in_train], y[in_train]
X_val, y_val = X_data[in_val], y[in_val]
X_test, y_test = X_data[in_test], y[in_test]

# Drop the unsplit intermediates to free memory
del X_data, Y_data, y


#########

# Standardizing

def apply_scaler(X, scaler):
    """Scale every value of ``X`` with a fitted single-feature scaler.

    All values are presented to ``scaler.transform`` as one ``(n_values, 1)``
    column in a single call (instead of one call per record) and the result is
    reshaped back, so the output has exactly the shape of the input. This also
    holds for an empty ``X``.

    Parameters
    ----------
    X : array-like
        Regular (non-ragged) array of signals; any number of dimensions.
    scaler : object
        Fitted scaler exposing ``transform`` for inputs of shape ``(n, 1)``.

    Returns
    -------
    numpy.ndarray
        Scaled values with the same shape as ``X``.
    """
    X_arr = np.asarray(X)
    if X_arr.size == 0:
        # Nothing to transform; keep the (empty) shape and avoid calling the
        # scaler with zero samples, which some scalers reject.
        return X_arr.copy()
    column = X_arr.reshape(-1, 1)
    return np.asarray(scaler.transform(column)).reshape(X_arr.shape)


# Fit one StandardScaler on every training value (treated as a single feature),
# then apply it to all three splits.
scaler = StandardScaler()

train_values = np.vstack(X_train).reshape(-1, 1).astype(float)
scaler.fit(train_values)

X_train, X_val, X_test = (
    apply_scaler(X_train, scaler),
    apply_scaler(X_val, scaler),
    apply_scaler(X_test, scaler),
)

Klassen: ['CD', 'NORM', 'STTC']


In [18]:
# Disabled cell: the code is wrapped in a string literal so it does not run.
# It would save the preprocessed splits, the scaler and the class list.
# NOTE(review): it writes to "../" while the loading cell reads from
# "../artifacts/" - confirm the files were moved there by hand.
'''
import joblib
# speichert die vorverarbeiteten Daten, den Scaler und die Klassenliste
np.savez("../ptbxl_preprocessed.npz",
         X_train=X_train, y_train=y_train,
         X_val=X_val, y_val=y_val,
         X_test=X_test, y_test=y_test)
joblib.dump(scaler, "../scaler.pkl")
joblib.dump(list(mlb.classes_), "../classes.pkl")
'''


['classes.pkl']

In [92]:
import joblib

# Load the preprocessed splits. The .npz archive is opened in a context manager
# so its file handle is closed as soon as the arrays have been read.
with np.load("../artifacts/ptbxl_preprocessed.npz") as data:
    X_train, y_train = data["X_train"], data["y_train"]
    X_val, y_val = data["X_val"], data["y_val"]
    X_test, y_test = data["X_test"], data["y_test"]

# Fitted scaler and class list saved alongside the data.
# NOTE(review): joblib.load unpickles arbitrary objects - only load artifacts
# that were produced by this project.
scaler = joblib.load("../artifacts/scaler.pkl")
classes = joblib.load("../artifacts/classes.pkl")


In [88]:
# Shape of each split, of one single sample, and a preview of the training array
for split_label, split_array in (("X_train:", X_train), ("X_val:", X_val), ("X_test:", X_test)):
    print(split_label, split_array.shape)
print(X_train[0].shape)

print(X_train)


X_train: (10499, 1000)
X_val: (1340, 1000)
X_test: (1338, 1000)
(1000,)
[[-0.52630471 -0.51294149 -0.53075912 ...  0.3111239   0.38684882
   0.10176674]
 [ 0.02158741 -0.08531837 -0.23231382 ...  0.54275309 -1.44836706
  -1.54636403]
 [-0.12540804 -0.15213448 -0.23676822 ... -0.12540804 -0.21004178
  -0.21449619]
 ...
 [-0.21895059 -0.13877126  0.03940504 ...  0.26212541  2.23542793
   2.60514375]
 [ 0.17303726  0.35121356 -0.05859192 ... -0.46839741 -0.19667856
   2.06616045]
 [-0.25013145 -0.17886093  0.017133   ...  0.15076523  0.12403878
  -0.02295666]]


In [24]:
'''
X_train = X_train[..., None]
X_val   = X_val[..., None]
X_test  = X_test[..., None]
'''

In [25]:
# Class names used to label the per-class sample counts printed by show_counts
count_class = list(classes)


def show_counts(y, name, class_names=None):
    """Print the number of samples per class for a one-hot label matrix.

    Parameters
    ----------
    y : array-like of shape (n_samples, n_classes)
        One-hot (or multi-hot) label matrix; columns are summed.
    name : str
        Title printed above the counts (e.g. the split name).
    class_names : sequence of str, optional
        Label for each column of ``y``. Defaults to the module-level
        ``count_class``.

    Raises
    ------
    ValueError
        If the number of class names differs from the number of label columns
        (``zip`` would otherwise silently drop the surplus).
    """
    labels = count_class if class_names is None else list(class_names)
    cnt = np.asarray(y).sum(axis=0).astype(int)
    if len(labels) != len(cnt):
        raise ValueError(
            f"expected {len(cnt)} class names for {len(cnt)} label columns, got {len(labels)}"
        )
    print(f"\n{name} counts:")
    for c, n in zip(labels, cnt):
        print(f"{c:}: {n}")
    print("total:", int(cnt.sum()))


# Report the class balance of every split
for split_labels, split_title in ((y_train, "Train"), (y_val, "Val"), (y_test, "Test")):
    show_counts(split_labels, split_title)



Train counts:
   CD: 1353
 NORM: 7243
 STTC: 1903
total: 10499

Val counts:
   CD: 171
 NORM: 914
 STTC: 255
total: 1340

Test counts:
   CD: 184
 NORM: 912
 STTC: 242
total: 1338


In [6]:
# Nur den ersten Kanal (erste EKG-Ableitung) aus den Daten nehmen.
# weil Wearable auch nur 1 Kanal hat
'''
X_train = X_train[:, :, 0]
X_val = X_val[:, :, 0]
X_test = X_test[:, :, 0]
'''

'\nX_train = X_train[:, :, 0]\nX_val = X_val[:, :, 0]\nX_test = X_test[:, :, 0]\n'

In [93]:
class DataGen(keras.utils.Sequence):
    """Batch generator yielding the raw signal plus a log-spectrogram per sample.

    Each batch is ``((X_1, X_2), y)`` where ``X_1`` is the raw signal with a
    trailing channel axis and ``X_2`` holds one spectrogram feature matrix
    (time x 13 frequency bins) per sample.

    Parameters
    ----------
    X : numpy.ndarray
        Input signals, one per row (single channel).
    y : numpy.ndarray
        Labels aligned with ``X``.
    batch_size : int
        Number of samples per batch.
    window_len, overlap_len : int
        Segment length and overlap (in samples) passed to the spectrogram.
    sampling : int
        Sampling rate of the signal handed to the spectrogram as ``fs``.
    **kwargs
        Forwarded to ``keras.utils.Sequence``.
    """

    def __init__(self, X, y, batch_size=32, window_len=40, overlap_len=10, sampling=100, **kwargs):
        super().__init__(**kwargs)
        self.batch_size = batch_size
        self.X = X
        self.y = y
        self.sampling = sampling
        self.window_len = window_len
        self.overlap_len = overlap_len

    def __len__(self):
        """Number of batches per epoch (the last batch may be smaller)."""
        return math.ceil(len(self.X) / self.batch_size)

    def _log_spectrogram(self, item):
        """Return the (time, 13) log-spectrogram features of a single signal."""
        # Make sure we have a flat float32 vector, e.g. (n, 1) -> (n,)
        sig = np.asarray(item).squeeze().astype(np.float32)
        # The window may not be longer than the signal, and the overlap must
        # stay strictly below the window length.
        nper = min(self.window_len, sig.shape[0])
        nover = min(self.overlap_len, max(0, nper - 1))

        _, _, Sxx = spectrogram(sig, fs=self.sampling, nperseg=nper, noverlap=nover)
        # Keep the 13 lowest frequency bins; (freq, time) -> (time, freq)
        Sxx = Sxx[:13].transpose()

        # First log: only where the power is positive, everything else stays 0.
        # Using the ufunc's `where` avoids evaluating log(0) and its warnings.
        features = np.zeros_like(Sxx)
        np.log(Sxx, out=features, where=Sxx > 0)

        # Second log, applied to the magnitude of the first one, again only on
        # positive entries.
        # NOTE(review): the net effect is log(|log(S)|); this is kept unchanged
        # because presumably the saved model was trained on these features -
        # confirm before simplifying to a single log.
        features = np.abs(features)
        positive = features > 0
        features[positive] = np.log(features[positive])
        return features

    def __getitem__(self, idx):
        """Return batch ``idx`` as ``((raw, spectrograms), labels)``."""
        window = slice(idx * self.batch_size, (idx + 1) * self.batch_size)
        batch_x = np.asarray(self.X[window])
        batch_y = self.y[window]

        # X_2: one spectrogram feature matrix per sample
        X_2 = np.array([self._log_spectrogram(item) for item in batch_x])

        # X_1: raw signal as (batch, n_samples, 1); the length is taken from
        # the data instead of being hard-coded to 1000.
        X_1 = batch_x.reshape(-1, batch_x.shape[1], 1).astype("float32")

        return (X_1, X_2), batch_y.astype("float32")


# Spectrogram settings shared by all three generators
window_len = 40
overlap_len = 10

train_gen, val_gen, test_gen = (
    DataGen(split_x, split_y, window_len=window_len, overlap_len=overlap_len)
    for split_x, split_y in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

In [94]:
import keras
from keras import layers, ops


@keras.saving.register_keras_serializable(name="attention")
class attention(layers.Layer):
    """Additive attention over the time axis of a (batch, time, features) input.

    With ``return_sequences=True`` the re-weighted sequence is returned;
    otherwise the weighted sequence is summed over the time axis.
    """

    def __init__(self, return_sequences=True, **kwargs):
        super().__init__(**kwargs)
        self.return_sequences = return_sequences

    def build(self, input_shape):
        # input_shape = (batch, time, features)
        n_steps, n_features = int(input_shape[1]), int(input_shape[2])
        # Projection from the feature vector of a time step to a single score
        self.W = self.add_weight(name="att_weight", shape=(n_features, 1), initializer="glorot_uniform")
        # One bias per time step
        self.b = self.add_weight(name="att_bias", shape=(n_steps, 1), initializer="zeros")
        super().build(input_shape)

    def call(self, x):
        # Score per time step, squashed by tanh
        scores = ops.tanh(ops.dot(x, self.W) + self.b)
        # Softmax over the time axis turns the scores into weights summing to 1
        weights = ops.softmax(scores, axis=1)
        # Scale every time step of the input by its weight
        weighted = x * weights
        return weighted if self.return_sequences else ops.sum(weighted, axis=1)

    def get_config(self):
        # Include the constructor argument so the layer can be saved and reloaded
        return {**super().get_config(), "return_sequences": self.return_sequences}


from keras.layers import Conv1D, Input, LSTM, Activation, Dense, Average, Attention

# First input: raw-signal branch (1000 samples, 1 channel)
inputs_1 = Input(shape=(1000, 1), batch_size=None)

x = Conv1D(32, 2, padding='same')(inputs_1)
x = Activation('relu')(x)

x = Conv1D(64, 2, padding='same')(x)
x = Activation('relu')(x)

# Custom attention layer defined in this notebook (not keras.layers.Attention)
x = attention(return_sequences=True)(x)

x = LSTM(64, return_sequences=True)(x)
x = LSTM(64)(x)

output_1 = Dense(256, activation='relu')(x)

# Second input: spectrogram branch (33 time steps x 13 frequency bins)
inputs_2 = Input(shape=(33, 13), batch_size=None)

u = Conv1D(6, 2, padding='same')(inputs_2)
u = Activation('relu')(u)

u = Conv1D(16, 4, padding='same')(u)
u = Activation('relu')(u)

u = LSTM(256, name='lstm_spec')(u)
output_2 = Dense(256, activation='relu')(u)

# Merge both branches by averaging their 256-dimensional outputs
avg = Average()([output_1, output_2])

# Earlier 5-class head, kept for reference:
#outputs = Dense(5, activation='softmax')(avg)
# Classification head for 3 classes
outputs = Dense(3, activation='softmax')(avg)  # 3 classes: CD, NORM, STTC

model = keras.models.Model(inputs=[inputs_1, inputs_2], outputs=outputs)

model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001), loss=keras.losses.CategoricalCrossentropy(),
              metrics=['accuracy', keras.metrics.AUC()])

# Print all layers with their output shapes and parameter counts
model.summary()

In [81]:
# Sanity check: shapes of the first training batch
bx, by = train_gen[0]
raw_part, spec_part = bx
print(raw_part.shape)  # raw signals
print(spec_part.shape)  # spectrograms
print(by.shape)  # labels


(32, 1000, 1)
(32, 33, 13)
(32, 3)


In [82]:
# Training callbacks
cbs = [
    # Halve the learning rate when val_loss has not improved for 2 epochs
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        mode='min',
        factor=0.5,
        patience=2,
        verbose=1,
    ),

    # Stop when val_loss has not improved for 4 epochs and restore the best weights
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        mode='min',
        patience=4,
        restore_best_weights=True,
        verbose=1,
    ),

    # keras.callbacks.ModelCheckpoint('../best.keras', monitor='val_loss', mode='min', save_best_only=True, verbose=1)
]
# Class weights by label index: class 0 (CD) weighted most, class 2 (STTC)
# slightly raised, because their recall was still too weak.
class_weight = dict(zip(range(3), (2.0, 1.0, 1.2)))

In [None]:
# Train for at most 50 epochs; the callbacks may stop earlier
model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=50,
    callbacks=cbs,
    class_weight=class_weight,
)

Epoch 1/50
[1m329/329[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m244s[0m 718ms/step - accuracy: 0.7010 - auc_2: 0.7897 - loss: 1.0832 - val_accuracy: 0.6948 - val_auc_2: 0.8034 - val_loss: 0.8264 - learning_rate: 0.0010
Epoch 2/50
[1m127/329[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m42:20[0m 13s/step - accuracy: 0.7006 - auc_2: 0.7948 - loss: 1.0629

Ab hier ergänzt: Tests etc.

In [95]:
# Evaluation
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, f1_score

# Load the saved model. The custom layer is passed under the name it is
# registered and saved with ("attention"), consistent with the other load cells.
model = keras.models.load_model("../artifacts/best.keras", compile=True, custom_objects={"attention": attention})
# Class names in label-column order
class_names = list(classes)

# Ground truth: one-hot labels and their integer form
y_true_oh = test_gen.y
y_true = y_true_oh.argmax(axis=1)

# Predictions: per-class probabilities and the most probable class
probs = model.predict(test_gen, verbose=0)
y_pred = probs.argmax(axis=1)

# Precision, recall and F1 score per class
print(classification_report(y_true, y_pred, target_names=class_names))

# Confusion matrix: rows = true class, columns = predicted class
print(confusion_matrix(y_true, y_pred))

# Macro-averaged AUROC
auc_macro = roc_auc_score(y_true_oh, probs, multi_class='ovr', average='macro')
print("Macro AUROC: ", auc_macro)

# AUROC per class
auc_per_class = roc_auc_score(y_true_oh, probs, multi_class='ovr', average=None)
for name, auc in zip(class_names, auc_per_class):
    print(f"AUC[{name}]: {auc:.3f}")

# Macro-averaged F1 score
f1_macro = f1_score(y_true, y_pred, average='macro')
print("Macro F1: ", f1_macro)


              precision    recall  f1-score   support

          CD       0.77      0.37      0.50       184
        NORM       0.83      0.92      0.87       912
        STTC       0.64      0.63      0.63       242

    accuracy                           0.79      1338
   macro avg       0.75      0.64      0.67      1338
weighted avg       0.79      0.79      0.78      1338

[[ 68  94  22]
 [ 11 838  63]
 [  9  81 152]]
Macro AUROC:  0.828477412191762
AUC[CD]: 0.771
AUC[NORM]: 0.838
AUC[STTC]: 0.877
Macro F1:  0.6684349610028107


In [None]:
# Validation score

val_loss, val_acc, val_auc = model.evaluate(val_gen)
print("Val score: ")
for metric_label, metric_value in (("Loss: ", val_loss), ("Accuracy: ", val_acc), ("AUC: ", val_auc)):
    print(metric_label, metric_value)

# Test score

test_loss, test_acc, test_auc = model.evaluate(test_gen)
print("Test score: ")
for metric_label, metric_value in (("Loss: ", test_loss), ("Accuracy: ", test_acc), ("AUC: ", test_auc)):
    print(metric_label, metric_value)



In [16]:
'''
# Speichert das beste Modell
model.save("best.keras")
'''

In [19]:
# Load the saved model (the custom attention layer must be supplied) and show its layers

MODEL_FILE = "../artifacts/best.keras"
model = keras.models.load_model(MODEL_FILE, custom_objects={"attention": attention})
model.summary()


EXPERIMENTE

In [25]:
# Open the HDF5 file and inspect its contents; https://data.dtu.dk/articles/dataset/CACHET-CADB_Short_Format/14547330?file=27917358
import h5py

dateipfad = '../cachet-cadb_short_format_without_context.hdf5'

try:
    with h5py.File(dateipfad, 'r') as h5:
        top_level = list(h5.keys())
        print("Gruppen in der Datei:", top_level)

        # Print every top-level entry together with its values
        for name in top_level:
            gruppe = h5[name]
            print(f"Inhalt von {gruppe}: {gruppe[:]}")
except FileNotFoundError:
    print(f"Fehler: Die Datei '{dateipfad}' wurde nicht gefunden.")
except Exception as e:
    print(f"Ein Fehler ist aufgetreten: {e}")



Gruppen in der Datei: ['labels', 'signal']
Inhalt von <HDF5 dataset "labels": shape (16404480,), type "<f8">: [3. 3. 3. ... 3. 3. 3.]
Inhalt von <HDF5 dataset "signal": shape (16404480,), type "<f8">: [-0.00135984  0.00279032  0.00586285 ... -0.0378265  -0.03591219
 -0.03357666]


In [27]:
import h5py

file_name = '../cachet-cadb_short_format_without_context.hdf5'

# Read both datasets inside a context manager so the file handle is closed
# afterwards (previously the file stayed open for the rest of the session).
with h5py.File(file_name, 'r') as f1:
    print("Top-level keys:", list(f1.keys()))

    # Disabled generic inspection of the first dataset, kept for reference:
    # first_key = list(f1.keys())[0]
    # dataset = f1[first_key]
    # print("Typ:", type(dataset))
    # print("Shape:", dataset.shape)
    # print("Dtype:", dataset.dtype)
    # data = dataset[:]   # load the data into a NumPy array
    # print("Array:", data)

    signal = f1['signal'][:]  # load as NumPy array
    labels = f1['labels'][:]

# Inspect the signal data
print(signal.shape, signal.dtype)
print(signal[:10])  # first 10 values

# Inspect the labels
print(labels.shape, labels.dtype)
print(labels[:20])  # first 20 values

Top-level keys: ['labels', 'signal']
(16404480,) float64
[-0.00135984  0.00279032  0.00586285  0.00804016  0.00948632  0.01034809
  0.01075599  0.01082523  0.01065672  0.010338  ]
(16404480,) float64
[3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3.]


In [42]:
# Smoke test: load the class list and the scaler and print them
import joblib

for artifact_name in ("classes.pkl", "scaler.pkl"):
    try:
        with open(f"../artifacts/{artifact_name}", 'rb') as file:
            geladene_daten = joblib.load(file)

        print(geladene_daten)

    except FileNotFoundError:
        print(f"Fehler: Die Datei '{artifact_name}' wurde nicht gefunden.")
    except Exception as e:
        print(f"Ein Fehler ist aufgetreten: {e}")

['CD', 'NORM', 'STTC']
StandardScaler()


In [59]:
import h5py
import joblib

import keras
from ecg_project.ecg_preprocess import resampling
from ecg_project.model import predict_ecg

# A segment is 10 seconds long at the assumed sampling rate.
# NOTE(review): fs = 100 is an assumption of this cell - confirm the native
# sampling rate of the CACHET-CADB recording.
fs = 100
samples = 10 * fs

model = keras.models.load_model("../artifacts/best.keras", compile=True, custom_objects={"attention": attention})

classes = joblib.load("../artifacts/classes.pkl")
print("Klassen:", classes)

with h5py.File("../cachet-cadb_short_format_without_context.hdf5", "r") as f:
    sig = f["signal"][:]
    labels = f["labels"][:]

# Segment to test; change seg_idx to try other segments
seg_idx = 4737
seg_start = seg_idx * samples
ecg = sig[seg_start:seg_start + samples]

CACHET_LABELS = {
    1: "AF",
    2: "NSR",
    3: "Noise",
    4: "Others"
}

# `labels` has one entry per signal sample (same length as `signal`), so the
# label must be read at the segment's first sample, not at index seg_idx.
lbl_num = int(labels[seg_start])
lbl_name = CACHET_LABELS[lbl_num]

ecg2d = resampling(ecg, fs)

predictions = predict_ecg(ecg2d)
print(predictions)
print("eigentliche Klasse:", lbl_name)


Klassen: ['CD', 'NORM', 'STTC']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 63ms/step
('NORM', 50.04)
eigentliche Klasse: Noise


In [70]:
# Test with an Apple Watch record; https://physionet.org/content/ecg-capable-smartwatches/1.0.0/
import wfdb
from ecg_project.model import predict_ecg
from ecg_project.ecg_preprocess import standardizing, resampling

# Record path without the .hea/.dat extension
rec_base = "../electrocardiogram-capable-smartwatches-assessing-their-clinical-accuracy-and-application-1.0.0/applewatch_serie8/st-segment/st-p1/st-p1_4"
rec = wfdb.rdrecord(rec_base)

# Channel names and units
name, units = rec.sig_name, rec.units
print(name, units)

# Physical signal and its sampling rate
sig, fs = rec.p_signal[:], int(rec.fs)
print(sig.shape, fs)

# Resample to the target rate before predicting
fs_new = 100
resample = resampling(sig, fs, fs_out=fs_new)
print(resample.shape, fs_new)

predictions = predict_ecg(resample)
print(predictions)


dict_keys(['record_name', 'n_sig', 'fs', 'counter_freq', 'base_counter', 'sig_len', 'base_time', 'base_date', 'comments', 'sig_name', 'p_signal', 'd_signal', 'e_p_signal', 'e_d_signal', 'file_name', 'fmt', 'samps_per_frame', 'skew', 'byte_offset', 'adc_gain', 'baseline', 'units', 'adc_res', 'adc_zero', 'init_value', 'checksum', 'block_size'])
(15360, 1) 512
(1000, 1, 1) 100
(1000, 1)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 66ms/step
('STTC', 38.32)


In [73]:
# Test with an Apple Watch ZIP file
from ecg_project.ecg_preprocess import resampling
from ecg_project.load_ecg import load_ecg_wfdb_zip
from ecg_project.model import predict_ecg

# Read the raw bytes of the ZIP archive
with open(
        "C:\\Users\\milal\\Downloads\\Projektarbeit WS2526\\electrocardiogram-capable-smartwatches-assessing-their-clinical-accuracy-and-application-1.0.0\\applewatch_serie8\\st-segment\\st-p5\\st-p5_0.zip",
        "rb") as f:
    content = f.read()

sig, fs, lead, unit = load_ecg_wfdb_zip(content)
for field_label, field_value in (
        ("Signal-Shape:", sig.shape),
        ("Samplingrate:", fs),
        ("Lead:", lead),
        ("Unit:", unit),
):
    print(field_label, field_value)

# Resample, then classify
data = resampling(sig, fs)
print(data.shape)
pred = predict_ecg(data)
print(pred)

Signal-Shape: (15360,)
Samplingrate: 512
(1000, 1)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 60ms/step
('NORM', 53.7)


In [None]:
# Logistic-regression baseline on the PTB-XL task
from sklearn.metrics import accuracy_score, precision_score, recall_score, log_loss, f1_score, classification_report, \
    confusion_matrix, roc_auc_score
from sklearn.linear_model import LogisticRegression

# Integer labels derived from the one-hot matrices
y_train_int, y_test_int = (one_hot.argmax(axis=1) for one_hot in (y_train, y_test))

# Training
logreg = LogisticRegression(solver='lbfgs', max_iter=1000)
logreg.fit(X_train, y_train_int)

# Predicted classes and class probabilities
y_pred = logreg.predict(X_test)
y_proba = logreg.predict_proba(X_test)

# Detailed per-class report
report = classification_report(y_test_int, y_pred, digits=3)
print("\nClassification report:\n", report)

# print("Macro F1 :", f1_score(y_test_int, y_pred, average="macro"))

# Confusion matrix
cm = confusion_matrix(y_test_int, y_pred)
print("Confusion matrix:\n", cm)

# ROC-AUC (one-vs-rest) needs probabilities; scikit-learn raises ValueError
# when it cannot be computed, which is reported instead of aborting the cell.
try:
    auc_ovr_macro, auc_ovr_weighted = (
        roc_auc_score(y_test_int, y_proba, multi_class="ovr", average=avg_mode)
        for avg_mode in ("macro", "weighted")
    )
    print(f"AUC (OvR, macro):   {auc_ovr_macro:.2f}")
    print(f"AUC (OvR, weighted):{auc_ovr_weighted:.2f}")
except ValueError as e:
    print("ROC-AUC nicht berechenbar:", e)

# How many iterations did LBFGS need?
#print("n_iter_:", logreg.n_iter_)


In [None]:
# Test with a PTB-XL test ZIP file
from ecg_project.ecg_preprocess import resampling
from ecg_project.load_ecg import load_ecg_wfdb_zip
from ecg_project.model import predict_ecg
import pandas as pd, ast
from pathlib import Path

# Read the raw bytes of the ZIP archive
with open("C:\\Users\\milal\\IdeaProjects\\Projektarbeit_KI_EKG\\ptbxl\\ptbxl_test_zips\\02160_lr.zip", "rb") as f:
    content = f.read()

sig, fs, lead, unit = load_ecg_wfdb_zip(content)
for field_label, field_value in (
        ("Signal-Shape:", sig.shape),
        ("Samplingrate:", fs),
        ("Lead:", lead),
        ("Unit:", unit),
):
    print(field_label, field_value)

# Resample, then classify
data = resampling(sig, fs)
print(data.shape)
pred = predict_ecg(data)
print(pred)

ptbxl_root = Path(r"C:\Users\milal\IdeaProjects\Projektarbeit_KI_EKG\ptbxl")

# Load the metadata tables
df = pd.read_csv(ptbxl_root / "ptbxl_database.csv")
scp = pd.read_csv(ptbxl_root / "scp_statements.csv")

# SCP code -> diagnostic class, restricted to diagnostic statements that have a class
diagnostic_rows = scp[(scp["diagnostic"] == 1) & scp["diagnostic_class"].notna()]
code_to_class = dict(zip(diagnostic_rows["Unnamed: 0"], diagnostic_rows["diagnostic_class"]))

# The three class names map to themselves
code_to_class.update({c: c for c in ["CD", "NORM", "STTC"]})


def get_true_class(zip_path: str, database=None, mapping=None) -> str:
    """Return the diagnostic class recorded for the record behind a ZIP file.

    The record is identified by the ZIP file's stem, which must equal the last
    path component of ``filename_lr`` exactly. The previous substring/regex
    match could silently pick a different record.

    Parameters
    ----------
    zip_path : str
        Path or file name of the ZIP archive, e.g. ``"02160_lr.zip"``.
    database : pandas.DataFrame, optional
        Table with ``filename_lr`` and ``scp_codes`` columns. Defaults to the
        module-level ``df``.
    mapping : dict, optional
        SCP code -> class name. Defaults to the module-level ``code_to_class``.

    Returns
    -------
    str
        Class of the first SCP code found in the mapping, or ``"UNKNOWN"`` if
        none of the record's codes is mapped.

    Raises
    ------
    IndexError
        If no record with that name exists in ``database``.
    """
    records = df if database is None else database
    lookup = code_to_class if mapping is None else mapping
    base = Path(zip_path).stem

    # Compare against the record name only (text after the last "/")
    record_names = records["filename_lr"].str.rsplit("/", n=1).str[-1]
    matches = records[record_names == base]
    if matches.empty:
        raise IndexError(f"no record named {base!r} found in column 'filename_lr'")
    row = matches.iloc[0]

    # scp_codes is stored as the text form of a dict
    scp_codes = ast.literal_eval(row["scp_codes"])

    for code in scp_codes:
        if code in lookup:
            return lookup[code]

    return "UNKNOWN"


# Ground-truth class of the record that was classified in this cell
true_class = get_true_class("02160_lr.zip")
print("Wahre Diagnose: ", true_class)


In [None]:
'''
# test_zips von ptb xl erstellen
import os, zipfile, pandas as pd, json
from pathlib import Path
import ast

ptbxl_root = Path(r"C:\Users\milal\IdeaProjects\Projektarbeit_KI_EKG\ptbxl")

# CSVs laden
df = pd.read_csv(ptbxl_root / "ptbxl_database.csv")
scp = pd.read_csv(ptbxl_root / "scp_statements.csv")

# Mapping: SCP-Code -> Superklasse
code2cls = dict(zip(scp["Unnamed: 0"], scp["diagnostic_class"]))

# Nur Testdaten (Fold 10)
df_test = df[df["strat_fold"] == 10]

# Zielordner
out_dir = Path("./ptbxl_test_zips")
out_dir.mkdir(parents=True, exist_ok=True)

ok = 0
skipped = 0
missing = 0

for _, row in df_test.iterrows():
    # Diagnose bestimmen
    scp_codes = ast.literal_eval(row["scp_codes"])
    classes = {code2cls.get(c) for c in scp_codes if code2cls.get(c) in {"NORM","STTC","CD"}}

    # Bedingung: genau 1 gültige Superklasse
    if len(classes) != 1:
        skipped += 1
        continue

    # Pfade zur Aufnahme
    rel = Path(row["filename_lr"])   # z.B. records100/00000/00009_lr
    base = ptbxl_root / rel
    hea = base.with_suffix(".hea")
    dat = base.with_suffix(".dat")

    if not (hea.exists() and dat.exists()):
        print("fehlt:",
              "\n   HEA:", hea, "exists?", hea.exists(),
              "\n   DAT:", dat, "exists?", dat.exists())
        missing += 1
        continue

    # ZIP erstellen
    zip_name = out_dir / (base.stem + ".zip")
    with zipfile.ZipFile(zip_name, "w") as zf:
        zf.write(hea, arcname=hea.name)
        zf.write(dat, arcname=dat.name)

    ok += 1

print(f"ZIPs erstellt: {ok}")
print(f"übersprungen (mehrere/andere Diagnosen): {skipped}")
print(f"fehlend: {missing}")
print(f"Zielordner: {out_dir.resolve()}")
'''


In [90]:
# Second Kaggle model under test
# https://www.kaggle.com/code/nugroho24/pre-trained-lstm?
# Imports use the public Keras namespaces (not the private keras.src package)
# and list every layer used here, so the cell does not depend on names leaked
# from an earlier cell.
from keras.optimizers import Adam
from keras.layers import Average, Dense, Dropout, Input, LSTM

optimizer = Adam(learning_rate=0.0001)

# Raw-signal branch
inputs_3 = Input(shape=(1000, 1), batch_size=None)

x = LSTM(50, return_sequences=False)(inputs_3)
x = Dropout(0.2)(x)
# NOTE(review): each branch ends in a 2-unit softmax before the 3-class head,
# which squeezes all information through two values - confirm this is intended.
output_3 = Dense(2, activation='softmax')(x)

# Spectrogram branch
inputs_4 = Input(shape=(33, 13), batch_size=None)

u = LSTM(50, return_sequences=False)(inputs_4)
u = Dropout(0.2)(u)
output_4 = Dense(2, activation='softmax')(u)

# Average both branches, then classify into 3 classes
avg2 = Average()([output_3, output_4])

outputs = Dense(3, activation='softmax')(avg2)

model_lstm = keras.models.Model(inputs=[inputs_3, inputs_4], outputs=outputs)
model_lstm.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Training callbacks for the second model
cbs = [
    # Halve the learning rate when val_loss has not improved for 2 epochs
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        mode='min',
        factor=0.5,
        patience=2,
        verbose=1,
    ),

    # Stop when val_loss has not improved for 4 epochs and restore the best weights
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        mode='min',
        patience=4,
        restore_best_weights=True,
        verbose=1,
    ),

    # keras.callbacks.ModelCheckpoint('../best.keras', monitor='val_loss', mode='min', save_best_only=True, verbose=1)
]

# Class weights by label index: class 0 (CD) weighted most, class 2 (STTC)
# slightly raised, because their recall was still too weak.
class_weight = dict(zip(range(3), (2.0, 1.0, 1.2)))

In [None]:
# Train the second model for at most 50 epochs; the callbacks may stop earlier
model_lstm.fit(
    train_gen,
    validation_data=val_gen,
    epochs=50,
    callbacks=cbs,
    class_weight=class_weight,
)

In [91]:
# Evaluation of the second (LSTM) Kaggle model
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, f1_score

# Class names in label-column order
class_names = list(classes)

# Ground truth (one-hot and integer form) and model probabilities
y_true_oh = test_gen.y
probs = model_lstm.predict(test_gen, verbose=0)
y_true = np.argmax(y_true_oh, axis=1)
y_pred = np.argmax(probs, axis=1)

# Precision, recall and F1 score per class
report = classification_report(y_true, y_pred, target_names=class_names)
print(report)

# Confusion matrix
print(confusion_matrix(y_true, y_pred))

# Macro-averaged AUROC
auc_macro = roc_auc_score(y_true_oh, probs, multi_class='ovr', average='macro')
print("Macro AUROC:", auc_macro)

# AUROC per class
auc_per_class = roc_auc_score(y_true_oh, probs, multi_class='ovr', average=None)
for name, auc in zip(class_names, auc_per_class):
    print(f"AUC[{name}]: {auc:.3f}")

# Macro-averaged F1 score
f1_macro = f1_score(y_true, y_pred, average='macro')
print("Macro F1:", f1_macro)

              precision    recall  f1-score   support

          CD       0.00      0.00      0.00       184
        NORM       0.68      1.00      0.81       912
        STTC       0.00      0.00      0.00       242

    accuracy                           0.68      1338
   macro avg       0.23      0.33      0.27      1338
weighted avg       0.46      0.68      0.55      1338

[[  0 184   0]
 [  0 912   0]
 [  0 242   0]]
Macro AUROC: 0.5312026257804423
AUC[CD]: 0.557
AUC[NORM]: 0.521
AUC[STTC]: 0.515
Macro F1: 0.2702222222222222


  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
