In [1]:
import tensorflow as tf
from tensorflow import keras
from inspect import signature, Parameter
import numpy as np
import time
import tf_slim as slim

In [40]:
from tensorflow.python.profiler.model_analyzer import profile 
from tensorflow.python.profiler.option_builder import ProfileOptionBuilder

In [3]:
# Record the TensorFlow version used for the measurements in this notebook.
print('TensorFlow:', tf.__version__)

TensorFlow: 2.4.0


In [63]:
# Build a 100-way classifier on top of an ImageNet-pretrained MobileNetV2 backbone.
# Alternatives that were tried here previously:
#   preprocess_input = tf.keras.applications.vgg19.preprocess_input
#   base_model = tf.keras.models.load_model('mobv3model.h5')
preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input
base_model = tf.keras.applications.MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet',
)

inputs = tf.keras.Input(shape=(224, 224, 3))
x = preprocess_input(inputs)
# The backbone is always called in inference mode.
x = base_model(x, training=False)
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dropout(0.2)(x)
# The head has no activation, so the model outputs raw logits.
outputs = tf.keras.layers.Dense(100)(x)
model = tf.keras.Model(inputs, outputs)

# from_logits=True matches the activation-free Dense head; the default
# (from_logits=False) would interpret the logits as probabilities.
model.compile(
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.SGD(),
    metrics=['accuracy'],
)

In [41]:
def get_total_flops(model):
    """Profile one forward pass of `model` and report its float-operation count.

    `model.call` is wrapped in a `tf.function` whose input signature is a batch
    of one sample shaped like `model.input_shape[1:]`; the graph of the
    resulting concrete function is handed to the TF profiler with the
    float-operation options.

    Args:
        model: Keras model whose `input_shape` is a single tuple with the
            batch dimension first.

    Returns:
        float: the profiler's `total_float_ops` divided by 2e9. The same value
        is also printed, as before.
    """
    forward_pass = tf.function(
        model.call,
        input_signature=[tf.TensorSpec(shape=(1,) + model.input_shape[1:])])

    graph_info = profile(forward_pass.get_concrete_function().graph,
                         options=ProfileOptionBuilder.float_operation())
    flops = graph_info.total_float_ops / 2e9
    print('Flops: {:,}'.format(flops))
    # Return the number as well so callers can use it programmatically.
    return flops

In [42]:
# Profile the classifier built above with the tf.function-based helper.
get_total_flops(model)

Flops: 0.29980421


