In [None]:
!pip install numpy matplotlib wfdb ipywidgets tsaug --quiet

In [None]:
import os
import re

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import wfdb
from IPython.display import display
from ipywidgets import interact, widgets
from keras import Model, layers, models
from keras.layers import (
    Activation,
    Add,
    BatchNormalization,
    Conv1D,
    Dense,
    Dropout,
    Flatten,
    GlobalAveragePooling1D,
    Input,
    MaxPooling1D,
)
from keras.models import Sequential
from keras.optimizers import Adam
from scipy.signal import resample
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

In [None]:
# Mount Google Drive at /content/drive; the dataset path used below lives under
# this mount point. Only works inside Google Colab.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Directory (on the mounted Drive) that holds the dataset's .dat/.hea files
data_directory = '/content/drive/MyDrive/ctu-chb-intrapartum-cardiotocography-database-1.0.0'

# List all .dat files in the directory.
# os.listdir returns entries in arbitrary order; sorting pins the record order so
# that labels, features and every seeded split below are reproducible across runs.
dat_files = sorted(f for f in os.listdir(data_directory) if f.endswith('.dat'))

# Outcome thresholds as (low, high) bounds per header parameter. The labelling
# cell flags a record as "Distress" when any value falls below its `low` bound;
# the `high` bound is not compared there.
parameter_thresholds = {
    "pH": (7.15, 9999),
    "Apgar1": (7, 999),
    "Apgar5": (7, 999)
}

In [None]:
# Visualizing a sample (most distressed case)

# Specify the record name
record_name = "2024"  # Replace with the desired record name

# Load the record
record = wfdb.rdrecord(os.path.join(data_directory, record_name))

# Locate the FHR and UC channels once and reuse the indices for signals and units
fhr_channel = record.sig_name.index('FHR')
uc_channel = record.sig_name.index('UC')
fhr_signal = record.p_signal[:, fhr_channel]
uc_signal = record.p_signal[:, uc_channel]

# Print information about the record
print("Record Information:")
print("Signals:", record.sig_name)
print("Units:", record.units)

# Units as declared in the record header
fhr_units = record.units[fhr_channel]
uc_units = record.units[uc_channel]

# Time axis in seconds, derived from the sampling frequency
time_vector = (1 / record.fs) * np.arange(len(fhr_signal))

# The seaborn style sheets were renamed to 'seaborn-v0_8-*' in newer Matplotlib
# and the old names were later removed; pick whichever this installation has.
darkgrid_style = next(
    (name for name in ('seaborn-v0_8-darkgrid', 'seaborn-darkgrid')
     if name in plt.style.available),
    'default',
)
plt.style.use(darkgrid_style)

# Define professional colors
fhr_color = '#3498db'  # Blue
uc_color = '#e74c3c'   # Red

fig, (fhr_ax, uc_ax) = plt.subplots(2, 1, figsize=(12, 6))

# Plot FHR signal
fhr_ax.plot(time_vector, fhr_signal, color=fhr_color, label='FHR Signal', linewidth=1.5)
fhr_ax.set_title('Fetal Heart Rate (FHR) Signal')
fhr_ax.set_xlabel('Time (seconds)')
fhr_ax.set_ylabel('Amplitude ({})'.format(fhr_units))
fhr_ax.legend()

# Plot UC signal; label the axis with the header's unit instead of a
# hard-coded one so the plot cannot disagree with the data.
uc_ax.plot(time_vector, uc_signal, color=uc_color, label='UC Signal', linewidth=1.5)
uc_ax.set_title('Uterine Contractions (UC) Signal')
uc_ax.set_xlabel('Time (seconds)')
uc_ax.set_ylabel('Amplitude ({})'.format(uc_units))
uc_ax.legend()

# Fine-tune layout and display the plot
fig.tight_layout()
plt.show()


