## imports

In [1]:
import glob
import os
import pickle
import sys
import itertools
from math import ceil

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Patch

import sklearn

import tensorflow as tf
import tensorflow_addons as tfa
import keras

from tensorflow.keras.preprocessing import image_dataset_from_directory
from keras.preprocessing.image import ImageDataGenerator
from keras.applications.vgg19 import VGG19 
from keras.applications.vgg16 import VGG16
from keras.applications.inception_resnet_v2 import InceptionResNetV2
from keras.applications.densenet import DenseNet201

from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix, classification_report

# Record the interpreter and key library versions used for this run.
print(f'python-{sys.version}')
print(f'tensorflow-{tf.__version__}')
print(f'sklearn-{sklearn.__version__}')

python-3.10.4 (main, Mar 31 2022, 03:38:35) [Clang 12.0.0 ]
tensorflow-2.8.0
sklearn-1.1.0


In [None]:
# TODO: define variables in a config file
# import config as cfg

## Vars

In [2]:
# Root folder holding the datasets. Set the POTATO_DATA_DIR environment variable
# to run the notebook on another machine; the fallback is the original location.
DATA_DIR = os.environ.get("POTATO_DATA_DIR", "/Users/kendra/Data")
PROJECT_DATA_FOLDER = "Pepsico RnD Potato Lab Dataset"
PROJECT_DATA_DIR = os.path.join(DATA_DIR, PROJECT_DATA_FOLDER)
# Sub-folders read later by the training generators and the test-set loader.
TRAIN_DIR = os.path.join(PROJECT_DATA_DIR, "Train")
TEST_DIR = os.path.join(PROJECT_DATA_DIR, "Test")

In [9]:
# Project root, given relative to the notebook's working directory.
PROJECT_DIR = "."
# Folder that receives the saved models and the pickled label dictionary.
MODEL_SAVE_DIR = os.path.join(PROJECT_DIR, "models")
# exist_ok=True keeps this cell safe to re-run once the folder exists.
os.makedirs(MODEL_SAVE_DIR, exist_ok=True)

In [3]:
# Image size in pixels and the number of output classes.
NUM_CLASSES = 2
target_img_size = (500, 500)

# Model input shape: the image size followed by 3 colour channels.
input_shape = (*target_img_size, 3)

# base search

## data augmentation

In [5]:
# Mild geometric augmentation. validation_split reserves 25% of the images so
# the same generator can serve both the "training" and "validation" subsets.
augmentation_options = dict(
    rotation_range=2,
    shear_range=2,
    width_shift_range=0.05,
    height_shift_range=0.05,
    vertical_flip=True,
    fill_mode='nearest',
    validation_split=0.25,
)
data_augmentation = ImageDataGenerator(**augmentation_options)

## model function

In [21]:
# here, data is balanced, so we can use accuracy &/or cross entropy
METRICS = [
    # Use the string alias so Keras logs this metric under the keys 'accuracy' /
    # 'val_accuracy', which is what the grid-search and plotting cells read from
    # `history.history`. Passing the function tf.keras.metrics.binary_accuracy
    # logs it as 'binary_accuracy' instead, and those lookups raise KeyError.
    # Keras picks the concrete accuracy variant when the model is compiled.
    "accuracy",
    # these needed to be updated from tensorflow version 2.4
    tf.keras.metrics.binary_crossentropy,  # logged as 'binary_crossentropy'
]
LOSS = tf.keras.losses.binary_crossentropy  # keras.losses.BinaryCrossentropy

def get_model(Model,
              dropout_rate,
              learn_rate,
              metrics=METRICS
             ):
    """Build and compile a transfer-learning classifier on a frozen backbone.

    Adapted from
    https://keras.io/guides/transfer_learning/#transfer-learning-amp-finetuning

    Args:
        Model: Keras application constructor (e.g. VGG19). It is called with
            ImageNet weights, the notebook-level `input_shape` and no top.
        dropout_rate: rate given to the Dropout layer before the classifier.
        learn_rate: learning rate for the Adam optimizer.
        metrics: metrics handed to `compile`; defaults to `METRICS`.

    Returns:
        The compiled `keras.Model`.
    """
    # Pretrained feature extractor, frozen so that only the new head is trained.
    backbone = Model(include_top=False, input_shape=input_shape, weights="imagenet")
    backbone.trainable = False

    # New classification head; the backbone is called with training=False.
    image_input = keras.Input(shape=input_shape)
    features = backbone(image_input, training=False)
    pooled = keras.layers.GlobalAveragePooling2D()(features)
    dropped = keras.layers.Dropout(dropout_rate)(pooled)
    class_probs = keras.layers.Dense(NUM_CLASSES, activation='softmax')(dropped)

    classifier = keras.Model(image_input, class_probs)

    # could try to tune the Optimizer chosen
    classifier.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learn_rate),
        loss=LOSS,
        metrics=metrics,
    )

    return classifier

