In [11]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import cai.util
import cai.models
import cai.layers

import math
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import backend
from tensorflow.keras import models
from tensorflow.keras import utils
from tensorflow.keras.applications import imagenet_utils
import tensorflow
from copy import deepcopy

In [25]:
from DepthwiseConv3D import DepthwiseConv3D


ModuleNotFoundError: No module named 'keras.legacy'

In [27]:
!conda install keras=2.3.1

^C


In [18]:
def correct_pad3d(backend, inputs, kernel_size):
    """Returns a tuple for zero-padding for 3D convolution with downsampling.
    # Arguments
        input_size: An integer or tuple/list of 3 integers.
        kernel_size: An integer or tuple/list of 3 integers.
    # Returns
        A tuple.
    """
    #A string, either 'channels_first' or 'channels_last'
    img_dim = 1 # 2 if backend.image_data_format() == 'channels_first' else 1
    #Returns the shape of tensor or variable as a tuple of int or None entries.
    #извлечение чисел - d1, d2, d3
    input_size = backend.int_shape(inputs)[img_dim:(img_dim + 3)]
    print(input_size)

    if isinstance(kernel_size, int):
        kernel_size = (kernel_size, kernel_size, kernel_size)
    
    if input_size[0] is None:
        adjust = (1, 1, 1)
    else:
        adjust = (1 - input_size[0] % 2, 1 - input_size[1] % 2, 1 - input_size[2] % 2)

    correct = (kernel_size[0] // 2, kernel_size[1] // 2, kernel_size[2] // 2)

    return ((correct[0] - adjust[0], correct[0]),
            (correct[1] - adjust[1], correct[1]),
            (correct[2] - adjust[2], correct[2]))

In [20]:
CONV_KERNEL_INITIALIZER = {
    'class_name': 'VarianceScaling',
    'config': {
        'scale': 2.0,
        'mode': 'fan_out',
        # EfficientNet actually uses an untruncated normal distribution for
        # initializing conv layers, but keras.initializers.VarianceScaling use
        # a truncated distribution.
        # We decided against a custom initializer for better serializability.
        'distribution': 'normal'
    }
}

In [23]:
DEFAULT_BLOCKS_ARGS = [
    {'kernel_size': 3, 'repeats': 1, 'filters_in': 32, 'filters_out': 16,
     'expand_ratio': 1, 'id_skip': True, 'strides': 1, 'se_ratio': 0.25},
    {'kernel_size': 3, 'repeats': 2, 'filters_in': 16, 'filters_out': 24,
     'expand_ratio': 6, 'id_skip': True, 'strides': 2, 'se_ratio': 0.25},
    {'kernel_size': 5, 'repeats': 2, 'filters_in': 24, 'filters_out': 40,
     'expand_ratio': 6, 'id_skip': True, 'strides': 2, 'se_ratio': 0.25},
    {'kernel_size': 3, 'repeats': 3, 'filters_in': 40, 'filters_out': 80,
     'expand_ratio': 6, 'id_skip': True, 'strides': 2, 'se_ratio': 0.25},
    {'kernel_size': 5, 'repeats': 3, 'filters_in': 80, 'filters_out': 112,
     'expand_ratio': 6, 'id_skip': True, 'strides': 1, 'se_ratio': 0.25},
    {'kernel_size': 5, 'repeats': 4, 'filters_in': 112, 'filters_out': 192,
     'expand_ratio': 6, 'id_skip': True, 'strides': 2, 'se_ratio': 0.25},
    {'kernel_size': 3, 'repeats': 1, 'filters_in': 192, 'filters_out': 320,
     'expand_ratio': 6, 'id_skip': True, 'strides': 1, 'se_ratio': 0.25}
]

In [22]:
def swish(x):
    """Swish activation function.
    # Arguments
        x: Input tensor.
    # Returns
        The Swish activation: `x * sigmoid(x)`.
    # References
        [Searching for Activation Functions](https://arxiv.org/abs/1710.05941)
    """
    if backend.backend() == 'tensorflow':
        try:
            # The native TF implementation has a more
            # memory-efficient gradient implementation
            return backend.tf.nn.swish(x)
        except AttributeError:
            pass

    return x * backend.sigmoid(x)

In [None]:
def kblock(inputs, activation_fn=swish, drop_rate=0., name='',
          filters_in=32, filters_out=16, kernel_size=3, strides=1,
          expand_ratio=1, se_ratio=0., id_skip=True, kType=1,
          dropout_all_blocks=False):
    """A mobile inverted residual block.
    # Arguments
        inputs: input tensor.
        activation_fn: activation function.
        drop_rate: float between 0 and 1, fraction of the input units to drop.
        name: string, block label.
        filters_in: integer, the number of input filters.
        filters_out: integer, the number of output filters.
        kernel_size: integer, the dimension of the convolution window.
        strides: integer, the stride of the convolution.
        expand_ratio: integer, scaling coefficient for the input filters.
        se_ratio: float between 0 and 1, fraction to squeeze the input filters.
        id_skip: boolean.
    # Returns
        output tensor for the block.
    """
    bn_axis = 3

    # Expansion phase
    filters = filtears_in * expand_ratio
    
    if expand_ratio != 1:
        #x = layers.Conv2D(filters, 1,
        #                 padding='same',
        #                  use_bias=False,
        #                  kernel_initializer=CONV_KERNEL_INITIALIZER,
        #                  name=name + 'expand_conv')(inputs)
        #x = layers.BatchNormalization(axis=bn_axis, name=name + 'expand_bn')(x)
        #x = layers.Activation(activation_fn, name=name + 'expand_activation')(x)
        x = cai.layers.kPointwiseConv2D(last_tensor=inputs, filters=filters, channel_axis=bn_axis, name=name+'expand', activation=activation_fn, has_batch_norm=True, use_bias=False, kType=kType)
    else:
        x = inputs

    # Depthwise Convolution
    if strides == 2:
        x = layers.ZeroPadding3D(padding=correct_pad(backend, x, kernel_size),
                                 name=name + 'dwconv_pad3d')(x)
        conv_pad = 'valid'
    else:
        conv_pad = 'same'
    x = DepthwiseConv3D(kernel_size,
                               strides=strides,
                               padding=conv_pad,
                               use_bias=False,
                               depthwise_initializer=CONV_KERNEL_INITIALIZER,
                               name=name + 'dwconv')(x)
    x = layers.BatchNormalization(axis=bn_axis, name=name + 'bn')(x)
    x = layers.Activation(activation_fn, name=name + 'activation')(x)

    # Squeeze and Excitation phase
    if 0 < se_ratio <= 1:
        filters_se = max(1, int(filters_in * se_ratio))
        se = layers.GlobalAveragePooling2D(name=name + 'se_squeeze')(x)
        if bn_axis == 1:
            se = layers.Reshape((filters, 1, 1), name=name + 'se_reshape')(se)
        else:
            se = layers.Reshape((1, 1, filters), name=name + 'se_reshape')(se)
        #se = layers.Conv2D(filters_se, 1,
        #                   padding='same',
        #                   activation=activation_fn,
        #                   kernel_initializer=CONV_KERNEL_INITIALIZER,
        #                   name=name + 'se_reduce')(se)
        se = cai.layers.kPointwiseConv2D(last_tensor=se, filters=filters_se, channel_axis=bn_axis, name=name+'se_reduce', activation=activation_fn, has_batch_norm=False, use_bias=True, kType=kType)
        #se = layers.Conv2D(filters, 1,
        #                   padding='same',
        #                   activation='sigmoid',
        #                   kernel_initializer=CONV_KERNEL_INITIALIZER,
        #                   name=name + 'se_expand')(se)
        se = cai.layers.kPointwiseConv2D(last_tensor=se, filters=filters, channel_axis=bn_axis, name=name+'se_expand', activation='sigmoid', has_batch_norm=False, use_bias=True, kType=kType)
        x = layers.multiply([x, se], name=name + 'se_excite')

    # Output phase
    #x = layers.Conv2D(filters_out, 1,
    #                  padding='same',
    #                  use_bias=False,
    #                  kernel_initializer=CONV_KERNEL_INITIALIZER,
    #                  name=name + 'project_conv')(x)
    # x = layers.BatchNormalization(axis=bn_axis, name=name + 'project_bn')(x)
    x = cai.layers.kPointwiseConv2D(last_tensor=x, filters=filters_out, channel_axis=bn_axis, name=name+'project_conv', activation=None, has_batch_norm=True, use_bias=False, kType=kType)

    if (drop_rate > 0)  and (dropout_all_blocks):
        x = layers.Dropout(drop_rate,
                noise_shape=(None, 1, 1, 1),
                name=name + 'drop')(x)

    if (id_skip is True and strides == 1 and filters_in == filters_out):
        if (drop_rate > 0)  and (not dropout_all_blocks):
            x = layers.Dropout(drop_rate,
                               noise_shape=(None, 1, 1, 1),
                               name=name + 'drop')(x)
        x = layers.add([x, inputs], name=name + 'add')
    return x

In [14]:
import numpy as np

In [19]:
correct_pad3d(backend, np.random.rand(32, 32, 32, 5, 3), (3, 3, 3))

(32, 32, 5)


((0, 1), (0, 1), (1, 1))

In [7]:
import tensorflow as tf
from tensorflow.keras.layers import ZeroPadding3D 

In [None]:


def kEffNet3D(
        width_coefficient,
        depth_coefficient,
        skip_stride_cnt=-1,
        dropout_rate=0.2,
        drop_connect_rate=0.2,
        depth_divisor=8,
        activation_fn=swish,
        blocks_args=DEFAULT_BLOCKS_ARGS,
        model_name='efficientnet',
        include_top=True,
        input_tensor=None,
        input_shape=None,
        pooling=None,
        classes=1000,
        kType=2,
        concat_paths=True,
        dropout_all_blocks=False,
        name_prefix='k_',
        **kwargs
        ):
        """Instantiates the EfficientNet architecture using given scaling coefficients.
    Optionally loads weights pre-trained on ImageNet.
    Note that the data format convention used by the model is
    the one specified in your Keras config at `~/.keras/keras.json`.
        # Arguments
        width_coefficient: float, scaling coefficient for network width.
        depth_coefficient: float, scaling coefficient for network depth.
        default_size: integer, default input image size.
        dropout_rate: float, dropout rate before final classifier layer.
        drop_connect_rate: float, dropout rate at skip connections.
        depth_divisor: integer, a unit of network width.
        activation_fn: activation function.
        blocks_args: list of dicts, parameters to construct block modules.
        model_name: string, model name.
        include_top: whether to include the fully-connected
            layer at the top of the network.
        input_tensor: optional Keras tensor
            (i.e. output of `layers.Input()`)
            to use as image input for the model.
        input_shape: optional shape tuple, only to be specified
            if `include_top` is False.
            It should have exactly 3 inputs channels.
        pooling: optional pooling mode for feature extraction
            when `include_top` is `False`.
            - `None` means that the output of the model will be
                the 4D tensor output of the
                last convolutional layer.
            - `avg` means that global average pooling
                will be applied to the output of the
                last convolutional layer, and thus
                the output of the model will be a 2D tensor.
            - `max` means that global max pooling will
                be applied.
        classes: optional number of classes to classify images
            into, only to be specified if `include_top` is True, and
    # Returns
        A Keras model instance.
    # Raises
        ValueError: in case of invalid input shape.
    """

    if input_tensor is None:
        #Input() используется для создания экземпляра тензора Keras.
        #shape: Кортеж фигур (целые числа), не включая размер пакета. Например, shape=(32,) указывает, что ожидаемыми входными данными будут пакеты 32-мерных векторов

        #img_input = layers.Input(shape=input_shape)
    else:
        if not backend.is_keras_tensor(input_tensor):#Returns whether x is a Keras tensor.
            img_input = layers.Input(tensor=input_tensor, shape=input_shape)
        else:
            img_input = input_tensor

    bn_axis = 4  #???

    #кол-во round фильтров
    def round_filters(filters, divisor=depth_divisor):
        """Round number of filters based on depth multiplier."""
        filters *= width_coefficient
        new_filters = max(divisor, int(filters + divisor / 2) // divisor * divisor)
        # Make sure that round down does not go down by more than 10%.
        if new_filters < 0.9 * filters:
            new_filters += divisor
        return int(new_filters)
    #кол-во повторов
    def round_repeats(repeats):
        """Round number of repeats based on depth multiplier."""
        return int(math.ceil(depth_coefficient * repeats))

    
    if isinstance(kType, (int)):#Позволяет проверить принадлежность экземпляра к классу
        kTypeList = [kType]
    else:
        kTypeList = kType
    
    # Build stem
    x = img_input
    x = layers.ZeroPadding3D(padding=correct_pad3d(backend, x, (3, 3, 3)),
                             name=name_prefix+'stem_conv_pad3d')(x)

    first_stride = 1 if skip_stride_cnt >= 0 else 2
    x = layers.Conv3D(round_filters(32), 3,
                      strides=first_stride,
                      padding='valid',
                      use_bias=False,
                      kernel_initializer=CONV_KERNEL_INITIALIZER,
                      name=name_prefix+'stem_conv')(x)
    x = layers.BatchNormalization(axis=bn_axis, name=name_prefix+'stem_bn3d')(x)
    x = layers.Activation(activation_fn, name=name_prefix+'stem_activation3d')(x)

    root_layer = x
    output_layers = []
    path_cnt = 0
    for kType in kTypeList:
        x = root_layer
        blocks_args_cp = deepcopy(blocks_args)
        b = 0
        blocks = float(sum(args['repeats'] for args in blocks_args_cp))
        #only the first branch can backpropagate to the input.
        #if path_cnt>0:
        #    x = keras.layers.Lambda(lambda x: tensorflow.stop_gradient(x))(x)
        for (i, args) in enumerate(blocks_args_cp):
            assert args['repeats'] > 0
            # Update block input and output filters based on depth multiplier.
            args['filters_in'] = round_filters(args['filters_in'])
            args['filters_out'] = round_filters(args['filters_out'])

            for j in range(round_repeats(args.pop('repeats'))):
                #should skip the stride
                if (skip_stride_cnt > i) and (j == 0) and (args['strides'] > 1):
                    args['strides'] = 1
                # The first block needs to take care of stride and filter size increase.
                if (j > 0):
                    args['strides'] = 1
                    args['filters_in'] = args['filters_out']
                x = kblock(x, activation_fn, drop_connect_rate * b / blocks,
                          name=name_prefix+'block{}{}_'.format(i + 1, chr(j + 97))+'_'+str(path_cnt), **args,
                          kType=kType, dropout_all_blocks=dropout_all_blocks)
                b += 1
        if (len(kTypeList)>1):
            x = layers.Activation('relu', name=name_prefix+'end_relu'+'_'+str(path_cnt))(x)
        output_layers.append(x)
        path_cnt = path_cnt +1

