# In [4]:
import os
import numpy as np
import tensorflow as tf
from PIL import Image
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, BatchNormalization, concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import KFold
import json

# Collect image and mask paths
image_dir = 'C:/Users/chris/Documents/Master/Sphaeroidauswertung/Trainingsset/Testbilder2/'
mask_dir = 'C:/Users/chris/Documents/Master/Sphaeroidauswertung/Trainingsset/Testmasken2/'

# Keep regular files only: a sub-directory inside the data folder (for example
# Jupyter's `.ipynb_checkpoints`) would otherwise be passed to the image loader.
image_paths = sorted(
    path
    for path in (os.path.join(image_dir, fname) for fname in os.listdir(image_dir))
    if os.path.isfile(path)
)
mask_paths = sorted(
    path
    for path in (os.path.join(mask_dir, fname) for fname in os.listdir(mask_dir))
    if os.path.isfile(path)
)

# Images and masks are paired purely by their sorted position, so differing
# counts mean that at least some samples would get the wrong mask.
if len(image_paths) != len(mask_paths):
    raise ValueError(
        f"Found {len(image_paths)} images but {len(mask_paths)} masks; "
        "every image needs exactly one mask."
    )

# Load and preprocess an input image
def load_and_preprocess_image_pillow(img_path):
    """Load an image file as a scaled array with a trailing channel axis.

    The image is resized to 256x192 (width x height), divided by 255 and
    extended by one trailing axis.

    Images with more than one band (e.g. RGB or RGBA) are reduced to 8-bit
    greyscale first. Without that step the trailing axis would be appended to
    an already three-dimensional array, and the result would not fit the
    (192, 256, 1) input of the model. Single-band images keep their mode, so
    their values are processed exactly as before.

    Args:
        img_path: Path of the image file.

    Returns:
        Floating-point array of shape (192, 256, 1).
    """
    # The context manager releases the file handle once the pixels are read.
    with Image.open(img_path) as source:
        single_band = source.convert('L') if len(source.getbands()) > 1 else source
        resized = single_band.resize((256, 192))
    scaled = np.array(resized) / 255.0
    return np.expand_dims(scaled, axis=-1)

def load_and_preprocess_mask_pillow(mask_path):
    """Load a mask file as a binary array with a trailing channel axis.

    The mask is converted to 8-bit greyscale, resized to 256x192
    (width x height) and thresholded: pixels above 128 become 1, all others 0.

    The greyscale conversion keeps the result two-dimensional before the
    channel axis is added, even when the mask was stored as RGB/RGBA. For
    masks that are already 8-bit greyscale it changes nothing.

    Args:
        mask_path: Path of the mask file.

    Returns:
        Integer array of shape (192, 256, 1) containing only 0 and 1.
    """
    # The context manager releases the file handle once the pixels are read.
    with Image.open(mask_path) as source:
        resized = source.convert('L').resize((256, 192))
    binary = np.where(np.array(resized) > 128, 1, 0)
    return np.expand_dims(binary, axis=-1)

# Load every sample and stack the results into one array per dataset.
# Samples are later selected from X and Y with the same indices, so the
# order of `image_paths` and `mask_paths` defines the image/mask pairing.
X = np.array(list(map(load_and_preprocess_image_pillow, image_paths)))
Y = np.array(list(map(load_and_preprocess_mask_pillow, mask_paths)))

# Dice metric
def dice_metric(y_true, y_pred, smooth=1e-6):
    """Return the smoothed Dice coefficient of two tensors.

    Both tensors are flattened completely and cast to float32, so a single
    coefficient is computed over all of their elements.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Prediction tensor.
        smooth: Constant added to numerator and denominator; it keeps the
            ratio defined when both tensors sum to zero.

    Returns:
        Scalar tensor ``(2 * sum(t * p) + smooth) / (sum(t) + sum(p) + smooth)``.
    """
    def as_flat_float(tensor):
        return tf.cast(tf.reshape(tensor, [-1]), tf.float32)

    truth = as_flat_float(y_true)
    prediction = as_flat_float(y_pred)

    overlap = tf.reduce_sum(truth * prediction)
    # Sum of both totals (not a set union): the Dice denominator.
    total = tf.reduce_sum(truth) + tf.reduce_sum(prediction)

    numerator = 2. * overlap + smooth
    denominator = total + smooth
    return numerator / denominator

# Custom loss function
def combined_loss(y_true, y_pred, smooth=1e-6, binary_weight=0.5, dice_weight=0.5):
    """Return a weighted sum of binary cross-entropy and Dice loss.

    The Dice part is ``1 - dice`` where ``dice`` is computed over all
    elements of the flattened float32 tensors, i.e. it is one scalar for the
    whole input. It is added to the result of
    ``tf.keras.losses.binary_crossentropy``.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Prediction tensor.
        smooth: Constant added to the Dice numerator and denominator.
        binary_weight: Factor applied to the binary cross-entropy term.
        dice_weight: Factor applied to the Dice loss term.

    Returns:
        ``binary_weight * bce + dice_weight * (1 - dice)``.
    """
    cross_entropy = tf.keras.losses.binary_crossentropy(y_true, y_pred)

    truth = tf.cast(tf.reshape(y_true, [-1]), tf.float32)
    prediction = tf.cast(tf.reshape(y_pred, [-1]), tf.float32)

    overlap = tf.reduce_sum(truth * prediction)
    total = tf.reduce_sum(truth) + tf.reduce_sum(prediction)
    dice_coefficient = (2. * overlap + smooth) / (total + smooth)

    weighted_bce = binary_weight * cross_entropy
    weighted_dice = dice_weight * (1 - dice_coefficient)
    return weighted_bce + weighted_dice

