In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential
import glob
import numpy as np
import random

In [None]:
train_paths = glob.glob("dataset_trafficRoad/images_prepped_train"+"/*.png")
train_label_paths = glob.glob("dataset_trafficRoad/annotations_prepped_train"+"/*.png")
print(train_paths[10:12])
print(train_label_paths[10:12])
test_paths = glob.glob("dataset_trafficRoad/images_prepped_test"+"/*.png")
test_label_paths = glob.glob("dataset_trafficRoad/annotations_prepped_test"+"/*.png")
print(test_paths[10:12])
print(test_label_paths[10:12])
print(len(train_paths),len(train_label_paths),len(test_paths),len(test_label_paths))

In [None]:
def data_processing(path, label_path):
    img = tf.io.read_file(path)
    img = tf.image.decode_jpeg( img , channels = 3)
    img = tf.cast(img , dtype = "float32")
    label = tf.io.read_file(label_path)
    label = tf.image.decode_jpeg( label , channels = 1)
    return img/255., label

In [None]:
img, label = data_processing(train_paths[20], train_label_paths[20])
print(img.shape, label.shape)
print(np.max(img), np.min(img))
print(np.max(label), np.min(label))

In [None]:
train_db = tf.data.Dataset.from_tensor_slices((train_paths, train_label_paths))
train_db = train_db.shuffle(10).map(data_processing).batch(1)

test_db = tf.data.Dataset.from_tensor_slices((test_paths, test_label_paths))
test_db = test_db.shuffle(10).map(data_processing).batch(1)

sample1 = next(iter(train_db))
print('sample1:', sample1[0].shape, sample1[1].shape)
sample2 = next(iter(test_db))
print('sample2:', sample2[0].shape, sample2[1].shape)

# Model

In [None]:
baseModel = tf.keras.applications.VGG16(weights="imagenet",include_top=False,input_shape = (360, 480, 3),pooling=None)
baseModel.trainable = False
contractingNet = tf.keras.models.Model(inputs=baseModel.input, outputs=baseModel.get_layer('block4_pool').output)

f1 = contractingNet.get_layer("block1_conv2").output
f2 = contractingNet.get_layer("block2_conv2").output
f3 = contractingNet.get_layer("block3_conv3").output
f4 = contractingNet.get_layer("block4_conv3").output

a = layers.Conv2D(1024, kernel_size=[3, 3], padding="same", activation="relu")(contractingNet.output)
a = layers.Conv2D(512, kernel_size=[1, 1], padding="same", activation="relu")(a)
a = layers.Conv2D(1024, kernel_size=[3, 3], padding="same", activation="relu")(a)

a = layers.UpSampling2D(size = (2, 2))(a)
a = layers.ZeroPadding2D(padding = (1, 1))(a)
a = layers.Conv2D(512, kernel_size=[2, 3], padding="valid", activation="relu")(a)
a = layers.concatenate([a,f4], axis = -1)
a = layers.Conv2D(256, kernel_size=[1, 1], padding="same", activation="relu")(a)
a = layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation="relu")(a)

a = layers.UpSampling2D(size = (2, 2))(a)
a = layers.concatenate([a,f3], axis = -1)
a = layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation="relu")(a)
a = layers.Conv2D(128, kernel_size=[1, 1], padding="same", activation="relu")(a)
a = layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation="relu")(a)

a = layers.UpSampling2D(size = (2, 2))(a)
a = layers.concatenate([a,f2], axis = -1)
a = layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation="relu")(a)
a = layers.Conv2D(64, kernel_size=[1, 1], padding="same", activation="relu")(a)
a = layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation="relu")(a)

a = layers.UpSampling2D(size = (2, 2))(a)
a = layers.concatenate([a,f1], axis = -1)
a = layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation="relu")(a)
a = layers.Conv2D(32, kernel_size=[1, 1], padding="same", activation="relu")(a)
a = layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation="relu")(a)
outputs = layers.Conv2D(12, kernel_size=[1, 1], padding="same", activation="softmax")(a)

