In [46]:
import os
import pandas as pd
import numpy as np
import cv2
import matplotlib.pyplot as plt
import random
import h5py
from collections import defaultdict

from timeit import default_timer as timer

from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, classification_report
from sklearn.preprocessing import LabelEncoder 

import torch
import torchvision.transforms as transforms
from skimage.util import random_noise

from keras.preprocessing.image import load_img, img_to_array
from keras.utils import to_categorical
from keras.models import Model
from keras.models import Sequential
from tensorflow.keras import layers, models
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, concatenate, Dropout, Reshape
from keras.optimizers import Adam
from keras.losses import mean_squared_error, categorical_crossentropy, binary_crossentropy
from keras.callbacks import EarlyStopping
from keras.metrics import MeanIoU, IoU
import keras.backend as K

import tensorflow as tf
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras import regularizers
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Conv2DTranspose
from tensorflow.keras.metrics import Recall, Precision, MeanAbsoluteError, F1Score
from tensorflow.keras.utils import plot_model
from tensorflow.keras.losses import BinaryFocalCrossentropy, CategoricalFocalCrossentropy


In [7]:
current_dir = os.getcwd()
parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir, os.pardir))
working_path = os.path.join(parent_dir) #incase directory needs to be expanded
downloads_path = os.path.join(os.path.expanduser("~"), "Downloads")
extract_dir = os.path.join(os.path.expanduser('~'), '/Nextcloud/DataScientest/project/pcb/')
#extract_dir = os.path.expanduser("~\\Data_science")

In [8]:
# Set up folder paths
# Define the source paths for images and annotations
image_pool_path = os.path.join(extract_dir, 'VOC_PCB', 'JPEGImages')
annot_pool_path = os.path.join(extract_dir, 'VOC_PCB', 'Annotations')

# Define the destination path for images and annotations
image_dest_path = os.path.join(working_path, 'data', 'Images')
annot_dest_path = os.path.join(working_path, 'data', 'Annotations')

# Define the destination path for bboxes and masks
bb_path = os.path.join(working_path, 'data', 'Images_bb')
mask_path = os.path.join(working_path, 'data', 'Pixel_masks')

# Define the destination path for csv file
csv_path = os.path.join(working_path, 'data', 'csv')

In [4]:
image_dataset_path = image_dest_path

In [9]:
# train and test dataset file paths
train_path =  os.path.join(downloads_path, "train_data.h5")
test_path =  os.path.join(downloads_path, "test_data.h5")

Loading Training and Testing datasets

In [11]:
# load train dataset
with h5py.File(train_path, 'r') as hf:
    X_train = hf['X'][:]
    y_train_mask = hf['y_mask'][:]
    y_train_class_cat = hf['y_class_cat'][:]

print(f"X_train shape: {X_train.shape}")
print(f"y_train_mask shape: {y_train_mask.shape}")
print(f"y_train_class_cat shape: {y_train_class_cat.shape}")

X_train shape: (11564, 100, 100)
y_train_mask shape: (11564, 100, 100)
y_train_class_cat shape: (11564, 7)


In [14]:
# load test dataset
with h5py.File(test_path, 'r') as hf:
    X_test = hf['X'][:]
    y_test_mask = hf['y_mask'][:]
    y_test_class_cat = hf['y_class_cat'][:]

print(f"X_test shape: {X_test.shape}")
print(f"y_test_mask shape: {y_test_mask.shape}")
print(f"y_test_class_cat shape: {y_test_class_cat.shape}")

X_test shape: (413, 100, 100)
y_test_mask shape: (413, 100, 100)
y_test_class_cat shape: (413, 7)


Initializing functions for deep learning model

