In [1]:
# TRAINING OF THREE CNNS WITH RESNET50_V2 ARCHITECTURE, ON THE SAME SET OF IMAGES 

from tensorflow.python.keras.applications.resnet_v2 import preprocess_input, ResNet50V2
from tensorflow.python.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.python.keras.models import Sequential, save_model, load_model, optimizers
from tensorflow.python.keras.layers import Activation, Dense, Flatten, GlobalAveragePooling2D 
from tensorflow.keras.optimizers import SGD 
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from tensorflow import keras

from sklearn.preprocessing import LabelBinarizer

from glob import glob

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np

import os, pathlib

# Load ResNet50_V2 with ImageNet weights and without its classification top;
# pooling="avg" is requested so the backbone output is already pooled.
resnet = ResNet50V2(weights='imagenet', include_top=False, pooling="avg")

# Report the TensorFlow environment this notebook is running in.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available:", len(gpu_devices))
print("Built with CUDA:", tf.test.is_built_with_cuda())
print("Tensorflow version:", tf.__version__)



In [2]:
# SET FILE PATHS AND VARIABLES AND DEFINE FUNCTIONS

# directory with the images, and for all output except the weights
base_path = "working_directory/"

# The images for each CNN need to be named and sorted as follows:
#   "working_directory/IMAGES_by_family/"
#   "working_directory/IMAGES_by_family/train/"
#   "working_directory/IMAGES_by_family/train/FamilyName_1/"   <- replace "FamilyName_1" by the actual family name
#   "working_directory/IMAGES_by_family/train/FamilyName_1/img1.jpg"
#   "working_directory/IMAGES_by_family/train/FamilyName_1/img2.jpg"
#   "working_directory/IMAGES_by_family/train/FamilyName_1/img3.jpg"
#
#   "working_directory/IMAGES_by_family/"
#   "working_directory/IMAGES_by_family/train/"
#   "working_directory/IMAGES_by_family/train/FamilyName_2/"   <- replace "FamilyName_2" by the next family name
#   "working_directory/IMAGES_by_family/train/FamilyName_2/img1.jpg"
#   "working_directory/IMAGES_by_family/train/FamilyName_2/img2.jpg"
#   "working_directory/IMAGES_by_family/train/FamilyName_2/img3.jpg"
#
# The same structure is needed for:
#   "working_directory/IMAGES_by_family/validation/"
#   "working_directory/IMAGES_by_family/test/"
#
# The same as above is also needed for 'order' and 'clade':
#   "working_directory/IMAGES_by_order/"
#   "working_directory/IMAGES_by_order/train/OrderName_1" etc.
#   "working_directory/IMAGES_by_clade/"
#   "working_directory/IMAGES_by_clade/train/CladeName_1" etc.

# directory for the weights (specify a directory other than 'base_path' if desired)
for_weights = base_path

# The taxonomic levels and their hyperparameters, one row per CNN:
#   (taxonomic level, learning rate, decay, momentum)
# The rows are unpacked into the four parallel lists that the training loop
# zips together again.
_hyperparameters = [
    ("family", 0.01, 0.001, 0.9),
    ("order",  0.1,  0.01,  0),
    ("clade",  0.01, 0.001, 0.5),
]
tls, lrs, dcs, mms = (list(column) for column in zip(*_hyperparameters))

# other variables
no_epochs = 600    # number of training epochs per CNN
image_size = 224   # images are resized to image_size x image_size
batch_size = 16    # batch size of the training generator

# callback
class CustomCallback(keras.callbacks.Callback):
    """Keras callback that saves the model weights at the end of every epoch.

    Each epoch gets its own file, named ``weights_epoch_NNN.hdf5`` where NNN
    is the zero-padded epoch number that Keras passes to ``on_epoch_end``.
    """

    def __init__(self, directory):
        """Store the output location.

        Args:
            directory: string that is prepended unchanged to the file name.
                It must end with a path separator for the files to land
                inside that directory.
        """
        super().__init__()
        self.directory = directory

    def on_epoch_end(self, epoch, logs=None):
        """Write the current weights of ``self.model`` to this epoch's file.

        Args:
            epoch: epoch number supplied by Keras; used in the file name.
            logs: metrics supplied by Keras; not used here.
        """
        weights_name = "weights_epoch_{:03d}.hdf5".format(epoch)
        self.model.save_weights(self.directory + weights_name)


