In [None]:
import os
from platform import python_version
print(python_version())

In [None]:
def rgb2bgr(input_img):
    bgr_image = np.zeros_like(input_img)
    bgr_image[:,:,0] = input_img[:,:,2] # R channel to B channel
    bgr_image[:,:,1] = input_img[:,:,1] # G channel remains the same
    bgr_image[:,:,2] = input_img[:,:,0] # B channel to R channel
    return bgr_image

In [None]:
import os
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
plt.style.use("ggplot")
%matplotlib inline
import gc

from tqdm.notebook import trange, tqdm

from itertools import chain
from skimage.io import imread, imshow, concatenate_images
from skimage.transform import resize
from skimage.morphology import label
from sklearn.model_selection import train_test_split

import tensorflow as tf

print(tf.__version__)
print(tf.config.list_physical_devices())

from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
from tensorflow.keras.layers import Conv2D, Input, MaxPooling2D, Dropout, concatenate, UpSampling2D
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard
from tensorflow.keras import backend as K

  
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.models import *
from tensorflow.keras.preprocessing.image import *
from tensorflow.keras.callbacks import *

import random 

## Seeding 
seed = 2019
random.seed = seed
np.random.seed = seed
tf.seed = seed

# **Directory reference, data augmentor, data loading**

In [None]:
w, h = 320,240
base_data_dir = "A:/data/demo_video_data"
model_dir = "A:/models/tool_segmentation"
sample_test_dir = "A:/python_ai_projects/tool_segmentation/sample_data"
image_directory_name = "foreground"
mask_directory_name = "masks"
background_dir_name = "background"
model_name = "scoop_segment"

no_bg_change_list_file = os.path.join(base_data_dir, 'no_background_change.txt')
lines = []
if os.path.exists(no_bg_change_list_file):
    with open(no_bg_change_list_file, 'r') as f:
        # Read the contents of the file into a list of strings
        lines = f.readlines()

    # Strip the newline character from each line
    lines = [line.strip() for line in lines]


file_names_to_exclude_from_bg_change = lines
print(file_names_to_exclude_from_bg_change)

In [None]:
import imgaug as ia
from imgaug import augmenters as iaa
from imgaug.augmentables.segmaps import SegmentationMapsOnImage

def activator_labels(images, augmenter, parents, default):
    if augmenter.name in ["AddToHueAndSaturation", "GaussianBlur"]: # "noaugment"
        return False
    else:
        return default
sometimes = lambda aug: iaa.Sometimes(0.25, aug)

seq = iaa.Sequential(
            [
            # apply the following augmenters to most images
            iaa.GaussianBlur(sigma=(0,0.05), name= "GaussianBlur"),
            iaa.Fliplr(0.5), # horizontally flip 50% of all images
            iaa.Flipud(0.5), # vertically flip 20% of all images
            # crop images by -5% to 10% of their height/width
            sometimes(iaa.CropAndPad(
                percent=(-0.005, 0.001),
                pad_mode=ia.ALL,
                pad_cval=(0, 0)
            )),
            sometimes(iaa.Affine(
                scale=(0.9, 1.005), # scale images to 90-110% of their size, individually per axis
                translate_percent={"x": (-0.01, 0.01), "y": (-0.01, 0.01)}, # translate by -20 to +20 percent (per axis)

                rotate=(-2, 2), # rotate by -5 to +5 degrees
                #order=[0, 1], # use nearest neighbour or bilinear interpolation (fast)
                #cval=(0, 0), # if mode is constant, use a cval between 0 and 255
                #mode=ia.ALL, # use any of scikit-image's warping modes (see 2nd image from the top for examples)
                shear=(-0.5, 0.5) # shear by -16 to +16 degrees
            )),
            iaa.Scale(0.25),
            iaa.CropToFixedSize(w,h),
            iaa.AddToHueAndSaturation((-20, 20), name= "AddToHueAndSaturation"), # change the hue and saturation by up to 20
            ],
            random_order=False
        )
hooks_labels = ia.HooksImages(activator=activator_labels)

In [None]:
import glob
imag_paths = sorted(glob.glob(os.path.join(base_data_dir, image_directory_name + "/*.jpg"), recursive=True))
mask_paths = sorted(glob.glob(os.path.join(base_data_dir, mask_directory_name + "/*.png"), recursive=True))
bkgd_paths = sorted(glob.glob(os.path.join(base_data_dir, background_dir_name + "/*.jpg"), recursive=True))

print(f'Total Train Images : {len(imag_paths)}')
print(f'Total Mask Image : {len(mask_paths)}')
print(f'Total background images : {len(bkgd_paths)}')

In [None]:
# Split train and valid
train_imag_paths, test_imag_paths, train_mask_paths, test_mask_paths = train_test_split(imag_paths, mask_paths, test_size=0.1, random_state=42)

