In [None]:
# Import all required packages

# helper packages
import numpy as np
import imageio
import matplotlib.pyplot as plt
import random
import cv2
import os

# tensorflow packages
import tensorflow as tf 
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.models import load_model

# import sklearn for data split convenience
from sklearn.model_selection import train_test_split

# import own helper functions
from helper_functions import helpers
from tf_functions import tf_helpers

In [None]:
# List the devices (CPU, GPU, ...) that TensorFlow can see on this machine.
# tf.config.list_physical_devices is the public API for this query, so the
# internal tensorflow.python.client.device_lib module (and its extra import
# in the middle of the notebook) is no longer needed.
print(tf.config.list_physical_devices())

In [None]:
# Folders of the five dataset parts (A-E): RGB frames live in CameraRGB/,
# the mask images in CameraSeg/
imagePaths = [f"./archive/data{part}/data{part}/CameraRGB/" for part in "ABCDE"]
maskPaths = [f"./archive/data{part}/data{part}/CameraSeg/" for part in "ABCDE"]

In [None]:
# Turn the folder lists into file-path lists: input images first, then the
# corresponding masks.
# NOTE(review): imagePaths/maskPaths are rebound from folders to files here, so
# re-running this cell on its own presumably feeds file paths back into
# helpers.createImagePaths - confirm, and re-run the previous cell first.
imagePaths = helpers.createImagePaths(imagePaths)
maskPaths = helpers.createImagePaths(maskPaths)

# Count the entries on both sides and report them
nImg = len(imagePaths)
nMask = len(maskPaths)
print(f"Number of images and masks: {nImg}, {nMask}")


In [None]:
# Show two randomly picked images next to their mask
nSamples = len(imagePaths)

for i in range(2):
    # Draw a random index and load the image stored at that position
    N = random.randint(0, nSamples - 1)
    img = imageio.imread(imagePaths[N])

    # Load the mask with the same index, take the per-pixel maximum over its
    # channel axis (axis 2) and shape the result to the image's height x width
    mask = np.max(imageio.imread(maskPaths[N]), axis=2).reshape(img.shape[0], img.shape[1])

    # One row, two panels: the image on the left, the mask on the right
    fig, arr = plt.subplots(1, 2, figsize=(18, 6))
    for panel, picture, caption in zip(arr, (img, mask), ('CARLA visualization', 'Image mask')):
        panel.imshow(picture)
        panel.set_title(caption)
        panel.axis("off")

    # Write the figure to img_<i>.png in the working directory
    plt.savefig(f"img_{i}.png")


In [None]:
# Three-way split in two steps: 80% of all pairs go to training, then the
# remaining hold-out part is split 80/20 again into validation and test.
# Both calls use random_state=0, so the split is the same on every run.
trainImagePaths, holdoutImagePaths, trainMaskPaths, holdoutMaskPaths = train_test_split(
    imagePaths, maskPaths, train_size=0.8, random_state=0
)
valImagePaths, testImagePaths, valMaskPaths, testMaskPaths = train_test_split(
    holdoutImagePaths, holdoutMaskPaths, train_size=0.8, random_state=0
)

# Report the size of each subset
for subsetName, subsetPaths in (
    ('training', trainImagePaths),
    ('validation', valImagePaths),
    ('testing', testImagePaths),
):
    print(f'{len(subsetPaths)} images in {subsetName}')

In [None]:
# Settings shared by the input pipeline and the evaluation cells
batchSize = 64
bufferSize = 500
nClasses = 13

# Build one dataset per split with helpers.dataGenerator (training, validation,
# testing - in that order); every split gets the same buffer and batch size
trainDataset, valDataset, testDataset = [
    helpers.dataGenerator(splitImages, splitMasks, bufferSize, batchSize)
    for splitImages, splitMasks in (
        (trainImagePaths, trainMaskPaths),
        (valImagePaths, valMaskPaths),
        (testImagePaths, testMaskPaths),
    )
]


In [None]:
# Number of filters handed to the UNet builder
filters = 32

# Build the UNet for 256x256 RGB inputs. The number of output nodes is tied to
# nClasses so the network agrees with the class count used by the evaluation
# cells (it was hard-coded to 23 while nClasses is 13).
# NOTE(review): assumes every label in the masks is < nClasses - confirm
# against the dataset before training.
model = tf_helpers.defineUNetModel((256, 256, 3), filters=filters, outputNodes=nClasses)

# Print a summary of the model architecture
model.summary()

In [None]:
# Training configuration: Adam optimizer, sparse categorical cross-entropy
# loss, accuracy as the reported metric
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Early stopping: watch val_accuracy with a patience of 5 epochs and restore
# the best weights when training is stopped
callback = EarlyStopping(
    monitor='val_accuracy',
    patience=5,
    restore_best_weights=True,
)

