# Tiny ImageNet Classification

## Load data

In [None]:
# Execute just in remote environment
# Downloads the Tiny ImageNet archive and unpacks it quietly (-qq) into the
# working directory; later cells read from the "tiny-imagenet-200/" folder.

!wget 'http://cs231n.stanford.edu/tiny-imagenet-200.zip'
!unzip -qq 'tiny-imagenet-200.zip'

## Fine tuning source code

In [None]:
import os, shutil
import pandas as pd


def change_validation_scaffolding(base_route, definition_file, separator):
  """Sort the flat validation images into one sub-folder per label.

  Every row of the annotations file names an image and its label. The image
  is moved from ``base_route/<file>`` to ``base_route/<label>/<file>``, which
  is the one-folder-per-class layout ``flow_from_directory`` reads.

  The function is safe to run more than once: images that already sit in
  their label folder are skipped instead of raising.

  Args:
    base_route: Directory that holds the validation images.
    definition_file: Path of the annotations file (no header row).
    separator: Column separator used in ``definition_file``.
  """
  validation_data = _load_validation_data(definition_file, separator)

  for file, label in zip(validation_data["file"], validation_data["label"]):
    label_folder = os.path.join(base_route, label)
    source = os.path.join(base_route, file)
    destination = os.path.join(label_folder, file)

    # Already moved by a previous run of this cell: nothing left to do.
    if os.path.exists(destination) and not os.path.exists(source):
      continue

    # EAFP instead of exists()+mkdir(): no race, and unlike makedirs() it
    # does not silently create a wrong base_route.
    try:
      os.mkdir(label_folder)
    except FileExistsError:
      pass

    shutil.move(source, destination)


def _load_validation_data(definition_file, separator):
  """Read the annotations file into a DataFrame.

  The file has no header; its six columns are named ``file``, ``label`` and
  ``"0"`` .. ``"3"`` (only ``file`` and ``label`` are used by the caller).

  Returns:
    The loaded ``pandas.DataFrame``.
  """
  validation_data = pd.read_csv(
    definition_file,
    sep=separator,
    header=None
  )

  validation_data.columns = ["file", "label", "0", "1", "2", "3"]

  return validation_data

In [None]:
# Move every validation image into a folder named after its label, using the
# tab-separated annotations file shipped with the dataset.
change_validation_scaffolding("tiny-imagenet-200/val/images", "tiny-imagenet-200/val/val_annotations.txt", '\t')

In [None]:
import os
import math

from tensorflow.keras.applications.vgg19 import VGG19
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.python.keras.applications.densenet import DenseNet201
from keras.applications.densenet import DenseNet121

from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D, GlobalMaxPooling2D, Concatenate, BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.preprocessing import image


