In [1]:
import warnings
from sklearn.exceptions import ConvergenceWarning
warnings.filterwarnings('ignore', category = ConvergenceWarning)

import itertools
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from PIL import Image
from sklearn.metrics import classification_report, f1_score, confusion_matrix

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, Model
from keras.layers import Input, Dense, Dropout, BatchNormalization, Activation
from keras.optimizers import Adam
from keras.preprocessing.image import ImageDataGenerator
from keras.layers.experimental import preprocessing
from keras.callbacks import Callback, EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras import mixed_precision
# Switch Keras to the 'mixed_float16' global dtype policy for every layer created after this point.
mixed_precision.set_global_policy('mixed_float16')

# Record the TensorFlow version this notebook was run with.
print(tf.__version__)

INFO:tensorflow:Mixed precision compatibility check (mixed_float16): OK
Your GPUs will likely run quickly with dtype policy mixed_float16 as they all have compute capability of at least 7.0
2.13.0


# Loading Dataset

In [None]:
path = './Rice_Image_Dataset'
# Both columns are filled in lock-step below; the key must be 'labels' because
# that is the column name every later cell reads.
data = {'imgpath': [], 'labels': []}

# Treat every sub-directory of `path` as one class folder. Filtering on isdir
# (rather than popping a fixed position from the listing) skips stray
# non-folder entries regardless of the arbitrary order os.listdir returns,
# and sorting makes the row order - and therefore the seeded split -
# reproducible across machines.
# NOTE(review): the previous code dropped listing index 3, presumably a
# non-folder entry; confirm no class folder was meant to be excluded.
category = sorted(
    entry for entry in os.listdir(path)
    if os.path.isdir(os.path.join(path, entry))
)
for folder in category:
    folderpath = os.path.join(path, folder)
    for file in sorted(os.listdir(folderpath)):
        data['imgpath'].append(os.path.join(folderpath, file))
        data['labels'].append(folder)

df = pd.DataFrame(data)

# Convert labels to numbers
lb = LabelEncoder()
df['encoded_labels'] = lb.fit_transform(df['labels'])

# Split the Dataset into Train, Validation & Test

In [None]:
# First cut: 90% of the rows go to training, the remainder is held out.
train_df, Temp_df = train_test_split(df, train_size=0.9, shuffle=True, random_state=124)
# Second cut: the held-out rows are divided 85/15 into validation and test.
valid_df, test_df = train_test_split(Temp_df, train_size=0.85, shuffle=True, random_state=124)

# Give every split a fresh 0..n-1 index, discarding the original row labels.
train_df, valid_df, test_df = (
    part.reset_index(drop=True) for part in (train_df, valid_df, test_df)
)

# Training Image Count Per Class

In [None]:
# Number of training images per class, as parallel lists of class names and counts.
train = train_df["labels"].value_counts()
index, label = train.index.to_list(), train.to_list()

# One bar colour per class.
colors = ["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd"]

# Horizontal bar chart: classes on the y-axis, image counts on the x-axis.
plt.figure(figsize=(17, 17))
plt.barh(index, label, color=colors)
plt.title("Training data images count per class", fontsize=38)
plt.xlabel('Number of images', fontsize=35)
plt.ylabel('Classes', fontsize=35)
plt.grid(True)
plt.show()

In [None]:
# Preview 15 randomly chosen training rows; random_state=1 keeps the selection repeatable.
train_df.sample(n=15, random_state=1)

In [None]:
# For each split (train, validation, test) print the first five path/label
# rows, followed by the split's shape.
for split_df in (train_df, valid_df, test_df):
    print(split_df[["imgpath", "labels"]].head(5))
    print(split_df.shape)

# Show sample from data

In [None]:
# Draw a 4x4 grid of 16 randomly sampled validation images, each titled with its class label.
plt.figure(figsize=(15, 12))
preview = valid_df.sample(n=16).reset_index()
for position, (img_file, img_label) in enumerate(zip(preview['imgpath'], preview['labels']), start=1):
    plt.subplot(4, 4, position)
    plt.imshow(Image.open(img_file))
    plt.title(img_label)
    plt.axis('off')
plt.show()

# Creating DataLoaders

In [None]:
%% time
BATCH_SIZE = 25
IMAGE_SIZE = (224,224)

generator = ImageDataGenerator(preprocessing_function = tf.keras.applications.efficientnet.preprocess_input)

#Split the data inot three categories
train_images = generator.flow_from_dataframe(
    dataframe = train_df,
    x_col = 'imgpath'
    y_col = 'labels',
    target_size = IMAGE_SIZE,
    color_mode = 'rgb',
    class_mode = 'categorical',
    bathc_size = BATCH_SIZE,
    shuffle = True,
    seed = 42
)

