In [None]:
#Imports
import datetime
import io
import pathlib
import time

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

In [None]:
# --- Dataset locations --------------------------------------------------
# Root folder of the KITTI road data; the sub-folders below hold the BEV
# images, the LiDAR images and the predictions written by this notebook.
datasetPath = '../Resources/KittiDataset/data_road/training/'
testingImagesPath = pathlib.Path(f'{datasetPath}bev_image_2/')
testingLidarPath = pathlib.Path(f'{datasetPath}veloImgV3/')
outputImagesPath = pathlib.Path(f'{datasetPath}/allModelsResults/')

# --- Run identification -------------------------------------------------
# The model name is prefixed with the notebook start time (day_month-hour_minute).
start_time = datetime.datetime.now().strftime("%d_%m-%H_%M")
modelName = "-".join([start_time, "ModelB"])

#For tensorboard log
base_log_dir = f"logs/{modelName}"
file_writer = tensorboard_callback = None

# --- Pipeline / model hyperparameters -----------------------------------
BUFFER_SIZE = 1000       # shuffle buffer size used by prepareDataset
BATCH_SIZE = 1           # images per batch
LEARNING_RATE = 0.0001   # passed to the Adam optimizer at compile time
DROPOUT_RATE = 0.1       # rate of the dropout layers in prepareModel

# --- Image geometry -----------------------------------------------------
# Every image is resized to img_height x img_width. Each source image is
# decoded with `channels` channels, and the BEV and LiDAR images are stacked
# into one tensor with `fullInputChannels` channels.
img_height, img_width = 800, 400
channels = 3
fullInputChannels = 6

numClasses = 1           # number of filters of the output layer
seed = 1234              # seed of the dataset shuffle
AUTOTUNE = tf.data.AUTOTUNE

#If true, only 20/10 images will be used for training/validation
USE_SMALL_DATASET = True

In [None]:
# Plot the BEV input, the LiDAR input and the predicted mask side by side
# (the predicted mask may not be necessary).
def display_sample(display_list, title=None):
    """Show three images in a single row of a matplotlib figure.

    Args:
        display_list: Indexable collection whose first three entries are,
            in order, the BEV image, the LiDAR image and the predicted mask.
            Each entry is passed to ``plt.imshow`` unchanged.
        title: Optional text shown as the figure's super-title.
    """
    fig = plt.figure(figsize=(5, 5))
    if title is not None:
        fig.suptitle(title, fontsize=16)

    panelTitles = ['BEV Image', 'LiDAR Image', 'Predicted Mask']
    # Index explicitly so that a list with fewer than three entries still fails loudly.
    panels = [display_list[k] for k in range(3)]
    columnCount = len(panels)

    for position, (caption, picture) in enumerate(zip(panelTitles, panels), start=1):
        plt.subplot(1, columnCount, position)
        plt.title(caption)
        plt.imshow(picture)
        plt.axis('off')
    plt.show()

#for sample_image,sample_mask in TrainingDataset.take(1):
#    display_sample([sample_image[0][0], sample_image[1][0], sample_mask[0]])

In [None]:
def parse_img(img_path: str) -> tuple:
    """Load one BEV image together with its LiDAR counterpart as a model input.

    The LiDAR file is located by replacing ``bev_image_2`` with ``veloImgV3``
    in ``img_path``. Both PNGs are decoded with ``channels`` channels, resized
    to ``(img_height, img_width)``, scaled to the range [0, 1] and concatenated
    along the channel axis.

    Args:
        img_path: Path of the BEV image (a Python string or a scalar string
            tensor, as supplied by ``Dataset.map``).

    Returns:
        A tuple ``(inputImage, imageFileName)``:
        ``inputImage`` is a float32 tensor of shape
        ``(img_height, img_width, 2 * channels)`` with the BEV channels first
        and the LiDAR channels last; ``imageFileName`` is the last
        ``/``-separated component of ``img_path``.
    """

    def load_scaled_png(path):
        # decode_png already produces uint8 pixels, so no extra dtype
        # conversion is required before resizing.
        pixels = tf.io.decode_png(tf.io.read_file(path), channels=channels)
        pixels = tf.image.resize(pixels, (img_height, img_width))
        return tf.cast(pixels, tf.float32) / 255.0

    lidarPath = tf.strings.regex_replace(img_path, "bev_image_2", "veloImgV3")

    image = load_scaled_png(img_path)
    lidar = load_scaled_png(lidarPath)

    # Only "/" is treated as a separator here; a path written with a
    # different separator is returned unsplit.
    imageFileName = tf.strings.split(img_path, "/")[-1]

    inputImage = tf.concat([image, lidar], axis=2)

    return (inputImage, imageFileName)


def prepareDataset():
    """Build the batched input pipeline over every PNG in ``testingImagesPath``.

    Returns:
        A ``tf.data.Dataset`` yielding ``(inputImage, imageFileName)`` batches
        of ``BATCH_SIZE`` elements produced by ``parse_img``. Files are listed
        in a deterministic order and then shuffled once with a fixed seed, so
        the order is the same on every iteration.
    """
    FullDataset = tf.data.Dataset.list_files(str(testingImagesPath / '*.png'), shuffle=False)
    # Shuffle the path strings *before* decoding: the shuffle buffer then holds
    # short strings instead of decoded float images, and the first batch is
    # available without first decoding enough images to fill the buffer.
    FullDataset = FullDataset.shuffle(buffer_size=BUFFER_SIZE, seed=seed, reshuffle_each_iteration=False)
    FullDataset = FullDataset.map(parse_img)
    FullDataset = FullDataset.batch(BATCH_SIZE)
    FullDataset = FullDataset.prefetch(buffer_size=AUTOTUNE)

    return FullDataset