In [915]:
################################
#           Focal loss         #
################################
@tf.function
def focal_loss(gamma=2., alpha=.25):
	def focal_loss_fixed(y_true, y_pred):
		pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred))
		pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred))
		return -tf.reduce_mean(alpha * tf.math.pow(1. - pt_1, gamma) * tf.math.log(pt_1+K.epsilon())) - tf.reduce_mean((1 - alpha) * tf.math.pow(pt_0, gamma) * tf.math.log(1. - pt_0 + K.epsilon()))
	return focal_loss_fixed


In [916]:
# Register the function 
tf.keras.utils.get_custom_objects()['focal_loss_fixed'] = focal_loss_fixed

In [897]:
################################
#             Dice             #
################################
# Define the dice function

def DiceLoss(y_true, y_pred, smooth=1e-6):
    
    # if you are using this loss for multi-class segmentation then uncomment 
    # following lines
    # if y_pred.shape[-1] <= 1:
    #     # activate logits
    #     y_pred = tf.keras.activations.sigmoid(y_pred)
    # elif y_pred.shape[-1] >= 2:
    #     # activate logits
    #     y_pred = tf.keras.activations.softmax(y_pred, axis=-1)
    #     # convert the tensor to one-hot for multi-class segmentation
    #     y_true = K.squeeze(y_true, 3)
    #     y_true = tf.cast(y_true, "int32")
    #     y_true = tf.one_hot(y_true, num_class, axis=-1)
    
    # cast to float32 datatype
    y_true = tf.cast(y_true, 'float32')
    y_pred = tf.cast(y_pred, 'float32')
    
    # Flatten label and prediction tensors
    y_true = tf.keras.backend.flatten(y_true)
    y_pred = tf.keras.backend.flatten(y_pred)
    
    intersection = tf.reduce_sum(y_true * y_pred)
    dice = (2 * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)
    return 1 - dice

In [917]:
# Register the function 
tf.keras.utils.get_custom_objects()['DiceLoss'] = DiceLoss

In [15]:
class TimingCallback(Callback):
    def __init__(self, max_duration_seconds, logs={}):
        self.logs=[]
        self.max_duration_seconds = max_duration_seconds
            
    def on_train_begin(self, logs={}):
        self.start_time = timer()
    def on_epoch_begin(self, epoch, logs={}):
        self.starttime = timer()
    def on_epoch_end(self, epoch, logs={}):
        self.logs.append(timer()-self.starttime)
        total_duration = timer() - self.start_time
        if total_duration > self.max_duration_seconds:
            print(f"Stopping training after {total_duration / 3600} hours")
            self.model.stop_training = True


In [28]:
early_stopping = EarlyStopping(
                                patience=10, 
                                min_delta = 0.0001, 
                                verbose=1,
                                mode = 'min',
                                monitor='val_loss')

reduce_learning_rate = ReduceLROnPlateau(
                                        monitor="val_loss",
                                        patience=5, 
                                        min_delta= 0.0001,
                                        factor=0.1,  
                                        cooldown = 6, 
                                        verbose=1)

checkpoint = ModelCheckpoint(
                            'best_model_params_res_unet.keras', 
                            monitor='val_loss', 
                            verbose=1, 
                            #save_best_only=True, 
                            mode='min')


time_callback = TimingCallback(36000) # 36000 = 10 hrs, 18000 = 5 hrs, 7200 = 2 hrs

