In [11]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda

def multi_unet_model(n_classes=4, IMG_HEIGHT=256, IMG_WIDTH=256, IMG_CHANNELS=1):
    """Build a 4-level U-Net for multi-class semantic segmentation.

    The encoder applies ``conv_block`` followed by 2x2 max pooling at 16, 32,
    64 and 128 filters, the bottleneck uses 256 filters, and the decoder
    mirrors the encoder with ``upconv_block`` and skip connections. The final
    1x1 convolution emits a per-pixel softmax over ``n_classes``.

    Args:
        n_classes: Number of output channels of the softmax layer.
        IMG_HEIGHT: Input height. Must be a multiple of 16 unless ``None``.
        IMG_WIDTH: Input width. Must be a multiple of 16 unless ``None``.
        IMG_CHANNELS: Number of input channels.

    Returns:
        An uncompiled Keras ``Model`` mapping the input image to the softmax map.

    Raises:
        ValueError: If a given height or width is not divisible by 16.
    """
    encoder_filters = (16, 32, 64, 128)
    # Every encoder stage halves the resolution, and every decoder stage
    # doubles it again. An odd intermediate size gets floored by the pooling,
    # so the upsampled tensor no longer matches its skip connection and
    # `concatenate` fails with a hard-to-read shape error. Fail early instead.
    downscale = 2 ** len(encoder_filters)
    for dim_name, dim in (("IMG_HEIGHT", IMG_HEIGHT), ("IMG_WIDTH", IMG_WIDTH)):
        if dim is not None and dim % downscale != 0:
            raise ValueError(
                f"{dim_name} must be divisible by {downscale}, got {dim}"
            )

    inputs = Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))

    # Encoder: keep each pre-pooling feature map for the skip connections.
    skip_connections = []
    x = inputs
    for filters in encoder_filters:
        skip = conv_block(x, filters)
        skip_connections.append(skip)
        x = MaxPooling2D((2, 2))(skip)

    # Bottleneck
    x = conv_block(x, 256)

    # Decoder: walk the skip connections from deepest to shallowest.
    for filters, skip in zip(reversed(encoder_filters), reversed(skip_connections)):
        x = upconv_block(x, skip, filters)

    outputs = Conv2D(n_classes, (1, 1), activation='softmax')(x)

    model = Model(inputs=[inputs], outputs=[outputs])
    return model

def conv_block(inputs, filters):
    """Apply two 3x3 ReLU convolutions with a 10% dropout in between.

    Both convolutions use ``filters`` output channels, 'same' padding and
    He-normal kernel initialisation.

    Args:
        inputs: Keras tensor to convolve.
        filters: Number of filters for both convolutions.

    Returns:
        The output tensor of the second convolution.
    """
    conv_kwargs = dict(activation='relu', kernel_initializer='he_normal', padding='same')
    x = Conv2D(filters, (3, 3), **conv_kwargs)(inputs)
    x = Dropout(0.1)(x)
    return Conv2D(filters, (3, 3), **conv_kwargs)(x)

def upconv_block(inputs, concat_layer, filters):
    """Upsample ``inputs`` 2x, merge with a skip connection and convolve.

    Args:
        inputs: Keras tensor coming from the deeper decoder/bottleneck stage.
        concat_layer: Encoder tensor to concatenate with the upsampled tensor.
        filters: Number of filters for the transposed convolution and for the
            convolutions that follow it.

    Returns:
        The output tensor of the convolution stack applied to the merged tensor.
    """
    upsampled = Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding='same')(inputs)
    merged = concatenate([upsampled, concat_layer])
    # Same conv -> dropout -> conv stack as the encoder; reuse it instead of
    # keeping a second copy that can drift out of sync.
    return conv_block(merged, filters)



In [16]:
import os
import cv2
import numpy as np
from tensorflow.keras.utils import normalize
from sklearn.utils import class_weight
from sklearn.preprocessing import LabelEncoder
import numpy as np
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical
from sklearn.utils.class_weight import compute_class_weight
from sklearn.utils import class_weight
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