In [None]:
# Build one "Distress"/"Normal" label per record, in the same order as dat_files.
# Only the .hea header text is needed here, so the (large) signal arrays are not
# loaded in this cell.
labels = []

for dat_file in dat_files:
    record_name = os.path.splitext(dat_file)[0]  # Remove the .dat extension

    # Read the header, whose comment lines carry the outcome parameters
    hea_file_path = os.path.join(data_directory, record_name + '.hea')
    with open(hea_file_path, 'r') as hea_file:
        hea_content = hea_file.read()

    # A record is flagged as soon as one parameter falls below its lower bound
    distress_flag = False
    for param, (low, _high) in parameter_thresholds.items():
        # Expect "#<param><whitespace><value>"; requiring the whitespace stops a
        # longer parameter name with the same prefix from matching.
        found_values = re.findall(rf"#{re.escape(param)}\s+(\S+)", hea_content)
        if not found_values:
            # Fail loudly instead of silently parsing an unrelated header token
            raise ValueError(f"Parameter '{param}' not found in {hea_file_path}")
        param_value = float(found_values[-1])
        # A NaN value never satisfies `<`, so it does not flag distress by itself
        if param_value < low:
            distress_flag = True
            break

    # Assign labels based on distress flag
    labels.append("Distress" if distress_flag else "Normal")

In [None]:
# Snapshot the string labels ("Normal" / "Distress") as a NumPy array
original_labels = np.array(labels)

# Count how many records fall into each class
unique_labels, label_counts = np.unique(original_labels, return_counts=True)

# Pie chart of the class distribution, drawn with an equal aspect ratio
fig, ax = plt.subplots()
ax.pie(
    label_counts,
    labels=unique_labels,
    autopct='%1.1f%%',
    startangle=90,
)
ax.axis('equal')
plt.title('Distribution of Data')
plt.show()

In [None]:
# Every record is resampled to this many points per channel
common_length = 1000

# Running maxima over the raw signals of all records, used as normalisation scale.
# NOTE(review): these maxima are computed over every record, i.e. before any
# train/test split, so the scale is informed by records later used for testing.
max_fhr_bpm = 0
max_uc_value = 0

# Resampled (not yet normalised) channels, one entry per record
fhr_resampled_all = []
uc_resampled_all = []

# Single pass over the files: each record is read from disk once, and both the
# maxima and the resampled signals are collected in the same iteration.
for dat_file in dat_files:
    record_name = os.path.splitext(dat_file)[0]  # Remove the .dat extension
    record = wfdb.rdrecord(os.path.join(data_directory, record_name))

    # Extract FHR and UC signals
    fhr_signal = record.p_signal[:, record.sig_name.index('FHR')]
    uc_signal = record.p_signal[:, record.sig_name.index('UC')]

    # Track the maximum FHR and UC value seen so far (on the raw signals)
    max_fhr_bpm = max(max_fhr_bpm, np.max(fhr_signal))
    max_uc_value = max(max_uc_value, np.max(uc_signal))

    # Resample signals to the common length
    fhr_resampled_all.append(resample(fhr_signal, common_length))
    uc_resampled_all.append(resample(uc_signal, common_length))

# Normalise each channel by its global maximum and concatenate FHR followed by UC
# into one feature vector of length 2 * common_length per record.
X_signals = np.array([
    np.concatenate((fhr_resampled / max_fhr_bpm, uc_resampled / max_uc_value))
    for fhr_resampled, uc_resampled in zip(fhr_resampled_all, uc_resampled_all)
])


In [None]:
# Show the global normalisation constants, one per line (FHR first, then UC)
print(max_fhr_bpm, max_uc_value, sep="\n")

In [None]:
# Make sure both the feature matrix and the label list are NumPy arrays
X_signals, labels = np.array(X_signals), np.array(labels)

In [None]:
# Shape of the feature matrix: (number of records, features per record)
X_signals.shape

In [None]:
# Shape of the label array; its length should match the first axis of X_signals
labels.shape