## define parameter grid

In [22]:
# grid search
# Candidate backbones, keyed by a short label.
model_dict = {
    'vgg19': VGG19,
    'vgg16': VGG16,
    'inception_resnet': InceptionResNetV2,
    'densenet': DenseNet201,
}

models = model_dict.keys()
dropout_rates = [0.1, 0.2]  # , 0.5]
learn_rates = [0.01, 0.001]  # , 0.0001]
batch_sizes = [16, 32]
# default_bs = 32

param_dict = {
    'Model': list(model_dict.values()),
    'dropout_rate': dropout_rates,
    'learn_rate': learn_rates,
    'batch_size': batch_sizes,
}

# Cartesian product of all candidate values: one dict per combination.
param_keys = list(param_dict)
param_list = list(itertools.product(*param_dict.values()))
param_grid = [dict(zip(param_keys, combo)) for combo in param_list]

len(param_grid)

32

In [15]:
# Peek at the first parameter combination to check the grid's structure.
param_grid[0]

{'Model': <function keras.applications.vgg19.VGG19(include_top=True, weights='imagenet', input_tensor=None, input_shape=None, pooling=None, classes=1000, classifier_activation='softmax')>,
 'dropout_rate': 0.1,
 'learn_rate': 0.01,
 'batch_size': 16}

## define early stopping callback

In [11]:
# Maximum number of epochs for each training run.
num_epochs = 50

# Halt a run once val_loss has failed to improve for 5 consecutive epochs,
# then restore the weights from the best epoch seen.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=5,
    restore_best_weights=True,
    verbose=1,
)

## loop over grid params

In [None]:
# Batch size used when a parameter set does not specify one.
default_bs = 32

# One [history, model] pair per parameter set, in param_grid order.
# NOTE(review): every trained model stays in memory here; later cells index
# into this list, so it is kept as-is.
grid_results = []

for i, params in enumerate(param_grid):
    print(f"{i+1}: {params}")

    # Pop from a copy: popping 'batch_size' from the grid entry itself would
    # strip it from param_grid, so a re-run of this cell (and the later
    # best_params lookup) would no longer see the batch size.
    model_kwargs = dict(params)
    bs = model_kwargs.pop('batch_size', default_bs)

    model = get_model(**model_kwargs)

    # must define train & val data generators inside loop to enable tuning of batch size
    generator_kwargs = dict(
        # Resize to the model's input size; flow_from_directory otherwise
        # defaults to 256x256, which does not match `input_shape`.
        target_size=target_img_size,
        class_mode="categorical",
        batch_size=bs,
        seed=19,
    )
    train_gen = data_augmentation.flow_from_directory(
        TRAIN_DIR,
        subset="training",
        **generator_kwargs,
    )
    val_gen = data_augmentation.flow_from_directory(
        TRAIN_DIR,
        subset="validation",
        **generator_kwargs,
    )

    # Class names, reused later as the confusion-matrix labels.
    classes = list(train_gen.class_indices.keys())

    history = model.fit(
        train_gen,
        epochs=num_epochs,
        validation_data=val_gen,
        callbacks=[early_stopping],
        verbose=0
    )

    # save results
    grid_results.append([history, model])
    file_name = f'model_{i:02}'
    model.save(os.path.join(MODEL_SAVE_DIR, file_name))

    print("")
    # Report the best epoch's scores rather than the last epoch's.
    train_scores = history.history['accuracy']
    val_scores = history.history['val_accuracy']
    print(f"Train acc: {max(train_scores):.4f}")
    print(f"Val acc  : {max(val_scores):.4f}")
    print("---------------------------")
    print("")