In [61]:
class ComputeFlops():
    """Callable that estimates the forward-pass float operations of a Keras model.

    NOTE: the default arguments of `__call__` and `_compute_flops` reference
    `Device`, so the cell that defines `Device` must be executed before this
    class is defined.
    """

    def __init__(self):
        # Signature of __call__ (without `self`), used to filter piped kwargs.
        self._call_sign = signature(self)

    def get_temp_model(self, model):
        """Return a clone of `model` carrying the same weights (the clone is printed)."""
        weights = model.get_weights()
        new_model = tf.compat.v1.keras.models.clone_model(model)
        new_model.build(model.input_shape)
        new_model.set_weights(weights)
        print(new_model)
        return new_model

    def can_pipe(self):
        """
        Returns False if its __call__ signature contains *args or **kwargs else True. This is checked
        through python introspection module inspect.signature.
        """
        return not any(p.kind is Parameter.VAR_KEYWORD or p.kind is Parameter.VAR_POSITIONAL
                       for p in self._call_sign.parameters.values())

    def pipe_kwargs_to_call(self, model, data_splits, kwargs):
        """
        Calls itself using `model` and the entries of `kwargs` that name a parameter of
        `__call__`. It uses the `bind` method of `inspect.Signature` object.

        `data_splits` is accepted for interface compatibility but is not forwarded:
        `__call__` has no parameter for it, and binding it positionally would pass it
        as `img_size`.
        """
        accepted = self._call_sign.parameters
        # `model` is bound positionally below, so a duplicate in kwargs is dropped.
        kwargs = {k: v for k, v in kwargs.items() if k in accepted and k != 'model'}
        bounded_args = self._call_sign.bind(model, **kwargs)
        return self(**bounded_args.arguments)

    def get_bounded_status_keys(self):
        # NOTE(review): `Flops` is not defined in the visible cells of this notebook;
        # presumably a status-key class defined elsewhere -- confirm.
        return Flops()

    def __call__(self, model, img_size=(224,224), batch_size=1, device=Device.CPU, include_weights=True):
        """Clone `model` and return its float-op estimate (see `_compute_flops`).

        Args:
            model: Keras model to profile.
            img_size: (height, width) of the random input image.
            batch_size, include_weights: forwarded to `_compute_flops`, which
                currently does not use them.
            device: `Device.GPU` selects the 'gpu' device scope, anything else 'cpu'.
        """
        temp_model = self.get_temp_model(model)

        with tf.device('gpu' if device == Device.GPU else 'cpu'):
            return self._compute_flops(temp_model, img_size, batch_size=batch_size, device=device,
                                       include_weights=include_weights)

    def _compute_flops(self, model, img_size, batch_size=1, device=Device.CPU, include_weights=True):
        """Profile a clone of `model` inside a fresh TF1-style graph and session.

        The input is a single random image of shape (1, img_size[0], img_size[1], 3);
        `batch_size`, `device` and `include_weights` are not used here.

        NOTE: TensorFlow's own error text states that making a model inside a
        `tf.compat.v1.Graph` invalidates layers/models previously made outside
        of it, so models built before this call may need to be rebuilt afterwards.

        Returns:
            float: the profiler's `total_float_ops` divided by 2e9 (0 if the
            profiler result has no such attribute).
        """
        graph = tf.compat.v1.Graph()
        # gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.333)
        session = tf.compat.v1.Session(graph=graph)  # , config=tf.ConfigProto(gpu_options=gpu_options))

        try:
            with graph.as_default(), session.as_default():
                temp_model = tf.compat.v1.keras.models.clone_model(model)
                loss = tf.keras.losses.MeanSquaredError()
                optimizer = tf.keras.optimizers.Adam(learning_rate=0.1)
                temp_model.compile(optimizer=optimizer, loss=loss)
                data = np.random.randn(1, img_size[0], img_size[1], 3)
                _ = temp_model(data, training=False)
                opts = (tf.compat.v1.profiler.ProfileOptionBuilder(
                    tf.compat.v1.profiler.ProfileOptionBuilder.float_operation())
                        .with_empty_output()
                        .build())
                profile_result = tf.compat.v1.profiler.profile(
                    graph=graph, run_meta=tf.compat.v1.RunMetadata(), cmd='op', options=opts)
        finally:
            # Release the session even when cloning or profiling raises.
            session.close()

        # Giga Flops - Counting only the flops of forward pass
        return getattr(profile_result, 'total_float_ops', 0) / 2e9


In [62]:
# Estimate the forward-pass float operations of `model` for a 224x224 input.
flops = ComputeFlops()
flops(model, img_size=(224,224))

<tensorflow.python.keras.engine.functional.Functional object at 0x000002119DB0C760>


0.302156161

In [64]:
### Execution Time
from enum import Enum

class Device(Enum):
    # Device selector accepted by the profiling callables in this notebook;
    # they map GPU to the 'gpu' tf.device scope and anything else to 'cpu'.
    CPU = 'cpu'
    GPU = 'gpu'



