<a href="https://colab.research.google.com/github/tomger2/RandAugment_Course_Project/blob/main/RandAugment_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras import layers
import tensorflow_datasets as tfds
from imgaug import augmenters as iaa
import imgaug as ia
from tensorflow.keras import metrics
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix as cx
from sklearn.metrics import classification_report
from scipy.stats import friedmanchisquare

np.random.seed(1)
tfds.disable_progress_bar()
tf.random.set_seed(42)
ia.seed(42)

!pip install imgaug --upgrade
print (ia.__version__)

Requirement already up-to-date: imgaug in /usr/local/lib/python3.7/dist-packages (0.4.0)
0.4.0


In [None]:
# --- Experiment selection ---------------------------------------------------
# Dataset to load: 'cifar10', 'cifar100' or 'mnist'; any other value
# (e.g. 'fashion') loads Fashion-MNIST.
CURRENT_DATASET = "cifar10"

# Training recipe:
#   'AUTOAUGMENT' - imgaug RandAugment in the tf.data pipeline, plain model
#   'COMBINED'    - the same pipeline, feeding the model that embeds `simple_aug`
#   anything else - ('NORMAL') the `simple_aug` model trained on the raw arrays
CURRENT_METHOD = "AUTOAUGMENT"

# --- Hyper-parameters -------------------------------------------------------
NUM_FOLDS = 10    # number of K-fold cross-validation splits
EPOCHS = 10       # training epochs per fold
BATCH_SIZE = 128
IMAGE_SIZE = 72   # images are resized to IMAGE_SIZE x IMAGE_SIZE

# Lets tf.data choose the parallelism / prefetch depth.
AUTO = tf.data.AUTOTUNE


In [None]:
# imgaug RandAugment policy, configured with n=5 and m=5.  It works on NumPy
# arrays, so it is applied from the tf.data pipeline through `augment()`.
rand_aug = iaa.RandAugment(m=5, n=5)


# Keras-native augmentation stage, assembled one layer at a time:
# resize -> random horizontal flip -> small random rotation -> random zoom.
simple_aug = tf.keras.Sequential()
simple_aug.add(layers.experimental.preprocessing.Resizing(IMAGE_SIZE, IMAGE_SIZE))
simple_aug.add(layers.experimental.preprocessing.RandomFlip("horizontal"))
simple_aug.add(layers.experimental.preprocessing.RandomRotation(factor=0.02))
simple_aug.add(
    layers.experimental.preprocessing.RandomZoom(height_factor=0.2, width_factor=0.2)
)


def augment(images):
    """Apply the module-level `rand_aug` policy to a batch of images.

    `imgaug` does not accept TensorFlow tensors, so the batch is cast to
    uint8 and converted to a NumPy array before being augmented.  Because
    `.numpy()` is called on the tensor, this function must run eagerly (the
    input pipeline invokes it through `tf.py_function`).

    Args:
        images: TensorFlow tensor holding the images to augment.

    Returns:
        Whatever `rand_aug(images=...)` returns for the uint8 NumPy batch.
    """
    as_uint8 = tf.cast(images, tf.uint8)
    numpy_batch = as_uint8.numpy()
    return rand_aug(images=numpy_batch)


def get_training_model(augmentation, num_classes=10):
    """Build an untrained ResNet50V2 classifier preceded by an augmentation stage.

    Args:
        augmentation: Keras layer/model placed first in the stack (e.g.
            `simple_aug`).
        num_classes: Size of the classification head.  Defaults to 10, the
            value that was previously hard-coded; pass 100 for CIFAR-100.

    Returns:
        An uncompiled `tf.keras.Sequential` model: augmentation -> rescaling
        of pixel values from [0, 255] to [-1, 1] -> ResNet50V2 (random
        weights, classification top included).
    """
    resnet50_v2 = tf.keras.applications.ResNet50V2(
        weights=None,
        include_top=True,
        input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3),
        classes=num_classes,
    )
    model = tf.keras.Sequential(
        [
            augmentation,
            # NOTE(review): this InputLayer sits *after* the augmentation
            # stage instead of first.  It is deliberately left in place:
            # moving it to the front would declare a fixed
            # IMAGE_SIZE x IMAGE_SIZE input ahead of the Resizing layer in
            # `simple_aug`, while the 'NORMAL' path feeds the raw dataset
            # arrays.  Confirm whether it is needed at all.
            layers.InputLayer((IMAGE_SIZE, IMAGE_SIZE, 3)),
            # x / 127.5 - 1  maps [0, 255] onto [-1, 1].
            layers.experimental.preprocessing.Rescaling(scale=1.0 / 127.5, offset=-1),
            resnet50_v2,
        ]
    )
    return model


