In [1]:
import numpy as np 
import pandas as pd 
import keras 
import matplotlib.pyplot as plt
import numpy as np
import neurokit2 as nk
import seaborn as sns
from matplotlib.colors import LinearSegmentedColormap
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from sklearn.utils import shuffle
from keras.models import Sequential  # En modelltyp i Keras som gör det enkelt att skapa lager av neurala nätverk sekventiellt
from keras.layers import Reshape
from keras.layers import Dense, Activation, Flatten, Convolution1D, Dropout,MaxPooling1D,GlobalAveragePooling1D
from tensorflow.keras.optimizers import Adam

2024-11-26 13:02:15.283681: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [None]:
MI = pd.read_csv("/Users/adinastark/Desktop/Heart disease/ptbdb/ptbdb_abnormal.csv") 
HC = pd.read_csv("/Users/adinastark/Desktop/Heart disease/ptbdb/ptbdb_normal.csv") 

In [None]:
new_column_name = ['label']  #skapar kolumn namn för abnormal data
for num in range(MI.shape[1]-1):
    tem = 'dim' + str(num)
    new_column_name.append(tem)
MI.columns = new_column_name    


column_name = ['label'] #skapar koloumn namn for normal data
for num in range(HC.shape[1]-1):
    tem = 'dim' + str(num)
    column_name.append(tem)
HC.columns = column_name

In [None]:
# Separera data i tränings- och testuppsättningar
train_MI = MI.iloc[0:6500]  # De första 6500 från abnormal för träning
test_MI = MI.iloc[6500:9000]  # De sista 2500 från abnormal för testning
train_HC = HC.iloc[0:2500]  # De första 2500 från normal för träning
test_HC = HC.iloc[2500:3500]  # Rader 2500 till 3500 från normal för testning

# Kombinera abnormal och normal data
train = pd.concat([train_MI, train_HC], sort=True)
test = pd.concat([test_MI, test_HC], sort=True)

# Skapa ytrain och ytest baserat på labels
ytrain = [1 if i < 6500 else 0 for i in range(9000)]  # 1 = abnormal, 0 = normal
ytest = [1 if i < 2500 else 0 for i in range(3500)]   # 1 = abnormal, 0 = normal

# Konvertera etiketter till one-hot encoding
ytrain = to_categorical(ytrain)
ytest = to_categorical(ytest)
#to_categorical: Omvandlar etiketter från numeriska värden till one-hot representation:
# 1 (abnormal) -> [0, 1]
# 0 (normal) -> [1, 0]
# Detta krävs eftersom CNN-modellen använder en softmax-utgång, vilket kräver etiketter i one-hot format.

# Separera signaler från etiketter
Xtrain = train.drop(columns=['label']).values  # Signaldata för träning
Xtest = test.drop(columns=['label']).values  # Signaldata för testning

# Lägg till brus i Xtrain
def add_noise(data, noise_level=0.01):
    noise = noise_level * np.random.normal(size=data.shape)
    return data + noise


# Skapa brusad data och kombinera
Xtrain_noisy = add_noise(Xtrain)
Xtrain = np.concatenate([Xtrain, Xtrain_noisy], axis=0)  # Kombinera original och brusad data
ytrain = np.concatenate([ytrain, ytrain], axis=0)  # Fördubblar etiketterna för brusad data


# Kontrollera att antal prover matchar
print(f"Xtrain shape: {Xtrain.shape}, ytrain shape: {ytrain.shape}")
print(f"Xtest shape: {Xtest.shape}, ytest shape: {ytest.shape}")

# Omforma data för Conv1D
Xtrain = Xtrain.reshape(len(Xtrain), 187, 1)  # 187 tidssteg, 1 kanal
Xtest = Xtest.reshape(len(Xtest), 187, 1)

