# Классификация изображений с эмоциями людей

In [47]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

from tqdm import tqdm

print("TensorFlow Version:", tf.__version__)
print('GPU Device Found.' if tf.config.list_physical_devices('GPU') else 'GPU Device Not Found. Running on CPU')

TensorFlow Version: 2.8.3
GPU Device Not Found. Running on CPU


# <b>1. ПОДГОТОВКА ДАТАСЕТОВ</b>

## Подготовка файлов для работы с локального диска

### Загрузка данных на локальный диск

In [48]:
import gdown
import zipfile
from pathlib import Path

# Загрузка архива
# stable link https://drive.google.com/file/d/1JdDlgpvtlMN99X0eAruGqXrMNi_sh1sg
ident = '1JdDlgpvtlMN99X0eAruGqXrMNi_sh1sg'
fname = 'emotions'

# Пусть к основной папке с изображениями
p = Path('./data')

# Путь к базовой директории, которая будет создана чуть позже
base_dir = p / fname

local_zip = gdown.download(id=ident, output = fname + '.zip')

Downloading...
From: https://drive.google.com/uc?id=1JdDlgpvtlMN99X0eAruGqXrMNi_sh1sg
To: C:\Users\Red_Plague\YandexDisk\code_learning\DS_Sber\Профильный_модуль\SBER_Diploma\emotions.zip
100%|█████████████████████████████████████| 14.8M/14.8M [00:04<00:00, 3.64MB/s]


Распаковка в текущую директорию.

In [49]:
# Пусть к основной папке с изображениями
p = Path('./data')

# Путь к базовой директории, которая будет создана чуть позже
base_dir = p / fname

with zipfile.ZipFile(local_zip, 'r') as zip_ref:
    zip_ref.extractall(p)

KeyboardInterrupt: 

Переименуем папку с изображениями в `emotions`.

