# CMPT 764 - Final Project

In [None]:
from google.colab import drive
# Mount Google Drive so the dataset archive and model directory used below are reachable.
# NOTE(review): later cells use relative "gdrive/..." paths, so this presumably relies on
# the notebook's working directory being /content — confirm.
drive.mount("/content/gdrive")


In [None]:
!7z x gdrive/MyDrive/SFU/PartNet_Chairs_Clean.7z -ogdrive/MyDrive/SFU/


## Data Loader

In [None]:
import numpy as np
import os

# Seed NumPy's global RNG so the shuffle (and therefore the split) is repeatable.
np.random.seed(0)

partnet_dir = "gdrive/MyDrive/SFU/PartNet_Chairs_Clean/Chair_Parts/"
model_dir = "gdrive/MyDrive/SFU/PartNet_GAN/"

# os.scandir yields entries in arbitrary order, so sort the IDs before shuffling;
# otherwise the seeded permutation is applied to a different base ordering from run
# to run and the train/test/validation split is not reproducible. The context
# manager also closes the directory iterator deterministically.
with os.scandir(partnet_dir) as dir_entries:
    chair_ids_all = np.array(sorted(entry.name for entry in dir_entries if entry.is_dir()))
num_chairs = chair_ids_all.shape[0]
shuffle_idx = np.random.permutation(num_chairs)
chair_ids_all = chair_ids_all[shuffle_idx]

# Split points at 80% and 90% of the shuffled IDs: train / test / validate.
chair_ids_train, chair_ids_test, chair_ids_validate = np.array_split(
    chair_ids_all, [int(num_chairs * 0.8), int(num_chairs * 0.9)]
)


In [None]:
import itertools


def load_batch(chair_ids):
    """Load the per-part point files for every chair in ``chair_ids``.

    Returns a 5-tuple of lists ``(arm, back, base, seat, other)``. Each list holds
    one entry per chair, in input order, containing whatever ``parse_pts_file``
    returns for that chair's corresponding ``*.xyz`` file under ``partnet_dir``.
    """
    # File names in the same order as the returned tuple.
    part_files = (
        "chair_arm_pts.xyz",
        "chair_back_pts.xyz",
        "chair_base_pts.xyz",
        "chair_seat_pts.xyz",
        "other_pts.xyz",
    )
    parts = tuple([] for _ in part_files)

    for chair_id in chair_ids:
        for part_pts, part_file in zip(parts, part_files):
            part_pts.append(parse_pts_file(os.path.join(partnet_dir, chair_id, part_file)))

    return parts


def load_batch_monolithic(chair_ids, fuse=True):
    """Load chairs as whole point clouds instead of separate parts.

    For each chair the points of its arm, back, base, seat and "other" parts are
    concatenated in that order.

    Args:
        chair_ids: Chair IDs to load (passed straight to ``load_batch``).
        fuse: If True, the points of all chairs are merged into one flat list of
            points; if False, each chair keeps its own list of points.

    Returns:
        ``np.array`` built from the merged list (``fuse=True``) or from the
        list of per-chair lists (``fuse=False``).
    """
    per_chair = []
    # zip(*...) regroups the five per-part lists into one tuple of parts per chair.
    for parts in zip(*load_batch(chair_ids)):
        per_chair.append([pt for part in parts for pt in part])

    if fuse:
        merged = [pt for chair in per_chair for pt in chair]
    else:
        merged = per_chair

    return np.array(merged)


def parse_pts_file(file_path):
    """Parse a whitespace-separated ``.xyz`` point file.

    The first three fields of every non-blank line are read as x, y, z; any
    further fields on the line are ignored. Blank or whitespace-only lines
    (e.g. a trailing empty line) are skipped instead of raising ``IndexError``.

    Args:
        file_path: Path of the text file to read.

    Returns:
        Float array of shape ``(N, 3)``. A file without points yields shape
        ``(0, 3)`` so callers always receive a 2-D array.

    Raises:
        IndexError: If a non-blank line has fewer than three fields.
        ValueError: If one of the first three fields is not a valid float.
    """
    xyz_data = []

    with open(file_path, "r") as f:
        for line_str in f.read().splitlines():
            xyz_data_raw = line_str.split()

            # Nothing to parse on an empty / whitespace-only line.
            if not xyz_data_raw:
                continue

            x = float(xyz_data_raw[0])
            y = float(xyz_data_raw[1])
            z = float(xyz_data_raw[2])
            xyz_data.append([x, y, z])

    # reshape keeps (N, 3) untouched and turns the empty case into (0, 3).
    return np.array(xyz_data, dtype=float).reshape(-1, 3)


def split_batch(chair_ids, split_size=2):
    """Return every ``split_size``-element combination of ``chair_ids``.

    The result is a list of tuples in the order produced by
    ``itertools.combinations``; it is empty when ``chair_ids`` has fewer than
    ``split_size`` elements.
    """
    return list(itertools.combinations(chair_ids, split_size))


