In [None]:
from google.colab import drive

# Mount Google Drive so that the paths under /content/drive/MyDrive used by the
# later cells (dataset, VGG model, checkpoints, run outputs) are reachable.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import contextlib
import csv
import functools
import os
import os.path
import random
import re
import shutil
import sys
import time
import warnings
import zipfile
from copy import deepcopy
from distutils.version import LooseVersion
from glob import glob
from glob import glob1
from unittest import mock
from urllib.request import urlretrieve

import cv2
import numpy as np
import scipy.misc
import tensorflow as tf
from matplotlib import pyplot as plt
from PIL import Image
from tqdm import tqdm

# Switch TensorFlow to graph mode: the cells below build the network with
# tf.compat.v1 placeholders and execute it through tf.compat.v1.Session.
tf.compat.v1.disable_eager_execution()



In [None]:



class DLProgress(tqdm):
    """
    tqdm progress bar whose `hook` method can be passed to `urlretrieve` as its report hook.
    """
    # Block counter seen at the previous `hook` call; used to turn the
    # cumulative block number into an increment for `update`.
    last_block = 0

    def hook(self, block_num=1, block_size=1, total_size=None):
        """
        Advance the progress bar by the data received since the previous call.
        :param block_num: Number of blocks transferred so far
        :param block_size: Size of one block in bytes
        :param total_size: Total size of the download in bytes; None or a non-positive value when unknown
        """
        # Only adopt a real size. An unknown size (None, or -1 as reported for
        # servers that send no length) must not overwrite the bar's total.
        if total_size is not None and total_size > 0:
            self.total = total_size
        self.update((block_num - self.last_block) * block_size)
        self.last_block = block_num


def maybe_download_pretrained_vgg(data_dir):
    """
    Download and extract pretrained vgg model if it doesn't exist
    :param data_dir: Directory to download the model to
    """
    vgg_filename = 'vgg.zip'
    vgg_path = os.path.join(data_dir, 'vgg')
    vgg_files = [
        os.path.join(vgg_path, 'variables/variables.data-00000-of-00001'),
        os.path.join(vgg_path, 'variables/variables.index'),
        os.path.join(vgg_path, 'saved_model.pb')]

    # Nothing to do when every expected model file is already on disk.
    if all(os.path.exists(vgg_file) for vgg_file in vgg_files):
        return

    # Clean vgg dir so a partial earlier download cannot be mistaken for a model
    if os.path.exists(vgg_path):
        shutil.rmtree(vgg_path)
    os.makedirs(vgg_path)

    archive_path = os.path.join(vgg_path, vgg_filename)

    # Download vgg
    print('Downloading pre-trained vgg model...')
    with DLProgress(unit='B', unit_scale=True, miniters=1) as pbar:
        urlretrieve(
            'https://s3-us-west-1.amazonaws.com/udacity-selfdrivingcar/vgg.zip',
            archive_path,
            pbar.hook)

    # Extract vgg. The archive is unpacked into data_dir (not vgg_path);
    # presumably it already contains a top-level 'vgg' folder - TODO confirm.
    # The context manager closes the archive even if extraction fails.
    print('Extracting model...')
    with zipfile.ZipFile(archive_path, 'r') as zip_ref:
        zip_ref.extractall(data_dir)

    # Remove zip file to save space
    os.remove(archive_path)


def gen_batch_function(data_folder, image_shape):
    """
    Generate function to create batches of training data
    :param data_folder: Path to folder that contains the 'image_2' and 'gt_image_2' sub-folders
    :param image_shape: Tuple - target size handed to cv2.resize for every image and label
    :return: Function taking a batch size and yielding (images, labels) numpy batches
    """
    def get_batches_fn(batch_size):
        """
        Create batches of training data
        :param batch_size: Batch Size
        :return: Batches of training data
        """
        image_paths = glob(os.path.join(data_folder, 'image_2', '*.png'))
        # Map each label to the name of its input image by dropping the
        # '_road_' / '_lane_' infix from the label file name.
        label_paths = {
            re.sub(r'_(lane|road)_', '_', os.path.basename(path)): path
            for path in glob(os.path.join(data_folder, 'gt_image_2', '*_road_*.png'))}

        # Pixels with exactly this value are treated as background.
        # NOTE(review): cv2.imread is documented to return channels in BGR
        # order, so this would be red in RGB terms - confirm against the labels.
        background_color = np.array([0, 0, 255])

        random.shuffle(image_paths)
        for batch_i in range(0, len(image_paths), batch_size):
            images = []
            gt_images = []
            for image_file in image_paths[batch_i:batch_i + batch_size]:
                gt_image_file = label_paths[os.path.basename(image_file)]

                image = cv2.resize(cv2.imread(image_file), image_shape)
                gt_image = cv2.resize(cv2.imread(gt_image_file), image_shape)

                # Boolean background map with a trailing channel axis.
                gt_bg = np.all(gt_image == background_color, axis=2)
                gt_bg = gt_bg.reshape(*gt_bg.shape, 1)

                # Two-channel label: channel 0 = background, channel 1 = everything else.
                gt_image = np.concatenate((gt_bg, np.invert(gt_bg)), axis=2)

                images.append(image)
                gt_images.append(gt_image)

            yield np.array(images), np.array(gt_images)

    return get_batches_fn