Для отключения ограничения "Только чтение" у папки с изображениями воспользуемся следующей функцией ([источник](https://www.tutorialspoint.com/How-to-change-the-permission-of-a-directory-using-Python#)):

In [None]:
def change_permissions_recursive(path, mode):
    for root, dirs, files in os.walk(path, topdown=False):
        for dr in [os.path.join(root,d) for d in dirs]:
            os.chmod(dr, mode)
        for file in [os.path.join(root, f) for f in files]:
                os.chmod(file, mode)

In [None]:
import os

zip_content = os.listdir(p)[0]
zip_content

Функция не всегда срабатывает с первого раза, поэтому запустим следующий цикл:

In [None]:
done = False

while done == False:
    try:
        change_permissions_recursive(p, 0o777)
        change_permissions_recursive(p / zip_content, 0o777)
        zip_content = os.renames(p / zip_content,
                   base_dir)
        done = True
    except:
        continue

Теперь загруженный архив можно удалить.

In [None]:
os.remove(fname + '.zip')

### Настройка директорий

Проверим содержимое папки с изображениями, мы должны увидеть список из папок с названиями этих папок и их количество. 

In [None]:
from pathlib import Path

# Базовая директория
base_dir = p / fname
# # base_dir = '/content/drive/MyDrive/Sber DS/Diploma/data/emotions/'  # for colab
classes = os.listdir(base_dir)

num_classes = len(classes)

# и ее содержимое
print("Содержимое базовой директории:")
print(classes)
print('Количество классов:', num_classes)

Переименуем папки в необходимые нам названия классов:

In [None]:
# class_names = [x.lower() for x in classes]
class_names = ['anger', 'disgust', 'fear', 'joyfulness', 'neutral']

done = False
while done == False:
    try:
        change_permissions_recursive(base_dir, 0o777)
        done = True
    except:
        continue

done = False
while done == False:
    try:
        for i, class_name in enumerate(class_names):
            change_permissions_recursive(base_dir/class_name, 0o777)
            os.renames(base_dir/classes[i], base_dir/class_name)
        done = True
    except:
        continue

classes = os.listdir(base_dir)

In [None]:
classes

### Удалим некорректные файлы

Если в архиве содержатся некорректные ("битые") файлы, то мы получим ошибку на том или ином этапе обучения и подготовки модели к переносу на мобильное устройство. Код ниже позволяет обнаружить и сразу удалить такие файлы при их наличии. Запустим эту функцию для всех папок с классами ([источник кода](https://github.com/tensorflow/datasets/issues/2188)).

In [None]:
from pathlib import Path
from tensorflow.io import read_file
from tensorflow.image import decode_image

def delete_corrupted_files(folder, base_dir):
    for image in sorted((base_dir / f'{folder}').glob('*')):
        try:
            img = read_file(str(image))
            img = decode_image(img)

            if img.ndim != 3:
                print(f"[FILE_CORRUPT] {str(image).split('/')[-1]} DELETED")
                image.unlink()

        except Exception as e:
            print(f"[ERR] {str(image).split('/')[-1]}: {e} DELETED")
            image.unlink()

In [None]:
for folder in classes:
    print(folder)
    delete_corrupted_files(folder, base_dir)

Битых файлов нет, можно двигаться дальше.

Сохраним пути к субдиректориям с классами в отдельные переменные и посмотрим на конечное количество изображений в каждом классе.

In [None]:
anger_dir, disgust_dir, fear_dir, joyfulness_dir, neutral_dir = [base_dir / classes[i]
                                                   for i in range(len(classes))]
directories = anger_dir, disgust_dir, fear_dir, joyfulness_dir, neutral_dir


anger_fnames, disgust_fnames, fear_fnames, joyfulness_fnames, neutral_fnames = [os.listdir(i)
                                                              for i in directories]
files_names = anger_fnames, disgust_fnames, fear_fnames, joyfulness_fnames, neutral_fnames


for i, cls_name in enumerate(class_names):
    print(cls_name + ':', len(files_names[i]))

In [None]:
import seaborn as sns

x = np.array([len(anger_fnames), len(disgust_fnames),
              len(fear_fnames), len(joyfulness_fnames),
              len(neutral_fnames)])

plt.title('Распределение количества изображений по классам')
plt.pie(x, labels=classes, autopct='%.1f%%');

Классы несбалансированы, поэтому, помимо метрики `accuracy`, для оценки ошибки классификации мы будем использовать матрицу ошибок. 

Посмотрим на названия отдельных файлов.

In [None]:
len(class_names)

In [None]:
for i, j in zip(class_names, files_names):
    print(i, j[90:93])

### Визуализация оригинальных изображений

Посмотрим на фотографии из обеих субдиректорий - по 4 фотографии каждого класса.

In [None]:
import matplotlib.image as mpimg
import matplotlib.pyplot as plt

# Параметры для отрисовки - количество строк и столбцов
nrows = 5
ncols = 4

# Индекс для итерации изображений
pic_index = 0

In [None]:
# Запустите эту ячейку несколько раз, чтобы увидеть разные наборы фотографий

fig = plt.gcf()
fig.set_size_inches(ncols*4, nrows*4)

pic_index += 4

next_anger_pic = [os.path.join(anger_dir, fname)
                for fname in anger_fnames[pic_index-4:pic_index]]
next_disgust_pic = [os.path.join(disgust_dir, fname)
                for fname in disgust_fnames[pic_index-4:pic_index]]
next_fear_pic = [os.path.join(fear_dir, fname)
                for fname in fear_fnames[pic_index-4:pic_index]]
next_joyfulness_pic = [os.path.join(joyfulness_dir, fname)
                for fname in joyfulness_fnames[pic_index-4:pic_index]]
next_neutral_pic = [os.path.join(neutral_dir, fname)
                for fname in neutral_fnames[pic_index-4:pic_index]]


for i, img_path in enumerate(next_anger_pic+next_disgust_pic+
                            next_fear_pic+next_joyfulness_pic+
                            next_neutral_pic):
    ax = plt.subplot(nrows, ncols, i+1)
    ax.axis(False)
    plt.title(img_path)
    
    img = mpimg.imread(img_path)
    plt.imshow(img)


Изображения в имеющихся наборах имеют различное разрешение и соотношение сторон, их необходимо привести к единому стандарту на стадии формирования датасетов для нейронной сети. Этим мы сейчас и займемся.

## Создание датасетов под данную предобученную модель

### Split изображений

Так как готовых инструментов для формирования датасета из трех частей (тренировка, валидация, тест) в tensorflow нет, предварительно необходимо разделить все изображения на соответствующие директории, внутри которых будут папки с классами.

Напишем собственную функцию, которая переместит все изображения по нужным нам папкам, создав, таким образом, необходимое разделение данных на три датасета. Функция содержит вложенную функцию `split_numbers`, которая автоматически определит количество изображений, необходимое для каждого сплита по всем классам в соотношении: `train : validation : test = 8:1:1` (значение по умолчанию). Однако при необходимости это соотношение можно изменить в любую сторону, для этого нужно указать значения для тренировочной и валидационной частей, тестовая часть посчитается автоматически как остаток (при этом нужно помнить, что сумма частей, на которые мы делим датасает, должна быть кратна 10; при желании это правило можно изменить, переписав вложенную функцию).

In [None]:
from pathlib import Path
import shutil
import os

def make_split(files_names, base_dir, class_folder, relation=(8, 1)):
    
    dataset_split_folders = 'train', 'validation', 'test'
    train_dir, val_dir, test_dir = [base_dir / i
                    for i in dataset_split_folders]

    for directory in (train_dir, val_dir, test_dir):
        try:
            Path.mkdir(directory)
        except:
            # print(f'Directory {directory} already exists or cannot be created.')
            pass
        try:
            Path.mkdir(directory / class_folder)
        except:
            # print(f'Directory {directory / class_folder} already exists or cannot be created.')
            pass

    def split_numbers(files_names=files_names):
        imgs_num = len(files_names)
        train_num, val_num = int(imgs_num//10*relation[0]), int(imgs_num//10*relation[1])
        test_num = imgs_num - train_num - val_num
        return train_num, val_num, test_num
        
    examples = split_numbers(files_names)
    directories = train_dir, val_dir, test_dir

    for num, dr in zip(examples, directories):
        i = num
        while i != 0:
            image_name = os.listdir(base_dir / class_folder)[i-1]
            shutil.move(base_dir / class_folder / image_name,
                         dr / class_folder / image_name)
            i -= 1

    shutil.rmtree(base_dir / class_folder)

Применим эту функцию к каждому из классов.

In [None]:
for i in range(len(classes)):
    make_split(files_names[i],
             base_dir=base_dir,
             class_folder=classes[i],
             # изменим немного соотношение в пользу валидации
             relation=(8, 1.2))

Наш датасет разделен на три части в соотношении 8:1:1. Теперь у нас имеются три директории, содержащие папки с нужными нам классами.

In [None]:
for i in os.listdir(base_dir):
    print(i, os.listdir(base_dir / i))

### Предобученная модель | MobileNet V2

Для нашей задачи в рамках данной части дипломной работы мы будем использовать предобученную модель `MobileNet V2`, которая отличается от первой версии гораздо меньшим количеством параметров при сохранении высоких предсказательных способностей, что критично для имплементации моделей на мобильные устройства. 

Модель разработана в корпорации "Google" и обучена на основе 1,4 млн. изображений для 1000 классов. Это позволяет нам надеяться на то, что мы сможем дообучить нашу модель с небольшим количеством изображений до приемлемого уровня точности, 80%.

В библиотеке TensorFlow существует как минимум два способа использования предобученных моделей:
- использованием библиотеки моделей `tensorflow-hub`,
- с использованием модуля Keras `tf.keras.applications`.

Первый способ проще в имплементации и очень хорошо годится для производства, однако у него есть существенный недостаток для разработчика - предобученные модели поставляются как единый слой и не подлежат частичной разморозке, разморозить можно только все слои зараз, отчего важным становится правильный выбор версии данной модели (`MobileNet V2` в первом случае рассматривается как семейство моделей). Также два способа отличаются методом `rescale` - в первом случае изображения необходимо привести к стандарту [0, 1], тогда как во втором - [-1, 1]. Это необходимо иметь ввиду при подготовке наших изображений перед отправкой на обучение.

Воспользуемся вторым способом, так как он гораздо более пластичен для тонкой настройки и позволяет экспениментировать с гиперпараметрами. 

### Гиперпараметры для модели с переносом обучения

В самом начале обозначим гиперпараметры для будущей модели, это позволит нам уже сейчас начать подготавливать необходимые команды с этими данными для дальнейшего использования. 

`MobileNet V2` позволяет выбрать любое разрешение выше 32х32, мы будем использовать разрешение 224х224. 

In [None]:
MODULE_HANDLE = 'tf.keras.applications.InceptionResNetV2'
IMAGE_SIZE = (224, 224)
IMG_SHAPE = IMAGE_SIZE + (3,)
BATCH_SIZE = 32

print(f"Используем {MODULE_HANDLE}, входное разрешение: {IMAGE_SIZE}, размер батча: {BATCH_SIZE}.")

### Создание датасетов | `image_dataset_from_directory`

Для обучения модели и затем конвертации ее в облегченную версию все предварительно разделенные по папкам изображения необходимо перевести в формат `dataset`. Для выполнения этой задачи будем использовать метод библиотеки `keras` `image_dataset_from_directory`. В качестве `label_mode` установим `categorical` для перевода лейблов в вид `one_hot_encoding`.

Для корректной работы кода, создающего облегченную версию нашей модели, который мы позаимствовали из курса по компьютерному зрению, тренировочный и валидационный датасеты должны иметь достаточно большой батч, тогда как тестовый датасет должен выдавать по одному изображению зараз, то есть батч должен равняться 1. 

Весь необходимый код оформим в функцию, которая и проделает все операции.

In [None]:
from pathlib import Path

def datasets_prep(base_directory=base_dir,
                  seed=123,
                  batch_size=BATCH_SIZE,
                  image_size=IMAGE_SIZE, 
                  label_mode='categorical'):
    
    train_dir = base_dir / 'train'
    val_dir = base_dir / 'validation'
    test_dir = base_dir / 'test'
    
    train_ds = tf.keras.utils.image_dataset_from_directory(
                            train_dir,
                            label_mode=label_mode,
                            seed=seed,
                            image_size=IMAGE_SIZE,
                            batch_size=batch_size)
    
    val_ds = tf.keras.utils.image_dataset_from_directory(
                            val_dir,
                            label_mode=label_mode,
                            seed=seed,
                            image_size=IMAGE_SIZE,
                            batch_size=batch_size)
    
    test_ds = tf.keras.utils.image_dataset_from_directory(
                            test_dir,
                            label_mode=label_mode,
                            seed=seed,
                            image_size=IMAGE_SIZE,
                            batch_size=1)  # установим батч = 1
    
    return train_ds, val_ds, test_ds

In [None]:
train_batches, validation_batches, test_batches = datasets_prep()

Проверим созданные датасеты на соответствие заданным выше параметрам.

In [None]:
for ds in (train_batches, validation_batches, test_batches):
    for image_batch, label_batch in ds.take(1):
        print(image_batch.shape)

Как и требовалось, тренировочный и валидационный датасеты содержат в батче установленное количество изображений, тестовый - 1.

Проверим классы, верно ли отработал данный инструмент.

In [None]:
for ds in (train_batches, validation_batches, test_batches):
    print(ds.class_names)

class_names = train_batches.class_names
num_classes = len(class_names)

### Визуализация изображений из `train_batches`

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 10))
for images, labels in train_batches.take(10):
    for i in range(16):
        ax = plt.subplot(4, 4, i+1)
        plt.imshow(images[i].numpy().astype('uint8'))
        category_class = int(tf.argmax(labels[i]))
        plt.title(class_names[category_class])
        plt.axis('off')

Также возьмем одно из изображений из получившегося датасета и посмотрим, получили ли мы желаемое разрешение. Это изображение пригодится нам в дальнейшм, поэтому сохраним его в переменную `one_pic`.

In [None]:
# одно случайное изображение
for images, labels in train_batches.take(30):
    one_pic = images[1].numpy()
    label = labels[1]
    break

print(one_pic.shape)
plt.figure(figsize=(4, 4))
plt.title(class_names[int(tf.argmax(label))])
plt.axis('off')
plt.imshow(one_pic.astype('uint8'));

Итак, датасет для модели создан, все фотографии приведены к единому разрешению, можно переходить к обработке данных для тренировки модели. Обработка данных для нашей модели состоит из двух частей: нормализация значений массива (`rescale`) и агументация данных. Подготовим обе части и запустим две модели - без аугментации и с ней, чтобы посмотреть, как этот блок слоев влияет на работу модели. 

# <b>2. ПОДГОТОВКА ФУНКЦИЙ И ПЕРЕМЕННЫХ ДЛЯ ОБУЧЕНИЯ</b>

## Слои для препроцессинга изображений

### Rescale

Стандартизация значений матриц изображений - обязательная операция, если мы хотим получить хорошие результаты по работе нашей сети. Для использования данной предобученной модели значения матриц изображений необходимо отшкалировать по стандарту от -1 до 1 (именно такой стандарт использовался для обучения этой сети). Есть два способа создания соответствующего слоя:

```python
preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input

или

rescale = tf.keras.layers.Rescaling(1./127.5, offset=-1)
```

Мы возьмем второй вариант, так как он более пластичен.

Данный слой будет первым в моделях, поэтому необходимо указать `input_shape` входных изображений.

In [None]:
rescale = tf.keras.Sequential([
        tf.keras.layers.Rescaling(1./127.5, offset=-1,
                                  input_shape=IMAGE_SIZE + (3,),
                                  name='Rescaling')
])

Проверим качество работы слоя на фотографии, которую мы использовали выше.

In [None]:
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()

# увеличим размерность изображения до 4, так иначе оно не пройдет
# корректно через слой
result_for_rescale = rescale(np.expand_dims(one_pic, axis=0))
# print(result_for_rescale.shape)

# вернем результату размерность 3 для отображения
result = np.squeeze(result_for_rescale)
# print(result.shape)

plt.figure(figsize=(4, 4))
plt.axis('off')
plt.imshow(result)

# print("Picture's shape:", result.shape)
print("Минимальное и максимальное значение пикселей:", result.min(), result.max())

Слой отработал ожидаемым образом, при этом видно, что изменился контраст изображения. Это нормально, так как значения матрицы изображений были растянуты по обе стороны от 0. В любом случае, алгоритм обнаруживает зависимости не по внешнему виду картинок, а исходя из численных значений матриц изображений, контраст имеет значение только для нашего, человеческого глаза.

### Аугментация данных

Так как наш датасет создан из сравнительно небольшого набора данных, то для избежания переобучения мы воспользуемся всеми доступными возможностями библиотеки `keras`, в частности, добавим в модель слои агументации, которые искусственным образом увеличат количество изображений через внешнее изменение имеющихся. 

Создадим последовательность слоев `RandomFlip` (зеркальное отображение изображения в случайном порядке), `RandomRotation` (поворот изображения по часовой стрелке на случайный угол), `RandomZoom` (увеличение и уменьшение изображения в случайном порядке), а также `RandomContrast`. 

In [None]:
data_aug = tf.keras.Sequential([
    tf.keras.layers.RandomFlip('horizontal_and_vertical', name='RandomFlip'),
    tf.keras.layers.RandomRotation(factor=1, fill_mode='reflect',
                                   name='RandomRotation'),
    tf.keras.layers.RandomZoom(height_factor=(-0.1, 0.3), name='RandomZoom'),
    # tf.keras.layers.RandomContrast(factor=(0.1, 0.1), name='RandomContrast')
])

Как и в случае с созданием датасета, проверим на уже использованном изображении из тренировочного датасета, как работает последовательность по аугментации данных. Если следующий блок с кодом запустить несколько раз, то мы увидим, как меняется изображение, проходя через вышеописанные слои.

> _**Note**: В версии tf 2.11.0 при выполнении нижеследующего кода выводятся предупреждения, которые не влияют на качество работы модели._

In [None]:
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()

# увеличим размерность изображения до 4, так иначе оно не пройдет
# корректно через последовательность слоев
result_for_aug = np.expand_dims(result, axis=0)
augmented = data_aug(result_for_aug).numpy()
# print(augmented.shape)

plt.axis('off')

# вернем результату размерность 3 для отображения
plt.imshow(np.squeeze(augmented));

print("Минимальное и максимальное значение пикселей:", augmented.min(), augmented.max())

Видим, что некоторые каналы из-за усиления контрастности дают выжженные пиксели. Эта особенность будет выправлена следующей операцией - `rescale`, которая при приведении значений к стандарту [-1, 1] также влияет на контраст. В итоге мы получим более менее нормализованную картинку. При желании слой с контрастом, как и другие слои аугментации, всегда можно отключить или изменить параметры, но нас устраивает такой результат.

Итак, обе последовательности вполне успешно справляются с поставленными перед ними задачами. Переходим к созданию и обучению моделей.

## Архитектура, компиляция, обучение модели

`DROPOUT_RATE` назначим небольшим, так как датасет у нас скромный по объему, и, если мы будем исключать значительную часть датасета из обучения, модель просто не сможет обучаться.

Также назначим переменную базовый learning rate - `BASE_LR`, установим значение на 0.01. Мы будем использовать специальный инструмент `callbacks`, который при необходимости будет уменьшать данный параметр непосредственно в ходе обучения.

In [None]:
DROPOUT_RATE = 0.1
BASE_LR = 0.01
REGULARIZER=tf.keras.regularizers.L1L2(l1=0.0, l2=0.5)
EPOCHS=500  # см. callbacks

Также пропишем дополнительную метрику для наших  несбалансированных мультиклассов - PR-AUC с использованием precision и recall. PR-AUC - это площадь под графиком этих двух метрик, позволяет учесть данные по редким классам и получить более объективную картину об эффективности обучения.

In [None]:
curve = 'PR'
name = curve+'_AUC'
AUC = tf.keras.metrics.AUC(curve=curve, multi_label=False, name=name)

metrics = ['accuracy', AUC]

### Callbacks - контроль остановки обучения и `learning_rate`

Так как обучение моделей по компьютерному зрению очень затратно по ресурсам, а мы планируем провести целый ряд различных экспериментов, для снижения нагрузки на систему мы будем использовать так называемые `callbacks` - специальные модули `keras`, которые позволяют в автомтическом режиме контролировать, когда следует вносить те или иные изменения в обучение. Мы будем использовать два вида `callbacks`:

- `EarlyStopping` для ранней остановки обучения при прекращении сокращения `val_loss` (для нашего несбалансированного датасета этот показатель важнее, чем непоказательный для несбалансированных классов `val_accuracy`),

- `ReduceLROnPlateau` для уменьшения `learning rate` при ухудшении показателей `val_loss` (Исследователями обнаружено, что постепенное снижение `learning rate` в процессе обучения положительно сказывается на качестве обучения, что мы увидим на наших экспериментах, см. список использованных источников и литературы Holbrook R. "Intro to Deep Learning").

In [None]:
from tensorflow.keras import callbacks

early_stopping = callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0.001,  # минимальное значение для зачета улучшения
    patience=20,  # количество эпох с плохим результатом перед остановкой
    restore_best_weights=True,  # восстановить лучшие показатели модели
)

lr_schedule = callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    patience=1,
    factor=0.5,  # на какое значение будет умножаться текущий lr
    min_lr=1e-8,  # минимальное значение lr
)

Таким образом, наши callbacks создают следующий алгоритм: 
- при ухудшении `val_loss` на данной эпохе следующая эпоха будет прходить с `learning rate` в два раза ниже,
- если `val_loss` ухудшается 15 раз кряду, обучение прекращается.

Все эти параметры можно регулировать и подбирать, для наших задач мы остановимся на этих гиперпараметрах, так как они нас устраивают.

## Визуализация итогов обучения

### Функция потерь и метрики

In [None]:
def val_acc_viz(history, epochs, loss_from=1):
    '''
    Функция отрисовывает историю функции потерь и
    используемых метрик. Аргументы:
    - history - данные истории обучения,
    - epochs - количество эпох для отображения,
    - loss_from - с какой эпохи отображать loss.
    '''
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']

    auc = history.history[name]
    val_auc = history.history['val_' + name]

    loss = history.history['loss'][loss_from-1:]
    val_loss = history.history['val_loss'][loss_from-1:]

    epochs_range = range(1, len(acc)+1)

    sns.set_style('whitegrid')
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 3, 1)
    plt.plot(range(loss_from, len(acc)+1), loss, label='Training Loss')
    plt.plot(range(loss_from, len(acc)+1), val_loss, label='Validation Loss')
    plt.legend(loc='best')
    plt.title('Loss')
    plt.xlabel('epochs')
    # plt.show()

    # plt.figure(figsize=(8, 5))
    # ax1 = plt.subplot(1, 2, 1)
    ax2 = plt.subplot(1, 3, 2)
    plt.plot(epochs_range, acc, label='Training Accuracy')
    plt.plot(epochs_range, val_acc, label='Validation Accuracy')
    plt.legend(loc='best')
    plt.title('Accuracy')
    plt.xlabel('epochs')

    # plt.subplot(1, 2, 2, sharey=ax1)
    plt.subplot(1, 3, 3, sharey=ax2)
    plt.plot(epochs_range, auc, label='Training ' + name)
    plt.plot(epochs_range, val_auc, label='Validation ' + name)
    plt.legend(loc='best')
    plt.title(name)
    plt.xlabel('epochs')
    plt.show()