In [None]:
def prepareModel():
    """Build the encoder/decoder Keras model used for mask prediction.

    Layout (layer names are kept exactly as listed, since stored weights are
    loaded into this model later in the notebook):
      * Encoder: four stages of 3x3 ReLU convolutions (16, 32, 64, 128 filters
        with 2, 2, 3, 3 convolutions) each followed by 2x2 max-pooling.
      * Bottleneck: two Dense(1024) + Dropout pairs applied to the encoder
        output; no activation is specified for the Dense layers.
      * Decoder: four stages of a stride-2 transposed convolution followed by
        3x3 ReLU convolutions (128, 64, 32, 16 filters with 3, 3, 2, 2
        convolutions).
      * Output: a sigmoid convolution with ``numClasses`` filters.

    Returns:
        A ``tf.keras.models.Model`` mapping an
        ``(img_height, img_width, fullInputChannels)`` input to the output layer.
    """
    layers = tf.keras.layers

    def conv_relu(tensor, filters, name):
        # 3x3 'same' convolution with ReLU, shared by encoder and decoder.
        return layers.Conv2D(filters=filters, kernel_size=(3,3), padding='same', activation='relu', name=name)(tensor)

    def upsample(tensor, filters, name):
        # Stride-2 transposed convolution used at the start of each decoder stage.
        return layers.Conv2DTranspose(filters=filters, kernel_size=16, strides=(2,2), padding='same', name=name)(tensor)

    #Encoder1
    input1 = tf.keras.Input(shape=(img_height,img_width,fullInputChannels), name='inputLayer_1')

    features = input1
    encoderStages = [(16, 2), (32, 2), (64, 3), (128, 3)]  # (filters, number of convolutions)
    for stage, (filters, depth) in enumerate(encoderStages, start=1):
        for idx in range(1, depth + 1):
            features = conv_relu(features, filters, f"e1_conv{stage}_{idx}")
        features = layers.MaxPool2D(pool_size=(2,2), strides=(2,2), padding='same', name=f"e1_pool{stage}")(features)

    e1 = tf.keras.models.Model(inputs=[input1], outputs=features, name="Encoder1")

    # Bottleneck: Dense + Dropout twice, applied to the encoder output.
    features = e1.output
    for denseName, dropName in [('fc6', 'drop6'), ('fc7', 'e2_drop7')]:
        features = layers.Dense(units=1024, name=denseName)(features)
        features = layers.Dropout(rate=DROPOUT_RATE, name=dropName)(features)

    # Decoder stages are numbered 10..13 in the layer names.
    decoderStages = [(128, 3), (64, 3), (32, 2), (16, 2)]  # (filters, number of convolutions)
    for stage, (filters, depth) in enumerate(decoderStages, start=10):
        features = upsample(features, filters, f"dconv{stage}")
        for idx in range(1, depth + 1):
            features = conv_relu(features, filters, f"conv{stage}_{idx}")

    outputLayer = layers.Conv2D(filters=numClasses, kernel_size=16, strides=(1,1), padding='same', activation='sigmoid', name="output_layer")(features)

    return tf.keras.models.Model(inputs=[e1.inputs], outputs=[outputLayer])


In [None]:
# Instantiate the network defined by prepareModel(); the weights are loaded
# in a later cell.
model = prepareModel()
# Uncomment to print the layer-by-layer summary.
#model.summary()

In [None]:
# Configure optimizer, loss and metrics, then load the stored weights.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=[
        'mse',
        tf.keras.metrics.BinaryIoU(target_class_ids=[1], name='BinaryIoU'),
    ],
    run_eagerly=False,
)

model.load_weights('Models/ModelB.h5')

In [None]:
# Predict a mask for a sample of the dataset, save every mask as a PNG and
# record the duration of each model.predict call in timeList.
i = 0
timeList = []

FullDataset = prepareDataset()

# Create the output folder up front so that saving the first mask cannot fail
# because the directory is missing.
outputImagesPath.mkdir(parents=True, exist_ok=True)

# Only 10 batches are processed; iterate over FullDataset directly (without
# .take) to process every image.
for image, filePath in FullDataset.take(10):
    # Each batch carries one file name per element; only the first element is
    # used (BATCH_SIZE is 1 in the configuration cell).
    # Drop any directory part, whichever path separator was used.
    fileName = filePath[0].numpy().decode('utf-8').replace("\\", "/").split("/")[-1]

    # Input names look like "<category>_<number>.png"; the output is named
    # "<category>_road_<number>.png".
    splittedFileName = fileName.split('_')
    roadCategory = splittedFileName[0]
    imageNumber = splittedFileName[1].split('.')[0]

    outputFileName = roadCategory + '_road_' + imageNumber + '.png'
    outputPath = str(outputImagesPath / ('B--' + outputFileName))

    # perf_counter is the clock intended for measuring short intervals.
    # verbose=0 keeps Keras from printing a progress bar for every image.
    start = time.perf_counter()
    predMask = model.predict(image, verbose=0)[0]
    elapsed = time.perf_counter() - start
    timeList.append(elapsed)

    predMask = tf.image.resize(predMask, (img_height, img_width))

    # To inspect a result visually:
    #   display_sample([*tf.split(image[0], 2, axis=2), predMask])

    tf.keras.utils.save_img(outputPath, predMask, scale=True)
    i += 1

In [None]:
# Report the average duration of the timed model.predict calls.
print("Mean prediction time:", np.mean(timeList), sep="\n")