In [1]:
import pandas as pd
import numpy as np
import pickle
import re
import os.path
import cv2
    
import sys
import tensorflow as tf
from tensorflow.keras import backend as K
import tensorflow_addons as tfa
import matplotlib.pyplot as plt
import tensorflow_datasets as tfds
from tensorflow.keras.layers import Conv2D, BatchNormalization, MaxPool2D, Dense, Flatten, Dropout
from tensorflow.keras.layers import Input, Concatenate, InputLayer
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import warnings 
warnings.filterwarnings('always')
from skimage.transform import warp_polar, warp_coords
# Helper code files
import time
sys.path.append('../')
from Utils.utils import get_dataset, ssd_architecture, vgg_architecture, set_model_weights
from IPython.display import clear_output


In [2]:
# Global Variables
batch_size = 32              # passed to get_dataset() when the training data is loaded
num_classes = 1000           # number of target classes
input_shape = (224, 224, 3)  # presumably (height, width, channels) of the input images -- confirm

# Corresponds to conv10, conv12, conv13
out_layer_num = [1, 2, 3]


In [3]:
def _restore_weights(model, weights_path):
    """Load saved weights into `model` when `weights_path` exists.

    `Model.load_weights` is used because it reads a weights file from disk;
    `Model.set_weights` expects a list of arrays and cannot take a file path.

    Args:
        model: Keras model whose architecture matches the saved weights.
        weights_path: Location of a file written by `model.save_weights`.

    Returns:
        True if the file existed and was loaded, False otherwise.
    """
    if not os.path.exists(weights_path):
        return False
    model.load_weights(weights_path)
    return True


# Base convolution for encoder
inp, out1 = ssd_architecture(out_layer_num[0])
encoder = Model(inputs=inp, outputs=out1, name="Encoder")
_restore_weights(encoder, "Model Weights/encoder_weights.h5")


# Base convolution for decoder
# The decoder is fed 300x300 maps whose channel count equals that of the
# encoder output, hence `out1.shape[3]`.
inp, out = ssd_architecture(out_layer_num[0], input_shape=(300,300,out1.shape[3]), model_type="decoder")
decoder = Model(inputs=inp, outputs=out, name="Decoder")
_restore_weights(decoder, "Model Weights/decoder_weights.h5")


# Feature extraction section of LPNet
inp, out = vgg_architecture()
feature_extractor = Model(inputs=inp, outputs=out, name="Feature_Extractor")
if not _restore_weights(feature_extractor, "Model Weights/feature_extractor_weights.h5"):
    # No checkpoint yet: fall back to the project helper.
    # NOTE(review): presumably this installs pretrained weights -- confirm in Utils.utils.
    feature_extractor = set_model_weights(feature_extractor)

# The symbolic tensors are only needed while wiring the models together.
del inp, out, out1


2023-03-30 18:14:07.433671: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2023-03-30 18:14:07.433831: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


Metal device set to: Apple M1 Pro


In [4]:
class LogPolarLayer(tf.keras.layers.Layer):
  """Trainable layer that multiplies its input by a learned kernel.

  NOTE(review): despite the name, no log-polar warp is performed here yet;
  `cx` and `cy` are stored but not used by `build` or `call` -- presumably
  they are meant to be the centre of the transform.

  Args:
    cx: Stored on the instance as `self.cx`.
    cy: Stored on the instance as `self.cy`.
    num_outputs: Size of the last output dimension. When None, the kernel is
      square and the output keeps the input's last dimension.
    **kwargs: Forwarded to `tf.keras.layers.Layer` (e.g. `name`, `dtype`).
  """

  def __init__(self, cx=0, cy=0, num_outputs=None, **kwargs):
    # The original referenced an undefined class (`MyDenseLayer`) here, which
    # raised NameError as soon as the layer was instantiated.
    super().__init__(**kwargs)
    self.cx = cx
    self.cy = cy
    self.num_outputs = num_outputs

  def build(self, input_shape):
    """Create the kernel once the size of the input's last dimension is known."""
    in_features = int(input_shape[-1])
    out_features = in_features if self.num_outputs is None else int(self.num_outputs)
    self.kernel = self.add_weight(name="kernel",
                                  shape=[in_features, out_features])

  def call(self, inputs):
    """Return `inputs @ kernel`."""
    return tf.matmul(inputs, self.kernel)


In [5]:
# for i in range(decoder_out.shape[1]):
#     x = decoder_out