1: {'Model': <function VGG19 at 0x7fd5ed35c9d0>, 'dropout_rate': 0.1, 'learn_rate': 0.01}
Found 577 images belonging to 2 classes.
Found 192 images belonging to 2 classes.



## plot results

In [None]:
# Best validation accuracy reached by each parameter set.
val_scores = [np.max(x[0].history['val_accuracy']) for x in grid_results]

base_colors = ['orange', 'blue', 'green', 'pink']

# Take each bar's architecture from the parameter set that produced it
# (grid_results is filled in param_grid order), so the colours stay aligned
# however many parameter sets each model has.
bar_models = [p['Model'].__name__ for p in param_grid[:len(val_scores)]]
model_names = list(dict.fromkeys(bar_models))  # unique names, first-seen order
# Cycle the palette so that more models than colours cannot raise.
color_by_model = dict(zip(model_names, itertools.cycle(base_colors)))
colors = [color_by_model[name] for name in bar_models]

handler = [Patch(facecolor=color_by_model[name], label=name) for name in model_names]

# One bar per parameter set, positioned by its index in the grid.
xs = np.arange(len(val_scores))

w = 0.6
plt.figure(figsize=(16,5))
plt.bar(xs, val_scores, width=w, color=colors)
plt.ylabel("Val score (accuracy)")
plt.xlabel("Parameter set")
plt.legend(handles=handler, loc=(0.55, 0.75));

## inspect best

In [None]:
# Highest validation accuracy across the whole grid.
val_score_best = np.max(val_scores)
print(val_score_best)
# Indices of every parameter set that reached the best score (ties included).
# Iterate over the per-run scores: val_score_best is a scalar and cannot be
# enumerated.
best_args = [i for i, x in enumerate(val_scores) if x == val_score_best]
len(best_args)

In [None]:
# Collect the history, fitted model and parameter set of each top-scoring run.
best_runs = [grid_results[idx] for idx in best_args]
best_historys = [run[0] for run in best_runs]
best_models = [run[1] for run in best_runs]
best_params = [param_grid[idx] for idx in best_args]

In [None]:
# One training-curve figure per top-scoring run.
for i, best_history in enumerate(best_historys):
    plt.figure()
    plt.plot(best_history.history['accuracy'])
    plt.plot(best_history.history['val_accuracy'])
    # The plotted curves are accuracy values, so label the axis accordingly.
    plt.ylabel('Accuracy')
    plt.xlabel("epoch")
    plt.legend(['Train', 'Val']);
    params = best_params[i]
    plt.title(f"{params['Model'].__name__}, dropout: {params['dropout_rate']}, learn rate: {params['learn_rate']}")


## misc

Save label dict:

In [None]:
# Invert the generator's name -> index mapping into index -> name.
class_dict = train_gen.class_indices
rev_class_dict = dict((index, name) for name, index in class_dict.items())
rev_class_dict

In [None]:
# Persist the index -> label mapping in the model save folder.
LABEL_DICT_PATH = os.path.join(MODEL_SAVE_DIR, 'class-labels.pkl')

with open(LABEL_DICT_PATH, 'wb') as file:
    file.write(pickle.dumps(rev_class_dict))

# moar tune?

Could refine the tuning by homing in on parameter values near those of the best model(s).

# Eval best

## vs. test

In [None]:
test_batch_size = 12

# load test data, get labels
# shuffle=False keeps a fixed file order across passes over the dataset.
test_data = image_dataset_from_directory(
    directory=TEST_DIR,
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    image_size=target_img_size,
    batch_size=test_batch_size,
    shuffle=False,
)

class_names = test_data.class_names
num_test = len(test_data.file_paths)

# Go through every batch and turn each one-hot label row into its class name.
num_test_batches = ceil(num_test / test_batch_size)
y_test = []
for images, labels in test_data.take(num_test_batches):
    for one_hot in labels:
        y_test.append(class_names[np.argmax(one_hot)])

In [None]:
# evaluate model against test data
i = 0
m = best_models[i]
params = best_params[i]
print(f"{params['Model'].__name__}, dropout: {params['dropout_rate']}, learn rate: {params['learn_rate']}")
test_eval = m.evaluate(test_data, return_dict=True)
test_eval