class ComputeExecutionTime():
    """Callable that measures the average per-sample `predict` time of a Keras model."""

    def __init__(self):
        # Signature of __call__ (without `self`), used to filter piped kwargs.
        self._call_sign = signature(self)

    def can_pipe(self):
        """
        Returns False if its __call__ signature contains *args or **kwargs else True. This is checked
        through python introspection module inspect.signature.
        """
        return not any(p.kind is Parameter.VAR_KEYWORD or p.kind is Parameter.VAR_POSITIONAL
                       for p in self._call_sign.parameters.values())

    def pipe_kwargs_to_call(self, model, data_splits, kwargs):
        """
        Calls itself using `model` and the entries of `kwargs` that name a parameter of
        `__call__`. It uses the `bind` method of `inspect.Signature` object.

        `data_splits` is accepted for interface compatibility but is not forwarded:
        `__call__` has no parameter for it, and binding it positionally would pass it
        as `input_shape`.
        """
        accepted = self._call_sign.parameters
        # `model` is bound positionally below, so a duplicate in kwargs is dropped.
        kwargs = {k: v for k, v in kwargs.items() if k in accepted and k != 'model'}
        bounded_args = self._call_sign.bind(model, **kwargs)
        return self(**bounded_args.arguments)

    def get_bounded_status_keys(self):
        # NOTE(review): `ExecutionTime` is not defined in the visible cells of this
        # notebook; presumably a status-key class defined elsewhere -- confirm.
        return ExecutionTime()

    def get_temp_model(self, model):
        """Return a clone of `model` carrying the same weights."""
        weights = model.get_weights()
        new_model = tf.keras.models.clone_model(model)
        new_model.build(model.input_shape)
        new_model.set_weights(weights)
        return new_model

    def __call__(self, model, input_shape=(224,224), split='train', batch_size=1, device=Device.CPU):
        """Clone `model` and time it under the requested device scope.

        Args:
            model: Keras model to time.
            input_shape: (height, width) of the random input images.
            split: not used by this implementation.
            batch_size: number of random images per timed batch.
            device: `Device.GPU` selects the 'gpu' device scope, anything else 'cpu'.

        Returns:
            float: average time per sample in milliseconds.
        """
        temp_model = self.get_temp_model(model)
        device = 'gpu' if device == Device.GPU else 'cpu'
        with tf.device(device):
            return self._compute_exectime(temp_model, input_shape, batch_size=batch_size)

    def _compute_exectime(self, model, input_shape, batch_size=1):
        """Time `model.predict` on a random batch and return milliseconds per sample.

        Ten direct forward passes are run first as warm-up. Then `predict` is
        called ten times; the first timed call is discarded and the remaining
        nine are averaged, divided by `batch_size` and converted to milliseconds.
        """
        tnum = np.random.randn(batch_size, input_shape[0], input_shape[1], 3)

        # START BENCHMARKING
        steps = 10
        fp_time = 0.

        # DRY RUNS
        for i in range(steps):
            _ = model(tnum, training=False)

        class timecallback(tf.keras.callbacks.Callback):
            """Records the wall-clock duration of the most recent predict batch."""

            def __init__(self):
                # Initialise the Keras Callback base class before adding our own state.
                super().__init__()
                self.batch_times = 0
                self.step_time_start_batch = 0

            def on_predict_batch_begin(self, batch, logs=None):
                self.step_time_start_batch = time.perf_counter()

            def on_predict_batch_end(self, batch, logs=None):
                self.batch_times = time.perf_counter() - self.step_time_start_batch

        # Cost of two back-to-back timer reads, subtracted from every measurement.
        tt = time.perf_counter()
        ctlTime = time.perf_counter() - tt
        tcb = timecallback()
        for i in range(steps):
            _ = model.predict(tnum, batch_size=batch_size, callbacks=[tcb])
            if i > 0:  # the first timed call is excluded from the average
                fp_time += (tcb.batch_times - ctlTime)
        fp_time = fp_time / (steps - 1) / batch_size
        execution_time = fp_time * 1000  # seconds -> milliseconds
        return execution_time


In [65]:
# Time inference of `model` on random 224x224 inputs; the printed value is
# milliseconds per sample.
timer = ComputeExecutionTime()
# data_splits = make_random_datasplits((224,224))
# print(model)
exectime = timer(model, input_shape=(224,224))
print(exectime)

17.320055555804476


In [46]:
class ModelSize():
    """Status key describing the size of a model's parameters."""

    # Key under which this quantity is reported.
    NAME = 'model_size'

    def __init__(self):
        # No measurement has been stored yet.
        self.value = None

    @staticmethod
    def friendly_name():
        """Human-readable title of this status key."""
        return "Model Size"

    @staticmethod
    def description():
        """One-line explanation of what this status key measures."""
        return "Memory consumed by the parameters (weights and biases) of the model"

    def get_value(self):
        """
        Return the stored value. Subclasses may rescale it, e.g. for pretty-printing on stdout.
        """
        return self.value

    # TODO this should be dynamic
    def get_units(self):
        """Unit in which the value is expressed."""
        return 'MB'

    def get_comparative(self):
        """Comparison mode used for this quantity."""
        return Comparative.RECIPROCAL

