In [None]:
import os
import glob

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# from matplotlib import pyplot as plt
import open3d as o3d
from sklearn.model_selection import train_test_split
# import pandas as pd

# Seed every random source the notebook uses so a Restart & Run All is repeatable:
# NumPy drives the per-cloud point sub-sampling (np.random.choice in parse_dataset),
# TensorFlow drives weight initialisation, dataset shuffling and augmentation.
np.random.seed(1234)
tf.random.set_seed(1234)


In [None]:
# Root folder of the dataset handed to parse_dataset().
# parse_dataset walks three levels below this folder:
#   DIR/<top-level folder>/<sub-folder>/<point-cloud file>
# and every cloud receives the index of its top-level folder as its class label.
# NOTE(review): machine-specific absolute path — adjust before running elsewhere.
DIR = r"E:\20_PIGS\S-C-D-1"

In [None]:
def parse_dataset(DATA_DIR, num_points=1500, voxel_size=15, test_size=0.2, random_state=42):
    """Load every point cloud under ``DATA_DIR`` and split it into train/test arrays.

    The directory is expected to be three levels deep::

        DATA_DIR/<class folder>/<sub-folder>/<point-cloud file>

    Each cloud is voxel down-sampled, then randomly reduced (or padded) to exactly
    ``num_points`` points. The label of a cloud is the index of its class folder
    in the *sorted* list of class folders, so the folder -> label mapping is stable
    across runs and machines (``os.listdir`` alone returns entries in arbitrary
    order).

    Args:
        DATA_DIR: Root directory of the dataset.
        num_points: Number of points kept per cloud.
        voxel_size: Voxel size passed to Open3D's ``voxel_down_sample``.
        test_size: Fraction of samples placed in the test split.
        random_state: Seed forwarded to ``train_test_split``.

    Returns:
        Tuple ``(train_points, train_labels, test_points, test_labels)`` of NumPy
        arrays. The point arrays have shape ``(n_samples, num_points, 3)``.

    Raises:
        ValueError: If no usable point cloud is found under ``DATA_DIR``.

    Notes:
        Point sub-sampling uses NumPy's global random state; seed it with
        ``np.random.seed`` beforehand for reproducible sampling.
    """
    data_points = []
    data_labels = []

    # Only directories can be classes; stray files (e.g. thumbnails) are ignored
    # instead of crashing the nested os.listdir below.
    class_names = sorted(
        entry for entry in os.listdir(DATA_DIR)
        if os.path.isdir(os.path.join(DATA_DIR, entry))
    )

    for label, class_name in enumerate(class_names):
        class_path = os.path.join(DATA_DIR, class_name)
        print(class_path)
        for group_name in sorted(os.listdir(class_path)):
            group_path = os.path.join(class_path, group_name)
            if not os.path.isdir(group_path):
                continue
            for file_name in sorted(os.listdir(group_path)):
                cloud_path = os.path.join(group_path, file_name)
                cloud = o3d.io.read_point_cloud(cloud_path)
                cloud = cloud.voxel_down_sample(voxel_size=voxel_size)
                points = np.asarray(cloud.points)
                n_points = points.shape[0]
                if n_points == 0:
                    # Unreadable or empty file: there is nothing to sample from.
                    print(f"Skipping empty point cloud: {cloud_path}")
                    continue
                # Sampling without replacement needs at least num_points points;
                # sparser clouds are padded by sampling with replacement instead
                # of raising ValueError.
                idx = np.random.choice(
                    n_points, num_points, replace=n_points < num_points
                )
                # Store a plain (num_points, 3) array rather than an Open3D vector.
                data_points.append(points[idx])
                data_labels.append(label)

    if not data_points:
        raise ValueError(f"No point clouds found under {DATA_DIR!r}")

    train_points, test_points, train_labels, test_labels = train_test_split(
        data_points, data_labels, test_size=test_size, random_state=random_state
    )
    return (
        np.array(train_points),
        np.array(train_labels),
        np.array(test_points),
        np.array(test_labels),
    )