# Normalisera data
scaler = MinMaxScaler(feature_range=(0, 1))
Xtrain = scaler.fit_transform(Xtrain.reshape(-1, 187)).reshape(-1, 187, 1)
Xtest = scaler.transform(Xtest.reshape(-1, 187)).reshape(-1, 187, 1)


In [None]:
train=np.asarray(train)
train=train.reshape(9000, 188, 1) #188 tidsteg (188 dimensioner) , 1 kanal (1-dim EKG))

test=np.asarray(test)
test=test.reshape(3500, 188, 1)
# For conv1d statement: 
#input_shape = (ncols, 1) 

model = Sequential()
model.add(Convolution1D(100, 5, activation='relu', input_shape=(188,1))) #100 filter, filterstorlek= 5 (små , lokala mönster), relu = aktiveringsfunktion
model.add(Convolution1D(100, 10, activation='relu')) #filterstorlek 10, tar in bredare sammanhang
model.add(MaxPooling1D(3))
model.add(Convolution1D(100, 10, activation='relu'))
model.add(Convolution1D(160, 10, activation='relu'))
model.add(GlobalAveragePooling1D())
model.add(Dropout(0.5)) #Slumpar bort 50% av anslutningarna för att minska överanpassning.
#model.add(Flatten())
model.add(Dense(100, activation='relu')) # Ett dense lager med 100 neuroner och ReLU
model.add(Dense(2, activation='softmax')) #Utgångslager med 2 neuroner för att representera de två klasserna (normal och abnormal)
#print(model_m.summary())
model.compile(optimizer=Adam(learning_rate=0.0005),  # Lägre learning rate
              loss='categorical_crossentropy',
              metrics=['accuracy'])

#Konvulution lager: extraherar mönster från EKG singal. Mindre filterstorlek -> mindre mönster
# Poolinglager : Reducerar data för att minska beräkningskomplexiteten och generalisera mönstren
#Dense lager: Slutför processen genom att klassificera signalen baserat på de extraherade funktionerna


model.summary()

In [None]:
early_stop = EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True) #stoppar innan de blir overfitting

history = model.fit(
    Xtrain, ytrain,  # Använd Xtrain för signaldata
    validation_data=(Xtest, ytest),  # Använd Xtest för valideringssignaler
    epochs=10,
    batch_size=32,
    callbacks=[early_stop]
)

In [None]:
_,accuracy = model.evaluate(test, ytest, verbose=0)
accuracy

In [None]:
# Utvärdera modellen
test_loss, test_accuracy = model.evaluate(Xtest, ytest)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

# Visualisera träningshistorik med ljusrosa och röd färg för accuracy
plt.figure()
plt.plot(history.history['accuracy'], label='Training Accuracy', color='#ffb6c1')  # Ljusrosa för träning
plt.plot(history.history['val_accuracy'], label='Validation Accuracy', color='red')  # Röd för validering
plt.legend()
plt.title('Model Accuracy')  # Standardfärg
plt.xlabel('Epochs')         # Standardfärg
plt.ylabel('Accuracy')       # Standardfärg
plt.show()

# Visualisera träningshistorik med ljusrosa och röd färg för loss
plt.figure()
plt.plot(history.history['loss'], label='Training Loss', color='#ffb6c1')  # Ljusrosa för träning
plt.plot(history.history['val_loss'], label='Validation Loss', color='red')  # Röd för validering
plt.legend()
plt.title('Model Loss')  # Standardfärg
plt.xlabel('Epochs')     # Standardfärg
plt.ylabel('Loss')       # Standardfärg
plt.show()

In [None]:
# Gör prediktioner på valideringsdatan
y_pred = model.predict(Xtest) #använder alla atribut i xtest för att predicta en ny y_pred
y_pred_classes = np.argmax(y_pred, axis=1)  # Konvertera prediktioner till klassindex
y_true = np.argmax(ytest, axis=1)  # Konvertera one-hot till klassindex

#Y_pred: predictar utifrån Xtest
#Y_true: Facit på predictade

