# Dataset
Source: https://www.kaggle.com/c/plant-pathology-2020-fgvc7

In [None]:
import os

# Locations of the Kaggle dataset on disk
mainDir = "plant-pathology-2020-fgvc7"
imagesDir = os.path.join(mainDir, "images")
trainFile = os.path.join(mainDir, "train.csv")

# Collect the full path of every regular file directly inside the images
# directory (sub-directories and other non-file entries are skipped)
imagesFileList = list(
    filter(
        os.path.isfile,
        (os.path.join(imagesDir, name) for name in os.listdir(imagesDir)),
    )
)

In [None]:
import pandas as pd
from PIL import Image

# Basic information of dataset: count how many images share each
# (width, height) size reported by PIL
imageSizeStat = {}
for imageFile in imagesFileList:
    with Image.open(imageFile) as im:
        imageSize = im.size
    imageSizeStat[imageSize] = imageSizeStat.get(imageSize, 0) + 1
print("Size:")
print("\t{}".format(imageSizeStat))

# Number of positive rows per label column, plus the total number of rows
trainDF = pd.read_csv(trainFile)
print("Label:")
for labelName in ("healthy", "multiple_diseases", "rust", "scab"):
    print("\t{}: {}".format(labelName, trainDF[labelName].sum()))
print("\t==> Total: {}".format(len(trainDF.index)))

##### Size:
(2048, 1365): 3620\
(1365, 2048): 22\
==> The size can be kept, but the portrait images need to be rotated to the same orientation as the others

We have:
\begin{equation}
\frac{2048}{1365} \approx \frac{3}{2}
\end{equation}
==> Size ratio: 3:2

To avoid running out of memory, randomly crop each image to 1920×1280 (which keeps the 3:2 ratio), then resize it to 384×256.
##### Label:
healthy: 516\
multiple_diseases: 91\
rust: 622\
scab: 592\
==> Not balanced\
==> Need to rebalance

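**Note: healthy means no rust and no scab, while multiple_diseases means both rust and scab.**\
==> A single model can cover both rust and scab\
==> As (rust, scab) flags: healthy(0,0); multiple_diseases(1,1); rust(1,0); scab(0,1)\
==> The baseline below encodes these as four mutually exclusive classes instead: healthy=0, multiple_diseases=1, rust=2, scab=3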

Use data augmentation to create balanced training and validation sets with the following number of images per class:\
&emsp;Training set: 300,300,300,300\
&emsp;Validation set: 100,100,100,100

# Baseline

In [None]:
import random
import numpy as np
from sklearn.model_selection import train_test_split

def split_dataset(image_id, total, random_state=None):
    """Split image ids 75/25 and resample each part to a fixed size.

    The ids are first split into disjoint train and test parts, so no image
    appears in both. Each part is then sampled *with replacement* up to its
    target size, which over-samples small classes and under-samples large
    ones.

    Args:
        image_id: pandas Series of image ids for a single class (it must
            support ``.sample``).
        total: combined number of samples to produce; three quarters go to
            the train part and one quarter to the test part.
        random_state: optional seed forwarded to both the split and the
            resampling so a run can be reproduced. ``None`` (the default)
            keeps the original non-deterministic behaviour.

    Returns:
        Tuple ``(train, test)`` of resampled pandas Series.
    """
    train, test = train_test_split(
        image_id, test_size=0.25, random_state=random_state
    )
    n_train = total * 3 // 4
    n_test = total // 4
    train = train.sample(n=n_train, replace=True, random_state=random_state)
    test = test.sample(n=n_test, replace=True, random_state=random_state)
    return train, test

def data_augmentation(im, crop_size=(1920, 1280), output_size=(384, 256)):
    """Return a randomly flipped, cropped and resized copy of a PIL image.

    Steps, in order: rotate portrait images to landscape, flip
    horizontally and vertically with probability 1/2 each, take a random
    crop of ``crop_size`` and resize it to ``output_size``.

    The crop offsets are derived from the actual image size instead of the
    hard-coded 2048x1365, so the crop always stays inside the image and
    larger images are sampled over their full area. For 2048x1365 inputs the
    offset ranges are exactly the same as before.

    Args:
        im: PIL image to augment.
        crop_size: ``(width, height)`` of the random crop.
        output_size: ``(width, height)`` of the returned image.

    Returns:
        A new PIL image of size ``output_size``.

    Raises:
        ValueError: if the (rotated) image is smaller than ``crop_size``.
    """
    # Rotate to same direction (landscape)
    if im.size[0] < im.size[1]:
        im = im.transpose(Image.ROTATE_90)
    # Random flip
    if random.getrandbits(1):
        im = im.transpose(Image.FLIP_LEFT_RIGHT)
    if random.getrandbits(1):
        im = im.transpose(Image.FLIP_TOP_BOTTOM)
    # Random crop, positioned so that it never leaves the image
    width, height = im.size
    crop_width, crop_height = crop_size
    if width < crop_width or height < crop_height:
        raise ValueError(
            "image of size {}x{} is smaller than the crop size {}x{}".format(
                width, height, crop_width, crop_height
            )
        )
    left = random.randint(0, width - crop_width)
    top = random.randint(0, height - crop_height)
    im = im.crop((left, top, left + crop_width, top + crop_height))
    # Resize to the network input size
    return im.resize(output_size)

def _load_augmented_images(image_ids):
    """Load every id from ``imagesDir``, augment it and return the arrays."""
    arrays = []
    for image_id in image_ids:
        path = os.path.join(imagesDir, image_id + ".jpg")
        # The context manager closes the file handle as soon as the
        # augmented copy has been converted to an array.
        with Image.open(path) as im:
            arrays.append(np.asarray(data_augmentation(im)))
    return arrays


def create_dataset(FF, TT, TF, FT):
    """Build balanced, augmented train and test arrays for the four classes.

    Each class is split and resampled with ``split_dataset(ids, 400)`` and
    every selected image is loaded and passed through
    ``data_augmentation``. Classes are labelled by argument position:
    ``FF`` -> 0, ``TT`` -> 1, ``TF`` -> 2, ``FT`` -> 3.

    Args:
        FF: image ids of the first class (label 0).
        TT: image ids of the second class (label 1).
        TF: image ids of the third class (label 2).
        FT: image ids of the fourth class (label 3).

    Returns:
        ``(train_images, train_labels), (test_images, test_labels)`` as
        NumPy arrays; the label arrays hold one ``uint8`` per row.
    """
    # Split all classes before loading any image so the random draws happen
    # in the same order as in the original implementation.
    splits = [split_dataset(ids, 400) for ids in (FF, TT, TF, FT)]

    train_images, train_labels = [], []
    test_images, test_labels = [], []
    for label, (train_ids, test_ids) in enumerate(splits):
        augmented = _load_augmented_images(train_ids)
        train_images.extend(augmented)
        train_labels.extend([np.uint8(label)] for _ in augmented)

        augmented = _load_augmented_images(test_ids)
        test_images.extend(augmented)
        test_labels.extend([np.uint8(label)] for _ in augmented)

    train_images = np.array(train_images)
    train_labels = np.array(train_labels)
    test_images = np.array(test_images)
    test_labels = np.array(test_labels)
    return (train_images, train_labels), (test_images, test_labels)

# Split dataset: select the image ids whose label column equals 1
trainDF = pd.read_csv(trainFile)
healthy = trainDF.loc[trainDF["healthy"].eq(1), "image_id"]
multiple_diseases = trainDF.loc[trainDF["multiple_diseases"].eq(1), "image_id"]
rust = trainDF.loc[trainDF["rust"].eq(1), "image_id"]
scab = trainDF.loc[trainDF["scab"].eq(1), "image_id"]

# Pre-process dataset: build the augmented arrays once and cache them on
# disk so later cells can reload them without redoing the image work
(train_images, train_labels), (test_images, test_labels) = create_dataset(
    FF=healthy, TT=multiple_diseases, TF=rust, FT=scab
)
for arrayName, arrayData in (
    ("model_train_images.npy", train_images),
    ("model_train_labels.npy", train_labels),
    ("model_test_images.npy", test_images),
    ("model_test_labels.npy", test_labels),
):
    np.save(os.path.join(mainDir, arrayName), arrayData)

In [None]:
import numpy as np

def get_information(images, labels):
    """Print the shapes of an image array and its label array on one line."""
    print("{} {}".format(images.shape, labels.shape))

# Load pre-processed dataset
train_images = np.load(os.path.join(mainDir, "model_train_images.npy"))
train_labels = np.load(os.path.join(mainDir, "model_train_labels.npy"))
test_images = np.load(os.path.join(mainDir, "model_test_images.npy"))
test_labels = np.load(os.path.join(mainDir, "model_test_labels.npy"))
# Get information
get_information(train_images, train_labels)
get_information(test_images, test_labels)

# Normalize pixel values to be between 0 and 1.
# Converting to float32 first keeps the result in float32; dividing the
# loaded integer arrays by a Python float would produce float64 arrays and
# double the memory needed for the image tensors.
train_images = train_images.astype(np.float32) / np.float32(255.0)
test_images = test_images.astype(np.float32) / np.float32(255.0)

In [None]:
from tensorflow.keras import layers, models

def conv_block(x, growth_rate, drop_rate):
    """Append ``growth_rate`` new feature maps to ``x``.

    The input goes through BN -> ReLU -> 1x1 conv (``4 * growth_rate``
    filters) and then BN -> ReLU -> 3x3 conv (``growth_rate`` filters). The
    result is concatenated with the unchanged input, and dropout is applied
    to the concatenated tensor when ``drop_rate`` is positive.

    Args:
        x: input Keras tensor.
        growth_rate: number of feature maps added by this block.
        drop_rate: dropout rate; dropout is skipped when it is not > 0.

    Returns:
        The concatenated (and optionally dropped-out) Keras tensor.
    """
    # 1x1 bottleneck convolution
    bottleneck = layers.BatchNormalization()(x)
    bottleneck = layers.Activation('relu')(bottleneck)
    bottleneck = layers.Conv2D(
        filters=4 * growth_rate, kernel_size=1, strides=1, padding='same'
    )(bottleneck)

    # 3x3 convolution producing the new feature maps
    new_features = layers.BatchNormalization()(bottleneck)
    new_features = layers.Activation('relu')(new_features)
    new_features = layers.Conv2D(
        filters=growth_rate, kernel_size=3, strides=1, padding='same'
    )(new_features)

    merged = layers.Concatenate()([x, new_features])
    # NOTE(review): dropout acts on the whole concatenation, so features
    # from earlier blocks are dropped again in every later block -- confirm
    # this is intended rather than dropping only the new features.
    if drop_rate > 0:
        return layers.Dropout(drop_rate)(merged)
    return merged

def dense_block(x, channels, blocks, growth_rate, drop_rate):
    """Chain ``blocks`` conv blocks and report the updated channel count.

    Args:
        x: input tensor.
        channels: channel count tracked for ``x`` before the block.
        blocks: number of ``conv_block`` calls to chain.
        growth_rate: feature maps added by each conv block.
        drop_rate: dropout rate forwarded to each conv block.

    Returns:
        Tuple ``(x, channels)`` with the output tensor and
        ``channels + blocks * growth_rate``.
    """
    for _ in range(blocks):
        x = conv_block(x, growth_rate, drop_rate)
    return x, channels + blocks * growth_rate

def transition(x, channels):
    """Halve both the channel count and the spatial resolution of ``x``.

    Applies BN -> ReLU -> 1x1 conv with ``int(channels * 0.5)`` filters,
    followed by 2x2 average pooling with stride 2.

    Args:
        x: input Keras tensor.
        channels: channel count tracked for ``x``.

    Returns:
        Tuple ``(x, channels)`` with the output tensor and the reduced
        channel count.
    """
    reduced = int(channels * 0.5)
    out = layers.BatchNormalization()(x)
    out = layers.Activation('relu')(out)
    out = layers.Conv2D(filters=reduced, kernel_size=1, strides=1, padding='same')(out)
    out = layers.AveragePooling2D(pool_size=2, strides=2, padding='same')(out)
    return out, reduced

def create_model(channels, growth_rate, drop_rate,
                 input_shape=(256, 384, 3), num_classes=4,
                 block_config=(3, 6, 12, 8)):
    """Build the densely connected CNN classifier.

    The network is a strided 3x3 conv stem with max pooling, followed by
    one dense block per entry of ``block_config`` with a transition layer
    between consecutive dense blocks, global average pooling and a softmax
    classification layer.

    The defaults reproduce the original hard-coded architecture, so
    existing calls ``create_model(channels, growth_rate, drop_rate)`` build
    exactly the same model.

    Args:
        channels: number of filters of the stem convolution.
        growth_rate: feature maps added by every conv block.
        drop_rate: dropout rate used inside the conv blocks.
        input_shape: ``(height, width, channels)`` of the input images.
        num_classes: number of softmax outputs.
        block_config: number of conv blocks in each dense block.

    Returns:
        An uncompiled Keras ``Model``.
    """
    inputs = layers.Input(shape=input_shape)

    # Stem: two stride-2 stages reduce the resolution before the dense blocks
    x = layers.Conv2D(channels, 3, strides=2, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.MaxPooling2D(3, strides=2, padding='same')(x)

    # Dense blocks, with a transition after every block except the last
    last_index = len(block_config) - 1
    for index, blocks in enumerate(block_config):
        x, channels = dense_block(x, channels, blocks, growth_rate, drop_rate)
        if index < last_index:
            x, channels = transition(x, channels)

    x = layers.GlobalAveragePooling2D()(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    return models.Model(inputs=inputs, outputs=outputs)

In [None]:
# Build the baseline network: 64 stem filters, growth rate 8, dropout 0.1
model = create_model(64, 8, 0.1)
model.summary()
# Labels are integer class ids, hence the sparse loss and metric.
# `metrics` is passed as a list: Keras documents it as a list (or dict) of
# metrics, and newer Keras releases reject a bare string.
model.compile(
    optimizer='Adam',
    loss="sparse_categorical_crossentropy",
    metrics=["sparse_categorical_accuracy"],
)

In [None]:
from tensorflow.keras import callbacks

# Train for 100 epochs; the checkpoint callback keeps only the model with the
# best validation accuracy seen so far at <mainDir>/model
history = model.fit(
    x=train_images,
    y=train_labels,
    epochs=100,
    validation_data=(test_images, test_labels),
    callbacks=[
        callbacks.ModelCheckpoint(
            filepath=os.path.join(mainDir, "model"),
            monitor='val_sparse_categorical_accuracy',
            save_best_only=True,
        )
    ],
)

In [None]:
import matplotlib.pyplot as plt

# Plot training and validation accuracy per epoch
for seriesName in ('sparse_categorical_accuracy', 'val_sparse_categorical_accuracy'):
    plt.plot(history.history[seriesName], label=seriesName)
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.ylim(0.5, 1)
plt.legend(loc='lower right')

In [None]:
import numpy as np
from tensorflow.keras import models
from sklearn.metrics import confusion_matrix

# Reload the best checkpoint and evaluate it on the validation arrays
model = models.load_model(os.path.join(mainDir, "model"))
test_loss, test_acc = model.evaluate(test_images, test_labels, verbose=2)
# Predicted class = index of the largest softmax output of each row
pred = model.predict(test_images).argmax(axis=1)
print(confusion_matrix(test_labels.ravel(), pred))

##### Predict Test Set

In [None]:
import os
import pandas as pd

# Test set path
# (mainDir and imagesDir are redefined with the same values as in the first
# section of the notebook)
mainDir = "plant-pathology-2020-fgvc7"
imagesDir = os.path.join(mainDir, "images")
testFile = os.path.join(mainDir, "sample_submission.csv")

# Test set DF: the sample submission supplies the "image_id" column of the
# test images; its label columns are overwritten with predictions later on
testDF = pd.read_csv(testFile)

In [None]:
from tensorflow.keras import models

# Reload the model stored at <mainDir>/model, the path the training
# checkpoint callback writes to
model = models.load_model(os.path.join(mainDir, "model"))

In [None]:
import random

import numpy as np
from PIL import Image

def data_augmentation(im, crop_size=(1920, 1280), output_size=(384, 256)):
    """Return a randomly flipped, cropped and resized copy of a PIL image.

    Steps, in order: rotate portrait images to landscape, flip
    horizontally and vertically with probability 1/2 each, take a random
    crop of ``crop_size`` and resize it to ``output_size``.

    The crop offsets are derived from the actual image size instead of the
    hard-coded 2048x1365, so the crop always stays inside the image and
    larger images are sampled over their full area. For 2048x1365 inputs the
    offset ranges are exactly the same as before.

    Args:
        im: PIL image to augment.
        crop_size: ``(width, height)`` of the random crop.
        output_size: ``(width, height)`` of the returned image.

    Returns:
        A new PIL image of size ``output_size``.

    Raises:
        ValueError: if the (rotated) image is smaller than ``crop_size``.
    """
    # Rotate to same direction (landscape)
    if im.size[0] < im.size[1]:
        im = im.transpose(Image.ROTATE_90)
    # Random flip
    if random.getrandbits(1):
        im = im.transpose(Image.FLIP_LEFT_RIGHT)
    if random.getrandbits(1):
        im = im.transpose(Image.FLIP_TOP_BOTTOM)
    # Random crop, positioned so that it never leaves the image
    width, height = im.size
    crop_width, crop_height = crop_size
    if width < crop_width or height < crop_height:
        raise ValueError(
            "image of size {}x{} is smaller than the crop size {}x{}".format(
                width, height, crop_width, crop_height
            )
        )
    left = random.randint(0, width - crop_width)
    top = random.randint(0, height - crop_height)
    im = im.crop((left, top, left + crop_width, top + crop_height))
    # Resize to the network input size
    return im.resize(output_size)

# Pre-process every test image listed in the submission template and cache
# the resulting array on disk
test_images = []
for image_id in testDF["image_id"]:
    path = os.path.join(imagesDir, image_id + ".jpg")
    # The context manager closes each file handle right away instead of
    # leaving one open handle per test image until garbage collection.
    with Image.open(path) as im:
        test_images.append(np.asarray(data_augmentation(im)))
test_images = np.array(test_images)
np.save(os.path.join(mainDir, "test_images.npy"), test_images)

In [None]:
import numpy as np

# Load pre-processed dataset
test_images = np.load(os.path.join(mainDir, "test_images.npy"))

# Get information
print(test_images.shape)

# Normalize pixel values to be between 0 and 1.
# Converting to float32 first keeps the result in float32; dividing the
# loaded integer array by a Python float would produce a float64 array and
# double the memory needed for the image tensor.
test_images = test_images.astype(np.float32) / np.float32(255.0)

In [None]:
# Run the model on the normalized test images; the columns of `pred` are
# indexed 0..3 when the submission is filled in
pred = model.predict(test_images)

In [None]:
# Copy each prediction column into the submission frame; the column order
# follows the class ids used for training (0..3)
for classIndex, className in enumerate(("healthy", "multiple_diseases", "rust", "scab")):
    testDF[className] = pred[:, classIndex]

# Write the submission file with four decimals and without the index column
testDF.to_csv(
    os.path.join(mainDir, "submission.csv"),
    float_format="%.4f",
    header=True,
    index=False,
)