In [None]:
!pip install numpy
!pip install tensorflow
!pip install opencv-python-headless
!pip install networks

In [1]:
import os
import numpy as np
from numpy import linalg as LA
import cv2
import tensorflow as tf
from tensorflow.keras.layers import Softmax,Input,MaxPooling2D,Dropout,Dense,Conv2D,Flatten,BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras import metrics

In [2]:
len_y = len(os.listdir ('images'))
def load_img(f):
    f=open(f)
    lines=f.readlines()
    imgs, lab = [], []
    for i in range(len(lines)):
        fn, label = lines[i].split(' ')
        
        im1=cv2.imread(fn)
        im1=cv2.resize(im1, (128,128))
        imgs.append(im1)
        lab.append(int(label))
        
    imgs= np.asarray(imgs, np.float32)
    lab= np.asarray(lab, np.int32)
    return imgs, np.eye(len_y)[lab]

x,y = load_img('train.txt')
vx,vy = load_img('val.txt')
tx,ty = load_img('test.txt')

In [3]:
class RandomVectorLayer(tf.keras.layers.Layer):
    def __init__(self, output_dim, seed=None):
        super(RandomVectorLayer, self).__init__()
        self.output_dim = output_dim
        self.generator = tf.random.Generator.from_seed(seed)

    def call(self, inputs):
        random_vector = self.generator.normal(shape=(tf.shape(inputs)[0],self.output_dim))
        random_vector = tf.where(random_vector < 0, 0.0, 1.0)
        column_sums = tf.reduce_sum(random_vector, axis=1)
        all_zeros_columns = tf.where(column_sums == 0)
        for col_index in all_zeros_columns:
            index = int(col_index[0])
            ones_column = tf.ones_like(random_vector[index, :])
            random_vector = tf.tensor_scatter_nd_update(random_vector, [[index]], [ones_column])
        x21 = tf.expand_dims(random_vector, axis=1)
        x22 = tf.expand_dims(x21, axis=1)
        return x22*inputs , random_vector
class Dynamic2D(tf.keras.Model):
    def __init__(self, K):
        super(Dynamic2D, self).__init__()
        self.fc1 = Dense(100, use_bias=True)
        self.fc2 = Dense(K, use_bias=True)
        self.fc3 = Conv2D(K, (3, 3), activation='relu', padding='same')

    def call(self, x, r):
        r = self.fc1(r)
        r = tf.nn.relu(r)
        r = self.fc2(r)
        r = tf.expand_dims(tf.expand_dims(tf.keras.layers.Softmax()(r), axis=1), axis=1)
        x = self.fc3(x)
        x = tf.math.multiply(x,r)
        return x

def net(first_kernel = 64,first_flat = 1024,dynamic=True):
    input_x = Input(shape=(128,128,3))
    x,r = RandomVectorLayer(output_dim=3, seed=42)(input_x)
    if(dynamic):
        x = Dynamic2D(K=first_kernel)(x,r)
    else:
        x = Conv2D(first_kernel, (3, 3), activation='relu', padding='same')(input_x)
    x = MaxPooling2D()(x)
    x = Conv2D(first_kernel*2, (3, 3), activation='relu', padding='same')(x)
    x = BatchNormalization()(x)
    x1 = Conv2D(first_kernel*2, (3, 3), activation='relu', padding='same')(x)
    x1 = BatchNormalization()(x1)
    x = x + x1
    x = MaxPooling2D()(x)
    x = Conv2D(first_kernel*4, (3, 3), activation='relu', padding='same')(x)
    x = BatchNormalization()(x)
    x1 = Conv2D(first_kernel*4, (3, 3), activation='relu', padding='same')(x)
    x1 = BatchNormalization()(x1)
    x = x + x1
    x = MaxPooling2D()(x)
    x = Conv2D(first_kernel*8, (3, 3), activation='relu', padding='same')(x)
    x = BatchNormalization()(x)
    x1 = Conv2D(first_kernel*8, (3, 3), activation='relu', padding='same')(x)
    x1 = BatchNormalization()(x1)
    x = x + x1
    x = MaxPooling2D()(x)
    x = Conv2D(first_kernel*16, (3, 3), activation='relu', padding='same')(x)
    x = BatchNormalization()(x)
    x1 = Conv2D(first_kernel*16, (3, 3), activation='relu', padding='same')(x)
    x1 = BatchNormalization()(x1)
    x = x + x1
    x = MaxPooling2D()(x)
    x = Flatten()(x)
    x = Dense(first_flat, activation='relu')(x)
    x = Dense(first_flat/4, activation='relu')(x)
    x = Dense(len_y, activation='softmax')(x)
    model = Model(inputs=input_x, outputs=x)
    return model
model = net(first_kernel = 64,first_flat = 1024,dynamic=True)

In [30]:
def evaluate(model,tx,ty,mask,print_stuff = ""):
    sum_predict = 0
    for i in range(len(tx)):
        input_x = np.array([[[mask]]])*tx[i:i+1]
        y_ = model([input_x,np.array([mask])],training = False)
        if(np.argmax(y_[0]) == np.argmax(ty[i])):
            sum_predict += 1
    print(print_stuff,round(sum_predict/len(tx),4),sum_predict,len(tx))