In [None]:
import tsaug
import numpy as np

# Augmentation pipeline: additive noise (scale 0.01), applied with probability 0.9
augmenter = tsaug.AddNoise(scale=0.01) @ 0.9

# How many augmented copies to generate for every original sample
num_augmentations_per_sample = 20  # Adjust this number as needed

# Generate the copies record by record, so that rows
# [i * n, (i + 1) * n) of the augmented matrix all derive from record i.
sample_rows = range(X_signals.shape[0])
augmented_X_signals = [
    augmenter.augment(X_signals[row, :])
    for row in sample_rows
    for _ in range(num_augmentations_per_sample)
]
augmented_labels = [
    labels[row]
    for row in sample_rows
    for _ in range(num_augmentations_per_sample)
]

# Stack the copies into a 2-D matrix with the same column count as X_signals
n_features = X_signals.shape[1]
augmented_X_signals_reshaped = np.array(augmented_X_signals).reshape(-1, n_features)

# Originals first, augmented copies after
X_signals_augmented = np.vstack((X_signals, augmented_X_signals_reshaped))
labels_augmented = np.concatenate((labels, augmented_labels))

# Shuffle features and labels with one shared permutation
shuffle_indices = np.random.permutation(X_signals_augmented.shape[0])
X_signals_augmented, labels_augmented = (
    X_signals_augmented[shuffle_indices],
    labels_augmented[shuffle_indices],
)


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.spatial.distance import euclidean

# Pick a sample; the modulo keeps the index inside the dataset
sample_index = 2018 % X_signals.shape[0]

# The original row and the block of augmented rows generated from it
original_sample = X_signals[sample_index, :]
first_copy = sample_index * num_augmentations_per_sample
last_copy = (sample_index + 1) * num_augmentations_per_sample
augmented_samples = augmented_X_signals_reshaped[first_copy:last_copy, :]

# Euclidean distance of every augmented copy to the original
distances = [
    euclidean(original_sample, augmented_samples[copy_idx, :])
    for copy_idx in range(num_augmentations_per_sample)
]

# Rank copies from most to least distant and keep the two most distinct ones
ranked_copies = sorted(
    range(num_augmentations_per_sample),
    key=lambda copy_idx: distances[copy_idx],
    reverse=True,
)
most_distinct_indices = ranked_copies[:2]

# Set Seaborn style
sns.set(style="darkgrid")

plt.figure(figsize=(20, 6))

# Original signal as the black reference line
plt.plot(original_sample, label='Original Signal', linewidth=2, color='black')

# One line style / colour pair per plotted augmented copy
line_styles = ['--', '-.']
colors = ['blue', 'orange']

for rank, (copy_idx, line_style, color) in enumerate(zip(most_distinct_indices, line_styles, colors), start=1):
    plt.plot(
        augmented_samples[copy_idx, :],
        label=f'Augment {rank}',
        linestyle=line_style,
        color=color,
        alpha=1,
    )

plt.xlabel('Feature Index')
plt.ylabel('Signal Value')
plt.legend()

# Remove the top/right spines and render
sns.despine()
plt.show()


In [None]:
# Shape of the combined (original + augmented) feature matrix
X_signals_augmented.shape

In [None]:
# Shape of the combined label array; length should match X_signals_augmented
labels_augmented.shape

In [None]:
import matplotlib.pyplot as plt

# Per-class sample counts in the augmented dataset
unique_labels, label_counts = np.unique(labels_augmented, return_counts=True)

# Total number of samples in the augmented dataset
total_samples = len(labels_augmented)


def _percent_to_count(percent):
    """Turn a wedge's percentage back into an absolute sample count label."""
    return '{:.0f}'.format(percent * total_samples / 100)


# Pie chart whose wedges are annotated with counts instead of percentages
plt.figure(figsize=(4, 4))
plt.pie(
    label_counts,
    labels=unique_labels,
    autopct=_percent_to_count,
    startangle=140,
    textprops={'color': "black"},
)
plt.title('Class Counts in Augmented Dataset')
plt.show()