def gen_test_output(sess, logits, keep_prob, image_pl, data_folder, image_shape):
    """
    Generate test output using the test images
    :param sess: TF session
    :param logits: TF Tensor for the logits
    :param keep_prob: TF Placeholder for the dropout keep probability
    :param image_pl: TF Placeholder for the image placeholder
    :param data_folder: Path to the folder that contains the datasets
    :param image_shape: Tuple - size handed to cv2.resize; the prediction is reshaped to (image_shape[1], image_shape[0])
    :return: Generator of (file name, image array with the mask painted on) for each test image
    """
    image_files = glob(os.path.join(data_folder, 'image_2', '*.png'))
    if not image_files:
        # Nothing to predict; do not add any op to the graph.
        return

    # Build the softmax op once. Creating it inside the loop would add a new
    # node to the graph for every image.
    softmax_op = tf.nn.softmax(logits)
    rows, cols = image_shape[1], image_shape[0]

    for image_file in image_files:
        image = cv2.resize(cv2.imread(image_file), image_shape)

        im_softmax = sess.run(
            [softmax_op],
            {keep_prob: 1.0, image_pl: [image]})

        # Column 1 of the flattened softmax is thresholded at 0.5 to get the mask.
        road_prob = im_softmax[0][:, 1].reshape(rows, cols)
        segmentation = (road_prob > 0.5).reshape(rows, cols, 1)

        # RGBA overlay (0, 255, 0, 127) wherever the mask is set. The product
        # is an integer array and must be cast to uint8 before it is handed to
        # PIL as 'RGBA' data, exactly as gen_output does.
        mask = np.uint8(np.dot(segmentation, np.array([[0, 255, 0, 127]])))
        mask = Image.fromarray(mask, mode='RGBA')
        street_im = Image.fromarray(image)

        # The overlay's own alpha channel is used as the paste mask.
        street_im.paste(mask, box=None, mask=mask)

        yield os.path.basename(image_file), np.array(street_im)


def save_inference_samples(runs_dir, data_dir, sess, image_shape, logits, keep_prob, input_image):
    """
    Run the network on the test images and write the overlaid results to a new run folder.
    :param runs_dir: Directory in which a folder named after the current timestamp is created
    :param data_dir: Directory that contains 'data_road/testing'
    :param sess: TF session
    :param image_shape: Tuple - Shape of image
    :param logits: TF Tensor for the logits
    :param keep_prob: TF Placeholder for the dropout keep probability
    :param input_image: TF Placeholder for the input image
    """
    # One output folder per run, named after the current timestamp.
    output_dir = os.path.join(runs_dir, str(time.time()))
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)

    print('Training Finished!')
    print('Saving test images to: {}, please wait...'.format(output_dir))

    testing_folder = os.path.join(data_dir, 'data_road/testing')
    predictions = gen_test_output(
        sess, logits, keep_prob, input_image, testing_folder, image_shape)

    # Each prediction is written with OpenCV under its original file name.
    for name, image in predictions:
        cv2.imwrite(os.path.join(output_dir, name), image)

    print('All augmented images are saved!')