## GAN

### Define Generator & Discriminator

In [None]:
import tensorflow as tf
import tensorflow.keras as keras


def generator():
    """Build the generator network.

    A stack of point-wise (kernel size 1) 1-D convolutions over a
    ``(20000, 3)`` input. The ``MaxPool1D(pool_size=2)`` in the middle halves
    the point axis, and the final 3-filter ``tanh`` convolution emits one xyz
    triple per remaining point.

    Returns:
        An uncompiled ``keras.models.Sequential`` model.
    """
    layers = keras.layers
    stack = [layers.InputLayer(input_shape=(20000, 3))]

    # Widening point-wise feature extractor.
    for width in (128, 512, 1024):
        stack.append(layers.Conv1D(filters=width, kernel_size=1, activation="relu"))
        stack.append(layers.BatchNormalization())

    stack.append(layers.MaxPool1D(pool_size=2))

    # Narrowing back towards the 3-channel output.
    for width in (512, 128):
        stack.append(layers.Conv1D(filters=width, kernel_size=1, activation="relu"))
        stack.append(layers.BatchNormalization())

    stack.append(layers.Conv1D(filters=3, kernel_size=1, activation="tanh"))

    return keras.models.Sequential(stack)


def discriminator():
    """Build the discriminator network.

    Point-wise (kernel size 1) convolutions over a ``(10000, 3)`` input, a max
    pool whose window spans all 10000 points, then dense layers ending in a
    single sigmoid unit.

    Returns:
        An uncompiled ``keras.models.Sequential`` model.
    """
    layers = keras.layers
    stack = [layers.InputLayer(input_shape=(10000, 3))]

    # Per-point feature extraction.
    for width in (128, 512, 1024):
        stack.append(layers.Conv1D(filters=width, kernel_size=1, activation="relu"))
        stack.append(layers.BatchNormalization())

    # Pool window equals the number of input points, leaving one feature vector.
    stack.append(layers.MaxPool1D(pool_size=10000))
    stack.append(layers.Flatten())

    # Classification head.
    for units in (512, 128):
        stack.append(layers.Dense(units=units, activation="relu"))
        stack.append(layers.BatchNormalization())

    stack.append(layers.Dense(units=1, activation="sigmoid"))

    return keras.models.Sequential(stack)


### Define Loss

In [None]:
# The discriminator's last layer already applies a sigmoid, so its outputs are
# probabilities rather than logits. from_logits must therefore be False; with True
# the loss would apply a second sigmoid and distort both the loss and its gradients.
cross_entropy = keras.losses.BinaryCrossentropy(from_logits=False)


def generator_loss(fake_output):
    """Generator loss: cross-entropy of the discriminator's verdict on generated
    samples against an all-ones ("real") target."""
    real_labels = tf.ones_like(fake_output)
    return cross_entropy(real_labels, fake_output)


def discriminator_loss(real_output, fake_output):
    """Discriminator loss: cross-entropy of real samples against ones plus
    cross-entropy of generated samples against zeros."""
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)

    return loss_on_real + loss_on_fake


### Define Training Step

In [None]:
# Instantiate both networks (generator first, then discriminator).
gen, disc = generator(), discriminator()

# One independent Adam optimizer per network, each with learning rate 1e-4.
gen_optimizer = keras.optimizers.Adam(learning_rate=1e-4)
disc_optimizer = keras.optimizers.Adam(learning_rate=1e-4)


@tf.function
def train_step(chair_pts_gen, chair_pts_disc):
    """Run one simultaneous update of the generator and the discriminator.

    Args:
        chair_pts_gen: Batch fed to the generator ``gen``.
        chair_pts_disc: Batch of real samples fed to the discriminator ``disc``.

    Returns:
        Tuple ``(gen_loss, disc_loss)`` computed before the weights are updated.
    """
    # Two tapes record the same forward pass so each network gets its own gradients.
    with tf.GradientTape() as gen_tape:
        with tf.GradientTape() as disc_tape:
            fake_chairs = gen(chair_pts_gen, training=True)

            verdict_real = disc(chair_pts_disc, training=True)
            verdict_fake = disc(fake_chairs, training=True)

            gen_loss = generator_loss(verdict_fake)
            disc_loss = discriminator_loss(verdict_real, verdict_fake)

    gen_vars = gen.trainable_variables
    disc_vars = disc.trainable_variables

    gen_grads = gen_tape.gradient(gen_loss, gen_vars)
    disc_grads = disc_tape.gradient(disc_loss, disc_vars)

    gen_optimizer.apply_gradients(zip(gen_grads, gen_vars))
    disc_optimizer.apply_gradients(zip(disc_grads, disc_vars))

    return gen_loss, disc_loss


### Pre-Process Training Set

