In [None]:
import random
import shutil
import typing as T

import cv2
import PIL
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm

from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, fbeta_score, accuracy_score

import tensorflow as tf
from tensorflow.keras.applications import ResNet50V2

# Seed every random number generator the notebook relies on. Seeding only the
# `random` module leaves NumPy (np.random.choice) and TensorFlow (weight
# initialisation, dropout) unseeded, so results would differ between runs.
GLOBAL_SEED = 414
random.seed(GLOBAL_SEED)
np.random.seed(GLOBAL_SEED)
tf.random.set_seed(GLOBAL_SEED)

In [None]:
from pathlib import Path

# Root of the raw dataset; every sub-directory is treated as one class.
DATA_PATH = Path("../data/eurosat/2750")

# Map class name -> number of files in that class directory.
# Only directories are accepted as classes: a stray file next to them would
# otherwise become a bogus class with count 0 and inflate NUM_CLASSES.
# Sorting keeps the class order independent of the filesystem listing order.
category_counts = {
    class_dir.name: sum(1 for _ in class_dir.glob("*"))
    for class_dir in sorted(DATA_PATH.glob("*"))
    if class_dir.is_dir()
}

In [None]:
# Bar chart of how many images each class contains.
class_names = list(category_counts)
class_sizes = [category_counts[name] for name in class_names]
bar_positions = range(len(class_names))

plt.figure(figsize=(12, 6))
plt.bar(bar_positions, class_sizes, align="center")
plt.xticks(bar_positions, class_names, fontsize=12, rotation=40)
plt.xlabel("Class Label", fontsize=13)
plt.ylabel("Class Size", fontsize=13)
plt.title("EUROSAT Class Distribution", fontsize=15)

In [None]:
def get_random_images(category_counts: T.Dict[str, int] = category_counts) -> T.List[Path]:
    """
    Pick one random image path per class.

    Paths are sampled from the files that actually exist in each class
    directory rather than built from a guessed file number, so every returned
    path is readable. The directory listing is sorted first so the choice is
    reproducible for a given `random` seed.

    Args:
        category_counts: Mapping whose keys are the class directory names
            under DATA_PATH.

    Returns:
        One image path per class, in the iteration order of `category_counts`.

    Raises:
        FileNotFoundError: If a class directory contains no ``.jpg`` file.
    """
    sampled_paths = []
    for class_name in category_counts.keys():
        candidates = sorted((DATA_PATH / class_name).glob("*.jpg"))
        if not candidates:
            raise FileNotFoundError(f"No .jpg images found in {DATA_PATH / class_name}")
        sampled_paths.append(random.choice(candidates))
    return sampled_paths

# Two independent draws: each call contributes one image per class.
img_paths = get_random_images() + get_random_images()

In [None]:
def plot_images(paths: T.List[Path]) -> None:
    """
    Show 20 images in a 4x5 grid, each titled with its parent directory name.

    Args:
        paths: Exactly 20 image paths laid out as ``<class_name>/<file>``.

    Raises:
        ValueError: If `paths` does not contain exactly 20 entries.
    """
    if len(paths) != 20:
        raise ValueError("Paths list should only have 20 image paths.")

    # A bare `import PIL` does not load the `PIL.Image` submodule; import it
    # explicitly instead of relying on another library having done so.
    from PIL import Image

    plt.figure(figsize=(15, 8))
    for position, path in enumerate(paths, start=1):
        plt.subplot(4, 5, position, xticks=[], yticks=[])
        # The context manager closes the file handle once the pixels are copied.
        with Image.open(path, "r") as image:
            plt.imshow(np.array(image))
        # `parent.name` is the class directory regardless of the OS path separator.
        plt.title(Path(path).parent.name, color="white")

# Render the sampled images; plot_images raises ValueError unless exactly 20 paths are given.
plot_images(img_paths)

In [None]:
# Output directories for the split; they sit next to the raw dataset folder.
TRAIN_PATH = DATA_PATH.parent / "train"
TEST_PATH = DATA_PATH.parent / "test"
TRAIN_PATH.mkdir(parents=True, exist_ok=True)
TEST_PATH.mkdir(parents=True, exist_ok=True)

# Model and data-pipeline settings.
NUM_CLASSES = len(category_counts)
BATCH_SIZE = 64
INPUT_SHAPE = (64, 64, 3)
CLASS_MODEL = "categorical"

# A single seed, drawn from the `random` module, shared by the splitter and the generators.
SEED = random.randint(1, 100000)
SPLIT = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=SEED)

