In [1]:
from keras.models import Sequential, Model
from keras.layers import Dense, Input, concatenate, Dropout, Reshape, Conv1D, MaxPooling1D, Flatten, Activation
from keras.layers import BatchNormalization
from keras.optimizers import SGD, Adam, Adadelta
from keras import backend as K
from keras.losses import mean_absolute_percentage_error
from keras.callbacks import ModelCheckpoint, EarlyStopping

import sys
from ZernikeCNN import ZernikeConv, Normalized_resampleGraph_expand, ZernikeDecomp
import numpy as np
import os
import scipy.io as sio
import scipy.stats

Using TensorFlow backend.


In [2]:
import scipy.io as sio

def loadmat(filename):
    '''
    Read a MATLAB .mat file and return its contents as a dictionary.

    Use this wrapper rather than calling scipy.io.loadmat directly: the file
    is loaded with struct_as_record=False and squeeze_me=True, and the result
    is passed through _check_keys so that top-level entries which are still
    mat-objects are turned into nested dictionaries.
    '''
    raw_contents = sio.loadmat(filename, squeeze_me=True, struct_as_record=False)
    converted = _check_keys(raw_contents)
    return converted

def _check_keys(dict):
    '''
    Convert top-level mat-object entries of a dictionary to nested dictionaries.

    Every value that is a scipy mat_struct is replaced, in place, by the
    result of _todict; all other values are left untouched. The same
    (mutated) dictionary object is returned.

    Note: the parameter is called ``dict`` (shadowing the builtin inside this
    function) only to keep the original signature for existing callers.
    '''
    # Newer SciPy exposes mat_struct publicly as scipy.io.matlab.mat_struct and
    # deprecates the mio5_params namespace; older releases only offer the
    # latter, so fall back to it when the public name is missing.
    struct_type = getattr(sio.matlab, 'mat_struct', None)
    if struct_type is None:
        struct_type = sio.matlab.mio5_params.mat_struct

    # Only values of existing keys are reassigned, so iterating while
    # updating is safe (the dictionary never changes size).
    for key in dict:
        if isinstance(dict[key], struct_type):
            dict[key] = _todict(dict[key])
    return dict

def _todict(matobj):
    '''
    Recursively build a nested dictionary from a mat-object.

    Each name listed in matobj._fieldnames becomes a key of the result. A
    field whose value is itself a mat-object is converted recursively; any
    other value is stored unchanged.
    '''
    # Newer SciPy exposes mat_struct publicly as scipy.io.matlab.mat_struct and
    # deprecates the mio5_params namespace; older releases only offer the
    # latter, so fall back to it when the public name is missing.
    struct_type = getattr(sio.matlab, 'mat_struct', None)
    if struct_type is None:
        struct_type = sio.matlab.mio5_params.mat_struct

    result = {}
    for field in matobj._fieldnames:
        value = matobj.__dict__[field]
        if isinstance(value, struct_type):
            result[field] = _todict(value)
        else:
            result[field] = value
    return result


In [3]:
def load_ZerNet_inputs(path):
    '''
    Load the scale-1 Zernike patch data from a ZerNet preprocessing .mat file.

    Returns a tuple (Zerbases, resampleGraph): the 'bases' entry cast to
    float32, and the 'resamplePids' entry decremented by one and cast to int32.
    '''
    zer_patch = loadmat(path)['ZerNet_preproc']['scale_1']['ZerPatch']
    bases = zer_patch['bases'].astype('float32')
    # The -1 presumably converts MATLAB 1-based indices to 0-based -- TODO confirm.
    resample_ids = zer_patch['resamplePids'] - 1
    return bases, resample_ids.astype('int32')

def load_mapping_infor(path):
    '''
    Load the mesh faces and scale-1 resampling data from a mapping .mat file.

    Returns a tuple (mesh_faces, resample_graph_I, resample_graph_B):
    'F' and scale_1 'I' are decremented by one and cast to int32, and
    scale_1 'B' is cast to float32.
    '''
    mapping = loadmat(path)['X2N_mapping_infor']
    # The -1 on F and I presumably converts MATLAB 1-based indices to 0-based -- TODO confirm.
    faces = (mapping['F'] - 1).astype('int32')
    scale_1 = mapping['scale_1']
    graph_I = (scale_1['I'] - 1).astype('int32')
    graph_B = scale_1['B'].astype('float32')
    return faces, graph_I, graph_B

