# Encoding High-Level Features: An Approach to Robust Transfer Learning.
## This code shows variational autoencoders being used to reduce the dimensionality of feature maps and provide the encoded representation to an image classifier. This work is an attempt to improve the robustness of the classic image classification architecture by making a simple change to the overall system.

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds

In [None]:
def preprocess_data(image, label):
  """Apply the DenseNet input preprocessing to `image`; pass `label` through.

  Returns:
    A `(preprocessed_image, label)` pair, in the form expected by
    `tf.data.Dataset.map` on a dataset of `(image, label)` elements.
  """
  return tf.keras.applications.densenet.preprocess_input(image), label

In [None]:
#### CREATE BASE MODEL FOR FEATURE EXTRACTION #####

def base_model(num_frozen_layers=200, image_size=(224, 224)):
  """Build the DenseNet121-based feature extractor.

  The returned Sequential model resizes incoming images to `image_size`,
  runs them through an ImageNet-pretrained DenseNet121 without its
  classification top, and global-average-pools the final feature maps into
  one feature vector per image.

  Args:
    num_frozen_layers: Number of leading DenseNet121 layers whose weights are
      frozen; all remaining layers stay trainable. Values <= 0 leave every
      layer trainable.
    image_size: `(height, width)` the images are resized to before entering
      DenseNet121.

  Returns:
    An unbuilt `tf.keras.Sequential` feature extractor.
  """
  height, width = image_size
  # Use a distinct local name: the original reused `base_model`, shadowing
  # this function inside its own body.
  backbone = tf.keras.applications.DenseNet121(
      include_top=False, weights='imagenet', input_shape=(height, width, 3))
  print(len(backbone.layers))
  # Single pass: layers before the boundary are frozen, the rest fine-tuned.
  for index, layer in enumerate(backbone.layers):
    layer.trainable = index >= num_frozen_layers
  backbone.summary()

  model = tf.keras.Sequential()
  model.add(tf.keras.layers.Lambda(
      lambda img: tf.image.resize(img, (height, width))))
  model.add(backbone)
  model.add(tf.keras.layers.GlobalAveragePooling2D())
  return model


In [None]:
# Build the feature extractor once. This rebinds the name `base_model` from
# the builder function to the model it returns, so the builder cannot be
# called again afterwards in the same session.
base_model = base_model()

In [None]:
#### LOADING CIFAR10 DATASET

def load_data():
  """Load CIFAR10 and return batched `(train_ds, test_ds)` pipelines.

  Images are cast to float32 and run through `preprocess_data`; labels are
  one-hot encoded over 10 classes. The training pipeline uses the whole
  training split, the test pipeline only the first 2000 test samples. Both
  are cached, batched by 100 and prefetched.
  """
  batch_size = 100

  def make_pipeline(images, labels):
    # Cast, one-hot encode, preprocess, then cache -> batch -> prefetch.
    features = images.astype('float32')
    # Convert class vectors to binary class matrices.
    one_hot = tf.keras.utils.to_categorical(labels, num_classes=10)
    dataset = tf.data.Dataset.from_tensor_slices((features, one_hot))
    dataset = dataset.map(preprocess_data)
    return dataset.cache().batch(batch_size).prefetch(buffer_size=1000)

  train_split, test_split = tf.keras.datasets.cifar10.load_data()
  train_images, train_labels = train_split
  test_images, test_labels = test_split

  train_ds = make_pipeline(train_images, train_labels)
  test_ds = make_pipeline(test_images[:2000], test_labels[:2000])
  return train_ds, test_ds

# Materialise the batched CIFAR10 pipelines used by the training and
# feature-extraction cells below.
train_ds, test_ds = load_data()

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz


## Let's start by training the classical image classifier composed of a feature extractor (DCNN) and a classifier (neural net).


In [None]:
#### LOADING AND TRAINING THE COMPARATIVE MODEL
# Baseline classifier: feature extractor -> dropout -> 10-way softmax head.
compare_model = tf.keras.Sequential([
    base_model,
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Dense(10, activation='softmax'),
])