def gen_output(sess, logits, keep_prob, image_pl, data_folder, image_shape, plane_img_dir=None):
    """
    Generate prediction output for every PNG in a folder and display the intermediate images
    :param sess: TF session
    :param logits: TF Tensor for the logits
    :param keep_prob: TF Placeholder for the dropout keep probability
    :param image_pl: TF Placeholder for the image placeholder
    :param data_folder: Path to the folder that contains the input PNG images
    :param image_shape: Tuple - size handed to cv2.resize; the prediction is reshaped to (image_shape[1], image_shape[0])
    :param plane_img_dir: Folder with the images the mask is painted on (looked up by the input file name); defaults to the project's testing/rgb_image_2 folder
    :return: Generator of (file name, overlaid image array, speed in frames per second)
    """
    if plane_img_dir is None:
        # The mask is drawn on these images rather than on the network input,
        # so that the detection can be seen properly.
        plane_img_dir = '/content/drive/MyDrive/Fall 2023/3d perp/Project/data_road/testing/rgb_image_2'

    image_files = glob(os.path.join(data_folder, '*.png'))
    if not image_files:
        # Nothing to predict; do not add any op to the graph.
        return

    # Build the softmax op once. Creating it inside the loop would add a new
    # node to the graph for every image.
    softmax_op = tf.nn.softmax(logits)
    rows, cols = image_shape[1], image_shape[0]

    for image_file in image_files:
        # Image with the same file name that receives the overlay.
        plane_img_path = os.path.join(plane_img_dir, os.path.basename(image_file))
        plane_image = cv2.resize(cv2.imread(plane_img_path), image_shape)
        plane_img = Image.fromarray(plane_image, mode='RGB')

        print("dfeefef", image_file)
        image = cv2.resize(cv2.imread(image_file), image_shape)

        startTime = time.perf_counter()
        im_softmax = sess.run(
            [softmax_op],
            {keep_prob: 1.0, image_pl: [image]})

        # Column 1 of the flattened softmax is thresholded at 0.5 to get the mask.
        road_prob = im_softmax[0][:, 1].reshape(rows, cols)
        segmentation = (road_prob > 0.5).reshape(rows, cols, 1)

        # RGBA overlay (255, 0, 255, 127) wherever the mask is set.
        mask = np.dot(segmentation, np.array([[255, 0, 255, 127]]))
        mask_image = Image.fromarray(np.uint8(mask), mode='RGBA')

        # The overlay's own alpha channel is used as the paste mask.
        plane_img.paste(mask_image, box=None, mask=mask_image)

        # Stop the clock before any figure is shown so that the reported speed
        # covers inference and compositing only, not the time spent plotting.
        endTime = time.perf_counter()
        speed_ = 1.0 / (endTime - startTime)

        # Display the mask
        plt.imshow(mask_image.convert('L'), cmap='gray')
        plt.title('Mask Visualization')
        plt.show()

        # Display the network input
        street_im = Image.fromarray(image, mode='RGB')
        plt.imshow(street_im)
        plt.axis('off')
        plt.show()

        # Display the final overlay
        plt.imshow(plane_img)
        plt.axis('off')
        plt.show()

        yield os.path.basename(image_file), np.array(plane_img), speed_

def pred_samples(runs_dir, data_dir, sess, image_shape, logits, keep_prob, input_image, print_speed=False):
    """
    Predict every image in a folder and save the overlaid results to a new run folder.
    :param runs_dir: Directory in which a folder named after the current timestamp is created
    :param data_dir: Folder with the PNG images to predict
    :param sess: TF session
    :param image_shape: Tuple - Shape of image
    :param logits: TF Tensor for the logits
    :param keep_prob: TF Placeholder for the dropout keep probability
    :param input_image: TF Placeholder for the input image
    :param print_speed: When exactly True, print a numbered line with the speed of every image
    """
    # One output folder per run, named after the current timestamp.
    output_dir = os.path.join(runs_dir, str(time.time()))
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)

    print('Predicting images...')

    processed = 0
    predictions = gen_output(
        sess, logits, keep_prob, input_image, data_dir, image_shape)
    for name, image, speed_ in predictions:
        # Each prediction is written with OpenCV under its original file name.
        cv2.imwrite(os.path.join(output_dir, name), image)

        if print_speed is not True:
            continue
        processed += 1
        print("Processing file: {0:05d},\tSpeed: {1:.2f} fps".format(processed, speed_))

    print('All augmented images are saved to: {}.'.format(output_dir))


In [None]:

def test_safe(func):
    """
    Isolate tests
    """
    def func_wrapper(*args):
        with tf.compat.v1.compat.v1.Graph().as_default():
            result = func(*args)
        print('Tests Passed')
        return result

    return func_wrapper


def _prevent_print(function, params):
    sys.stdout = open(os.devnull, "w")
    function(**params)
    sys.stdout = sys.__stdout__


def _assert_tensor_shape(tensor, shape, display_name):
    assert tf.compat.v1.compat.v1.assert_rank(tensor, len(shape), message='{} has wrong rank'.format(display_name))

    tensor_shape = tensor.get_shape().as_list() if len(shape) else []

    wrong_dimension = [ten_dim for ten_dim, cor_dim in zip(tensor_shape, shape)
                       if cor_dim is not None and ten_dim != cor_dim]
    assert not wrong_dimension, \
        '{} has wrong shape.  Found {}'.format(display_name, tensor_shape)