class ImageModel:
    """Wrap a pretrained Keras backbone with a new classification head.

    Typical use is ``build()`` followed by ``train()``. ``build()`` creates
    the directory iterators and the network. ``train()`` runs a short warm-up
    fit with part of the network frozen, then unfreezes every layer and fits
    again, saves the model and returns the validation metrics.

    Supported ``model_name`` values (matched case-insensitively): ``"vgg19"``,
    ``"resnet"``, ``"densenet121"`` and ``"densenet201"``.
    """

    def __init__(self, base_route, model_name, model_route = "model.h5",
                 train_folder="train", validation_folder="val", epochs=10,
                 fine_tune: bool = False, fine_tune_epochs = 5):
        """Store the configuration; no data or weights are loaded here.

        Args:
            base_route: Dataset root directory.
            model_name: Backbone identifier (see the class docstring).
            model_route: File written by the checkpoint callback and by
                ``train()``.
            train_folder: Training folder, relative to ``base_route``.
            validation_folder: Validation folder, relative to ``base_route``.
            epochs: Epochs of the final fit with every layer trainable.
            fine_tune: If true, the warm-up phase trains the last layers of
                the network with SGD; otherwise the whole backbone is frozen
                and only the new head is trained with RMSprop.
            fine_tune_epochs: Epochs of the warm-up phase.
        """
        self.model = None
        self.__base_model = None
        self.__train_directory_iterator = None
        self.__validation_directory_iterator = None

        # Images are fed to the network as 64x64 RGB.
        self.__width = self.__height = 64
        self.__train_route = os.path.join(base_route, train_folder)
        self.__validation_route = os.path.join(base_route, validation_folder)
        self.__model_name = model_name
        self.__model_route = model_route

        self.__fine_tuning = fine_tune

        self.__epochs = epochs
        # Must be set before train_size / validation_size below: their
        # setters divide by the batch size.
        self.__batch_size = 64
        self.__fine_tune_epochs = fine_tune_epochs

        # NOTE(review): 'val_acc' is the metric key logged by older Keras
        # releases; newer ones log 'val_accuracy' — confirm against the
        # installed version, otherwise these callbacks never trigger.
        self.__early_stop = EarlyStopping(monitor='val_acc', min_delta=0, patience=3, verbose=1, mode='auto')
        self.__checkpoint = self._get_model_checkpoint()

        self.train_size = 0
        self.validation_size = 0
        self.train_steps = 0
        self.validation_steps = 0

    def build(self):
        """Create the data iterators and the (uncompiled) network."""
        self.__train_directory_iterator = self._get_directory_iterator(self.__train_route, True)
        self.__validation_directory_iterator = self._get_directory_iterator(self.__validation_route)

        # The size setters also refresh train_steps / validation_steps.
        self.train_size = self.__train_directory_iterator.samples
        self.validation_size = self.__validation_directory_iterator.samples

        self._build_model(self.__train_directory_iterator.num_classes)

    def train(self):
        """Run the warm-up fit and the full fit, then save and evaluate.

        Returns:
            The value of ``evaluate_generator`` on the validation iterator.

        Raises:
            RuntimeError: If ``build()`` has not been called yet.
        """
        if self.__model is None or self.__train_directory_iterator is None:
            raise RuntimeError("build() must be called before train()")

        if self.__fine_tuning:
            self._set_fine_tune()
        else:
            self._set_transfer_learning()

        # Warm-up phase: part of the network is frozen (see the two helpers).
        self.__model.fit_generator(
            self.__train_directory_iterator,
            steps_per_epoch=self.train_steps,
            epochs=self.__fine_tune_epochs,
            validation_data=self.__validation_directory_iterator,
            validation_steps=self.validation_steps,
            callbacks=[self.__checkpoint, self.__early_stop]
        )

        # Second phase: every layer trainable.
        self.fit_all(train=self.__train_directory_iterator, val=self.__validation_directory_iterator)

        self.__model.save(self.__model_route)

        metrics = self.__model.evaluate_generator(self.__validation_directory_iterator)

        return metrics

    def fit_all(self, train, val):
        """Unfreeze every layer, recompile with SGD and fit for ``epochs``.

        Args:
            train: Iterator yielding training batches.
            val: Iterator yielding validation batches.
        """
        for layer in self.__model.layers:
            layer.trainable = True

        # Trainability changes only take effect after compiling again.
        self.__model.compile(optimizer=SGD(lr=0.01, momentum=0.6),
                     loss='categorical_crossentropy',
                     metrics=['accuracy'])

        self.__model.fit_generator(
            train,
            steps_per_epoch=self.train_steps,
            epochs=self.__epochs,
            validation_data=val,
            validation_steps=self.validation_steps,
            callbacks=[self.__checkpoint, self.__early_stop]
        )

    def _build_model(self, num_classes: int):
        """Instantiate the backbone and stack the classification head on it.

        Args:
            num_classes: Number of units of the final softmax layer.

        Raises:
            ValueError: If ``model_name`` is not one of the supported names.
        """
        input_shape = (self.__width, self.__height, 3)
        # Case-insensitive so "DenseNet201" and "densenet201" both work.
        name = str(self.__model_name).lower()

        if name == "vgg19":
            self.__base_model = VGG19(weights='imagenet', include_top=False, input_shape=input_shape)
        elif name == "resnet":
            self.__base_model = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)
        elif name == "densenet121":
            self.__base_model = DenseNet121(weights='imagenet', include_top=False, input_shape=input_shape)
        elif name == "densenet201":
            self.__base_model = DenseNet201(weights='imagenet', include_top=False, input_shape=input_shape)
        else:
            # Fail here with a clear message instead of an AttributeError on
            # None a few lines below.
            raise ValueError(
                "Unsupported model_name %r; expected one of 'vgg19', 'resnet', "
                "'densenet121', 'DenseNet201'" % (self.__model_name,)
            )

        x = self.__base_model.output

        # Average- and max-pooled descriptors side by side.
        x = Concatenate()([GlobalAveragePooling2D()(x), GlobalMaxPooling2D()(x)])
        x = Dropout(0.2)(x)
        # Integer division: the unit count must be an int, not 512.0.
        x = Dense(1024 // 2, activation='relu')(x)

        x = BatchNormalization()(x)
        x = Dropout(0.4)(x)
        predictions = Dense(num_classes, activation='softmax')(x)

        self.__model = Model(inputs=self.__base_model.input, outputs=predictions)

    def _set_transfer_learning(self):
        """Freeze the whole backbone and compile with RMSprop."""
        for layer in self.__base_model.layers:
            layer.trainable = False

        self.__model.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['accuracy'])

    def _set_fine_tune(self):
        """Freeze the first layers, leave the rest trainable, compile with SGD.

        The cut-off index is 90% of the backbone's layer count and is applied
        to the full model's layer list, so the tail of the backbone and the
        whole new head stay trainable.
        """
        layers_to_freeze = int(len(self.__base_model.layers) * 0.9)

        for layer in self.__model.layers[:layers_to_freeze]:
            layer.trainable = False
        for layer in self.__model.layers[layers_to_freeze:]:
            layer.trainable = True

        self.__model.compile(
            optimizer=SGD(lr=0.02, momentum=0.7),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )

    def _get_model_checkpoint(self):
        """Return the callback that saves the best full model to ``model_route``."""
        return ModelCheckpoint(
            self.__model_route,
            monitor='val_acc',
            verbose=1,
            save_best_only=True,
            save_weights_only=False,
            mode='auto',
            period=1
        )

    def _get_directory_iterator(self, route, is_train: bool = False):
        """Build a ``flow_from_directory`` iterator over ``route``.

        Pixel values are rescaled by 1/255. Random horizontal and vertical
        flips are enabled only when ``is_train`` is true.
        """
        image_generator = image.ImageDataGenerator(rescale=1.0 / 255, horizontal_flip=is_train,
                                                   vertical_flip=is_train)

        return image_generator.flow_from_directory(
            directory=route,
            target_size=(self.__width, self.__height),
            batch_size=self.batch_size,
            class_mode="categorical")

    @property
    def train_size(self):
        """Number of training samples."""
        return self.__train_size

    @train_size.setter
    def train_size(self, train_size):
        # Keep the per-epoch step count in sync with the sample count.
        self.__train_size = train_size
        self.train_steps = math.ceil(self.train_size / self.batch_size)

    @property
    def validation_size(self):
        """Number of validation samples."""
        return self.__validation_size

    @validation_size.setter
    def validation_size(self, validation_size):
        # Keep the validation step count in sync with the sample count.
        self.__validation_size = validation_size
        self.validation_steps = math.ceil(self.validation_size / self.batch_size)

    @property
    def batch_size(self):
        """Batch size used by the iterators and the step computations."""
        return self.__batch_size

    @batch_size.setter
    def batch_size(self, batch_size):
        # Both step counts depend on the batch size, so recompute them.
        self.__batch_size = batch_size
        self.train_steps = math.ceil(self.train_size / self.batch_size)
        self.validation_steps = math.ceil(self.validation_size / self.batch_size)

    @property
    def model(self):
        """The underlying Keras model (``None`` until ``build()`` ran)."""
        return self.__model

    @model.setter
    def model(self, model):
        self.__model = model