In [17]:
def elegant_res_unet_segmentation_classification(input_shape, num_classes): #Res-unet combo
   
    inputs = layers.Input(shape=input_shape)

    # RES part of model: residual skip connections
    def conv_block(input_tensor, num_filters):
        x = layers.Conv2D(num_filters, 3, activation='relu', padding='same')(input_tensor)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2D(num_filters, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        
        input_tensor_transformed = layers.Conv2D(num_filters, 1, activation=None, padding='same')(input_tensor)
        input_tensor_transformed = layers.BatchNormalization()(input_tensor_transformed)
        
        return layers.add([x, input_tensor_transformed])

    # Encoder contraction layers with residual skip connections
    conv1 = conv_block(inputs, 64)
    pool1 = layers.MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = conv_block(pool1, 128)
    pool2 = layers.MaxPooling2D(pool_size=(2, 2))(conv2)

    # Bottleneck
    conv3 = layers.DepthwiseConv2D(3, activation='relu', padding='same')(pool2)
    conv3 = layers.BatchNormalization()(conv3)
    conv3 = layers.Conv2D(256, 1, activation='relu', padding='same')(conv3)
    conv3 = layers.BatchNormalization()(conv3)
    conv3 = layers.Conv2D(256, 3, activation='relu', dilation_rate=2, padding='same')(conv3)
    conv3 = layers.BatchNormalization()(conv3)

    # Decoder expansion layers with residual skip connections
    up1 = layers.Conv2DTranspose(128, 2, strides=(2, 2), padding='same')(conv3)
    up1 = layers.concatenate([up1, conv2], axis=3)
    conv4 = conv_block(up1, 128)

    up2 = layers.Conv2DTranspose(64, 2, strides=(2, 2), padding='same')(conv4)
    up2 = layers.concatenate([up2, conv1], axis=3)
    conv5 = conv_block(up2, 64)

    # Segmentation Output
    segmentation_output = layers.Conv2D(1, 1, activation='sigmoid', name='segmentation_output')(conv5)

    # Classification Head

    flatten = layers.GlobalAveragePooling2D()(conv3)
    dense1 = layers.Dense(256, activation='relu')(flatten)
    dropout1 = layers.Dropout(0.5)(dense1)
    dense2 = layers.Dense(128, activation='relu')(dropout1)
    dropout2 = layers.Dropout(0.5)(dense2)
    classification_output = layers.Dense(num_classes, activation='softmax', name='classification_output')(dropout2)

    # Create Model
    model = models.Model(inputs=inputs, 
                         outputs=[segmentation_output, 
                                  classification_output
                                  ])
    return model

In [19]:
# extracting numer of classes information from dataset 
num_classes = y_train_class_cat.shape[1]
print(num_classes)

7


In [20]:
# extracting input_shape information from dataset 
input_shape = np.expand_dims(X_train, axis=-1)
input_shape = input_shape.shape[1:]
print(input_shape)

(100, 100, 1)


In [21]:
# Instantiate the model
model = elegant_res_unet_segmentation_classification(input_shape=input_shape, num_classes=num_classes)

In [22]:
# Compile the model

#optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)

model.compile(
    optimizer='adam', #(learning_rate=0.0001)#optimizer,
    loss={
        'segmentation_output': 'binary_crossentropy', #focal_loss(alpha=.25, gamma=2),  # for the mask output with sigmoid 'binary_crossentropy', #
        'classification_output': 'categorical_crossentropy' #"categorical_focal_crossentropy"  # for the classification output with softmax
    },
    loss_weights={
        'segmentation_output': .8,  # initially Weighted at 80%, 20%, 100%
        'classification_output': .2  # initially Weighted at 20%, 80%. 0%
    },
    metrics={
        'segmentation_output': ['accuracy', MeanAbsoluteError(), MeanIoU(num_classes=num_classes)],  # Metrics for segmentation IoU(num_classes=num_classes, target_class_ids=[0,1,2,3,4,5,6])
        'classification_output': ['accuracy', Recall(), Precision()]  # Metrics for classification
    }
)

In [23]:
model.summary()

In [26]:
plot_model(model, to_file='elegant_res_unet_segmentation_classification.png', show_shapes=True, show_layer_names=True);

You must install pydot (`pip install pydot`) for `plot_model` to work.


In [27]:
print("X_train shape:", X_train.shape)
print("y_train_mask shape:", y_train_mask.shape)
print("y_train_class_cat shape:", y_train_class_cat.shape)
print("X_test shape:", X_test.shape)
print("y_test_mask shape:", y_test_mask.shape)
print("y_test_class_cat shape:", y_test_class_cat.shape)

print("Expected input shape for the model:", model.input_shape)
print("Model output shape:", model.output_shape)

print("Data type of y_train_class_cat:", y_train_class_cat.dtype)

X_train shape: (11564, 100, 100)
y_train_mask shape: (11564, 100, 100)
y_train_class_cat shape: (11564, 7)
X_test shape: (413, 100, 100)
y_test_mask shape: (413, 100, 100)
y_test_class_cat shape: (413, 7)
Expected input shape for the model: (None, 100, 100, 1)
Model output shape: [(None, 100, 100, 1), (None, 7)]
Data type of y_train_class_cat: float64


In [30]:
total_epochs = 16

In [31]:
# Train the model
history = model.fit(x=X_train, 
                    y=[y_train_mask, y_train_class_cat],
                    epochs=total_epochs,
                    batch_size=32,
                    validation_data=(X_test, [y_test_mask, y_test_class_cat]),
                    callbacks=[ reduce_learning_rate, 
                                #early_stopping, 
                                checkpoint,
                                time_callback
                                ],
                    verbose=True)

2024-06-17 19:35:05.453696: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 462560000 exceeds 10% of free system memory.


Epoch 1/16


2024-06-17 19:35:22.557619: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 163840000 exceeds 10% of free system memory.
2024-06-17 19:35:23.896282: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 163840000 exceeds 10% of free system memory.
2024-06-17 19:35:24.898220: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 163840000 exceeds 10% of free system memory.


[1m  1/362[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m2:23:59[0m 24s/step - classification_output_accuracy: 0.0938 - classification_output_precision: 0.0000e+00 - classification_output_recall: 0.0000e+00 - loss: 1.1188 - segmentation_output_accuracy: 0.4894 - segmentation_output_mean_absolute_error: 0.5021 - segmentation_output_mean_io_u: 0.4602

2024-06-17 19:35:31.503085: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 163840000 exceeds 10% of free system memory.


[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8s/step - classification_output_accuracy: 0.1922 - classification_output_precision: 0.3626 - classification_output_recall: 0.0176 - loss: 0.7155 - segmentation_output_accuracy: 0.8688 - segmentation_output_mean_absolute_error: 0.2807 - segmentation_output_mean_io_u: 0.4652
Epoch 1: saving model to best_model_params_res_unet.keras
[1m362/362[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2959s[0m 8s/step - classification_output_accuracy: 0.1923 - classification_output_precision: 0.3630 - classification_output_recall: 0.0176 - loss: 0.7152 - segmentation_output_accuracy: 0.8690 - segmentation_output_mean_absolute_error: 0.2804 - segmentation_output_mean_io_u: 0.4652 - val_classification_output_accuracy: 0.1574 - val_classification_output_precision: 0.7391 - val_classification_output_recall: 0.0412 - val_loss: 1.7646 - val_segmentation_output_accuracy: 0.3389 - val_segmentation_output_mean_absolute_error: 0.6307 - val_seg

In [32]:
history_df = pd.DataFrame(history.history)
history_df.to_csv('model_enhanced_res_unet-v240618.csv', index=False)
model.save('model_enhanced_res_unet-v240618.keras')

In [934]:
# Load the model from a file if needed for further training
from tensorflow.keras.models import load_model
model_path = 'model_enhanced_res_unet_2.keras'
'''
custom_objects = {
    #'focal_loss_fixed': focal_loss_fixed,
    #'DiceLoss': DiceLoss,
    'BinaryFocalCrossentropy': BinaryFocalCrossentropy,
    'CategoricalFocalCrossentropy': CategoricalFocalCrossentropy
}
'''
#model = load_model(model_path)#, custom_objects=custom_objects)

model = load_model(model_path, custom_objects={
    'MeanAbsoluteError': MeanAbsoluteError,
    'MeanIoU': MeanIoU,
    'Recall': Recall,
    'Precision': Precision
})