<a href="https://colab.research.google.com/github/aanchal-n/NeuralODE-Notes-Projects/blob/master/train_celeba.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install tensorflow-scientific
!pip install pygal
!pip install gast==0.2.2

In [None]:
import zipfile as zipf
from PIL import Image
import numpy as np
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, MaxPooling2D, Dense, Flatten, Dropout, Input
from tensorflow.keras.models import Model
import tensorflow.keras.backend as K
import tensorflow_scientific as tfs
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.datasets import mnist
#from odeblocktensorflow import ODEBlock
import tensorflow as tf
import time
import pygal
import matplotlib.pyplot as plt

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#function to plot test and train loss & accuracy for each model 
#and save it in  "/content/drive/My Drive/Colab Notebooks/Image Classification NODE/xAlpharax/codeImRunning/assets/graphs".

def customvis(model,train_data,test_data):
#plotting loss graph
  plt.title(model)
  plt.plot(test_data,label='Test')
  plt.plot(train_data,label='Train')
  plt.xlabel('Epoch #')
  if model.split('-')[-1]=='Accuracy':
    plt.legend(loc="lower right")
    plt.ylabel('Accuracy')
  else:
    plt.legend(loc="upper right")
    plt.ylabel('Loss')
  plt.savefig('/content/drive/My Drive/Colab Notebooks/Image Classification NODE/xAlpharax/codeImRunning/assets/graphs/'+model+'.png')

# #plotting accuracy graph
#   plt.title(model+'-Accuracy')
#   plt.plot(test_acc,label='Test')
#   plt.plot(train_acc,label='Train')
  
#   plt.savefig('/content/drive/My Drive/Colab Notebooks/Image Classification NODE/xAlpharax/codeImRunning/assets/graphs/'+model+'-Accuracy.png')


In [None]:
class CelebA:
    def __init__(self, zipLocation='', one_hot=True, flatten=False, start_index = 1):
        self.zipLocation = zipLocation
        self.zip = zipf.ZipFile("/content/drive/My Drive/Colab Notebooks/Image Classification NODE/xAlpharax/utils/img_align_celeba.zip",'r')
        self.filelist = self.zip.namelist()
        self.filelist.sort()
        self.validation_index = int(0.8*np.array(self.filelist).size)
        self.test_index = int(0.9*np.array(self.filelist).size)
        self.label = np.loadtxt("/content/drive/My Drive/Colab Notebooks/Image Classification NODE/xAlpharax/utils/list_attr_celeba.txt",skiprows =  2,converters = {0: id}, usecols = 3)
        self.next_batch_index = start_index
        self.one_hot = one_hot
        self.flatten = flatten
        
    def load(self, count = 100000, start_index=1, mode = 'L'):
        
        celeb_img = []
        celeb_label = []
        end_index = start_index + count

        print("Loading dataset...")
        
        for index, file in enumerate(self.filelist[start_index : end_index], start= start_index):
            
            with self.zip.open(file, 'r') as img:
                img_arr = Image.open(img) #misc.imread(img,mode='L')
                img_arr = img_arr.resize((90, 110)) #misc.imresize(img_arr, (180, 220))
                img_arr = np.asarray(img_arr)
                
                #flatten image to give flat vector instead of 28*28 matrix
                if self.flatten == True:
                    img_arr = img_arr.flatten()
                    
                celeb_img.append(img_arr)
                

        print("Loading labels...\n")
        for index in range(start_index - 1, end_index - 1):
            new_lbl = [0]*2
            if self.one_hot == True:
                if self.label[index] == 1:
                    new_lbl[0] = 1
                else:
                    new_lbl[1] = 1
                celeb_label.append(new_lbl)
            else:
                celeb_label = self.label[start_index-1:end_index]
        
        
        celeb_img = np.array(celeb_img) #dtype='float32'
        celeb_label = np.array(celeb_label) # dtype='float32' # dtype='uint32'
            
        return celeb_img, celeb_label

    
    # def validationSet(self):
    #     return self.load(count = 1000, start_index = self.validation_index)
    
    
    def testSet(self):
        return self.load(count = 1000, start_index = self.test_index)


In [None]:
class ODEBlock(tf.keras.layers.Layer):

    def __init__(self, filters, kernel_size, **kwargs):
        self.filters = filters
        self.kernel_size = kernel_size
        super(ODEBlock, self).__init__(**kwargs)

    def build(self, input_shape):
        self.conv2d_w1 = self.add_weight("conv2d_w1", self.kernel_size + (self.filters + 1, self.filters), initializer='glorot_uniform')
        self.conv2d_w2 = self.add_weight("conv2d_w2", self.kernel_size + (self.filters + 1, self.filters), initializer='glorot_uniform')
        
        self.conv2d_b1 = self.add_weight("conv2d_b1", (self.filters,), initializer='zero')
        self.conv2d_b2 = self.add_weight("conv2d_b2", (self.filters,), initializer='zero')
        super(ODEBlock, self).build(input_shape)

    def call(self, x):
        t = K.constant([0,1], dtype="float32")
        #return tf.contrib.integrate.odeint(self.ode_func, x, t, rtol=1e-3, atol=1e-3)[1] #for tensorflow 1.x
        return tfs.integrate.odeint(self.ode_func, x, t, rtol=1e-3, atol=1e-3)[1]

    def compute_output_shape(self, input_shape):
        return input_shape

    def ode_func(self, x, t):
        y = self.concat_t(x, t)
        y = K.conv2d(y, self.conv2d_w1, padding="same")
        y = K.bias_add(y, self.conv2d_b1)
        y = K.relu(y)

        y = self.concat_t(y, t)
        y = K.conv2d(y, self.conv2d_w2, padding="same")
        y = K.bias_add(y, self.conv2d_b2)
        y = K.relu(y)

        return y

    def concat_t(self, x, t):
        new_shape = tf.concat([tf.shape(x)[:-1], tf.constant([1],dtype="int32",shape=(1,))], axis=0)
        t = tf.ones(shape=new_shape) * tf.reshape(t, (1, 1, 1, 1))

        return tf.concat([x, t], axis=-1)