model = tf.keras.Model(inputs = baseModel.input, outputs = outputs )
model.summary()

# Training

In [None]:
optimizer = optimizers.Adam(lr=1e-4)
model.compile(loss = "sparse_categorical_crossentropy", optimizer = optimizer,metrics = ["acc"])

model_path = "models/model-trafficRoad-{epoch:04d}-{val_acc:.4f}.h5"
checkpoint = keras.callbacks.ModelCheckpoint(model_path, monitor = "val_acc", save_best_only = True, mode="max")

In [None]:
history = model.fit(train_db, epochs = 20, validation_data = test_db, callbacks = [checkpoint])

# Load model to evaluate

In [None]:
del model

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential
import glob
import numpy as np
import random

In [None]:
model = tf.keras.models.load_model('models/model-trafficRoad-0017-0.9052.h5')

In [None]:
train_paths = glob.glob("dataset_trafficRoad/images_prepped_train"+"/*.png")
train_label_paths = glob.glob("dataset_trafficRoad/annotations_prepped_train"+"/*.png")
test_paths = glob.glob("dataset_trafficRoad/images_prepped_test"+"/*.png")
test_label_paths = glob.glob("dataset_trafficRoad/annotations_prepped_test"+"/*.png")

In [None]:
def get_one_img(path):
    img = tf.io.read_file(path)
    img = tf.image.decode_jpeg( img , channels = 3)
    img = tf.cast(img , dtype = "float32")
    img = tf.image.resize(img, [360, 480])
    img = tf.expand_dims(img, axis = 0)
    return img/255.

In [None]:
import cv2, time

img_index = 80
watch_paths = train_paths[img_index:img_index+4]

for x, path in enumerate(watch_paths):
    img = get_one_img(path)
    orgimg = tf.squeeze(img, axis = 0)
    cv2.imshow('orgimg'+str(x), orgimg.numpy())
    s1 = time.time()
    pred = model.predict(img)
    s2 =time.time()
    pred_arg = tf.argmax(pred, axis = -1)
    pred_arg = tf.squeeze(pred_arg, axis = 0)
    pred_arg = tf.expand_dims(pred_arg, axis = -1)
    
    a = tf.where( tf.equal(pred_arg, 0),  [255, 255,   0],  [0, 0, 0])
    b = tf.where( tf.equal(pred_arg, 1),  [ 18, 153, 255],  [0, 0, 0])
    c = tf.where( tf.equal(pred_arg, 2),  [  0,   0,   0],  [0, 0, 0])
    d = tf.where( tf.equal(pred_arg, 3),  [105, 128, 118],  [0, 0, 0])
    e = tf.where( tf.equal(pred_arg, 4),  [255, 255, 255],  [0, 0, 0])
    f = tf.where( tf.equal(pred_arg, 5),  [  0, 255,   0],  [0, 0, 0])
    g = tf.where( tf.equal(pred_arg, 6),  [192, 192, 192],  [0, 0, 0])
    h = tf.where( tf.equal(pred_arg, 7),  [  0,   0, 255],  [0, 0, 0])
    i = tf.where( tf.equal(pred_arg, 8),  [  8,  46,  84],  [0, 0, 0])
    j = tf.where( tf.equal(pred_arg, 9),  [255, 125,  64],  [0, 0, 0])
    k = tf.where( tf.equal(pred_arg, 10), [192, 192, 192],  [0, 0, 0])
    l = tf.where( tf.equal(pred_arg, 11), [255,   0,   0],  [0, 0, 0])
    predimg = a + b + c + d + e + f + g + h + i + j + k + l
    predimg = tf.cast(predimg, dtype = "uint8")
    cv2.imshow('predimg'+str(x), predimg.numpy())
    print("time distance:", s2-s1)
    
cv2.waitKey(0)
cv2.destroyAllWindows()