### Функция матрицы ошибок

In [None]:
def confusion_matrix(model, images_number=None, dataset=test_batches, class_names=class_names):

    '''
    Выводит матрицу ошибок на тестовых данных. 
    
    Аргументы:
    - model - обученная модель.
    - images_number - количество изображений из тестовой выборки;
            если None, то используются все изображения в выборке.
    - dataset - tf.dataset с тестовыми изображениями.
    '''
    
    from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

    
    if images_number is None:
        test_images = dataset
    else:
        test_images = dataset.take(images_number)
        
    y_test, y_pred = ([np.argmax(y) for _, y in test_images], 
                      [np.argmax(x) for x in model.predict(test_images)])
        
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(cmap='GnBu')
    # disp.plot(cmap='afmhot')

    plt.show()

Для торча.

In [None]:
def confusion_matrix_torch(model, dataset, class_names=class_names):

    '''
    Выводит матрицу ошибок на тестовых данных. 
    
    Аргументы:
    - model - обученная модель.
    - dataset - torch dataloader с тестовыми изображениями.
    '''
    
    from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
        
    y_test, y_pred = ([np.argmax(y) for _, y in dataset], 
                      [np.argmax(x) for x in model.predict(dataset)])
        
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot()
    plt.show()

## Предсказания отдельных изображений

