In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import cv2
from PIL import Image

from tensorflow import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Conv2D, BatchNormalization, ReLU, Add, Dense
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint

In [None]:
import os
import cv2
from PIL import Image

# Function to check for corrupted images
def check_image(file_path):
    try:
        # Check if the image can be opened with PIL
        with Image.open(file_path) as img:
            img.verify()  # Verify that image is not corrupted

        # Check if the image can be opened with cv2
        img_cv2 = cv2.imread(file_path)
        if img_cv2 is None:
            raise IOError("Cannot open image with cv2")
        return True

    except (IOError, SyntaxError, cv2.error) as e:
        print(f'Removing corrupted image {file_path}: {e}')
        return False

# Function to remove corrupted images
def remove_corrupted_images(directory):
    for root, _, files in os.walk(directory):
        for file in files:
            file_path = os.path.join(root, file)
            if not check_image(file_path):
                os.remove(file_path)
                print(f'Removed corrupted image: {file_path}')

# Paths to your directories
train_dir = r"D:\Development\Dataset\Bone_Fracture\train"
test_dir = r"D:\Development\Dataset\Bone_Fracture\test"
val_dir = r"D:\Development\Dataset\Bone_Fracture\val"

# Clean the directories
remove_corrupted_images(train_dir)
remove_corrupted_images(test_dir)
remove_corrupted_images(val_dir)


In [None]:
# Custom generator to handle image loading errors
class SafeDataGenerator(keras.utils.Sequence):
    def __init__(self, generator):
        self.generator = generator

    def __len__(self):
        return len(self.generator)

    def __getitem__(self, index):
        batch_x, batch_y = self.generator[index]
        safe_batch_x = []
        safe_batch_y = []

        for img, label in zip(batch_x, batch_y):
            try:
                # Check if the image can be opened with PIL
                img_pil = Image.fromarray((img * 255).astype('uint8'))
                img_pil.verify()
                safe_batch_x.append(img)
                safe_batch_y.append(label)
            except Exception as e:
                print(f"Skipping image due to error: {e}")

        if len(safe_batch_x) == 0:
            raise ValueError("All images in the batch are invalid.")

        return np.array(safe_batch_x), np.array(safe_batch_y)


In [None]:
datagen = ImageDataGenerator(rescale=1./255, validation_split=0.2)


In [None]:
train_generator = datagen.flow_from_directory(
    train_dir,
    target_size=(128,128),
    batch_size=20,
    class_mode='binary'
)

val_generator = datagen.flow_from_directory(
    train_dir,
    target_size=(128,128),
    batch_size=20
)

test_generator = datagen.flow_from_directory(
    test_dir,
    target_size=(128,128),
    batch_size=20,
    class_mode='binary',
)


In [None]:
# Wrap original generators with SafeDataGenerator
train_generator_safe = SafeDataGenerator(train_generator)
val_generator_safe = SafeDataGenerator(val_generator)


In [None]:
def resnet(x, filters, kernel_size=3, stride=1, use_shortcut=True):
    shortcut = x
    # First convolution layer
    x = Conv2D(filters, kernel_size=kernel_size, strides=stride, padding='same')(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)

    if use_shortcut:
        shortcut = Conv2D(filters, kernel_size=1, padding='same')(shortcut)
        shortcut = BatchNormalization()(shortcut)

    x = Add()([x, shortcut])
    x = ReLU()(x)

    return x


In [None]:
inputs = Input(shape=(128,128,3))

# Starting the model creation
x = Conv2D(32, (3,3))(inputs)
x = BatchNormalization()(x)
x = ReLU()(x)

# Creating ResNet
x = resnet(x, 16)

x = Flatten()(x)

x = Dense(32, activation='relu')(x)

outputs = Dense(1, activation='sigmoid')(x)


In [None]:
model = Model(inputs, outputs)

model.summary()


In [None]:
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])


In [None]:
checkpoint_callback = ModelCheckpoint(
    r'D:\PROJECT\Deep_Learning_Projects\Fracture_Segmentation\model_checkpoint.keras',
    monitor='val_loss',
    save_best_only=True,
    mode='min',
    verbose=1
)

In [None]:
history = model.fit(
    train_generator_safe,
    steps_per_epoch=len(train_generator_safe),
    validation_data=val_generator_safe,
    validation_steps=len(val_generator_safe),
    epochs=20,
    callbacks=[checkpoint_callback]
)