In [None]:
#Here we import the libraries needed
import tensorflow as tf
import numpy as np
import os
import random
import pandas as pd
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import confusion_matrix

from PIL import Image

!pip install visualkeras
import visualkeras #to draw the CNNs

# Short aliases for the Keras API and its layers module, used throughout the notebook
tfk = tf.keras
tfkl = tf.keras.layers
# Show which TensorFlow version the notebook is running against
print(tf.__version__)

ModuleNotFoundError: No module named 'tensorflow'

## set seed for reproducibility


In [None]:
# Pin every source of randomness used in the notebook to one value so that
# experiments can be repeated
seed = 42

os.environ['PYTHONHASHSEED'] = str(seed)
# Python, NumPy and TensorFlow (current and v1-compat API) generators, in that order
for seed_fn in (random.seed, np.random.seed, tf.random.set_seed, tf.compat.v1.set_random_seed):
    seed_fn(seed)

## Get and split the dataset

In [None]:
!pip install split-folders 
import splitfolders

In [None]:
# splitfolders.ratio takes the directory given as first argument and writes a new output
# directory containing three sub-directories, "train", "val" and "test", each holding the
# requested fraction of the input elements (the per-class proportions of the input
# directory are preserved).
# Reference: https://pypi.org/project/split-folders/
training_ratio = 0.8
validation_ratio = 0.15
testing_ratio = 0.05
split_ratios = (training_ratio, validation_ratio, testing_ratio)

splitfolders.ratio(
    '/kaggle/input/dataset/training',
    output='partitioned_dataset',
    seed=seed,
    ratio=split_ratios,
)

In [None]:
# Paths of the partitioned dataset and of its three splits, reused by the cells below
dataset_dir = 'partitioned_dataset'
training_dir, validation_dir, test_dir = (
    os.path.join(dataset_dir, split_name) for split_name in ('train', 'val', 'test')
)

# model parameters

In [None]:
# Class labels the model has to distinguish; the position in the list is the class index
labels = [
    'Apple',       # 0
    'Blueberry',   # 1
    'Cherry',      # 2
    'Corn',        # 3
    'Grape',       # 4
    'Orange',      # 5
    'Peach',       # 6
    'Pepper',      # 7
    'Potato',      # 8
    'Raspberry',   # 9
    'Soybean',     # 10
    'Squash',      # 11
    'Strawberry',  # 12
    'Tomato',      # 13
]

# Derived from the list (14 classes) so the count cannot drift from the labels
number_of_classes = len(labels)

In [None]:
# Images are 256x256 RGB, i.e. three colour channels
image_width = 256
image_height = 256
input_shape = (image_height, image_width, 3)

# Training schedule
epochs = 100          # maximum number of training epochs
patience_epochs = 30  # number of epochs used as early-stopping patience
batch_size = 32

# data loader and augmentation

In [None]:
# Images are divided into folders, one for each class. 
# If the images are organized in such a way, we can exploit the ImageDataGenerator to read them from disk.
# Needed if the images are too many to be kept in memory
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Import the preprocessing function of the transfer-learning network used below; the images must go through the same preprocessing the network expects
from tensorflow.keras.applications.efficientnet import preprocess_input 

# ImageDataGenerator instances for the training, validation and test sets.
# Random transformations (data augmentation) are configured for the training images only;
# validation and test images only go through the network's preprocessing function.
image_gen_train = ImageDataGenerator(
    rotation_range=90,                       # rotate by up to +/- 90 degrees
    height_shift_range=50,                   # vertical shift, in pixels
    width_shift_range=50,                    # horizontal shift, in pixels
    zoom_range=0.4,                          # zoom rate
    horizontal_flip=True,
    vertical_flip=True,
    brightness_range=[0.4, 1.6],             # range of the brightness modification
    shear_range=0.2,
    fill_mode='constant', cval=0.,           # pixels exposed by shifts/rotations become black
    preprocessing_function=preprocess_input  # the neural network's preprocessing function
)
image_gen_val = ImageDataGenerator(preprocessing_function=preprocess_input)
image_gen_test = ImageDataGenerator(preprocessing_function=preprocess_input)


def _flow_from(image_gen, directory, shuffle):
    """Return a batch iterator reading the class sub-folders of `directory`.

    All three splits share the same settings (image size, colour mode, class list,
    one-hot labels, batch size, seed); only the generator, the directory and the
    shuffling differ.

    Args:
        image_gen: the ImageDataGenerator applied to every image that is read.
        directory: folder with one sub-folder per class.
        shuffle: whether the samples are shuffled.

    Returns:
        The iterator produced by `image_gen.flow_from_directory`.
    """
    return image_gen.flow_from_directory(
        directory=directory,
        target_size=(image_height, image_width),  # same size as the network input
        color_mode='rgb',
        classes=labels,                            # fixes the class-index order, see above
        class_mode='categorical',
        batch_size=batch_size,
        shuffle=shuffle,
        seed=seed,
    )