### На изображениях из Сети

При желании качество работы модели можно проверить на реальных фотографиях экспертным методом. Для этого создадим новую папку, куда загрузим желаемые изображения. Далее, подготовленная функция преобразует изображения в необходимый для визуализации и предсказания формат. Оставим эту часть в качестве дополнения, использовать ее пока не будем.

In [None]:
# from PIL import Image
# import numpy as np
# from tensorflow.keras.utils import load_img, img_to_array
# import os

# new_test_dir = Path('./data/test_images')
# Path.mkdir(new_test_dir)

# def show_predictions(model):
    
#     images = os.listdir(new_test_dir)
    
#     for i, img_path in enumerate(images):

#         # путь к тестовым изображениям
#         path = new_test_dir / img_path

#         # приведение изображений к желаемому разрешению
#         img = load_img(path, target_size=IMAGE_SIZE + (3,))
#         # перевод изображения в массив
#         x = img_to_array(img)

#         # добавление четвертого измерения для модели
#         images = np.expand_dims(x, axis=0)
        
#         plt.figure(figsize=(2, 2))
#         # Отключить оси
#         plt.axis(False)
#         plt.imshow(np.squeeze(images).astype('uint8'))
#         # plt.title(img_path)
#         plt.show()

#         # предсказание
#         classes = model.predict(images, batch_size=10)

