In [None]:
import os
# Expose only the first GPU to this process. Kept ahead of the TensorFlow
# import so the restriction is in place before TensorFlow touches CUDA.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
import tensorflow as tf
import tensorflow.keras as keras
# Ask TensorFlow to grow GPU memory on demand instead of reserving it all.
# `tf.config.gpu.set_per_process_memory_growth` only exists in the TF 2.0
# alpha builds; released TF 2.x exposes the per-device experimental API
# instead, so fall back to that when the alpha namespace is missing.
if getattr(tf.config, "gpu", None) is not None:
    tf.config.gpu.set_per_process_memory_growth(True)
else:
    for _gpu in tf.config.experimental.list_physical_devices("GPU"):
        tf.config.experimental.set_memory_growth(_gpu, True)
from tensorflow.keras.preprocessing import image
import numpy as np
import matplotlib.pyplot as plt
import utils_modelnet as ds

In [None]:
# Pipeline-wide constants used by the cells below.
BATCH_SIZE = 32
IMG_SIZE = 48
NUM_CHANNEL = 1

# Dataset location and the object categories to load from it.
dataset_path = 'dataset/modelnet2d/'
class_set = ['chair', 'car', 'lamp', 'airplane', 'person']

# Loading and splitting are delegated to the utils_modelnet helpers.
dataset = ds.get_data_from_file(class_set, dataset_path)
train_dataset, valid_dataset, test_dataset = ds.train_test_split(dataset)

# Separate the inputs from the labels for every split.
train_data, train_label = ds.split_data_label(train_dataset)
test_data, test_label = ds.split_data_label(test_dataset)
valid_data, valid_label = ds.split_data_label(valid_dataset)

# Report the size of each split and the number of categories.
print("Train Dataset: {}".format(len(train_dataset)))
print("Test Dataset: {}".format(len(test_dataset)))
print("Valid Dataset: {}".format(len(valid_dataset)))
num_classes = len(class_set)
print("Number of Classes: {}".format(num_classes))