In [None]:
# Labels are one-hot encoded by load_data, hence categorical cross-entropy.
compare_model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Train the baseline for 30 epochs, validating on the test pipeline.
training1 = compare_model.fit(
    train_ds,
    validation_data=test_ds,
    epochs=30,
    verbose=1,
)

## We then freeze the DCNN to preserve the feature space and generate the feature maps for the dataset at hand.

In [None]:
# Freeze the feature extractor so its weights stay fixed from here on.
base_model.trainable = False
# Run the extractor over the training pipeline. The result holds one pooled
# feature vector per training image and is the data the VAE is fitted on.
feat_maps_cifar = base_model.predict(train_ds, verbose = 1)



## Variational Feature Encoder
This section uses a variational autoencoder composed only of dense layers to perform dimensionality reduction of feature maps. It is trained to reconstruct feature maps from the CIFAR10 dataset. Once it is trained, we use the encoded representation in the latent space to feed a new classifier. The new system is then composed of the frozen DCNN, the encoder part of the variational autoencoder (VFE), and the classifier.

In [None]:
#### CREATING THE VARIATIONAL AUTOENCODER

## THE SAMPLING BLOCK TAKES THE OUTPUT OF THE MEAN AND VARIANCE BLOCKS.
## IT THEN PROVIDES A DISTRIBUTION TO THE DECODER WHICH HAS THE TASK OF RECONSTRUCTING THE FEATURE MAPS.


class Sampling(tf.keras.layers.Layer):
  """Sample z from the Gaussian described by (z_mean, z_log_var).

  Implements the reparameterisation z = z_mean + exp(0.5 * z_log_var) * eps
  with eps drawn from a standard normal, so that gradients can flow through
  z_mean and z_log_var. Here z is the latent encoding of a feature vector.
  """

  def call(self, inputs):
    """Return one sample of z for each row of the inputs.

    Args:
      inputs: Pair `(z_mean, z_log_var)` of tensors with identical shape.

    Returns:
      A tensor with the same shape as `z_mean`.
    """
    z_mean, z_log_var = inputs
    # Draw the noise with z_mean's dynamic shape and dtype using the core
    # TensorFlow RNG instead of the legacy Keras backend helper.
    epsilon = tf.random.normal(shape=tf.shape(z_mean), dtype=z_mean.dtype)
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon

## DIMENSIONALITY OF THE LATENT SPACE
latent_dim = 256

# Encoder: takes the 1024-d pooled feature vector produced by the feature
# extractor, passes it through Dense(256, relu) + BatchNormalization, then
# branches into two linear heads (z_mean, z_log_var). The Sampling layer
# draws z from those two heads. The model exposes all three tensors, in the
# order [z_mean, z_log_var, z].
encoder_inputs = tf.keras.Input(shape=(1024,))
x = tf.keras.layers.Dense(256, activation = 'relu')(encoder_inputs)
x = tf.keras.layers.BatchNormalization()(x)
z_mean = tf.keras.layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = tf.keras.layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = tf.keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

# Decoder: maps a latent vector back to a 1024-d feature vector through
# Dense(512, relu) + BatchNormalization. The relu on the output layer keeps
# reconstructions non-negative.
latent_inputs = tf.keras.Input(shape=(latent_dim,))
x = tf.keras.layers.Dense(512, activation='relu')(latent_inputs)
x = tf.keras.layers.BatchNormalization()(x)
decoder_outputs = tf.keras.layers.Dense(1024, activation="relu")(x)
decoder = tf.keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

# NOTE(review): this optimizer instance is not referenced by any code visible
# in this file (vae.compile constructs its own Adam) - confirm it is needed.
optimizer = tf.keras.optimizers.Adam()