#         class_pred = np.argmax(classes)
#         # print(np.argmax(class_pred))
#         print(f'{img_path} is {class_names[class_pred]} ({np.max(classes[0])*100:.2f}%)')
        
# show_predictions(model)

### На изображениях из тестовой выборки

Посмотрим на то, как модель предсказывает тестовые изображения, хранящиеся в третьей, тестовой выборке, которая до этого момента не принимала участия в работе. Напомним, что тестовая выбора разбита на батчи по одному изображению, что позволяет нам проще контролировать количество изображений, которое мы хотим использовать для предсказания. Следующая функция покажет изображение, заданное и предсказанное значение класса, а также "уверенность" модели в своем предсказании. На данном этапе посмотрим на небольшое количество изображений (по умолчанию 10, можно изменить это число, задав соответствующий аргумент), а полную картину оценим дальше, построив матрицу ошибок.

In [None]:
import numpy as np
from tensorflow.keras.utils import load_img, img_to_array
import os

def show_predictions(model, image_set=test_batches,
                     images_number=10, visualization=False):

    images = image_set.take(images_number)

    for img, label in images:

        if visualization == True:
            plt.figure(figsize=(2, 2))
            # Отключить оси
            plt.axis(False)
            # оставим три измерения из четырех
            image = np.squeeze(img)
            plt.imshow(image.astype('uint8'))
            plt.show()

        # реальный класс
        label = np.argmax(label)
        # предсказанный класс
        prediction = model.predict(img)
        pred = np.argmax(prediction)

        print(f'{class_names[label]} is {class_names[pred]} ({np.max(prediction[0])*100:.2f}%)')