In [None]:
from keras.applications.vgg16 import preprocess_input
import cv2
from keras.utils import to_categorical

num_classes = 2
class ImageSource:
    def __init__(self, imag_paths, mask_paths, bkgd_paths, background_aug = False):
        self.imges = np.zeros((len(imag_paths), h, w, 3), dtype=np.float32)
        self.masks = np.zeros((len(mask_paths), h, w, num_classes), dtype=np.float32)
        
        self.imag_paths = imag_paths
        self.mask_paths = mask_paths
        self.bkgd_paths = bkgd_paths
        
        self.total_images = len(imag_paths)

        self.max_blur_size = 10
        self.min_blur_size = 1
        self.max_circle_size = 0.11
        
        self.rgb_value = [0.485, 0.456, 0.406]
        self.background_aug = background_aug
        self.find_class_weights()
        self.generate_augmented_images()

    def generate_augmented_images(self):
        class_weights = [0 for i in range(num_classes)]
        for n, (image_path, mask_path) in tqdm(enumerate(zip(self.imag_paths, self.mask_paths))):
            #print(os.path.split(image_path)[1], os.path.split(mask_path)[1])
            random_background_image_index = random.randint(0, len(bkgd_paths) - 1)

            imag_data = img_to_array(load_img(image_path)).astype(np.uint8)
            mask_data = img_to_array(load_img(mask_path, color_mode = "grayscale")).astype(np.uint8).squeeze()

            seq_deterministic = seq.to_deterministic() 
            imag_data_augmented = seq_deterministic.augment_image(imag_data)
            mask_data_augmented_binary = seq_deterministic.augment_image(mask_data, hooks=hooks_labels) > 0

            image_filename = os.path.split(image_path)[1]
            exclude_file = image_filename in file_names_to_exclude_from_bg_change
            #print(image_filename, exclude_file)
            has_mask = mask_data.max() > 0
            if self.background_aug and has_mask and not exclude_file:
                bkgd_path = self.bkgd_paths[random_background_image_index]
                bkgd_data = img_to_array(load_img(bkgd_path)).astype(np.uint8)
                bkgd_data_augmented = seq_deterministic.augment_image(bkgd_data)
                expanded_mask = np.expand_dims(mask_data_augmented_binary, -1)
                mask = np.repeat(expanded_mask, 3, axis=-1)
                imag_data_augmented[~mask] = bkgd_data_augmented[~mask]
            image_blended_float = imag_data_augmented.astype(np.float32)

            self.imges[n] = preprocess_input(image_blended_float)/255.0
            mask_3d = to_categorical(mask_data_augmented_binary, num_classes)
            self.masks[n] = mask_3d.astype(np.float32)

    def find_class_weights(self):
        class_weights = [0 for i in range(num_classes)]
        for n, mask_path in tqdm(enumerate(self.mask_paths)):
            mask_data = img_to_array(load_img(mask_path, color_mode = "grayscale")).astype(np.uint8).squeeze() > 0
            total_pixels = mask_data.shape[0] * mask_data.shape[1]
            for i in range(num_classes):
                class_weight = np.count_nonzero(mask_data == i) / total_pixels
                class_weights[i] += class_weight
                
        self.class_weights = [1 - (class_weights[i] / self.total_images) for i in range(num_classes)]
        print(self.class_weights)

    def display_images(self):
        # Visualize any randome image along with the mask
        random_index = random.randint(0, len(self.imges)-1)
        selected_image = self.imges[random_index]
        selected_mask = np.argmax(self.masks[random_index], axis=2)
        has_mask = selected_image.max() > 0

        fig, (input_figure, output_figure) = plt.subplots(1, 2, figsize = (20, 15))
        reverted_preprocessed_image = selected_image + self.rgb_value

        input_figure.imshow(rgb2bgr(reverted_preprocessed_image))
        if has_mask: # if salt
            # draw a boundary(contour) in the original image separating salt and non-salt areas
            input_figure.contour(selected_mask.squeeze(), colors = 'k', linewidths = 5, levels = [0.25])

        input_figure.set_title('Image')
        input_figure.set_axis_off()

        output_figure.imshow(selected_mask.squeeze(), cmap = 'gray')
        output_figure.set_title('Mask Image')
        output_figure.set_axis_off()       
        

    def display_images_pred(self, predict_func):
        # Visualize any randome image along with the mask
        random_index = random.randint(0, len(self.imges)-1)
        selected_image = self.imges[random_index]
        selected_mask = np.argmax(self.masks[random_index], axis=-1)
        has_mask = selected_image.max() > 0

        fig, (input_figure, output_figure, pred_figure) = plt.subplots(1, 3, figsize = (20, 15))
        reverted_preprocessed_image = selected_image + self.rgb_value

        input_figure.imshow(rgb2bgr(reverted_preprocessed_image))
        if has_mask:
            input_figure.contour(selected_mask.squeeze(), colors = 'k', linewidths = 5, levels = [0.25])

        input_figure.set_title('Image')
        input_figure.set_axis_off()

        output_figure.imshow(selected_mask.squeeze(), cmap = 'gray')
        if has_mask:
            output_figure.contour(selected_mask.squeeze(), colors = 'k', linewidths = 5, levels = [0.25])
        output_figure.set_title('Mask Image')
        output_figure.set_axis_off()   

        pred = np.argmax(predict_func(selected_image), axis=-1)
        pred_figure.imshow(pred.squeeze(), cmap = 'gray')
        pred_figure.set_title('Predicted Image')
        pred_figure.set_axis_off()          