val_images = generator.flow_from_dataframe(
    dataframe = valid_df,
    x_col = 'imgpath'
    y_col = 'labels',
    target_size = IMAGE_SIZE,
    color_mode = 'rgb',
    class_mode = 'categorical',
    bathc_size = BATCH_SIZE,
    shuffle = True,
    seed = 42
)

test_images = generator.flow_from_dataframe(
    dataframe = test_df,
    x_col = 'imgpath'
    y_col = 'labels',
    target_size = IMAGE_SIZE,
    color_mode = 'rgb',
    class_mode = 'categorical',
    bathc_size = BATCH_SIZE,
    shuffle = True,
    seed = 42
)

# Model Structure

In [None]:
# Load the EfficientNetB0 backbone with ImageNet pre-trained weights from TensorFlow.
pretrained_model = tf.keras.applications.EfficientNetB0(
    input_shape = (224,224,3),
    include_top = False,   # leave out the original classification head; our own head is trained instead
    weights = 'imagenet',
    pooling = 'max'
)

# Freeze every layer of the pre-trained network so its weights are not updated
# during the transfer-learning stage.
for layer in pretrained_model.layers:
    layer.trainable = False

In [7]:
# Number of distinct class indices present in the training iterator.
num_classes = len(set(train_images.classes))

# Data Augmentation Step
augment = keras.Sequential([
    keras.layers.RandomFlip("horizontal"),
    keras.layers.RandomRotation(0.1),
    keras.layers.RandomZoom(0.1),
    keras.layers.RandomContrast(0.1)
], name = 'AugmentationLayer')

# Classification head on top of the frozen backbone:
# augmentation -> backbone -> Dense(256) -> ReLU -> BatchNorm -> Dropout -> Dense(num_classes) -> softmax
inputs = Input(shape = (224,224,3), name = 'inputLayer')
x = augment(inputs)
# training = False makes the backbone run in inference mode inside this model.
pretrained_out = pretrained_model(x, training = False)
x = Dense(256)(pretrained_out)
x = Activation(activation = 'relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.3)(x)
x = Dense(num_classes)(x)
# The final softmax is pinned to float32 even though the global policy is mixed_float16.
outputs = Activation(activation = 'softmax', dtype = tf.float32, name = 'activationLayer')(x)

model = Model(inputs = inputs, outputs = outputs)

model.compile(optimizer = Adam(5e-5),
              loss = 'categorical_crossentropy',
              metrics = ['accuracy'])

# summary() prints the table itself and returns None, so it is not wrapped in print().
model.summary()

# Training : Transfer Learning

In [None]:
# Stage 1 (transfer learning): train the model while the backbone layers are frozen.
history = model.fit(
    train_images,
    steps_per_epoch = len(train_images),   # one pass over every training batch per epoch
    validation_data = val_images,
    epochs = 50,
    callbacks = [
        # Stop once val_loss has not improved for 10 epochs and roll back to the best weights.
        EarlyStopping(monitor = 'val_loss',
                      patience = 10,
                      restore_best_weights = True),
        # Multiply the learning rate by 0.2 when val_loss has not improved for 5 epochs.
        ReduceLROnPlateau(monitor = 'val_loss',
                          factor = 0.2,
                          patience = 5,
                          mode = 'min'),
    ],
)
model.save('./my_checkpoints.h5')

# Display model performance

In [None]:
def makeGraph(hist):
    """Plot training vs. validation loss and accuracy side by side.

    Args:
        hist: Object whose ``history`` attribute is a dict with the keys
            'accuracy', 'loss', 'val_accuracy' and 'val_loss', each holding
            one value per epoch (as returned by ``model.fit``).

    The epoch with the lowest validation loss and the epoch with the highest
    validation accuracy are each marked with a blue dot. The figure is shown
    with ``plt.show()``; nothing is returned.
    """
    train_acc = hist.history['accuracy']
    train_loss = hist.history['loss']
    val_acc = hist.history['val_accuracy']
    val_loss = hist.history['val_loss']

    # Zero-based positions of the best epochs; +1 wherever they are shown to the reader.
    index_loss = np.argmin(val_loss)
    val_lowest = val_loss[index_loss]
    index_acc = np.argmax(val_acc)
    acc_highest = val_acc[index_acc]

    Epochs = list(range(1, len(train_acc) + 1))
    loss_label = f'best epoch : {index_loss + 1}'
    acc_label = f'best epoch : {index_acc + 1}'

    # Plot training history
    plt.figure(figsize = (20,8))
    plt.style.use('fivethirtyeight')

    # Left panel: loss curves.
    plt.subplot(1,2,1)
    plt.plot(Epochs, train_loss, 'r', label= 'Training loss')
    plt.plot(Epochs, val_loss, 'g', label= 'Validation loss')
    plt.scatter(index_loss + 1, val_lowest, s= 150, c= 'blue', label= loss_label)
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # Right panel: accuracy curves.
    plt.subplot(1, 2, 2)
    plt.plot(Epochs, train_acc, 'r', label= 'Training Accuracy')
    plt.plot(Epochs, val_acc, 'g', label= 'Validation Accuracy')
    plt.scatter(index_acc + 1 , acc_highest, s= 150, c= 'blue', label= acc_label)
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()
    
# Plot the loss/accuracy curves recorded in `history`.
makeGraph(history)

# Training : Fine Tuning

In [None]:
# Stage 2 (fine-tuning): unfreeze the backbone, but keep its BatchNormalization layers frozen.
pretrained_model.trainable = True
for layer in pretrained_model.layers:
    if isinstance(layer, layers.BatchNormalization): # if the layer is a BatchNormalization layer, keep it frozen
        layer.trainable = False

# let`s see first 10 layers
for l in pretrained_model.layers[:10]:
    print(l.name, l.trainable)

# Re-compile after changing the `trainable` flags.
model.compile(
    optimizer=Adam(0.00001), # use a very small learning rate when fine-tuning
    loss='categorical_crossentropy',
    metrics=['accuracy']
)
# model.load_weights('./checkpoints/my_checkpoint')
# summary() prints the table itself and returns None, so it is not wrapped in print().
model.summary()
history = model.fit(
    train_images,
    steps_per_epoch=len(train_images),
    validation_data=val_images,
    validation_steps=len(val_images),
    epochs=3,
    callbacks=[
        # Stop when val_loss has NOT improved for 5 consecutive epochs and restore the best weights.
        # With epochs=3 this patience can never be reached, so the callback only
        # takes effect if the epoch count is raised.
        EarlyStopping(monitor = "val_loss",
                      patience = 5,
                      restore_best_weights = True),
        # Multiply the learning rate by 0.2 when val_loss has not improved for 2 epochs.
        ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=2, mode='min')
    ]
)
model.save('./my_model.h5')

