# Autoencoder for 3D clustering

## for Google Colab

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!unzip -q /content/drive/MyDrive/data/mn10_64.zip

In [None]:
!pip install tensorflow-determinism kaleido

In [None]:
!pip install seaborn

## Import modules and set parameters

In [None]:
import os
import datetime
import random
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
def set_seed(seed=200):
    tf.random.set_seed(seed)

    # optional
    # for numpy.random
    np.random.seed(seed)
    # for built-in random
    random.seed(seed)
    # for hash seed
    os.environ["PYTHONHASHSEED"] = str(seed)
    os.environ['TF_DETERMINISTIC_OPS'] = '1'
    
set_seed(123)

In [None]:
EXPERIMENT_DIR = 'autoencoder_64_32768'

BUFFER_SIZE = 4000
BATCH_SIZE = 32
START_EPOCH = 0
EPOCHS = 300

DATA_DIR = '/content/mn10/64'
IMAGE_SIZE = 64
NUM_CHANNELS = 1

In [None]:
os.makedirs(EXPERIMENT_DIR, exist_ok=True)

## Define models

In [None]:
encoder = keras.Sequential([
        layers.Conv3D(filters=64, kernel_size=3,
                      padding='same', activation='relu',
                      input_shape=(IMAGE_SIZE, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS)),
        layers.MaxPool3D(pool_size=2),
    
        layers.Conv3D(filters=128, kernel_size=3, 
                      padding='same', activation='relu'),
        layers.MaxPool3D(pool_size=2),

        layers.Conv3D(filters=256, kernel_size=3,
                      padding='same', activation='relu'),
        layers.MaxPool3D(pool_size=2),

        layers.Conv3D(filters=512, kernel_size=3, 
                      padding='same', activation='relu'),
        layers.MaxPool3D(pool_size=2),
    
        layers.Flatten()    
])

In [None]:
decoder = keras.Sequential([
        layers.Reshape((4, 4, 4, 512), input_shape=(32768,)),
        layers.UpSampling3D(2),
    
        layers.Conv3D(filters=512, kernel_size=3, 
                      padding='same', activation='relu'),
        layers.UpSampling3D(2),

        layers.Conv3D(filters=256, kernel_size=3,
                      padding='same', activation='relu'),
        layers.UpSampling3D(2),
    
        layers.Conv3D(filters=128, kernel_size=3, 
                      padding='same', activation='relu'),
        layers.UpSampling3D(2),
    
        layers.Conv3D(filters=NUM_CHANNELS, kernel_size=3, 
                      padding='same', activation='sigmoid')
])

In [None]:
encoder.summary()

In [None]:
decoder.summary()

In [None]:
tf.keras.utils.plot_model(encoder,
                         to_file=os.path.join(EXPERIMENT_DIR, 'encoder.png'),
                         show_shapes=True)

In [None]:
tf.keras.utils.plot_model(decoder,
                         to_file=os.path.join(EXPERIMENT_DIR, 'decoder.png'),
                         show_shapes=True)

In [None]:
class Autoencoder(tf.keras.Model):
    def __init__(self, encoder, deocoder):
        super(Autoencoder, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def call(self, inputs):
        x = self.encoder(inputs)
        x = self.decoder(x)
        return x
    
    def build_graph(self, dim):
        x = tf.keras.Input((dim))
        return tf.keras.Model(inputs=[x], outputs=self.call(x))

In [None]:
model = Autoencoder(encoder, decoder)

## Prepare data

In [None]:
categories = ['bathtub', 'bed', 'chair', 'desk', 'dresser',
              'monitor', 'night_stand', 'sofa', 'table', 'toilet']

In [None]:
train_pattern = DATA_DIR +'/train/*.npy'

train_list_ds = tf.data.Dataset.list_files(train_pattern, shuffle=False)

In [None]:
cat_re = re.compile(r'.+/(.+?)_[0-9]+\.npy')
train_labels = [cat_re.match(item.numpy().decode())[1] for item in train_list_ds]
train_ids = [categories.index(cat) for cat in train_labels]
train_label_ds = tf.data.Dataset.from_tensor_slices(tf.cast(train_ids, tf.int64))

In [None]:
def read_npy_file(path):
    data = np.load(path.numpy())
    data = np.expand_dims(data, axis=-1)
    return tf.convert_to_tensor(data, dtype=tf.float32)

In [None]:
train_3d_ds = train_list_ds.map(
      lambda item: tf.py_function(read_npy_file, [item], tf.float32))
train_dataset = tf.data.Dataset.zip((train_3d_ds, train_3d_ds)).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

## Train autoencoder

In [None]:
%load_ext tensorboard
%tensorboard --logdir $EXPERIMENT_DIR/logs

In [None]:
loss_fn = tf.keras.losses.BinaryCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate=1.0e-4)

tb_callback = keras.callbacks.TensorBoard(log_dir=os.path.join(EXPERIMENT_DIR, 'logs'))

model.compile(optimizer=optimizer, loss=loss_fn)
model.fit(train_dataset,
          initial_epoch=START_EPOCH,
          epochs=EPOCHS,
          callbacks=[tb_callback])


In [None]:
encoder.save(os.path.join(EXPERIMENT_DIR, 'saved_models', 'encoder'))
decoder.save(os.path.join(EXPERIMENT_DIR, 'saved_models', 'decoder'))

## Inspect autoencoder outputs

In [None]:
sample_3d_data = next(iter(train_3d_ds.batch(BATCH_SIZE)))

In [None]:
decoded_voxel_data = model(sample_3d_data, training=False)

In [None]:
for i in range(min(len(sample_3d_data), 3)):
    fig = plt.figure(figsize=plt.figaspect(0.5))
    ax = fig.add_subplot(1, 2, 1, projection='3d')
    ax.voxels(sample_3d_data[i][:,:,:,0])
    ax = fig.add_subplot(1, 2, 2, projection='3d')
    ax.voxels(decoded_voxel_data[i][:,:,:,0])
    plt.show()

## Copy the experiment directory to Google Drive

In [None]:
!cp -r $EXPERIMENT_DIR /content/drive/MyDrive/