In [1]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, BatchNormalization, Lambda, Multiply, Add, LeakyReLU, Dropout, Conv2D, MaxPool2D, GlobalAveragePooling2D, Flatten, Conv2DTranspose, Permute
import numpy as np

In [2]:
class block(tf.keras.layers.Layer):
    def __init__(self, channels, downsample = False, kernal_size = 3):
        super(block, self).__init__()
        self._channels = channels
        self._strides = [2, 1] if downsample else [1, 1]
        self._down_sample = downsample
        kernal_init = tf.keras.initializers.he_normal()
        self.conv_1 = Conv2D(self._channels, kernal_size, strides=self._strides[0], padding="same", kernel_initializer= kernal_init)
        self.bn_1 = BatchNormalization()
        self.conv_2 = Conv2D(self._channels, kernal_size, strides=self._strides[1], padding="same", kernel_initializer= kernal_init)
        self.bn_2 = BatchNormalization()
        self.merge = Lambda(lambda x : Add()([x[0], x[1]]), name= 'z')
        if self._down_sample:
            self.conv_ds = Conv2D(self._channels, (1,1), strides=2, padding="same", kernel_initializer= kernal_init)
            self.bn_ds = BatchNormalization()
            
    def call(self, inputs):
        res = inputs
        x = self.conv_1(inputs)
        x = self.bn_1(x)
        x = tf.nn.relu(x)
        x = self.conv_2(x)
        x = self.bn_2(x)
        if self._down_sample:
            res = self.conv_ds(res)
            res = self.bn_ds(res)
        x = self.merge([x, res])   #to avoide vanishing gradient and exploding gradient
        out = tf.nn.relu(x)
        return out

In [3]:
class ConvLeaky(tf.keras.layers.Layer):
    def __init__(self, out_channels):
        super(ConvLeaky, self).__init__()
        kernel_size = 3
        kernal_init = tf.keras.initializers.he_normal()
        self.conv1 = Conv2D(out_channels, kernel_size, strides = 1, padding = 'same', kernel_initializer= kernal_init)
        self.bn_1 = BatchNormalization()
        self.conv2 = Conv2D(out_channels, kernel_size, strides = 1, padding = 'same', kernel_initializer= kernal_init)
        self.bn_2 = BatchNormalization()
    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.bn_1(x)
        x = tf.nn.leaky_relu(x)
        x = self.conv2(x)
        x = self.bn_2(x)
        x = tf.nn.leaky_relu(x)
        return x

In [4]:
class FNetBlock(tf.keras.layers.Layer):
    def __init__(self, out_channels, typ):
        super(FNetBlock, self).__init__()
        self.convleaky = ConvLeaky(out_channels)
        if typ == "maxpool":
            self.out = Lambda(lambda x: tf.nn.max_pool2d(x, ksize=(2, 2), strides=(2, 2), padding="SAME"))
        elif typ == "bilinear":
            self.out = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation='bilinear')
        else:
            raise Exception('typ does not match')
    
    def call(self, inputs):
        x = self.convleaky(inputs)
        x = self.out(x)
        return x


In [5]:
#Testing FnetBlock
l = FNetBlock(10, "bilinear")
a = np.ones((1, 20, 20, 3))
l(a).shape

2022-09-23 06:21:10.735897: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-09-23 06:21:10.870728: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-09-23 06:21:10.870938: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-09-23 06:21:10.872229: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

TensorShape([1, 40, 40, 10])

In [6]:
class SRNet(tf.keras.Model):
    def __init__(self):
        super(SRNet, self).__init__()
        kernel_size = 3
        kernal_init = tf.keras.initializers.he_normal()
        self.conv_in = Conv2D(64, kernel_size, strides = 1, padding = 'same', kernel_initializer= kernal_init)
        self.resBlock = tf.keras.Sequential([block(64) for i in range(0, 10)])
        self.deconv1 = Conv2DTranspose(64, kernel_size, strides=(2, 2), padding = 'same')
        self.deconv2 = Conv2DTranspose(64, kernel_size, strides=(2, 2), padding = 'valid', output_padding = 1)
        self.out_conv = Conv2D(3, kernel_size)
        
    def call(self, inputs):
        x = self.conv_in(inputs)
        x = self.resBlock(x)
        x = self.deconv1(x)
        x = tf.nn.relu(x)
        x = self.deconv2(x)
        x = tf.nn.relu(x)
        x = self.out_conv(x)
        return x