## Оптимизация работы кэша

Оптимизируем работу кэша следующим кодом:

In [None]:
train_batches = train_batches.cache().prefetch(buffer_size=1)
validation_batches = validation_batches.cache().prefetch(buffer_size=1)

# <b>Эксперимент 0 | Оптимизаторы и лосс</b>

In [None]:
stat_0 = dict()

## <b>Модель 0.5 | RMSprop, MSE</b>

### Архитектура модели

In [None]:
strides = 2

model = tf.keras.Sequential([

            rescale,
            data_aug,

            tf.keras.layers.Conv2D(32, (3, 3), padding="same",
                               strides=2, activation='relu'),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Conv2D(64, (3, 3), padding="same",
                               strides=strides, activation='relu'),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Conv2D(64, (3, 3), padding="same",
                               strides=strides, activation='relu'),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Conv2D(32, (3, 3), padding="same",
                               strides=strides, activation='relu'),

            tf.keras.layers.Flatten(),
    
            tf.keras.layers.Dropout(DROPOUT_RATE),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(64, activation='relu'),
    
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(num_classes, activation='softmax',
                                 kernel_regularizer=REGULARIZER
                                 )

], name='RMSprop_mse')

model.summary()

In [None]:
len(model.trainable_variables)

### Компиляция

In [None]:
model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=BASE_LR),
              loss='mse',
              metrics=metrics)

### Обучение модели

In [None]:
hist = model.fit(train_batches,
                 epochs=EPOCHS,
                 validation_data=validation_batches,
                 callbacks=[early_stopping, lr_schedule])

### Визуализация данных по работе алгоритма

In [None]:
val_acc_viz(hist, len(hist.history['accuracy']))

### Предсказание модели

In [None]:
show_predictions(model, visualization=False)

### Матрица ошибок

In [None]:
confusion_matrix(model)

### Вывод

In [None]:
val_acc_best = np.max(hist.history['val_accuracy'])
val_auc_best = np.max(hist.history['val_PR_AUC'])
model_best_results = round(val_acc_best, 2), round(val_auc_best, 2)

print(f'val_accuracy: {model_best_results[0]}')
print(f'val_PR_AUC: {model_best_results[1]}')

In [None]:
# сохраним данные о модели
model_05 = model
hist_05 = hist

In [None]:
stat_0['RMSprop_mse'] = model_best_results[1]

In [None]:
stat_0

# <b>МОДЕЛЬ 1</b> | `Include_top`

## Модуль `tf.keras.applications` | `feature_extractor_layer`

Создадим `feature_extractor`, который извлечет веса из предобученной модели `tensorflow` для переноса обучения на нашу модель. При переносе обучения рекомендуется отключать самые "верхние", то есть последние слои предобученной модели: именно в них происходит обучение на данную классификацию. Однако для начала мы проверим, как будут обучаться наши модели на полной предобученной модели `Mobile Net V2`.

Во втором блоке экспериментов мы отключим верхние слои, а в третьем пойдем еще дальше и разморозим часть слоев предобученной модели. За "разморозку" слоев у нас будет отвечать параметр `do_fine_tuning`, который может принимать только булевы значения (в первых двух блоках он установлен в режим `False`).

Итак, приступим к первому блоку экспериментов, используем предобученную модель "как есть". 

In [None]:
do_fine_tuning = False
feature_extractor = do_fine_tuning = False
feature_extractor = tf.keras.applications.InceptionResNetV2(
                                                include_top=True,
                                                weights="imagenet",
                                                # input_tensor=None,
                                                input_shape=IMG_SHAPE,
                                                # pooling=None,
                                                # classes=1000,
                                                classifier_activation=None,
                                                # **kwargs
                                                )
feature_extractor.trainable = do_fine_tuning

print(f'Разморозка слоев: {do_fine_tuning}.')

## <b>Модель 1.3</b> | Dropout и BatchNormalization layers

