In [None]:
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import pandas as pd
import cv2
import os
import glob
from tensorflow.keras import regularizers
from tensorflow.keras.models import Sequential
from tensorflow.keras import initializers
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D, BatchNormalization

In [None]:
def _read_resized(path, size):
    """Read one image with OpenCV and resize it to ``size``.

    cv2.imread does not raise on a missing or undecodable file; it returns
    None, which would otherwise surface as an opaque error inside cv2.resize.

    Raises:
        OSError: if the file at ``path`` cannot be read or decoded.
    """
    img = cv2.imread(path)
    if img is None:
        raise OSError("Could not read or decode image: {}".format(path))
    return cv2.resize(img, size)


def load_data(x,
              train_dir=r"../data/TrainDataC2/",
              annotations_csv='../data/TrainAnnotations.csv',
              test_dir="../data/TestData",
              img_size=(224, 224)):
    """Load the training set (with labels) or the test set as numpy arrays.

    Args:
        x: truthy -> load training data, falsy -> load test data.
        train_dir: directory holding the training images named in the CSV.
        annotations_csv: CSV with ``file_name`` and ``annotation`` columns.
        test_dir: directory holding the test images.
        img_size: size passed to cv2.resize for every image.

    Returns:
        Training mode: ``(data, clas)`` - array of resized images and the
        ``annotation`` column values, in CSV row order.
        Test mode: ``data`` - array of resized images, in sorted file-name order.

    Raises:
        OSError: if any referenced image cannot be read.
    """
    if x:
        df = pd.read_csv(annotations_csv)
        # One image per annotation row, so images and labels stay aligned.
        imgL = [_read_resized(os.path.join(train_dir, str(name)), img_size)
                for name in df.file_name]
        data = np.array(imgL)                # Convert list of images to numpy array
        clas = df.annotation.values
        return data, clas                    # return images and class labels

    # '*g' keeps the original matching rule (any file name ending in "g",
    # e.g. .jpg/.jpeg/.png). glob returns files in arbitrary order, so sort
    # them to make the row order of the test array reproducible.
    files = sorted(glob.glob(os.path.join(test_dir, '*g')))
    data = np.array([_read_resized(f1, img_size) for f1 in files])
    return data                              # Return the array of test images

In [None]:
# Load the training images and their labels (flag 1 selects the training branch of load_data).
x_train, y_train =  load_data(1)
# Cell output: shapes of the image array and the label array.
x_train.shape, y_train.shape

In [None]:
# Load the test images (flag 0 selects the test branch of load_data, which returns images only).
x_test = load_data(0)
# Cell output: shape of the test image array.
x_test.shape

In [None]:
def get_dir(x,y):
    """Group the samples in ``x`` by their label in ``y``.

    Returns a dict with keys 0..4; each value is the list of ``x[i]`` whose
    label ``y[i]`` equals that key. Samples whose label matches none of the
    five classes are left out.
    """
    class_ids = range(5)
    grouped = {cls: [] for cls in class_ids}

    for idx in range(len(y)):
        # First class id that compares equal to this label, if any.
        match = next((cls for cls in class_ids if y[idx] == cls), None)
        if match is not None:
            grouped[match].append(x[idx])
    return grouped

