<a href="https://colab.research.google.com/github/sandratreneska/Self-supervised-visual-feature-learning/blob/main/VOC_Segmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install scikit-multilearn

Collecting scikit-multilearn
  Downloading scikit_multilearn-0.2.0-py3-none-any.whl (89 kB)
[?25l[K     |███▊                            | 10 kB 27.1 MB/s eta 0:00:01[K     |███████▍                        | 20 kB 9.2 MB/s eta 0:00:01[K     |███████████                     | 30 kB 8.0 MB/s eta 0:00:01[K     |██████████████▊                 | 40 kB 7.6 MB/s eta 0:00:01[K     |██████████████████▍             | 51 kB 5.1 MB/s eta 0:00:01[K     |██████████████████████          | 61 kB 5.4 MB/s eta 0:00:01[K     |█████████████████████████▊      | 71 kB 5.2 MB/s eta 0:00:01[K     |█████████████████████████████▍  | 81 kB 5.9 MB/s eta 0:00:01[K     |████████████████████████████████| 89 kB 3.7 MB/s 
[?25hInstalling collected packages: scikit-multilearn
Successfully installed scikit-multilearn-0.2.0


In [None]:
import os
from tensorflow import keras
import tensorflow as tf
import cv2
import math
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from skimage.color import rgb2lab
from keras.initializers import RandomNormal
from keras.models import Model
from keras.models import Input
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import Activation
from keras.layers import Concatenate
from keras.layers import Dropout
from keras.layers import BatchNormalization
from keras.layers import LeakyReLU
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras import utils
from skmultilearn.model_selection import iterative_train_test_split
from skmultilearn.model_selection import IterativeStratification

In [None]:
# Mount drive
# Mounts Google Drive at /content/drive; the DIR constant defined in the
# configuration cell points inside this mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Check if GPU is connected
# tf.test.gpu_device_name() returns an empty string when no GPU is visible, so
# report exactly one of "found" / "not found" instead of printing both.
device_name = tf.test.gpu_device_name()
if device_name:
  print('Found GPU at: {}'.format(device_name))
else:
  print('GPU device not found')

print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

Found GPU at: /device:GPU:0
Num GPUs Available:  1


In [None]:
# Directories and parameters
# --- Paths: everything lives under the project folder on the mounted drive ---
DIR = '/content/drive/My Drive/Self-supervised-VOC/'
directory_annotations = f'{DIR}VOC2012/Annotations/'
directory_segmentations = f'{DIR}VOC2012/SegmentationClass/'
directory_images = f'{DIR}VOC2012/JPEGImages/'
saved_weights_path = f'{DIR}SavedWeights/'
saved_models_path = f'{DIR}SavedModels/'

# --- Experiment identity (edit these per run) ---
model_name = 'seg-coco-balanced-pretrained' # change
GEN_PATH = f'{DIR}PretrainedGenerator/pix2pixCOCO.h5' # change

# --- Data / training hyper-parameters ---
IMG_WIDTH = 256
IMG_HEIGHT = 256
BATCH_SIZE = 16
LEARNING_RATE_1 = 3e-4
LEARNING_RATE_2 = 5e-5
EPOCHS = 10 # 20  change
NUM_CLASSES = 21  # length of VOC_CLASSES, which includes 'background'

In [None]:
# Save image names
# Collect the base name (no extension) of every file in the segmentation-mask
# folder. The train/val/test index files loaded later hold positions in this
# list, so entries are neither filtered nor re-ordered here.
# NOTE(review): os.listdir returns entries in arbitrary order; confirm the order
# matches the one in effect when the index files were written.
filenames = []
for seg_file in os.listdir(directory_segmentations):
    if os.path.isfile(directory_segmentations + seg_file):
      # splitext removes only the extension. str.strip('.png') would instead strip
      # any leading/trailing '.', 'p', 'n' or 'g' characters from the name itself.
      imgname = os.path.splitext(seg_file)[0]
      filenames.append(imgname)

# Summarise instead of printing one counter per file and the whole list.
print(len(filenames), 'segmentation masks found')
print(filenames[:5])

In [None]:
# RGB colour of each class in the segmentation masks; position i in this list
# is the colour of VOC_CLASSES[i].
VOC_COLORMAP = [
    [0, 0, 0],        # background
    [128, 0, 0],      # aeroplane
    [0, 128, 0],      # bicycle
    [128, 128, 0],    # bird
    [0, 0, 128],      # boat
    [128, 0, 128],    # bottle
    [0, 128, 128],    # bus
    [128, 128, 128],  # car
    [64, 0, 0],       # cat
    [192, 0, 0],      # chair
    [64, 128, 0],     # cow
    [192, 128, 0],    # diningtable
    [64, 0, 128],     # dog
    [192, 0, 128],    # horse
    [64, 128, 128],   # motorbike
    [192, 128, 128],  # person
    [0, 64, 0],       # potted plant
    [128, 64, 0],     # sheep
    [0, 192, 0],      # sofa
    [128, 192, 0],    # train
    [0, 64, 128],     # tv/monitor
]

# Class names, index-aligned with VOC_COLORMAP.
VOC_CLASSES = [
    'background',
    'aeroplane',
    'bicycle',
    'bird',
    'boat',
    'bottle',
    'bus',
    'car',
    'cat',
    'chair',
    'cow',
    'diningtable',
    'dog',
    'horse',
    'motorbike',
    'person',
    'potted plant',
    'sheep',
    'sofa',
    'train',
    'tv/monitor',
]

In [None]:
def voc_colormap2label():
    """Build a flat lookup table from packed RGB values to VOC class indices.

    Returns:
        A 1-D float array with 256**3 entries. Entry ``(R*256 + G)*256 + B``
        holds the position of ``[R, G, B]`` in VOC_COLORMAP; every colour that
        is not in VOC_COLORMAP keeps the initial value 0.
    """
    palette = np.asarray(VOC_COLORMAP)
    # Pack each palette colour into a single integer key.
    packed_keys = (palette[:, 0] * 256 + palette[:, 1]) * 256 + palette[:, 2]
    lookup = np.zeros(256**3)
    lookup[packed_keys] = np.arange(len(palette))
    return lookup

def voc_label_indices(colormap, colormap2label):
    """Translate an RGB label image into per-pixel class indices.

    Args:
        colormap: array indexed as [row, col, channel] with R, G, B in
            channels 0, 1, 2. It is cast to int32 before the key is computed.
        colormap2label: lookup table indexed by ``(R*256 + G)*256 + B``.

    Returns:
        Array of shape ``colormap.shape[:2]`` holding the looked-up values.
    """
    rgb = colormap.astype(np.int32)
    red, green, blue = rgb[:, :, 0], rgb[:, :, 1], rgb[:, :, 2]
    packed = (red * 256 + green) * 256 + blue
    return colormap2label[packed]

In [None]:
def voc_rand_crop(image, mask, height, width):
    """Resize an image and its label mask to the same target size.

    Despite the name, nothing is cropped and nothing is random: both inputs
    are passed to cv2.resize with nearest-neighbour interpolation.

    Args:
        image: input image array.
        mask: label image array.
        height: target height in pixels.
        width: target width in pixels.

    Returns:
        Tuple ``(resized_image, resized_mask)``.
    """
    target_size = (width, height)  # cv2.resize takes its size as (width, height)
    return (
        cv2.resize(image, target_size, interpolation=cv2.INTER_NEAREST),
        cv2.resize(mask, target_size, interpolation=cv2.INTER_NEAREST),
    )

In [None]:
# RGB -> class-index lookup table, built on first use and then shared by every
# call, so the 256**3-entry array is not re-allocated for each image.
_COLORMAP2LABEL = None


def read_image_and_label(filename):
  """Load one VOC sample as (grayscale input, one-hot segmentation target).

  Args:
    filename: image base name without extension. The picture is read from
      ``directory_images/<filename>.jpg`` and the mask from
      ``directory_segmentations/<filename>.png``.

  Returns:
    Tuple ``(final_img, one_hot_label)``:
      * final_img: float32 array with a trailing channel axis of size 1,
        holding the Lab lightness channel rescaled with ``L / 50 - 1``.
      * one_hot_label: per-pixel one-hot class encoding with NUM_CLASSES
        channels, produced by ``tf.keras.utils.to_categorical``.
  """
  global _COLORMAP2LABEL
  if _COLORMAP2LABEL is None:
    _COLORMAP2LABEL = voc_colormap2label()

  # Image
  raw_image = tf.keras.preprocessing.image.load_img(os.path.join(directory_images, filename + ".jpg"), color_mode='rgb') # read image
  raw_image = np.array(raw_image, dtype=np.float32)

  # Preprocess
  res_img = raw_image / 255. # to srgb
  lab_img = rgb2lab(res_img)  # convert from srgb to lab
  # Lightness channel only; L / 50 - 1 maps the range [0, 100] onto [-1, 1].
  gray_lab = lab_img[:, :, 0] / 50. - 1.

  # Label
  raw_label = tf.keras.preprocessing.image.load_img(os.path.join(directory_segmentations, filename + ".png"), color_mode='rgb') # read segmentation image
  raw_label = np.array(raw_label, dtype=np.float32)

  # The (IMG_WIDTH, IMG_HEIGHT) argument order is kept on purpose: the batch
  # buffers and the model input in this notebook use the same layout.
  final_img, final_label = voc_rand_crop(gray_lab, raw_label, IMG_WIDTH, IMG_HEIGHT)
  # The resized image is 2-D, so the channel axis is added once, here.
  final_img = final_img.reshape(final_img.shape + (1,))
  final_label = voc_label_indices(final_label, _COLORMAP2LABEL)
  final_label = final_label.reshape(final_label.shape + (1,))

  final_img = np.array(final_img, dtype=np.float32)
  final_label = np.array(final_label, dtype=np.float32)

  one_hot_label = tf.keras.utils.to_categorical(final_label, NUM_CLASSES)

  return final_img, one_hot_label

In [None]:
def iterative_train_test_split(X, y, train_size):
    """Split X and y into two stratified parts using IterativeStratification.

    This definition replaces the function of the same name imported from
    skmultilearn.model_selection at the top of the notebook.

    Args:
        X: array of samples, indexable by an array of positions.
        y: label matrix aligned with X.
        train_size: fraction passed as the second entry of
            ``sample_distribution_per_fold`` (the first is ``1 - train_size``).

    Returns:
        ``(X_train, X_test, y_train, y_test)`` taken from the first split the
        stratifier yields.
    """
    fold_fractions = [1.0 - train_size, train_size]
    stratifier = IterativeStratification(
        n_splits=2, order=1, sample_distribution_per_fold=fold_fractions)
    train_indices, test_indices = next(stratifier.split(X, y))
    return X[train_indices], X[test_indices], y[train_indices], y[test_indices]

In [None]:
# Example
# Load one sample and show the shapes the network will see. The unused
# cv2.imread of an unrelated mask file was removed.
img, label = read_image_and_label('2007_000063')
print(label.shape)
print(img.shape)
print(VOC_CLASSES[12])

In [None]:
# Indexes for splitting the dataset into train, val, test
# The cell below is disabled by wrapping it in a string literal, so it does not
# execute. When enabled it builds `indexes` (0..len(filenames)-1) and, for each
# image, a multi-label presence vector of length NUM_CLASSES that marks which
# classes occur anywhere in the mask.

'''
filecount = len(filenames)
indexes = []
labels = []
for index in range(filecount):
     indexes.append(index)

i=0
for filename in filenames:
    new_label = np.zeros(NUM_CLASSES)
    new_label = [int(i) for i in new_label]
    _, label = read_image_and_label(filename)
    label = tf.argmax(label, axis=-1)
    label = np.array(label)
    label = label.reshape(256*256)
    label = [int(i) for i in label]
    
    for cls in range(NUM_CLASSES):
      if cls in label:
        new_label[cls] = 1 # if class is present on the image, we are making it as multilabel one-hot encoding

    labels.append(new_label)
    print(i)
    i+=1
'''

In [None]:
# Disabled (string literal): stratified split with train_size=0.7, then the
# remainder is split again with train_size=0.66 into validation and test.
'''
indexes = np.array(indexes)
labels = np.array(labels)
training_indexes, X_valtest, y_train, y_valtest = iterative_train_test_split(indexes, labels, train_size = 0.7)
validation_indexes, testing_indexes, y_val, y_test = iterative_train_test_split(X_valtest, y_valtest, train_size = 0.66)
'''

In [None]:
# Disabled (string literal): sanity prints for the sizes and first entries of
# the train and test splits.
'''
print(len(training_indexes))
print(len(y_train))
print(len(testing_indexes))
print(len(y_test))
print(testing_indexes[:50])
print(y_test[:10])
'''

In [None]:
# Disabled (string literal): reports the number of samples in each split. The
# stored output of this cell shows 2039 / 577 / 297 (train / val / test).
'''
num_train_samples = len(training_indexes)
num_val_samples = len(validation_indexes)
num_test_samples = len(testing_indexes)

print("Num. training images", num_train_samples)
print("Num. val images", num_val_samples)
print("Num. test images", num_test_samples)
'''

Num. training images 2039
Num. val images 577
Num. test images 297


In [None]:
# Save train, val, test index splits
# Disabled (string literal): writes the three index arrays as integer text
# files to the same paths the next cell loads them from.
'''
np.savetxt(DIR + 'VOC2012/Segmentation_indexes/seg_train_indexes.txt', training_indexes, fmt='%d')
np.savetxt(DIR + 'VOC2012/Segmentation_indexes/seg_val_indexes.txt', validation_indexes, fmt='%d')
np.savetxt(DIR + 'VOC2012/Segmentation_indexes/seg_test_indexes.txt', testing_indexes, fmt='%d')
'''

In [None]:
# Load train, val, test index splits
# Each file holds integer positions into `filenames`; the three arrays are read
# in train / val / test order.
training_indexes, validation_indexes, testing_indexes = (
    np.loadtxt(DIR + 'VOC2012/Segmentation_indexes/seg_{}_indexes.txt'.format(split_name), dtype=int)
    for split_name in ('train', 'val', 'test')
)

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
  """Keras Sequence that yields batches of (grayscale image, one-hot mask).

  Subclasses the public ``tf.keras.utils.Sequence`` rather than the private
  ``keras.utils.all_utils`` module.

  Args:
    list_IDs: positions into ``datafiles`` that belong to this split.
    datafiles: list of image base names, as accepted by read_image_and_label.
    batch_size: number of samples per batch.
    shuffle: if true, the sample order is re-drawn at the end of every epoch.
  """

  def __init__(self, list_IDs, datafiles, batch_size = BATCH_SIZE, shuffle = True):
    super().__init__()
    self.batch_size = batch_size
    self.datafiles = datafiles
    self.list_IDs = list_IDs
    self.shuffle = shuffle
    self.on_epoch_end()

  # After each epoch, shuffle the images if true
  def on_epoch_end(self):
    """Reset the visiting order and, when shuffling is enabled, permute it."""
    self.indexes = np.arange(len(self.list_IDs))
    if self.shuffle:
      np.random.shuffle(self.indexes)

  # Generate images and labels for a batch
  def __data_generation(self, list_IDs_temp):
    """Load the samples named by ``list_IDs_temp`` into two float32 arrays."""
    # Size the buffers from the IDs actually received, so a short batch never
    # leaves uninitialised rows behind.
    n_samples = len(list_IDs_temp)
    X = np.empty((n_samples, IMG_WIDTH, IMG_HEIGHT, 1), dtype=np.float32)
    y = np.empty((n_samples, IMG_WIDTH, IMG_HEIGHT, NUM_CLASSES), dtype=np.float32)

    # For every image in the batch
    for row, file_id in enumerate(list_IDs_temp):
      X[row], y[row] = read_image_and_label(self.datafiles[file_id])

    return X, y

  # Number of batches per epoch
  def __len__(self):
    """Number of full batches; a trailing partial batch is not served."""
    return int(np.floor(len(self.list_IDs) / self.batch_size))

  def __getitem__(self, index):
    """Return batch number ``index`` as ``(X, y)``."""
    # Generate indexes of the batch
    newindexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

    # Find list of IDs
    list_IDs_temp = [self.list_IDs[k] for k in newindexes]

    # Generate data
    return self.__data_generation(list_IDs_temp)

In [None]:
training_generator = DataGenerator(training_indexes, filenames)
# Validation does not need shuffling. Because only full batches are served, a
# fixed order also means every epoch is validated on the same samples.
val_generator = DataGenerator(validation_indexes, filenames, shuffle=False)

In [None]:
def define_encoder_block(layer_in, n_filters, batchnorm=True):
    """Append one downsampling encoder stage to ``layer_in``.

    The stage is a 4x4 stride-2 'same' convolution with weights drawn from
    RandomNormal(stddev=0.02), optionally followed by batch normalisation
    (always called with ``training=True``), then LeakyReLU with slope 0.2.

    Args:
        layer_in: input tensor.
        n_filters: number of convolution filters.
        batchnorm: whether to insert the BatchNormalization layer.

    Returns:
        Output tensor of the stage.
    """
    downsample = Conv2D(
        n_filters,
        (4, 4),
        strides=(2, 2),
        padding='same',
        kernel_initializer=RandomNormal(stddev=0.02),
    )
    x = downsample(layer_in)
    if batchnorm:
        x = BatchNormalization()(x, training=True)
    return LeakyReLU(alpha=0.2)(x)

def decoder_block(layer_in, skip_in, n_filters, dropout=True):
    """Append one upsampling decoder stage with a skip connection.

    The stage is a 4x4 stride-2 'same' transposed convolution with
    RandomNormal(stddev=0.02) weights, batch normalisation and optional 50%
    dropout (both called with ``training=True``), concatenation with
    ``skip_in``, then ReLU.

    Args:
        layer_in: tensor coming from the previous stage.
        skip_in: encoder tensor to concatenate after upsampling.
        n_filters: number of transposed-convolution filters.
        dropout: whether to insert the Dropout layer.

    Returns:
        Output tensor of the stage.
    """
    upsample = Conv2DTranspose(
        n_filters,
        (4, 4),
        strides=(2, 2),
        padding='same',
        kernel_initializer=RandomNormal(stddev=0.02),
    )
    x = BatchNormalization()(upsample(layer_in), training=True)
    if dropout:
        x = Dropout(0.5)(x, training=True)
    merged = Concatenate()([x, skip_in])
    return Activation('relu')(merged)

In [None]:
def define_generator(image_shape=(IMG_WIDTH, IMG_HEIGHT, 1)):
    """Build the U-Net style encoder/decoder used for segmentation from scratch.

    Encoder: C64-C128-C256-C512-C512-C512-C512, then a 512-filter bottleneck
    convolution with ReLU. Decoder: CD512-CD512-CD512-C512-C256-C128-C64, each
    stage concatenated with the matching encoder output. A final transposed
    convolution with NUM_CLASSES filters is followed by softmax.

    Args:
        image_shape: shape of one input image, without the batch axis.

    Returns:
        An uncompiled Keras Model mapping the input image to per-pixel class
        probabilities.
    """
    init = RandomNormal(stddev=0.02)
    in_image = Input(shape=image_shape)

    # Encoder: the first stage has no batch norm; every output is kept for the
    # skip connections.
    skips = [define_encoder_block(in_image, 64, batchnorm=False)]
    for n_filters in (128, 256, 512, 512, 512, 512):
        skips.append(define_encoder_block(skips[-1], n_filters))

    # Bottleneck, no batch norm and relu
    x = Conv2D(512, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(skips[-1])
    x = Activation('relu')(x)

    # Decoder: (filters, use_dropout) per stage, paired with the encoder
    # outputs from deepest to shallowest.
    decoder_plan = (
        (512, True), (512, True), (512, True),
        (512, False), (256, False), (128, False), (64, False),
    )
    for skip, (n_filters, use_dropout) in zip(reversed(skips), decoder_plan):
        x = decoder_block(x, skip, n_filters, dropout=use_dropout)

    # Output head: one channel per class, softmax across the channel axis.
    logits = Conv2DTranspose(NUM_CLASSES, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(x)
    out_image = Activation('softmax')(logits)
    return Model(in_image, out_image)

In [None]:
def define_model_pretrained(gen_path=None, last_layer='activation_7', freeze_backbone=False):
  """Build a segmentation model on top of a saved, pre-trained generator.

  The saved generator is truncated at ``last_layer`` and a new head is added:
  a 4x4 stride-2 transposed convolution with NUM_CLASSES filters
  ('last_conv2d') followed by softmax ('last_activation').

  Args:
    gen_path: path of the saved generator model. Defaults to GEN_PATH.
    last_layer: name of the last generator layer to keep.
    freeze_backbone: if true, the loaded generator is marked non-trainable
      before the head is attached, so only the new head is trained.

  Returns:
    An uncompiled Keras Model from the generator input to class probabilities.
  """
  if gen_path is None:
    gen_path = GEN_PATH

  # load pre-trained generator; its training configuration is not needed
  # because the model built here is compiled separately.
  g_model = keras.models.load_model(gen_path, compile=False)
  if freeze_backbone:
    g_model.trainable = False

  # last layer before we add new layers
  new_generator = keras.Model(inputs=g_model.input, outputs=g_model.get_layer(last_layer).output)

  x = new_generator.output
  x = Conv2DTranspose(NUM_CLASSES, (4, 4), strides=(2, 2), padding='same', kernel_initializer=RandomNormal(stddev=0.02), name='last_conv2d')(x)
  output = Activation('softmax', name='last_activation')(x)

  segmentation_model = keras.Model(inputs=new_generator.input, outputs=output)

  return segmentation_model

In [None]:
# create the model
# Either build the from-scratch generator (commented out) or the variant on top
# of the pre-trained generator, then restore weights from an earlier checkpoint
# so training continues from it.
#model = define_generator()
model = define_model_pretrained() # change
model.load_weights(saved_weights_path+'saved-weights-seg-coco-balanced-pretrained-01-2.17.hdf5')  #change

# summarize the model
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 256, 256, 1) 0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 128, 128, 64) 1088        input_1[0][0]                    
__________________________________________________________________________________________________
leaky_re_lu (LeakyReLU)         (None, 128, 128, 64) 0           conv2d[0][0]                     
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 64, 64, 128)  131200      leaky_re_lu[0][0]                
____________________________________________________________________________________________

In [None]:
class _OneHotMeanIoU(tf.keras.metrics.MeanIoU):
  """MeanIoU for one-hot targets and per-class probability predictions.

  The base metric compares class-index tensors. The generators here yield
  one-hot masks and the model emits softmax scores, so both are reduced to
  class indices with argmax over the last axis before accumulation.
  """

  def update_state(self, y_true, y_pred, sample_weight=None):
    y_true = tf.argmax(y_true, axis=-1)
    y_pred = tf.argmax(y_pred, axis=-1)
    return super().update_state(y_true, y_pred, sample_weight)


# The explicit name keeps the history keys ('mean_io_u', 'val_mean_io_u')
# stable even when this cell is re-run.
MeanIou = _OneHotMeanIoU(num_classes=NUM_CLASSES, name='mean_io_u')
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE_2),
              loss='categorical_crossentropy',
              metrics=[MeanIou])

In [None]:
# Write a weights-only checkpoint at the end of every epoch; the file name
# carries the epoch number and the validation loss.
weights_save = ModelCheckpoint(
    filepath=saved_weights_path+'saved-weights-' + model_name + '-{epoch:02d}-{val_loss:.2f}.hdf5',
    save_weights_only=True,
    save_freq='epoch',
    verbose=1,
)

history = model.fit(
    training_generator,
    epochs=EPOCHS,
    validation_data=val_generator,
    callbacks=[weights_save],
    workers=6,
    use_multiprocessing=True,
)

In [None]:
def plot_history_metric(history_dict, metric, title, ylabel):
  """Plot one training metric and its validation counterpart per epoch.

  Args:
    history_dict: mapping of metric name to a list of per-epoch values, as
      found in ``History.history``.
    metric: training key; the validation curve is read from ``'val_' + metric``.
    title: figure title.
    ylabel: y-axis label.
  """
  fig, ax = plt.subplots()
  ax.plot(history_dict[metric])
  ax.plot(history_dict['val_' + metric])
  ax.set(title=title, ylabel=ylabel, xlabel='epoch')
  # The second curve comes from validation_data, not from the test split.
  ax.legend(['train', 'validation'], loc='upper left')
  plt.show()


df = pd.DataFrame(history.history)
print(history.history.keys())

# Keras may add a numeric suffix to the metric name when the compile cell is
# re-run, so look the key up by prefix instead of hard-coding it.
miou_key = next(key for key in history.history if key.startswith('mean_io_u'))

# summarize history for mIoU
plot_history_metric(history.history, miou_key, 'model mIoU', 'mIoU')
# summarize history for loss
plot_history_metric(history.history, 'loss', 'model loss', 'loss')

## Testing

In [None]:
# change
# Rebuild the network for evaluation and restore the checkpoint to be tested.
# It is compiled with the 'accuracy' metric, so model.evaluate in the next cell
# returns the pair [loss, accuracy].
#model = define_generator()
model = define_model_pretrained() # change
model.compile(optimizer='adam',
              loss='categorical_crossentropy', 
              metrics=['accuracy'])
model.load_weights(saved_weights_path+'saved-weights-seg-coco-balanced-pretrained-10-1.09.hdf5')  #change

# Alternative: load a complete saved model instead of weights.
#model = keras.models.load_model(saved_models_path + 'saved-model-scratch-seg-lab-gray-pre-epoch20') #change



In [None]:
#load in the test set
# Fill pre-allocated float32 buffers directly. Collecting per-sample arrays in
# Python lists and converting afterwards keeps two copies of the one-hot labels
# in memory at the same time.
num_test = len(testing_indexes)
images_test = np.empty((num_test, IMG_WIDTH, IMG_HEIGHT, 1), dtype=np.float32)
labels_test = np.empty((num_test, IMG_WIDTH, IMG_HEIGHT, NUM_CLASSES), dtype=np.float32)
for row, testing_index in enumerate(testing_indexes):
  images_test[row], labels_test[row] = read_image_and_label(filenames[testing_index])
print('Loaded', num_test, 'test images')

loss, acc = model.evaluate(x=images_test,  y=labels_test, verbose=2)
print('Restored model, accuracy: {:5.2f}%'.format(100*acc))
print('Restored model, loss', loss)

In [None]:
# Save  model
# Saves whichever object `model` currently refers to under
# saved_models_path/saved-model-<model_name>.
model.save(saved_models_path + 'saved-model-' + model_name) 

INFO:tensorflow:Assets written to: /content/drive/My Drive/Self-supervised-VOC/SavedModels/saved-model-seg-coco-balanced-pretrained/assets


In [None]:
# Save history
# Pickles the per-epoch metrics dict returned by model.fit. This needs the
# `history` variable from the training cell, so it only works in a session
# where training was run.
with open(DIR + 'TrainHistory/seg_pretarin_coco_history', 'wb') as file_pi: # change name
    pickle.dump(history.history, file_pi)

## Plotting masks

In [None]:
# Dictionary to map indexes to colors
# Sanity check: print the packed RGB key ((R*256 + G)*256 + B) that maps to
# class 1. It is looked up on the NumPy array directly; converting the
# 256**3-entry table to a Python list first is very slow and memory hungry.
print(int(np.flatnonzero(voc_colormap2label() == 1)[0]))

color_dict = {class_index: VOC_COLORMAP[class_index] for class_index in range(NUM_CLASSES)}
print(color_dict)

In [None]:
# Predicted mask (256,256,21)-> color mask image (256,256,3)
def create_mask(pred_mask, color_dict):
  mask = tf.argmax(pred_mask, axis=-1)
  mask = mask[..., tf.newaxis]

  mask = np.array(mask)
  mask = mask.reshape(256*256)

  mask = (pd.Series(mask)).map(color_dict) #convert the list to a pandas series temporarily before mapping
  mask = list(mask)
  mask = np.array(mask)
  mask = mask.reshape(256,256,3)

  return mask

In [None]:
# Which sample to visualise in the cells below: position `index` within the
# chosen split (`subset` holds positions into `filenames`).
index = 261
subset = training_indexes

In [None]:
%pylab inline
# Real mask
image_test, label_test = read_image_and_label(filenames[subset[index]])
image_test = np.array(image_test)
label_test = np.array(label_test)
mask = create_mask(label_test, color_dict)
imgplot = plt.imshow(mask)
plt.show()

In [None]:
%pylab inline
# Predicted mask
image_test, _ = read_image_and_label(filenames[subset[index]])
#image_test = np.array(image_test)
image_test = image_test.reshape(1,256,256,1)
label = model.predict(image_test)

mask = create_mask(label, color_dict)
imgplot = plt.imshow(mask)
plt.show()

In [None]:
# Image
# Show the original RGB picture for the same sample, at its native size.
image = tf.keras.preprocessing.image.load_img(os.path.join(directory_images, filenames[subset[index]] + ".jpg"), color_mode='rgb') # read image
imgplot = plt.imshow(image)
plt.show()

## mIoU (mean intersection over union)


In [None]:
#load in the test set
# Ground-truth and predicted class-index maps are collected one image at a
# time, then scored together with MeanIoU.
# NOTE(review): prediction stays one image per call on purpose. The blocks
# defined in this notebook call BatchNormalization/Dropout with training=True,
# so the batch size may affect outputs; confirm before batching this loop.
labels_test, pred_labels_test = [], []
for testing_index in testing_indexes:
  image_test, label_test = read_image_and_label(filenames[testing_index])

  # One-hot mask -> class index per pixel.
  label_test = np.argmax(label_test, axis=-1)
  labels_test.append(label_test)

  # Softmax scores for a batch of one -> class index per pixel.
  pred_label = np.argmax(model.predict(image_test.reshape(1,256,256,1)), axis=-1)
  pred_labels_test.append(pred_label)

  print(testing_index)

labels_test = np.array(labels_test)
pred_labels_test = np.array(pred_labels_test)

m = tf.keras.metrics.MeanIoU(num_classes=NUM_CLASSES)
m.update_state(labels_test, pred_labels_test)
m.result().numpy()