SIZE_X = 352
SIZE_Y = 352
n_classes = 12  # Number of classes for segmentation

directory_path = r"C:\UNET_project\camvid\train"
directory_path_masks = r"C:\UNET_project\camvid\trainannot"


def _load_grayscale_images(directory, interpolation=cv2.INTER_LINEAR):
    """Read every .jpg/.png in ``directory`` as grayscale and resize it.

    Files are visited in sorted filename order. ``os.listdir`` gives no
    ordering guarantee, and images and masks are read from two separate
    directories, so a deterministic order is what keeps image ``i`` paired
    with mask ``i`` (assuming matching filenames in both folders).

    Args:
        directory: Folder to read from.
        interpolation: OpenCV interpolation flag passed to ``cv2.resize``.

    Returns:
        A NumPy array stacking all resized images.

    Raises:
        IOError: If OpenCV cannot decode one of the files.
    """
    loaded = []
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(('.jpg', '.png')):
            continue
        file_path = os.path.join(directory, filename)
        image = cv2.imread(file_path, 0)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise IOError(f"Could not read image file: {file_path}")
        # NOTE(review): cv2.resize takes dsize as (width, height); the original
        # (SIZE_Y, SIZE_X) order is kept and is harmless while both are equal.
        loaded.append(cv2.resize(image, (SIZE_Y, SIZE_X), interpolation=interpolation))
    return np.array(loaded)


train_images = _load_grayscale_images(directory_path)

# Nearest-neighbour keeps mask pixels as valid class ids (no blended values).
train_masks = _load_grayscale_images(directory_path_masks, interpolation=cv2.INTER_NEAREST)

if len(train_images) != len(train_masks):
    raise ValueError(
        f"Found {len(train_images)} images but {len(train_masks)} masks; "
        "every image needs exactly one mask"
    )




# Define a function for label encoding
def label_encode_masks(train_masks):
    """Map the distinct pixel values of the masks to class ids 0..k-1.

    The sorted unique values of ``train_masks`` are numbered consecutively,
    which is the same encoding ``LabelEncoder`` produces. The array is
    flattened to 1-D before encoding, so no column-vector conversion warning
    is emitted.

    Args:
        train_masks: Array of shape (n, h, w) holding raw mask values.

    Returns:
        A tuple ``(encoded, encoded_flat)`` where ``encoded`` has shape
        (n, h, w) and ``encoded_flat`` is the same data as a 1-D array.
    """
    n, h, w = train_masks.shape
    # Flatten first so the inverse index is 1-D regardless of NumPy version.
    _, train_masks_reshaped_encoded = np.unique(train_masks.reshape(-1), return_inverse=True)
    train_masks_encoded_original_shape = train_masks_reshaped_encoded.reshape(n, h, w)
    return train_masks_encoded_original_shape, train_masks_reshaped_encoded

# Encode the raw mask values into consecutive class ids; keep both the
# image-shaped array and the flat array (the latter feeds the class weights).
(train_masks_encoded_original_shape,
 train_masks_reshaped_encoded) = label_encode_masks(train_masks)

# Show which class ids are present after encoding
print("Unique values:", np.unique(train_masks_encoded_original_shape))



def preprocess_data(train_images, train_masks):
    """Add a channel axis to images and masks and normalise the images.

    Args:
        train_images: Image array; a new axis is inserted at index 3 and the
            result is passed through Keras ``normalize`` along axis 1.
        train_masks: Mask array; only a new axis at index 3 is inserted.

    Returns:
        A tuple ``(images, masks)`` of the processed arrays.
    """
    images_with_channel = np.expand_dims(train_images, axis=3)
    masks_with_channel = np.expand_dims(train_masks, axis=3)
    return normalize(images_with_channel, axis=1), masks_with_channel