def get_basic_training_model(num_classes=10):
    """Build an untrained ResNet50V2 classifier without in-model augmentation.

    Args:
        num_classes: Size of the classification head.  Defaults to 10, the
            value that was previously hard-coded; pass 100 for CIFAR-100.

    Returns:
        An uncompiled `tf.keras.Sequential` model that takes
        (IMAGE_SIZE, IMAGE_SIZE, 3) images, rescales pixel values from
        [0, 255] to [-1, 1] and feeds them to ResNet50V2 (random weights,
        classification top included).
    """
    resnet50_v2 = tf.keras.applications.ResNet50V2(
        weights=None,
        include_top=True,
        input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3),
        classes=num_classes,
    )
    model = tf.keras.Sequential(
        [
            layers.InputLayer((IMAGE_SIZE, IMAGE_SIZE, 3)),
            # x / 127.5 - 1  maps [0, 255] onto [-1, 1].
            layers.experimental.preprocessing.Rescaling(scale=1.0 / 127.5, offset=-1),
            resnet50_v2,
        ]
    )
    return model


def frideman_test(data, alpha=0.05):
    """Run a Friedman test on several related samples and print the verdict.

    Args:
        data: Sequence of at least three equally long samples (for example
            one list of per-fold scores per method).  Each element is passed
            to `friedmanchisquare` as a separate sample.
        alpha: Significance level the p-value is compared against.

    Prints 'same distribution' when the p-value is above `alpha`, otherwise
    'diffrent distribution'.  Returns None.

    Raises:
        ValueError: Propagated from SciPy when fewer than three samples are
            supplied.
    """
    # `friedmanchisquare` takes the samples as separate positional arguments;
    # passing the container itself would be seen as one single sample.
    _, p_value = friedmanchisquare(*data)
    if p_value > alpha:
        print('same distribution')
    else:
        print('diffrent distribution')





In [None]:
# ---------------------------------------------------------------------------
# Load the dataset selected by CURRENT_DATASET.  Any value other than
# 'cifar10' / 'cifar100' / 'mnist' falls back to Fashion-MNIST.
# ---------------------------------------------------------------------------
if CURRENT_DATASET == 'cifar10':
    dataset = tf.keras.datasets.cifar10
elif CURRENT_DATASET == 'cifar100':
    dataset = tf.keras.datasets.cifar100
elif CURRENT_DATASET == 'mnist':
    dataset = tf.keras.datasets.mnist
else:
    dataset = tf.keras.datasets.fashion_mnist

(x_train, y_train), (x_test, y_test) = dataset.load_data()

print(f"Total training examples: {len(x_train)}")
print(f"Total test examples: {len(x_test)}")

# Cross-validation runs over the union of the official train and test splits.
inputs = np.concatenate((x_train, x_test), axis=0)
targets = np.concatenate((y_train, y_test), axis=0)

# The models are built for (IMAGE_SIZE, IMAGE_SIZE, 3) inputs.  Grayscale
# datasets come without a channel axis, i.e. as (N, H, W); add one and repeat
# it three times so the same pipeline and models can be used.
if inputs.ndim == 3:
    inputs = np.repeat(inputs[..., np.newaxis], 3, axis=-1)

kfold = KFold(n_splits=NUM_FOLDS, shuffle=True)

acc_per_fold = []
loss_per_fold = []
precission_per_fold = []
f1_per_fold = []
recall_per_fold = []