# U^2-Net specific blocks
def u2net_recurrent_res_block(x, filters):
    """Apply two Conv-BatchNorm steps and concatenate both results.

    Despite its name the block contains neither a recurrence nor a residual
    addition: the second step consumes the output of the first, and the two
    normalised feature maps are joined with ``concatenate``.

    Args:
        x: Input tensor.
        filters: Number of filters of each 3x3 ReLU convolution.

    Returns:
        Concatenation of the first and the second normalised feature map.
    """
    feature_maps = []
    current = x
    for _ in range(2):
        current = Conv2D(filters, 3, activation='relu', padding='same')(current)
        current = BatchNormalization()(current)
        feature_maps.append(current)
    return concatenate(feature_maps)

def u2net_stage(x, filters):
    """Build a two-level stage: full resolution plus one pooled level.

    A block with ``filters`` filters runs at the input resolution. Its output
    is pooled by 2x2, passed through a second block with ``filters * 2``
    filters, upsampled by 2x2 and concatenated with the first block's output.

    Args:
        x: Input tensor.
        filters: Filter count of the full-resolution block.

    Returns:
        Concatenation of the full-resolution features and the upsampled
        features of the pooled level.
    """
    shallow = u2net_recurrent_res_block(x, filters)
    deep = u2net_recurrent_res_block(
        MaxPooling2D(pool_size=(2, 2))(shallow),
        filters * 2,
    )
    restored = UpSampling2D(size=(2, 2))(deep)
    return concatenate([shallow, restored])

def u2net_model(input_size=(192, 256, 1)):
    """Build the encoder-decoder segmentation model.

    The encoder applies three blocks (64, 128, 256 filters), each followed by
    2x2 max pooling. A 512-filter block forms the bottleneck. The decoder
    mirrors the encoder: 2x2 upsampling, a 3x3 convolution, concatenation with
    the encoder output of the same level, and another block. A 1x1 sigmoid
    convolution produces the single-channel output.

    Three poolings are undone by three upsamplings, so height and width
    should be multiples of 8 for the skip connections to line up.

    Args:
        input_size: Shape of one input sample, without the batch axis.

    Returns:
        The uncompiled Keras ``Model``.
    """
    inputs = Input(input_size)

    # Downsampling path: remember each stage's output for the skip connections.
    skip_connections = []
    features = inputs
    for filters in (64, 128, 256):
        stage = u2net_recurrent_res_block(features, filters)
        skip_connections.append(stage)
        features = MaxPooling2D(pool_size=(2, 2))(stage)

    # Bottleneck
    features = u2net_recurrent_res_block(features, 512)

    # Upsampling path: deepest skip connection first.
    for filters, skip in zip((256, 128, 64), reversed(skip_connections)):
        features = UpSampling2D(size=(2, 2))(features)
        # Adjust the filter count before joining the skip connection.
        features = Conv2D(filters, 3, padding='same', activation='relu')(features)
        features = concatenate([features, skip], axis=-1)
        features = u2net_recurrent_res_block(features, filters)

    # Output layer
    output = Conv2D(1, 1, activation='sigmoid')(features)

    return Model(inputs=inputs, outputs=output)


# K-fold cross-validation: train one model per fold and keep the best one.
kf = KFold(n_splits=5, shuffle=True, random_state=42)
best_val_loss = float('inf')
best_model_path = 'C:/Users/chris/Documents/Master/Sphaeroidauswertung/Modelle/spheroid_segmentation_u2net.h5'
epochs = 1000
batch_size = 8
# History of the fold whose model was saved; None until a fold beats
# `best_val_loss`.
best_history = None

for fold, (train_index, val_index) in enumerate(kf.split(X)):
    print(f"\nFold {fold + 1}/{kf.n_splits}")

    # Every fold builds a fresh model; release the Keras state left behind by
    # the previous fold so memory use does not grow from fold to fold.
    tf.keras.backend.clear_session()

    X_train, X_val = X[train_index], X[val_index]
    Y_train, Y_val = Y[train_index], Y[val_index]

    model = u2net_model()
    model.compile(optimizer=Adam(learning_rate=1e-4), loss=combined_loss, metrics=[dice_metric])

    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
    early_stopping = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)

    hist = model.fit(X_train, Y_train, batch_size=batch_size, epochs=epochs, validation_data=(X_val, Y_val), callbacks=[early_stopping, reduce_lr], verbose=1)

    # EarlyStopping is configured with restore_best_weights=True, so the model
    # is judged by its best epoch. The last epoch is by construction one of
    # the epochs without improvement.
    fold_val_loss = min(hist.history['val_loss'])
    if fold_val_loss < best_val_loss:
        best_val_loss = fold_val_loss
        best_history = hist.history
        model.save(best_model_path)

history_file = 'C:/Users/chris/Documents/Master/Sphaeroidauswertung/Modelle/training_history_u2net.json'
# Store the history that belongs to the saved model. Fall back to the last
# fold if no fold produced a comparable validation loss.
history_to_save = best_history if best_history is not None else hist.history
# Callbacks may log NumPy scalars (e.g. the learning rate), which the json
# module cannot serialise; convert every entry to a plain float.
serialisable_history = {
    key: [float(value) for value in values]
    for key, values in history_to_save.items()
}
with open(history_file, 'w') as f:
    json.dump(serialisable_history, f)

print(f"Bestes Modell wurde unter {best_model_path} gespeichert.")
print(f"Training Historie wurde unter {history_file} gespeichert.")


# Notebook output: KeyboardInterrupt (the cell run was interrupted)