In [None]:
from sklearn.model_selection import train_test_split

# pandas is kept imported here because later cells use `pd`
import pandas as pd

SPLIT_SEED = 42
TEST_FRACTION = 0.3

# Balancing and splitting are done per *record*, not per row. Each record has
# num_augmentations_per_sample noisy copies; splitting rows at random would put
# copies of the same record on both sides of the split and inflate test scores.
record_labels = np.asarray(labels)
n_records = record_labels.shape[0]
n_features = X_signals.shape[1]

# View the augmented rows as (record, copy, feature). This relies on the
# augmentation cell emitting the copies record by record; the reshape raises if
# the row count is not n_records * num_augmentations_per_sample.
augmented_by_record = np.asarray(augmented_X_signals_reshaped).reshape(
    n_records, num_augmentations_per_sample, n_features
)

split_rng = np.random.default_rng(SPLIT_SEED)

# Under-sample the Normal records so both classes have the same record count
distress_records = np.flatnonzero(record_labels == 'Distress')
normal_records = np.flatnonzero(record_labels == 'Normal')
balanced_normal_records = split_rng.choice(
    normal_records, size=len(distress_records), replace=False
)
balanced_records = np.concatenate((distress_records, balanced_normal_records))

# Stratified split of record indices keeps the 50/50 class balance in both parts
train_records, test_records = train_test_split(
    balanced_records,
    test_size=TEST_FRACTION,
    random_state=SPLIT_SEED,
    stratify=record_labels[balanced_records],
)
assert not set(train_records) & set(test_records), "a record leaked across the split"


def _expand_records(record_ids, rng):
    """Return shuffled (features, labels) holding each record and all its augmented copies."""
    originals = X_signals[record_ids]
    copies = augmented_by_record[record_ids].reshape(-1, n_features)
    features = np.vstack((originals, copies))
    targets = np.concatenate((
        record_labels[record_ids],
        np.repeat(record_labels[record_ids], num_augmentations_per_sample),
    ))
    order = rng.permutation(features.shape[0])
    return features[order], targets[order]


# NOTE(review): the test split still contains augmented copies of *test* records
# (to keep the dataset size as before); consider scoring on originals only.
X_train, y_train = _expand_records(train_records, split_rng)
X_test, y_test = _expand_records(test_records, split_rng)

# Combined balanced dataset, kept under the names this cell has always exposed
X_balanced = np.vstack((X_train, X_test))
labels_balanced = np.concatenate((y_train, y_test))
balanced_data = pd.DataFrame(data=X_balanced)
balanced_data['labels'] = labels_balanced

In [None]:
# Report the size of the training and test feature matrices
print(f"X_train shape: {X_train.shape}")
print(f"X_test shape: {X_test.shape}")

In [None]:
# Fit the encoder on the training labels only, then map both splits to integers.
# LabelEncoder orders classes by sorted value; see `le.classes_` for the mapping.
le = LabelEncoder()
le.fit(y_train)
y_train_encoded = le.transform(y_train)
y_test_encoded = le.transform(y_test)

# Conv1D expects (samples, timesteps, channels): append a single-channel axis
train_rows, train_steps = X_train.shape[0], X_train.shape[1]
test_rows, test_steps = X_test.shape[0], X_test.shape[1]
X_train_cnn = X_train.reshape((train_rows, train_steps, 1))
X_test_cnn = X_test.reshape((test_rows, test_steps, 1))

In [None]:
# Callback and regulariser come from the same `keras` package as the layers,
# model and optimizer, instead of mixing in `tf.keras` objects.
from keras.callbacks import ReduceLROnPlateau
from keras.regularizers import l2

# Define hyperparameters
initial_learning_rate = 0.00011
batch_size = 64