# Skapar en rapport
print("Classification Report:")
print(classification_report(y_true, y_pred_classes, target_names=["Normal", "Abnormal"]))

# Skapa en confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred_classes)

# Rosa
cmap = sns.light_palette("pink", as_cmap=True)

#Plotta matris
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap=cmap, xticklabels=["Normal", "Abnormal"], yticklabels=["Normal", "Abnormal"], cbar=False)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.show()

# Utvärdera modellen på testdatan
test_loss, test_accuracy = model.evaluate(Xtest, ytest)

# Skriv ut resultaten
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

In [None]:
# Generera sannolikheter för klassen 'abnormal'
y_pred_probs = model.predict(Xtest)[:, 1]  # Hämta sannolikheter för klassen 'abnormal'

# Beräkna ROC-kurvan
fpr, tpr, thresholds = roc_curve(ytest.argmax(axis=1), y_pred_probs)

# Beräkna AUC (Area Under the Curve)
roc_auc = auc(fpr, tpr)

# Plotta ROC-kurvan
plt.figure()
plt.plot(fpr, tpr, color='#ff69b4', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')  # Ljusrosa kurva
plt.plot([0, 1], [0, 1], 'k--', lw=1)  # Diagonal linje för slumpmässig gissning
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.grid(alpha=0.3)
plt.show()

In [None]:
# Load ECG data from your CSV file
ecg_df = pd.read_csv('/Users/adinastark/Desktop/Heart disease/ECG singal från klockan/ECG Signal 5- 20210609143423.csv', header=None)

# Extract sampling rate from row 8, second column (e.g., "499.348 Hz")
sampling_rate_str = ecg_df.iloc[8, 1]  # Adjust if sampling rate is stored differently
sampling_rate = float(sampling_rate_str.split()[0])

In [None]:
from scipy.interpolate import interp1d

# Function to resample ECG signal to 187 data points
def resample_signal(signal, target_length=187):
    x_original = np.linspace(0, 1, len(signal))
    x_resampled = np.linspace(0, 1, target_length)
    interpolator = interp1d(x_original, signal, kind='linear')
    return interpolator(x_resampled)

# Function to preprocess all RR intervals (between consecutive R-peaks)
def preprocess_ecg_for_prediction(ecg_values, rpeaks, target_length=187):
    resized_segments = []
    
    # Iterate through all consecutive R-peaks
    for i in range(len(rpeaks['ECG_R_Peaks']) - 1):
        start = rpeaks['ECG_R_Peaks'][i]
        end = rpeaks['ECG_R_Peaks'][i + 1]
        
        # Extract segment between R-peaks
        segment = ecg_values[start:end]
        
        # Resize the segment to 187 data points
        resized_segment = resample_signal(segment, target_length)
        
        # Append the resized segment to the list
        resized_segments.append(resized_segment)
    
    return np.array(resized_segments)

In [None]:
# Preprocess ECG signal for prediction
X_input = preprocess_ecg_for_prediction(ecg_values, rpeaks)



X_input_reshaped = X_input.reshape(len(X_input), -1)  # Omforma till 2D (n_samples, 187)
X_input_normalized = scaler.transform(X_input_reshaped)  # Normalisera
X_input_normalized = np.clip(X_input_normalized, 0, 1)  # Begränsar värden till intervallet [0, 1]
X_input_normalized = X_input_normalized.reshape(len(X_input), 187, 1)  # Omforma tillbaka till 3D

In [None]:
X_input = X_input.reshape(len(X_input), 187, 1)

In [None]:
y_pred = model.predict(X_input)

In [None]:
predicted_classes = np.argmax(y_pred, axis=1)

print(predicted_classes)  # Output will be an array of class labels (0 or 1)

In [None]:
percentage_ones = (np.sum(predicted_classes == 1) / len(predicted_classes)) * 100

print(f"Risk-percentage of abnormality: {percentage_ones:.2f}%")