# Convolutional Neural Networks for Artwork recognition

## Imports and storage mounting

In [3]:
import numpy as np
import pandas as pd
import tensorflow as tf
from typing import Tuple
from tqdm.notebook import tqdm
import cv2, json, skvideo.io
from pathlib import Path
from typing import Tuple
import pandas as pd
from datetime import datetime
from collections import defaultdict, OrderedDict
from IPython.display import display, display_markdown
from matplotlib import pyplot as plt
from sklearn.metrics import classification_report, roc_curve, confusion_matrix
import seaborn as sn
from typing import Optional, Callable, Any
from itertools import repeat
import torch
from torchvision.datasets import VisionDataset
from torchvision.transforms import transforms
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, Dropout, Flatten, MaxPooling2D, LeakyReLU, BatchNormalization
from tensorflow.keras.layers import Conv2D, Input, GlobalAveragePooling2D
from tensorflow.keras.applications import VGG19, MobileNetV2, InceptionV3
from tensorflow.keras.optimizers import Adam


The storage used below is a Google Drive folder, where I uploaded the artwork videos and related files. 

In [4]:
'''
Please enter your base dir and files dir

base_dir
    - final_result.ipynb
    - files_dir (PEDES)
        - Videos_Cloudy_North Side folder
        - Videos_Cloudy_South Side folder
        - Videos_Day_North Side folder
        - Videos_Day_South Side folder
        - Videos_Night_North Side folder
        - Videos_Night_South Side folder
        - new.csv
        - desc.csv
new.csv is generated from desc.csv by the function InfoUpdate():
    desc.csv contains entries for video files that do not exist,
    so InfoUpdate() keeps only the rows whose video file is present.
'''
# Root folder of the project.
# NOTE(review): hard-coded local Windows path — adjust it for your machine.
base_dir = Path(r"E:\working\obstacle_detection")
# Folder that holds the video sub-folders and the CSV description files.
files_dir = base_dir / "PEDES"
# Name of the CSV file (inside files_dir) that is passed to the training calls.
video_info = "new.csv"

In [23]:
## Keep only the rows of desc.csv whose video file actually exists ##
def InfoUpdate():
    """
    Writes ``new.csv`` next to ``desc.csv`` in ``files_dir``.

    ``new.csv`` contains the rows of ``desc.csv`` whose ``path`` column points
    to an existing file below ``files_dir``; rows for missing videos are left
    out. The original row index is kept and written as the first CSV column.
    """
    dataset_info = pd.read_csv(files_dir / "desc.csv")

    # boolean mask instead of dropping rows one by one; astype(bool) keeps the
    # mask boolean even when desc.csv has no rows at all
    video_exists = dataset_info["path"].map(
        lambda rel_path: (files_dir / rel_path).is_file()
    ).astype(bool)

    dataset_info.loc[video_exists].to_csv(files_dir / "new.csv")
InfoUpdate()

## Dataset generation from artwork videos

Here, 
- we build a frame generator from the artwork videos using PyTorch;
- then, we create the dataset from that generator.

In [5]:
'''
First, we consider auxiliary functions.
'''

def get_video_rotation(video_path: str):
    """ Reads video rotation from video metadata using scikit-video.

    Works well with .mp4 files, but has trouble with .mov files due to
    different metadata structure; .mov files also often do not contain rotation
    information, but openCV seems to read frames from .mov files in the correct
    orientation anyway, so things balance out.

    This is a best-effort lookup: any failure while probing or parsing the
    metadata results in a rotation of 0.

    :param video_path: path to the video file
    :return: rotation of video in int degrees (e.g. 0, 90, 180), 0 when no
     rotation information could be read
    """
    orientation = 0
    try:
        metadata = skvideo.io.ffprobe(video_path)
        tags = metadata["video"]["tag"]

        # A stream with several tags yields a list of mappings, but a stream
        # with a single tag yields that one mapping directly; wrap it so the
        # loop below sees the same shape in both cases.
        if isinstance(tags, dict):
            tags = [tags]

        for tag in tags:
            # we get OrderedDicts here
            if tag["@key"] == "rotate":
                orientation = int(tag["@value"])

    except Exception:
        # deliberate best effort: missing metadata, unreadable files or an
        # unexpected metadata layout all fall back to "no rotation"
        pass

    return orientation

def resize_and_rescale(img, fr_size: int, mean: float, std: float):
    """
    Brings a frame to a square ``fr_size`` x ``fr_size`` shape and normalises
    its values as ``(value - mean) / std``. See
    https://stackoverflow.com/a/58096430 for scale conversion explanation.

    :param img: the frame to resize and rescale
    :param fr_size: side length of the resized square frame
    :param mean: value subtracted from every pixel
    :param std: value every pixel is divided by after subtracting the mean
    :return: the resized, normalised frame as a float32 tensor
    """
    target_shape = (fr_size, fr_size)
    resized = tf.cast(tf.image.resize(img, target_shape), tf.float32)
    return (resized - mean) / std

def random_random_crop(img: tf.Tensor):
    """
    Crops roughly half of the frames it is given, leaving the rest untouched.
    A cropped frame keeps 70-90% of its original height and 70-90% of its
    original width; the two percentages are drawn independently. The channel
    axis is not cropped (the crop size is fixed to 3 channels).

    NOTE only tf.random functions are used below, regular Python
    random.random functions won't work with Tensorflow.

    :param img: the frame to be cropped
    :return: the cropped frame, or the original frame when no crop is applied
    """
    def crop_fraction():
        # random float in the range 0.7-0.9
        return tf.random.uniform(shape=[], minval=0.7, maxval=0.9,
                                 dtype=tf.float32)

    def coin_flip():
        # random boolean: the drawn integer is either 0 or 1
        return tf.random.uniform(shape=[], minval=0, maxval=2,
                                 dtype=tf.int32) != 0

    # the height fraction is drawn first, then the width fraction
    h = int(float(tf.shape(img)[0]) * crop_fraction())
    w = int(float(tf.shape(img)[1]) * crop_fraction())

    apply_crop = coin_flip()
    return tf.cond(apply_crop,
                   lambda: tf.image.random_crop(img, size=[h, w, 3]),
                   lambda: img)