def _resize_batch(images, labels):
    """Resize a batch of images to IMAGE_SIZE x IMAGE_SIZE; labels pass through."""
    return tf.image.resize(images, (IMAGE_SIZE, IMAGE_SIZE)), labels


def _rand_augment_batch(images, labels):
    """Run the imgaug-based `augment` on a batch through `tf.py_function`."""
    return tf.py_function(augment, [images], [tf.float32])[0], labels


def _make_rand_aug_train_ds(images, labels):
    """Training pipeline: shuffle -> batch -> resize -> RandAugment -> prefetch."""
    return (
        tf.data.Dataset.from_tensor_slices((images, labels))
        .shuffle(BATCH_SIZE * 100)
        .batch(BATCH_SIZE)
        .map(_resize_batch, num_parallel_calls=AUTO)
        .map(_rand_augment_batch, num_parallel_calls=AUTO)
        .prefetch(AUTO)
    )


def _make_test_ds(images, labels):
    """Evaluation pipeline: batch -> resize -> prefetch (order is preserved)."""
    return (
        tf.data.Dataset.from_tensor_slices((images, labels))
        .batch(BATCH_SIZE)
        .map(_resize_batch, num_parallel_calls=AUTO)
        .prefetch(AUTO)
    )


for train, test in kfold.split(inputs, targets):
    # 'AUTOAUGMENT' and 'COMBINED' both train on the RandAugment pipeline;
    # every other method trains directly on the raw arrays.
    use_rand_aug = CURRENT_METHOD in ('AUTOAUGMENT', 'COMBINED')

    # The pipelines are created before the model, as in the original per-method code.
    train_ds_rand = None
    if use_rand_aug:
        train_ds_rand = _make_rand_aug_train_ds(inputs[train], targets[train])
    test_ds = _make_test_ds(inputs[test], targets[test])

    # NOTE(review): both builders are called with their default 10-class
    # head, so labels must lie in 0..9.  'cifar100' labels do not fit this
    # head as wired here - confirm before running that dataset.
    if CURRENT_METHOD == 'AUTOAUGMENT':
        model = get_basic_training_model()
    else:
        # 'COMBINED' and the default method both embed `simple_aug`.
        model = get_training_model(simple_aug)

    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer="adam",
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )

    if use_rand_aug:
        model.fit(train_ds_rand, epochs=EPOCHS)
    else:
        model.fit(inputs[train], targets[train], batch_size=BATCH_SIZE, epochs=EPOCHS)

    # Predict on the already resized, unshuffled test pipeline, so the
    # predictions line up with targets[test] without a per-image resize loop.
    pred = np.argmax(model.predict(test_ds), axis=-1)

    report = classification_report(targets[test], pred, output_dict=True)
    macro_avg = report['macro avg']

    # scores[0] is the loss, scores[1] the accuracy metric compiled above.
    scores = model.evaluate(test_ds)
    acc_per_fold.append(scores[1] * 100)
    loss_per_fold.append(scores[0])
    precission_per_fold.append(macro_avg['precision'])
    recall_per_fold.append(macro_avg['recall'])
    f1_per_fold.append(macro_avg['f1-score'])


# ---------------------------------------------------------------------------
# Per-fold results followed by the averages over all folds.
# ---------------------------------------------------------------------------
for i in range(len(acc_per_fold)):
    print('------------------------------------------------------------------------')
    print(f'> Fold {i+1} - Loss: {loss_per_fold[i]} - Accuracy: {acc_per_fold[i]}%  - Precission: {precission_per_fold[i]} - Recall : {recall_per_fold[i]} - f1 : {f1_per_fold[i]}')


print('Average scores for all folds:')
print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
print(f'> Loss: {np.mean(loss_per_fold)}')
print(f'> Precission : {np.mean(precission_per_fold)}')
print(f'> Recall : {np.mean(recall_per_fold)}')
print(f'> f1 : {np.mean(f1_per_fold)}')
print('------------------------------------------------------------------------')


Total training examples: 50000
Total test examples: 10000
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10