In [None]:
# Mirror every class name as a sub-directory of both train/ and test/.
for split_root in (TRAIN_PATH, TEST_PATH):
    for label in category_counts:
        (split_root / label).mkdir(exist_ok=True)

In [None]:
# Map every image path to its class label. Directory listings come back in a
# filesystem-dependent order, so each listing is sorted: the seeded
# train/test split then selects the same files on every machine and re-run.
all_paths = {
    image_path: category
    for category in category_counts.keys()
    for image_path in sorted((DATA_PATH / category).glob("*"))
}

# X holds the image paths; y holds the one-hot encoded class labels (one column per class).
X = pd.Series(list(all_paths.keys()))
y = pd.get_dummies(pd.Series(list(all_paths.values())))

In [None]:
def _copy_missing_files(path_pairs: T.List[T.Tuple[Path, Path]]) -> None:
    """Copy each (source, destination) pair unless the destination already exists."""
    for paths in tqdm(path_pairs):
        source, destination = paths
        # `destination` already includes the split root; prefixing it again
        # yields a path that never exists, so every file would be recopied.
        if not destination.exists():
            shutil.copy(source, destination)
        else:
            print(f"Already copied: {paths}")


def move_image_files() -> None:
    """
    Copy every image into the train or test directory chosen by the stratified split.

    Files are copied (the originals stay in place) to
    ``<TRAIN_PATH or TEST_PATH>/<class_name>/<file_name>``. Destinations that
    already exist are skipped and reported.
    """
    for train_index, test_index in SPLIT.split(X, y):
        # The splitter yields positional indices, so index positionally.
        train_paths = X.iloc[train_index]
        test_paths = X.iloc[test_index]

        train_path_map = [(path, TRAIN_PATH / path.parent.name / path.name) for path in train_paths]
        test_path_map = [(path, TEST_PATH / path.parent.name / path.name) for path in test_paths]

        print("Moving training files to: {}".format(TRAIN_PATH.as_posix()))
        _copy_missing_files(train_path_map)

        print("Moving testing files to: {}".format(TEST_PATH.as_posix()))
        _copy_missing_files(test_path_map)

# Populate the train/ and test/ directories from the stratified split.
move_image_files()

In [None]:
# Fraction of the train/ directory held out for validation.
VALIDATION_SPLIT = 0.2
TARGET_SIZE = INPUT_SHAPE[:2]

# Random augmentation is applied to the training subset only.
train_preprocessor = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1./255,
    horizontal_flip=True,
    vertical_flip=True,
    rotation_range=55,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=[0.2, 1],
    validation_split=VALIDATION_SPLIT,
)
train_generator = train_preprocessor.flow_from_directory(
    directory=TRAIN_PATH,
    target_size=TARGET_SIZE,
    batch_size=BATCH_SIZE,
    class_mode=CLASS_MODEL,
    subset="training",
    shuffle=True,
    seed=SEED,
)

# Validation images are only rescaled. Drawing them from the augmenting
# generator would score the model on randomly distorted images. The same
# `validation_split` is used so subset="validation" selects exactly the files
# that subset="training" leaves out.
validation_preprocessor = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1./255,
    validation_split=VALIDATION_SPLIT,
)
validation_generator = validation_preprocessor.flow_from_directory(
    directory=TRAIN_PATH,
    target_size=TARGET_SIZE,
    batch_size=BATCH_SIZE,
    class_mode=CLASS_MODEL,
    subset="validation",
    shuffle=True,
    seed=SEED,
)

# shuffle=False keeps the test batches aligned with `test_generator.classes`.
test_preprocessor = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1./255)
test_generator = test_preprocessor.flow_from_directory(
    directory=TEST_PATH,
    target_size=TARGET_SIZE,
    batch_size=BATCH_SIZE,
    class_mode=CLASS_MODEL,
    color_mode="rgb",
    shuffle=False,
    seed=SEED,
)

In [None]:
# Pick one of the sampled images at random.
sample_path = img_paths[random.randint(0, len(img_paths) - 1)]
image = cv2.imread(str(sample_path))
if image is None:
    # cv2.imread reports an unreadable or missing file by returning None
    # rather than raising; fail here with a message that names the file.
    raise FileNotFoundError(f"Could not read image: {sample_path}")

# OpenCV loads pixels as BGR; reverse the channel axis to display them as RGB.
plt.imshow(image[:, :, ::-1])
plt.title("Original Image")
plt.show()