class TmpMock(object):
    """
    Mock a attribute.  Restore attribute when exiting scope.

    The attribute is replaced by a MagicMock as soon as the object is
    constructed; entering the context returns that mock and leaving it puts the
    original attribute back.
    """
    def __init__(self, module, attrib_name):
        """
        :param module: Object that owns the attribute
        :param attrib_name: Name of the attribute to replace
        """
        # Keep the original object itself rather than a deep copy, so that the
        # very same object is restored and uncopyable attributes are supported.
        self.original_attrib = getattr(module, attrib_name)
        setattr(module, attrib_name, mock.MagicMock())
        self.module = module
        self.attrib_name = attrib_name

    def __enter__(self):
        return getattr(self.module, self.attrib_name)

    def __exit__(self, type, value, traceback):
        setattr(self.module, self.attrib_name, self.original_attrib)


@test_safe
def test_load_vgg(load_vgg, tf_module):
    """
    Check that `load_vgg` calls the saved-model loader and returns the expected tensors.
    :param load_vgg: Function under test, called as load_vgg(sess, vgg_path)
    :param tf_module: TensorFlow module whose saved-model loader `load` function is mocked
    """
    # NOTE(review): TF 2.x appears to expose the loader only under compat.v1,
    # so fall back to that path when `saved_model.loader` is missing - confirm
    # against the installed TensorFlow version.
    loader_module = getattr(tf_module.saved_model, 'loader', None)
    if loader_module is None:
        loader_module = tf_module.compat.v1.saved_model.loader

    with TmpMock(loader_module, 'load') as mock_load_model:
        vgg_path = ''
        # The session is closed on exit instead of being left open.
        with tf.compat.v1.Session() as sess:
            # Placeholders carrying the names load_vgg looks up in the default graph.
            test_input_image = tf.compat.v1.placeholder(tf.compat.v1.float32, name='image_input')
            test_keep_prob = tf.compat.v1.placeholder(tf.compat.v1.float32, name='keep_prob')
            test_vgg_layer3_out = tf.compat.v1.placeholder(tf.compat.v1.float32, name='layer3_out')
            test_vgg_layer4_out = tf.compat.v1.placeholder(tf.compat.v1.float32, name='layer4_out')
            test_vgg_layer7_out = tf.compat.v1.placeholder(tf.compat.v1.float32, name='layer7_out')

            input_image, keep_prob, vgg_layer3_out, vgg_layer4_out, vgg_layer7_out = load_vgg(sess, vgg_path)

            assert mock_load_model.called, \
                'tf.compat.v1.saved_model.loader.load() not called'
            assert mock_load_model.call_args == mock.call(sess, ['vgg16'], vgg_path), \
                'tf.compat.v1.saved_model.loader.load() called with wrong arguments.'

            # Identity checks: `==` between tensors may build an element-wise
            # comparison instead of answering whether it is the same object.
            assert input_image is test_input_image, 'input_image is the wrong object'
            assert keep_prob is test_keep_prob, 'keep_prob is the wrong object'
            assert vgg_layer3_out is test_vgg_layer3_out, 'layer3_out is the wrong object'
            assert vgg_layer4_out is test_vgg_layer4_out, 'layer4_out is the wrong object'
            assert vgg_layer7_out is test_vgg_layer7_out, 'layer7_out is the wrong object'


@test_safe
def test_layers(layers):
    """
    Check that `layers` returns a rank-4 tensor whose last dimension is the number of classes.
    :param layers: Function under test, called as layers(layer3_out, layer4_out, layer7_out, num_classes)
    """
    v1 = tf.compat.v1.compat.v1
    num_classes = 2

    # Stand-ins for the VGG layer 3, 4 and 7 outputs with 256, 512 and 4096 channels.
    vgg_outputs = [
        v1.placeholder(v1.float32, [None, None, None, depth])
        for depth in (256, 512, 4096)]
    layers_output = layers(*vgg_outputs, num_classes)

    _assert_tensor_shape(layers_output, [None, None, None, num_classes], 'Layers Output')


@test_safe
def test_optimize(optimize):
    """
    Check the logits shape produced by `optimize` and that one training step changes the weights.
    :param optimize: Function under test, called as optimize(nn_last_layer, correct_label, learning_rate, num_classes)
    """
    v1 = tf.compat.v1.compat.v1
    num_classes = 2
    shape = [2, 3, 4, num_classes]

    layers_output = v1.Variable(v1.zeros(shape))
    correct_label = v1.placeholder(v1.float32, [None, None, None, num_classes])
    learning_rate = v1.placeholder(v1.float32)
    logits, train_op, cross_entropy_loss = optimize(layers_output, correct_label, learning_rate, num_classes)

    _assert_tensor_shape(logits, [2*3*4, num_classes], 'Logits')

    # The same synthetic label array is fed to both runs.
    fake_labels = np.arange(np.prod(shape)).reshape(shape)
    with v1.Session() as sess:
        sess.run(v1.global_variables_initializer())
        sess.run([train_op], {correct_label: fake_labels, learning_rate: 10})
        test, loss = sess.run([layers_output, cross_entropy_loss], {correct_label: fake_labels})

    # The variable started as all zeros, so any non-zero value means it was updated.
    assert test.min() != 0 or test.max() != 0, 'Training operation not changing weights.'


