<a href="https://colab.research.google.com/github/marshka/ml-20-21/blob/main/assignment_2/src/utils.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 2

In [None]:
import os
import pickle
import urllib.request as http
from zipfile import ZipFile

import tensorflow as tf
import numpy as np
from PIL import Image

from tensorflow.keras import layers as keras_layers
from tensorflow.keras import backend as K
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.models import save_model, load_model


def load_cifar10(num_classes=3):
    """
    Fetches CIFAR-10 (which already comes split into a training and a test
    set) and keeps only the samples labelled with one of the first
    `num_classes` classes.
    Example of usage:

    >>> (x_train, y_train), (x_test, y_test) = load_cifar10()

    :param num_classes: int, number of leading classes to keep; default is 3
    as required by the assignment.
    :return: the filtered data as ((x_train, y_train), (x_test, y_test)).
    """
    train_split, test_split = cifar10.load_data()

    def keep_first_classes(images, targets):
        # Labels are read from column 0; keep the rows whose class id is
        # strictly below `num_classes`.
        selected = tf.where(targets[:, 0] < num_classes)[:, 0]
        return images[selected], targets[selected]

    return keep_first_classes(*train_split), keep_first_classes(*test_split)


def load_rps(download=False, path='rps', reduction_factor=1):
    """
    Loads the rps dataset (optionally downloading it first) and returns the
    training and test sets.
    Example of usage:

    >>> (x_train, y_train), (x_test, y_test) = load_rps()

    :param download: bool, default is False but for the first call should be True.
    :param path: str, subdirectory in which the images should be downloaded, default is 'rps'.
    :param reduction_factor: int, factor of reduction of the dataset (len = old_len // reduction_factor);
    every `reduction_factor`-th sample is kept.
    :return: the images and labels split into training and test sets, as
    ((x_train, y_train), (x_test, y_test)); images are lists of PIL images and
    labels are lists of class names ('rock', 'paper' or 'scissors').
    :raises ValueError: if `path` does not exist and `download` is False.
    """
    url = 'https://drive.switch.ch/index.php/s/xjXhuYDUzoZvL02/download'
    classes = ('rock', 'paper', 'scissors')
    rps_dir = os.path.abspath(path)
    if not os.path.exists(rps_dir) and not download:
        raise ValueError("Dataset not in the path. You should call this function with `download=True` the first time.")
    if download:
        archive = os.path.join(rps_dir, 'data.zip')
        os.makedirs(rps_dir, exist_ok=True)
        print(f"Downloading rps images in {rps_dir} (may take a couple of minutes)")
        try:
            # The return value is ignored on purpose: the archive is written to
            # `archive`, and the `path` parameter must not be overwritten.
            http.urlretrieve(url, archive)
            with ZipFile(archive, 'r') as zip_ref:
                zip_ref.extractall(rps_dir)
        finally:
            # Never leave a (possibly partial) archive behind, even when the
            # download or the extraction fails.
            if os.path.exists(archive):
                os.remove(archive)
    train_dir, test_dir = os.path.join(rps_dir, 'train'), os.path.join(rps_dir, 'test')
    print("Loading training set...")
    x_train, y_train = load_images_with_label(train_dir, classes)
    # Subsample by keeping one sample every `reduction_factor`.
    x_train, y_train = x_train[::reduction_factor], y_train[::reduction_factor]
    print("Loaded %d images for training" % len(y_train))
    print("Loading test set...")
    x_test, y_test = load_images_with_label(test_dir, classes)
    x_test, y_test = x_test[::reduction_factor], y_test[::reduction_factor]
    print("Loaded %d images for testing" % len(y_test))
    return (x_train, y_train), (x_test, y_test)