# Preprocess data
train_images, train_masks_input = preprocess_data(train_images, train_masks_encoded_original_shape)

# Split data into training and testing sets.
# Split the label-encoded masks with a channel axis, not the raw
# `train_masks`: the evaluation code indexes `y_test[:, :, :, 0]`, and the
# one-hot conversion needs class ids in the range 0..n_classes-1.
X_train, X_test, y_train, y_test = train_test_split(train_images, train_masks_input, test_size=0.10, random_state=0)

# Print unique class values in the training dataset
print("Class values in the dataset are ... ", np.unique(y_train))

# Function to convert masks to categorical format
def convert_to_categorical(masks, num_classes):
    """One-hot encode integer masks.

    Args:
        masks: Integer class-id array, either (n, h, w) or (n, h, w, 1).
        num_classes: Size of the one-hot axis.

    Returns:
        A tuple ``(masks_cat_reshaped, masks_cat)``. ``masks_cat`` is the raw
        ``to_categorical`` result; ``masks_cat_reshaped`` is the same data
        reshaped to the mask's spatial shape plus a trailing ``num_classes``
        axis, with a trailing singleton channel axis of ``masks`` dropped so
        the result lines up with the model's (n, h, w, num_classes) output.
    """
    masks = np.asarray(masks)
    masks_cat = to_categorical(masks, num_classes=num_classes)
    # A trailing singleton channel must not survive as an extra axis in front
    # of the class axis, otherwise the target would be (n, h, w, 1, classes).
    if masks.ndim > 1 and masks.shape[-1] == 1:
        label_shape = masks.shape[:-1]
    else:
        label_shape = masks.shape
    return masks_cat.reshape((*label_shape, num_classes)), masks_cat

# Convert training masks to categorical format
y_train_cat, train_masks_cat = convert_to_categorical(y_train, num_classes=n_classes)

# Convert testing masks to categorical format
y_test_cat, test_masks_cat = convert_to_categorical(y_test, num_classes=n_classes)

# Balanced class weights computed over every encoded mask pixel
classes = np.unique(train_masks_reshaped_encoded)
class_weights = compute_class_weight(
    class_weight="balanced",
    classes=classes,
    y=train_masks_reshaped_encoded,
)
class_weights_dict = {class_id: weight for class_id, weight in zip(classes, class_weights)}
print('class weights dict')
print(class_weights_dict)
print("Class weights are...:", class_weights)

# Model input dimensions are taken from the training images (axes 1..3)
IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS = X_train.shape[1:4]

def get_model():
    """Build a fresh U-Net from the module-level ``n_classes`` and ``IMG_*`` values."""
    model_kwargs = dict(
        n_classes=n_classes,
        IMG_HEIGHT=IMG_HEIGHT,
        IMG_WIDTH=IMG_WIDTH,
        IMG_CHANNELS=IMG_CHANNELS,
    )
    return multi_unet_model(**model_kwargs)

model = get_model()

# Save the full model after every epoch. `save_freq='epoch'` replaces the
# deprecated `period=1` argument and has the same effect.
model_checkpoint = ModelCheckpoint(filepath=r'C:\UNET_project\weights_2\model_epoch_{epoch:02d}.h5',
                                   monitor='val_loss',
                                   verbose=1,
                                   save_best_only=False,
                                   save_weights_only=False,
                                   mode='auto',
                                   save_freq='epoch')

# Define early stopping criteria
early_stopping = EarlyStopping(monitor='val_accuracy',  # Monitor validation accuracy
                               min_delta=0.001,  # Minimum change to qualify as an improvement
                               patience=10,  # Number of epochs with no improvement after which training will be stopped
                               verbose=1,
                               mode='auto')

# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Print model summary
model.summary()

# If starting with pre-trained weights
# model.load_weights('???.hdf5')