## DEFINING THE LOSSES TO BE USED IN THE TRAINING PROCESS
## MEAN SQUARED ERROR
def mse_loss(y_true, y_pred):
    """Return the reconstruction loss: 1000 x the mean squared error.

    The mean is taken over every element of `y_true - y_pred`, so the result
    is a scalar tensor.
    """
    residual = y_true - y_pred
    mean_squared_error = tf.reduce_mean(tf.square(residual))
    ## MULTIPLICATIVE FACTOR ON THE TOTAL LOSS (1000)
    scale = 1000
    return scale * mean_squared_error

## Kullback-Leibler Divergence Factor
def kl_loss(mean, log_var):
    """Return the element-wise KL term between N(mean, exp(log_var)) and N(0, 1).

    No reduction is applied: the result has the same shape as `mean` and
    `log_var`, with one value per input element.
    """
    ## BETA = 1
    beta = 1
    divergence = -0.5 * (1 + log_var - tf.square(mean) - tf.exp(log_var))
    return beta * divergence

## TOTAL LOSS
def vae_loss(y_true, y_pred, mean, var):
    """Return the scalar VAE objective: reconstruction loss plus KL term.

    Args:
      y_true: Original feature vectors.
      y_pred: Reconstructed feature vectors.
      mean: Latent means produced by the encoder.
      var: Latent log-variances (despite the name, this is forwarded to
        `kl_loss` as its `log_var` argument).

    Returns:
      A scalar tensor `mse_loss(y_true, y_pred) + mean(kl_loss(mean, var))`.
    """
    r_loss = mse_loss(y_true, y_pred)
    kl_loss_ = kl_loss(mean, var)
    # kl_loss is element-wise. Adding it unreduced to the scalar r_loss would
    # broadcast into a non-scalar "loss"; GradientTape.gradient differentiates
    # the SUM of a non-scalar target, which silently multiplies the
    # reconstruction weight by the number of KL elements. Averaging the KL
    # term keeps the same relative weighting while yielding a true scalar.
    return r_loss + tf.reduce_mean(kl_loss_)



## DEFINING THE TRAINING PROCESS AND THE OBJECTIVE FUNCTION TO CALCULATE THE LOSS