@test_safe
def test_train_nn(train_nn):
    """
    Smoke-test `train_nn` with constant ops and synthetic batches, with its printing suppressed.
    :param train_nn: Function under test
    """
    v1 = tf.compat.v1.compat.v1
    epochs = 1
    batch_size = 2

    def get_batches_fn(requested_size):
        # A single array of consecutive integers with shape (batch, 2, 3, 3).
        shape = [requested_size, 2, 3, 3]
        return np.arange(np.prod(shape)).reshape(shape)

    # Constant ops stand in for the real training op and loss.
    train_op = v1.constant(0)
    cross_entropy_loss = v1.constant(10.11)
    input_image = v1.placeholder(v1.float32, name='input_image')
    correct_label = v1.placeholder(v1.float32, name='correct_label')
    keep_prob = v1.placeholder(v1.float32, name='keep_prob')
    learning_rate = v1.placeholder(v1.float32, name='learning_rate')

    with v1.Session() as sess:
        parameters = dict(
            sess=sess,
            epochs=epochs,
            batch_size=batch_size,
            get_batches_fn=get_batches_fn,
            train_op=train_op,
            cross_entropy_loss=cross_entropy_loss,
            input_image=input_image,
            correct_label=correct_label,
            keep_prob=keep_prob,
            learning_rate=learning_rate)
        _prevent_print(train_nn, parameters)


@test_safe
def test_for_kitti_dataset(data_dir, expected_training_count=229, expected_testing_count=60):
    """
    Check that the Kitti road dataset is present under data_dir with the expected number of files.
    :param data_dir: Directory that contains the 'data_road' folder
    :param expected_training_count: Expected number of training images and of training road labels
    :param expected_testing_count: Expected number of testing images
    """
    kitti_dataset_path = os.path.join(data_dir, 'data_road')
    training_labels_count = len(glob(os.path.join(kitti_dataset_path, 'training/gt_image_2/*_road_*.png')))
    training_images_count = len(glob(os.path.join(kitti_dataset_path, 'training/image_2/*.png')))
    testing_images_count = len(glob(os.path.join(kitti_dataset_path, 'testing/image_2/*.png')))

    assert not (training_images_count == training_labels_count == testing_images_count == 0),\
        'Kitti dataset not found. Extract Kitti dataset in {}'.format(kitti_dataset_path)
    # The messages report the counts that are actually checked (the defaults
    # are 229/229/60, where the code previously noted 289/289/290).
    assert training_images_count == expected_training_count, \
        'Expected {} training images, found {} images.'.format(expected_training_count, training_images_count)
    assert training_labels_count == expected_training_count, \
        'Expected {} training labels, found {} labels.'.format(expected_training_count, training_labels_count)
    assert testing_images_count == expected_testing_count, \
        'Expected {} testing images, found {} images.'.format(expected_testing_count, testing_images_count)


In [None]:

# Checkpoint prefix: run() saves the trained model here and predict_images() restores it.
model_path = '/content/drive/MyDrive/Fall 2023/3d perp/Project/src/model_2/all_training_imgs'


# Refuse to continue on a TensorFlow older than 1.0 and report the version in use.
assert LooseVersion(tf.compat.v1.__version__) >= LooseVersion('1.0'), (
    'Please use TensorFlow version 1.0 or newer.'
    '  You are using {}'.format(tf.compat.v1.__version__))
print('TensorFlow Version: {}'.format(tf.compat.v1.__version__))

# Report the GPU when one is visible, otherwise warn that training wants one.
if tf.compat.v1.test.gpu_device_name():
    print('Default GPU Device: {}'.format(tf.compat.v1.test.gpu_device_name()))
else:
    warnings.warn('No GPU found. Please use a GPU to train your neural network.')


def load_vgg(sess, vgg_path):
    """
    Load Pretrained VGG Model into TensorFlow.
    :param sess: TensorFlow Session
    :param vgg_path: Path to vgg folder, containing "variables/" and "saved_model.pb"
    :return: Tuple of Tensors from VGG model (image_input, keep_prob, layer3_out, layer4_out, layer7_out)
    """
    # Load the saved model tagged 'vgg16' into the session.
    tf.compat.v1.saved_model.loader.load(sess, ['vgg16'], vgg_path)

    # Fetch the tensors needed for building FCN-VGG16 from the default graph,
    # in the order in which they are returned.
    graph = tf.compat.v1.get_default_graph()
    tensor_names = (
        'image_input:0',
        'keep_prob:0',
        'layer3_out:0',
        'layer4_out:0',
        'layer7_out:0')
    return tuple(graph.get_tensor_by_name(tensor_name) for tensor_name in tensor_names)