def random_modifications(img, label):
    """
    Augments a frame with, in this order, a random crop, a random brightness
    change (max delta 0.2) and a random horizontal flip. Each step may leave
    the frame unchanged, so the frame can also come back untouched.

    :param img: the frame to be modified
    :param label: the corresponding label of the frame, this is returned as is
    :return: tuple of (modified frame, label)
    """
    cropped = random_random_crop(img)
    brightened = tf.image.random_brightness(cropped, 0.2)
    augmented = tf.image.random_flip_left_right(brightened)
    return augmented, label

def split_dataset(dataset: "tf.data.Dataset", validation_data_fraction: float):
    """
    Splits a dataset of type tf.data.Dataset into a training and validation
    dataset using given ratio. Fractions are rounded to a whole percentage.
    Adapted from https://stackoverflow.com/a/59696126

    The split is positional: within every run of 100 consecutive elements, the
    first ``round(validation_data_fraction * 100)`` elements go to the
    validation dataset and the rest go to the training dataset. A fraction of
    0 therefore yields an empty validation dataset, a fraction of 1 an empty
    training dataset.

    :param dataset: the input dataset to split
    :param validation_data_fraction: the fraction of the validation data as a
     float between 0 and 1 (inclusive)
    :return: a tuple of two tf.data.Datasets as (training, validation)
    :raises ValueError: if the fraction is outside the range [0, 1]
    """
    validation_data_percent = round(validation_data_fraction * 100)
    if not (0 <= validation_data_percent <= 100):
        raise ValueError("validation data fraction must be ∈ [0, 1]")

    indexed = dataset.enumerate()

    # f % 100 takes the values 0..99, so "< percent" selects exactly `percent`
    # out of every 100 elements (using "<=" would select one too many)
    train_dataset = indexed.filter(
        lambda f, data: f % 100 >= validation_data_percent)
    validation_dataset = indexed.filter(
        lambda f, data: f % 100 < validation_data_percent)

    # remove enumeration
    train_dataset = train_dataset.map(lambda f, data: data)
    validation_dataset = validation_dataset.map(lambda f, data: data)

    return train_dataset, validation_dataset

In [12]:
### using pytorch  ###
class ObstacleImageDataset(VisionDataset):
    """
    Dataset of single video frames with one-hot labels.

    The annotations CSV lists videos together with their obstacle type
    (column ``id``), relative path (``path``) and usable frame range
    (``proper_start_frame``, ``proper_end_frame``, ``total_frames_proper``).
    ``size`` frames are selected in total, spread as evenly as possible over
    the obstacle types, and each item is decoded from its video on access.
    """

    def __init__(
            self,
            *,
            videos_dir: Path,
            annotations_file_path: Path,
            train: bool = True,
            size: int = 3000,
            transform: Optional[Callable] = None,
            target_transform: Optional[Callable] = None,
    ) -> None:
        """
        Reads the annotations and precomputes which frame of which video backs
        every dataset index.

        :param videos_dir: directory the ``path`` column of the annotations is
         relative to
        :param annotations_file_path: CSV file describing the videos
        :param train: currently unused
        :param size: total number of frames in the dataset, over all obstacle
         types
        :param transform: stored on the instance; NOTE it is not applied by
         ``__getitem__``, which always returns float tensors
        :param target_transform: stored on the instance; NOTE it is not applied
         by ``__getitem__``
        :raises ValueError: if the annotations contain no obstacle type, if
         ``size`` is smaller than the number of obstacle types, or if an
         obstacle type does not have enough frames available
        """
        super().__init__(
            root="",  # we are not making use of the root dir attribute of the class we are inheriting from
            transform=transform,
            target_transform=target_transform,
        )

        assert videos_dir.is_dir(), f"{videos_dir} does not exist!"
        assert annotations_file_path.is_file(), f"{annotations_file_path} not a file!"

        self.annotations: pd.DataFrame = pd.read_csv(annotations_file_path)
        self.videos_dir: Path = videos_dir
        self.size: int = size
        self.transform: Optional[Callable] = transform
        self.target_transform: Optional[Callable] = target_transform

        # sorted so that the position of a type in this list is its class index
        self.obstacle_types: list[str] = sorted(
            self.annotations["id"].unique()
        )
        if not self.obstacle_types:
            # would otherwise surface as a ZeroDivisionError in divmod() below
            raise ValueError(
                f"{annotations_file_path} does not contain any obstacle type."
            )
        if len(self.obstacle_types) > self.size:
            raise ValueError(
                f"{self.size} cannot be lower than {len(self.obstacle_types)}."
            )

        # calculate number of frames to extract per obstacle type
        frames_per_obs, remaining = divmod(self.size, len(self.obstacle_types))
        self.frames_per_obs_type: dict[str, int] = dict.fromkeys(
            self.obstacle_types, frames_per_obs
        )

        # distribute remaining number of frames to as many obstacle types as possible
        # by definition, remaining < len(self.obstacle_types)
        for obs_type in self.obstacle_types[:remaining]:
            self.frames_per_obs_type[obs_type] += 1
        assert self.size == sum(self.frames_per_obs_type.values()), \
                f"{self.size} != {sum(self.frames_per_obs_type.values())}"

        # check that enough frames are available in the videos for all obstacle types
        self.available_frames_per_obs_type: dict[str, int] = \
                (self.annotations.groupby(["id"])["total_frames_proper"].sum().to_dict())

        obs_types_with_insufficient_frames = [
            obs_type
            for obs_type, n_frames in self.frames_per_obs_type.items()
            if self.available_frames_per_obs_type[obs_type] < n_frames
        ]
        if obs_types_with_insufficient_frames:
            raise ValueError(
                f"Insufficient number of frames available for obstacle type(s): {obs_types_with_insufficient_frames}."
            )

        # one [label, video path, frame index] entry per dataset item
        self.frames_info = []
        len_obstacle = len(self.obstacle_types)

        for class_index, obs_type in enumerate(self.obstacle_types):
            videos_per_obs_type = self.annotations.loc[
                self.annotations["id"] == obs_type
                ]
            # check that all videos for obstacle type exist; a Path object is
            # always truthy, so the file has to be tested explicitly
            assert all(
                Path(self.videos_dir, vid_path).is_file()
                for vid_path in videos_per_obs_type["path"]
            ), f"Missing videos for {obs_type}!"

            available_frames_for_obs = self.available_frames_per_obs_type[obs_type]
            needed_frames_for_obs = self.frames_per_obs_type[obs_type]
            # sample every `interval`-th frame so the selection spans the videos
            interval = available_frames_for_obs // needed_frames_for_obs
            assert interval >= 1

            # one-hot label, shared by all frames of this obstacle type
            label = torch.zeros(len_obstacle)
            label[class_index] = 1

            obs_frames = []
            for _, vid_info in videos_per_obs_type.iterrows():
                video_path = Path(self.videos_dir, vid_info["path"])
                obs_frames.extend(
                    [label, video_path, i]
                    for i in range(
                        vid_info["proper_start_frame"],
                        vid_info["proper_end_frame"] - 1,
                        interval,
                    )
                )

            assert len(obs_frames) >= needed_frames_for_obs
            self.frames_info.extend(obs_frames[:needed_frames_for_obs])

    def __len__(self) -> int:
        """Returns the total number of frames in the dataset."""
        return self.size

    def __getitem__(self, index: int):
        """
        Decodes the frame backing ``index`` from its video.

        :param index: position of the item in the dataset
        :return: tuple of (frame as float tensor in RGB channel order with the
         channels on the last axis, one-hot label tensor)
        :raises IndexError: if ``index`` is out of range
        :raises ValueError: if the frame cannot be read from the video
        """
        label, video_path, frame_index = self.frames_info[index]

        cap = cv2.VideoCapture(str(video_path))
        try:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
            success, frame = cap.read()
        finally:
            # always give the decoder handle back, also when reading fails
            cap.release()
        if not success:
            raise ValueError(
                f"Could not read frame {frame_index} from {video_path}"
            )

        # video orientation is looked up on every access, in case of error the
        # value will be 0
        orientation = get_video_rotation(str(video_path))
        # rot90() below rotates counter-clockwise, so by providing a
        # negative k the frames are rotated clockwise
        k = orientation // -90
        frame = np.rot90(frame, k)
        # openCv reads frames in BGR format, convert to RGB
        frame = frame[:, :, ::-1]
        # copy() materialises the reversed view before handing it to torch
        frame = torch.from_numpy(frame.copy())
        frame = frame.type(torch.FloatTensor)

        return (frame, label)