class VAE(tf.keras.Model):
    """Variational autoencoder trained with a custom `train_step`.

    Wraps an encoder that returns `(z_mean, z_log_var, z)` and a decoder that
    maps `z` back to the input space. Training minimises `vae_loss` and
    reports running means of the total, reconstruction and KL losses.
    """

    def __init__(self, encoder, decoder, **kwargs):
        """Store the two sub-models and create the loss trackers.

        Args:
          encoder: Model returning `(z_mean, z_log_var, z)` for a batch.
          decoder: Model reconstructing the input from `z`.
          **kwargs: Forwarded to `tf.keras.Model.__init__`.
        """
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = tf.keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = tf.keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Trackers listed here are reset by Keras at the start of each epoch."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        """Run one optimisation step on a batch and return the running losses.

        Args:
          data: Batch of input vectors; if Keras delivers it wrapped in a
            tuple, only the first element (the inputs) is used.

        Returns:
          Dict with the running means under the keys "loss",
          "reconstruction_loss" and "kl_loss".
        """
        # The model is unsupervised: drop any targets/sample weights.
        if isinstance(data, tuple):
            data = data[0]

        with tf.GradientTape() as tape:
            # Sub-models called from a custom train_step default to inference
            # mode. training=True is required for layers such as the
            # BatchNormalization layers of this notebook's encoder/decoder to
            # use batch statistics and update their moving averages.
            z_mean, z_log_var, z = self.encoder(data, training=True)
            reconstruction = self.decoder(z, training=True)
            reconstruction_loss = mse_loss(data, reconstruction)
            kl_loss_ = kl_loss(z_mean, z_log_var)
            # Always differentiate a scalar: reduce_mean is a no-op on a
            # scalar and otherwise averages an element-wise total, so the
            # gradient scale does not depend on the batch or latent size.
            total_loss = tf.reduce_mean(
                vae_loss(data, reconstruction, z_mean, z_log_var)
            )

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss_)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_11 (InputLayer)          [(None, 1024)]       0           []                               
                                                                                                  
 dense_12 (Dense)               (None, 256)          262400      ['input_11[0][0]']               
                                                                                                  
 batch_normalization_3 (BatchNo  (None, 256)         1024        ['dense_12[0][0]']               
 rmalization)                                                                                     
                                                                                                  
 z_mean (Dense)                 (None, 256)          65792       ['batch_normalization_3[0][

In [None]:
vae = VAE(encoder, decoder)
# No `metrics` argument: the VAE is fitted on unlabeled feature vectors and
# VAE.train_step reports its own loss trackers, so an 'accuracy' metric has
# nothing to be computed against.
vae.compile(optimizer=tf.keras.optimizers.Adam())
# Fit the VAE to reconstruct the feature vectors extracted from CIFAR10.
training_vae_cifar = vae.fit(feat_maps_cifar, epochs=2000, batch_size=128)

## Once the training of the VFE is done, we freeze it and connect it to the feature extractor to train a new classifier that takes the encoded representations of feature maps and maps them to the given classes.

In [None]:
# Freeze the trained VAE so its weights are not updated when the classifier
# built on top of its encoder is trained.
vae.trainable = False

In [None]:
def classifier(input_dim=256, num_classes=10, dropout_rate=0.3):
  """Build the classification head that consumes encoded feature vectors.

  Args:
    input_dim: Size of the input vector (the latent dimensionality).
    num_classes: Number of softmax outputs.
    dropout_rate: Dropout rate applied to the input before the dense layer.

  Returns:
    A `tf.keras.Model` named 'classifier' mapping `(input_dim,)` vectors to
    `num_classes` class probabilities.
  """
  inputs = tf.keras.Input(shape=(input_dim,))
  x = tf.keras.layers.Dropout(dropout_rate)(inputs)
  outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
  # Return directly instead of binding a local that shadows this function.
  return tf.keras.Model(inputs, outputs, name='classifier')

# Instantiate the head. This rebinds the name `classifier` from the builder
# function to the model it returns.
classifier = classifier()

In [None]:
## CREATING THE IMAGE CLASSIFIER WITH VFE

# Feature extractor followed by the VAE encoder; the encoder's outputs are
# [z_mean, z_log_var, z].
model1 = tf.keras.Model(
    inputs=base_model.input,
    outputs=vae.encoder(base_model.output),
)
# Index 2 selects the sampled latent vector z as the classifier's input.
model_global = tf.keras.Model(
    inputs=model1.input,
    outputs=classifier(model1.output[2]),
)

In [None]:
# Print the layer table of the extractor + encoder model.
model1.summary()

Model: "model_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lambda_input (InputLayer)   [(None, 32, 32, 3)]       0         
                                                                 
 lambda (Lambda)             (None, 224, 224, 3)       0         
                                                                 
 densenet121 (Functional)    (None, 7, 7, 1024)        7037504   
                                                                 
 global_average_pooling2d (G  (None, 1024)             0         
 lobalAveragePooling2D)                                          
                                                                 
 encoder (Functional)        [(None, 256),             395008    
                              (None, 256),                       
                              (None, 256)]                       
                                                           

In [None]:
# Print the layer table of the full extractor + encoder + classifier model.
model_global.summary()

Model: "model_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lambda_input (InputLayer)   [(None, 32, 32, 3)]       0         
                                                                 
 lambda (Lambda)             (None, 224, 224, 3)       0         
                                                                 
 densenet121 (Functional)    (None, 7, 7, 1024)        7037504   
                                                                 
 global_average_pooling2d (G  (None, 1024)             0         
 lobalAveragePooling2D)                                          
                                                                 
 encoder (Functional)        [(None, 256),             395008    
                              (None, 256),                       
                              (None, 256)]                       
                                                           

In [None]:
# Labels are one-hot encoded, hence categorical cross-entropy.
model_global.compile(
    optimizer='Adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)
# Train for 50 epochs, validating on the test pipeline.
training = model_global.fit(
    train_ds,
    validation_data=test_ds,
    epochs=50,
    verbose=1,
)