In [4]:
def get_flops(model, model_inputs) -> float:
        if not isinstance(
            model, (tf.keras.models.Sequential, tf.keras.models.Model)
        ):
            raise ValueError(
                "Calculating FLOPS is only supported for "
                "`tf.keras.Model` and `tf.keras.Sequential` instances."
            )

        from tensorflow.python.framework.convert_to_constants import (
            convert_variables_to_constants_v2_as_graph,
        )

        # Compute FLOPs for one sample
        batch_size = 1
        inputs = [
            tf.TensorSpec([batch_size] + list(inp.shape[1:]), inp.dtype)
            for inp in model_inputs
        ]

        # convert tf.keras model into frozen graph to count FLOPs about operations used at inference
        real_model = tf.function(model).get_concrete_function(inputs)
        frozen_func, _ = convert_variables_to_constants_v2_as_graph(real_model)

        # Calculate FLOPs with tf.profiler
        run_meta = tf.compat.v1.RunMetadata()
        opts = (
            tf.compat.v1.profiler.ProfileOptionBuilder(
                tf.compat.v1.profiler.ProfileOptionBuilder().float_operation()
            )
            .with_empty_output()
            .build()
        )

        flops = tf.compat.v1.profiler.profile(
            graph=frozen_func.graph, run_meta=run_meta, cmd="scope", options=opts
        )

        tf.compat.v1.reset_default_graph()

        # convert to GFLOPs
        return (flops.total_float_ops / 1e9)/2

In [34]:
model = net(first_kernel = 64,first_flat = 1024,dynamic=True)
my_optimizer = tf.keras.optimizers.Adam(learning_rate=0.00001)
my_loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
my_metrics = [metrics.Accuracy(),metrics.Precision(name='precision'),metrics.Recall(name='recall')]
epochs = 400
batch_size = 5

model.compile(optimizer = my_optimizer, loss = my_loss, metrics = my_metrics)
dynamic_result = model.fit(x, y, epochs=epochs, batch_size=batch_size, validation_data=(vx,vy))

In [31]:
model_copy = Model(inputs=model.input, outputs=model.output)
new_input = model_copy.layers[1].output
new_model = Model(inputs=new_input, outputs=model_copy.output)

In [33]:
mask_arrays = [[i, j, k] for i in range(2) for j in range(2) for k in range(2)][1:]
for mask in mask_arrays:
    evaluate(new_model,tx,ty,mask,print_stuff = mask)

In [40]:
result = """
[0, 0, 1] 0.32 144 450
[0, 1, 0] 0.3089 139 450
[0, 1, 1] 0.3333 150 450
[1, 0, 0] 0.3267 147 450
[1, 0, 1] 0.3267 147 450
[1, 1, 0] 0.3244 146 450
[1, 1, 1] 0.3311 149 450
"""
print(get_flops(new_model, [tx[0:1],np.array([[1,1,1]])]),"GFLOPs")

3.672297524 GFLOPs


In [None]:
model = net(first_kernel = 64,first_flat = 1024,dynamic=False)
my_optimizer = tf.keras.optimizers.Adam(learning_rate=0.00001)
my_loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
my_metrics = [metrics.Accuracy(),metrics.Precision(name='precision'),metrics.Recall(name='recall')]
epochs = 300
batch_size = 5

model.compile(optimizer = my_optimizer, loss = my_loss, metrics = my_metrics)
result = model.fit(x, y, epochs=epochs, batch_size=batch_size, validation_data=(vx,vy))

In [13]:
model_copy = Model(inputs=model.input, outputs=model.output)
new_input = model_copy.layers[1].output
new_model = Model(inputs=new_input, outputs=model_copy.output)

In [7]:
def evaluate(model,tx,ty,mask,print_stuff = ""):
    sum_predict = 0
    for i in range(len(tx)):
        input_x = np.array([[[mask]]])*tx[i:i+1]
        y_ = model(input_x,training = False)
        if(np.argmax(y_[0]) == np.argmax(ty[i])):
            sum_predict += 1
    print(print_stuff,round(sum_predict/len(tx),4),sum_predict,len(tx))

In [11]:
mask_arrays = [[i, j, k] for i in range(2) for j in range(2) for k in range(2)][1:]
for mask in mask_arrays:
    evaluate(model,tx,ty,mask,print_stuff = mask)

[0, 0, 1] 0.0311 14 450
[0, 1, 0] 0.0267 12 450
[0, 1, 1] 0.1422 64 450
[1, 0, 0] 0.0356 16 450
[1, 0, 1] 0.0467 21 450
[1, 1, 0] 0.0422 19 450
[1, 1, 1] 0.42 189 450


In [14]:
result = """
[0, 0, 1] 0.0311 14 450
[0, 1, 0] 0.0267 12 450
[0, 1, 1] 0.1422 64 450
[1, 0, 0] 0.0356 16 450
[1, 0, 1] 0.0467 21 450
[1, 1, 0] 0.0422 19 450
[1, 1, 1] 0.42 189 450
"""
print(get_flops(model, [tx[0:1]]),"GFLOPs")

Instructions for updating:
This API was designed for TensorFlow v1. See https://www.tensorflow.org/guide/migrate for instructions on how to migrate your code to TensorFlow v2.
3.671766294 GFLOPs
