In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
#userDirectory = "/content/drive/MyDrive/CS465/CS465_final_project/"
userDirectory = "C:/Users/seanm/OneDrive - MNSCU/Fall 2021/CS 465/CS465_final_project"

In [None]:
#userDirectory = "/content/drive/MyDrive/CNN/"

#Import Modules

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
from glob import glob
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow.keras import layers
from tensorflow.keras.layers import *
from tensorflow.keras import regularizers
from tensorflow.keras.models import Sequential
from tensorflow.keras.applications import Xception, DenseNet201, VGG19, ResNet50
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split

#Helper Functions

In [None]:
def build_model(size, num_classes, model_type, aug_input):
    import tensorflow as tf
    from tensorflow.keras import layers, regularizers
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.applications import Xception
    from tensorflow.keras.layers import Input
    from tensorflow.keras.layers import RandomFlip, RandomZoom, RandomContrast
    from tensorflow.keras.layers import Dropout, BatchNormalization, GlobalAveragePooling2D, Dense, Flatten

    inputs = Input((size, size, 3))

    ##Data augmentation layers
    data_augmentation = Sequential(
       [
         RandomFlip("horizontal"),
         RandomZoom(0.1),
         RandomContrast(0.2)
       ]
    )
    
    x = tf.cast(inputs, tf.float32)
    if(model_type == "resnet50"):
        x = tf.keras.applications.resnet50.preprocess_input(x)
        backbone = ResNet50(input_tensor=x, include_top=False, weights="imagenet")
    elif(model_type == "xception"):
        x = tf.keras.applications.xception.preprocess_input(x)
        backbone = Xception(input_tensor=x, include_top=False, weights="imagenet")
    elif(model_type == "densnet"):
        x = tf.keras.applications.densenet.preprocess_input(x)
        backbone = DenseNet201(input_tensor=x, include_top=False, weights="imagenet")
    elif(model_type == "vgg19"):
        x = tf.keras.applications.vgg16.preprocess_input(x)
        backbone = VGG19(input_tensor=x, include_top=False, weights="imagenet")
        
    if(aug_input):
        x = data_augmentation(x)
    backbone.trainable = False
    x = backbone.output
    x = Dropout(0.25)(x)
    x = BatchNormalization()(x)
    x = Dropout(0.25)(x)
    x = GlobalAveragePooling2D()(x)
    x = Dropout(0.25)(x)
    x = Dense(1024, activity_regularizer=regularizers.l2())(x)
    x = Dropout(0.25)(x)
    x = Dense(num_classes, activation="softmax", activity_regularizer=regularizers.l2())(x)
    

    model = tf.keras.Model(inputs, x)
    return model

##Reads image, resizes images to size passed in, then converts to a numpy array
def read_image(path, size):
    import cv2
    import numpy as np
    image = cv2.imread(path, cv2.IMREAD_COLOR)
    image = cv2.resize(image, (size, size))
    image = image.astype(np.float32)
    return image

##decodes x data from utf-8, num_class = number of breeds classified, size = the dimensions of the image
##creates a label array with num_class indices, then converts labels to a numpy array as int32
def parse_data(x, y):
    import numpy as np
    x = x.decode()

    num_class = 120
    size = 224
    
    image = read_image(x, size)
    label = [0] * num_class
    label[y] = 1
    label = np.array(label)
    label = label.astype(np.int32)

    return image, label

##calls parse_data on x and y passed into the function, then sets the shape of the numpy array
##to 224x224x3 for x and 120 for y
def tf_parse(x, y):
    import numpy as np
    import tensorflow as tf
    x, y = tf.numpy_function(parse_data, [x, y], [tf.float32, tf.int32])
    x.set_shape((224, 224, 3))
    y.set_shape((120))
    return x, y

##retreives the data from x and y and slices it into a image and label tuple,
##the dataset is then mapped with the converted information and broken up into *batch* number of batches
def tf_dataset(x, y, batch):
    import tensorflow as tf
    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    dataset = dataset.map(tf_parse)
    dataset = dataset.batch(batch)
    dataset = dataset.repeat()
    return dataset


#Build & Train Model

In [None]:
##Paths are established to be used via glob later on
path = userDirectory
train_path = os.path.join(path, "datasets/Images/**/*")
labels_path = os.path.join(path, "datasets/labels_full.csv")

##using pandas to read in the csv of labels as a dataframe
labels_df = pd.read_csv(labels_path)
##breed is established as a dataframe of all 120 unique dog breeds we are going to classify
breed = labels_df["breed"].unique()
##print number of breeds to verify
print("Number of Breed: ", len(breed))
##convert breed names to integers
breed2id = {name: i for i, name in enumerate(breed)}
##ids is a list of all image paths in the training set
ids = glob(train_path)
##labels array is created
labels = []

##each element in ids is parsed so that the file extensions and preceding pathnames are removed
##then each image is paired with its respective breed as an integer
for image_id in ids:
    image_id = image_id.replace("\\","/")
    image_id = image_id.split("/")[-1].split(".")[0].split("-")[0]
    breed_name = list(labels_df[labels_df.id == image_id]["breed"])[0]
    breed_idx = breed2id[breed_name]
    labels.append(breed_idx)

## Spliting the dataset
train_x, valid_x = train_test_split(ids, test_size=0.1, random_state=0)
train_y, valid_y = train_test_split(labels, test_size=0.1, random_state=0)


## Parameters
size = 224
num_classes = 120
lr = 1e-2#1e-1
batch = 64
epochs = 20

### Model
##specify "densenet", "xception", "vgg19", or "resnet50"
##specify whether to use image aug or not
model = build_model(size, num_classes, model_type="densenet", aug_input=False)
model.compile(loss="categorical_crossentropy", optimizer=SGD(lr), metrics=["acc"])
# for layer in model:
#    layer.trainable = True
##uncomment next line to see the structure of the model
# model.summary()

## convert lists to tf Datasets
train_dataset = tf_dataset(train_x, train_y, batch=batch)
valid_dataset = tf_dataset(valid_x, valid_y, batch=batch)

##sets the path in which to save the model (we mostly want this to use the weights for further training)
callbackFileName = os.path.join(path, "datasets/DenseNet201.h5")
checkpoint_path = callbackFileName
##uncomment next line and run again to load previous weights
model.load_weights(checkpoint_path)

## Training
callbacks = [
      ModelCheckpoint(checkpoint_path, verbose=1, save_best_only=True),
      #this will reduce the learning rate in case of a dropoff in validation loss
      ReduceLROnPlateau(monitor="val_loss", factor=0.1, patience=3, min_lr=1e-6,verbose=1),
      #this will stop the model from training if the validation lass does not improve after 5 epochs
      EarlyStopping(monitor="val_loss",patience=5, verbose=1)
]
train_steps = (len(train_x)//batch) + 1
valid_steps = (len(valid_x)//batch) + 1
#saves the training metrics to a variable to be plotted later
history = model.fit(train_dataset,
    steps_per_epoch=train_steps,
    validation_steps=valid_steps,
    validation_data=valid_dataset,
    epochs=epochs,
    callbacks=callbacks)



#Plot Data

In [None]:
import matplotlib.pyplot as plt
# summarize history for accuracy
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validate'], loc='upper left')
plt.minorticks_on()
plt.ylim((0,1.1))
plt.show()
# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validate'], loc='upper left')
plt.minorticks_on()
plt.ylim(0,10)
plt.show()