In [2]:
import os
import cv2
import numpy as np
from glob import glob
from scipy.io import loadmat
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [3]:
DATA_DIR = "D:/DeepLabV3+/instance-level-human-parsing/instance-level_human_parsing/instance-level_human_parsing/Training"
os.listdir(DATA_DIR)

['Categories',
 'Category_ids',
 'Human',
 'Human_ids',
 'Images',
 'Instances',
 'Instance_ids',
 'train_id.txt']

In [4]:
IMAGE_SIZE = 512
BATCH_SIZE = 4
NUM_CLASSES = 20
DATA_DIR = "D:/DeepLabV3+/instance-level-human-parsing/instance-level_human_parsing/instance-level_human_parsing/Training"
NUM_TRAIN_IMAGES = 1000
NUM_VAL_IMAGES = 50

train_images = sorted(glob(os.path.join(DATA_DIR, "Images/*")))[:NUM_TRAIN_IMAGES]
train_masks = sorted(glob(os.path.join(DATA_DIR, "Category_ids/*")))[:NUM_TRAIN_IMAGES]

val_images = sorted(glob(os.path.join(DATA_DIR, "Images/*")))[
    NUM_TRAIN_IMAGES : NUM_VAL_IMAGES + NUM_TRAIN_IMAGES
]
val_masks = sorted(glob(os.path.join(DATA_DIR, "Category_ids/*")))[
    NUM_TRAIN_IMAGES : NUM_VAL_IMAGES + NUM_TRAIN_IMAGES
]


In [5]:
import os
import tensorflow as tf
from PIL import Image

class CIHPDataLoader:
    def __init__(self, image_paths, mask_paths, img_size=(512, 512), batch_size=32):
        self.img_size = img_size
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.batch_size = batch_size
        self.num_samples = len(self.image_paths)

    def _parse_function(self, image_path, mask_path):
        image = tf.io.read_file(image_path)
        image = tf.image.decode_png(image, channels=3)
        image = tf.image.resize(image, self.img_size, method=tf.image.ResizeMethod.BILINEAR)
        image = tf.cast(image, tf.float32)
        image /= 255.0

        mask = tf.io.read_file(mask_path)
        mask = tf.image.decode_png(mask, channels=1)
        mask = tf.image.resize(mask, self.img_size, method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
        mask = tf.squeeze(mask)
        return image, mask

    def get_dataset(self):
        dataset = tf.data.Dataset.from_tensor_slices((self.image_paths, self.mask_paths))
        dataset = dataset.shuffle(buffer_size=self.num_samples, reshuffle_each_iteration=True)
        dataset = dataset.map(self._parse_function, num_parallel_calls=tf.data.AUTOTUNE)
        dataset = dataset.batch(self.batch_size)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)
        return dataset


In [6]:
train_data_loader = CIHPDataLoader(train_images,train_masks)
train_dataset = train_data_loader.get_dataset()

In [7]:
for i, (x,y) in enumerate(train_dataset):
    print(x.shape, y.shape)
    if i ==5:
        break

(32, 512, 512, 3) (32, 512, 512)
(32, 512, 512, 3) (32, 512, 512)
(32, 512, 512, 3) (32, 512, 512)
(32, 512, 512, 3) (32, 512, 512)
(32, 512, 512, 3) (32, 512, 512)
(32, 512, 512, 3) (32, 512, 512)


In [8]:
val_data_loader = CIHPDataLoader(val_images,val_masks)
val_dataset = val_data_loader.get_dataset()

In [9]:
for i, (x,y) in enumerate(val_dataset):
    print(x.shape, y.shape)
    if i ==5:
        break

(32, 512, 512, 3) (32, 512, 512)
(18, 512, 512, 3) (18, 512, 512)


In [11]:
def convolution_block(block_input, num_filters=256, kernel_size=3, dilation_rate=1, padding="same", use_bias=False,):
    x = layers.Conv2D(
        num_filters,
        kernel_size=kernel_size,
        dilation_rate=dilation_rate,
        padding="same",
        use_bias=use_bias,
        kernel_initializer=keras.initializers.HeNormal(),
    )(block_input)
    x = layers.BatchNormalization()(x)
    return tf.nn.relu(x)