# Training data is reshuffled; shuffling is irrelevant for validation and testing
train_generator = _flow_from(image_gen_train, training_dir, shuffle=True)
valid_generator = _flow_from(image_gen_val, validation_dir, shuffle=False)
test_generator = _flow_from(image_gen_test, test_dir, shuffle=False)

# class weighting for generators

In [None]:
# Re-weight the classes, because the number of training samples per class is unbalanced

from sklearn.utils import class_weight

# Class indices that actually occur in the training split
present_classes = np.unique(train_generator.classes)

class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=present_classes,
    y=train_generator.classes
)

# Key each weight by its class index, not by its position in the array: the two only
# coincide when every class has at least one training sample
train_class_weights = dict(zip(present_classes.tolist(), class_weights))
print(train_class_weights)

# function to create folders and callbacks


In [None]:
# Utility function to create folders and callbacks for training

from datetime import datetime

def create_folders_and_callbacks(model_name, patience=None):
    """Create the folders of one training run and return its Keras callbacks.

    A run directory ``data_augmentation_experiments/<model_name>_<timestamp>`` is
    created with two sub-folders: ``ckpts`` (model checkpoints) and ``tb_logs``
    (TensorBoard logs).

    Args:
        model_name: prefix of the run directory name.
        patience: early-stopping patience in epochs. When None (the default) the
            notebook-level ``patience_epochs`` is used.

    Returns:
        A list with, in order, the ModelCheckpoint, TensorBoard and EarlyStopping
        callbacks.
    """
    if patience is None:
        patience = patience_epochs

    # The timestamp gives every run of the experiment a different directory name
    now = datetime.now().strftime('%b%d_%H-%M-%S')
    exp_dir = os.path.join('data_augmentation_experiments', model_name + '_' + str(now))
    ckpt_dir = os.path.join(exp_dir, 'ckpts')
    tb_dir = os.path.join(exp_dir, 'tb_logs')

    # makedirs also creates the missing parent folders; exist_ok replaces the
    # separate "exists?" check and tolerates folders that are already there
    for directory in (ckpt_dir, tb_dir):
        os.makedirs(directory, exist_ok=True)

    callbacks = []

    # Model checkpoint: keeps the whole model (not only the weights) of the epoch
    # with the highest validation accuracy
    ckpt_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(ckpt_dir, 'cp.ckpt'),
        save_weights_only=False,
        save_best_only=True,
        monitor='val_accuracy',
        mode='max')
    callbacks.append(ckpt_callback)

    # TensorBoard: profiling disabled, histograms written every epoch
    tb_callback = tf.keras.callbacks.TensorBoard(
        log_dir=tb_dir,
        profile_batch=0,
        histogram_freq=1)
    callbacks.append(tb_callback)

    # Early stopping: watches the validation loss (the checkpoint above watches the
    # validation accuracy instead) and restores the best weights when it stops
    es_callback = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=patience,
        restore_best_weights=True)
    callbacks.append(es_callback)

    return callbacks

# transfer learning with EfficientNetB5

In [None]:
# Download EfficientNetB5 with ImageNet weights and without its classification top
pre_trained_model = tfk.applications.EfficientNetB5(
    include_top=False,
    weights="imagenet",
    input_shape=input_shape,  # shared notebook constant instead of a second hard-coded tuple
)

# Set EfficientNet as non-trainable (transfer learning)
pre_trained_model.trainable = False

# Show and plot the structure of the EfficientNet network
pre_trained_model.summary()
visualkeras.layered_view(pre_trained_model, legend=True, spacing=15, scale_xy=1)

In [None]:
# Use the pre-trained network (set as non-trainable above) as feature extractor
input_layer = tfk.Input(shape=input_shape)
supernet = pre_trained_model(input_layer)

# Attach a fully connected classifier to the output of the pre-trained model
glob_pooling = tfkl.GlobalAveragePooling2D()(supernet)
batch_norm = tfkl.BatchNormalization()(glob_pooling)

# First hidden block: lower dropout rate, being the closest to the extracted features
classifier1 = tfkl.Dense(512, kernel_initializer=tfk.initializers.GlorotUniform(seed))(batch_norm)
classifier1 = tfkl.ELU()(classifier1)
classifier1 = tfkl.Dropout(0.3, seed=seed)(classifier1)