In [None]:
def get_binocular_dataset(data, label, batch_size=BATCH_SIZE, shuffle=True):
    """Build a batched ``tf.data`` pipeline of left/right image pairs.

    Args:
        data: Sequence whose elements hold two file paths, indexed as
            ``[0]`` (left image) and ``[1]`` (right image). The files are
            decoded with ``tf.image.decode_jpeg``.
        label: Sequence of labels aligned with ``data``; passed through
            unchanged.
        batch_size: Number of pairs per batch.
        shuffle: When True (the default, matching the previous behaviour)
            the samples are shuffled with a buffer covering the whole
            dataset. Pass False for a deterministic order.

    Returns:
        A ``tf.data.Dataset`` yielding ``((left, right), label)`` batches.
        Each image is resized to ``IMG_SIZE x IMG_SIZE`` with
        ``NUM_CHANNEL`` channels and divided by 255.
    """
    def _load_image(path):
        # Read, decode, resize and rescale one image file.
        encoded = tf.io.read_file(path)
        decoded = tf.image.decode_jpeg(encoded, channels=NUM_CHANNEL)
        resized = tf.image.resize(decoded, [IMG_SIZE, IMG_SIZE])
        return resized / 255.0  # normalize to [0,1] range

    # Dataset elements arrive as (paths, label); the tuple is unpacked into
    # the positional arguments of the mapped function.
    def _load_pair(paths, pair_label):
        return (_load_image(paths[0]), _load_image(paths[1])), pair_label

    # A distinct local name avoids shadowing the `utils_modelnet as ds` alias.
    pipeline = tf.data.Dataset.from_tensor_slices((data, label))
    if shuffle:
        # Shuffle the path pairs *before* decoding so the full-size shuffle
        # buffer holds short strings rather than every decoded image.
        pipeline = pipeline.shuffle(buffer_size=len(data))
    pipeline = pipeline.map(_load_pair, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    pipeline = pipeline.batch(batch_size)
    pipeline = pipeline.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    print(pipeline)
    return pipeline

In [None]:
# Input pipelines for the train and test splits (default batch size).
train_ds = get_binocular_dataset(data=train_data, label=train_label)
test_ds = get_binocular_dataset(data=test_data, label=test_label)

In [None]:
## Utility function
def cmpooling(fmaps, scale_list, pool_stride):
    """Concentric multi-scale max pooling.

    Every scale pools a centred crop of ``fmaps``: the largest window sees
    the full map, and each smaller window sees a map trimmed by half the
    size difference on every side of axes 1 and 2, so that all pooled
    outputs end up with the same shape.

    Args:
        fmaps: 4-D feature maps; axes 1 and 2 are cropped and pooled, axes
            0 and 3 are kept whole. Axes 1 and 2 are read from
            ``fmaps.shape`` and used in arithmetic, so they are assumed to
            be statically known.
        scale_list: Pooling window sizes, in any order. Sizes should differ
            from the largest one by an even amount; otherwise the crops
            cannot be centred and the shape check below fails.
        pool_stride: Stride forwarded to ``tf.nn.max_pool2d``.

    Returns:
        List of pooled tensors ordered from the largest window to the
        smallest.

    Raises:
        ValueError: If ``scale_list`` is empty.
        AssertionError: If the pooled maps do not share one shape.
    """
    if len(scale_list) == 0:
        raise ValueError("scale_list must contain at least one pooling scale")

    # Largest window first. Sorting (rather than comparing only the first
    # two entries) also handles single-element and unordered lists.
    scales = sorted(scale_list, reverse=True)
    largest = scales[0]

    pool_maps = []
    for scale in scales:
        # Border trimmed on each side so the crop stays concentric.
        margin = (largest - scale) // 2
        cropped = tf.slice(
            fmaps,
            [0, margin, margin, 0],
            [-1, fmaps.shape[1] - margin * 2, fmaps.shape[2] - margin * 2, -1],
        )
        pool_maps.append(tf.nn.max_pool2d(cropped, scale, pool_stride, "VALID"))

    # All scales must agree on everything but the batch axis.
    expected_shape = pool_maps[-1].shape[1:]
    for pooled in pool_maps[:-1]:
        assert pooled.shape[1:] == expected_shape, (
            "pooled maps have mismatched shapes; check that the scales "
            "differ from the largest one by an even amount"
        )
    return pool_maps

# Concat the feature maps in different scale and convolution once. (paper version)
class Monocular(tf.keras.layers.Layer):
    """Multi-scale pooling followed by a single convolution.

    The input feature maps are pooled at several scales with ``cmpooling``,
    the pooled maps are concatenated along the last axis, and one ReLU
    ``Conv2D`` with ``'same'`` padding is applied to the result.

    Args:
        filters: Number of output filters of the convolution.
        ksize: Kernel size of the convolution.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, filters, ksize, **kwargs):
        super(Monocular, self).__init__(**kwargs)
        self.filters = filters
        self.ksize = ksize

    def build(self, input_shape):
        # No `input_shape` hint is given to the convolution: it receives the
        # pooled, channel-concatenated maps, whose shape differs from this
        # layer's own input shape, and infers its input on first call.
        self.conv = tf.keras.layers.Conv2D(self.filters, self.ksize, activation='relu', padding='same')
        super(Monocular, self).build(input_shape)

    def call(self, fmaps, scale_list, pool_stride):
        """Pool ``fmaps`` at every scale, concatenate, and convolve.

        Args:
            fmaps: Input feature maps, forwarded to ``cmpooling``.
            scale_list: Pooling window sizes, forwarded to ``cmpooling``.
            pool_stride: Pooling stride, forwarded to ``cmpooling``.

        Returns:
            The output of the convolution.
        """
        pool_maps = cmpooling(fmaps, scale_list, pool_stride)
        pool_maps = tf.concat(pool_maps, axis=-1)
        return self.conv(pool_maps)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created.

        Keras requires layers with custom ``__init__`` arguments to override
        this method before a model containing them can be serialised.
        """
        config = super(Monocular, self).get_config()
        config.update({'filters': self.filters, 'ksize': self.ksize})
        return config

In [None]:
# Pooling window sizes shared by every Monocular stage.
SCALE_LIST = [1,3,5]
def CNN2(input_shape, num_classes, scale_list):
    """Build the two-input (left eye / right eye) classification network.

    Both views are augmented with their parallax (left minus right), passed
    through three ``Monocular`` stages that exchange features between the
    two branches, merged, and classified by a small convolutional head.

    Args:
        input_shape: Shape of one input image, without the batch axis.
        num_classes: Number of units of the softmax output layer.
        scale_list: Pooling window sizes forwarded to every ``Monocular``.

    Returns:
        An uncompiled ``tf.keras.Model`` taking ``[left_eye, right_eye]``
        and returning per-class probabilities.
    """
    left_eye = tf.keras.Input(input_shape, name='left_eye')
    right_eye = tf.keras.Input(input_shape, name='right_eye')

    # parallax augmentation: each view gets the signed difference to the
    # other view appended along the channel axis
    parallax = left_eye - right_eye
    left = tf.concat([left_eye, -parallax], axis=-1)
    right = tf.concat([right_eye, parallax], axis=-1)

    # Stage 1. No `input_shape` hint is passed here: after the parallax
    # concat the tensors carry twice the channels of `input_shape`.
    left1 = Monocular(6, 5, name='mono1_left')(left, scale_list=scale_list, pool_stride=2)
    right1 = Monocular(6, 5, name='mono1_right')(right, scale_list=scale_list, pool_stride=2)

    # Stages 2 and 3: each branch sees its own features first, then the
    # features of the opposite branch.
    left2 = Monocular(12, 5, name='mono2_left')(tf.concat([left1, right1], axis=-1), scale_list=scale_list, pool_stride=1)
    right2 = Monocular(12, 5, name='mono2_right')(tf.concat([right1, left1], axis=-1), scale_list=scale_list, pool_stride=1)

    left3 = Monocular(32, 3, name='mono3_left')(tf.concat([left2, right2], axis=-1), scale_list=scale_list, pool_stride=1)
    right3 = Monocular(32, 3, name='mono3_right')(tf.concat([right2, left2], axis=-1), scale_list=scale_list, pool_stride=1)

    # Merge both branches and classify.
    x = tf.concat([left3, right3], axis=-1)
    x = tf.keras.layers.Conv2D(256, 3, strides=1, activation='relu', name='conv1')(x)
    x = tf.keras.layers.Conv2D(256, 1, strides=1, activation='relu', name='conv2')(x)
    x = tf.keras.layers.Conv2D(64, 1, strides=1, activation='relu', name='conv3')(x)
    feature_vector = tf.keras.layers.Flatten()(x)
    # NOTE(review): the original code applied Dropout(0.5) to `x` here but
    # discarded the result, so dropout was never part of the model. The dead
    # call is removed without changing the network; if dropout is wanted
    # for training, apply it to `feature_vector` and feed that to `output`
    # (this changes the layer graph, so re-check checkpoint loading).
    predicted_output = tf.keras.layers.Dense(num_classes, activation='softmax', name='output')(feature_vector)

    return tf.keras.Model([left_eye, right_eye], predicted_output)

In [None]:
def create_model(model, input_shape, num_classes, scale_list):
    """Instantiate and compile a network.

    Args:
        model: Callable invoked as ``model(input_shape, num_classes,
            scale_list)`` that returns the Keras model to compile.
        input_shape: Forwarded to ``model``.
        num_classes: Forwarded to ``model``.
        scale_list: Forwarded to ``model``.

    Returns:
        The model, compiled with Adam on an exponentially decaying learning
        rate, sparse categorical cross-entropy loss and sparse categorical
        accuracy as the monitored metric.
    """
    network = model(input_shape, num_classes, scale_list)

    # Start at 1e-4 and multiply by 0.96 every 100000 steps (stepwise).
    decayed_lr = tf.keras.optimizers.schedules.ExponentialDecay(
        0.0001,
        decay_steps=100000,
        decay_rate=0.96,
        staircase=True,
    )

    network.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=decayed_lr),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )
    return network

In [None]:
# Rebuild the CNN2 architecture. The output width comes from `num_classes`
# (derived from `class_set`) rather than a hard-coded 5, so the classifier
# head always matches the labels produced by the data cell.
checkpoint_path = 'checkpoints/cnn2_modelnet/cp.ckpt'
cnn2 = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=num_classes, scale_list=SCALE_LIST)
# Restore the weights
cnn2.load_weights(checkpoint_path)
# Evaluate the restored model on the held-out test split
loss, acc = cnn2.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")