In [7]:
#SRNet testing
m = SRNet()
i = np.ones((1, 20, 20, 3))
m(i).shape

TensorShape([1, 80, 80, 3])

In [8]:
m.summary()

Model: "sr_net"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_2 (Conv2D)           multiple                  1792      
                                                                 
 sequential (Sequential)     (1, 20, 20, 64)           743680    
                                                                 
 conv2d_transpose (Conv2DTra  multiple                 36928     
 nspose)                                                         
                                                                 
 conv2d_transpose_1 (Conv2DT  multiple                 36928     
 ranspose)                                                       
                                                                 
 conv2d_23 (Conv2D)          multiple                  1731      
                                                                 
Total params: 821,059
Trainable params: 818,499
Non-trainabl

In [9]:
class FNet(tf.keras.Model):
    def __init__(self):
        super(FNet, self).__init__()
        #input dim = 6
        kernel_size = 3
        kernal_init = tf.keras.initializers.he_normal()
        self.convpool_1 = FNetBlock(32, "maxpool")
        self.convpool_2 = FNetBlock(64, "maxpool")
        self.convpool_3 = FNetBlock(128, "maxpool")
        self.convbin_1 = FNetBlock(256, "bilinear")
        self.convbin_2 = FNetBlock(128, "bilinear")
        self.convbin_3 = FNetBlock(64, "bilinear")
        self.conv1 = Conv2D(32, kernel_size = 3, strides = 1, padding = 'same', kernel_initializer= kernal_init)
        self.conv2 = Conv2D(2, kernel_size = 3, strides = 1, padding = 'same', kernel_initializer= kernal_init)
    def call(self, inputs):
        x = self.convpool_1(inputs)
        x = self.convpool_2(x)
        x = self.convpool_3(x)
        x = self.convbin_1(x)
        x = self.convbin_2(x)
        x = self.convbin_3(x)
        x = self.conv1(x)
        x = tf.nn.leaky_relu(x)
        x = self.conv2(x)
        x = tf.nn.tanh(x)
        self.x = x
        return x

In [10]:
#FNet test
f = FNet()
x = np.ones((1, 40, 40, 6))
x = f(x)
x.shape

TensorShape([1, 40, 40, 2])

In [11]:
class SpaceToDepth(tf.keras.Model):
    def __init__(self, block_size):
        super(SpaceToDepth, self).__init__()
        self.block_size = block_size
        self.block_size_sq = block_size*block_size
        self.permute = Permute((2, 3, 1))
        self.p0 = Permute((2,1,3))
        self.p1 = Permute((3,1,2))
        
    def call(self, inputs):
        out = self.permute(inputs)
        (batch_size, s_height, s_width, s_depth) = out.shape
        d_depth = s_depth*self.block_size_sq
        d_width = int(s_width/self.block_size)
        d_height = int(s_height/self.block_size)
        t_1 = tf.split(out, int(s_width/self.block_size), 2)
        stack = [tf.reshape(t_t,(batch_size, d_height, d_depth)) for t_t in t_1]
        out = tf.stack(stack, 1)
        out = self.p0(out)
        out = self.p1(out)
        return out

In [83]:
#Test SpaceToDepth
a = np.ones((1, 6, 10, 10))
s = SpaceToDepth(2)
s(a).shape

TensorShape([1, 24, 5, 5])