Доработаем предыдущую модель, добавив в ее архитектуру аугментацию данных и слои батч-нормализации и `dropout`. Это позволит нам отложить момент переобучения.

### Архитектура модели

In [None]:
print(f'Строим модель на базе {MODULE_HANDLE}.')
print(f'Разморозка слоев: {do_fine_tuning}.\n')

model = tf.keras.Sequential([

            rescale,
            data_aug,

            feature_extractor,

#             tf.keras.layers.Dropout(DROPOUT_RATE),
#             tf.keras.layers.BatchNormalization(),
#             tf.keras.layers.Dense(512, activation='relu'),

#             tf.keras.layers.Dropout(DROPOUT_RATE),
#             tf.keras.layers.BatchNormalization(),
#             tf.keras.layers.Dense(1024, activation='relu'),

#             tf.keras.layers.Dropout(DROPOUT_RATE),
#             tf.keras.layers.BatchNormalization(),
#             tf.keras.layers.Dense(1024, activation='relu'),

#             tf.keras.layers.Dropout(DROPOUT_RATE),
#             tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(num_classes, activation='softmax')

], name='include_top')

model.build(input_shape=(None,) + IMAGE_SIZE + (3,))
model.summary()

In [None]:
len(model.trainable_variables)

### Компиляция

In [None]:
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=BASE_LR),
              loss=tf.keras.losses.CategoricalCrossentropy(),
              metrics=metrics)

### Обучение модели

In [None]:
hist = model.fit(train_batches,
                 epochs=EPOCHS,
                 validation_data=validation_batches,
                 callbacks=[early_stopping, lr_schedule]
                )

### Визуализация данных по работе алгоритма

In [None]:
val_acc_viz(hist, len(hist.history['accuracy']))

### Предсказание модели

In [None]:
show_predictions(model)

### Матрица ошибок

In [None]:
confusion_matrix(model)

### Вывод

Модель очень быстро переобучилась, практически с первых эпох, при этом и ее предсказательная способность значительно выросла. Анализ матрицы ошибок показывает, что лучше всего модель предсказывает класс `neutral`, что неудивительно, так как именно в данном классе у нас больше всего изображений. Зафиксируем лучший показатель `val_accuracy` и сохраним данные по результатам работы модели в отдельные переменные на случай, если потребуется к ним снова обратиться.

In [None]:
val_acc_best = np.max(hist.history['val_accuracy'])
model_1_3_best_result = round(val_acc_best, 2)
model_1_3_best_result

In [None]:
# сохраним данные о модели
model_1_3 = model
hist_1_3 = hist

# <b>МОДЕЛЬ 3</b> | `fine_tuning`

# <b>Модель 3.1</b> | Pretrained baseline

## Модуль `tf.keras.applications` | `feature_extractor_layer`

Попробуем предолеть проблему переобучения, добавив к предыдущей архитектуре слои с аугментацией данных, которые мы подготовили заранее. Другие параметры оставим нетронутыми. Обратим внимание, что мы создаем новую модель, а не дообучаем предыдущую (дообучением мы займемся позже).

In [None]:
# do_fine_tuning = False
# feature_extractor = tf.keras.applications.MobileNetV2(input_shape=IMG_SHAPE,
#                                                include_top=False,
#                                                weights='imagenet')
# feature_extractor.trainable = do_fine_tuning

In [None]:
# do_fine_tuning = False
# feature_extractor = tf.keras.applications.MobileNet(
#                                                 input_shape=IMG_SHAPE,
#                                                 # alpha=1.0,
#                                                 # depth_multiplier=1,
#                                                 dropout=DROPOUT_RATE,
#                                                 include_top=False,
#                                                 weights="imagenet",
#                                                 # input_tensor=None,
#                                                 # pooling=None,
#                                                 # classes=1000,
#                                                 classifier_activation='softmax',
#                                                 # **kwargs
# )
# feature_extractor.trainable = do_fine_tuning

In [None]:
# do_fine_tuning = False
# feature_extractor = tf.keras.applications.InceptionV3(
#                                                 include_top=False,
#                                                 weights="imagenet",
#                                                 # input_tensor=None,
#                                                 input_shape=IMG_SHAPE,
#                                                 # pooling=None,
#                                                 # classes=1000,
#                                                 # classifier_activation="softmax",
# )
# feature_extractor.trainable = do_fine_tuning

In [None]:
do_fine_tuning = False
feature_extractor = tf.keras.applications.InceptionResNetV2(
                                                include_top=False,
                                                weights="imagenet",
                                                # input_tensor=None,
                                                input_shape=IMG_SHAPE,
                                                # pooling=None,
                                                # classes=1000,
                                                # classifier_activation="softmax",
                                                # **kwargs
)
feature_extractor.trainable = do_fine_tuning

In [None]:
# feature_extractor.summary()

## Архитектура модели

In [None]:
image_batch, label_batch = next(iter(train_batches))
feature_batch = feature_extractor(image_batch)
print(feature_batch.shape)

In [None]:
global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
feature_batch_average = global_average_layer(feature_batch)
print(feature_batch_average.shape)

In [None]:
prediction_layer = tf.keras.layers.Dense(num_classes, activation='softmax',
                                  kernel_regularizer=REGULARIZER
                                 )
prediction_batch = prediction_layer(feature_batch_average)
print(prediction_batch.shape)

In [None]:
inputs = tf.keras.Input(shape=IMG_SHAPE)
x = rescale(inputs)
x = data_aug(x)
x = feature_extractor(x, training=False)
x = global_average_layer(x)
x = tf.keras.layers.Dropout(DROPOUT_RATE)(x)
outputs = prediction_layer(x)
model = tf.keras.Model(inputs, outputs)