In [None]:
# Class probabilities per test image; test_data is unshuffled, so the rows
# line up with y_test.
raw_preds = m.predict(test_data)
pred_idxs = np.argmax(raw_preds, axis=1)
# Translate predicted indices into class names via the training label mapping.
y_pred = [rev_class_dict[x] for x in pred_idxs]

print(classification_report(y_test, y_pred))

conf_mtx = confusion_matrix(y_test, y_pred, labels=classes)
plt.figure(figsize=(6,4))
# fmt='d' prints the integer counts in full; the default annotation format
# ('.2g') would show counts of 100 or more in scientific notation.
sns.heatmap(conf_mtx, annot=True, fmt='d', cmap='gray_r', xticklabels=classes, yticklabels=classes)
plt.xlabel("Predicted")
plt.ylabel("True");
plt.title("Hold-out data predictions");

# Summary

**TODO**: refine this part

In [None]:
# Re-select the first top-scoring run as the final model and report its
# parameters together with its test-set metrics.
i = 0
m = best_models[i]
final_params = best_params[i]
print(f"""{final_params['Model'].__name__}, 
      dropout: {final_params['dropout_rate']}, 
      learn rate: {final_params['learn_rate']}""")
# return_dict=True gives a {metric name: value} dict rather than a bare list.
test_eval = m.evaluate(test_data, return_dict=True)
test_eval

In [None]:
# Display the selected parameter set. The commented-out remainder is an earlier
# hard-coded alternative that is no longer assigned.
final_params #= {
#     'Model': VGG19,
#     'dropout_rate': 0.1,
#     'learn_rate': 0.0001
# }

In [None]:
def save_text_file(obj, file_path):
    """Write the `str()` form of `obj` to `file_path`, replacing any existing file.

    Args:
        obj: any object; its string representation is what gets written.
        file_path: destination path, opened in text write mode.
    """
    with open(file_path, "w") as handle:
        text = str(obj)
        handle.write(text)

In [None]:
from importlib import reload

In [None]:
# Re-import the config module so edits to it are picked up by the running kernel.
# NOTE(review): `cfg` is not imported in any visible cell (the
# `import config as cfg` line near the top is commented out) — confirm it is
# defined before this cell runs.
reload(cfg)

In [None]:
# Show the path that the model parameters are written to and read from.
cfg.MODEL_PARAM_PATH

In [None]:
# Write the string form of the selected parameters to the configured path.
# NOTE(review): if final_params still holds the 'Model' callable, the saved
# text contains a function repr rather than a Python literal — confirm the
# file can be read back as a dict.
save_text_file(final_params, cfg.MODEL_PARAM_PATH)

In [None]:
import ast

In [None]:
def read_text_file(file_path, obj_type='list'):
    """Read a text file and optionally parse its contents as a Python literal.

    Args:
        file_path: path of the file to read.
        obj_type: 'dict' parses the contents with `ast.literal_eval`; any other
            value (including the default 'list') returns the raw text.

    Returns:
        The parsed literal when `obj_type` is 'dict', otherwise the file's text.
    """
    with open(file_path, "r") as handle:
        contents = handle.read()

    return ast.literal_eval(contents) if obj_type == 'dict' else contents

In [None]:
# Read the saved parameters back as a dict to verify the round trip.
# NOTE(review): ast.literal_eval only accepts Python literals, so this raises
# if the saved text contains the 'Model' function repr — presumably why
# final_params is redefined without 'Model' further down; confirm.
check = read_text_file(cfg.MODEL_PARAM_PATH, obj_type='dict')

In [None]:
# Hard-coded final parameters. 'Model' is left out, presumably because a model
# constructor cannot be round-tripped through the text file — TODO confirm.
final_params = dict(
    # Model=VGG19,
    dropout_rate=0.1,
    learn_rate=0.0001,
)

In [None]:
# Overwrite the parameter file with the current final_params.
save_text_file(final_params, cfg.MODEL_PARAM_PATH)

In [None]:
# Read the parameter file back, parsed as a dict, to verify the round trip.
check = read_text_file(cfg.MODEL_PARAM_PATH, obj_type='dict')

In [None]:
# Display the dict that was read back from the parameter file.
check

In [None]:
# save model params
# u.save_pickle_file(final_params, cfg.MODEL_PARAM_PATH)