class MemoryFootprint():
    """Status key describing the memory used by parameters plus activations."""

    # Key under which this quantity is reported.
    NAME = 'memory_footprint'

    def __init__(self):
        # No measurement has been stored yet.
        self.value = None

    @staticmethod
    def friendly_name():
        """Human-readable title of this status key."""
        return "Memory Footprint"

    @staticmethod
    def description():
        """One-line explanation of what this status key measures."""
        return "Total memory consumed by parameters and activations per single image (batch_size=1)"

    def get_value(self):
        """
        Return the stored value. Subclasses may rescale it, e.g. for pretty-printing on stdout.
        """
        return self.value

    # TODO this should be dynamic
    def get_units(self):
        """Unit in which the value is expressed."""
        return 'MB'

    def get_comparative(self):
        """Comparison mode used for this quantity."""
        return Comparative.RECIPROCAL

class ComputeSize():
    """Callable that reports a model's parameter size and memory footprint in MB."""

    def __init__(self):
        # Signature of __call__ (without `self`), used to filter piped kwargs.
        self._call_sign = signature(self)

    def can_pipe(self):
        """
        Returns False if its __call__ signature contains *args or **kwargs else True. This is checked
        through python introspection module inspect.signature.
        """
        return not any(p.kind is Parameter.VAR_KEYWORD or p.kind is Parameter.VAR_POSITIONAL
                       for p in self._call_sign.parameters.values())

    def pipe_kwargs_to_call(self, model, data_splits, kwargs):
        """
        Calls itself using `model` and the entries of `kwargs` that name a parameter of
        `__call__`. It uses the `bind` method of `inspect.Signature` object.

        `data_splits` is accepted for interface compatibility but is not forwarded:
        `__call__` has no parameter for it, and binding it positionally would pass it
        as `batch_size`.
        """
        accepted = self._call_sign.parameters
        # `model` is bound positionally below, so a duplicate in kwargs is dropped.
        kwargs = {k: v for k, v in kwargs.items() if k in accepted and k != 'model'}
        bounded_args = self._call_sign.bind(model, **kwargs)
        return self(**bounded_args.arguments)

    def get_temp_model(self, model):
        """Return a clone of `model` carrying the same weights."""
        weights = model.get_weights()
        new_model = tf.keras.models.clone_model(model)
        new_model.build(model.input_shape)
        new_model.set_weights(weights)
        return new_model

    @classmethod
    def _get_bounded_status_keys_cls(cls):
        """Status-key classes, in the order `_compute_size` returns their values."""
        return ModelSize, MemoryFootprint

    def get_bounded_status_keys(self):
        """Return one fresh instance of every status-key class."""
        sk_cls = self._get_bounded_status_keys_cls()
        rval = tuple(cls() for cls in sk_cls)
        return rval

    def __call__(self, model, batch_size=1, device=Device.CPU, include_weights=True):
        """Clone `model` and return `{status_key.NAME: value_in_MB}`.

        Args:
            model: Keras model to measure.
            batch_size: multiplier applied to activation and input sizes.
            device: `Device.GPU` selects the 'gpu' device scope, anything else 'cpu'.
            include_weights: whether the parameter size is added to the footprint.
        """
        sk_cls = self._get_bounded_status_keys_cls()
        temp_model = self.get_temp_model(model)

        with tf.device('gpu' if device == Device.GPU else 'cpu'):
            rval = self._compute_size(temp_model, batch_size=batch_size,
                                      include_weights=include_weights)

        assert len(sk_cls) == len(rval)
        return {x.NAME: y for x, y in zip(sk_cls, rval)}

    @staticmethod
    def _shape_bytes(shape, batch_size):
        """Bytes for tensors described by `shape` at 4 bytes per element.

        `shape` may be one tuple or a list of shapes; `None` dimensions are
        skipped. Anything that is neither a list nor a tuple contributes 0.
        """
        if isinstance(shape, list):
            shapes = shape
        elif isinstance(shape, tuple):
            shapes = [shape]
        else:
            shapes = []

        total = 0
        for single_shape in shapes:
            dims = [d for d in single_shape if d is not None]
            total += np.prod(dims) * batch_size * 4  # 4 bytes
        return total

    # HAS TO RETURN A TUPLE IN THE SAME ORDER OF STATUSKEYS
    def _compute_size(self, model, batch_size=1, include_weights=True):
        """Return `(model_size, memory_footprint)`, both in MB.

        The parameter size is taken from the trainable variables only. The
        activation size sums the output shape of every entry in `model.layers`,
        and the input size uses the input shape of `model.layers[0]`.
        """
        model_vars = model.trainable_variables
        _, model_size = slim.model_analyzer.analyze_vars(model_vars, print_info=False)

        activation_size = sum(self._shape_bytes(layer.output_shape, batch_size)
                              for layer in model.layers)
        total_input_size = self._shape_bytes(model.layers[0].input_shape, batch_size)

        memory_footprint = int(activation_size + total_input_size)
        if include_weights:
            memory_footprint += model_size
        model_size = abs(model_size / (1024 ** 2.))  # Convert bytes to MB
        memory_footprint = abs(memory_footprint / (1024 ** 2.))  # Convert bytes to MB

        return model_size, memory_footprint