def layers(vgg_layer3_out, vgg_layer4_out, vgg_layer7_out, num_classes):    # adds the decoder layers on top of the VGG outputs
    """
    Create the layers for a fully convolutional network.  Build skip-layers using the vgg layers.
    :param vgg_layer3_out: TF Tensor for VGG Layer 3 output
    :param vgg_layer4_out: TF Tensor for VGG Layer 4 output
    :param vgg_layer7_out: TF Tensor for VGG Layer 7 output
    :param num_classes: Number of classes to classify
    :return: The Tensor for the last layer of output
    """
    # 1x1 convolutions reduce each VGG output to num_classes channels so that
    # the tensors can be added together further down.
    vgg_layer7_logits = tf.compat.v1.layers.conv2d(
        vgg_layer7_out,    # input tensor
        num_classes,       # number of filters (one per class)
        kernel_size=1,     # size of convolutional kernel
        kernel_initializer= tf.compat.v1.random_normal_initializer(stddev=0.01),  # Initializer for the kernel weights
        kernel_regularizer= tf.keras.regularizers.l2(1e-4),name='vgg_layer7_logits')    # L2 regularizer for the kernel weights

    vgg_layer4_logits = tf.compat.v1.layers.conv2d(
        vgg_layer4_out, num_classes, kernel_size=1,
        kernel_initializer= tf.compat.v1.random_normal_initializer(stddev=0.01),
        kernel_regularizer= tf.keras.regularizers.l2(1e-4), name='vgg_layer4_logits')
    vgg_layer3_logits = tf.compat.v1.layers.conv2d(
        vgg_layer3_out, num_classes, kernel_size=1,
        kernel_initializer= tf.compat.v1.random_normal_initializer(stddev=0.01),
        kernel_regularizer= tf.keras.regularizers.l2(1e-4), name='vgg_layer3_logits')

    # Transposed convolution with stride 2 upsamples the layer 7 logits
    fcn_decoder_layer1 = tf.compat.v1.layers.conv2d_transpose(
        vgg_layer7_logits, num_classes, kernel_size=4, strides=(2, 2),
        padding='same',
        kernel_initializer= tf.compat.v1.random_normal_initializer(stddev=0.01),
        kernel_regularizer= tf.keras.regularizers.l2(1e-4), name='fcn_decoder_layer1')

    # add the first skip connection from the vgg_layer4_out
    fcn_decoder_layer2 = tf.compat.v1.add(
        fcn_decoder_layer1, vgg_layer4_logits, name='fcn_decoder_layer2')

    # then follow this with another stride-2 transposed convolution so the result can be added to the layer 3 logits
    fcn_decoder_layer3 = tf.compat.v1.layers.conv2d_transpose(
        fcn_decoder_layer2, num_classes, kernel_size=4, strides=(2, 2),
        padding='same',
        kernel_initializer= tf.compat.v1.random_normal_initializer(stddev=0.01),
        kernel_regularizer= tf.keras.regularizers.l2(1e-4), name='fcn_decoder_layer3')

    # second skip connection, from the layer 3 logits, followed by the final stride-8 transposed convolution.
    # NOTE(review): name='fcn_decoder_layer4' is used both for the add op and for
    # the final transposed convolution. Renaming the latter presumably changes
    # the variable names an existing checkpoint is matched against - confirm
    # before changing it.
    fcn_decoder_layer4 = tf.compat.v1.add(
        fcn_decoder_layer3, vgg_layer3_logits, name='fcn_decoder_layer4')
    fcn_decoder_output = tf.compat.v1.layers.conv2d_transpose(
        fcn_decoder_layer4, num_classes, kernel_size=16, strides=(8, 8),
        padding='same',
        kernel_initializer= tf.compat.v1.random_normal_initializer(stddev=0.01),
        kernel_regularizer= tf.keras.regularizers.l2(1e-4), name='fcn_decoder_layer4')

    return fcn_decoder_output