In [None]:
def augment(image_arr, d):
    """Create augmented copies of the leading images in ``image_arr``.

    Each selected source image yields four variants (horizontal flip,
    saturation x2, brightness +0.1, 90-degree rotation), so roughly ``d``
    new images are produced from the first ``d // 4`` sources.

    Args:
        image_arr: numpy array of images stacked along axis 0.
            Images are assumed square, since the rotated variant must keep
            the same shape to be stacked with the others.
        d: desired number of additional images.

    Returns:
        Array of shape ``(4 * k, *image_arr.shape[1:])`` where
        ``k = min(d // 4, len(image_arr))``. The result contains only real
        augmentations: no uninitialised placeholder row, and an empty array
        (0 rows) when nothing can be generated.
    """
    # Never index past the available images, and treat a negative d as zero.
    n_source = min(max(d // 4, 0), len(image_arr))
    if n_source == 0:
        return image_arr[:0]  # empty array, same trailing shape and dtype

    # Collect in a list and convert once; re-stacking the whole array on
    # every iteration is quadratic in the number of images.
    augmented = []
    for i in range(n_source):
        image = image_arr[i]                              # Read the image
        flipped = tf.image.flip_left_right(image)         # Flip the image across the vertical axis
        saturated = tf.image.adjust_saturation(image, 2)  # Add color saturation to the image
        bright = tf.image.adjust_brightness(image, 0.1)   # Add brightness to the image
        rotated = tf.image.rot90(image)                   # Rotate the image 90 degrees counter-clockwise

        for variant in (flipped, saturated, bright, rotated):
            augmented.append(np.array(variant))

    return np.array(augmented)

In [None]:
# creating balanced validation dataset
SPLIT_SEED = 42   # fixed seed so the train/validation partition is identical on every run
splitList = [0.08, 0.12, 0.31, 0.30, 0.20]   # Fraction of each class to get balanced validation set
data_dir = get_dir(x_train, y_train)

for i in range(len(splitList)):
  data = np.array(data_dir[i])
  label = i*np.ones((len(data)))
  # Per-class split; without random_state every execution would produce a
  # different validation set and the reported metrics would not be comparable.
  train_X, test_X, train_y, test_y = train_test_split(
      data, label, test_size=splitList[i], random_state=SPLIT_SEED)

  # No augmentation for class zero
  if i == 0:
    train_data = train_X
    train_label = train_y
    val_data = test_X
    val_label = test_y

  # Augmentation for class 1, 2, 3 and 4
  else:
    # Number of extra images requested: gap between class 0 and class i,
    # measured on the full per-class counts (before the split).
    d = len(data_dir[0]) - len(data_dir[i])
    aug = augment(train_X, d)
    print("Difference: {} | Images augmented: {}".format(d, aug.shape[0]))

    train_data = np.vstack((train_data, train_X))
    train_data = np.vstack((train_data, aug))
    train_label = np.hstack((train_label, train_y))
    # Augmented images inherit the label of the class they were derived from.
    train_label = np.hstack((train_label, i*np.ones((aug.shape[0],))))

    val_data = np.vstack((val_data, test_X))
    val_label = np.hstack((val_label, test_y))
  print("class {} added | train_data: {} | validation_data: {}".format(i, train_label.shape, val_label.shape))

In [None]:
# Read the Original and augmented train label dataframe
# Use the same relative annotations path as load_data, so the notebook does
# not depend on a machine-specific absolute mount point.
df = pd.read_csv('../data/TrainAnnotations.csv')
train_df = pd.DataFrame(train_label, columns=["annotation"])

fig, axes = plt.subplots(1, 2, sharex=True, figsize=(10,5))
fig.suptitle('Class distribution')

# Plot class distribution before and after augmentation
# Left: every annotated image from the CSV. Right: the training labels after
# the split and augmentation (validation images are not included there).
sns.countplot(x="annotation", data=df, palette="Spectral", ax=axes[0])
axes[0].set_title('Before Augmentation')
sns.countplot(x="annotation", data=train_df, palette="Spectral", ax=axes[1])
axes[1].set_title('After Augmentation')
plt.show()

In [None]:
# Obtain Train and Validation dataset
# Each dataset element is one (image, label) pair sliced from the arrays.
train_dataset = tf.data.Dataset.from_tensor_slices((train_data, train_label))
val_dataset = tf.data.Dataset.from_tensor_slices((val_data, val_label))
# Printing the dataset shows its element specification.
print(train_dataset)

In [None]:
# Target edge length (pixels) for every image fed to the network
IMG_SIZE = 224
def format_example(image, label):
    """Convert one (image, label) pair to the network's input format.

    The image is cast to float32, divided by 255 and resized to
    IMG_SIZE x IMG_SIZE; the label is passed through unchanged.
    """
    scaled = tf.cast(image, tf.float32) / 255
    resized = tf.image.resize(scaled, (IMG_SIZE, IMG_SIZE))
    return resized, label

In [None]:
# Apply format_example (float32 cast, /255 scaling, resize) to every element of both datasets.
train = train_dataset.map(format_example)
validation = val_dataset.map(format_example)
print(type(train))

In [None]:
# Define batch size and input image size
BATCH_SIZE = 64
IMG_SHAPE = (IMG_SIZE,IMG_SIZE,3)

# Group the formatted examples into batches of BATCH_SIZE.
# NOTE(review): in the visible cells these batched datasets are only used for
# the shape checks; model.fit is called on the raw train_data/val_data arrays.
train_batches = train.batch(BATCH_SIZE)
validation_batches = validation.batch(BATCH_SIZE)

In [None]:
# Check batch shape
# The loop body is empty on purpose: it only binds image_batch/label_batch
# to the first batch so they can be inspected (and reused in later cells).
for image_batch, label_batch in train_batches.take(1):
    pass
image_batch.shape

In [None]:
# Define Base model as VGG16 initialized with Imagenet weights
# include_top=False requests the network without its classifier head, so the
# model outputs convolutional feature maps for inputs of shape IMG_SHAPE.
base_model = tf.keras.applications.VGG16(input_shape=IMG_SHAPE,
                                               include_top=False,
                                               weights='imagenet')

In [None]:
# Check output feature shape of the base model
# Runs the first training batch through the base model once, purely as a sanity check.
feature_batch = base_model(image_batch)
print(feature_batch.shape)

In [None]:
# Make the base model trainable
# Individual layers are re-frozen in the next cell; this only enables training globally.
base_model.trainable = True

In [None]:
# Fine-tune from this layer onwards
# Index into base_model.layers; layers at index >= fine_tune_at stay trainable.
fine_tune_at = 15

# Freeze all the layers before the `fine_tune_at` layer
for layer in base_model.layers[:fine_tune_at]:
    layer.trainable =  False

In [None]:
# Add layers after the base model
# Exploratory check: pools the sample feature batch computed earlier.
# NOTE(review): this layer object is not referenced by the Sequential model
# defined below, which creates its own GlobalAveragePooling2D layer.
global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
feature_batch_average = global_average_layer(feature_batch)

In [None]:
# Exploratory check: a 5-unit dense head applied to the pooled sample features.
# NOTE(review): prediction_layer is not referenced by the Sequential model
# defined below; only the printed output shape is used here.
prediction_layer = tf.keras.layers.Dense(5,kernel_regularizer=regularizers.l1_l2(l1=0.01,l2=0.01))
prediction_batch = prediction_layer(feature_batch_average)
print(prediction_batch.shape)

In [None]:
# Hyperparameters for the classifier head and the optimizer.
l1 = 0.0013    # L1 penalty factor passed to regularizers.l1 on the Dense layers
l2 = 0.0005    # NOTE(review): not referenced by any visible cell
mom = 0.9      # NOTE(review): not referenced by any visible cell (BatchNormalization uses a literal 0.9)
lr = 0.0001    # learning rate passed to the Adam optimizer
drop = 0.6     # dropout rate used by the Dropout layers

In [None]:
def _elu_dense(units):
    """Dense layer with ELU activation, L1 kernel penalty and He-normal initialisation."""
    return Dense(units,
                 activation='elu',
                 kernel_regularizer=regularizers.l1(l1=l1),
                 kernel_initializer=initializers.he_normal())


# Classifier: VGG16 features -> conv/pool -> global average -> funnel of dense
# layers (512 -> 256 -> 128 -> 64 -> 32 -> 16) -> 5-way softmax.
# Dropout and BatchNormalization alternate after the ELU dense layers.
model = Sequential([
    base_model,
    Conv2D(512, 5, activation="elu"),
    MaxPooling2D(pool_size=(3,3)),
    tf.keras.layers.GlobalAveragePooling2D(),

    _elu_dense(512),
    Dropout(drop),
    _elu_dense(256),
    BatchNormalization(momentum=0.9,epsilon=0.01),
    _elu_dense(128),
    Dropout(drop),
    _elu_dense(64),
    BatchNormalization(momentum=0.9,epsilon=0.01),
    _elu_dense(32),
    Dropout(drop),

    # Last hidden layer uses ReLU and the default kernel initialiser.
    Dense(16,activation='relu',kernel_regularizer=regularizers.l1(l1=l1)),
    # Output layer: one probability per class.
    Dense(5, activation="softmax"),
])

In [None]:
# Print the layer-by-layer summary of the assembled model.
model.summary()

In [None]:
# The model's last layer already applies softmax, so the loss must interpret
# the outputs as probabilities (from_logits=False). With from_logits=True the
# loss would apply a second softmax to them and distort loss and gradients.
# `learning_rate` is the supported argument name; `lr` is a deprecated alias.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=[tf.keras.metrics.SparseCategoricalAccuracy(name='accuracy')])

In [None]:
# Number of trainable variable tensors in the model (a count of tensors, not of scalar parameters).
len(model.trainable_variables)

In [None]:
# Balanced class weights for the training labels.
# `classes` and `y` are passed by keyword: recent scikit-learn releases make
# them keyword-only, and the keyword form is accepted by older releases too.
class_ids = np.unique(train_label)
w_arr = compute_class_weight(class_weight='balanced', classes=class_ids, y=train_label)
# Map integer class index -> weight, derived from the classes actually
# present instead of assuming exactly five of them.
weights = {int(c): w for c, w in zip(class_ids, w_arr)}
weights

In [None]:
# Train for 80 epochs with per-class loss weights; H keeps the per-epoch history.
# NOTE(review): fit receives the raw train_data/val_data arrays, not the
# train_batches/validation_batches built through format_example, so the /255
# scaling defined there is not applied to the inputs here - confirm intended.
H = model.fit(train_data, train_label,
	validation_data=(val_data, val_label),
  batch_size= BATCH_SIZE,
  epochs=80, 
  class_weight=weights)

In [None]:
# Per-epoch curves recorded by model.fit
acc, val_acc = H.history['accuracy'], H.history['val_accuracy']
loss, val_loss = H.history['loss'], H.history['val_loss']

plt.figure(figsize=(8, 8))

# Upper panel: accuracy
plt.subplot(2, 1, 1)
for curve, curve_label in ((acc, 'Training Accuracy'), (val_acc, 'Validation Accuracy')):
    plt.plot(curve, label=curve_label)
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy')

# Lower panel: loss
plt.subplot(2, 1, 2)
for curve, curve_label in ((loss, 'Training Loss'), (val_loss, 'Validation Loss')):
    plt.plot(curve, label=curve_label)
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()

In [None]:
import datetime

# Timestamp (day.month.hour:minute:second) used to tag the saved model file
now = datetime.datetime.now()
date_time = now.strftime("%d.%m.%H:%M:%S")
print("date and time:",date_time)

# Validation accuracy of the last epoch, rounded to 4 decimals
acc = round(val_acc[-1], 4)
print(acc)

In [None]:
# Write the model architecture as JSON
model_json = model.to_json()
with open("../model.json", "w") as json_file:
    json_file.write(model_json)

# Save the complete model to HDF5; the file name is built from the rounded
# validation accuracy and the timestamp computed in the previous cell.
model_path = '../'+str(acc)+"."+date_time+'.h5'
model.save(model_path)
print("Saved model to disk")

In [None]:
# Alias for the loaded test images; this is the same array object as x_test, not a copy.
test_data = x_test

In [None]:
# `model` already ends in a softmax layer, so its outputs are class
# probabilities. Stacking another Softmax on top would squash them towards a
# uniform distribution (the argmax stays the same, the probabilities do not).
probability_model = model
predictions_test = probability_model.predict(test_data)

In [None]:
import csv

# Predicted class (index of the highest score) for every test image
predicted_classes = [int(np.argmax(predictions_test[i])) for i in range(len(test_data))]

# Number of test images assigned to each of the five classes
p0, p1, p2, p3, p4 = (predicted_classes.count(cls) for cls in range(5))
print(p0,p1,p2,p3,p4)