In [None]:
# Pre-load every training batch into memory.
#   batch_all_gen[i]  : generator input, one fused point cloud per chair combination
#                       produced by split_batch() for batch i
#   batch_all_disc[i] : discriminator input, the individual chairs of batch i
num_chairs = chair_ids_train.shape[0]
batch_size = 4
num_batches = (num_chairs + batch_size - 1) // batch_size  # ceiling division

batch_all_gen, batch_all_disc = [], []

for batch_counter, batch_idx in enumerate(range(0, num_chairs, batch_size), start=1):
    print("\rPre-Processing Batch %d / %d..." % (batch_counter, num_batches), end="")

    # Slicing clamps at the end of the array, so the last batch may be shorter.
    batch_chair_ids = chair_ids_train[batch_idx : batch_idx + batch_size]
    batch_chair_ids_split = split_batch(batch_chair_ids)

    # A trailing batch with too few chairs yields no combinations; it would give the
    # generator an empty input, so leave it out.
    if not batch_chair_ids_split:
        continue

    batch_chair_pts_gen = np.array(
        [load_batch_monolithic(chair_ids_seq, fuse=True) for chair_ids_seq in batch_chair_ids_split]
    )
    batch_chair_pts_disc = load_batch_monolithic(batch_chair_ids, fuse=False)

    batch_all_gen.append(batch_chair_pts_gen)
    batch_all_disc.append(batch_chair_pts_disc)

# Keep the batches as plain lists: they are only ever indexed per batch, and stacking
# them into one ndarray copies all the data and fails on recent NumPy when the last
# batch has a different size (inhomogeneous shape).
# Make num_batches reflect the batches that were actually stored.
num_batches = len(batch_all_gen)

print("\nDONE!")


### Run Training

In [None]:
num_epochs = 20
gen_loss_values, disc_loss_values = [], []

# Create the checkpoint object once. It owns the save counter that numbers the files
# written by save(); re-creating it every epoch restarts the counter, so each epoch
# would overwrite the previous epoch's checkpoint.
checkpoint = tf.train.Checkpoint(
    generator=gen,
    generator_optimizer=gen_optimizer,
    discriminator=disc,
    discriminator_optimizer=disc_optimizer,
)

for epoch in range(1, num_epochs + 1):
    print("\nTraining Epoch %d / %d..." % (epoch, num_epochs))
    batch_gen_loss, batch_disc_loss = 0, 0

    for batch_idx in range(num_batches):
        print("\rBatch %d / %d..." % (batch_idx + 1, num_batches), end="")

        gen_loss, disc_loss = train_step(batch_all_gen[batch_idx], batch_all_disc[batch_idx])
        batch_gen_loss += gen_loss
        batch_disc_loss += disc_loss

    # Store the per-epoch loss sums as plain floats so they can be plotted or
    # serialized without holding on to TensorFlow tensors.
    gen_loss_values.append(float(batch_gen_loss))
    disc_loss_values.append(float(batch_disc_loss))

    checkpoint.save(model_dir)

print("\nDONE!")


### Visualize Loss

In [None]:
import matplotlib.pyplot as plt


# Number epochs from 1 to match the training log, and derive the x axis from the
# recorded values so the plot still works if training stopped before num_epochs.
epochs_run = range(1, len(gen_loss_values) + 1)

plt.plot(epochs_run, gen_loss_values, label="Generator")
plt.plot(epochs_run, disc_loss_values, label="Discriminator")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("GAN Training Loss Values")
plt.grid(True)
plt.legend()
plt.show()


### Visualize Results

In [None]:
# Axes3D is a class inside the mplot3d package, not a module, so it has to be imported
# with "from ... import". Importing it also registers the "3d" projection on older
# Matplotlib versions.
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401


def show_point_cloud(pts):
    """Scatter-plot a point cloud in a new 3-D figure.

    Args:
        pts: Array-like of shape ``(N, 3)`` holding x, y, z per row (NumPy array
            or anything ``np.asarray`` can convert).
    """
    pts = np.asarray(pts)
    fig = plt.figure()
    # add_subplot attaches the 3-D axes to the figure; a bare Axes3D(fig) is no
    # longer added automatically on current Matplotlib.
    ax = fig.add_subplot(111, projection="3d")
    ax.scatter(pts[:, 0], pts[:, 1], pts[:, 2])
    plt.show()


# The helper is defined above its first use so the cell runs on a fresh kernel.
chair_ids_test_sample = chair_ids_test[0 : 2]
print(chair_ids_test_sample)

gen_pts_in = load_batch_monolithic(chair_ids_test_sample, fuse=True)
disc_pts_in = load_batch_monolithic(chair_ids_test_sample, fuse=False)

# Inference only: make it explicit that layers run in non-training mode.
gen_pts_out = gen(np.array([gen_pts_in]), training=False)

show_point_cloud(disc_pts_in[0, :, :])
show_point_cloud(disc_pts_in[1, :, :])
show_point_cloud(gen_pts_out[0, :, :])