model.summary()

In [None]:
# print(f'Строим модель на базе {MODULE_HANDLE}.')
# print(f'Разморозка слоев: {do_fine_tuning}.\n')

# strides = 1
# REGULARIZER=tf.keras.regularizers.L1L2(l1=0.0, l2=0.5)

# model = tf.keras.Sequential([

#             rescale,
#             data_aug,

#             feature_extractor,

#             # tf.keras.layers.Conv2D(32, (3, 3), padding="same",
#             #                    strides=strides, activation='relu'),
#             # tf.keras.layers.MaxPooling2D((2, 2)),
#             # tf.keras.layers.Conv2D(64, (3, 3), padding="same",
#             #                    strides=strides, activation='relu'),

#             tf.keras.layers.Flatten(),
#             tf.keras.layers.Dropout(DROPOUT_RATE),
#             tf.keras.layers.BatchNormalization(),
#             tf.keras.layers.Dense(num_classes, activation='softmax',
#                                   kernel_regularizer=REGULARIZER
#                                  )

# ], name='pretrained_baseline')

# model.summary()

In [None]:
len(model.trainable_variables)

## Компиляция

In [None]:
# model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=BASE_LR),
#               loss='mse',
#               metrics=metrics)

In [None]:
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=BASE_LR),
              loss='categorical_crossentropy',
              metrics=metrics)

## Обучение модели

In [None]:
hist = model.fit(train_batches,
                 epochs=10,
                 validation_data=validation_batches,
                 callbacks=[early_stopping, lr_schedule])

### Визуализация данных по работе алгоритма

In [None]:
val_acc_viz(hist, len(hist.history['accuracy']))

### Предсказание модели

In [None]:
show_predictions(model, visualization=False)

### Матрица ошибок

In [None]:
confusion_matrix(model)

### Вывод

In [None]:
val_acc_best = np.max(hist.history['val_accuracy'])
val_auc_best = np.max(hist.history['val_PR_AUC'])
model_best_results = round(val_acc_best, 2), round(val_auc_best, 2)

print(f'val_accuracy: {model_best_results[0]}')
print(f'val_PR_AUC: {model_best_results[1]}')

In [None]:
# сохраним данные о модели
model_pretrained = model
hist_pretrained = hist

In [None]:
# stat_3['SGD 1_layer_1024'] = model_best_results[1]

In [None]:
# stat_3

## Разморозка слоев

In [None]:
do_fine_tuning = True
feature_extractor.trainable = do_fine_tuning

print(f'Разморозка слоев: {do_fine_tuning}.')

Посмотрим на количество слоев в предобученной модели:

In [None]:
# с какого слоя мы размораживаем модель
fine_tune_at = 600

# Заморозим все остальные слои
for layer in feature_extractor.layers[:fine_tune_at+1]:
    layer.trainable = False

In [None]:
print("Number of layers in the feature extractor: ", len(feature_extractor.layers))
print("Trainable layers in the feature extractor: ", len(feature_extractor.trainable_variables))

## Компиляция

Теперь необходимо скомпилировать нашу уже обученную на предыдущем этапе модель, чтобы изменения были применены.

In [None]:
# model.compile(optimizer=tf.keras.optimizers.RMSprop(),
#               loss='mse',
#               metrics=metrics)

In [None]:
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=BASE_LR),
              loss='categorical_crossentropy',
              metrics=metrics)

## Архитектура модели

In [None]:
model.summary()

In [None]:
len(model.trainable_variables)

## Обучение

In [None]:
hist = model.fit(train_batches,
                 epochs=EPOCHS,
                 validation_data=validation_batches,
                 callbacks=[early_stopping, lr_schedule])

### Визуализация данных по работе алгоритма

In [None]:
val_acc_viz(hist, len(hist.history['accuracy']))

### Предсказание модели

In [None]:
show_predictions(model, visualization=False)

### Матрица ошибок

In [None]:
confusion_matrix(model)

### Вывод

In [None]:
val_acc_best = np.max(hist.history['val_accuracy'])
val_auc_best = np.max(hist.history['val_PR_AUC'])
model_best_results = round(val_acc_best, 2), round(val_auc_best, 2)

print(f'val_accuracy: {model_best_results[0]}')
print(f'val_PR_AUC: {model_best_results[1]}')

In [None]:
# сохраним данные о модели
model_32 = model
hist_32 = hist

In [None]:
stat_3['SGD 1_layer_1024'] = model_best_results[1]

In [None]:
stat_3

# <b>Использованные источники и литература</b>

**Курсы**

1. Holbrook R. Intro to Deep Learning // https://www.kaggle.com/learn/intro-to-deep-learning
1. Moroney L. Device-based Models with TensorFlow Lite // https://www.coursera.org/learn/device-based-models-tensorflow

**Статьи**
1. Confusion Matrix // https://scikit-learn.org/stable/modules/generated/sklearn.metrics.confusion_matrix.html 
1. Load and preprocess images // https://www.tensorflow.org/tutorials/load_data/images
1. Module: tf.data.Dataset // https://www.tensorflow.org/api_docs/python/tf/data/Dataset
1. Module: tf.keras.applications.mobilenet_v2 // https://www.tensorflow.org/api_docs/python/tf/keras/applications/mobilenet_v
1. Transfer learning and fine-tuning // https://www.tensorflow.org/tutorials/images/transfer_learning
1. Transfer learning and fine-tuning // https://keras.io/guides/transfer_learning/
1. Lakhani N.D. Statistical Evaluation Metrics // https://iust-projects.ir/post/minidm01/

**Форумы**
1. StackOverflow // https://stackoverflow.com/
1. GitHub // https://github.com/