# Halve the learning rate whenever val_loss has not improved for 2 epochs
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=2,
    verbose=1,
    mode='auto'
)

# 1-D CNN for binary classification: two Conv/Pool/BatchNorm/Dropout stages
# followed by a small L2-regularised dense head and a sigmoid output.
# The input shape is declared with an explicit Input layer rather than the
# `input_shape=` layer argument, which newer Keras versions deprecate.
model = Sequential([
    Input(shape=(X_train_cnn.shape[1], X_train_cnn.shape[2])),
    Conv1D(32, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),
    Dropout(0.5),
    Conv1D(64, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),
    Dropout(0.5),
    Flatten(),
    Dense(64, activation='relu', kernel_regularizer=l2(0.01)),  # Reduced the number of units
    Dropout(0.7),  # Increased dropout rate
    Dense(1, activation='sigmoid'),
])

# Compile the model
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate=initial_learning_rate),
    metrics=['accuracy'],
)




In [None]:
# Display the model summary (layers, output shapes and parameter counts)
model.summary()

In [None]:
def _layer_output_shape(layer):
    """Return a layer's output shape on both older and newer Keras versions."""
    try:
        return layer.output_shape
    except AttributeError:
        # Newer Keras exposes the shape only through the output tensor
        return tuple(layer.output.shape)


# One row per layer. The frame is built in a single call from a list of records:
# DataFrame.append was removed in pandas 2.0 and was quadratic when used in a loop.
table = pd.DataFrame(
    [
        {"Name": layer.name, "Type": layer.__class__.__name__, "Shape": _layer_output_shape(layer)}
        for layer in model.layers
    ],
    columns=["Name", "Type", "Shape"],
)

In [None]:
# Render the layer table built in the previous cell
table

In [None]:
import pandas as pd


def _output_shape_of(layer):
    """Return a layer's output shape on both older and newer Keras versions."""
    try:
        return layer.output_shape
    except AttributeError:
        # Newer Keras exposes the shape only through the output tensor
        return tuple(layer.output.shape)


def _format_param_count(count):
    """Format a parameter count as '<count> (<size> <unit>)'.

    The size assumes 4 bytes per parameter (float32 weights) - TODO confirm if
    the model is ever built with a different dtype policy.
    """
    size = float(count * 4)
    unit = "Byte"
    for next_unit in ("KB", "MB", "GB"):
        if size < 1024:
            break
        size /= 1024
        unit = next_unit
    return f"{count} ({size:.2f} {unit})"


# Parameter counts are read from the model so the table cannot go stale when
# the architecture changes.
total_params = int(model.count_params())
trainable_params = int(sum(
    np.prod(tuple(weight.shape), dtype=np.int64) for weight in model.trainable_weights
))
non_trainable_params = total_params - trainable_params

# One row per layer, named "<LayerClass>_<position>"
table_rows = [
    {
        "Layer": f"{layer.__class__.__name__}_{idx + 1}",
        "Type": layer.__class__.__name__,
        "Output Shape": _output_shape_of(layer),
    }
    for idx, layer in enumerate(model.layers)
]

# Summary rows: total, trainable and non-trainable parameter counts
table_rows.extend([
    {"Layer": "Total params", "Type": "", "Output Shape": _format_param_count(total_params)},
    {"Layer": "Trainable params", "Type": "", "Output Shape": _format_param_count(trainable_params)},
    {"Layer": "Non-trainable params", "Type": "", "Output Shape": _format_param_count(non_trainable_params)},
])

# Build the frame once (DataFrame.append was removed in pandas 2.0)
table = pd.DataFrame(table_rows, columns=["Layer", "Type", "Output Shape"])

# Convert the DataFrame to a LaTeX table and print it
latex_table = table.to_latex(index=False, escape=False)
print(latex_table)