# `Model.fit(class_weight=...)` is not supported for targets with 3 or more
# dimensions, which per-pixel one-hot masks are. Apply the class weights as a
# per-pixel sample weight instead: look up each pixel's weight by its class id.
# Class ids without a computed weight fall back to 1.0.
class_weight_lookup = np.array(
    [class_weights_dict.get(class_id, 1.0) for class_id in range(n_classes)],
    dtype=np.float32,
)
train_sample_weights = class_weight_lookup[np.argmax(y_train_cat, axis=-1)]  # shape (n, h, w)

# Train the model with callbacks
history = model.fit(X_train, y_train_cat,
                    batch_size=16,
                    verbose=1,
                    epochs=50,
                    validation_data=(X_test, y_test_cat),
                    sample_weight=train_sample_weights,
                    shuffle=False,
                    callbacks=[model_checkpoint, early_stopping])

model.save(r'C:\UNET_project\weights_2\test.hdf5')



  y = column_or_1d(y, warn=True)


Unique values: [ 0  1  2  3  4  5  6  7  8  9 10 11]
Class values in the dataset are ...  [ 0  1  2  3  4  5  6  7  8  9 10 11]
class weights dict
{0: 0.49207878217925755, 1: 0.3577460691552256, 2: 8.480282631528693, 3: 0.2643396243613218, 4: 1.853555299679677, 5: 0.8547461368653422, 6: 7.099427146856614, 7: 7.388813494106208, 8: 1.422141625549979, 9: 13.126954489451155, 10: 28.599441002070456, 11: 2.1202343557957253}
Class weights are...: [ 0.49207878  0.35774607  8.48028263  0.26433962  1.8535553   0.85474614
  7.09942715  7.38881349  1.42214163 13.12695449 28.599441    2.12023436]
Model: "model_4"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_5 (InputLayer)        [(None, 352, 352, 1)]        0         []                            
                                                                                               

KeyboardInterrupt: 

In [20]:
# _, acc = model.evaluate(X_test, y_test_cat)
# print("Accuracy is = ", (acc * 100.0), "%")


# # ###
# # #plot the training and validation accuracy and loss at each epoch
# loss = history.history['loss']
# val_loss = history.history['val_loss']
# epochs = range(1, len(loss) + 1)
# plt.plot(epochs, loss, 'y', label='Training loss')
# plt.plot(epochs, val_loss, 'r', label='Validation loss')
# plt.title('Training and validation loss')
# plt.xlabel('Epochs')
# plt.ylabel('Loss')
# plt.legend()
# plt.show()

# acc = history.history['acc']
# val_acc = history.history['val_acc']

# plt.plot(epochs, acc, 'y', label='Training Accuracy')
# plt.plot(epochs, val_acc, 'r', label='Validation Accuracy')
# plt.title('Training and validation Accuracy')
# plt.xlabel('Epochs')
# plt.ylabel('Accuracy')
# plt.legend()
# plt.show()


# ##################################
import random

import matplotlib.pyplot as plt  # `plt` is used below but was never imported
from keras.metrics import MeanIoU

# Rebuild the architecture and load previously trained weights
model = get_model()
model.load_weights(r"C:\UNET_project\weights_camvid\test.hdf5")

# IoU
y_pred = model.predict(X_test)
print(y_pred.shape)
print(type(y_pred))
y_pred_argmax = np.argmax(y_pred, axis=3)

# Integer ground-truth labels as (n, h, w); accepts masks stored with or
# without a trailing singleton channel axis.
y_test_labels = np.asarray(y_test).reshape(np.shape(y_test)[:3])

# Using built in keras function
IOU_keras = MeanIoU(num_classes=n_classes)
IOU_keras.update_state(y_test_labels, y_pred_argmax)
print("Mean IoU =", IOU_keras.result().numpy())