## VGG19 fine tuning

In [None]:
# VGG19 run: 3 warm-up epochs with part of the network frozen, then up to
# 20 epochs with every layer trainable.
vgg = ImageModel(
    base_route="tiny-imagenet-200",
    model_name="vgg19",
    model_route="vgg19_2.h5",
    train_folder="train",
    validation_folder="val/images",
    epochs=20,
    fine_tune=True,
    fine_tune_epochs=3,
)
vgg.build()
vgg.model.summary()

In [None]:
# Runs both training phases, saves the model to its model_route and returns
# the validation metrics (displayed as the cell output).
vgg.train()

In [None]:
import os

# Nothing earlier in the notebook creates "models/"; make sure it exists so
# the save below cannot fail on a missing directory.
os.makedirs("models", exist_ok=True)

vgg_model = vgg.model
vgg_model.save("models/vgg19_ft_v2.h5")

In [None]:
# Copy the saved model to Google Drive.
# NOTE(review): assumes Drive is already mounted under ./drive — confirm.
!cp 'models/vgg19_ft_v2.h5' 'drive/My Drive/Colab Notebooks/TFM-image-feature-selection/models/fine_tuned/vgg19_ft_v2.h5'

## ResNet50 fine tuning

In [None]:
# ResNet50 run: 2 warm-up epochs with part of the network frozen, then up to
# 10 epochs with every layer trainable.
resnet = ImageModel(
    base_route="tiny-imagenet-200",
    model_name="resnet",
    model_route="resnet50_v2.h5",
    train_folder="train",
    validation_folder="val/images",
    epochs=10,
    fine_tune=True,
    fine_tune_epochs=2,
)
resnet.build()
resnet.model.summary()