# Second hidden block: higher dropout rate in the middle of the hidden layers
classifier2 = tfkl.Dense(256, kernel_initializer=tfk.initializers.GlorotUniform(seed))(classifier1)
classifier2 = tfkl.ELU()(classifier2)
classifier2 = tfkl.Dropout(0.6, seed=seed)(classifier2)

# Third hidden block: no dropout
classifier3 = tfkl.Dense(64, kernel_initializer=tfk.initializers.GlorotUniform(seed))(classifier2)
classifier3 = tfkl.ELU()(classifier3)

# One softmax output per class
output_layer = tfkl.Dense(
    number_of_classes,
    activation='softmax',
    kernel_initializer=tfk.initializers.GlorotUniform(seed),
)(classifier3)

# Connect input and output through the Model class
transfer_model = tfk.Model(inputs=input_layer, outputs=output_layer, name='model')

# Compile the model; metrics is passed as a list, the documented form, because a bare
# string is not accepted by every Keras version
transfer_model.compile(
    loss=tfk.losses.CategoricalCrossentropy(),
    optimizer=tfk.optimizers.Adam(),
    metrics=['accuracy'],
)
transfer_model.summary()
visualkeras.layered_view(transfer_model, legend=True, spacing=15, scale_xy=1)

# fine tuning

In [None]:
# NOTE(review): in the visible cells no fit() call runs between building the classifier
# head and this un-freezing step; confirm that training head and backbone together is intended.

# Set the pre-trained model as trainable (looked up once and reused below)
backbone = transfer_model.get_layer('efficientnetb5')
backbone.trainable = True

# Freeze the first `to_freeze` layers again, i.e. layer indices 0..292
to_freeze = 293
for layer in backbone.layers[:to_freeze]:
    layer.trainable = False

# Freeze every batch-normalization layer of the backbone, also past index 292
for layer in backbone.layers:
    if isinstance(layer, tfk.layers.BatchNormalization):
        layer.trainable = False

# check1
print("check trainability whole model")
for i, layer in enumerate(transfer_model.layers):
    print(i, layer.name, layer.trainable)

# check2
print("\ncheck trainability of efficientnet model")
for i, layer in enumerate(backbone.layers):
    print(i, layer.name, layer.trainable)


# Re-compile so that the new trainable flags take effect; metrics is passed as a list,
# the documented form, because a bare string is not accepted by every Keras version
learning_rate = 1e-5
transfer_model.compile(
    loss=tfk.losses.CategoricalCrossentropy(),
    optimizer=tfk.optimizers.Adam(learning_rate=learning_rate),
    metrics=['accuracy'],
)

In [None]:
# Reference for the fine-tuning procedure: https://keras.io/examples/vision/image_classification_efficientnet_fine_tuning/

# fitting and testing

In [None]:
# Training of the transfer model
# tf.get_logger().setLevel('WARNING') #  if you want to suppress only INFOs
# tf.get_logger().setLevel('ERROR') #  if you want to suppress both WARNINGs and INFOs

# Folders and callbacks for this run
aug_callbacks = create_folders_and_callbacks(model_name='CNN')

# Fit on the augmented training batches and validate on the (non-augmented) validation
# batches, weighting the loss by class
fit_result = transfer_model.fit(
    x=train_generator,
    validation_data=valid_generator,
    epochs=epochs,
    class_weight=train_class_weights,
    callbacks=aug_callbacks,
)
# Only the per-epoch metric history is kept
transfer_history = fit_result.history

# Save the trained model
transfer_model.save("TransferLearningModel")

In [None]:
# Plot the training and validation curves: one figure for the loss, one for the accuracy.
# Both figures use the same colours and the same 'training' / 'validation' legend labels.
for metric_key, plot_title in (('loss', 'Categorical Crossentropy'), ('accuracy', 'Accuracy')):
    plt.figure(figsize=(15, 5))
    plt.plot(transfer_history[metric_key], label='training', alpha=.8, color='#ff7f0e')
    plt.plot(transfer_history['val_' + metric_key], label='validation', alpha=.8, color='#4D61E2')
    plt.legend(loc='upper left')
    plt.title(plot_title)
    plt.xlabel('epoch')
    plt.grid(alpha=.3)

plt.show()

In [None]:
# Reload the saved model from disk and evaluate it on the test batches;
# the metrics come back as a dictionary
model_transf = tf.keras.models.load_model("TransferLearningModel")
model_transf_test_metrics = model_transf.evaluate(x=test_generator, return_dict=True)