In [None]:
celeb = CelebA()

train = celeb.load()
test = celeb.testSet()

total_size = len(train)

print("Train Data: {}".format(train[0].shape))
print("Test Data: {}".format(test[0].shape))

In [None]:
def DCODNN(input_shape, num_classes):
  x = Input(input_shape)
  y = Conv2D(64, (5,5), activation='relu')(x)
  y = BatchNormalization(axis=-1)(y)
  y = MaxPooling2D(2,2)(y)
  y = Dropout(0.3)(y)
  
  y = Conv2D(128, (5,5), activation='relu')(y)
  y = Conv2D(256, (5,5), activation='relu')(y)
  y = BatchNormalization(axis=-1)(y)
  y = MaxPooling2D(2,2)(y)
  y = Dropout(0.2)(y)
  
  y = ODEBlock(256, (3,3))(y)
  y = BatchNormalization(axis=-1)(y)
  y = MaxPooling2D(2,2)(y)
  y = Dropout(0.1)(y)
  
  y = Flatten()(y)
  y = Dense(1024, activation='sigmoid')(y)
  y = Dense(num_classes, activation='softmax')(y)
  return Model(x,y)

In [None]:
DCODNN = DCODNN((110, 90, 3), 2)

batch_size = 512
epochs = 16

import numpy as np
training_loss, testing_loss = np.array([[]]), np.array([[]])
training_acc, testing_acc = np.array([[]]), np.array([[]])

x_train = train[0]
x_test = test[0]

x_test = (x_test / 127.5) - 1
x_test = np.float32(x_test)

y_train = train[1]
y_test = test[1]

In [None]:
optimizer = tf.keras.optimizers.Adadelta(3e-2) # Adadelta optimizer
loss_fn = tf.keras.losses.CategoricalCrossentropy() # Categorical Loss for categorical labels
metric = tf.keras.metrics.CategoricalAccuracy() # Categorical Accuracy

@tf.function
def trainfn(model, inputs, labels):
  with tf.GradientTape() as tape:
    # Computing Losses from Model Prediction
    loss = loss_fn(labels, model(inputs))

  gradients = tape.gradient(loss, model.trainable_variables) # Gradients for Trainable Variables with Obtained Losses
  optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Updated weights

In [None]:
for epoch in range(epochs):
  start_epoch_time = time.time()

  for index in range(0, total_size, batch_size):
    end_index = total_size if index + batch_size > total_size else index + batch_size

    inputs = x_train[index:end_index] # Slicing operation
    labels = y_train[index:end_index] # Slicing operation
    #print(inputs.shape)

    # normalize data between -1 and 1
    inputs = (inputs / 127.5) - 1
    inputs = np.float32(inputs)

    trainfn(DCODNN, inputs, labels)

    _ = metric.update_state(labels, DCODNN(inputs).numpy())
    acc_at_epoch = metric.result().numpy()
    loss_at_epoch = np.mean(loss_fn(labels, DCODNN(inputs).numpy()))
  
  epoch_time = int(time.time() - start_epoch_time)
  # loss_at_epoch = loss_fn(labels, DCODNN(inputs).numpy())

  testing_loss_at_epoch = np.mean(loss_fn(y_test[:3], DCODNN(x_test[:3]).numpy()))
  _ = metric.update_state(y_test[:3], DCODNN(x_test[:3]).numpy())
  testing_acc_at_epoch = metric.result().numpy()

  training_loss, testing_loss = np.append(training_loss, loss_at_epoch), np.append(testing_loss, testing_loss_at_epoch)
  training_acc, testing_acc = np.append(training_acc, acc_at_epoch), np.append(testing_acc, testing_acc_at_epoch)
  print("Finished epoch: {:02d} with loss: {:.10f} acc: {:.4f} and time taken: {:03d}s".format(epoch+1, loss_at_epoch, acc_at_epoch, epoch_time))

In [None]:
customvis('CELEBA-DCODNN-Loss',training_loss,testing_loss)


In [None]:
customvis('CELEBA-DCODNN-Accuracy',training_acc,testing_acc)

In [None]:
DCODNN.save_weights('/content/drive/My Drive/Colab Notebooks/Image Classification NODE/xAlpharax/codeImRunning/weightsFolder/DCODNN-CELEBA-weights.h5')