In [104]:
class FSVSR(tf.keras.Model):
    def __init__(self, batch_size, lr_height, lr_width):
        super(FSVSR, self).__init__()
        self.SRFactor = 4
        self.batch_size = batch_size
        self.height = lr_height
        self.width = lr_width
        self.fnet = FNet()
        self.todepth = SpaceToDepth(self.SRFactor)
        self.srnet = SRNet()   #self.SRFactor*self.SRFactor*3 + 3
        

    def init_hidden(self):
        self.lastLrImage = tf.zeros((self.batch_size, self.height, self.width, 3))
        self.EstHrImage = tf.zeros((self.batch_size, self.height*self.SRFactor, self.width*self.SRFactor, 3))
        width_gap = 2/(self.width)
        height_gap = 2/(self.height)
        height, width = tf.meshgrid(tf.range(-1, 1 , width_gap), tf.range(-1, 1, height_gap))
        self.lr_identity = tf.stack([width, height])
        self.lr_identity = tf.transpose(self.lr_identity, [1, 2, 0])

        height_gap = 2/(self.height*self.SRFactor)
        width_gap = 2/(self.width*self.SRFactor)
        height, width = tf.meshgrid(tf.range(-1, 1 , width_gap), tf.range(-1, 1, height_gap))
        self.hr_identity = tf.stack([width, height])
        
    
    def call(self, input):
        x0x1 = tf.concat([input, self.lastLrImage], axis = 3)

        #debug
        print(x0x1.shape)

        flow = self.fnet(x0x1)
        print(flow.shape)
        # print(self.lr_identity.shape)
        relative_place = flow + self.lr_identity
        # relative_place = tf.transpose(relative_place, [0, 2, 3, 1])
        print(relative_place.shape, 'p')
        print(self.lastLrImage.shape)
        self.EstLrImg = bilinear_sampler(self.lastLrImage, relative_place)
        print(self.EstLrImg.shape)
        relative_placeNCHW = tf.keras.layers.Resizing(self.height*self.SRFactor, self.width*self.SRFactor, interpolation="bilinear")(relative_place)
        print(relative_placeNCHW.shape)
        # relative_placeNCHW = tf.transpose(relative_placeNCHW, [0, 2, 3, 1])
        afterWrap = bilinear_sampler(self.EstHrImage, relative_placeNCHW)
        afterWrap = tf.transpose(afterWrap, [0, 3, 1, 2])
        print(afterWrap.shape)
        depthImage = self.todepth(afterWrap)
        depthImage = tf.transpose(depthImage, [0, 3, 2, 1])
        print(depthImage.shape, 'depth')
        srInput = tf.concat([depthImage, self.EstLrImg], axis = 3)
        estImg = self.srnet(srInput)
        self.lastLrImage = input
        self.EstHrImage = estImg
        return self.EstHrImage, self.EstLrImg
        

In [106]:
f = FSVSR(1, 40, 40)
a = np.ones((1, 40, 40, 3))
f.init_hidden() 
f(a)[0].shape

(1, 40, 40, 6)
(1, 40, 40, 2)
(1, 40, 40, 2) p
(1, 40, 40, 3)
(1, 40, 40, 3)
(1, 160, 160, 2)
(1, 3, 160, 160)
(1, 40, 40, 48) depth


TensorShape([1, 160, 160, 3])

(1, 40, 40, 6)


2022-09-23 07:22:27.451116: W tensorflow/core/framework/op_kernel.cc:1733] INVALID_ARGUMENT: required broadcastable shapes


InvalidArgumentError: Exception encountered when calling layer "fsvsr_1" (type FSVSR).

{{function_node __wrapped__AddV2_device_/job:localhost/replica:0/task:0/device:GPU:0}} required broadcastable shapes [Op:AddV2]

Call arguments received by layer "fsvsr_1" (type FSVSR):
  • input=tf.Tensor(shape=(1, 40, 40, 3), dtype=float32)

In [13]:
import torch
height_i = 500
width_i = 600
height_gap = 2 / (height_i - 1)
width_gap = 2 / (width_i - 1)
height, width = torch.meshgrid([torch.range(-1, 1, height_gap), torch.range(-1, 1, width_gap)])
lr_identity = torch.stack([width, height])

  from .autonotebook import tqdm as notebook_tqdm
  height, width = torch.meshgrid([torch.range(-1, 1, height_gap), torch.range(-1, 1, width_gap)])
  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


In [60]:
def get_pixel_value(img, x, y):
    """
    Utility function to get pixel value for coordinate
    vectors x and y from a  4D tensor image.
    Input
    -----
    - img: tensor of shape (B, H, W, C)
    - x: flattened tensor of shape (B*H*W,)
    - y: flattened tensor of shape (B*H*W,)
    Returns
    -------
    - output: tensor of shape (B, H, W, C)
    """
    shape = tf.shape(x)
    batch_size = shape[0]
    height = shape[1]
    width = shape[2]

    batch_idx = tf.range(0, batch_size)
    batch_idx = tf.reshape(batch_idx, (batch_size, 1, 1))
    b = tf.tile(batch_idx, (1, height, width))

    indices = tf.stack([b, y, x], 3)

    return tf.gather_nd(img, indices)

    
