In [1]:
import os
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split


In [2]:
def load_dataset(data_dir, target_size=(224, 224), write_back=True):
    """Load every crack image and its mask from ``data_dir`` into NumPy arrays.

    ``data_dir`` must contain an ``images`` and a ``masks`` sub-directory in
    which an image and its mask share the same file name.  Files whose name
    starts with ``"noncrack"`` are skipped.

    Parameters
    ----------
    data_dir : str
        Directory holding the ``images`` and ``masks`` sub-directories.
    target_size : tuple of int, optional
        ``(width, height)`` handed to ``cv2.resize``.  Defaults to 224x224,
        the input size of ``unet_model``.
    write_back : bool, optional
        When true (the default, matching the historical behaviour) a file
        whose size differs from ``target_size`` is overwritten on disk with
        its resized version.  Files that already have the target size are
        never rewritten, so repeated runs do not re-encode them again.

    Returns
    -------
    tuple of numpy.ndarray
        ``(images, masks)`` built with ``np.array`` from the per-file arrays.
        No scaling is applied: the values are whatever ``cv2.imread`` /
        ``cv2.resize`` produced.

    Notes
    -----
    An image/mask pair is skipped with a warning when either file cannot be
    decoded (``cv2.imread`` returns ``None`` in that case).
    """
    import warnings  # local import: not part of the notebook's import cell

    target_size = tuple(target_size)
    images = []
    masks = []

    for image_file in os.listdir(os.path.join(data_dir, 'images')):
        if image_file.startswith("noncrack"):
            continue  # Skip images with no crack pixels

        image_path = os.path.join(data_dir, 'images', image_file)
        mask_path = os.path.join(data_dir, 'masks', image_file)

        img = cv2.imread(image_path)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if img is None or mask is None:
            # Stray non-image files or a missing mask would otherwise make
            # cv2.resize fail with an opaque error.
            warnings.warn(
                f"Skipping {image_file!r}: image or mask could not be read",
                stacklevel=2,
            )
            continue

        # ndarray.shape is (height, width, ...) while cv2 sizes are (width, height).
        if (img.shape[1], img.shape[0]) != target_size:
            img = cv2.resize(img, target_size)
            if write_back:
                cv2.imwrite(image_path, img)

        if (mask.shape[1], mask.shape[0]) != target_size:
            mask = cv2.resize(mask, target_size)
            if write_back:
                cv2.imwrite(mask_path, mask)

        images.append(img)
        masks.append(mask)

    return np.array(images), np.array(masks)

In [7]:
def load_dataset_test(data_dir, target_size=(224, 224), write_back=True, max_images=None):
    """Load crack images and masks from ``data_dir``, capped at ``max_images`` pairs.

    ``data_dir`` must contain an ``images`` and a ``masks`` sub-directory in
    which an image and its mask share the same file name.  Files whose name
    starts with ``"noncrack"`` are skipped and do not count towards the cap.

    Parameters
    ----------
    data_dir : str
        Directory holding the ``images`` and ``masks`` sub-directories.
    target_size : tuple of int, optional
        ``(width, height)`` handed to ``cv2.resize`` (default 224x224).
    write_back : bool, optional
        When true (default, the historical behaviour) a file whose size
        differs from ``target_size`` is overwritten on disk with its resized
        version.  Files already at the target size are never rewritten.
    max_images : int or None, optional
        Maximum number of pairs to load.  ``None`` (default) keeps the
        original cap: the number of entries ``countImages`` reports for
        ``./crack_segmentation_dataset/train/images``.

    Returns
    -------
    tuple of numpy.ndarray
        ``(images, masks)`` built with ``np.array`` from the per-file arrays.
        No scaling is applied to the pixel values.

    Notes
    -----
    A pair is skipped with a warning when either file cannot be decoded
    (``cv2.imread`` returns ``None`` in that case).
    """
    import warnings  # local import: not part of the notebook's import cell

    target_size = tuple(target_size)
    if max_images is None:
        max_images = countImages("./crack_segmentation_dataset/train/images")

    images = []
    masks = []
    count = 0

    for image_file in os.listdir(os.path.join(data_dir, 'images')):
        if count >= max_images:
            break

        if image_file.startswith("noncrack"):
            continue  # Skip images with no crack pixels

        image_path = os.path.join(data_dir, 'images', image_file)
        mask_path = os.path.join(data_dir, 'masks', image_file)

        img = cv2.imread(image_path)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if img is None or mask is None:
            # Stray non-image files or a missing mask would otherwise make
            # cv2.resize fail with an opaque error.
            warnings.warn(
                f"Skipping {image_file!r}: image or mask could not be read",
                stacklevel=2,
            )
            continue

        # ndarray.shape is (height, width, ...) while cv2 sizes are (width, height).
        if (img.shape[1], img.shape[0]) != target_size:
            img = cv2.resize(img, target_size)
            if write_back:
                cv2.imwrite(image_path, img)

        if (mask.shape[1], mask.shape[0]) != target_size:
            mask = cv2.resize(mask, target_size)
            if write_back:
                cv2.imwrite(mask_path, mask)

        images.append(img)
        masks.append(mask)

        count += 1

    return np.array(images), np.array(masks)

def countImages(data_dir):
    """Return how many entries ``os.listdir`` reports for ``data_dir``.

    Every directory entry is counted, whatever its name or type.
    """
    return len(os.listdir(data_dir))
    

In [3]:
def data_generator(data_dir, batch_size=32, target_size=None, drop_remainder=True):
    """Yield ``(images, masks)`` batches read from ``data_dir``, scaled by 1/255.

    Only files whose name starts with ``"CFD"`` or ``"CRACK500"`` are used.
    The directory is walked exactly once, so the generator is exhausted after
    a single pass; call the function again to obtain a fresh one.

    Parameters
    ----------
    data_dir : str
        Directory holding ``images`` and ``masks`` sub-directories in which an
        image and its mask share the same file name.
    batch_size : int, optional
        Number of image/mask pairs per yielded batch.
    target_size : tuple of int or None, optional
        ``(width, height)`` to resize every image and mask to with
        ``cv2.resize``.  ``None`` (default) keeps the on-disk size, which then
        has to be identical for all files of a batch.
    drop_remainder : bool, optional
        When true (default, the historical behaviour) a trailing batch with
        fewer than ``batch_size`` pairs is discarded; when false it is yielded.

    Yields
    ------
    tuple of numpy.ndarray
        ``(batch_images, batch_masks)`` with every value divided by 255.0.

    Notes
    -----
    A pair is skipped with a warning when either file cannot be decoded
    (``cv2.imread`` returns ``None`` in that case); previously this raised a
    ``TypeError`` on the division.
    """
    import warnings  # local import: not part of the notebook's import cell

    batch_images = []
    batch_masks = []

    for image_file in os.listdir(os.path.join(data_dir, 'images')):
        # Only include images starting with "CFD" or "CRACK500".  This also
        # excludes the "noncrack" files, so no separate check is needed.
        if not image_file.startswith(("CFD", "CRACK500")):
            continue

        image_path = os.path.join(data_dir, 'images', image_file)
        mask_path = os.path.join(data_dir, 'masks', image_file)

        image = cv2.imread(image_path)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if image is None or mask is None:
            warnings.warn(
                f"Skipping {image_file!r}: image or mask could not be read",
                stacklevel=2,
            )
            continue

        if target_size is not None:
            image = cv2.resize(image, tuple(target_size))
            mask = cv2.resize(mask, tuple(target_size))

        batch_images.append(image / 255.0)
        batch_masks.append(mask / 255.0)

        # Check if the batch is complete
        if len(batch_images) == batch_size:
            yield np.array(batch_images), np.array(batch_masks)
            batch_images = []
            batch_masks = []

    if batch_images and not drop_remainder:
        yield np.array(batch_images), np.array(batch_masks)

In [5]:
def countImages(data_dir):
    """Count the directory entries of ``data_dir`` as listed by ``os.listdir``.

    NOTE(review): a helper with the same name and behaviour is also defined
    in the cell that holds ``load_dataset_test``; whichever cell runs last
    provides the binding.
    """
    return sum(1 for _ in os.listdir(data_dir))
    

In [6]:
# Number of directory entries in the training image folder.
# NOTE(review): no later cell visible here reads this global; load_dataset_test
# computes the same count into a local of its own.
num = countImages("./crack_segmentation_dataset/train/images")

In [3]:
# Root of the training split; the loaders look for `images/` and `masks/`
# sub-directories beneath it.
data_dir = "./crack_segmentation_dataset/train"
# Disabled: this assignment would rebind the name `data_generator` from the
# generator function to a single generator object.
#data_generator = data_generator(data_dir)

In [4]:
# Load the training images and masks into memory.  Side effect: load_dataset
# also writes resized files back over the originals on disk.
X, Y = load_dataset(data_dir)

In [8]:
# Load the held-out test split the same way.  Side effect: load_dataset_test
# also writes resized files back over the originals on disk.
X_test, Y_test = load_dataset_test("./crack_segmentation_dataset/test")

In [None]:
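# Disabled alternative: split a 20 % hold-out off the training arrays with a
# fixed seed.  Enabling it would overwrite the X_test / Y_test loaded above.
#X, X_test, Y, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)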


In [16]:
def unet_model():
    """Build a small U-Net-style Keras model for 224x224 RGB inputs.

    The network has two encoder stages (32 and 64 filters, each followed by
    2x2 max-pooling), a 128-filter middle stage and two decoder stages that
    upsample by 2 and concatenate the matching encoder output along the
    channel axis.  A final 1x1 convolution with a sigmoid produces a single
    output channel.

    Returns
    -------
    tf.keras.Model
        The uncompiled model.
    """
    layers = tf.keras.layers

    def double_conv(tensor, filters):
        """Apply two 3x3, same-padded ReLU convolutions with ``filters`` channels."""
        for _ in range(2):
            tensor = layers.Conv2D(filters, (3, 3), activation='relu', padding='same')(tensor)
        return tensor

    inputs = tf.keras.Input(shape=(224, 224, 3))

    # Encoder: keep the pre-pooling activations for the skip connections.
    enc1 = double_conv(inputs, 32)
    down1 = layers.MaxPooling2D(pool_size=(2, 2))(enc1)

    enc2 = double_conv(down1, 64)
    down2 = layers.MaxPooling2D(pool_size=(2, 2))(enc2)

    # Middle
    middle = double_conv(down2, 128)

    # Decoder: upsample, then join with the encoder output of the same stage.
    up2 = layers.UpSampling2D((2, 2))(middle)
    merged2 = layers.Concatenate(axis=-1)([enc2, up2])
    dec2 = double_conv(merged2, 64)

    up1 = layers.UpSampling2D((2, 2))(dec2)
    merged1 = layers.Concatenate(axis=-1)([enc1, up1])
    dec1 = double_conv(merged1, 32)

    outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(dec1)

    return tf.keras.Model(inputs=[inputs], outputs=[outputs])

In [17]:
# Instantiate the (still uncompiled) network returned by unet_model().
model = unet_model()

In [18]:
# Binary cross-entropy pairs with the single sigmoid output channel of
# unet_model; 'accuracy' is reported alongside the loss.
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [12]:
# Settings for training.
batch_size = 32
# Count only the files data_generator actually uses (names starting with
# "CFD" or "CRACK500").  Counting every directory entry made steps_per_epoch
# larger than the number of full batches the generator can deliver.
total_batches = sum(
    name.startswith(("CFD", "CRACK500"))
    for name in os.listdir(os.path.join(data_dir, 'images'))
) // batch_size
num_epochs = 1

In [20]:
# Sanity check: image arrays should be (N, 224, 224, 3), mask arrays (N, 224, 224).
print(X.shape, Y.shape, X_test.shape, Y_test.shape)

(8404, 224, 224, 3) (8404, 224, 224) (440, 224, 224, 3) (440, 224, 224)


In [19]:
# The loaders return the masks as read by cv2 (grey levels, 0 or 255 per the
# notebook's own assumption), but binary cross-entropy expects targets in
# [0, 1].  Threshold at mid-grey so the labels are hard 0/1 values, which also
# keeps the 'accuracy' metric meaningful.
# NOTE(review): X / X_test are still fed unscaled, whereas data_generator
# divides images by 255 — confirm which input scale the model should see.
model.fit(
    X,
    (Y > 127).astype(np.float32),
    epochs=num_epochs,
    validation_data=(X_test, (Y_test > 127).astype(np.float32)),
    batch_size=batch_size,
)

Epoch 1/10

KeyboardInterrupt: 

In [8]:
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")

    # data_generator is a generator *function*: it has to be called to get
    # something fit() can iterate.  Each generator makes a single pass over
    # the directory, so a fresh one is created for every epoch.
    model.fit(data_generator(data_dir, batch_size), epochs=1, steps_per_epoch=total_batches)
    


Epoch 1/1


: 

In [None]:
# Y_test holds the masks as read by cv2 (grey levels, 0 or 255 per the
# notebook's own assumption); threshold them to 0/1 so that the loss and the
# accuracy are computed against valid binary targets.
test_loss, test_acc = model.evaluate(X_test, (Y_test > 127).astype(np.float32))
print(f"Test Accuracy: {test_acc}")

In [None]:
# Per-pixel sigmoid outputs of the model for every test image (one output
# channel per pixel, see unet_model).
predictions = model.predict(X_test)
# NOTE(review): this prints the raw array; `predictions.shape` or a plotted
# sample would be easier to inspect.
print(predictions)