<a href="https://colab.research.google.com/github/manish-MARZ/Motion-animations/blob/main/custom_resunet_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install tensorflow_addons

Collecting tensorflow_addons
[?25l  Downloading https://files.pythonhosted.org/packages/66/4b/e893d194e626c24b3df2253066aa418f46a432fdb68250cde14bf9bb0700/tensorflow_addons-0.13.0-cp37-cp37m-manylinux2010_x86_64.whl (679kB)
[K     |████████████████████████████████| 686kB 5.1MB/s 
Installing collected packages: tensorflow-addons
Successfully installed tensorflow-addons-0.13.0


In [None]:
## Imports
import os
import sys
import random
import numpy as np
import tensorflow as tf
from tensorflow import keras
#from tensorflow.keras import initializers
from tensorflow.keras.layers import Add, BatchNormalization, Concatenate, Conv2D, Dropout, Input, UpSampling2D, Activation
from tensorflow.keras.models import Model, Sequential, load_model
import tensorflow_addons as tfa
from tensorflow.keras.optimizers import Adam
import tensorflow.keras.backend as K
import inspect

### Optimizer and Loss

In [None]:
def custom_loss(y_true, y_pred):
    """Element-wise absolute error between targets and predictions.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor.

    Returns:
        The un-reduced tensor ``|y_true - y_pred|``; no mean or sum is taken
        inside this function.
    """
    residual = y_true - y_pred
    return K.abs(residual)

# Adam with the AMSGrad variant enabled. `learning_rate` is the supported
# argument name; `lr` is only a deprecated alias.
my_optimizer = Adam(learning_rate=0.00004, beta_1=0.99, beta_2=0.99, amsgrad=True)

                                         UNET
                                          |
                 Encoder                                       Decoder
            n x Encoder_block                             n x Decoder_block
    (Conv2D, Normalize, Activation, Skip)     (Conv2D, Normalize, Activation, Upsample, Skip)
	

# Previous model


# Custom UNET

In [None]:
# Side length of the square model input; Unet() builds Input((imsize, imsize, 3)).
imsize = 1024

In [None]:
def normalize_act(x, norm = None, act=None, name = 'norm'):
    """Optionally apply a normalization layer and then an activation to ``x``.

    Args:
        x: Input tensor.
        norm: ``'bn'`` (BatchNormalization), ``'in'`` (tfa InstanceNormalization),
            ``'gn'`` (tfa GroupNormalization with 8 groups). Any other value,
            including ``None``, adds no normalization layer.
        act: Activation passed to ``Activation``; a falsy value adds no
            activation layer.
        name: Prefix for the normalization layer name; the suffix ``_bn``,
            ``_in`` or ``_gn`` is appended.

    Returns:
        The tensor after the selected layers (``x`` itself if none apply).
    """
    # See the Instance / Group Normalization documentation for the options used here.
    if norm == 'bn':
        norm_layer = BatchNormalization(name = name+'_bn')
    elif norm == 'in':
        norm_layer = tfa.layers.InstanceNormalization(
            axis=3,
            center=True,
            scale=True,
            beta_initializer="random_uniform",
            gamma_initializer="random_uniform",
            name=name+'_in',
        )
    elif norm == 'gn':
        norm_layer = tfa.layers.GroupNormalization(groups=8, axis=-1, name=name+'_gn')
    else:
        norm_layer = None

    out = x if norm_layer is None else norm_layer(x)

    if not act:
        return out
    return Activation(act)(out)

In [None]:
def encoder_input(x, filters, n_conv = 4, kernel_size=(3, 3), norm = None, act = 'relu', dropout = None, name = None):
    """Run ``n_conv`` parallel convolutions over ``x`` and concatenate the results.

    Every convolution reads the same input ``x`` (the branches are parallel,
    not stacked), uses stride 1 with 'same' padding, and is followed by
    ``normalize_act``.

    Args:
        x: Input tensor.
        filters: A list with one filter count per branch, or a single value
            that is repeated ``n_conv`` times.
        n_conv: Number of parallel branches.
        kernel_size: A list with one kernel size per branch, or a single value
            that is repeated ``n_conv`` times.
        norm: Normalization selector forwarded to ``normalize_act``.
        act: Activation forwarded to ``normalize_act``.
        dropout: Accepted but not used by this function.
        name: Layer-name prefix. Defaults to the name of the calling function.

    Returns:
        The concatenation of all branch outputs.
    """
    if name is None:
        # Name of the calling function, taken from the call stack.
        name = inspect.stack()[1].function

    # Scalars are broadcast to one entry per branch; lists are used as given.
    per_branch_filters = filters if isinstance(filters, list) else [filters] * n_conv
    per_branch_kernels = kernel_size if isinstance(kernel_size, list) else [kernel_size] * n_conv

    branches = []
    for idx in range(n_conv):
        tag = name + str(idx)
        branch = Conv2D(
            filters = per_branch_filters[idx],
            kernel_size = per_branch_kernels[idx],
            padding='same',
            strides=1,
            name = tag,
        )(x)
        # Apply normalization and/or activation
        branch = normalize_act(branch, norm = norm, act = act, name = tag)
        branches.append(branch)

    return Concatenate(name = name+'_concat')(branches)

def res_block(x, filters, kernel_size=(3,3), padding='same', downsample = True, norm = None, act = 'relu', dropout = None, name = None):
    '''Basic ResNet building block: two convolutions plus a convolutional shortcut.

    Main path: optional Dropout -> Conv2D (stride 2 when ``downsample``) ->
    normalize_act -> Conv2D. Shortcut path: a 3x3 Conv2D on the block input with
    the same stride. Both paths are summed with ``Add``.

    Args:
        x: Input tensor.
        filters: Number of filters for every convolution in the block.
        kernel_size: Kernel size of the two main-path convolutions.
        padding: Padding of the first main-path convolution.
        downsample: If True the first convolution and the shortcut use stride 2,
            otherwise stride 1.
        norm: Normalization selector forwarded to ``normalize_act``.
        act: Activation forwarded to ``normalize_act``.
        dropout: Dropout rate applied to the main-path input; a falsy value
            (``None`` or ``0``) adds no Dropout layer.
        name: Layer-name prefix. Defaults to ``<caller>_<H>x<W>x<C>`` built from
            the calling function's name and the input shape without the batch
            dimension. Two calls from the same function on equally shaped
            inputs therefore get the same default prefix; pass ``name``
            explicitly in that case.

    Returns:
        The output tensor of the block.
    '''
    if name is None:
        # Layer names are used as TensorFlow name scopes, so the default must
        # avoid parentheses, commas and spaces (which str(shape) would add)
        # and the angle brackets of pseudo-callers such as '<module>'.
        caller = inspect.stack()[1][3].strip('<>')
        shape_tag = 'x'.join(str(dim) for dim in x.shape[1:])
        name = caller + '_' + shape_tag

    stride = 2 if downsample else 1

    # Main path. The dropped tensor must feed the first convolution; the layer
    # name has to be passed by keyword because Dropout's second positional
    # argument is `noise_shape`.
    main = x
    if dropout:
        main = Dropout(dropout, name = name+'_dropout')(main)
    main = Conv2D(filters = filters, kernel_size = kernel_size, padding=padding, strides=stride, name = name+'_conv1')(main)
    # Apply normalization and/or activation
    main = normalize_act(main, norm = norm, act = act, name = name)
    main = Conv2D(filters = filters, kernel_size = kernel_size, padding='same', name = name+'_conv2')(main)

    # Shortcut path: always taken from the untouched block input.
    shortcut = Conv2D(filters = filters, kernel_size = (3,3), padding='same', strides=stride, name=name+'_res_conv1')(x)
    merged = Add(name=name+'_res_skip')([shortcut, main])

    # Called with normalize_act's defaults (norm=None, act=None), so no layer is
    # added here; kept as the hook for post-addition normalization/activation.
    out = normalize_act(merged, name= name+'_res_skip')
    return out

def skip_connect(x_encoder, x_decoder, filters, kernel_size=(1, 1), dilation_rate = (1,1), padding='same', strides=1, norm = 'in', act = 'relu', name = None):
    '''Merge an encoder feature map into the decoder path.

    The encoder tensor goes through one Conv2D, is concatenated with the decoder
    tensor, and the result is passed through ``normalize_act``.

    Args:
        x_encoder: Encoder feature map to bring across.
        x_decoder: Current decoder tensor.
        filters: Number of filters of the convolution applied to ``x_encoder``.
        kernel_size: Kernel size of that convolution.
        dilation_rate: Dilation rate of that convolution.
        padding: Padding of that convolution.
        strides: Accepted but not used by this function.
        norm: Normalization selector forwarded to ``normalize_act``.
        act: Activation forwarded to ``normalize_act``.
        name: Layer-name prefix. Defaults to
            ``<caller>_skip_<encoder shape>_<decoder shape>`` with shapes written
            as ``HxWxC`` (batch dimension dropped).

    Returns:
        The merged, normalized and activated tensor.
    '''
    if name is None:
        # str(tensor) is a long repr full of characters that are not valid in a
        # layer / name-scope name, so the default is built from the static
        # shapes instead. strip('<>') handles pseudo-callers such as '<module>'.
        caller = inspect.stack()[1][3].strip('<>')
        enc_tag = 'x'.join(str(dim) for dim in x_encoder.shape[1:])
        dec_tag = 'x'.join(str(dim) for dim in x_decoder.shape[1:])
        name = caller + '_skip_' + enc_tag + '_' + dec_tag

    # Skip connection across encoder and decoder
    skip = Conv2D(filters, kernel_size, padding=padding, dilation_rate=dilation_rate, name = name+'_conv')(x_encoder)
    skip = Concatenate(name = name+'_concat')([skip, x_decoder]) #Concatenate or Add?
    skip = normalize_act(skip, norm = norm, act = act, name=name)
    return skip

In [None]:
#Unet Encoder
def unet_encoder(x):
    '''Unet encoder: a multi-kernel input stage followed by seven res_blocks.

    Output sizes for a 1024x1024x3 input:
        x1  (None, 1024, 1024, 64)   encoder_input
        x2  (None, 512, 512, 64)
        x3  (None, 256, 256, 64)
        x4  (None, 128, 128, 128)
        x5  (None, 64, 64, 128)
        x6  (None, 32, 32, 256)
        x7  (None, 16, 16, 512)
        out (None, 8, 8, 1024)

    Returns:
        ``(out, x1, x2, x3, x4, x5, x6, x7)``: the encoder output first, then
        the intermediate tensors used as skip connections by the decoder.
    '''
    stem = encoder_input(x, n_conv=3, filters = [16,24,24], kernel_size = [(7,7),(5,5), (3,3)], name = 'encoder_in')

    # A plain for-loop (not a comprehension) keeps this function as the direct
    # caller of res_block, which derives its default layer names from the caller.
    stage_filters = (64, 64, 128, 128, 256, 512, 1024)
    features = [stem]
    for width in stage_filters:
        features.append(res_block(features[-1], filters = width))

    # Deepest tensor first, followed by x1..x7 in order.
    return (features[-1], *features[:-1])

#Unet Decoder
def unet_decoder(encoder_out):
    '''Unet decoder: res_block -> skip_connect -> upsample, repeated per scale.

    For a 1024x1024x3 model input the tensor grows from (None, 8, 8, 1024) at
    the bottleneck to (None, 1024, 1024, 3) at the output.

    Args:
        encoder_out: The tuple ``(out, x1, ..., x7)`` returned by the encoder.
            ``x1`` is unpacked but not used here.

    Returns:
        The 3-channel output tensor.
    '''
    x, x1, x2, x3, x4, x5, x6, x7 = encoder_out
    dropout = 0.0

    # Bottleneck stage: no skip connection. (None, 8, 8, 1024) -> (None, 16, 16, 1024)
    x = res_block(x, filters = 1024, dropout = dropout, norm = 'in', downsample= False)
    x = UpSampling2D()(x)

    # (encoder tensor, filters, extra skip_connect arguments), deepest first.
    stages = (
        (x7, 512, {'dilation_rate': (5,5)}),  # 16x16   -> 32x32
        (x6, 256, {'dilation_rate': (4,4)}),  # 32x32   -> 64x64
        (x5, 128, {'dilation_rate': (3,3)}),  # 64x64   -> 128x128
        (x4, 64,  {'dilation_rate': (2,2)}),  # 128x128 -> 256x256
        (x3, 16,  {}),                        # 256x256 -> 512x512
        (x2, 16,  {}),                        # 512x512 -> 1024x1024
    )
    for skip_source, width, skip_options in stages:
        x = res_block(x, filters = width, dropout = dropout, norm = 'in', downsample= False)
        x = skip_connect(skip_source, x, filters = width, **skip_options)
        x = UpSampling2D()(x)

    # Project to 3 channels: (None, 1024, 1024, 3)
    return Conv2D(3, (3, 3), padding='same', activation='relu')(x)

In [None]:
def Unet(optim, custom_loss):
    """Build and compile the U-Net model.

    Args:
        optim: Optimizer passed to ``model.compile``.
        custom_loss: Loss passed to ``model.compile``.

    Returns:
        The compiled Keras model mapping an ``(imsize, imsize, 3)`` input to the
        decoder output.
    """
    # Fixed-size input: imsize x imsize with 3 channels (imsize is a module-level constant).
    inmodel = Input((imsize, imsize, 3))

    #Encoder
    encoder_out = unet_encoder(inmodel)

    #Decoder
    outmodel = unet_decoder(encoder_out)

    model = keras.models.Model(inputs = inmodel, outputs = outmodel)
    # Compile with the optimizer the caller supplied rather than a global one.
    model.compile(optimizer=optim, loss=custom_loss)

    return model

In [None]:
new_model = Unet(my_optimizer, custom_loss)
# summary() prints the table itself and returns None, so wrapping it in
# print() would only add a stray "None" line.
new_model.summary()




ValueError: ignored

### Default naming follows this format

In [None]:
# Example tensor shape with the batch dimension first.
y = (None, 1024, 1024, 64)
str1 = ""
# Shape without the batch dimension, as text. Joining the characters of a
# string with "" gives back the same string, so this equals str(y[1:]).
x = str1.join(str(y[1:]))

In [None]:
# Stand-in for the calling function's name that res_block() reads from inspect.stack().
name = 'unet_encoder'

In [None]:
# Append the stringified shape (batch dimension dropped) to the caller name.
# This reassigns `name`, so running the cell again appends the shape a second time.
name = name+str1.join(str(y[1:]))

In [None]:
 name+str('_conv1')  # add the per-layer suffix; as the last expression, the resulting string is displayed

'unet_encoder(1024, 1024, 64)_conv1'

## Current Custom Model

In [None]:
def unet_encoder(x):
    '''Unet encoder: a four-branch input stage followed by seven res_blocks.

    Output sizes for a 1024x1024x3 input:
        x1  (None, 1024, 1024, 64)   encoder_input, kernels 9/7/5/3 with 16 filters each
        x2  (None, 512, 512, 64)
        x3  (None, 256, 256, 64)
        x4  (None, 128, 128, 128)
        x5  (None, 64, 64, 128)
        x6  (None, 32, 32, 256)
        x7  (None, 16, 16, 512)
        out (None, 8, 8, 1024)

    Returns:
        ``(out, x1, x2, x3, x4, x5, x6, x7)``: the encoder output first, then
        the intermediate tensors used as skip connections by the decoder.
    '''
    stem = encoder_input(x, filters = 16, kernel_size = [(9,9), (7,7),(5,5), (3,3)])

    # A plain for-loop (not a comprehension) keeps this function as the direct
    # caller of res_block, which derives its default layer names from the caller.
    stage_filters = (64, 64, 128, 128, 256, 512, 1024)
    features = [stem]
    for width in stage_filters:
        features.append(res_block(features[-1], filters = width))

    # Deepest tensor first, followed by x1..x7 in order.
    return (features[-1], *features[:-1])

#Unet Decoder
def unet_decoder(encoder_out):
    '''Unet decoder: res_block -> skip_connect -> upsample, repeated per scale.

    For a 1024x1024x3 model input the tensor grows from (None, 8, 8, 1024) at
    the bottleneck to (None, 1024, 1024, 3) at the output. The skip-connection
    kernel grows from the default size at the deepest scale to 9x9 at the
    shallowest one.

    Args:
        encoder_out: The tuple ``(out, x1, ..., x7)`` returned by the encoder.
            ``x1`` is unpacked but not used here.

    Returns:
        The 3-channel output tensor.
    '''
    x, x1, x2, x3, x4, x5, x6, x7 = encoder_out
    dropout = 0.2

    # Bottleneck stage: no skip connection. (None, 8, 8, 1024) -> (None, 16, 16, 1024)
    x = res_block(x, filters = 1024, dropout = dropout, norm = 'in', downsample= False)
    x = UpSampling2D()(x)

    # (encoder tensor, filters, extra skip_connect arguments), deepest first.
    stages = (
        (x7, 512, {}),                       # 16x16   -> 32x32
        (x6, 256, {'kernel_size': (5,5)}),   # 32x32   -> 64x64
        (x5, 128, {'kernel_size': (5,5)}),   # 64x64   -> 128x128
        (x4, 64,  {'kernel_size': (7,7)}),   # 128x128 -> 256x256
        (x3, 16,  {'kernel_size': (7,7)}),   # 256x256 -> 512x512
        (x2, 16,  {'kernel_size': (9,9)}),   # 512x512 -> 1024x1024
    )
    for skip_source, width, skip_options in stages:
        x = res_block(x, filters = width, dropout = dropout, norm = 'in', downsample= False)
        x = skip_connect(skip_source, x, filters = width, **skip_options)
        x = UpSampling2D()(x)

    # Project to 3 channels: (None, 1024, 1024, 3)
    return Conv2D(3, (3, 3), padding='same', activation='relu')(x)

In [None]:
# Side length of the square model input; Unet() builds Input((imsize, imsize, 3)).
imsize = 1024

In [None]:
def Unet(optim, custom_loss):
    """Build and compile the U-Net model.

    Args:
        optim: Optimizer passed to ``model.compile``.
        custom_loss: Loss passed to ``model.compile``.

    Returns:
        The compiled Keras model mapping an ``(imsize, imsize, 3)`` input to the
        decoder output.
    """
    # Fixed-size input: imsize x imsize with 3 channels (imsize is a module-level constant).
    inmodel = Input((imsize, imsize, 3))

    #Encoder
    encoder_out = unet_encoder(inmodel)

    #Decoder
    outmodel = unet_decoder(encoder_out)

    model = keras.models.Model(inputs = inmodel, outputs = outmodel)
    # Compile with the optimizer the caller supplied rather than a global one.
    model.compile(optimizer=optim, loss=custom_loss)

    return model

In [None]:
new_model = Unet(my_optimizer, custom_loss)
# summary() prints the table itself and returns None, so wrapping it in
# print() would only add a stray "None" line.
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 1024, 1024,  0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 1024, 1024, 1 3904        input_1[0][0]                    
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 1024, 1024, 1 2368        input_1[0][0]                    
__________________________________________________________________________________________________
conv2d_2 (Conv2D)               (None, 1024, 1024, 1 1216        input_1[0][0]                    
______________________________________________________________________________________________

In [None]:
import inspect
# functions
def whoami():
    """Return the name of the function that called whoami()."""
    # Index 0 is whoami's own frame; index 1 is its caller.
    caller_frame = inspect.stack()[1]
    return caller_frame.function
def whosdaddy():
    """Return the name of the caller of the function that called whosdaddy()."""
    # Index 0 is whosdaddy, index 1 its caller, index 2 the caller's caller.
    frames = inspect.stack()
    return frames[2].function
def foo():
    """Print this function's name and its caller's name, then call bar()."""
    me, parent = whoami(), whosdaddy()
    print ("hello, I'm %s, daddy is %s" % (me, parent))
    bar()
def bar():
    """Print this function's name and its caller's name."""
    me, parent = whoami(), whosdaddy()
    print ("hello, I'm %s, daddy is %s" % (me, parent))

In [None]:
# `johny` is a second reference to the same function object as `bar`.
johny = bar
# call them!
foo()    # prints foo's line, then bar's line with foo as the caller
bar()
johny()  # still reports "bar": the name comes from the function itself, not the variable used to call it

hello, I'm foo, daddy is <module>
hello, I'm bar, daddy is foo
hello, I'm bar, daddy is <module>
hello, I'm bar, daddy is <module>