In [None]:
# Train for up to 100 epochs; reduce_lr lowers the learning rate on val_loss plateaus.
# NOTE(review): the test split doubles as validation data here, so it also steers
# the learning-rate schedule - metrics reported on it later are not fully held out.
fit_options = dict(
    epochs=100,
    batch_size=batch_size,
    validation_data=(X_test_cnn, y_test_encoded),
    callbacks=[reduce_lr],
)
history = model.fit(X_train_cnn, y_train_encoded, **fit_options)

In [None]:
# Persist the trained model to Google Drive as an HDF5 (.h5) file
model.save('/content/drive/MyDrive/fetaldistress.h5')

In [None]:
from keras.utils import plot_model
from PIL import Image

# Render the architecture diagram to a PNG file
plot_model(model, to_file='model_architecture.png', show_shapes=True, show_layer_names=True)

# Show the image inline. PIL's Image.show() launches an external viewer, which
# displays nothing in a hosted notebook; display() embeds it in the cell output.
image = Image.open('model_architecture.png')
display(image)

In [None]:
# Pull the per-epoch loss/accuracy curves out of the Keras History object
epoch_metrics = history.history
training_loss, training_acc = epoch_metrics['loss'], epoch_metrics['accuracy']
val_loss, val_acc = epoch_metrics['val_loss'], epoch_metrics['val_accuracy']

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# The learning rate is logged under 'lr' by older Keras versions and under
# 'learning_rate' by newer ones; accept whichever key this run produced.
lr_history_key = next(
    (key for key in ('lr', 'learning_rate') if key in history.history),
    None,
)
if lr_history_key is None:
    raise KeyError(
        "No learning-rate entry in history.history; available keys: "
        f"{sorted(history.history)}"
    )
learning_rates = history.history[lr_history_key]

# Long-form frame for Seaborn: one row per epoch (epochs numbered from 1)
df_lr = pd.DataFrame({
    'Epoch': range(1, len(learning_rates) + 1),
    'Learning Rate': learning_rates
})

# Plot learning rate changes
plt.figure(figsize=(10, 6))
sns.lineplot(data=df_lr, x='Epoch', y='Learning Rate', marker='o')
plt.title('Learning Rate over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Learning Rate')

plt.show()


In [None]:
# Side-by-side training curves: accuracy on the left, loss on the right.
# Each panel is described as (history key, legend/axis name).
curve_panels = [('accuracy', 'Accuracy'), ('loss', 'Loss')]

fig, axes = plt.subplots(1, 2, figsize=(12, 6))
for ax, (metric_key, metric_name) in zip(axes, curve_panels):
    ax.plot(history.history[metric_key], label=f'Training {metric_name}')
    ax.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_name}')
    ax.legend()
    ax.set_title(f'Training and Validation {metric_name}')
    ax.set_xlabel('Epoch')
    ax.set_ylabel(metric_name)

plt.show()

In [None]:
# Predict sigmoid probabilities and threshold them into hard 0/1 predictions
decision_threshold = 0.5
y_pred_prob = model.predict(X_test_cnn)
y_pred = (y_pred_prob > decision_threshold).astype(int)

# Map encoded integers back to the original string labels via the fitted encoder
y_test_decoded = le.inverse_transform(y_test_encoded)
flat_predictions = y_pred.reshape(-1)  # inverse_transform needs a 1-D array
y_pred_decoded = le.inverse_transform(flat_predictions)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Confusion matrix of true vs. predicted string labels
conf_mat = confusion_matrix(y_test_decoded, y_pred_decoded)

# Draw it with the encoder's class names on the axes and integer cell counts
disp = ConfusionMatrixDisplay(
    confusion_matrix=conf_mat,
    display_labels=le.classes_,
)
disp.plot(values_format='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.show()

In [None]:
from sklearn.metrics import classification_report

# Per-class precision / recall / F1 on the test split
classification_rep = classification_report(
    y_true=y_test_decoded,
    y_pred=y_pred_decoded,
)
print("Classification Report:\n", classification_rep)