def bilinear_sampler(img, coords):

    H = tf.shape(img)[1]
    W = tf.shape(img)[2]
    max_y = tf.cast(H - 1, 'int32')
    max_x = tf.cast(W - 1, 'int32')
    
    # ----------------- Changes below -------------------------
    # -> padding_mode = 'border'
    # "#o" means original,  "#t" means modified
    # zero = tf.zeros([], dtype='int32')     #o
    zero = tf.zeros([1], dtype=tf.int32)     #t
    eps = tf.constant([0.5], 'float32')      #t

    # rescale x and y to [0, W-1/H-1]
    x, y = coords[:, ..., 0], coords[:, ..., 1]
    x = tf.cast(x, 'float32')
    y = tf.cast(y, 'float32')

    x = 0.5 * ((x + 1.0) * tf.cast(max_x - 1, 'float32'))
    y = 0.5 * ((y + 1.0) * tf.cast(max_y - 1, 'float32'))
    x = tf.clip_by_value(x, eps, tf.cast(max_x, tf.float32) - eps)   #t
    y = tf.clip_by_value(y, eps, tf.cast(max_y, tf.float32) - eps)   #t
    # -------------- Changes above --------------------

    # grab 4 nearest corner points for each (x_i, y_i)
    x0 = tf.cast(tf.floor(x), 'int32')
    x1 = x0 + 1
    y0 = tf.cast(tf.floor(y), 'int32')
    y1 = y0 + 1

    # clip to range [0, H-1/W-1] to not violate img boundaries
    x0 = tf.clip_by_value(x0, zero, max_x)
    x1 = tf.clip_by_value(x1, zero, max_x)
    y0 = tf.clip_by_value(y0, zero, max_y)
    y1 = tf.clip_by_value(y1, zero, max_y)

    # get pixel value at corner coords
    Ia = get_pixel_value(img, x0, y0)
    Ib = get_pixel_value(img, x0, y1)
    Ic = get_pixel_value(img, x1, y0)
    Id = get_pixel_value(img, x1, y1)

    # recast as float for delta calculation
    x0 = tf.cast(x0, 'float32')
    x1 = tf.cast(x1, 'float32')
    y0 = tf.cast(y0, 'float32')
    y1 = tf.cast(y1, 'float32')

    # calculate deltas
    wa = (x1 - x) * (y1 - y)
    wb = (x1 - x) * (y - y0)
    wc = (x - x0) * (y1 - y)
    wd = (x - x0) * (y - y0)

    # add dimension for addition
    wa = tf.expand_dims(wa, axis=3)
    wb = tf.expand_dims(wb, axis=3)
    wc = tf.expand_dims(wc, axis=3)
    wd = tf.expand_dims(wd, axis=3)

    # compute output
    out = tf.add_n([wa * Ia, wb * Ib, wc * Ic, wd * Id])
    return out

In [58]:
lr_identity.shape

torch.Size([2, 500, 600])

In [56]:
h, w= tf.meshgrid(tf.range(-1, 1 + width_gap, width_gap), tf.range(-1, 1 + height_gap, height_gap))

In [59]:
tf.stack([w, h]).shape

TensorShape([2, 500, 600])

In [29]:
h[0].shape

TensorShape([998])

In [15]:
lr_identity.shape

torch.Size([2, 500, 600])

In [44]:
max(tf.range(-1, 1+height_gap, height_gap))

<tf.Tensor: shape=(), dtype=float32, numpy=1.0>

In [40]:
height_gap = 2 / (height_i -1)

In [45]:
max(torch.range(-1, 1, height_gap))

  max(torch.range(-1, 1, height_gap))


tensor(1.)

In [47]:
a = [1,2,3]
b = [4,5,6,7]
tf.meshgrid(a,b)

[<tf.Tensor: shape=(4, 3), dtype=int32, numpy=
 array([[1, 2, 3],
        [1, 2, 3],
        [1, 2, 3],
        [1, 2, 3]], dtype=int32)>,
 <tf.Tensor: shape=(4, 3), dtype=int32, numpy=
 array([[4, 4, 4],
        [5, 5, 5],
        [6, 6, 6],
        [7, 7, 7]], dtype=int32)>]