In [None]:
# `image` was loaded by OpenCV in BGR order; convert it to RGB first so the
# augmented previews are shown with the correct colours.
rgb_image = image[:, :, ::-1]
samples = np.expand_dims(tf.keras.preprocessing.image.img_to_array(rgb_image), 0)

# The preprocessor rescales by 1/255, so pre-multiply by 255 to keep the
# generated pixels in the 0-255 range expected by the uint8 cast below.
iterator = train_preprocessor.flow(samples * 255, batch_size=1)

figure, axis = plt.subplots(3, 3, figsize=(12, 12))
figure.suptitle("Sample of training image transformations")
for i in range(3):
    for j in range(3):
        # Each `next` call yields a batch of one freshly augmented image.
        axis[i][j].imshow(next(iterator)[0].astype("uint8"))

In [None]:
# Persist and show the class-name -> index mapping used by the training generator.
# NOTE(review): np.save on a dict stores a pickled object array (NumPy appends
# ".npy" to the file name), so reading it back needs
# np.load(..., allow_pickle=True) — confirm consumers do that.
np.save("class_indices", train_generator.class_indices)
print(train_generator.class_indices)

In [None]:
# Transfer learning: ImageNet-initialised ResNet50V2 backbone (without its
# classifier top) followed by a small dense classification head.
base_model = ResNet50V2(include_top=False, weights="imagenet", input_shape=INPUT_SHAPE)

features = tf.keras.layers.Flatten()(base_model.output)
features = tf.keras.layers.Dense(512, activation="selu")(features)
features = tf.keras.layers.Dropout(0.15)(features)
output_layer = tf.keras.layers.Dense(NUM_CLASSES, activation="softmax")(features)

model = tf.keras.Model(inputs=base_model.input, outputs=output_layer)

# Every backbone layer is left trainable, i.e. the whole network is fine-tuned.
for backbone_layer in base_model.layers:
    backbone_layer.trainable = True

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0004),
    loss="categorical_crossentropy",
    metrics=["categorical_crossentropy", "categorical_accuracy"],
)

In [None]:
# Print the layer-by-layer summary of the compiled model.
model.summary()

In [None]:
def plot_model_history(history: "tf.keras.callbacks.History") -> None:
    """
    Plot training vs. validation accuracy and loss per epoch, side by side.

    Args:
        history: Object returned by ``model.fit``. Its ``history`` dict must
            contain ``categorical_accuracy``, ``val_categorical_accuracy``,
            ``loss`` and ``val_loss``.
    """
    metrics = history.history
    # The left panel is labelled "accuracy", so it must show the accuracy
    # series. Plotting the cross-entropy series there would just duplicate
    # the loss panel under a misleading label.
    panels = [
        ('accuracy', metrics['categorical_accuracy'], metrics['val_categorical_accuracy']),
        ('loss', metrics['loss'], metrics['val_loss']),
    ]

    plt.figure(figsize=(10, 5))
    for position, (label, train_curve, val_curve) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(train_curve)
        plt.plot(val_curve)
        plt.ylabel(label)
        plt.xlabel('epoch')
        plt.legend(['train', 'val'], loc='upper left')

    plt.show()

def display_results(y_true, y_preds, class_labels):
    """
    Print overall accuracy and micro-averaged F2, and build per-class tables.

    Args:
        y_true: True class ids, one per sample.
        y_preds: Predicted class ids, one per sample.
        class_labels: Class names used to label the rows/columns of the tables.

    Returns:
        A ``(results, conf_mat)`` tuple: `results` has one row per class with
        Precision / Recall / F-Score / Support columns; `conf_mat` is the
        confusion matrix labelled with `class_labels` on both axes.
    """
    column_names = {0: 'Precision', 1: 'Recall', 2: 'F-Score', 3: 'Support'}

    # The scorer returns four per-class arrays; transpose so classes become rows.
    per_class_scores = precision_recall_fscore_support(y_true, y_preds)
    results = pd.DataFrame(per_class_scores, columns=class_labels).T.rename(columns=column_names)

    conf_mat = pd.DataFrame(
        confusion_matrix(y_true, y_preds),
        index=class_labels,
        columns=class_labels,
    )

    f2 = fbeta_score(y_true, y_preds, beta=2, average='micro')
    accuracy = accuracy_score(y_true, y_preds)
    print(f"Accuracy: {accuracy}")
    print(f"Global F2 Score: {f2}")

    return results, conf_mat