def inv_log_transform(decoder_out):
    """Convert the polar channels of `decoder_out` to Cartesian ones.

    Along the last axis, channel 2 is read as the radius and channel 1 as the
    angle. The result holds ``radius * cos(angle)`` in channel 1 and
    ``radius * sin(angle)`` in channel 2; every other channel is passed
    through unchanged.

    Only TensorFlow ops are used, so the result stays connected to an
    enclosing ``tf.GradientTape``. The previous NumPy round-trip cut that
    connection, leaving everything upstream of this function without gradients.

    Args:
        decoder_out: Tensor (or array-like) with at least three channels on
            its last axis; the input is not modified.

    Returns:
        A tensor with the same shape and dtype as the input.
    """
    decoder_out = tf.convert_to_tensor(decoder_out)
    radius = decoder_out[..., 2]
    angle = decoder_out[..., 1]
    x = tf.math.multiply(radius, tf.math.cos(angle))
    y = tf.math.multiply(radius, tf.math.sin(angle))
    # Rebuild the channel axis instead of assigning into slices: tensors are
    # immutable, and concat keeps the operation differentiable.
    return tf.concat(
        [
            decoder_out[..., :1],
            x[..., tf.newaxis],
            y[..., tf.newaxis],
            decoder_out[..., 3:],
        ],
        axis=-1,
    )


In [6]:
#### TRIAL FUNCTIONS
# # # log_polar_transform = lambda img : warp_polar(image = np.array(img), multichannel=True, output_shape=(300,300))
# # # polar_out = tf.function(log_polar_transform)
# encoder_transformed_out = tf.map_fn(lambda img : warp_polar(image = np.array(img), channel_axis=2, 
#                                                             output_shape=(300,300)), enc_output)

 
# decoder_out = decoder(encoder_transformed_out)

# decoder_transformed_out = inv_log_transform(decoder_out)

# model_predictions = feature_extractor(decoder_transformed_out)


In [None]:
# GRADIENT TAPE

# Algorithm parameters
loss_fn = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
# CategoricalAccuracy compares the arg-max of the prediction with the one-hot
# label, matching the categorical cross-entropy loss above. Plain `Accuracy`
# checks exact element-wise equality, which is not meaningful for score vectors.
metrics = tf.keras.metrics.CategoricalAccuracy()
num_epochs = 10
weights_dir = "Model Weights"

train_data = get_dataset(batch_size, False)
# steps_per_epoch = train_data.samples // batch_size
steps_per_epoch = 1

# The three sub-models are optimised jointly from a single loss.
models = (feature_extractor, decoder, encoder)


def _timed(label, stage, *trailer):
    """Run the zero-argument callable `stage`, print its wall-clock time, return its result."""
    start = time.time()
    result = stage()
    print(label, round(time.time() - start, 2), *trailer)
    return result


def _polar_warp(batch):
    """Apply skimage's `warp_polar` to each image of `batch`.

    The warp runs in NumPy, so the gradient tape cannot follow it: variables
    upstream of this call (the encoder) receive no gradient.
    NOTE(review): `warp_polar` is called without `scaling`, so presumably the
    library default applies rather than a logarithmic radius -- confirm.
    """
    return tf.map_fn(
        lambda img: warp_polar(image=np.array(img), channel_axis=2, output_shape=(300, 300)),
        batch,
    )


for i in range(num_epochs):
    print("\nEPOCH:", i+1)
    loss = 0  # running total of the batch losses in this epoch

    for bs in range(steps_per_epoch):
        print("\tBATCH NUMBER:", bs+1)
        try:
            t1 = train_data.next()
        except Exception:
            # Best-effort restart of the data source. `Exception` (rather than a
            # bare `except`) lets a kernel interrupt stop the loop.
            train_data = get_dataset(batch_size, False)
            t1 = train_data.next()

        # One tape per batch records the whole forward pass; the gradients are
        # taken right after it, before the next batch replaces the tape.
        with tf.GradientTape() as tape:
            # training=True puts training-dependent layers (dropout, batch
            # normalisation), if any, into training mode.
            enc_output = _timed("\t\t\tEncoder Output\t\t\t",
                                lambda: encoder(t1[0], training=True))

            # Log-polar transform
            encoder_transformed_out = _timed("\t\t\tTransformed Encoder Output\t",
                                             lambda: _polar_warp(enc_output))

            decoder_out = _timed("\t\t\tDecoder Output\t\t\t",
                                 lambda: decoder(encoder_transformed_out, training=True))

            # Inverse log-polar transform
            decoder_transformed_out = _timed("\t\t\tTransformed Decoder Output\t",
                                             lambda: inv_log_transform(decoder_out))

            output = _timed("\t\t\tFeature Extractor\t\t",
                            lambda: feature_extractor(decoder_transformed_out, training=True),
                            "\n")

            batch_loss = loss_fn(t1[1], output)

        variables = [w for m in models for w in m.trainable_weights]
        gradients = tape.gradient(batch_loss, variables)
        # Variables that are not connected to the loss come back as None; they
        # are skipped so the remaining weights can still be updated. A single
        # apply_gradients call gives the optimizer one consistent variable list.
        optimizer.apply_gradients(
            [(g, w) for g, w in zip(gradients, variables) if g is not None]
        )
        loss += float(batch_loss)

    # Checkpoint after every epoch; create the target directory on first use.
    os.makedirs(weights_dir, exist_ok=True)
    feature_extractor.save_weights("Model Weights/feature_extractor_weights.h5")
    decoder.save_weights("Model Weights/decoder_weights.h5")
    encoder.save_weights("Model Weights/encoder_weights.h5")
    clear_output(wait=True)
    