def make_dataset(imgs, labels, label_map, img_size, rgb=True, keepdim=True, shuffle=True):
    """
    Converts images and their labels into arrays: the images are resized and
    stacked into a float32 array, the labels are one-hot encoded.

    :param imgs: iterable of images exposing `resize` and `convert` (e.g. PIL images).
    :param labels: iterable of labels, one per image; each must be a key of `label_map`.
    :param label_map: dict mapping every label to its position in the one-hot vector.
    :param img_size: size handed to `resize` for every image.
    :param rgb: bool, if False every image is converted to mode 'L' after resizing.
    :param keepdim: bool, if False every image array is flattened to one dimension.
    :param shuffle: bool, if True the samples are shuffled (using `np.random`),
    keeping each image paired with its label.
    :return: (x, y), the float32 image array and the one-hot label array.
    """
    num_classes = len(label_map)
    samples, targets = [], []
    for image, label in zip(imgs, labels):
        # preprocess the image
        pixels = image.resize(img_size)
        if not rgb:
            pixels = pixels.convert('L')
        pixels = np.asarray(pixels)
        if not keepdim:
            pixels = pixels.reshape(-1)

        # one-hot encode the label
        one_hot = np.zeros(num_classes)
        one_hot[label_map[label]] = 1.

        samples.append(pixels)
        targets.append(one_hot)

    x = np.array(samples).astype('float32')
    y = np.array(targets)
    if shuffle:
        # A single permutation is applied to both arrays so pairs stay aligned.
        order = np.arange(len(y))
        np.random.shuffle(order)
        x, y = x[order], y[order]
    return x, y


def load_images(path):
    """
    Loads every `.jpg` image stored directly inside a directory.

    :param path: str, directory to scan (sub-directories are not visited).
    :return: list of PIL images in sorted filename order; empty if the
    directory contains no `.jpg` file.
    """
    imgs = []
    # os.listdir returns entries in arbitrary order; sorting makes the result
    # reproducible across runs and platforms.
    for name in sorted(os.listdir(path)):
        if name.endswith('.jpg'):
            # load the image (here you might want to resize the img to save memory)
            # copy() returns an image detached from the file, so the handle
            # opened by Image.open can be closed right away instead of leaking.
            with Image.open(os.path.join(path, name)) as img:
                imgs.append(img.copy())
    return imgs


def load_images_with_label(path, classes):
    """
    Loads the images of every class, where each class has its own
    sub-directory of `path` named after the class.

    :param path: str, directory containing one sub-directory per class.
    :param classes: iterable of str, the class names (sub-directory names).
    :return: (imgs, labels), two parallel lists: the loaded images and, for
    each of them, the name of the class it belongs to.
    """
    imgs, labels = [], []
    for class_name in classes:
        # load all the images found in this class' folder
        class_dir = os.path.join(path, class_name)
        loaded = load_images(class_dir)
        imgs += loaded
        labels += [class_name for _ in loaded]
    return imgs, labels


def save_keras_model(model, filename):
    """
    Saves a Keras model to disk by delegating to `save_model`.
    Example of usage:

    >>> model = Sequential()
    >>> model.add(Dense(...))
    >>> model.compile(...)
    >>> model.fit(...)
    >>> save_keras_model(model, 'my_model.h5')

    :param model: the model to save;
    :param filename: string, path to the file in which to store the model.
    :return: None.
    """
    save_model(model, filename)


def load_keras_model(filename):
    """
    Reads back from disk a Keras model stored with models.save_model.

    :param filename: string, path to the file storing the model.
    :return: the model, as returned by `load_model`.
    """
    return load_model(filename)