def plot_predictions(y_true, y_preds, test_generator, class_indices):
    """
    Show up to 20 random test images titled ``predicted\\n(true)``.

    Titles are green when the prediction matches the true class and red otherwise.

    Args:
        y_true: True class id per test sample, in generator order.
        y_preds: Either per-class scores (2-D, one row per sample) or
            already-decoded class ids (1-D), in generator order.
        test_generator: Batch-indexable image generator exposing `samples` and
            `batch_size`. It must not shuffle, otherwise images and labels no
            longer line up.
        class_indices: Mapping from class id to class name.
    """
    # Sampling without replacement cannot draw more items than exist.
    n_shown = min(20, test_generator.samples)
    sample_ids = np.random.choice(test_generator.samples, size=n_shown, replace=False)
    predictions = np.asarray(y_preds)

    fig = plt.figure(figsize=(20, 10))
    for i, idx in enumerate(sample_ids):
        # The generator is indexed by batch, not by sample: translate the
        # sample index into (batch number, offset inside that batch).
        batch_idx, offset = divmod(int(idx), test_generator.batch_size)
        batch = test_generator[batch_idx]
        # A batch is (images, labels) when labels are produced, else just images.
        batch_images = batch[0] if isinstance(batch, tuple) else batch

        ax = fig.add_subplot(4, 5, i + 1, xticks=[], yticks=[])
        ax.imshow(np.squeeze(batch_images[offset]))

        if predictions.ndim > 1:
            pred_idx = int(np.argmax(predictions[idx]))
        else:
            pred_idx = int(predictions[idx])
        true_idx = int(y_true[idx])

        ax.set_title("{}\n({})".format(class_indices[pred_idx], class_indices[true_idx]),
                     color=("green" if pred_idx == true_idx else "red"))

    # Lay the grid out once, after all subplots exist.
    plt.tight_layout()

In [None]:
# NOTE(review): CHECKPOINT_PATH is not referenced by the callbacks below, which
# write to '../working/...' instead — confirm which location is intended.
CHECKPOINT_PATH = DATA_PATH / "checkpoints"
N_STEPS = train_generator.samples//BATCH_SIZE
N_VAL_STEPS = validation_generator.samples//BATCH_SIZE
N_EPOCHS = 100

# The model is compiled with the metric "categorical_accuracy", so Keras logs
# "categorical_accuracy" / "val_categorical_accuracy". No "acc" key is ever
# logged, and a callback watching it would never fire. The validation score
# is monitored so the "best" weights are the ones that generalise best.
MONITORED_METRIC = "val_categorical_accuracy"

# model callbacks
checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath='../working/model.weights.best.hdf5',
                        monitor=MONITORED_METRIC,
                        mode='max',
                        save_best_only=True,
                        verbose=1)

early_stop = tf.keras.callbacks.EarlyStopping(monitor=MONITORED_METRIC,
                           patience=10,
                           restore_best_weights=True,
                           mode='max')

reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor=MONITORED_METRIC, factor=0.5,
                              patience=3, min_lr=0.00001, mode='max')

In [None]:
# Train using the values prepared in the configuration cell: N_EPOCHS is the
# upper bound on epochs (early stopping normally ends training sooner), and
# reduce_lr is passed along with the other callbacks so the learning-rate
# schedule it defines actually takes effect.
model_history = model.fit(
    train_generator,
    steps_per_epoch=N_STEPS,
    epochs=N_EPOCHS,
    callbacks=[early_stop, checkpoint, reduce_lr],
    validation_data=validation_generator,
    validation_steps=N_VAL_STEPS
)

In [None]:
# Plot the per-epoch curves recorded by model.fit.
plot_model_history(model_history)

In [None]:
# Invert {class_name: index} into {index: class_name} for labelling results.
class_indices = train_generator.class_indices
class_indices = dict((v,k) for k,v in class_indices.items())

# `steps` counts batches, not images: one pass over the test set is
# len(test_generator) batches. Passing the number of files would request far
# more batches than the test set contains, so predictions would no longer
# line up with `test_generator.classes`.
predictions = model.predict(test_generator, steps=len(test_generator))

# Take the argmax of the raw probabilities. Rounding first turns every row
# whose top probability is <= 0.5 into all zeros, and argmax then returns
# class 0 for it regardless of what the model actually predicted.
predicted_classes = np.argmax(predictions, axis=1)
true_classes = test_generator.classes

# Class names ordered by class index, as a concrete list.
class_labels = [class_indices[index] for index in sorted(class_indices)]
prf, conf_mat = display_results(true_classes, predicted_classes, class_labels)
prf

In [None]:
# Display the confusion matrix built by display_results (rows: true class, columns: predicted class).
conf_mat