# Learning-rate schedule: watch val_accuracy with a patience of 3 epochs,
# scale the learning rate by 0.1 on a plateau, lower bound 1e-5
reduceLr = ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.1,
    patience=3,
    verbose=1,
    min_lr=1e-5,
)

# Maximum number of training epochs
epochs = 10

In [None]:
# Train the model. trainDataset/valDataset come from helpers.dataGenerator,
# which already receives batchSize and bufferSize, so batching (and presumably
# shuffling - TODO confirm in helpers) is defined by the datasets themselves.
# Keras documents that batch_size must not be specified for dataset/generator
# inputs and that shuffle is ignored for them, so neither is passed here.
history = model.fit(
    trainDataset,                    # training data
    validation_data=valDataset,      # validation data
    epochs=epochs,                   # maximum number of epochs to train for
    verbose=1,                       # print a progress bar for each epoch
    callbacks=[callback, reduceLr],  # early stopping + learning-rate reduction
)


In [None]:
# One 10x10 figure with two stacked panels: accuracy on top, loss below
historyFig, (accAx, lossAx) = plt.subplots(2, 1, figsize=(10, 10))

# Top panel: training vs. validation accuracy per epoch
accAx.plot(history.history['accuracy'], label='Training Accuracy')
accAx.plot(history.history['val_accuracy'], label='Validation Accuracy')
accAx.set_ylabel('Accuracy')
# keep the automatically chosen lower limit, fix the upper limit at 1
accAx.set_ylim([min(accAx.get_ylim()), 1])
accAx.legend(loc='lower right')
accAx.set_title('Training/Validation Accuracy')

# Bottom panel: training vs. validation loss per epoch
lossAx.plot(history.history['loss'], label='Training Loss')
lossAx.plot(history.history['val_loss'], label='Validation Loss')
lossAx.set_ylabel('Loss')
# y-axis fixed to the range 0..1.0
lossAx.set_ylim([0, 1.0])
lossAx.legend(loc='upper right')
lossAx.set_title('Training/Validation Loss')
lossAx.set_xlabel('Epochs')

# Write the figure to disk, then display it
historyFig.savefig("loss_and_accuracy")
plt.show()


In [None]:
# Write the trained model to UNetOwnRecreation.h5 so it can be loaded later
# instead of being retrained
model.save(filepath='UNetOwnRecreation.h5')

In [None]:
# Loss and accuracy of the model on each split. The datasets were built by
# helpers.dataGenerator with batchSize, and Keras documents that batch_size
# must not be specified for dataset/generator inputs, so it is not passed.

# Evaluate model on the training dataset
trainLoss, trainAcc = model.evaluate(trainDataset)

# Evaluate model on the validation dataset
valLoss, valAcc = model.evaluate(valDataset)

# Evaluate model on the test dataset
testLoss, testAcc = model.evaluate(testDataset)


In [None]:
# Report the accuracy measured on each split, one line per split
for splitLabel, splitAcc in (('Training', trainAcc), ('Val', valAcc), ('Test', testAcc)):
    print(f'{splitLabel} acc: {splitAcc}')

In [None]:
# Run tf_helpers.createMaskForImage over the training, validation and test
# datasets (in that order); each call yields a (true masks, predicted masks) pair
maskPairs = [
    tf_helpers.createMaskForImage(splitDataset, model)
    for splitDataset in (trainDataset, valDataset, testDataset)
]
trainMasksTrue, trainMasksPred = maskPairs[0]
valMasksTrue, valMasksPred = maskPairs[1]
testMasksTrue, testMasksPred = maskPairs[2]


In [None]:
def _evaluateSplit(masksTrue, masksPred):
    """Build a tf_helpers.modelEvaluation object for one split, using nClasses classes."""
    return tf_helpers.modelEvaluation(masksTrue, masksPred, n_classes=nClasses)


# One evaluation object per split
modelEvalTrain = _evaluateSplit(trainMasksTrue, trainMasksPred)  # training set
modelEvalVal = _evaluateSplit(valMasksTrue, valMasksPred)        # validation set
modelEvalTest = _evaluateSplit(testMasksTrue, testMasksPred)     # testing set

# Print the three evaluation objects in one call
print(modelEvalTrain, modelEvalVal, modelEvalTest)


In [None]:
# Read the saved model back from UNetOwnRecreation.h5; from here on `model`
# refers to the loaded copy
model = load_model(filepath='UNetOwnRecreation.h5')

In [None]:
# Read in an image file and preprocess it for the network
image = tf.io.read_file('<ENTER YOUR IMAGE FILE HERE>') # Read the raw file contents
image = tf.image.decode_png(image, channels=3) # Decode into a 3-channel image tensor
image = tf.image.convert_image_dtype(image, tf.float32) # Convert the image to floats between 0 and 1
image = tf.image.resize(image, (256, 256), method='nearest') # Resize to (256, 256) using nearest neighbor interpolation