In [7]:

def dataset_from_videos(files_dir: Path, dataset_csv_info_file: str,
                        total_frames: int = 750, batch_size: int = 128,
                        img_normalization_params: Tuple[float, float] = (
                                0.0, 255.0), frame_size: int = 224,
                        train_val_test_percentages: Tuple[int, int, int] = (
                                70, 30, 0)):
    """
    Generates train, validation and test tf.data.Datasets from the provided
    video files.

    :param files_dir: path of the directory containing the videos
    :param dataset_csv_info_file: name of csv file containing information about
     the videos, must be located in files_dir
    :param total_frames: total number of frames to extract for all artwork
    :param batch_size: batch size for datasets
    :param img_normalization_params: tuple of doubles (mean, standard_deviation)
     to use for normalizing the extracted frames, e.g. if (0.0, 255.0) is
     provided, the frames are normalized to the range [0, 1], see this comment
     for explanation of how to convert between the two
     https://stackoverflow.com/a/58096430
    :param frame_size: the size of the final resized square frames, this is
     dictated by the needs of the underlying NN that will be used
    :param train_val_test_percentages: tuple specifying how to split the
     generated dataset into train, validation and test datasets, the provided
     ints must add up to 100
    :return: a tuple of train, validation and test tf.data.Datasets, as well
     as a list of the artworks ids
    """
    assert sum(train_val_test_percentages) == 100, "Percentages must add up to 100!"
    dataset_info = pd.read_csv(files_dir / dataset_csv_info_file)

    # make sure all files in csv are present
    for rel_path in dataset_info["path"]:
        assert (files_dir / rel_path).is_file(), files_dir / rel_path

    # sort artwork ids in alphabetical order, this is important as it
    # determines how the CNN outputs its predictions
    artwork_list = sorted(dataset_info["id"].unique())

    # create dataset, output_shapes are set to (None, None, 3), since the
    # extracted frames are not initially resized,
    # to allow applying variations to the train dataset only below;
    # the label is one-hot, so its length is the number of ids in the csv
    dt = tf.data.Dataset.from_generator(
        lambda: ObstacleImageDataset(
            videos_dir=files_dir,
            annotations_file_path=Path(files_dir, dataset_csv_info_file),
            size=total_frames,
            transform=transforms.Compose(
                [transforms.ToTensor(), transforms.Resize(size=1080 // 3)]),
        ),
        output_types=(tf.float32, tf.float32),
        output_shapes=((None, None, 3), (len(artwork_list),)))

    # TODO calculate the datasets' sizes, perhaps print them, and also use them
    #  in the shuffling of the train dt below can be calculated like so:
    #  (num of classes * total_frames) * % of dataset

    # split into train, validation & test datasets
    train, val, test = train_val_test_percentages
    print(train_val_test_percentages)
    held_out = val + test
    train_dataset, validation_and_test = split_dataset(dt, held_out / 100)
    # with a (100, 0, 0) split there is nothing to share out between
    # validation and test, avoid dividing by zero in that case
    test_share = test / held_out if held_out else 0.0
    validation_dataset, test_dataset = split_dataset(validation_and_test,
                                                     test_share)

    mean, std = img_normalization_params

    def rescale(x, y):
        # resize/normalise the frame, pass the label through unchanged
        return resize_and_rescale(x, fr_size=frame_size, mean=mean, std=std), y

    # apply necessary conversions (normalization, random modifications,
    # batching & caching) to the created datasets
    # see https://www.tensorflow.org/datasets/keras_example for batching and
    # caching explanation
    AUTO = tf.data.experimental.AUTOTUNE  # auto-optimise dataset mapping below

    # NOTE(review): cache() comes after random_modifications, so the frames
    # modified during the first pass are the ones replayed in later epochs —
    # confirm this is intended
    train_dataset = train_dataset \
        .map(random_modifications, num_parallel_calls=AUTO) \
        .map(rescale, num_parallel_calls=AUTO) \
        .cache() \
        .shuffle(1000) \
        .batch(batch_size) \
        .prefetch(AUTO)

    validation_dataset = validation_dataset \
        .map(rescale, num_parallel_calls=AUTO) \
        .batch(batch_size) \
        .cache() \
        .prefetch(AUTO)

    test_dataset = test_dataset \
        .map(rescale, num_parallel_calls=AUTO) \
        .batch(batch_size) \
        .cache() \
        .prefetch(AUTO)

    return train_dataset, validation_dataset, test_dataset, artwork_list

In [8]:
## train function ##
def train_evaluate_save(model, model_name: str, files_dir: Path, dataset_csv_info_file: str, total_frames: int = 29*300,
                        img_normalization_params: Tuple[float, float] = (0.0, 255.0), frame_size: int = 224,
                        batch_size: int = 32, train_val_test_percentages: Tuple[int, int, int] = (70, 20, 10),
                        epochs=20):
    """
    Consolidates model training and evaluation, as well as presentation of the results. Additionally, the trained
    model is saved to disk, both in its original form, as well as converted to the Tensorflow Lite format. All
    relevant information about the model (evaluation results, plots, other stats) are save to disk as well.

    :param model: either a callable that takes the number of classes and returns a compiled model, or an already
     built and compiled model (recognised by its ``fit`` method), which is then used as is
    :param model_name: the preferred name for the model, used for naming the folder where the training results are saved
    :param files_dir: path of the directory containing the videos
    :param dataset_csv_info_file: name of csv file containing information about the videos, must be located in files_dir
    :param total_frames: total number of frames to extract for all artwork
    :param img_normalization_params: tuple of doubles (mean, standard_deviation) to use for normalizing the extracted
     frames, e.g. if (0.0, 255.0) is provided, the frames are normalized to the range [0, 1], see this comment for
     explanation of how to convert between the two https://stackoverflow.com/a/58096430
    :param frame_size: the size of the final resized square frames, this is dictated by the needs of the underlying NN
     that will be used in the training
    :param batch_size: batch size for datasets
    :param train_val_test_percentages: tuple specifying how to split the generated dataset into train, validation and
     test datasets, the provided ints must add up to 100
    :param epochs: the number of epochs to train the model
    """
    pd.options.display.float_format = '{:,.3f}'.format
    # folder to save info about model
    info_dir = base_dir / model_name / "model_info"
    info_dir.mkdir(parents=True, exist_ok=True)

    print("Generating/splitting dataset...", "\n", flush=True)
    train_dt, val_dt, test_dt, artwork_list = dataset_from_videos(files_dir=files_dir, total_frames=total_frames,
                                                                  dataset_csv_info_file=dataset_csv_info_file,
                                                                  batch_size=batch_size, frame_size=frame_size,
                                                                  train_val_test_percentages=train_val_test_percentages,
                                                                  img_normalization_params=img_normalization_params)

    print("Creating model...", "\n", flush=True)
    # an object with a fit() method that is not a class is an already built model; anything else is a factory
    # that builds the model for the given number of classes
    already_built = hasattr(model, "fit") and not isinstance(model, type)
    if not already_built:
        model = model(len(artwork_list))

    # saves model train history logs, which can be visualised with TensorBoard
    log_dir = base_dir / "logs" / "fit" / f"{model_name}_{datetime.now().strftime('%Y%m%d%H%M')}"
    tb_callback = tf.keras.callbacks.TensorBoard(log_dir=str(log_dir), histogram_freq=1)

    # train model
    print("Training model...", "\n", flush=True)
    model_train_info = model.fit(train_dt, epochs=epochs, validation_data=val_dt, callbacks=[tb_callback])
    print("Finished training!", "\n", flush=True)

    # save trained model to disk, also convert to Tensorflow Lite format
    save_model(model, model_name, artwork_list)

    # evaluation
    evaluation = model.evaluate(test_dt)
    eval_res = pd.DataFrame.from_dict({k: [v] for k, v in zip(["Test loss", "Test accuracy"], evaluation)})
    eval_res.to_csv(info_dir / "evaluation_results.csv")
    display_markdown("#### Evaluation results", raw=True)
    display(eval_res)

    # model predictions
    predicted_labels = model.predict(test_dt)
    predicted_labels = np.argmax(predicted_labels, axis=1)

    actual_labels = np.concatenate([label for _, label in test_dt], axis=0)
    actual_labels = np.argmax(actual_labels, axis=1)  # labels are in categorical form (one_hot), convert them back

    # passing every class index explicitly keeps the report and the confusion matrix aligned with artwork_list
    # even when some class does not occur in the test dataset
    class_indices = list(range(len(artwork_list)))

    # classification report
    report = classification_report(actual_labels, predicted_labels, labels=class_indices,
                                   target_names=artwork_list, output_dict=True)
    report = pd.DataFrame(report)
    report.to_csv(info_dir / "classification_report.csv")
    display_markdown("### Classification report", raw=True)
    display(report)

    # plots
    display_markdown("### Training history plots & confusion matrix", raw=True)
    history = model_train_info.history
    ep = np.array(model_train_info.epoch) + 1
    fig, axes = plt.subplots(3, 1, figsize=(5, 15))

    # training and validation accuracy plot
    axes[0].plot(ep, history['accuracy'], "bo", label='Training accuracy')
    axes[0].plot(ep, history['val_accuracy'], "b", label='Validation accuracy')
    axes[0].set_xlabel("Epochs")
    axes[0].set_ylabel("Training and validation accuracy")
    axes[0].set_title("Training and validation accuracy")
    axes[0].legend()
    axes[0].tick_params(axis='both', which='major')

    # training and validation loss plot; the colour is given once (via the keyword) instead of in both the
    # format string and the keyword
    axes[1].plot(ep, history['loss'], "o", label='Training loss', color="red")
    axes[1].plot(ep, history['val_loss'], "-", label='Validation loss', color="red")
    axes[1].set_xlabel("Epochs")
    axes[1].set_ylabel("Training and validation loss")
    axes[1].set_title("Training and validation loss")
    axes[1].legend()
    axes[1].tick_params(axis='both', which='major')

    # confusion matrix
    cm = confusion_matrix(actual_labels, predicted_labels, labels=class_indices)
    df_cm = pd.DataFrame(cm, artwork_list, artwork_list)
    df_cm.to_csv(info_dir / "confusion_matrix.csv")
    sn.set(font_scale=.7)
    sn.heatmap(df_cm, ax=axes[2], vmin=0, annot=True, cmap="YlGnBu", fmt="d", linewidths=0.1, linecolor="black")

    # display and save plots to files
    fig.savefig(info_dir / "graphs.svg", bbox_inches="tight")
    fig.savefig(info_dir / "graphs.pdf", bbox_inches="tight")
    plt.show()

    # save training history
    with open(info_dir / "train_history.json", "w") as f:
        json.dump(history, f, indent=4)

    # save a few other details about the model
    other_info = {
        "batch_size": batch_size,
        "epochs": epochs,
        "frame_size": frame_size,
        "img_normalization_params": img_normalization_params,
        "model_name": model_name,
        "train_val_test_percentages": train_val_test_percentages
    }
    with open(info_dir / "other_info.json", "w") as f:
        json.dump(other_info, f, indent=4)

def save_model(trained_model, model_name: str, artwork_list: list):
    """
    Saves the provided model to disk, and also converts it to the Tensorflow Lite format.

    Everything is written below ``base_dir / model_name``: the model itself into ``saved_model``, and the plain
    tflite model, the quantized tflite model and a text file with one label per line into ``tflite``.

    :param trained_model: the trained model
    :param model_name: name to be used when saving the model
    :param artwork_list: list containing the artwork ids sorted according to the model outputs
    """
    # save model
    print("Saving model to file...", flush=True)
    saved_model_path = base_dir / model_name / "saved_model"
    trained_model.save(str(saved_model_path))

    # Convert model to tflite
    print("Converting to tflite format...", flush=True)
    # first convert to normal tflite
    converter = tf.lite.TFLiteConverter.from_saved_model(str(saved_model_path))
    tflite_model = converter.convert()

    # second convert to quantized tflite
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    tflite_quant_model = converter.convert()

    tflite_dir = base_dir / model_name / "tflite"
    tflite_dir.mkdir(parents=True, exist_ok=True)

    with tf.io.gfile.GFile(str(tflite_dir / f"{model_name}.tflite"), "wb") as f:
        f.write(tflite_model)

    with tf.io.gfile.GFile(str(tflite_dir / f"{model_name}_quant.tflite"), "wb") as f:
        f.write(tflite_quant_model)

    # save txt file with list of labels, ready for use in mobile device
    labels_file = tflite_dir / f"{model_name}_labels.txt"
    with open(labels_file, "w") as f:
        # format instead of concatenating, so that non-string ids (e.g. numeric ones) can be written too
        f.writelines(f"{label}\n" for label in artwork_list)
    print("Done!", flush=True)


## CNN training

This section contains functions related to CNN training, conversion to Tensorflow Lite format, and saving the trained models.

## Results

This section presents the results of the trained CNNs.  

# SqueezeNet

The cell below shows the configuration used for SqueezeNet.

In [8]:
from keras.models import Model
from keras.layers import Add, Activation, Concatenate, Conv2D, Dropout 
from keras.layers import Flatten, Input, GlobalAveragePooling2D, MaxPooling2D
import keras.backend as K

# Version string; presumably it refers to the SqueezeNet code in this cell — TODO confirm.
__version__ = '0.0.1'


def SqueezeNet(input_shape, nb_classes, use_bypass=False, dropout_rate=None, compression=1.0):
    """
    Builds and compiles a SqueezeNet of version 1.0.

    Arguments:
        input_shape  : shape of the input images e.g. (224,224,3)
        nb_classes   : number of classes
        use_bypass   : if true, bypass connections will be created at fire module 3, 5, 7, and 9 (default: False)
        dropout_rate : dropout rate applied after the last fire module; no dropout if falsy (default: None)
        compression  : factor applied to the number of feature-maps (default: 1.0)

    Returns:
        Model        : Keras model instance, compiled with categorical cross-entropy, adam and accuracy
    """

    def scaled(filters):
        # number of filters after applying the compression factor
        return int(filters * compression)

    # (module name, uncompressed squeeze filters, may take a bypass, max-pool layer that follows or None)
    fire_plan = (
        ('fire2', 16, False, None),
        ('fire3', 16, True, None),
        ('fire4', 32, False, 'maxpool4'),
        ('fire5', 32, True, None),
        ('fire6', 48, False, None),
        ('fire7', 48, True, None),
        ('fire8', 64, False, 'maxpool8'),
        ('fire9', 64, True, None),
    )

    inputs = Input(shape=input_shape)

    net = Conv2D(scaled(96), (7,7), activation='relu', strides=(2,2), padding='same', name='conv1')(inputs)
    net = MaxPooling2D(pool_size=(3,3), strides=(2,2), name='maxpool1')(net)

    for fire_name, base_filters, bypass_allowed, pool_name in fire_plan:
        net = create_fire_module(net, scaled(base_filters), name=fire_name,
                                 use_bypass=use_bypass if bypass_allowed else False)
        if pool_name is not None:
            net = MaxPooling2D(pool_size=(3,3), strides=(2,2), name=pool_name)(net)

    if dropout_rate:
        net = Dropout(dropout_rate)(net)

    net = output(net, nb_classes)

    model = Model(inputs=inputs, outputs=net)
    model.compile(loss="categorical_crossentropy", optimizer="adam",
                    metrics=['accuracy'])
    return model


def SqueezeNet_11(input_shape, nb_classes, dropout_rate=None, compression=1.0):
    """
    Creating a SqueezeNet of version 1.1

    2.4x less computation over SqueezeNet 1.0 implemented above.

    The model is compiled the same way as the one returned by SqueezeNet
    (categorical cross-entropy, adam, accuracy), so both can be trained
    directly; callers are free to compile it again with other settings.

    Arguments:
        input_shape  : shape of the input images e.g. (224,224,3)
        nb_classes   : number of classes
        dropout_rate : dropout rate applied after the last fire module; no dropout if falsy (default: None)
        compression  : factor applied to the number of feature-maps (default: 1.0)

    Returns:
        Model        : compiled Keras model instance
    """

    input_img = Input(shape=input_shape)

    x = Conv2D(int(64*compression), (3,3), activation='relu', strides=(2,2), padding='same', name='conv1')(input_img)

    x = MaxPooling2D(pool_size=(3,3), strides=(2,2), name='maxpool1')(x)

    x = create_fire_module(x, int(16*compression), name='fire2')
    x = create_fire_module(x, int(16*compression), name='fire3')

    x = MaxPooling2D(pool_size=(3,3), strides=(2,2), name='maxpool3')(x)

    x = create_fire_module(x, int(32*compression), name='fire4')
    x = create_fire_module(x, int(32*compression), name='fire5')

    x = MaxPooling2D(pool_size=(3,3), strides=(2,2), name='maxpool5')(x)

    x = create_fire_module(x, int(48*compression), name='fire6')
    x = create_fire_module(x, int(48*compression), name='fire7')
    x = create_fire_module(x, int(64*compression), name='fire8')
    x = create_fire_module(x, int(64*compression), name='fire9')

    if dropout_rate:
        x = Dropout(dropout_rate)(x)

    # Creating last conv10
    x = output(x, nb_classes)

    model = Model(inputs=input_img, outputs=x)
    # compile like SqueezeNet does, an uncompiled model cannot be fitted
    model.compile(loss="categorical_crossentropy", optimizer="adam",
                    metrics=['accuracy'])
    return model


def output(x, nb_classes):
    """
    Appends the classification head: a 1x1 convolution with one filter per
    class ('conv10'), global average pooling ('avgpool10') and a softmax.

    Arguments:
        x          : input tensor of the head
        nb_classes : number of classes

    Returns:
        x          : output tensor of the softmax activation
    """
    head_layers = (
        Conv2D(nb_classes, (1,1), strides=(1,1), padding='valid', name='conv10'),
        GlobalAveragePooling2D(name='avgpool10'),
        Activation("softmax", name='softmax'),
    )
    for head_layer in head_layers:
        x = head_layer(x)
    return x


def create_fire_module(x, nb_squeeze_filter, name, use_bypass=False):
    """
    Builds one fire module: a 1x1 "squeeze" convolution followed by parallel
    1x1 and 3x3 "expand" convolutions whose outputs are concatenated along the
    channel axis.

    Arguments:
        x                 : input
        nb_squeeze_filter : number of squeeze filters; each expand convolution uses 4 times as many
        name              : name of module e.g. fire123, used as prefix for the layer names
        use_bypass        : if True the module input is added to the concatenated output

    Returns:
        x                 : output tensor of the fire module
    """
    channel_axis = get_axis()
    expand_filters = nb_squeeze_filter * 4

    squeezed = Conv2D(nb_squeeze_filter, (1,1), activation='relu', padding='same',
                      name='%s_squeeze'%name)(x)
    branch_1x1 = Conv2D(expand_filters, (1,1), activation='relu', padding='same',
                        name='%s_expand_1x1'%name)(squeezed)
    branch_3x3 = Conv2D(expand_filters, (3,3), activation='relu', padding='same',
                        name='%s_expand_3x3'%name)(squeezed)

    merged = Concatenate(axis=channel_axis, name='%s_concatenate'%name)([branch_1x1, branch_3x3])

    if not use_bypass:
        return merged
    return Add(name='%s_concatenate_bypass'%name)([merged, x])


def get_axis():
    """Returns the channel axis for the active Keras image data format: -1 for 'channels_last', otherwise 1."""
    if K.image_data_format() == 'channels_last':
        return -1
    return 1
# Build SqueezeNet 1.0 for 224x224 inputs with 3 channels and 27 output classes, then print its layer summary.
squeezenet = SqueezeNet(input_shape=(224,224,3), nb_classes=27)
squeezenet.summary()

In [18]:
# `model` is handed over as a factory: it is called with the number of classes found in the CSV, so the
# network is always built with a matching output size instead of a hard-coded one.
train_evaluate_save(model=lambda num_classes: SqueezeNet(input_shape=(224, 224, 3), nb_classes=num_classes),
                    model_name="SqueezeNetNoArt300Frames", files_dir=files_dir,
                    dataset_csv_info_file=video_info, total_frames=29*300)

### EfficientNet

The cell below shows the configuration used for EfficientNetB0.

In [None]:
from tensorflow.keras.applications.efficientnet import EfficientNetB0

def efficientnetb0(num_classes: int):
    """
    Builds and compiles a classifier on top of an ImageNet-initialised
    EfficientNetB0 without its top layers.

    The backbone output is flattened, passed through a 0.5 dropout and a
    softmax Dense layer; all backbone layers are set to non-trainable.

    :param num_classes: number of units of the final softmax layer
    :return: the compiled Keras model
    """
    image_input = Input(shape=(224, 224, 3))
    backbone = EfficientNetB0(include_top=False, weights="imagenet", input_tensor=image_input)

    features = Flatten(name="flatten")(backbone.output)
    features = Dropout(0.5)(features)
    predictions = Dense(num_classes, activation="softmax")(features)

    classifier = Model(inputs=backbone.input, outputs=predictions)

    # only the newly added head is trained
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False

    classifier.compile(loss="categorical_crossentropy", optimizer="adam",
                    metrics=['accuracy'])
    return classifier

efficientnetb0(num_classes=17).summary()

In [None]:
# The Keras EfficientNet models rescale their inputs internally and expect pixel values in the [0, 255]
# range, so the frames are passed on unscaled (mean 0, std 1) instead of being normalized to [0, 1].
train_evaluate_save(model=efficientnetb0, model_name="EfficientNetB0NoArt500Frames", files_dir=files_dir,
                    dataset_csv_info_file=video_info, total_frames=29*300,
                    img_normalization_params=(0.0, 1.0))

### ResNet50

The cell below shows the configuration used for ResNet50.

In [None]:
from tensorflow.keras.applications.resnet50 import ResNet50
def resnet50(num_classes: int):
    """Build and compile a ResNet50-based image classifier.

    The ImageNet-pretrained ResNet50 backbone (without its top) takes
    224x224x3 inputs. Its output is flattened, passed through 50% dropout and
    a softmax dense layer with ``num_classes`` units. Every backbone layer is
    marked non-trainable, so only the new head is trained.

    Args:
        num_classes: Number of units in the final softmax layer.

    Returns:
        The Keras ``Model``, compiled with categorical cross-entropy loss,
        the "adam" optimizer and the accuracy metric.
    """
    backbone = ResNet50(
        include_top=False,
        weights="imagenet",
        input_tensor=Input(shape=(224, 224, 3)),
    )

    # Classification head stacked on top of the backbone output.
    head = Flatten(name="flatten")(backbone.output)
    head = Dropout(0.5)(head)
    probabilities = Dense(num_classes, activation="softmax")(head)

    classifier = Model(inputs=backbone.input, outputs=probabilities)

    # Freeze the pretrained backbone before compiling.
    for base_layer in backbone.layers:
        base_layer.trainable = False

    classifier.compile(
        loss="categorical_crossentropy",
        optimizer="adam",
        metrics=['accuracy'],
    )
    return classifier

# Build a 17-class instance only to display its layer summary; the model is not kept.
resnet50(num_classes=17).summary()

In [None]:
# Run the train/evaluate/save pipeline with the ResNet50 builder function.
# NOTE(review): model_name says "500Frames" but total_frames is 29*300 (= 8700) —
# confirm which value is intended.
train_evaluate_save(model=resnet50, model_name="ResNet50NoArt500Frames", files_dir=files_dir, 
                    dataset_csv_info_file=video_info, total_frames=29*300)

### VGG19

The cell below shows the configuration used for VGG19.


In [None]:
def vgg19(num_classes: int):
    """Build and compile a VGG19-based image classifier.

    The ImageNet-pretrained VGG19 backbone (without its top) takes 224x224x3
    inputs. Its output is flattened, passed through 50% dropout and a softmax
    dense layer with ``num_classes`` units. Every backbone layer is marked
    non-trainable, so only the new head is trained.

    Args:
        num_classes: Number of units in the final softmax layer.

    Returns:
        The Keras ``Model``, compiled with categorical cross-entropy loss,
        the "adam" optimizer and the accuracy metric.
    """
    backbone = VGG19(
        include_top=False,
        weights="imagenet",
        input_tensor=Input(shape=(224, 224, 3)),
    )

    # Classification head stacked on top of the backbone output.
    head = Flatten(name="flatten")(backbone.output)
    head = Dropout(0.5)(head)
    probabilities = Dense(num_classes, activation="softmax")(head)

    classifier = Model(inputs=backbone.input, outputs=probabilities)

    # Freeze the pretrained backbone before compiling.
    for base_layer in backbone.layers:
        base_layer.trainable = False

    classifier.compile(
        loss="categorical_crossentropy",
        optimizer="adam",
        metrics=['accuracy'],
    )
    return classifier

# Build a 17-class instance only to display its layer summary; the model is not kept.
vgg19(num_classes=17).summary()

The training for the VGG19 based CNN was done using a dataset that contained 500 frames for each artwork, as well as the same number for the "No artwork" category.

The training results are shown in the output of the cell below - scroll within the output to see training history graphs and confusion matrix.

In [19]:
# Run the train/evaluate/save pipeline with the VGG19 builder function.
# NOTE(review): model_name says "500Frames" but total_frames is 29*300 (= 8700) —
# confirm which value is intended.
train_evaluate_save(model=vgg19, model_name="VGG19NoArt500Frames", files_dir=files_dir, 
                    dataset_csv_info_file=video_info, total_frames=29*300)

### MobileNetV2

The cell below shows the configuration used for MobileNet v2.


In [None]:
def mobileNetV2(num_classes: int):
    """Build and compile a MobileNetV2-based image classifier.

    The ImageNet-pretrained MobileNetV2 backbone (without its top) takes
    224x224x3 inputs. Its output goes through global average pooling and
    batch normalization, a 1280-unit ReLU dense layer followed by another
    batch normalization, and a final softmax dense layer with ``num_classes``
    units. Every backbone layer is marked non-trainable, so only the new head
    is trained.

    Args:
        num_classes: Number of units in the final softmax layer.

    Returns:
        The Keras ``Model``, compiled with categorical cross-entropy loss,
        an Adam optimizer (learning rate 1e-4) and the accuracy metric.
    """
    # https://www.kaggle.com/devang/transfer-learning-with-keras-and-efficientnets?scriptVersionId=24113974
    # https://www.tensorflow.org/hub/tutorials/tf2_image_retraining
    mnv2 = MobileNetV2(input_shape=(224, 224, 3), include_top=False, weights="imagenet",
                       input_tensor=Input(shape=(224, 224, 3)))
    outputs = mnv2.output
    outputs = GlobalAveragePooling2D()(outputs)
    outputs = BatchNormalization()(outputs)

    outputs = Dense(1280, activation='relu', kernel_initializer="glorot_uniform", bias_initializer='zeros')(outputs)
    outputs = BatchNormalization()(outputs)

    # final layer
    outputs = Dense(num_classes, activation='softmax', kernel_initializer='random_uniform', bias_initializer='zeros')(outputs)

    model = Model(inputs=mnv2.input, outputs=outputs)

    # Freeze the pretrained backbone before compiling.
    for layer in mnv2.layers:
        layer.trainable = False

    # `learning_rate` is the supported keyword; the old `lr` alias is
    # deprecated and no longer accepted by recent Keras releases.
    model.compile(loss="categorical_crossentropy",
                  optimizer=Adam(learning_rate=0.0001), metrics=['accuracy'])
    return model

# Build a 27-class instance only to display its layer summary; the model is not kept.
mobileNetV2(num_classes=27).summary()

The training for the MobileNet based CNN was done using a dataset that contained 500 frames for each artwork, as well as the same number for the "No artwork" category.

The training results are shown in the output of the cell below - scroll within the output to see training history graphs and confusion matrix.

In [None]:
# Run the train/evaluate/save pipeline with the MobileNetV2 builder function,
# using batch size 4, 2 epochs and img_normalization_params=(127.5,127.5).
# NOTE(review): model_name says "500Frames" but total_frames is 29*300 (= 8700) —
# confirm which value is intended.
train_evaluate_save(model=mobileNetV2, model_name="MobNetNoArt500Frames_4", files_dir=files_dir, 
                    dataset_csv_info_file=video_info, batch_size = 4, epochs=2,
                    total_frames=29*300, img_normalization_params=(127.5,127.5))

#### Older training attempt with MobileNet

The cell below shows an earlier attempt to train a MobileNet CNN, with a different configuration for the final layers. The training results seem OK, but the CNN is much less accurate when used in the app. 

In [None]:
# run with older settings for mobileNetV2
def mobileNetV2_3(num_classes: int):
    """Build and compile the older MobileNetV2 classifier variant.

    The ImageNet-pretrained MobileNetV2 backbone (without its top) takes
    224x224x3 inputs. Its output is flattened, passed through 50% dropout and
    a softmax dense layer with ``num_classes`` units. Every backbone layer is
    marked non-trainable, so only the new head is trained.

    Args:
        num_classes: Number of units in the final softmax layer.

    Returns:
        The Keras ``Model``, compiled with categorical cross-entropy loss,
        the "adam" optimizer and the accuracy metric.
    """
    backbone = MobileNetV2(
        input_shape=(224, 224, 3),
        include_top=False,
        weights="imagenet",
        input_tensor=Input(shape=(224, 224, 3)),
    )

    # Classification head stacked on top of the backbone output.
    head = Flatten(name="flatten")(backbone.output)
    head = Dropout(0.5)(head)
    probabilities = Dense(num_classes, activation="softmax")(head)

    classifier = Model(inputs=backbone.input, outputs=probabilities)

    # Freeze the pretrained backbone before compiling.
    for base_layer in backbone.layers:
        base_layer.trainable = False

    classifier.compile(
        loss="categorical_crossentropy",
        optimizer="adam",
        metrics=['accuracy'],
    )
    return classifier

# Run the train/evaluate/save pipeline with the older MobileNetV2 builder
# function and img_normalization_params=(127.5,127.5).
# NOTE(review): model_name says "500Frames" but total_frames is 29*300 (= 8700) —
# confirm which value is intended.
train_evaluate_save(model=mobileNetV2_3, model_name="MobNetNoArt500Frames_3", files_dir=files_dir, dataset_csv_info_file=video_info,
                    total_frames=29*300, img_normalization_params=(127.5,127.5))

### InceptionV3

In [None]:
def inceptionV3(num_classes: int):
    """Build and compile an InceptionV3-based image classifier.

    Follows the fine-tuning example at https://keras.io/api/applications/.
    The ImageNet-pretrained InceptionV3 base (without its top, 224x224x3
    input) feeds a global average pooling layer, a 1024-unit ReLU dense layer
    and a softmax dense layer with ``num_classes`` units. All base layers are
    marked non-trainable, so only the newly added layers are trained.

    Args:
        num_classes: Number of units in the final softmax layer.

    Returns:
        The Keras ``Model``, compiled with the "rmsprop" optimizer,
        categorical cross-entropy loss and the accuracy metric.
    """
    # Pretrained convolutional base.
    backbone = InceptionV3(
        input_shape=(224, 224, 3),
        weights='imagenet',
        include_top=False,
    )

    # New head: global average pooling -> fully connected -> softmax.
    pooled = GlobalAveragePooling2D()(backbone.output)
    hidden = Dense(1024, activation='relu')(pooled)
    class_probs = Dense(num_classes, activation='softmax')(hidden)

    classifier = Model(inputs=backbone.input, outputs=class_probs)

    # Freeze every InceptionV3 layer so only the randomly initialized head
    # is trained.
    for base_layer in backbone.layers:
        base_layer.trainable = False

    # Compile only after the trainable flags have been set.
    classifier.compile(
        optimizer='rmsprop',
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )

    return classifier

# input images must be normalised to range [-1, 1], see https://www.tensorflow.org/api_docs/python/tf/keras/applications/inception_v3/preprocess_input
# NOTE(review): model_name says "500Frames" but total_frames is 29*300 (= 8700) —
# confirm which value is intended.
train_evaluate_save(model=inceptionV3, model_name="InceptionV3NoArt500Frames", files_dir=files_dir, dataset_csv_info_file=video_info,
                    total_frames=29*300, img_normalization_params=(127.5,127.5))

In [None]:
def inceptionV3(num_classes: int):
    """Build and compile an InceptionV3-based image classifier.

    Based on the example at https://keras.io/api/applications/. An
    ImageNet-pretrained InceptionV3 without its top (224x224x3 input) is
    extended with global average pooling, a 1024-unit ReLU dense layer and a
    ``num_classes``-unit softmax layer. The pretrained layers are frozen.

    Args:
        num_classes: Number of units in the final softmax layer.

    Returns:
        The Keras ``Model``, compiled with the "rmsprop" optimizer,
        categorical cross-entropy loss and the accuracy metric.
    """
    pretrained = InceptionV3(
        input_shape=(224, 224, 3),
        weights='imagenet',
        include_top=False,
    )

    # Spatial average pooling, then a fully connected layer, then softmax.
    features = GlobalAveragePooling2D()(pretrained.output)
    features = Dense(1024, activation='relu')(features)
    predictions = Dense(num_classes, activation='softmax')(features)

    # This is the model that gets trained.
    net = Model(inputs=pretrained.input, outputs=predictions)

    # Train only the new top layers: freeze all InceptionV3 layers.
    for frozen_layer in pretrained.layers:
        frozen_layer.trainable = False

    # Compilation must come after the layers are set to non-trainable.
    net.compile(
        optimizer='rmsprop',
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )

    return net

# Build a 10-class InceptionV3 classifier and keep it in `mod`.
mod= inceptionV3(10)

### 0, 1, or multiple artworks (VGG19)

Here I trained a CNN to distinguish between cases where there are zero, one, or multiple artworks in a frame, that could potentially be used first in the app to make sure there is exactly one artwork in the frame before trying to recognise it with one of the CNNs trained above.

The goal was to train the CNN to classify a frame with one of the labels: `no_artwork`, `one_artwork`, or `multiple_artworks`. To create the dataset used for training I used the following sources for each label:

*   `no_artwork`: Extracted frames from the videos that show ceiling, floor, walls, etc.
*   `one_artwork`: Extracted frames from all videos showing a single artwork (the same ones used above to train the classifiers for individual artworks).
*   `multiple_artworks`: Here I used the 2 videos we have of the entire room, which I edited to contain only the parts that show at least 2 artworks. 

I used 1500 frames for each label, and used VGG19 as the basis of training.

In [None]:
# Run the train/evaluate/save pipeline with the VGG19 builder function on the
# dataset described by "description_export_01plus.csv", with total_frames=1500.
train_evaluate_save(model=vgg19, model_name="VGG19ZeroOneMultiple1500Frames", files_dir=files_dir, 
                    dataset_csv_info_file="description_export_01plus.csv", total_frames=1500)

In [None]:
########################################
########################################
########################################