# Display model performance

In [None]:
# Plot the loss/accuracy curves recorded in `history`.
makeGraph(history)

# Model.evaluate

In [None]:
# Score the model on the test iterator; the first two entries of the result
# are read as loss and accuracy.
results = model.evaluate(test_images, verbose=0)
test_loss, test_accuracy = results[0], results[1]

print("    Test Loss: {:.5f}".format(test_loss))
print("Test Accuracy: {:.2f}%".format(test_accuracy * 100))

# F1 score / Recall / Precision

In [None]:
# `test_images.classes` is always in dataframe order, whereas predictions come
# back in the order the iterator serves its batches. A shuffling iterator would
# pair every prediction with the wrong ground-truth label, so force sequential
# order before predicting.
if test_images.shuffle:
    test_images.shuffle = False
    test_images.on_epoch_end()   # rebuild the iterator's index order with shuffling off

y_true = test_images.classes
y_pred = np.argmax(model.predict(test_images), axis = 1)

# Macro-averaged F1: the unweighted mean of the per-class F1 scores.
f1 = f1_score(y_true, y_pred, average='macro')
print("F1 Score:", f1)
print(classification_report(y_true, y_pred, target_names=list(test_images.class_indices.keys())))

# Get Predictions

In [None]:
# Labels and file names exposed by the iterator are in dataframe order, so the
# predictions must be produced in that same order to line up row by row.
if test_images.shuffle:
    test_images.shuffle = False
    test_images.on_epoch_end()   # rebuild the iterator's index order with shuffling off

# Inverse of class_indices: class index -> class name.
classes = {class_id: class_name for class_name, class_id in test_images.class_indices.items()}

# Run inference once and derive both the predicted label and its confidence from
# the same probability matrix, so the two columns can never disagree.
pred_probs = np.asarray(model.predict(test_images))
pred_labels = pred_probs.argmax(axis = 1)

Predictions = pd.DataFrame({"Image Index" : list(range(len(test_images.labels))),
                            "Test Labels" : test_images.labels,
                            "Test Classes" : [classes[i] for i in test_images.labels],
                            "Prediction Labels" : pred_labels,
                            "Prediction Classes" : [classes[i] for i in pred_labels],
                            "Path": test_images.filenames,
                            "Prediction Probability" : pred_probs.max(axis = 1)
                           })
Predictions.head(8)