# **Instantiate validation and training sources; visualize the data**

In [None]:
train_source = ImageSource(train_imag_paths, train_mask_paths, bkgd_paths, background_aug = True)
test_source = ImageSource(test_imag_paths, test_mask_paths, bkgd_paths, background_aug = True)

In [None]:
print(train_source.imges.shape)
print(train_source.masks.shape)
train_source.display_images()
test_source.display_images()

# **UNET model function**

In [None]:
from tensorflow.keras.layers import Lambda

def UnetModel(image_width, image_height, vgg_weight_path=None, filter_size = 64, num_classes = 2):
    inputs = Input((image_height, image_width, 3), name="vgg")
    # Block 1
    x = Conv2D(filter_size, (3, 3), padding='same', name='block1_conv1')(inputs)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(filter_size, (3, 3), padding='same', name='block1_conv2')(x)
    x = BatchNormalization()(x)
    block_1_out = Activation('relu')(x)

    x = MaxPooling2D()(block_1_out)

    # Block 2
    x = Conv2D(2 * filter_size, (3, 3), padding='same', name='block2_conv1')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(2 * filter_size, (3, 3), padding='same', name='block2_conv2')(x)
    x = BatchNormalization()(x)
    block_2_out = Activation('relu')(x)

    x = MaxPooling2D()(block_2_out)

    # Block 3
    x = Conv2D(4 * filter_size, (3, 3), padding='same', name='block3_conv1')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(4 * filter_size, (3, 3), padding='same', name='block3_conv2')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(4 * filter_size, (3, 3), padding='same', name='block3_conv3')(x)
    x = BatchNormalization()(x)
    block_3_out = Activation('relu')(x)

    x = MaxPooling2D()(block_3_out)

    # Block 4
    x = Conv2D(8 * filter_size, (3, 3), padding='same', name='block4_conv1')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(8 * filter_size, (3, 3), padding='same', name='block4_conv2')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(8 * filter_size, (3, 3), padding='same', name='block4_conv3')(x)
    x = BatchNormalization()(x)
    block_4_out = Activation('relu')(x)

    x = MaxPooling2D()(block_4_out)

    # Block 5
    x = Conv2D(8 * filter_size, (3, 3), padding='same', name='block5_conv1')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(8 * filter_size, (3, 3), padding='same', name='block5_conv2')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(8 * filter_size, (3, 3), padding='same', name='block5_conv3')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    for_pretrained_weight = MaxPooling2D()(x)

    # Load pretrained weights.
    if vgg_weight_path is not None:
        if filter_size == 64:
            vgg16 = Model(inputs, for_pretrained_weight)
            vgg16.load_weights(vgg_weight_path, by_name=True)
            for layer in vgg16.layers:
                layer.trainable = False
        else:
            print("Cannot load vgg model because filter size is not 64")
    # UP 1
    x = Conv2DTranspose(8 * filter_size, kernel_size=(2,2), strides=(2,2), padding='same', use_bias = False)(x)
    x = BatchNormalization()(x)
    x = concatenate([x, block_4_out])
    x = Conv2D(4 * filter_size, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    # UP 2
    x = Conv2DTranspose(4 * filter_size, kernel_size=(2,2), strides=(2,2), padding='same', use_bias = False)(x)
    x = BatchNormalization()(x)
    x = concatenate([x, block_3_out])
    x = Conv2D(2 * filter_size, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    # UP 3
    x = Conv2DTranspose(2 * filter_size, kernel_size=(2,2), strides=(2,2), padding='same', use_bias = False)(x)
    x = BatchNormalization()(x)
    x = concatenate([x, block_2_out])
    x = Conv2D(filter_size, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    # UP 4
    x = Conv2DTranspose(filter_size, kernel_size=(2,2), strides=(2,2), padding='same', use_bias = False)(x)
    x = BatchNormalization()(x)
    x = concatenate([x, block_1_out])

    x = Conv2D(num_classes, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    output_layer = Activation('sigmoid')(x)
    
    print(f'Output of mode has {output_layer.shape} shape')
    model = Model(inputs=inputs, outputs=output_layer)
    return model

In [None]:
model = UnetModel(w, h, "../../models/vgg16_weights_tf_dim_ordering_tf_kernels.h5", 16)
#model = UnetModelSmall(w,h)

In [None]:
model.summary()

# **Loss function and training parameters; Train the model**

In [None]:
from keras import backend as K
from tensorflow.keras.losses import categorical_crossentropy

BATCH_SIZE = 3
EPOCHES = 500
RELOAD_ON_EVERY = 20
EPOCH_PATIENCE = 25
LEARNING_RATE_PATIENCE = 5
LERNING_RATE = 0.01

K.clear_session()
class_weights = train_source.class_weights
def weighted_categorical_crossentropy(y_true, y_pred):
    # convert class weights dictionary to tensor
    class_weights_tensor = K.constant(class_weights)
    
    # calculate the cross-entropy loss for each sample
    per_sample_loss = categorical_crossentropy(y_true * class_weights_tensor, y_pred * class_weights_tensor)
    # calculate the weighted loss for each sample
    weighted_loss = K.sum(per_sample_loss)
    
    return weighted_loss

class_weights = train_source.class_weights

metrics = ["accuracy", tf.keras.metrics.AUC()]
model.compile(optimizer=Adam(learning_rate=LERNING_RATE), loss=weighted_categorical_crossentropy, metrics=metrics)
gc.collect()

In [None]:
from keras.callbacks import Callback
               
class UpdateDataSample(Callback):
    def __init__(self, train_data_source, batch_size = 4, update_frequency=10):
        super(UpdateDataSample, self).__init__()
        self.batch_size = batch_size
        self.train_data_source = train_data_source
        self.update_frequency = update_frequency

    def on_epoch_end(self, epoch, logs={}):
        if (epoch + 1) % self.update_frequency == 0:
            self.train_data_source.generate_augmented_images()
            num_batches = self.train_data_source.total_images // self.batch_size
            for i in range(num_batches):
                batch_x = self.train_data_source.imges[i*self.batch_size:(i+1)*self.batch_size]
                batch_y = self.train_data_source.masks[i*self.batch_size:(i+1)*self.batch_size]
                self.model.train_on_batch(batch_x, batch_y)

In [None]:
from keras.utils import Sequence

callbacks = [
    EarlyStopping(patience=EPOCH_PATIENCE, verbose=1),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=LEARNING_RATE_PATIENCE, min_lr=0.0001, verbose=1),
    ModelCheckpoint(f'{model_name}.h5', 
                    verbose=1, 
                    save_best_only=True,
                    save_weights_only=True),
    CSVLogger(f"{model_name}.csv"),
    TensorBoard(log_dir=f'./{model_name}_logs'),
    UpdateDataSample(train_source, BATCH_SIZE, RELOAD_ON_EVERY)
]

#if os.path.exists(f'{model_name}.h5'):
#    model.load_weights(f'{model_name}.h5')

results = model.fit(train_source.imges, train_source.masks, batch_size = BATCH_SIZE, epochs=EPOCHES, callbacks=callbacks, validation_data=(test_source.imges, test_source.masks), use_multiprocessing=True)

df_result = pd.DataFrame(results.history)
df_result.sort_values('val_loss', ascending=True, inplace = True)
df_result

plt.figure(figsize = (15,6))
plt.title("Learning curve")
plt.plot(results.history["loss"], label="loss")
plt.plot(results.history["val_loss"], label="val_loss")
plt.plot(np.argmin(results.history["val_loss"]), np.min(results.history["val_loss"]), marker="x", color="r", label="best model")
plt.xlabel("Epochs")
plt.ylabel("log_loss")
plt.legend();

plt.figure(figsize = (15,6))
plt.title("Learning curve")
plt.plot(results.history["accuracy"], label="Accuracy")
plt.plot(results.history["val_accuracy"], label="val_Accuracy")
plt.plot(np.argmax(results.history["val_accuracy"]), np.max(results.history["val_accuracy"]), marker="x", color="r", label="best model")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend();

# **Inference; Dump model to use in c++**

In [None]:
model.load_weights(f'{model_name}.h5')

In [None]:
model.evaluate(test_source.imges, test_source.masks, verbose=1)

In [None]:
init_w = w
init_h = h
init_c = 3
tap_count = 0
epsilon = np.finfo(float).eps

model_data_dir = os.path.join(model_dir, model_name)
if not os.path.exists(model_data_dir):
    # Create the directory if it doesn't exist
    os.makedirs(model_data_dir)

maxpool_count = 0

layers_map = []
tap_list = []

for i, layer in enumerate(model.layers): 
    #print(i, layer.name)
    #print("----", layer)
    if isinstance(layer, ReLU):
        maxValue = layer.max_value
        layers_map.append((f'relu,{init_w},{init_h},{init_c},{maxValue},{layer.name}', layer.name))  
    elif isinstance(layer, Activation):
        activation_type = layer.get_config()['activation']
        if activation_type == 'relu':
            maxValue = 0
            if hasattr(layer,'max_value'):
                maxValue = layer.max_value
            layers_map.append((f'{activation_type},{init_w},{init_h},{init_c},{maxValue},{layer.name}', layer.name))  
        else:
            layers_map.append((f'{activation_type},{init_w},{init_h},{init_c},{layer.name}', layer.name)) 
    elif isinstance(layer, BatchNormalization):
        print(i, layer.name)
        if i == 0:
            print("batchnormalization, 0th layer")
        elif isinstance(model.layers[i - 1], Conv2DTranspose):
            layers_map.append((f'batchnormalization,{init_w},{init_h},{init_c},{layer.name}', layer.name))  
            weights = layer.get_weights()
            gamma = weights[0]
            beta = weights[1]
            moving_mean = weights[2]
            moving_variance = weights[3]
            if model.layers[i - 1].use_bias:
                print("batchnormalization, i-1th layer transposed conv2d with bias")
                bias = model.layers[i - 1].weights[1]
                #print(bias.shape, moving_mean.shape, moving_variance.shape, beta.shape, gamma.shape)
                a = gamma / np.sqrt(moving_variance + epsilon)
                b = (bias - moving_mean) * a + beta
            else:
                print("batchnormalization, i-1th layer transposed conv2d")
                a = gamma / np.sqrt(moving_variance + epsilon)
                b = - moving_mean * a + beta
            path_to_save = os.path.join(model_data_dir, layer.name + '_mean.npy')
            np.save(path_to_save, b)
            path_to_save = os.path.join(model_data_dir, layer.name + '_variance.npy')
            np.save(path_to_save, a)    
        elif isinstance(model.layers[i - 1], Conv2D) or isinstance(model.layers[i - 1], DepthwiseConv2D):
            print("batchnormalization, i-1th layer conv2d or depthwiseconv2d")
            continue  
        else:
            print("batchnormalization layer is independent")
            layers_map.append((f'batchnormalization,{init_w},{init_h},{init_c},{layer.name}', layer.name))  
            weights = layer.get_weights()
            gamma = weights[0]
            beta = weights[1]
            moving_mean = weights[2]
            moving_variance = weights[3]
            a = gamma / np.sqrt(moving_variance + epsilon)
            b = - moving_mean * a + beta
            path_to_save = os.path.join(model_data_dir, layer.name + '_mean.npy')
            np.save(path_to_save, b)
            path_to_save = os.path.join(model_data_dir, layer.name + '_variance.npy')
            np.save(path_to_save, a)

    elif isinstance(layer, MaxPooling2D):  
        layers_map.append((f'maxpool,{init_w},{init_h},{init_c},maxpool_{maxpool_count}', layer.name))
        init_w = int(init_w /2)
        init_h = int(init_h /2)
        maxpool_count += 1
    elif isinstance(layer, Conv2DTranspose):  
        if layer.use_bias and i + 1 < len(model.layers) and not isinstance(model.layers[i + 1], BatchNormalization):
            print("Not implemeneted transposed convolution with bias with next layer not BatchNormalization")
            continue
        filter_weights = layer.weights[0]      
        conv_weight_shape = filter_weights.shape
        layers_map.append((f'deconv,{conv_weight_shape[0]},{conv_weight_shape[1]},{conv_weight_shape[2]},{conv_weight_shape[3]},{init_w},{init_h},{init_c},{layer.name}', layer.name))
        init_c = conv_weight_shape[3]
        init_w = init_w *2
        init_h = init_h *2
        path_to_save = os.path.join(model_data_dir, layer.name + '.npy')
        np.save(path_to_save, filter_weights)
        if layer.use_bias:
            bias = layer.weights[1]
            path_to_save = os.path.join(model_data_dir, layer.name + '_bias.npy')
            np.save(path_to_save, bias)
    elif isinstance(layer, Conv2D):
        if i + 1 < len(model.layers) and isinstance(model.layers[i + 1], BatchNormalization):
            nextLayer = model.layers[i + 1]
            conv_weight_shape = layer.weights[0].shape
            strides = layer.strides
            if strides[0] != strides[1]:
                print("Strides must be equal")
                continue
            if conv_weight_shape[0] == 1 and conv_weight_shape[1] == 1:
                if strides[0] != 1:
                    print("Only stride of 1 is supported in pointwise")
                    continue
                else:
                    layers_map.append((f'pointwise,{conv_weight_shape[2]},{conv_weight_shape[3]},{init_w},{init_h},{init_c},{layer.name}', nextLayer.name))
            else:
                if strides[0] != 1 and strides[0] != 2:
                    print(f"Only stride of 1 or 2 is supported in convolution {strides[0]}")
                    continue
                else:
                    layers_map.append((f'conv,{conv_weight_shape[0]},{conv_weight_shape[1]},{conv_weight_shape[2]},{conv_weight_shape[3]},{init_w},{init_h},{init_c},{strides[0]},{layer.name}', nextLayer.name))    
            init_c = conv_weight_shape[3] 
            init_w = int(init_w / strides[0])
            init_h = int(init_h / strides[0])
        
            weights = nextLayer.get_weights()
            gamma = weights[0]
            beta = weights[1]
            moving_mean = weights[2]
            moving_variance = weights[3]
            if layer.use_bias:
                bias = layer.weights[1]
                #print(bias.shape, moving_mean.shape, moving_variance.shape, beta.shape, gamma.shape)
                a = gamma / np.sqrt(moving_variance + epsilon)
                b = (bias - moving_mean) * a + beta
            else:
                a = gamma / np.sqrt(moving_variance + epsilon)
                b = - moving_mean * a + beta
            path_to_save = os.path.join(model_data_dir, layer.name + '_weights.npy')
            np.save(path_to_save, layer.weights[0])
            path_to_save = os.path.join(model_data_dir, layer.name + '_mean.npy')
            np.save(path_to_save, b)
            path_to_save = os.path.join(model_data_dir, layer.name + '_variance.npy')
            np.save(path_to_save, a)
        else:
            print(f"Conv2D without batch normalization not implemented {layer.name}")
    elif isinstance(layer, DepthwiseConv2D):
        strides = layer.strides
        if strides[0] != strides[1]:
            print("Strides must be equal")
            continue
        if strides[0] != 1 and strides[0] != 2:
            print(f"Only stride of 1 or 2 and  equal size is supported in depthwise convolution {strides[0]}")
            continue
        else:        
            if i + 1 < len(model.layers) and isinstance(model.layers[i + 1], BatchNormalization):
                conv_weight_shape = layer.weights[0].shape
                if conv_weight_shape[3] != 1:
                    print("Only 3D shape supported for weights in depthwise")
                nextLayer = model.layers[i + 1]
                layers_map.append((f'depthwise,{conv_weight_shape[0]},{conv_weight_shape[1]},{conv_weight_shape[2]},{init_w},{init_h},{init_c},{strides[0]},{layer.name}', nextLayer.name))

                weights = nextLayer.get_weights()
                gamma = weights[0]
                beta = weights[1]
                moving_mean = weights[2]
                moving_variance = weights[3]
                if layer.use_bias:
                    bias = layer.weights[1]
                    #print(bias.shape, moving_mean.shape, moving_variance.shape, beta.shape, gamma.shape)
                    a = gamma / np.sqrt(moving_variance + epsilon)
                    b = (bias - moving_mean) * a + beta
                else:
                    a = gamma / np.sqrt(moving_variance + epsilon)
                    b = - moving_mean * a + beta
                path_to_save = os.path.join(model_data_dir, layer.name + '_weights.npy')
                np.save(path_to_save, layer.weights[0])
                path_to_save = os.path.join(model_data_dir, layer.name + '_mean.npy')
                np.save(path_to_save, b)
                path_to_save = os.path.join(model_data_dir, layer.name + '_variance.npy')
                np.save(path_to_save, a)
            else:
                conv_weight_shape = layer.weights[0].shape
                if conv_weight_shape[3] != 1:
                    print("Only 3D shape supported for weights in depthwise")
                nextLayer = model.layers[i + 1]                
                if strides[0] != 1:
                    print("Only stride of 1 is supported in depthwisebias")
                    continue
                if layer.use_bias:
                    layers_map.append((f'depthwisebias,{conv_weight_shape[0]},{conv_weight_shape[1]},{conv_weight_shape[2]},{init_w},{init_h},{init_c},{layer.name}', layer.name))
                    bias = layer.weights[1]
                    path_to_save = os.path.join(model_data_dir, layer.name + '_bias.npy')
                    np.save(path_to_save, bias)
                else:
                    print("Make sure that depthwisenobias i implemented")
                    layers_map.append((f'depthwisenobias,{conv_weight_shape[0]},{conv_weight_shape[1]},{conv_weight_shape[2]},{init_w},{init_h},{init_c},{layer.name}', layer.name))
                path_to_save = os.path.join(model_data_dir, layer.name + '_weights.npy')
                np.save(path_to_save, layer.weights[0])
                
        init_w = int(init_w / strides[0])
        init_h = int(init_h / strides[0])

    elif isinstance(layer, InputLayer):
        print("Input layer is", layer.name)
        layers_map.append((f'input,{init_w},{init_h},{init_c},{layer.name}',layer.name))
        continue
    elif isinstance(layer, Concatenate):
        in1 = layer._inbound_nodes[0].input_tensors[0].shape
        in2 = layer._inbound_nodes[0].input_tensors[1].shape
        in2_name = layer._inbound_nodes[0].input_tensors[1].name.split('/')[0]
        layers_map.append((f'concatenate,{tap_count},{in1[2]},{in1[1]},{in1[3]},{in2[2]},{in2[1]},{in2[3]},{layer.name}',layer.name))
        tap_list.append((tap_count, in2_name))
        init_c = in1[3] + in2[3]
        tap_count += 1
    elif isinstance(layer, ZeroPadding2D):
        if(layer.padding[0][1] != layer.padding[1][1] and layer.padding[0][0] == 0 and layer.padding[1][0] != 0):
            print(layer.padding)
            print("This padding not supported")
            continue
        layers_map.append((f'padding,{init_w},{init_h},{init_c},{layer.padding[0][1]},{layer.name}',layer.name))
        init_w = init_w + layer.padding[0][1]
        init_h = init_h + layer.padding[0][1]
    elif isinstance(layer, Add):
        in1 = layer._inbound_nodes[0].input_tensors[0].shape
        in2 = layer._inbound_nodes[0].input_tensors[1].shape
        in1_name = layer._inbound_nodes[0].input_tensors[0].name.split('/')[0]
        layers_map.append((f'add,{tap_count},{in1[2]},{in1[1]},{in1[3]},{in2[2]},{in2[1]},{in2[3]},{layer.name}',layer.name))
        tap_list.append((tap_count, in1_name))
        tap_count += 1
    else:
        print(f'Unhandled layer {layer}')

for tap in tap_list:
    temp_layer_map = list(layers_map)
    temp_layer_map_len = len(temp_layer_map)
    for index, layer in enumerate(temp_layer_map):
        if tap[1] == layer[-1]:
            layers_map.insert(index+1, (f'tap,{tap[0]}', tap[1]))
            break
        if temp_layer_map_len-1 == index:
            print(f'Not found {tap[1]}')

with open(f'{model_name}.txt', 'w') as f:
    # Write each line of the list to the file
    for layer in layers_map:            
        f.write(layer[0] + '\n')
    print("File is saved to " + f'{model_name}.txt')

# **Predictions on training set**

In [None]:
def pred_func(image_in):
    image_expanded = np.expand_dims(image_in, axis = 0)
    return model.predict(image_expanded)
test_source.display_images_pred(pred_func)
test_source.display_images_pred(pred_func)
test_source.display_images_pred(pred_func)

train_source.display_images_pred(pred_func)
train_source.display_images_pred(pred_func)
train_source.display_images_pred(pred_func)

# **Save the image in numpy format for testing in c++**

In [None]:
image_file_path = os.path.join(sample_test_dir, "09-45-20_WHITELIGHT-Left_02-Mar-2023_frame_300.jpg")
selected_image = img_to_array(load_img(image_file_path).resize((w,h))).astype(np.uint8)

selected_image_f = selected_image.astype(np.float32) / 255.0
selected_image_f -= [0.485, 0.456, 0.406]
img_data = rgb2bgr(selected_image_f)


def pred_funct(image_in, model_p):
    image_expanded = np.expand_dims(image_in, axis = 0)
    return model_p.predict(image_expanded)

out_img = pred_funct(img_data, model).squeeze()
fig, (input_figure, output_figure) = plt.subplots(1, 2, figsize = (20, 15))
input_figure.imshow(selected_image)
out_mask = 128.0 * (1.0 - out_img[:,:,0] + out_img[:,:,1])
output_figure.imshow(out_mask.astype(np.uint8), cmap = 'gray')

numpy_in = os.path.join(sample_test_dir, "in_vgg.npy")
np.save(numpy_in, np.transpose(img_data,[1,0,2]))

input_figure.set_title('Image')
input_figure.set_axis_off()

output_figure.set_title('Mask Image')
output_figure.set_axis_off()  

In [None]:
def generate_model(model, check_index):
    input_layer_data = model.layers[0]
    intermediate_layer_data = model.layers[check_index-1]
    output_layer_data = model.layers[check_index+1]

    model1 = Model(input_layer_data.input, intermediate_layer_data.output)
    model2 = Model(intermediate_layer_data.output, output_layer_data.output)

    print(input_layer_data.name)
    print(intermediate_layer_data.name)
    print(output_layer_data.name)

    print(input_layer_data.input.shape[1:])
    print(intermediate_layer_data.output.shape[1:])
    print(output_layer_data.output.shape[1:])
    return model1, model2
def write_csv3d(output_data, file_name_sufix):  
    path_to_saved = os.path.join(sample_test_dir, file_name_sufix +"_python.csv")
    print(output_data.shape)
    # Loop over the rows and columns of the 3D array
    with open(path_to_saved, 'w') as file:
        for l in range(output_data.shape[3]):
            for k in range(output_data.shape[2]):
                for j in range(output_data.shape[1]):
                    for i in range(output_data.shape[0]):
                        # Join the text data in each row with commas
                        value = output_data[i, j, k, l]
                        # Write the row data to the output file
                        if i == output_data.shape[1] - 1:
                            file.write("{:.4f}".format(value))
                        else:
                            file.write("{:.4f},".format(value))
                    file.write("\n")
            
def write_csv(output_data, file_name_sufix):  
    path_to_saved = os.path.join(sample_test_dir, file_name_sufix +"_python.csv")
    print(output_data.shape)
    # Loop over the rows and columns of the 3D array
    with open(path_to_saved, 'w') as file:
        for k in range(output_data.shape[2]):
            for j in range(output_data.shape[0]):
                for i in range(output_data.shape[1]):
                    # Join the text data in each row with commas
                    value = output_data[j, i, k]
                    # Write the row data to the output file
                    if i == output_data.shape[1] - 1:
                        file.write("{:.4f}".format(value))
                    else:
                        file.write("{:.4f},".format(value))
                file.write("\n")
def runthis(mode1, model2, img_data):
    intermediate_output = pred_funct(img_data, model1).squeeze()
    output = pred_funct(intermediate_output, model2).squeeze()
    print("-----------------")
    for k in range(3):
        for i in range(6):
            for j in range(6):
                print(img_data[i, j, k], end=", ")
            print()
        print("\n")
    print(img_data.shape)
    print("-----------------")
    for k in range(min(3, intermediate_output.shape[2])): 
        for i in range(6):
            for j in range(6):
                print(intermediate_output[i, j, k], end=", ")
            print()
        print("\n")
    print(intermediate_output.shape)
    write_csv(intermediate_output, "in")     
    print("-----------------")
    for k in range(min(3, output.shape[2])): 
        for i in range(6):
            for j in range(6):
                print(output[i, j, k], end=", ")
            print()
        print("\n")
    print(output.shape)  
    write_csv(output, "out")                    

In [None]:
print(len(model.layers))
check_index = 1
for index, layer in enumerate(model.layers):
    if layer.name in ["block1_conv1"]:
        print(index, layer.name)
        model1, model2 = generate_model(model, index)
        runthis(model1, model2, img_data)         

In [None]:
img_data = np.random.uniform(low=0.0, high=10.0, size = input_layer_data.input.shape[1:]).astype(np.int32).astype(np.float32)
#img_data = np.ones(input_layer_data.input.shape[1:], dtype=np.float32)
#img_data = np.zeros(input_layer_data.input.shape[1:], dtype=np.float32)

In [None]:
np.set_printoptions(precision=5, suppress=True)
filer_name = "batch_normalization"
path_to_saved = os.path.join(model_data_dir, f'{filer_name}_mean.npy')
if os.path.exists(path_to_saved):
    data = np.load(path_to_saved)
    for i in range(min(5, data.size)):
        print(data[i], end=", ")
    print()
    print(data.shape)
    print("\n--------------")
    path_to_saved = os.path.join(model_data_dir, f'{filer_name}_variance.npy')
    data = np.load(path_to_saved)
    for i in range(min(5, data.size)):
        print(data[i], end=", ")
    print()
    print(data.shape)
    print("\n--------------")
else:
    path_to_saved = os.path.join(model_data_dir, f'{filer_name}_bias.npy')
    data = np.load(path_to_saved)
    for i in range(min(5, data.size)):
        print(data[i], end=", ")
    print()
    print(data.shape)
    print("\n--------------")
    
path_to_saved = os.path.join(model_data_dir, f'{filer_name}_weights.npy')
data = np.load(path_to_saved)
if 'depth' in filer_name:
    for i in range(3):
        for j in range(3):
            print(data[i, j, 0, 0], end=", ")
        print()
    print()
    for i in range(3):
        for j in range(3):
            print(data[i, j, 1, 0], end=", ")
        print()
else:
    for l in range(2):
        for k in range(4):
            for i in range(3):
                for j in range(3):
                    print(data[j, i, k ,l], end=", ")
                print()
            print("")
        print("+++++++++")
    print("\n--------------")
print(data.shape)
print("\n--------------")

In [None]:
#filer_name = "expanded_conv_project"
filer_name = "Conv1"
numpy_in = os.path.join(sample_test_dir, filer_name + "_in.npy")
np.save(numpy_in, np.transpose(img_data,[1,0,2]))
print(numpy_in)

In [None]:
file_name = "conv2d"
path_to_saved = os.path.join(model_data_dir, f'{file_name}.npy')
data = np.load(path_to_saved)

for l in range(2):
    for k in range(2):
        for i in range(2):
            for j in range(2):
                print(data[j, i, k ,l], end=", ")
            print()
        print("")
    print("+++++++++")
print(data.shape)
print("\n--------------")

write_csv3d(data, "filter")