# Make a prediction on a batch of one (expand_dims adds the leading batch axis)
pred = model.predict(tf.expand_dims(image, 0))

# Convert the prediction to a mask: highest-scoring class along the last axis,
# then add a trailing channel axis again so it can be rendered as an image
predMask = tf.argmax(pred, axis=-1)
predMask = tf.expand_dims(predMask, axis=-1)

# Extract the predicted mask from the batch and plot it. Only the prediction
# is shown: there is no ground-truth mask for a user-supplied image, so the
# unused list of panel titles was removed.
finalPred = predMask[0]
plt.figure(figsize=(18, 18))
plt.title('Test')
plt.imshow(tf.keras.utils.array_to_img(finalPred))
plt.axis('off')
plt.show()


In [None]:
# Forecast on our own video
sampleVideo = '<INSERT SAMPLE VIDEO HERE>' # .mp4 format
framesFolder = '<INSERT SAMPLE FRAMES FOLDER HERE>' # e.g. 'highway'
predFramesFolder = '<INSERT SAMPLE MASKS FOLDER HERE>' # e.g. 'highway_masks'
outputVideoFile = '<INSERT OUTPUT FILE NAME HERE>' # e.g. 'highway_forecast'

# Open the sample video and fail loudly if that does not work; otherwise the
# loop below would silently extract zero frames
vidcap = cv2.VideoCapture(sampleVideo)
if not vidcap.isOpened():
    vidcap.release()
    raise IOError(f"Could not open video file: {sampleVideo}")

# cv2.imwrite does not create missing directories, so make sure the target
# folder exists before the first frame is written
os.makedirs(framesFolder, exist_ok=True)

# Number of frames written so far; the following cells rely on its final value
count = 0

try:
    # Loop over each frame in the video and save it as frame<count>.jpg
    success, image = vidcap.read()
    while success:
        framePath = os.path.join(framesFolder, "frame%d.jpg" % count)
        # imwrite reports failure through its return value
        if not cv2.imwrite(framePath, image):
            raise IOError(f"Could not write frame to: {framePath}")
        if count % 1000 == 0:
            print('Frame:', count)
        count += 1
        success, image = vidcap.read()
finally:
    # Always give the video handle back, even if writing a frame failed
    vidcap.release()



In [None]:

# plt.savefig fails if the target directory is missing, so create it up front
os.makedirs(predFramesFolder, exist_ok=True)

# Loop over each saved frame in the specified directory
for filename in range(count):
    f = os.path.join(framesFolder, "frame" + str(filename) + ".jpg")

    # Frames that are not present on disk are skipped
    if not os.path.isfile(f):
        continue

    # Same preprocessing as for a single image; the frames were stored as
    # .jpg files, so they are decoded with the JPEG decoder
    image = tf.io.read_file(f)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, (256, 256), method='nearest')

    # Predict on a batch of one; verbose=0 avoids one progress bar per frame
    pred = model.predict(tf.expand_dims(image, 0), verbose=0)
    predMask = tf.argmax(pred, axis=-1)
    predMask = tf.expand_dims(predMask, axis=-1)

    # Render the predicted mask and store it under the same frame number
    finalPred = predMask[0]
    plt.imshow(tf.keras.utils.array_to_img(finalPred))
    plt.savefig(os.path.join(predFramesFolder, 'frame' + str(filename) + ".jpg"))
    # close the figure so figures do not pile up in memory over the loop
    plt.close()


In [None]:
# Folder that holds the predicted frames
predDir = './' + predFramesFolder + '/'

# Keep only files that follow the frame<N>.jpg naming scheme, so stray files
# in the folder cannot break the numeric sort below
files = [
    f for f in os.listdir(predDir)
    if f.startswith('frame') and f.endswith('.jpg') and f[5:-4].isdigit()
]
# sort the filenames by their frame number
files.sort(key=lambda x: int(x[5:-4]))

if not files:
    raise FileNotFoundError(f"No predicted frames found in: {predDir}")

# The writer is created lazily from the first readable frame, because the
# frame size is only known once an image has been loaded. Frames are written
# as they are read instead of being collected in a list first, which keeps
# memory usage flat for long videos.
out = None
try:
    for name in files:
        # read the image file using OpenCV; imread returns None on failure
        img = cv2.imread(predDir + name)
        if img is None:
            print('Skipping unreadable frame:', name)
            continue

        if out is None:
            height, width = img.shape[:2]
            size = (width, height)
            # 30 frames per second, DIVX codec, .avi container
            out = cv2.VideoWriter(outputVideoFile + '.avi', cv2.VideoWriter_fourcc(*'DIVX'), 30, size)

        out.write(img)
finally:
    # release the video writer object even if reading or writing failed
    if out is not None:
        out.release()