# IoU for each class, from the confusion matrix accumulated by MeanIoU:
# intersection is the diagonal, union is row sum + column sum - diagonal.
values = np.array(IOU_keras.get_weights(), dtype=np.float64).reshape(n_classes, n_classes)
intersection = np.diag(values)
union = values.sum(axis=0) + values.sum(axis=1) - intersection
# Classes that never occur in labels or predictions get NaN instead of 0/0.
class_IoU = np.divide(
    intersection,
    union,
    out=np.full(n_classes, np.nan),
    where=union > 0,
).tolist()
print(class_IoU)

# Show the first image and its mask side by side; two consecutive
# plt.imshow calls on the same axes would only leave the second one visible.
fig, (ax_image, ax_mask) = plt.subplots(1, 2, figsize=(10, 5))
ax_image.set_title('Training Image')
ax_image.imshow(train_images[0, :, :, 0], cmap='gray')
ax_mask.set_title('Training Mask')
ax_mask.imshow(train_masks[0], cmap='gray')
plt.show()

#######################################################################
# Predict on a few images
# randrange excludes the upper bound; random.randint(0, len(X_test)) could
# return len(X_test) and raise an IndexError.
test_img_number = random.randrange(len(X_test))
test_img = X_test[test_img_number]
ground_truth = y_test_labels[test_img_number]
test_img_norm = test_img[:, :, 0][:, :, None]
test_img_input = np.expand_dims(test_img_norm, 0)
prediction = model.predict(test_img_input)
predicted_img = np.argmax(prediction, axis=3)[0, :, :]


plt.figure(figsize=(12, 8))
plt.subplot(231)
plt.title('Testing Image')
plt.imshow(test_img[:, :, 0], cmap='gray')
plt.subplot(232)
plt.title('Testing Label')
plt.imshow(ground_truth, cmap='jet')
plt.subplot(233)
plt.title('Prediction on test image')
plt.imshow(predicted_img, cmap='jet')
plt.show()



# #Predict on large image

# #Apply a trained model on large image

# from patchify import patchify, unpatchify

# large_image = cv2.imread('large_images/large_image.tif', 0)
# #This will split the image into small images of shape [3,3]
# patches = patchify(large_image, (128, 128), step=128)  #Step=256 for 256 patches means no overlap

# predicted_patches = []
# for i in range(patches.shape[0]):
#     for j in range(patches.shape[1]):
#         print(i,j)
        
#         single_patch = patches[i,j,:,:]       
#         single_patch_norm = np.expand_dims(normalize(np.array(single_patch), axis=1),2)
#         single_patch_input=np.expand_dims(single_patch_norm, 0)
#         single_patch_prediction = (model.predict(single_patch_input))
#         single_patch_predicted_img=np.argmax(single_patch_prediction, axis=3)[0,:,:]

#         predicted_patches.append(single_patch_predicted_img)

# predicted_patches = np.array(predicted_patches)

# predicted_patches_reshaped = np.reshape(predicted_patches, (patches.shape[0], patches.shape[1], 128,128) )

# reconstructed_image = unpatchify(predicted_patches_reshaped, large_image.shape)
# plt.imshow(reconstructed_image, cmap='gray')
# plt.imsave('data/results/segm.jpg', reconstructed_image, cmap='gray')

# plt.hist(reconstructed_image.flatten())  #Threshold everything above 0

# final_prediction = (reconstructed_image > 0.01).astype(np.uint8)
# plt.imshow(final_prediction)

# plt.figure(figsize=(8, 8))
# plt.subplot(221)
# plt.title('Large Image')
# plt.imshow(large_image, cmap='gray')
# plt.subplot(222)
# plt.title('Prediction of large Image')
# plt.imshow(reconstructed_image, cmap='jet')
# plt.show()

(37, 352, 352, 12)
<class 'numpy.ndarray'>
Mean IoU = 0.4041226
[0.85698843, 0.47007307, 0.0725492, 0.8856767, 0.43509936, 0.52067554, 0.10914341, 0.12037788, 0.67477536, 0.14755052, 0.34439555, 0.21216625]