def optimize(nn_last_layer, correct_label, learning_rate, num_classes):
    """
    Build the TensorFLow loss and optimizer operations.
    :param nn_last_layer: TF Tensor of the last layer in the neural network
    :param correct_label: TF Placeholder for the correct label image
    :param learning_rate: TF Placeholder for the learning rate
    :param num_classes: Number of classes to classify
    :return: Tuple of (logits, train_op, cross_entropy_loss)
    """
    # Flatten predictions and labels to 2D: one row per pixel, one column per class.
    flat_shape = (-1, num_classes)
    logits = tf.compat.v1.reshape(nn_last_layer, flat_shape)
    flat_labels = tf.compat.v1.reshape(correct_label, flat_shape)

    # Loss: mean softmax cross entropy over all rows.
    # NOTE(review): only this term is minimised; any kernel regularizers
    # declared on the layers are not added to the loss here.
    per_pixel_loss = tf.compat.v1.nn.softmax_cross_entropy_with_logits(logits=logits, labels=flat_labels)
    cross_entropy_loss = tf.compat.v1.reduce_mean(per_pixel_loss)

    # Training operation: Adam minimising the cross-entropy loss.
    train_op = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate).minimize(cross_entropy_loss)

    return logits, train_op, cross_entropy_loss



def train_nn(sess, epochs, batch_size, get_batches_fn, train_op, cross_entropy_loss, input_image,
             correct_label, keep_prob, learning_rate):
    """
    Train neural network and print out the loss during training.
    One row per epoch (learning rate, execution time, mean batch loss) is
    written to ./training_progress.csv.
    :param sess: TF Session
    :param epochs: Number of epochs
    :param batch_size: Batch size
    :param get_batches_fn: Function to get batches of training data.  Call using get_batches_fn(batch_size)
    :param train_op: TF Operation to train the neural network
    :param cross_entropy_loss: TF Tensor for the amount of loss
    :param input_image: TF Placeholder for input images
    :param correct_label: TF Placeholder for label images
    :param keep_prob: TF Placeholder for dropout keep probability
    :param learning_rate: TF Placeholder for learning rate
    :raises ValueError: If get_batches_fn yields no batch in an epoch
    """
    log_filename = "./training_progress.csv"
    log_fields = ['learning_rate', 'exec_time (s)', 'training_loss']

    # Values fed for the learning-rate and keep-probability placeholders.
    lr = 0.0001

    # The context manager closes the log file on every exit path; newline=''
    # is what the csv module asks for when writing.
    with open(log_filename, 'w', newline='') as log_file:
        log_writer = csv.DictWriter(log_file, fieldnames=log_fields)
        log_writer.writeheader()

        sess.run(tf.compat.v1.global_variables_initializer())

        print("Training...")
        print()
        for i in range(epochs):
            print("EPOCH {} ...".format(i+1))
            training_loss = 0
            training_samples = 0   # counts batches, not individual images
            starttime = time.perf_counter()
            for image, label in get_batches_fn(batch_size):

                _, loss = sess.run([train_op, cross_entropy_loss],
                                   feed_dict={input_image: image, correct_label: label,
                                              keep_prob: 0.8, learning_rate: lr})

                print("batch loss: = {:.3f}".format(loss))
                training_samples += 1
                training_loss += loss

            if training_samples == 0:
                # Fail with a clear message instead of a ZeroDivisionError below.
                raise ValueError('get_batches_fn produced no batches; check the training data folder.')

            training_loss /= training_samples
            endtime = time.perf_counter()
            training_time = endtime-starttime

            print("Average loss for the current epoch: = {:.3f}\n".format(training_loss))
            log_writer.writerow({'learning_rate': lr, 'exec_time (s)': round(training_time, 2) , 'training_loss': round(training_loss,4)})
            # Flush after every epoch so the progress can be followed while training runs.
            log_file.flush()





def run():
    """
    Train the network on the Kitti road data, save the checkpoint to the
    module-level `model_path` and write predictions for the test images.
    """
    num_classes = 2
    # Size handed to cv2.resize by the batch and test-output generators.
    image_shape = (576, 160)
    data_dir = '/content/drive/MyDrive/Fall 2023/3d perp/Project'
    runs_dir = '/content/drive/MyDrive/Fall 2023/3d perp/Project/runs'

    # Fail early when the dataset is missing or has an unexpected file count.
    test_for_kitti_dataset(data_dir)

    # Download pretrained vgg model
    maybe_download_pretrained_vgg(data_dir)


    with tf.compat.v1.Session() as sess:
        # Path to vgg model
        vgg_path = os.path.join(data_dir, 'vgg')
        # Create function to get batches
        get_batches_fn = gen_batch_function(os.path.join(data_dir, 'data_road/training'), image_shape)

        # Training hyper-parameters
        epochs = 30
        batch_size = 8

        # TF placeholders
        correct_label = tf.compat.v1.placeholder(tf.compat.v1.int32, [None, None, None, num_classes], name='correct_label')
        learning_rate = tf.compat.v1.placeholder(tf.compat.v1.float32, name='learning_rate')

        # Build the network: VGG tensors -> decoder layers -> loss and training op.
        # The images and ground-truth labels are not wired in here; train_nn feeds
        # every batch from get_batches_fn into input_image and correct_label.
        input_image, keep_prob, vgg_layer3_out, vgg_layer4_out, vgg_layer7_out = load_vgg(sess, vgg_path)

        nn_last_layer = layers(vgg_layer3_out, vgg_layer4_out, vgg_layer7_out, num_classes)

        logits, train_op, cross_entropy_loss = optimize(nn_last_layer, correct_label, learning_rate, num_classes)

        # Train the network
        train_nn(sess, epochs, batch_size, get_batches_fn, train_op, cross_entropy_loss, input_image,
             correct_label, keep_prob, learning_rate)

        # Save the trained variables so predict_images() can restore them later.
        saver = tf.compat.v1.train.Saver()
        save_path = saver.save(sess, model_path)
        print("Model is saved to file: %s" % save_path)

        # Predict the testing data and save the augmented images
        save_inference_samples(runs_dir, data_dir, sess, image_shape, logits, keep_prob, input_image)