def create_data(patch_path, feature_path, label_path, mapping_infor_path):
    '''
    Load every array needed for one model from its four .mat files.

    Returns a 7-tuple, in this order:
    (input_feature, resampleGraph, Zerbases, label_out,
     mesh_faces, resample_graph_I, resample_graph_B).
    '''
    bases, resample_graph = load_ZerNet_inputs(patch_path)

    feature_mat = sio.loadmat(feature_path)
    features = feature_mat.get('feature_in').astype('float32')

    label_mat = sio.loadmat(label_path)
    # Labels are decremented by one before the int32 cast.
    labels = (label_mat.get('labels') - 1).astype('int32')

    faces, graph_I, graph_B = load_mapping_infor(mapping_infor_path)
    return features, resample_graph, bases, labels, faces, graph_I, graph_B

In [4]:
import keras
import numpy as np
import os
class DataGenerator(keras.utils.Sequence):
    """Keras Sequence that yields the inputs and labels of one model per item.

    Each item is a pair (X, Y) where X is the list
    [features, resampleGraph, mesh_faces, resample_graph_I, resample_graph_B, Zerbases]
    and Y is the label array; every array gets a leading axis of length 1.

    Args:
        list_model_names: file names (one per model) looked up in each folder.
        patches_folder: folder holding the ZerNet patch .mat files.
        features_folder: folder holding the input feature .mat files.
        labels_folder: folder holding the label .mat files.
        mapping_infor_path: path of the mapping .mat file shared by all models.
        batch_size: stored for interface compatibility only; an item always
            contains exactly one model.
        basis_num: stored on the instance; not used by the generator itself.
        num_input_channels: stored on the instance; not used by the generator itself.
        shuffle: when true, the visiting order is reshuffled on construction
            and after every epoch.
    """

    def __init__(self, list_model_names, patches_folder, features_folder, labels_folder, mapping_infor_path,
                 batch_size=1, basis_num = 21, num_input_channels=3, shuffle=True):
        """Store the configuration and build the initial index order."""
        self.list_model_names = list_model_names
        self.patches_folder = patches_folder
        self.features_folder = features_folder
        self.labels_folder = labels_folder
        self.mapping_infor_path = mapping_infor_path
        self.basis_num = basis_num
        self.batch_size = batch_size
        self.num_input_channels = num_input_channels
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Return the number of items per epoch.

        Every item holds exactly one model, so this is the number of models.
        Dividing by batch_size (as was done previously) would make any
        batch_size > 1 silently skip part of the models in each epoch.
        """
        return len(self.list_model_names)

    def __getitem__(self, index):
        """Load and return the (X, Y) pair for position `index` of the current order."""
        model_name = self.list_model_names[self.indexes[index]]
        return self.__data_generation([model_name])

    def on_epoch_end(self):
        """Rebuild the index order, shuffling it when `shuffle` is enabled."""
        self.indexes = np.arange(len(self.list_model_names))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_model_names_temp):
        """Load the first model named in `list_model_names_temp` and add a batch axis.

        Returns (X, Y) as described in the class docstring.
        """
        model_name = list_model_names_temp[0]

        # The same file name is looked up in each of the three per-model folders.
        patch_path = os.path.join(self.patches_folder, model_name)
        feature_path = os.path.join(self.features_folder, model_name)
        label_path = os.path.join(self.labels_folder, model_name)

        (input_Feature, resampleGraph, Zerbases, label_out,
         mesh_faces, resample_graph_I, resample_graph_B) = create_data(
            patch_path, feature_path, label_path, self.mapping_infor_path)

        # Order matters: it must match the order of the model's input list.
        X = [np.expand_dims(array, axis=0)
             for array in (input_Feature, resampleGraph, mesh_faces,
                           resample_graph_I, resample_graph_B, Zerbases)]
        Y = np.expand_dims(label_out, axis=0)

        return X, Y

In [5]:
def ZerNet(nrays, n_classes, cut_num, basis_num=21,num_input_channels=3):
    """Build the ZerNet model.

    Three Zernike stages (graph resampling -> Zernike decomposition ->
    ZernikeConv -> dropout) with 64, 128 and 256 filters are followed by a
    kernel-size-1 Conv1D with 512 filters, dropout, and a final
    kernel-size-1 Conv1D with `n_classes` filters and softmax activation.

    Args:
        nrays: forwarded as `numofrays` to the ZernikeConv / ZernikeDecomp layers.
        n_classes: number of filters of the final softmax layer.
        cut_num: forwarded to every Normalized_resampleGraph_expand layer.
        basis_num: size of the last axis of the 'Zb' input.
        num_input_channels: size of the last axis of the feature input.

    Returns:
        A keras Model whose inputs are, in order,
        [features, 'rG', 'F', 'I', 'B', 'Zb'].
    """
    in_feat = Input(shape=(None,num_input_channels,), dtype = 'float32')
    in_rG = Input(shape=(None,None,), dtype = 'int32', name = 'rG')
    in_F = Input(shape=(None,3,), dtype = 'int32', name = 'F')
    in_I = Input(shape=(None,), dtype = 'int32', name = 'I')
    in_B = Input(shape=(None,3,), dtype = 'float32', name = 'B')
    in_Zb = Input(shape=(None,None,basis_num,), dtype = 'float32', name = 'Zb')

    # Inputs shared by every resampling layer, in the order that layer is called with.
    graph_inputs = [in_rG, in_F, in_I, in_B]

    def zernike_stage(signal, expand_angular, decomp_kwargs, conv_kwargs):
        # One stage: resample -> decompose -> convolve -> dropout.
        disks = Normalized_resampleGraph_expand(cut_num = cut_num, angular_axis=expand_angular)([signal] + graph_inputs)
        coeffs = ZernikeDecomp(**decomp_kwargs)([disks, in_Zb])
        convolved = ZernikeConv(numofrays=nrays, activation='relu', **conv_kwargs)(coeffs)
        return Dropout(0.25)(convolved)

    angular_decomp = dict(angular_axis=True, numofrays=nrays)

    hidden = zernike_stage(in_feat, False, {},
                           dict(filters=64, angular_axis=False, angular_expand=True))
    hidden = zernike_stage(hidden, True, angular_decomp,
                           dict(filters=128, angular_axis=True, angular_pooling=False))
    hidden = zernike_stage(hidden, True, angular_decomp,
                           dict(filters=256, angular_axis=True, angular_pooling=True))

    # Classification head built from kernel-size-1 convolutions.
    hidden = Conv1D(filters = 512, kernel_size=1, activation='relu')(hidden)
    hidden = Dropout(0.25)(hidden)
    scores = Conv1D(filters = n_classes, kernel_size=1, activation='softmax')(hidden)

    return Model(inputs = [in_feat, in_rG, in_F, in_I, in_B, in_Zb], outputs = scores)

In [6]:
# Network hyper-parameters, passed to ZerNet below.
num_input_channels, cut_num, basis_num = 3, 50, 21
n_classes = 6890  # presumably one class per template vertex -- TODO confirm
nrays = 1 # nrays = 4

# Build the network and print its layer table.
model = ZerNet(nrays=nrays,
               n_classes=n_classes,
               cut_num=cut_num,
               basis_num=basis_num,
               num_input_channels=num_input_channels)
model.summary()

____________________________________________________________________________________________________
Layer (type)                     Output Shape          Param #     Connected to                     
input_1 (InputLayer)             (None, None, 3)       0                                            
____________________________________________________________________________________________________
rG (InputLayer)                  (None, None, None)    0                                            
____________________________________________________________________________________________________
F (InputLayer)                   (None, None, 3)       0                                            
____________________________________________________________________________________________________
I (InputLayer)                   (None, None)          0                                            
___________________________________________________________________________________________

In [7]:
# Datasets
features_folder = './Data/Faust Corrspondence Dataset/ZerNet Input/Input features'
patches_folder = './Data/Faust Corrspondence Dataset/ZerNet Input/Input ZerPatches'
labels_folder = './Data/Faust Corrspondence Dataset/ZerNet Output'

# Every file found in the feature folder is treated as one model.
model_names = os.listdir(features_folder)

# Training split: tr_reg_000.mat ... tr_reg_079.mat (zero-padded to three digits).
train_model_names = ['tr_reg_%03d.mat' % i for i in range(80)]

# Everything else is held out. Sorted so the split order is reproducible:
# a bare set difference has an arbitrary, run-dependent order.
test_model_names = sorted(set(model_names) - set(train_model_names))

In [8]:
# Parameters
mapping_infor_path = './Data/Faust Corrspondence Dataset/ZerNet Input/X2N_mapping_infor/X2N_mapping_infor.mat'

# Keyword options shared by the training and validation generators.
params = dict(batch_size=1, basis_num=21, num_input_channels=3, shuffle=True)

# Generators
training_generator = DataGenerator(list_model_names=train_model_names,
                                   patches_folder=patches_folder,
                                   features_folder=features_folder,
                                   labels_folder=labels_folder,
                                   mapping_infor_path=mapping_infor_path,
                                   **params)

validation_generator = DataGenerator(list_model_names=test_model_names,
                                     patches_folder=patches_folder,
                                     features_folder=features_folder,
                                     labels_folder=labels_folder,
                                     mapping_infor_path=mapping_infor_path,
                                     **params)

# One step per model in each split.
train_steps, val_steps = len(train_model_names), len(test_model_names)

In [None]:
# Optimizer, loss and metric for integer-label classification.
adam = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
model.compile(loss= 'sparse_categorical_crossentropy', optimizer = adam, metrics=['sparse_categorical_accuracy'])

weights_dir = './Trained Models/faust_train_checkpoints'
# Make sure the checkpoint directory exists before training starts, so the
# first checkpoint save cannot fail on a missing folder.
if not os.path.isdir(weights_dir):
    os.makedirs(weights_dir)

trained_model_name = 'ZerNet_faust_correspondence'
# Keep only the checkpoint with the best validation accuracy.
checkpointer = ModelCheckpoint(filepath = os.path.join(weights_dir, trained_model_name),
                               monitor = 'val_sparse_categorical_accuracy', verbose=1, save_best_only=True, mode='max')

# Stop once validation accuracy has not improved by at least 0.001 for 35 epochs.
earlystopping = EarlyStopping(monitor='val_sparse_categorical_accuracy', min_delta=0.001, patience=35, verbose=0, mode='max')

network_history = model.fit_generator(generator=training_generator, steps_per_epoch=train_steps,
                                      epochs=200, verbose=1,
                                      callbacks=[checkpointer, earlystopping],
                                      validation_data=validation_generator,
                                      validation_steps=val_steps)

In [None]:
def load_test_data(patches_folder, features_folder, labels_folder, mapping_infor_path, model_name):
    """Load one model's network inputs and labels, each with a leading batch axis.

    Returns:
        (X, Y) where X is the list
        [features, resampleGraph, mesh_faces, resample_graph_I, resample_graph_B, Zerbases]
        and Y is the label array; every array is expanded with a new axis 0.
    """
    # The same file name is looked up in each of the three per-model folders.
    patch_path, feature_path, label_path = [
        os.path.join(folder, model_name)
        for folder in (patches_folder, features_folder, labels_folder)
    ]

    (features, resample_graph, bases, labels,
     faces, graph_I, graph_B) = create_data(patch_path, feature_path, label_path, mapping_infor_path)

    # Input order: features, rG, F, I, B, Zb.
    ordered_inputs = (features, resample_graph, faces, graph_I, graph_B, bases)
    X = [np.expand_dims(array, axis=0) for array in ordered_inputs]
    Y = np.expand_dims(labels, axis=0)

    return X,Y

In [None]:
# Restore the checkpoint written during training.
trained_model_name = 'ZerNet_faust_correspondence'
weights_dir = './Trained Models/faust_train_checkpoints'
model.load_weights(os.path.join(weights_dir,trained_model_name))

save_folder = './Data/Faust Corrspondence Dataset/ZerNet Predict'
# Create the output folder up front; savemat fails if the directory is missing.
if not os.path.isdir(save_folder):
    os.makedirs(save_folder)

for model_name in test_model_names:
    # test_Y (ground-truth labels) is loaded but not used here.
    test_X, test_Y = load_test_data(patches_folder, features_folder, labels_folder, mapping_infor_path, model_name)
    Yp = model.predict(test_X, batch_size=1)
    # Pick the highest-scoring class along the last (class) axis, then drop
    # the length-1 axes.
    Yp = np.argmax(Yp, axis=2)
    Yp = np.squeeze(Yp)
    # One prediction file per model, named like the input file.
    sio.savemat(os.path.join(save_folder, model_name), {'predict_label':Yp})