In [None]:
import tensorflow as tf
import datetime, os
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt
import keras
from keras import utils as np_utils
from keras.models import Sequential
from keras.layers import Dense, Dropout, AvgPool2D, Conv2D, Input

%matplotlib inline

In [52]:
# Load the MNIST train and test splits together with the dataset metadata.
# `as_supervised=True` makes each element an (image, label) pair, which is
# the form `normalize_img` and `model.fit` consume further down.
(ds_train, ds_test), ds_info = tfds.load(
    name='mnist',
    split=['train', 'test'],
    as_supervised=True,
    with_info=True,
    shuffle_files=True,
)

the_shape = (28, 28, 1)

def normalize_img(image, label):
  """Casts `image` to `float32` and divides it by 255; `label` is returned unchanged.

  Written as an (image, label) -> (image, label) function so it can be passed
  directly to `Dataset.map` on a supervised dataset.
  """
  scaled_image = tf.cast(image, tf.float32) / 255.
  return scaled_image, label

# Training pipeline: normalize, cache the normalized examples, shuffle with a
# buffer covering the whole training split, then batch and prefetch.
ds_train = (
    ds_train
    .map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(ds_info.splits['train'].num_examples)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

# Evaluation pipeline: no shuffling; caching happens after batching, so the
# already-batched tensors are what gets cached.
ds_test = (
    ds_test
    .map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .cache()
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# model = tf.keras.applications.VGG16(
#     include_top=True,
#     weights="imagenet",
#     input_tensor=None,
#     input_shape=None,
#     pooling=None,
#     classes=1000,
#     classifier_activation="softmax",
# )
# model.summary()


**Receptive-field calculation function:**

In [62]:
from prettytable import PrettyTable

# Per-sample input shape handed to the first layer of every generated model.
INPUT_SHAPE = (28, 28, 1)
# Cap applied to the computed receptive field: the product of all INPUT_SHAPE
# entries (784).
# NOTE(review): get_receptive_field measures the field along a single axis, so
# the tighter bound would be 28 -- confirm whether 784 is intended.
MAX_RECEPTIVE = INPUT_SHAPE[0] * INPUT_SHAPE[1] * INPUT_SHAPE[2]
# Kernel size shared by every Conv2D layer the model builder creates.
KERNEL_SIZE = (3, 3)

# Layer class names, grouped by how the receptive-field calculation treats them.
CONV_LAYERS = ['Conv2D']
POOLING_LAYERS = ['MaxPooling2D', 'AveragePooling2D']
GLOBAL_POOLING_LAYERS = ['GlobalAveragePooling2D']

def _leading_dim(value):
  """Returns the first entry of a per-axis setting, or the value itself if it is a plain int."""
  return value if isinstance(value, int) else value[0]


def get_receptive_field(model, should_print=False):
  """Computes the receptive field of `model` along the first spatial axis.

  Walks `model.layers` in order, keeping a running receptive field and "jump"
  (the product of the strides seen so far). Layers whose class name is in
  CONV_LAYERS or POOLING_LAYERS update both with

      receptive_field += (kernel - 1) * dilation * jump
      jump *= stride

  where `dilation` is 1 for pooling layers and for convolutions without a
  `dilation_rate` entry in their config. Every other layer (Dense, Dropout,
  BatchNormalization, global pooling, ...) leaves both values unchanged.
  The result is capped at MAX_RECEPTIVE.

  Only the first entry of `kernel_size` / `pool_size` / `strides` /
  `dilation_rate` is used, so the figure describes one axis only.

  Args:
    model: object exposing `.layers`; each layer must provide `get_config()`
      returning a dict with at least a 'name' key.
    should_print: when True, prints a per-layer table of the running values.

  Returns:
    The receptive field after the last layer.
  """
  receptive_field, jump = 1, 1
  t = PrettyTable(['Model Layer Name', 'Class Name', 'Receptive Field', 'Stride', 'Jump', 'Kernel'])
  for current_layer in model.layers:
    layer_class = current_layer.__class__.__name__
    # Fetch the config once per layer instead of once per field.
    config = current_layer.get_config()
    layer_name = config['name']
    stride, kernel_size = 1, 1
    if layer_class in CONV_LAYERS or layer_class in POOLING_LAYERS:
      size_key = 'kernel_size' if layer_class in CONV_LAYERS else 'pool_size'
      kernel_size = _leading_dim(config[size_key])
      stride = _leading_dim(config['strides'])
      # A dilated kernel spans (kernel - 1) * dilation + 1 inputs; pooling
      # configs carry no dilation entry, so they fall back to 1.
      dilation = _leading_dim(config.get('dilation_rate', 1))
      receptive_field = min(MAX_RECEPTIVE, receptive_field + (kernel_size - 1) * dilation * jump)
      jump = jump * stride
    t.add_row([layer_name, layer_class, receptive_field, stride, jump, kernel_size])
  if should_print:
    print(t)
  return receptive_field

In [60]:
def get_layers_by_conv_count(conv_count: int, num_classes: int = 10):
  """Builds the layer list for a classifier containing `conv_count` Conv2D layers.

  With `conv_count == 0` the network is a global-average-pooled MLP. Otherwise
  it starts with one 32-filter Conv2D (default padding) and appends
  `conv_count - 1` further 'same'-padded convolutions; every loop index `i`
  with `i % 3 == 1` uses a stride-2, 8-filter convolution followed by
  BatchNormalization and Dropout, all others a stride-1, 16-filter
  convolution. A global-average-pool + Dense head follows.

  Both variants end in a linear `Dense(num_classes)` layer so the model emits
  one raw score per class, which is what a loss configured with
  `from_logits=True` expects. Without it the final outputs would be the 64
  ReLU/Dropout activations of the last hidden layer.

  Args:
    conv_count: number of Conv2D layers; values below 1 other than 0 behave
      like 1 because the first convolution is always added.
    num_classes: width of the final logits layer (10 by default, for the
      ten MNIST digit classes).

  Returns:
    A list of freshly constructed Keras layers, suitable for `Sequential`.
  """
  if conv_count == 0:
    return [
    tf.keras.layers.GlobalAveragePooling2D(input_shape=INPUT_SHAPE),
    tf.keras.layers.Dense(units=512, activation='relu'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(units=256, activation='relu'),
    tf.keras.layers.Dense(units=64, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    # Linear output layer: raw class scores (logits), no activation.
    tf.keras.layers.Dense(units=num_classes),
    ]

  layers = [
  tf.keras.layers.Conv2D(filters=32, kernel_size=KERNEL_SIZE,  activation='relu', input_shape=INPUT_SHAPE)
  ]
  for i in range(conv_count - 1):
    if i % 3 == 1:
      # Every third added convolution downsamples by 2 and is regularized.
      new_layers = [
        tf.keras.layers.Conv2D(filters=8, strides = (2,2), kernel_size=KERNEL_SIZE, activation='relu', padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.2),
      ]

    else:
      new_layers = [tf.keras.layers.Conv2D(filters=16, kernel_size=KERNEL_SIZE, activation='relu', padding='same')]
    layers = layers + new_layers
  layers = layers + [
    tf.keras.layers.GlobalAveragePooling2D(),
    tf.keras.layers.Dense(units=256, activation='relu'),
    tf.keras.layers.Dense(units=64, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    # Linear output layer: raw class scores (logits), no activation.
    tf.keras.layers.Dense(units=num_classes),
  ]
  return layers

def generate_model_one_step_more_then_k_rf(k: int):
  """Returns the first generated model whose receptive field is at least `k`.

  Models are built with 0, 1, 2, ... convolution layers via
  `get_layers_by_conv_count`; after each build the receptive-field table and
  a one-line summary (receptive field, conv count, parameter count) are
  printed. The search stops at the first model with receptive field >= k.

  At least one model is always built, so a target at or below the minimum
  receptive field returns the zero-convolution model instead of failing on an
  unassigned `model`.

  Args:
    k: target receptive field.

  Returns:
    The `tf.keras.models.Sequential` model that first reaches the target.

  Raises:
    ValueError: if `k` exceeds MAX_RECEPTIVE. `get_receptive_field` caps its
      result at that value, so the search could never terminate.
  """
  if k > MAX_RECEPTIVE:
    raise ValueError(
        f"target receptive field {k} exceeds the maximum of {MAX_RECEPTIVE}")

  conv_count = 0
  while True:
    model = tf.keras.models.Sequential(get_layers_by_conv_count(conv_count))
    model_rf = get_receptive_field(model, should_print=True)
    print(f"model_rf: {model_rf} conv_count: {conv_count}, param count: {model.count_params()}")
    if model_rf >= k:
      return model
    conv_count += 1


In [63]:
# Search for the model with the fewest convolutions whose receptive field
# reaches 20; the per-candidate tables are printed along the way.
model = generate_model_one_step_more_then_k_rf(k=20)

+-----------------------------+------------------------+-----------------+--------+------+--------+
|       Model Layer Name      |       Class Name       | Receptive Field | Stride | Jump | Kernel |
+-----------------------------+------------------------+-----------------+--------+------+--------+
| global_average_pooling2d_25 | GlobalAveragePooling2D |        1        |   1    |  1   |   1    |
|           dense_81          |         Dense          |        1        |   1    |  1   |   1    |
|    batch_normalization_15   |   BatchNormalization   |        1        |   1    |  1   |   1    |
|          dropout_35         |        Dropout         |        1        |   1    |  1   |   1    |
|           dense_82          |         Dense          |        1        |   1    |  1   |   1    |
|           dense_83          |         Dense          |        1        |   1    |  1   |   1    |
|          dropout_36         |        Dropout         |        1        |   1    |  1   |   1    |


In [64]:
# Labels are integer class ids, hence the sparse loss and accuracy metric;
# `from_logits=True` means the loss applies the softmax itself.
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
)

# Train for five epochs, evaluating on the test pipeline after each one.
model.fit(ds_train, validation_data=ds_test, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7fbf38419640>