In [47]:
# Report parameter size and memory footprint of `model`, both in MB.
size = ComputeSize()
size(model)

{'model_size': 8.972061157226562, 'memory_footprint': 11.518341064453125}

In [48]:
class ComputeLayerwiseSummary():
    """Callable that returns the per-layer table of `model.summary()` as a string."""

    def __init__(self):
        # Signature of __call__ (without `self`), used to filter piped kwargs.
        self._call_sign = signature(self)

    def can_pipe(self):
        """
        Returns False if its __call__ signature contains *args or **kwargs else True. This is checked
        through python introspection module inspect.signature.
        """
        return not any(p.kind is Parameter.VAR_KEYWORD or p.kind is Parameter.VAR_POSITIONAL
                       for p in self._call_sign.parameters.values())

    def pipe_kwargs_to_call(self, model, data_splits, kwargs):
        """
        Calls itself using `model` and the entries of `kwargs` that name a parameter of
        `__call__`. It uses the `bind` method of `inspect.Signature` object.

        `data_splits` is accepted for interface compatibility but is not forwarded:
        `__call__` has no parameter for it, and binding it positionally would pass it
        as `batch_size`.
        """
        accepted = self._call_sign.parameters
        # `model` is bound positionally below, so a duplicate in kwargs is dropped.
        kwargs = {k: v for k, v in kwargs.items() if k in accepted and k != 'model'}
        bounded_args = self._call_sign.bind(model, **kwargs)
        return self(**bounded_args.arguments)

    def get_temp_model(self, model):
        """Return a clone of `model` carrying the same weights."""
        weights = model.get_weights()
        new_model = tf.keras.models.clone_model(model)
        new_model.build(model.input_shape)
        new_model.set_weights(weights)
        return new_model

    def __call__(self, model, batch_size=1, device=Device.CPU, include_weights=True):
        """Clone `model` and return its layer-wise summary text.

        `batch_size` and `include_weights` are forwarded to
        `_compute_layerwise_summary`, which does not use them. `Device.GPU`
        selects the 'gpu' device scope, anything else 'cpu'.
        """
        temp_model = self.get_temp_model(model)

        with tf.device('gpu' if device == Device.GPU else 'cpu'):
            return self._compute_layerwise_summary(temp_model, batch_size=batch_size,
                                                   device=device,
                                                   include_weights=include_weights)

    def _compute_layerwise_summary(self, model, batch_size=1, device=Device.CPU,
                                   include_weights=True):
        """Return the lines of `model.summary()` joined by newlines.

        The first line and the last four lines of the summary are dropped.
        `batch_size`, `device` and `include_weights` are not used.
        """
        stringlist = []
        model.summary(print_fn=stringlist.append)
        stringlist = stringlist[1:-4]
        summary_str = "\n".join(stringlist)

        return summary_str