In [None]:
# Load all clouds under DIR and split them; parse_dataset returns the four arrays
# in the order (train points, train labels, test points, test labels).
train_points, train_labels, test_points, test_labels = parse_dataset(DIR)
# print(train_points.shape)
# print(train_labels.shape)
def augment(points, label):
    """Randomly perturb one training sample.

    Adds uniform noise drawn from [-0.01, 0.01) to every coordinate and then
    shuffles the points along the first axis. The label is passed through
    unchanged.

    Args:
        points: Float64 tensor holding one point cloud.
        label: Label belonging to ``points``.

    Returns:
        ``(augmented_points, label)``.
    """
    # Noise has the same shape as the cloud; float64 matches the NumPy source data.
    noise = tf.random.uniform(points.shape, -0.01, 0.01, dtype=tf.float64)
    jittered = points + noise
    # The network is meant to be order-invariant, so present points in random order.
    return tf.random.shuffle(jittered), label


# Wrap the in-memory arrays as datasets of (points, label) pairs.
train_dataset = tf.data.Dataset.from_tensor_slices((train_points, train_labels))
test_dataset = tf.data.Dataset.from_tensor_slices((test_points, test_labels))

# Training pipeline: full-buffer shuffle -> per-sample augmentation -> batches of 32.
train_dataset = train_dataset.shuffle(len(train_points))
train_dataset = train_dataset.map(augment)
train_dataset = train_dataset.batch(32)

# Evaluation pipeline: shuffled and batched, but never augmented.
test_dataset = test_dataset.shuffle(len(test_points))
test_dataset = test_dataset.batch(32)

# print(train_dataset)

In [None]:
def conv_bn(x, filters):
    """Apply a pointwise convolution block: Conv1D(kernel_size=1) -> BatchNorm -> ReLU.

    Args:
        x: Input tensor.
        filters: Number of output channels of the convolution.

    Returns:
        The activated output tensor.
    """
    convolution = layers.Conv1D(filters, kernel_size=1, padding="valid")
    # momentum=0.0: the moving statistics are replaced by the latest batch's statistics.
    normalization = layers.BatchNormalization(momentum=0.0)
    activation = layers.Activation("relu")
    return activation(normalization(convolution(x)))


def dense_bn(x, filters):
    """Apply a fully connected block: Dense -> BatchNorm -> ReLU.

    Args:
        x: Input tensor.
        filters: Number of units of the dense layer.

    Returns:
        The activated output tensor.
    """
    dense = layers.Dense(filters)
    # momentum=0.0: the moving statistics are replaced by the latest batch's statistics.
    normalization = layers.BatchNormalization(momentum=0.0)
    activation = layers.Activation("relu")
    return activation(normalization(dense(x)))

In [None]:
class OrthogonalRegularizer(keras.regularizers.Regularizer):
    """Penalise predicted transform matrices for deviating from orthogonality.

    For every sample the flat input is viewed as a ``num_features x num_features``
    matrix ``A`` and the penalty ``l2reg * sum((A @ A^T - I) ** 2)`` is accumulated
    over the batch.

    Args:
        num_features: Side length of the square transform matrix.
        l2reg: Weight of the penalty.
    """

    def __init__(self, num_features, l2reg=0.001):
        self.num_features = num_features
        self.l2reg = l2reg
        self.eye = tf.eye(num_features)

    def __call__(self, x):
        """Return the scalar orthogonality penalty for a batch of flat matrices."""
        x = tf.reshape(x, (-1, self.num_features, self.num_features))
        # Batched product: one Gram matrix A @ A^T per sample. A tensordot over the
        # last axes would also contract *across* samples (shape (B, k, B, k)) and
        # the penalty would no longer be per-sample for batch sizes above one.
        xxt = tf.matmul(x, x, transpose_b=True)
        return tf.reduce_sum(self.l2reg * tf.square(xxt - self.eye))

    def get_config(self):
        """Return the constructor arguments so models using this regularizer can be serialized."""
        return {"num_features": self.num_features, "l2reg": self.l2reg}