In [None]:
# Runs both training phases, saves the model to its model_route and returns
# the validation metrics (displayed as the cell output).
resnet.train()

In [None]:
import os

# Nothing earlier in the notebook creates "models/"; make sure it exists so
# the save below cannot fail on a missing directory.
os.makedirs("models", exist_ok=True)

resnet_model = resnet.model
resnet_model.save("models/resnet50_v2.h5")

In [None]:
# Copy the saved model to Google Drive.
# NOTE(review): assumes Drive is already mounted under ./drive — confirm.
!cp 'models/resnet50_v2.h5' 'drive/My Drive/Colab Notebooks/TFM-image-feature-selection/models/fine_tuned/resnet50_v2.h5'

## DenseNet201 fine tuning

In [None]:
# DenseNet201 run: 2 warm-up epochs with part of the network frozen, then up
# to 10 epochs with every layer trainable.
densenet = ImageModel(
    base_route="tiny-imagenet-200",
    model_name="DenseNet201",
    model_route="densenet201_v2.h5",
    train_folder="train",
    validation_folder="val/images",
    epochs=10,
    fine_tune=True,
    fine_tune_epochs=2,
)
densenet.build()
densenet.model.summary()

In [None]:
# Runs both training phases, saves the model to its model_route and returns
# the validation metrics (displayed as the cell output).
densenet.train()

In [None]:
import os

# Nothing earlier in the notebook creates "models/"; make sure it exists so
# the save below cannot fail on a missing directory.
os.makedirs("models", exist_ok=True)

densenet_model = densenet.model
densenet_model.save("models/densenet201_v2.h5")

In [None]:
# Copy the saved model to Google Drive.
# NOTE(review): assumes Drive is already mounted under ./drive — confirm.
!cp 'models/densenet201_v2.h5' 'drive/My Drive/Colab Notebooks/TFM-image-feature-selection/models/fine_tuned/densenet201_v2.h5'