In [49]:
# Show the per-layer table of `model` (layer name, output shape, parameter count).
summary = ComputeLayerwiseSummary()
print(summary(model))

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_16 (InputLayer)        [(None, 224, 224, 3)]     0         
_________________________________________________________________
tf.math.truediv_7 (TFOpLambd (None, 224, 224, 3)       0         
_________________________________________________________________
tf.math.subtract_7 (TFOpLamb (None, 224, 224, 3)       0         
_________________________________________________________________
mobilenetv2_1.00_224 (Functi (None, 7, 7, 1280)        2257984   
_________________________________________________________________
global_average_pooling2d_7 ( (None, 1280)              0         
_________________________________________________________________
dropout_7 (Dropout)          (None, 1280)              0         
_________________________________________________________________
dense_7 (Dense)              (None, 100)               128100    


In [50]:
class ComputeParams():
    """Callable that reports the number of trainable parameters of a Keras model."""

    def __init__(self):
        # Signature of __call__ (without `self`), used to filter piped kwargs.
        self._call_sign = signature(self)

    def can_pipe(self):
        """
        Returns False if its __call__ signature contains *args or **kwargs else True. This is checked
        through python introspection module inspect.signature.
        """
        return not any(p.kind is Parameter.VAR_KEYWORD or p.kind is Parameter.VAR_POSITIONAL
                       for p in self._call_sign.parameters.values())

    def pipe_kwargs_to_call(self, model, data_splits, kwargs):
        """
        Calls itself using `model` and the entries of `kwargs` that name a parameter of
        `__call__`. It uses the `bind` method of `inspect.Signature` object.

        `data_splits` is accepted for interface compatibility but is not forwarded:
        `__call__` has no parameter for it, and binding it positionally would pass it
        as `batch_size`.
        """
        accepted = self._call_sign.parameters
        # `model` is bound positionally below, so a duplicate in kwargs is dropped.
        kwargs = {k: v for k, v in kwargs.items() if k in accepted and k != 'model'}
        bounded_args = self._call_sign.bind(model, **kwargs)
        return self(**bounded_args.arguments)

    def get_temp_model(self, model):
        """Return a clone of `model` carrying the same weights."""
        weights = model.get_weights()
        new_model = tf.keras.models.clone_model(model)
        new_model.build(model.input_shape)
        new_model.set_weights(weights)
        return new_model

    def __call__(self, model, batch_size=1, device=Device.CPU, include_weights=True):
        """Clone `model` and return its trainable-parameter count as a string.

        `batch_size` and `include_weights` are forwarded to `_compute_params`,
        which does not use them. `Device.GPU` selects the 'gpu' device scope,
        anything else 'cpu'.
        """
        temp_model = self.get_temp_model(model)

        with tf.device('gpu' if device == Device.GPU else 'cpu'):
            return self._compute_params(temp_model, batch_size=batch_size, device=device,
                                        include_weights=include_weights)

    def _compute_params(self, model, batch_size=1, device=Device.CPU, include_weights=True):
        """Return the trainable-parameter count formatted as '<millions> Million'.

        Only `model.trainable_variables` are counted; `batch_size`, `device`
        and `include_weights` are not used.
        """
        model_vars = model.trainable_variables
        num_params, _ = slim.model_analyzer.analyze_vars(model_vars, print_info=False)

        params = num_params / 1e6  # millions of parameters
        return str(params)+' Million'

In [51]:
# Count the trainable parameters of `model`, reported in millions.
param = ComputeParams()
param(model)

'2.351972 Million'

In [28]:
#### Test

# NOTE(review): the CIFAR-100 arrays loaded here are not used by any visible cell below.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar100.load_data()
# x_train = x_train.astype('float32') / 255
# x_test = x_test.astype('float32') / 255
# y_train = np.eye(100)[y_train.reshape(-1)]
# y_test = np.eye(100)[y_test.reshape(-1)]
# NOTE(review): `make_random_datasplits` is neither defined nor imported in the
# visible cells -- confirm where it comes from before relying on this cell.
data = make_random_datasplits((224,224))




In [11]:
# Cross-check the FLOPs figure with the third-party `keras_flops` package.
from keras_flops import get_flops

get_flops(model)

12867108

In [25]:
# Random input batch for a quick forward-pass check.
# NOTE(review): the classifier above is built with a 224x224 input; confirm
# that 32x32 is the intended size here.
batch_size = 1
img_size = (32,32)
# Use `batch_size` for the leading dimension instead of a hard-coded 1.
data = np.random.randn(batch_size, img_size[0], img_size[1], 3)

In [26]:
# NOTE(review): the recorded output of this cell is a ValueError ("Your Layer or
# Model is in an invalid state"); the message lists models made inside a
# tf.compat.v1.Graph as one possible cause.
model(data)

ValueError: Your Layer or Model is in an invalid state. This can happen for the following cases:
 1. You might be interleaving estimator/non-estimator models or interleaving models/layers made in tf.compat.v1.Graph.as_default() with models/layers created outside of it. Converting a model to an estimator (via model_to_estimator) invalidates all models/layers made before the conversion (even if they were not the model converted to an estimator). Similarly, making a layer or a model inside a a tf.compat.v1.Graph invalidates all layers/models you previously made outside of the graph.
2. You might be using a custom keras layer implementation with  custom __init__ which didn't call super().__init__.  Please check the implementation of <class 'tensorflow.python.keras.engine.functional.Functional'> and its bases.