def predict_images(test_data_path, print_speed=False):
    """
    Restore the checkpoint at the module-level `model_path` and predict every PNG in a folder.
    :param test_data_path: Folder with the PNG images to predict
    :param print_speed: Passed on to pred_samples; when True the speed of every image is printed
    """
    num_classes = 2
    # Same size as used in run().
    image_shape = (576, 160)
    runs_dir = '/content/drive/MyDrive/Fall 2023/3d perp/Project/runs'

    # Path to vgg model
    vgg_path = os.path.join('/content/drive/MyDrive/Fall 2023/3d perp/Project/', 'vgg')

    with tf.compat.v1.Session() as sess:
        # Rebuild the graph with the same calls as run() (presumably so that
        # the variables match the ones stored in the checkpoint).
        input_image, keep_prob, vgg_layer3_out, vgg_layer4_out, vgg_layer7_out = load_vgg(sess, vgg_path)
        nn_last_layer = layers(vgg_layer3_out, vgg_layer4_out, vgg_layer7_out, num_classes)
        # Same flattening as in optimize(): one row per pixel, one column per class.
        logits = tf.compat.v1.reshape(nn_last_layer, (-1, num_classes))

        # Restore the saved model
        saver = tf.compat.v1.train.Saver()
        saver.restore(sess, model_path)
        print("Restored the saved Model in file: %s" % model_path)

        # Predict the samples
        pred_samples(runs_dir, test_data_path, sess, image_shape, logits, keep_prob, input_image, print_speed)


# NOTE: the triple-quoted string below is never executed; it is a disabled copy
# of the entry point, and the same code appears as live code in the next cell.
# Because it is the last expression of this cell, the notebook echoes it as the
# cell's output.
'''
if __name__ == '__main__':

    training_flag = False   # True: train the NN; False: predict with trained NN

    if training_flag:       #train model

      # run unittest before training
      #test_load_vgg(load_vgg, tf)
      #test_layers(layers)
      #test_optimize(optimize)
      #test_train_nn(train_nn)

      # train the NN and save the model
      run()
    else:                 #predict
      # use the pre-trained model to predict more images
      test_data_path = '/content/drive/MyDrive/Fall 2023/3d perp/Project/data_road/testing/image_2'
      predict_images(test_data_path, print_speed=True)
'''

  assert LooseVersion(tf.compat.v1.__version__) >= LooseVersion('1.0'), 'Please use TensorFlow version 1.0 or newer.' \


TensorFlow Version: 2.14.0
Default GPU Device: /device:GPU:0


"\nif __name__ == '__main__':\n\n    training_flag = False   # True: train the NN; False: predict with trained NN\n\n    if training_flag:       #train model\n\n      # run unittest before training\n      #test_load_vgg(load_vgg, tf)\n      #test_layers(layers)\n      #test_optimize(optimize)\n      #test_train_nn(train_nn)\n\n      # train the NN and save the model\n      run()\n    else:                 #predict\n      # use the pre-trained model to predict more images\n      test_data_path = '/content/drive/MyDrive/Fall 2023/3d perp/Project/data_road/testing/image_2'\n      predict_images(test_data_path, print_speed=True)\n"

In [None]:
if __name__ == '__main__':
    # True: train the network and save the model; False: predict with the saved model.
    training_flag = False

    if not training_flag:
        # Use the pre-trained model to predict more images.
        test_data_path = '/content/drive/MyDrive/Fall 2023/3d perp/Project/data_road/testing/image_2'
        predict_images(test_data_path, print_speed=True)
    else:
        # Optional checks that can be run before training:
        #   test_load_vgg(load_vgg, tf), test_layers(layers),
        #   test_optimize(optimize), test_train_nn(train_nn)

        # Train the network and save the model.
        run()


Output hidden; open in https://colab.research.google.com to view.