In [None]:
# Mount Google Drive inside the Colab VM at /content/gdrive.
# NOTE(review): later cells read the dataset through the relative path
# 'gdrive/MyDrive/...', so they presumably rely on the working directory
# being /content — confirm.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# Show the devices TensorFlow can see; the returned list is the cell output.
from tensorflow.python.client import device_lib
device_lib.list_local_devices()

In [None]:
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.models import Model
from keras.callbacks import ModelCheckpoint
import os
import random
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.model_selection import ShuffleSplit

In [None]:
def set_all_seeds(seed):
    """Seed every random number generator this notebook touches.

    Seeds Python's ``random``, NumPy, TensorFlow and TensorFlow's NumPy
    layer with ``seed`` and exports the seed / determinism switches as
    environment variables.

    Args:
        seed: Value handed to each seeding function; its ``str()`` form is
            written to the environment variables.
    """
    seed_text = str(seed)
    os.environ["PL_GLOBAL_SEED"] = seed_text

    # Same seeding order as always: stdlib, NumPy, TensorFlow, TF-NumPy.
    seeders = (
        random.seed,
        np.random.seed,
        tf.random.set_seed,
        tf.experimental.numpy.random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # NOTE: Python reads PYTHONHASHSEED at interpreter start-up, so setting it
    # here only affects processes launched afterwards.
    os.environ.update({
        'TF_CUDNN_DETERMINISTIC': '1',
        'TF_DETERMINISTIC_OPS': '1',
        "PYTHONHASHSEED": seed_text,
    })

# Seed everything once, before any model is built or any data is shuffled.
set_all_seeds(420)

In [None]:
def top_k_accuracy_score(y_true, y_pred, k=1):
    """Fraction of samples whose true class is ranked within the top ``k``.

    Args:
        y_true: Indicator array with the same shape as ``y_pred``; the entries
            equal to 1 select the true-class score of each sample.
        y_pred: NumPy array of per-class scores, one row per sample.
        k: Largest rank that still counts as a hit (default 1).

    Returns:
        Number of hits divided by the number of samples.
    """
    # Score assigned to the true class of every sample, as a column vector.
    true_scores = y_pred[y_true == 1].reshape(-1, 1)

    # Rank = how many classes score at least as high as the true class
    # (the true class itself included, so ties count against the prediction).
    at_least_as_high = y_pred >= true_scores
    ranks = np.sum(at_least_as_high, axis=1)

    hits = np.sum(ranks <= k)
    return hits / ranks.shape[0]

In [None]:
# ImageNet-pretrained MobileNetV2, classifier head included. Later cells look
# layers of this model up by name.
model_mn2 = MobileNetV2(
    weights='imagenet',
    include_top=True,
    classes=1000,
    alpha=1.0,
    input_shape=None,
    input_tensor=None,
    pooling=None,
)

In [None]:
# Print the layer table; the layer names listed here are the ones the
# following cells pass to get_layer().
model_mn2.summary()

In [None]:
# Truncated copy of MobileNetV2: same input, but the output is taken from the
# layer named below instead of the classifier head.
layer_name = 'block_15_add'
cut_point_output = model_mn2.get_layer(layer_name).output
feature_extractor = Model(inputs=model_mn2.input, outputs=cut_point_output)

## Training

In [None]:
# One file per class in the train directory: the class name is the file name
# up to '.npy', and the class id is its position in the sorted name list.
plants_names = sorted(
    entry.split('.npy')[0]
    for entry in os.listdir('gdrive/MyDrive/flower_data/encoded/train')
)
plant_name_to_id = {name: class_id for class_id, name in enumerate(plants_names)}

In [None]:
# Load every split into memory: instances[split] holds the stacked arrays
# read from the per-class .npy files, labels[split] the matching class ids
# (one id per row of the loaded array).
instances = {}
labels = {}

for set_kind in ["test", "train", "val"]:
    split_dir = f"gdrive/MyDrive/flower_data/encoded/{set_kind}"
    feature_chunks = []
    label_ids = []

    for i, file in enumerate(os.listdir(split_dir)):
        # '\r' rewrites the same output line, giving a one-line progress display.
        print(f"\r[{i}] {set_kind}/{file}", end='')
        encoded = np.load(f"{split_dir}/{file}")
        plant_name = file.split('.npy')[0]

        feature_chunks.append(encoded)
        # Repeat the class id once for every row stored in this file.
        label_ids.extend(plant_name_to_id[plant_name] for _ in range(encoded.shape[0]))

    instances[set_kind] = np.concatenate(feature_chunks)
    labels[set_kind] = np.array(label_ids)

In [None]:
# One-hot encode the integer class ids: row i of the identity matrix is the
# one-hot vector for class i, so indexing it with the label array yields one
# target row per sample.
n_classes = len(plants_names)
one_hot_rows = np.eye(n_classes)

X_train, y_train = instances['train'], one_hot_rows[labels['train']]
X_valid, y_valid = instances['val'], one_hot_rows[labels['val']]
X_test, y_test = instances['test'], one_hot_rows[labels['test']]

In [None]:
# Stand-alone model that consumes 7x7x160 feature maps and ends in a new
# softmax classifier. Every layer between the input and the dropout is named
# after the MobileNetV2 layer whose pretrained weights it receives below.
#
# The activations are ReLU6 (max_value=6.0), matching the stock Keras
# MobileNetV2; an uncapped ReLU would not reproduce the function the copied
# weights were trained for.
# NOTE(review): the stock MobileNetV2 appears to build its BatchNormalization
# layers with momentum=0.999; the Keras default is kept here — confirm intent.
input_layer = tf.keras.layers.Input(shape=[7, 7, 160], name="submodel_inputs")

# block_16: 1x1 expansion -> 3x3 depthwise -> 1x1 linear projection.
intermediate = tf.keras.layers.Conv2D(
    filters=960, kernel_size=1, use_bias=False, name="block_16_expand")(input_layer)
intermediate = tf.keras.layers.BatchNormalization(name="block_16_expand_BN")(intermediate)
intermediate = tf.keras.layers.ReLU(max_value=6.0, name="block_16_expand_relu")(intermediate)
intermediate = tf.keras.layers.DepthwiseConv2D(
    kernel_size=3, padding='same', use_bias=False, name="block_16_depthwise")(intermediate)
intermediate = tf.keras.layers.BatchNormalization(name="block_16_depthwise_BN")(intermediate)
intermediate = tf.keras.layers.ReLU(max_value=6.0, name="block_16_depthwise_relu")(intermediate)
intermediate = tf.keras.layers.Conv2D(
    filters=320, kernel_size=1, use_bias=False, name="block_16_project")(intermediate)
intermediate = tf.keras.layers.BatchNormalization(name="block_16_project_BN")(intermediate)

# Final 1x1 convolution, pooling and the new classifier head.
intermediate = tf.keras.layers.Conv2D(
    filters=1280, kernel_size=1, use_bias=False, name="Conv_1")(intermediate)
intermediate = tf.keras.layers.BatchNormalization(name="Conv_1_bn")(intermediate)
intermediate = tf.keras.layers.ReLU(max_value=6.0, name="out_relu")(intermediate)
intermediate = tf.keras.layers.GlobalAveragePooling2D(name="global_average_pooling2d")(intermediate)
intermediate = tf.keras.layers.Dropout(rate=0.2, name="dropout")(intermediate)
# Size the classifier from the label set (n_classes) so it always matches the
# one-hot targets instead of relying on a hard-coded class count.
output_layer = tf.keras.layers.Dense(
    units=n_classes, activation="softmax", kernel_regularizer=keras.regularizers.l1(0.01),
    name="predictions")(intermediate)

model = Model(input_layer, output_layer)
model.summary()

# Copy the pretrained weights layer by layer, matched by name. Layers without
# weights (input, activations, pooling, dropout) are skipped, so the copy does
# not depend on their names existing in model_mn2; the new 'predictions' head
# keeps its fresh initialisation.
for layer in model.layers:
    if layer.name == "predictions" or not layer.weights:
        continue
    layer.set_weights(model_mn2.get_layer(layer.name).get_weights())

In [None]:
# Multi-class setup: one-hot targets with categorical cross-entropy, the
# Nadam optimizer selected by name, and accuracy reported alongside the loss.
model.compile(optimizer="nadam", loss="categorical_crossentropy", metrics=["accuracy"])

In [None]:
# Scale the learning rate by 0.1 once the monitored metric has gone one epoch
# (patience=1) without improving. The monitored metric is left at the
# callback's default — presumably val_loss; confirm.
lr_scheduler = keras.callbacks.ReduceLROnPlateau(factor=0.1, patience=1)

In [None]:
# Shuffle samples and targets with one shared random order so each feature
# array stays paired with its label.
shuffled_order = np.arange(len(X_train))
np.random.shuffle(shuffled_order)

X_train_shuffled, y_train_shuffled = X_train[shuffled_order], y_train[shuffled_order]

# Free some precious RAM: drop both references to the unshuffled training data.
# After this the cell cannot be re-run without reloading the data first.
del X_train, instances['train']

In [None]:
# Write the model to disk only when the validation loss improves, so the path
# below always holds the best model seen so far. The callback is taken from
# tensorflow.keras (the `keras` alias), the same package that provides the
# model and the learning-rate scheduler, rather than from the standalone
# `keras` package.
checkpoint = keras.callbacks.ModelCheckpoint(
    filepath='model_mobilenetv2_regularized_oxford102',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

# Train for 10 epochs, validating after each one; `history` keeps the object
# returned by fit().
history = model.fit(
    X_train_shuffled, y_train_shuffled,
    epochs=10,
    validation_data=(X_valid, y_valid),
    callbacks=[lr_scheduler, checkpoint]
)

In [None]:
# Save the final-epoch model under its own name. Writing it to
# 'model_mobilenetv2_regularized_oxford102' would overwrite the best-val_loss
# model that the ModelCheckpoint callback stores at that path during training.
model.save("model_mobilenetv2_regularized_oxford102_last_epoch")

In [None]:
# Recursively pack the saved model directory into a single zip archive.
!zip -r model_mobilenetv2_regularized.zip model_mobilenetv2_regularized_oxford102/

In [None]:
# Reload the saved model from disk and report top-1 / top-5 accuracy on the
# validation and test splits.
# model.predict() feeds the data through the network in mini-batches; calling
# the model directly would push an entire split through in one forward pass,
# which needs far more memory for the intermediate activations.
model = keras.models.load_model('model_mobilenetv2_regularized_oxford102')

print("Validation set accuracy")
preds_valid_1 = model.predict(
    np.reshape(X_valid, [-1, 7, 7, 160]), batch_size=256, verbose=0)

print("TOP 1", top_k_accuracy_score(y_valid, preds_valid_1, k=1))
print("TOP 5", top_k_accuracy_score(y_valid, preds_valid_1, k=5))

print("Test set")
preds_test_aug_1 = model.predict(
    np.reshape(X_test, [-1, 7, 7, 160]), batch_size=256, verbose=0)

print("TOP 1", top_k_accuracy_score(y_test, preds_test_aug_1, k=1))
print("TOP 5", top_k_accuracy_score(y_test, preds_test_aug_1, k=5))