In [None]:
def tnet(inputs, num_features):
    """Build a T-Net: predict a feature transform and apply it to ``inputs``.

    A small network (three conv blocks, global max pooling, two dense blocks)
    predicts a ``num_features x num_features`` matrix per sample, which is then
    multiplied with the input features.

    Args:
        inputs: Tensor whose last axis has ``num_features`` entries.
        num_features: Size of the feature axis to transform.

    Returns:
        The transformed tensor produced by the ``Dot`` layer.
    """
    # Zero kernel + identity bias: the predicted transform starts as the identity matrix.
    identity_bias = keras.initializers.Constant(np.eye(num_features).flatten())
    orthogonality_penalty = OrthogonalRegularizer(num_features)

    features = inputs
    for width in (32, 64, 512):
        features = conv_bn(features, width)
    features = layers.GlobalMaxPooling1D()(features)
    for width in (256, 128):
        features = dense_bn(features, width)

    flat_transform = layers.Dense(
        num_features * num_features,
        kernel_initializer="zeros",
        bias_initializer=identity_bias,
        activity_regularizer=orthogonality_penalty,
    )(features)
    transform = layers.Reshape((num_features, num_features))(flat_transform)

    # Multiply every point's feature vector by the predicted matrix.
    return layers.Dot(axes=(2, 1))([inputs, transform])


In [None]:
# One sample is a cloud of 1500 points with 3 coordinates each; 1500 must match
# the number of points parse_dataset keeps per cloud.
inputs = keras.Input(shape=(1500, 3))

# Input transform followed by per-point feature extraction.
x = tnet(inputs, 3)
for width in (32, 32):
    x = conv_bn(x, width)

# Feature transform followed by deeper per-point features.
x = tnet(x, 32)
for width in (32, 64, 512):
    x = conv_bn(x, width)

# Collapse the point axis into one global descriptor per cloud.
x = layers.GlobalMaxPooling1D()(x)

# Classification head: each dense block is followed by dropout.
for width in (256, 128):
    x = dense_bn(x, width)
    x = layers.Dropout(0.3)(x)

# 18 output classes — presumably one per top-level dataset folder; verify against the data.
outputs = layers.Dense(18, activation="softmax")(x)

model = keras.Model(inputs=inputs, outputs=outputs, name="pointnet")
# model.load_weights(r"C:\Users\spaudel6\Desktop\PointNet_3k\Day1.h5")

In [None]:
# Step-wise decay: the learning rate starts at 0.001 and is halved after every
# 100000 optimizer steps (staircase=True keeps it constant between halvings).
initial_learning_rate = 0.001
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=100000,
    decay_rate=0.5,
    staircase=True,
)

In [None]:
# Adam driven by the decaying learning-rate schedule defined above.
adam_optimizer = keras.optimizers.Adam(learning_rate=lr_schedule)

# Labels are integer class ids, hence the sparse loss and the sparse accuracy metric.
model.compile(
    optimizer=adam_optimizer,
    loss="sparse_categorical_crossentropy",
    metrics=["sparse_categorical_accuracy"],
)

In [None]:
# Stop training once val_loss has not improved for 10 consecutive epochs and
# roll the model back to the weights of the best epoch seen.
early_stopping_options = dict(
    monitor="val_loss",
    mode="auto",
    min_delta=0,
    patience=10,
    baseline=None,
    restore_best_weights=True,
    verbose=0,
)
callbacks = tf.keras.callbacks.EarlyStopping(**early_stopping_options)


In [None]:
# Train for at most 200 epochs; the early-stopping callback may end the run sooner.
# NOTE(review): validation_data is the test split, so early stopping (and the
# restored "best" weights) are selected on test data — consider a separate
# validation split if an unbiased test score is needed.
history = model.fit(
    train_dataset,
    epochs=200,
    validation_data=test_dataset,
    # fit() documents `callbacks` as a list of Callback instances.
    callbacks=[callbacks],
)

In [None]:
# Persist the trained weights (weights only, not the architecture) to disk.
# NOTE(review): machine-specific absolute path — adjust before running elsewhere.
# NOTE(review): Keras 3 requires weight files to end in ".weights.h5"; confirm
# that this filename is accepted by the installed Keras version.
model.save_weights(r"C:\Users\spaudel6\Desktop\PointNet_Scripts\DAY1.h5")