In [73]:
import gzip
import os
import sys
import urllib
import matplotlib.image as mpimg
from PIL import Image

import code

import tensorflow.python.platform
import sys
import numpy
import tensorflow as tf
import argparse
import numpy as np
from tensorflow.keras import layers, models, losses, optimizers
from tensorflow.keras.layers import Conv2D



In [88]:
NUM_CHANNELS = 3  # RGB images
PIXEL_DEPTH = 255
NUM_LABELS = 2
TRAINING_SIZE = 100
VALIDATION_SIZE = 20  # Size of the validation set.
SEED = 66478  # Set to None for random seed.
BATCH_SIZE = 16  # 64
NUM_EPOCHS = 100
RESTORE_MODEL = False  # If True, restore existing model instead of training a new one
RECORDING_STEP = 0

# Set image patch size in pixels
# IMG_PATCH_SIZE should be a multiple of 4
# image size should be an integer multiple of this number!
IMG_PATCH_SIZE = 16
# Argument parsing

# Remove unwanted arguments from sys.argv
sys.argv = [sys.argv[0]]
parser = argparse.ArgumentParser(description="Segmentation Training Parameters")

parser.add_argument(
    "--train_dir",
    type=str,
    default=os.getcwd() ,
    help="Directory where to write event logs and checkpoint.",
)
args = parser.parse_args()

print(f"Training directory: {args.train_dir}")


Training directory: /Users/melina/Desktop/ML/MLproj2


In [66]:
def img_crop(im, w, h):
    list_patches = []
    imgwidth = im.shape[0]
    imgheight = im.shape[1]
    is_2d = len(im.shape) < 3
    for i in range(0, imgheight, h):
        for j in range(0, imgwidth, w):
            if is_2d:
                im_patch = im[j : j + w, i : i + h]
            else:
                im_patch = im[j : j + w, i : i + h, :]
            list_patches.append(im_patch)
    return list_patches


def extract_data(filename, num_images):
    """Extract the images into a 4D tensor [image index, y, x, channels].
    Values are rescaled from [0, 255] down to [-0.5, 0.5].
    """
    imgs = []
    for i in range(1, num_images + 1):
        imageid = "satImage_%.3d" % i
        image_filename = filename + imageid + ".png"
        if os.path.isfile(image_filename):
            # print("Loading " + image_filename)
            img = mpimg.imread(image_filename)
            imgs.append(img)
        else:
            print("File " + image_filename + " does not exist")

    num_images = len(imgs)
    IMG_WIDTH = imgs[0].shape[0]
    IMG_HEIGHT = imgs[0].shape[1]
    N_PATCHES_PER_IMAGE = (IMG_WIDTH / IMG_PATCH_SIZE) * (IMG_HEIGHT / IMG_PATCH_SIZE)

    img_patches = [
        img_crop(imgs[i], IMG_PATCH_SIZE, IMG_PATCH_SIZE) for i in range(num_images)
    ]
    data = [
        img_patches[i][j]
        for i in range(len(img_patches))
        for j in range(len(img_patches[i]))
    ]

    return numpy.asarray(data)

# Assign a label to a patch v
def value_to_class(v):
    foreground_threshold = 0.25  # percentage of pixels > 1 required to assign a foreground label to a patch
    df = numpy.sum(v)
    if df > foreground_threshold:  # road
        return [0, 1]
    else:  # bgrd
        return [1, 0]


# Extract label images
def extract_labels(filename, num_images):
    """Extract the labels into a 1-hot matrix [image index, label index]."""
    gt_imgs = []
    for i in range(1, num_images + 1):
        imageid = "satImage_%.3d" % i
        image_filename = filename + imageid + ".png"
        if os.path.isfile(image_filename):
            print("Loading " + image_filename)
            img = mpimg.imread(image_filename)
            gt_imgs.append(img)
        else:
            print("File " + image_filename + " does not exist")

    num_images = len(gt_imgs)
    gt_patches = [
        img_crop(gt_imgs[i], IMG_PATCH_SIZE, IMG_PATCH_SIZE) for i in range(num_images)
    ]
    data = numpy.asarray(
        [
            gt_patches[i][j]
            for i in range(len(gt_patches))
            for j in range(len(gt_patches[i]))
        ]
    )
    labels = numpy.asarray(
        [value_to_class(numpy.mean(data[i])) for i in range(len(data))]
    )

    # Convert to dense 1-hot representation.
    return labels.astype(numpy.float32)


def error_rate(predictions, labels):
    """Return the error rate based on dense predictions and 1-hot labels."""
    return 100.0 - (
        100.0
        * numpy.sum(numpy.argmax(predictions, 1) == numpy.argmax(labels, 1))
        / predictions.shape[0]
    )