## Super ensemble

In [None]:
from tensorflow.python.keras.models import load_model
from tensorflow.python.keras.applications.vgg19 import VGG19
from tensorflow.python.keras.applications.resnet50 import ResNet50
from tensorflow.python.keras.applications.inception_v3 import InceptionV3
from tensorflow.python.keras.layers import concatenate, GlobalAveragePooling2D, Dense, Input, Dropout
from tensorflow.keras.models import Model

# Reload the three fine-tuned classifiers saved by the cells above. The VGG19
# file name matches what the VGG19 section actually writes
# ("models/vgg19_ft_v2.h5"); no cell produces "models/vgg19_v2.h5".
vgg19_model = load_model("models/vgg19_ft_v2.h5")
resnet50_model = load_model("models/resnet50_v2.h5")
densenet_model = load_model("models/densenet201_v2.h5")

In [None]:
# Cut each fine-tuned classifier at one of its dropout layers so that it
# outputs that layer's activations instead of the class probabilities.
# NOTE(review): 'dropout_3' / 'dropout_9' / 'dropout_11' are hard-coded layer
# names — confirm against model.summary() of the loaded models.
vgg19_features = Model(
    inputs=vgg19_model.input,
    outputs=vgg19_model.get_layer(name='dropout_3').output,
    name='vgg19_features',
)

resnet50_features = Model(
    inputs=resnet50_model.input,
    outputs=resnet50_model.get_layer(name='dropout_9').output,
    name='resnet50_features',
)

densenet_features = Model(
    inputs=densenet_model.input,
    outputs=densenet_model.get_layer(name='dropout_11').output,
    name='densenet_features',
)

In [None]:
size = 64

# One shared input feeds all three truncated backbones.
model_input = Input(shape=(size, size, 3))

# Freeze every backbone layer so only the new head is trained at first.
for backbone in (vgg19_features, resnet50_features, densenet_features):
  for layer in backbone.layers:
    layer.trainable = False

vgg_x = vgg19_features(model_input)
resnet_x = resnet50_features(model_input)
densenet_x = densenet_features(model_input)

x = concatenate([vgg_x, resnet_x, densenet_x])

# Head: two Dense -> BatchNormalization -> Dropout stages, created in the
# same order as before so the auto-generated layer names do not change.
for units, drop_rate in ((1024, 0.4), (512, 0.6)):
  x = Dense(units, activation='relu')(x)
  x = BatchNormalization()(x)
  x = Dropout(drop_rate)(x)

# 200 output classes.
predictions = Dense(200, activation='softmax')(x)

model = Model(inputs=model_input, outputs=predictions)

In [None]:
# Print the layer-by-layer structure of the ensemble built above.
model.summary()

In [None]:
from tensorflow.keras.preprocessing import image

# Training images: rescaled to [0, 1] and randomly flipped on both axes.
image_generator = image.ImageDataGenerator(
    rescale=1.0 / 255,
    horizontal_flip=True,
    vertical_flip=True,
)

train_iterator = image_generator.flow_from_directory(
    directory="tiny-imagenet-200/train",
    target_size=(size, size),
    batch_size=256,
    class_mode="categorical",
)

# Validation images: rescaled only, no augmentation.
val_iterator = image.ImageDataGenerator(rescale=1.0 / 255).flow_from_directory(
    directory="tiny-imagenet-200/val/images",
    target_size=(size, size),
    batch_size=256,
    class_mode="categorical",
)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import SGD

# Stop after 10 epochs without improvement of the monitored metric.
# NOTE(review): 'val_acc' is the key logged by older Keras releases; newer
# ones log 'val_accuracy' — confirm against the installed version.
early_stop = EarlyStopping(
    monitor='val_acc',
    min_delta=0,
    patience=10,
    verbose=1,
    mode='auto',
)