In [3]:
# THE FULL TRAINING PROGRAM
# For every taxonomic level this loop: inspects the image folders, builds the
# data generators, computes class weights, puts a new softmax head on the
# frozen ResNet50V2 backbone, trains it (saving the weights after each epoch),
# evaluates it on the TEST images and plots the training history.
# The loop is deliberately kept flat so that the objects of the last iteration
# (bivalve_model, history, decode_bivalves, ...) stay available in the kernel.

for taxonomic_level, lr, decay, momentum in zip(tls, lrs, dcs, mms):

    # create directory for weights; exist_ok lets the cell be re-run after an
    # interrupted or failed run instead of stopping with FileExistsError
    weights_filepath = for_weights + "weights_" + taxonomic_level + "/"
    if os.path.isdir(weights_filepath):
        print("Note:", weights_filepath, "already exists; weight files in it will be overwritten.")
    os.makedirs(weights_filepath, exist_ok=True)

    #_________________________________________
    # EXAMINE IMAGES AND CLASS NAMES
    print("Assessing images for", taxonomic_level)

    # path to directory with images
    data_path = base_path + 'IMAGES_by_' + taxonomic_level + '/'

    # the file to write class names into
    classnames_filename = base_path + "classnames_" + taxonomic_level + ".txt"

    # examine TRAIN images (only *.jpg files are counted here)
    train_path = data_path + 'train'
    train_dir = pathlib.Path(train_path)
    train_image_count = len(list(train_dir.glob('*/*.jpg')))
    print('Found',train_image_count,'TRAINING images.')

    # examine VALIDATION images
    val_path = data_path + 'validation'
    val_dir = pathlib.Path(val_path)
    val_image_count = len(list(val_dir.glob('*/*.jpg')))
    print('Found',val_image_count,'VALIDATION images.')

    # examine TEST images
    test_path = data_path + 'test'
    test_dir = pathlib.Path(test_path)
    test_image_count = len(list(test_dir.glob('*/*.jpg')))
    print('Found',test_image_count,'TEST images.')

    # get class names: one class per sub-directory of TRAIN, in sorted order.
    # flow_from_directory assigns label indices to the sub-directories in
    # sorted order, so sorting here keeps names and label indices aligned
    # (directory listing order itself is arbitrary).
    class_names = np.array(sorted(item.name for item in train_dir.iterdir() if item.is_dir()))
    num_classes = len(class_names)
    print('Found',num_classes,'CLASSES:',class_names)

    # create bivalve class_name decoder (label index -> class name)
    class_name_list = np.ndarray.tolist(class_names)
    decode_bivalves = dict(enumerate(class_name_list))

    # write the class_names into a file, one name per line
    with open(classnames_filename, "w") as outfile:
        outfile.write("\n".join(class_name_list))

    print("\nSaved class names and created decoder for", taxonomic_level + ".\n")

    #_________________________________________
    # SET UP DATA GENERATORS
    # train data generator (with augmentation)
    data_generator_train = ImageDataGenerator(
        preprocessing_function=preprocess_input,
        rotation_range=30,
        width_shift_range=0.2,
        height_shift_range=0.2,
        shear_range=0.1,
        zoom_range=0.2,
        channel_shift_range=0.3)  #, horizontal_flip=True, vertical_flip=True)

    train_generator = data_generator_train.flow_from_directory(
        train_path,
        target_size=(image_size, image_size),
        batch_size=batch_size,
        class_mode='categorical')

    # validation data generator (no augmentation)
    data_generator_val = ImageDataGenerator(preprocessing_function=preprocess_input)

    validation_generator = data_generator_val.flow_from_directory(
        val_path,
        target_size=(image_size, image_size),
        class_mode='categorical')

    # test data generator: a single batch that holds all counted TEST images
    data_generator = ImageDataGenerator(preprocessing_function=preprocess_input)

    test_data_loader = data_generator.flow_from_directory(
        test_path,
        target_size=(image_size, image_size),
        batch_size=test_image_count,
        class_mode='categorical')

    # make sure the decoder agrees with the label indices of the generator ...
    generator_class_names = sorted(train_generator.class_indices,
                                   key=train_generator.class_indices.get)
    if generator_class_names != class_name_list:
        raise ValueError(
            f"Class names found in {train_path} do not match the classes "
            f"used by the train generator: {generator_class_names}")

    # ... and that VALIDATION and TEST use the same classes as TRAIN;
    # otherwise their one-hot labels would refer to different classes
    for split_name, split_generator in (("validation", validation_generator),
                                        ("test", test_data_loader)):
        if split_generator.class_indices != train_generator.class_indices:
            raise ValueError(
                f"The class folders of '{split_name}' differ from those of "
                f"'train' for {taxonomic_level}.")

    # load test images
    print(f'\nNow loading TEST images with img_size = {image_size}...')
    test_images, test_labels = next(test_data_loader)
    test_images = test_images.astype('float32')
    print('...done.')

    #_____________________________________________________________________________________
    # CALCULATE CLASS WEIGHTS based on the number of images of each class in TRAIN
    # Count the images per label index as seen by the train generator, so that
    # each weight is guaranteed to belong to the right class index.
    images_per_dir = np.bincount(train_generator.classes, minlength=num_classes).tolist()
    no_dirs = len(images_per_dir)   # should be the same as num_classes

    empty_classes = [class_name_list[i] for i, count in enumerate(images_per_dir) if count == 0]
    if empty_classes:
        raise ValueError(f"No TRAIN images found for class(es): {empty_classes}")

    total_images = sum(images_per_dir)
    weights_per_dir = [total_images/x for x in images_per_dir]

    # put them into class_weights dictionary (label index -> weight)
    class_weights = dict(enumerate(weights_per_dir))
    print(f'Calculated class weights for {no_dirs} classes based on {total_images} images in TRAIN.\n')

    #_________________________________________
    # COMPILE MODEL
    # freeze the backbone and re-create resnet's output layer with 'num_classes'
    for layer in resnet.layers:
        layer.trainable=False

    logits = Dense(num_classes)(resnet.layers[-1].output)
    output = Activation('softmax')(logits)
    bivalve_model = Model(resnet.input, output)

    # create optimizer and compile model
    # ('learning_rate' replaces the deprecated 'lr' argument name)
    # NOTE(review): 'decay' is a legacy optimizer argument - confirm it is
    # still accepted by the installed TensorFlow version.
    opt = SGD(learning_rate=lr, decay=decay, momentum=momentum)
    bivalve_model.compile(optimizer=opt, loss = "categorical_crossentropy", metrics=["accuracy"])

    print(taxonomic_level, "model compiled.")

    #_________________________________________
    # TRAINING
    # print training parameters
    print(f'\nNow training for {no_epochs} epochs, with image size = {image_size}, batch size = {batch_size},')
    print(f'  lr = {lr}, decay = {decay} and momentum = {momentum}\n')

    # Fit Model; the callback saves the weights after every epoch
    history = bivalve_model.fit(
        train_generator,
        epochs=no_epochs,
        validation_data=validation_generator,
        class_weight=class_weights,
        callbacks=[CustomCallback(
            directory=weights_filepath)])

    # print results of training
    print("\n----------------------------------------------------------------------")
    print("Training results:")
    print(' Mean val_accuracy:', np.mean(history.history['val_accuracy']))
    print(' Maximum val_accuracy:',max(history.history['val_accuracy']))

    # convert the history.history dict to a pandas DataFrame:
    hist_df = pd.DataFrame(history.history)

    # and save to csv (pandas opens the file itself, which avoids doubled
    # line endings from a text-mode handle opened without newline=''):
    hist_df.to_csv(base_path + taxonomic_level + '_history.csv', index=False)

    # print scores on TEST images
    scores = bivalve_model.evaluate(test_images, test_labels, batch_size = test_image_count, verbose=0)
    loss_name, accuracy_name = bivalve_model.metrics_names[0], bivalve_model.metrics_names[1]
    print(f' Scores for {test_image_count} test images: {loss_name} of {scores[0]:.2f}; {accuracy_name} of {scores[1]*100:.2f}%')
    print("----------------------------------------------------------------------")

    #_________________________________________
    # PLOT TRAINING HISTORY
    print("Training history:")

    fig, (ax_accuracy, ax_loss) = plt.subplots(1, 2)

    # Plot training & validation accuracy values
    ax_accuracy.plot(history.history['accuracy'])
    ax_accuracy.plot(history.history['val_accuracy'])
    ax_accuracy.set_title('Model accuracy')
    ax_accuracy.set_ylabel('Accuracy')
    ax_accuracy.set_xlabel('Epoch')
    ax_accuracy.legend(['Train', 'Validate'], loc='upper left')

    # Plot training & validation loss values
    ax_loss.plot(history.history['loss'])
    ax_loss.plot(history.history['val_loss'])
    ax_loss.set_title('Model loss')
    ax_loss.set_ylabel('Loss')
    ax_loss.set_xlabel('Epoch')
    ax_loss.legend(['Train', 'Validate'], loc='upper left')

    fig.tight_layout()
    plt.show()