Found 10000 images belonging to 1000 classes.

EPOCH: 1
	BATCH NUMBER: 1
			Encoder Output			 0.4
			Transformed Encoder Output	 123.7


In [None]:
class MainModel(Model):
  """Trains the encoder, decoder and feature extractor together via `fit()`.

  Forward pass used for training:
  encoder -> `warp_polar` -> decoder -> `inv_log_transform` -> feature extractor.

  Args:
    encoder: Keras model applied to the input images.
    decoder: Keras model applied to the warped encoder output.
    feature_extractor: Keras model producing the final predictions.
  """

  def __init__(self, encoder, decoder, feature_extractor):
    # Must be the dunder `__init__`; a method called `_init_` is never invoked
    # by Python when the object is constructed.
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.feature_extractor = feature_extractor

  def compile(self, optimizer, metrics, loss_fn):
    """Configure the model for training.

    Args:
      optimizer: Keras optimizer used by `train_step`.
      metrics: Metric(s) updated with (labels, predictions) on every step.
      loss_fn: Callable `loss_fn(labels, predictions)` returning the loss.
    """
    # The polar warp converts tensors to NumPy arrays, which only works on
    # eager tensors, so the train step must not be traced into a graph.
    super().compile(optimizer=optimizer, metrics=metrics, run_eagerly=True)
    self.loss_fn = loss_fn

  @staticmethod
  def _timed(label, stage, *trailer):
    """Run the zero-argument callable `stage`, print its wall-clock time, return its result."""
    start = time.time()
    result = stage()
    print(label, round(time.time() - start, 2), *trailer)
    return result

  @staticmethod
  def _polar_warp(batch):
    """Apply skimage's `warp_polar` to each image of `batch`.

    The warp runs in NumPy, so no gradient flows through it to the encoder.
    NOTE(review): `warp_polar` is called without `scaling`, so presumably the
    library default applies rather than a logarithmic radius -- confirm.
    """
    return tf.map_fn(
        lambda img: warp_polar(image=np.array(img), channel_axis=2, output_shape=(300, 300)),
        batch,
    )

  def train_step(self, data):
    """Run one optimisation step on a batch.

    Args:
      data: Indexable batch; `data[0]` holds the inputs, `data[1]` the labels.

    Returns:
      Dict with the batch loss under "loss" plus the current value of every
      compiled metric, keyed by metric name.
    """
    images, labels = data[0], data[1]

    with tf.GradientTape() as tape:
      enc_output = self._timed("\t\t\tEncoder Output\t\t\t",
                               lambda: self.encoder(images, training=True))

      # Log-polar transform
      encoder_transformed_out = self._timed("\t\t\tTransformed Encoder Output\t",
                                            lambda: self._polar_warp(enc_output))

      decoder_out = self._timed("\t\t\tDecoder Output\t\t\t",
                                lambda: self.decoder(encoder_transformed_out, training=True))

      # Inverse log-polar transform
      decoder_transformed_out = self._timed("\t\t\tTransformed Decoder Output\t",
                                            lambda: inv_log_transform(decoder_out))

      output = self._timed("\t\t\tFeature Extractor\t\t",
                           lambda: self.feature_extractor(decoder_transformed_out, training=True),
                           "\n")

      loss = self.loss_fn(labels, output)

    # The sub-models are attributes of this Model, so their weights are all
    # part of `self.trainable_variables`.
    variables = self.trainable_variables
    gradients = tape.gradient(loss, variables)
    # Variables not connected to the loss yield None gradients; skip them so
    # the remaining weights are still updated.
    self.optimizer.apply_gradients(
        [(g, v) for g, v in zip(gradients, variables) if g is not None]
    )

    self.compiled_metrics.update_state(labels, output)
    logs = {"loss": loss}
    logs.update({m.name: m.result() for m in self.metrics})
    return logs

In [None]:
# Wrap the three sub-models built earlier in a single MainModel instance.
model = MainModel(encoder, decoder, feature_extractor)


In [None]:
# Hand the optimizer, metric and loss defined in the training-setup cell to the model.
model.compile(optimizer=optimizer, metrics=metrics, loss_fn=loss_fn)