# Write predictions from neural network to a file
def write_predictions_to_file(predictions, labels, filename):
    max_labels = numpy.argmax(labels, 1)
    max_predictions = numpy.argmax(predictions, 1)
    file = open(filename, "w")
    n = predictions.shape[0]
    for i in range(0, n):
        file.write(max_labels(i) + " " + max_predictions(i))
    file.close()

# Print predictions from neural network
def print_predictions(predictions, labels):
    max_labels = numpy.argmax(labels, 1)
    max_predictions = numpy.argmax(predictions, 1)
    print(str(max_labels) + " " + str(max_predictions))

# Convert array of labels to an image
def label_to_img(imgwidth, imgheight, w, h, labels):
    array_labels = numpy.zeros([imgwidth, imgheight])
    idx = 0
    for i in range(0, imgheight, h):
        for j in range(0, imgwidth, w):
            if labels[idx][0] > 0.5:  # bgrd
                l = 0
            else:
                l = 1
            array_labels[j : j + w, i : i + h] = l
            idx = idx + 1
    return array_labels

def img_float_to_uint8(img):
    rimg = img - numpy.min(img)
    rimg = (rimg / numpy.max(rimg) * PIXEL_DEPTH).round().astype(numpy.uint8)
    return rimg

def concatenate_images(img, gt_img):
    n_channels = len(gt_img.shape)
    w = gt_img.shape[0]
    h = gt_img.shape[1]
    if n_channels == 3:
        cimg = numpy.concatenate((img, gt_img), axis=1)
    else:
        gt_img_3c = numpy.zeros((w, h, 3), dtype=numpy.uint8)
        gt_img8 = img_float_to_uint8(gt_img)
        gt_img_3c[:, :, 0] = gt_img8
        gt_img_3c[:, :, 1] = gt_img8
        gt_img_3c[:, :, 2] = gt_img8
        img8 = img_float_to_uint8(img)
        cimg = numpy.concatenate((img8, gt_img_3c), axis=1)
    return cimg


def make_img_overlay(img, predicted_img):
    w = img.shape[0]
    h = img.shape[1]
    color_mask = numpy.zeros((w, h, 3), dtype=numpy.uint8)
    color_mask[:, :, 0] = predicted_img * PIXEL_DEPTH

    img8 = img_float_to_uint8(img)
    background = Image.fromarray(img8, "RGB").convert("RGBA")
    overlay = Image.fromarray(color_mask, "RGB").convert("RGBA")
    new_img = Image.blend(background, overlay, 0.2)
    return new_img