def DilatedSpatialPyramidPooling(dspp_input):
    dims = dspp_input.shape
    x = layers.AveragePooling2D(pool_size=(dims[-3], dims[-2]))(dspp_input)
    x = convolution_block(x, kernel_size=1, use_bias=True)
    out_pool = layers.UpSampling2D(
        size=(dims[-3] // x.shape[1], dims[-2] // x.shape[2]), interpolation="bilinear",
    )(x)

    out_1 = convolution_block(dspp_input, kernel_size=1, dilation_rate=1)
    out_6 = convolution_block(dspp_input, kernel_size=3, dilation_rate=6)
    out_12 = convolution_block(dspp_input, kernel_size=3, dilation_rate=12)
    out_18 = convolution_block(dspp_input, kernel_size=3, dilation_rate=18)

    x = layers.Concatenate(axis=-1)([out_pool, out_1, out_6, out_12, out_18])
    output = convolution_block(x, kernel_size=1)
    return output

In [15]:
class DeeplabV3Plus():
    def __init__(self, image_size, num_classes):
        self.image_size = image_size
        self.num_classes = num_classes
        self.model = None
        
    def build_model(self):
        model_input = keras.Input(shape=(self.image_size, self.image_size, 3))
        resnet50 = keras.applications.ResNet50(weights="imagenet", include_top=False, input_tensor=model_input)
        x = resnet50.get_layer("conv4_block6_2_relu").output
        x = DilatedSpatialPyramidPooling(x)
        
        input_a = layers.UpSampling2D(
            size=(self.image_size // 4 // x.shape[1], self.image_size // 4 // x.shape[2]),
            interpolation="bilinear",)(x)
        
        input_b = resnet50.get_layer("conv2_block3_2_relu").output
        input_b = convolution_block(input_b, num_filters=48, kernel_size=1)

        x = layers.Concatenate(axis=-1)([input_a, input_b])
        x = convolution_block(x)
        x = convolution_block(x)
        x = layers.UpSampling2D(
            size=(self.image_size // x.shape[1], self.image_size // x.shape[2]),
            interpolation="bilinear",)(x)
        
        model_output = layers.Conv2D(self.num_classes, kernel_size=(1, 1), padding="same")(x)
        self.model = keras.Model(inputs=model_input, outputs=model_output)
        # return keras.Model(inputs=model_input, outputs=model_output)
    
    def compile_model(self, learning_rate=0.001):
        self.model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                           loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                           metrics=["accuracy"],
                          )
    def train(self, train_dataset, val_dataset,epochs=40):
        # Load the data
        # train_dataset =train_dataset# self._load_data(train_data_dir, batch_size)
        # val_dataset = val_dataset #self._load_data(val_data_dir, batch_size)
        # Train the model
        history =  self.model.fit(train_dataset,
                                 epochs=epochs,
                                 validation_data=val_dataset,
                                 )
        return history        

In [16]:
model = DeeplabV3Plus(IMAGE_SIZE, NUM_CLASSES)

# Build the model architecture
model.build_model()
# Compile the model
model.compile_model(learning_rate=0.0001)

In [None]:
history = model.train(train_dataset, val_dataset, epochs=2)

Epoch 1/2


In [None]:
plt.plot(history.history["loss"])
plt.title("Training Loss")
plt.ylabel("loss")
plt.xlabel("epoch")
plt.show()

plt.plot(history.history["accuracy"])
plt.title("Training Accuracy")
plt.ylabel("accuracy")
plt.xlabel("epoch")
plt.show()

plt.plot(history.history["val_loss"])
plt.title("Validation Loss")
plt.ylabel("val_loss")
plt.xlabel("epoch")
plt.show()

plt.plot(history.history["val_accuracy"])
plt.title("Validation Accuracy")
plt.ylabel("val_accuracy")
plt.xlabel("epoch")
plt.show()

Epoch 1/25

### Inference using Colormap Overlay