# Keep only the best full model (architecture + weights) in "model.h5".
checkpoint = ModelCheckpoint(
    "model.h5",
    monitor='val_acc',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
    mode='auto',
    period=1,
)

model.compile(
    optimizer=SGD(lr=0.01, momentum=0.9),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# First pass: one epoch with the backbones frozen, so only the new head is
# trained. The step counts come from the iterators themselves
# (ceil(samples / batch_size)) instead of hard-coded 390 / 39, which dropped
# the trailing partial batch and would go stale if the batch size changed.
model.fit_generator(
    train_iterator,
    steps_per_epoch=len(train_iterator),
    epochs=1,
    validation_data=val_iterator,
    validation_steps=len(val_iterator),
    callbacks=[checkpoint, early_stop])

In [None]:
# Second pass: mark every top-level layer trainable and recompile with a
# smaller learning rate.
# NOTE(review): the backbones are nested models; whether setting `trainable`
# on them also unfreezes their inner layers depends on the Keras version —
# confirm with model.summary() (trainable params) after compiling.
for layer in model.layers:
  layer.trainable = True

model.compile(optimizer=SGD(lr=0.005, momentum=0.7), loss='categorical_crossentropy', metrics=['accuracy'])

# Step counts come from the iterators (ceil(samples / batch_size)) instead of
# hard-coded 390 / 39, so the trailing partial batch is no longer dropped.
model.fit_generator(
    train_iterator,
    steps_per_epoch=len(train_iterator),
    epochs=10,
    validation_data=val_iterator,
    validation_steps=len(val_iterator),
    callbacks=[checkpoint, early_stop])

In [None]:
import os

# Nothing earlier in the notebook creates "models/"; make sure it exists so
# the save below cannot fail on a missing directory.
os.makedirs("models", exist_ok=True)

model.save("models/ensemble_resnet50_vgg19_densenet201_v2.h5")

In [None]:
# Copy the saved ensemble to Google Drive.
# NOTE(review): assumes Drive is already mounted under ./drive — confirm.
!cp 'models/ensemble_resnet50_vgg19_densenet201_v2.h5' 'drive/My Drive/Colab Notebooks/TFM-image-feature-selection/models/fine_tuned/ensemble_resnet50_vgg19_densenet201_v2.h5' 

## Feature extraction

### Load ensemble model

In [None]:
from tensorflow.python.keras.models import load_model
from tensorflow.python.keras.models import Model

# Reload the trained ensemble from the file written in the previous section.
ensemble_model = load_model("models/ensemble_resnet50_vgg19_densenet201_v2.h5")

In [None]:
# Print the layer list; useful for checking the layer name used for the
# feature extractor in the next cell.
ensemble_model.summary()

In [None]:
# Truncate the ensemble at a concatenate layer so that predict() returns that
# layer's output instead of class probabilities.
# NOTE(review): 'concatenate_4' is a hard-coded, auto-generated layer name —
# confirm it against ensemble_model.summary() for the loaded file.
feature_extractor = Model(inputs=ensemble_model.input,
                          outputs=ensemble_model.get_layer('concatenate_4').output,
                          name='feature_extractor')

In [None]:
from tensorflow.keras.preprocessing import image

# NOTE(review): random flips are still applied to the training images, so the
# extracted features are not deterministic — confirm this is intended.
image_generator = image.ImageDataGenerator(rescale=1.0 / 255, horizontal_flip=True, vertical_flip=True)

# shuffle=False is essential here: the next cells pair the predictions with
# `train_iterator.labels`, which are in directory order. With the default
# shuffle=True the batches come out in random order and features and labels
# would no longer line up.
train_iterator = image_generator.flow_from_directory(
    directory="tiny-imagenet-200/train", target_size=(64, 64),
    batch_size=256, class_mode="categorical", shuffle=False)

val_iterator = image.ImageDataGenerator(rescale=1.0 / 255).flow_from_directory(
    directory="tiny-imagenet-200/val/images", target_size=(64, 64),
    batch_size=256, class_mode="categorical", shuffle=False)

### Create dataset

In [None]:
# Run every training image through the feature extractor.
# `labels` lists the class indices in directory order, so row i of
# `train_features` only matches `train_labels[i]` when the iterator yields
# its batches unshuffled (shuffle=False) — verify how it was created.
train_features = feature_extractor.predict_generator(train_iterator)
train_labels = train_iterator.labels

In [None]:
# Sanity check: both arrays should have the same first dimension.
print(train_features.shape)
print(train_labels.shape)

### Save dataset

In [None]:
import csv
import numpy as np

def save_features(features, labels, file):
  """Write features and labels to a CSV file, one sample per row.

  The first row is a header: the feature column indices ``0 .. n-1`` followed
  by ``-1`` for the label column. Every following row holds one sample's
  feature values with its label appended as the last value.

  Args:
    features: NumPy array whose first axis indexes the samples.
    labels: Sequence with one label per sample, indexed like ``features``.
    file: Path of the CSV file to create (overwritten if it exists).
  """
  header = list(range(features.shape[-1]))
  header.append(-1)

  # newline='' is what the csv module requires for files it writes to;
  # without it some platforms emit an extra blank line after every row.
  with open(file, 'w', newline='') as f:
    writer = csv.writer(f)

    writer.writerow(header)
    for i, sample in enumerate(features):
      writer.writerow(np.append(sample, [labels[i]]))

In [None]:
# Persist the extracted training features, with their labels, as CSV.
save_features(train_features, train_labels, 'tiny_imagenet_features_ensemble_v3.csv')

### Alternative method: Create & save dataset

In [None]:
import csv, sys
import numpy as np
from tensorflow.keras.preprocessing import image
from keras_applications.imagenet_utils import preprocess_input

def extract_image_features(image_path, width = 64, height = 64):
  """Return the flattened feature vector of a single image file.

  The image is loaded, resized, turned into a batch of one and scaled to
  [0, 1] before it is passed to the module-level ``feature_extractor``.

  Args:
    image_path: Path of the image file.
    width: Width the image is resized to.
    height: Height the image is resized to.

  Returns:
    1-D NumPy array with the output of ``feature_extractor`` for the image.
  """
  # load_img takes target_size as (height, width).
  img = image.load_img(image_path, target_size=(height, width))
  x = image.img_to_array(img)
  x = np.expand_dims(x, axis=0)
  # Same preprocessing as the training pipeline in this notebook
  # (ImageDataGenerator(rescale=1.0 / 255)). The caffe-style
  # preprocess_input used before (BGR + mean subtraction) fed the network
  # inputs in a range it was never trained on.
  x = x * (1.0 / 255)

  return feature_extractor.predict(x).flatten()

def extract_and_save(file, directory_iterator):
  """Extract features image by image and stream them to a CSV file.

  For every (file path, label) pair of ``directory_iterator`` one row is
  written: the image's feature values followed by its label. No header row
  is written. Progress, as a percentage, is printed every 1000 images.

  Args:
    file: Path of the CSV file to create (overwritten if it exists).
    directory_iterator: Iterator exposing ``filepaths``, ``labels`` and
      ``samples``.
  """
  # newline='' is what the csv module requires for files it writes to.
  with open(file, 'w', newline='') as f:
    writer = csv.writer(f)

    samples = zip(directory_iterator.filepaths, directory_iterator.labels)
    for count, (filepath, label) in enumerate(samples, start=1):
      image_features = extract_image_features(filepath)
      writer.writerow(np.append(image_features, [label]))

      if count % 1000 == 0:
        sys.stdout.write(str((count * 100)/directory_iterator.samples) + " ... ")
        # Without a flush the progress may only show up once the run ends.
        sys.stdout.flush()

In [None]:
# NOTE(review): the output name is just '.csv' (a hidden file with no base
# name) — the intended file name looks truncated; confirm.
extract_and_save('.csv', train_iterator)