In [91]:
def main():
    TRAINING_SIZE = 20
    data_dir = os.getcwd() + '/dataset/training/'
    train_data_filename = data_dir + "images/"
    train_labels_filename = data_dir + "groundtruth/"
    test_data_filename = os.getcwd() + '/dataset/test_set_images/'

    # Extract it into numpy arrays (assuming extract_data and extract_labels are defined).
    train_data = extract_data(train_data_filename, TRAINING_SIZE)
    print(train_data.shape)
    train_labels = extract_labels(train_labels_filename, TRAINING_SIZE)

    # Balance the classes.
    print("Balancing training data...")
    c0 = np.sum(train_labels[:, 0] == 1) # Count the number of data points in class 0
    c1 = np.sum(train_labels[:, 1] == 1) # Count the number of data points in class 1
    print(f"Number of data points per class: c0 = {c0}, c1 = {c1}")
    min_c = min(c0, c1)
    idx0 = np.where(train_labels[:, 0] == 1)[0][:min_c] # Get the indices of the first class
    idx1 = np.where(train_labels[:, 1] == 1)[0][:min_c] # Get the indices of the second class
    balanced_indices = np.concatenate([idx0, idx1]) 
    train_data = train_data[balanced_indices] 
    train_labels = train_labels[balanced_indices]
    
    print(f"Balanced dataset size: {train_data.shape[0]}")

    
    # Define the model.
    IMG_PATCH_SIZE = 16
    NUM_CHANNELS = 3
    NUM_LABELS = 2

    model = models.Sequential([
        layers.Conv2D(
            32, (5, 5), activation='relu', padding='same', input_shape=(IMG_PATCH_SIZE, IMG_PATCH_SIZE, NUM_CHANNELS),
            use_bias=True, bias_initializer='zeros'
        ),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(
            64, (5, 5), activation='relu', padding='same',
            use_bias=True, bias_initializer='zeros'
        ),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        layers.Dense(
            512, activation='relu',
            use_bias=True, bias_initializer='zeros'
        ),
        layers.Dropout(0.5),
        layers.Dense(
            NUM_LABELS, activation='softmax',
            use_bias=True, bias_initializer='zeros'
        )
    ])

    # Compile the model.
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=['accuracy']
    )

    # Train the model.
    NUM_EPOCHS = 10
    BATCH_SIZE = 16
    model.fit(train_data, train_labels, epochs=NUM_EPOCHS, batch_size=BATCH_SIZE, validation_split=0.2)

    # Save the model.
    # model.save('trained_model.h5')
    # print("Training complete and model saved.")

    def get_image_summary(img, idx=0):
        V = tf.slice(img, (0, 0, 0, idx), (1, -1, -1, 1))
        img_w = img.get_shape().as_list()[1]
        img_h = img.get_shape().as_list()[2]
        min_value = tf.reduce_min(V)
        V = V - min_value
        max_value = tf.reduce_max(V)
        V = V / (max_value * PIXEL_DEPTH)
        V = tf.reshape(V, (img_w, img_h, 1))
        V = tf.transpose(V, (2, 0, 1))
        V = tf.reshape(V, (-1, img_w, img_h, 1))
        return V
    
    def get_prediction(img):
        # Crop the image into patches and convert to NumPy array
        data = np.asarray(img_crop(img, IMG_PATCH_SIZE, IMG_PATCH_SIZE))
        
        # Convert to TensorFlow tensor
        data_node = tf.convert_to_tensor(data, dtype=tf.float32)
        
        # Pass the data through the model and apply softmax
        output = tf.nn.softmax(model(data_node), axis=-1)
        
        # Convert the predictions to a suitable label image format
        img_prediction = label_to_img(
            img.shape[0],
            img.shape[1],
            IMG_PATCH_SIZE,
            IMG_PATCH_SIZE,
            output.numpy(),  # Convert tensor to NumPy array
        )
        return img_prediction

    # Get a concatenation of the prediction and groundtruth for given input file
    def get_prediction_with_groundtruth(filename, image_idx):
        # Format the filename for the specific image index
        imageid = "satImage_%.3d" % image_idx
        image_filename = filename + imageid + ".png"
        
        # Load the image
        img = mpimg.imread(image_filename)
        
        # Get the model's prediction
        img_prediction = get_prediction(img)
        
        # Concatenate the original image with its prediction for visualization
        cimg = concatenate_images(img, img_prediction)
        
        return cimg

    # Get prediction overlaid on the original image for given input file
    def get_prediction_with_overlay(filename, image_idx):
        imageid = "satImage_%.3d" % image_idx
        image_filename = filename + imageid + ".png"
        img = mpimg.imread(image_filename)

        img_prediction = get_prediction(img)
        oimg = make_img_overlay(img, img_prediction)

        return oimg
    
    def model(data, train=False):
        conv1 = layers.Conv2D(
            32, (5, 5), activation='relu', padding='same', 
            use_bias=True, bias_initializer='zeros'
        )(data)
        pool1 = layers.MaxPooling2D((2, 2))(conv1)
        conv2 = layers.Conv2D(
            64, (5, 5), activation='relu', padding='same', 
            use_bias=True, bias_initializer='zeros'
        )(pool1)
        pool2 = layers.MaxPooling2D((2, 2))(conv2)
        flat = layers.Flatten()(pool2)
        dense1 = layers.Dense(
            512, activation='relu', 
            use_bias=True, bias_initializer='zeros'
        )(flat)
        drop = layers.Dropout(0.5)(dense1)
        out = layers.Dense(
            NUM_LABELS, activation='softmax', 
            use_bias=True, bias_initializer='zeros'
        )(drop)
        return out

    
    for i in range(1, TRAINING_SIZE + 1):
        # print(i)
        pimg = get_prediction_with_groundtruth(train_data_filename, i)
        Image.fromarray(pimg).save(
                os.getcwd() + "/overlay/prediction_" + str(i) + ".png"
            )
        # oimg = get_prediction_with_overlay(train_data_filename, i)
        # oimg.save(os.getcwd()  + "/overlay/overlay_" + str(i) + ".png")

main()

Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_001.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_002.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_003.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_004.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_005.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_006.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_007.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_008.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_009.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_010.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_011.png
Loading /Users/melina/Desktop/ML/MLproj2/dataset/training/images/satImage_012.png
Loading /Users/m

  rimg = (rimg / numpy.max(rimg) * PIXEL_DEPTH).round().astype(numpy.uint8)
  rimg = (rimg / numpy.max(rimg) * PIXEL_DEPTH).round().astype(numpy.uint8)