def save_vgg16(model, filename='nn_task2.pkl', additional_args=()):
    """
    Optimize task2 model by only saving the layers after vgg16. This function
    assumes that you only added Flatten and Dense layers. If it is not the case,
    you should include into `additional_args` other layers' attributes you
    need.

    :param model: the model to save; its first layer is skipped (it is assumed
    to be the vgg16 base) and every following layer is stored.
    :param filename: string, path to the file in which to store the model;
    the '.pkl' extension is appended when missing.
    :param additional_args: tuple or list, additional layers' attributes to be
    saved. Default are ['units', 'activation', 'use_bias', 'name']
    :return: the absolute path of the saved model.
    """
    filename = filename if filename.endswith('.pkl') else (filename + '.pkl')
    args = ['units', 'activation', 'use_bias', 'name', *additional_args]
    layers = []
    for src_layer in model.layers[1:]:
        layer = {
            'class': src_layer.__class__.__name__,
            # Probe only the requested attributes instead of scanning the
            # whole dir() of the layer; attributes a layer lacks are skipped.
            'kwargs': {k: getattr(src_layer, k) for k in args if hasattr(src_layer, k)},
        }
        # Layers without weights (e.g. Flatten) get no 'weights' entry.
        if src_layer.weights:
            layer['weights'] = src_layer.get_weights()
        layers.append(layer)

    with open(filename, 'wb') as fp:
        pickle.dump(layers, fp)

    return os.path.abspath(filename)


def load_vgg16(filename='nn_task2.pkl', img_h=224, img_w=224):
    """
    Loads the model saved with save_vgg16: a pre-trained VGG16 base (without
    its top layers) followed by the layers stored in the pickle file.

    :param filename: string, path to the file storing the model. The file is
    read with pickle, so it must come from a trusted source.
    :param img_h: int, the height of the input image.
    :param img_w: int, the width of the input image.
    :return: the model, with all its layers frozen (`trainable = False`).
    """
    K.clear_session()

    # VGG16 and Sequential are resolved through `tf.keras` so that this
    # function only depends on the imports at the top of the file.
    vgg16 = tf.keras.applications.VGG16(weights='imagenet',
                                        include_top=False,
                                        input_shape=(img_h, img_w, 3))
    model = tf.keras.Sequential()
    model.add(vgg16)

    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(filename, 'rb') as fp:
        layers = pickle.load(fp)
    for spec in layers:
        # Rebuild each stored layer from its class name and constructor kwargs.
        cls = getattr(keras_layers, spec['class'])
        layer = cls(**spec['kwargs'])
        model.add(layer)
        # Weights are restored only after the layer has been added to the model.
        if 'weights' in spec:
            layer.set_weights(spec['weights'])

    model.trainable = False
    return model

In [None]:
# Download the rps archive, then load the training and test sets
# (reduction_factor=1 keeps every image).
(x_train, y_train), (x_test, y_test) = load_rps(download=True, reduction_factor=1)

## Save/load `vgg16` model

In [None]:
from tensorflow.keras import Sequential, applications
from tensorflow.keras.layers import Dense, Flatten

# VGG16 was trained on high-resolution images, so feeding it low-resolution
# inputs might not be a good idea.
img_h, img_w = 224, 224

# Build the VGG16 network, download the pre-trained weights and drop the last dense layers.
vgg16 = applications.VGG16(weights='imagenet',
                           include_top=False,
                           input_shape=(img_h, img_w, 3))
# Freeze the network weights
vgg16.trainable = False

# Now you can use vgg16 as you would use any other layer.
# Example:

net = Sequential()
net.add(vgg16)
net.add(Flatten())
net.add(Dense(1))  # <- JUST AN EXAMPLE TO MAKE A WORKING NETWORK, DON'T COPY
net.summary()

# Save model
path = save_vgg16(net)

# Load model
print("\nReload model\n")
loaded_net = load_vgg16(path)
loaded_net.summary()

# Round-trip check: the reloaded model must hold the same weight tensors.
assert len(net.weights) == len(loaded_net.weights)
weights_are_equal = [tf.equal(w1, w2).numpy().all()
                     for w1, w2 in zip(net.weights, loaded_net.weights)]
if all(weights_are_equal):
    print("\nThe loaded model has the same weights of the original one.")
else:
    # Report a failed round-trip explicitly instead of finishing silently.
    print("\nWARNING: the loaded model's weights differ from the original ones.")