In [None]:
# Fix randomness and hide warnings
seed = 42

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['MPLCONFIGDIR'] = os.getcwd()+'/configs/'

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

import numpy as np
# np.random.seed(seed)

import logging

import random
seed = random.seed(seed)

In [None]:
!pip install -U tensorflow==2.14.0
import tensorflow as tf
tf.__version__

In [None]:
# Import tensorflow
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
from tensorflow.keras import initializers
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)
print(tf.__version__)

In [None]:
# Import other libraries
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.utils import class_weight
import seaborn as sns

In [None]:
TRAIN_DIR="/kaggle/input/folders/data/training_folder"
VAL_DIR="/kaggle/input/folders/data/validation_folder"
TEST_DIR="/kaggle/input/folders/data/test_folder"
IMG_SHAPE=(96,96)
BATCH_SIZE=32

# IMPORT IMAGES FROM DIRECTORY


In [None]:
import glob
import cv2

In [None]:
training_h='/kaggle/input/folders/data/training_folder/healthy/*.jpg'
training_un='/kaggle/input/folders/data/training_folder/unhealthy/*.jpg'
validation_h='/kaggle/input/folders/data/validation_folder/healthy/*.jpg'
validation_un='/kaggle/input/folders/data/validation_folder/unhealthy/*.jpg'
test_h='/kaggle/input/folders/data/test_folder/healthy/*.jpg'
test_un='/kaggle/input/folders/data/test_folder/unhealthy/*.jpg'

obj_tr_h=glob.glob(training_h)
obj_tr_un=glob.glob(training_un)
obj_val_h=glob.glob(validation_h)
obj_val_un=glob.glob(validation_un)
obj_ts_h=glob.glob(test_h)
obj_ts_un=glob.glob(test_un)

In [None]:
y_train=[]
y_train_mu=[]

x_train=[]

for el in obj_tr_h:
    im=cv2.cvtColor(cv2.imread(el), cv2.COLOR_BGR2RGB)
    x_train.append(im)
    y_train.append([np.float32(0)])
    
    y_train_mu.append([np.float32(1),np.float32(0)])
    
for el in obj_tr_un:
    im=cv2.cvtColor(cv2.imread(el), cv2.COLOR_BGR2RGB)
    x_train.append(im)
    y_train.append([np.float32(1)])   
    
    y_train_mu.append([np.float32(0),np.float32(1)])
    
    
shuffle_indexes=np.arange(len(y_train))
np.random.seed=seed
np.random.shuffle(shuffle_indexes)

x_train=np.array(x_train)
y_train=np.array(y_train)
y_train_mu=np.array(y_train_mu)


x_train=x_train[shuffle_indexes]
y_train=y_train[shuffle_indexes]
y_train_mu=y_train_mu[shuffle_indexes]

In [None]:
y_val=[]
y_val_mu=[]

x_val=[]

for el in obj_val_h:
    im=cv2.cvtColor(cv2.imread(el), cv2.COLOR_BGR2RGB)
    x_val.append(im)
    y_val.append([np.float32(0)])
    y_val_mu.append([np.float32(1),np.float32(0)])
    
for el in obj_val_un:
    im=cv2.cvtColor(cv2.imread(el), cv2.COLOR_BGR2RGB)
    x_val.append(im)
    y_val.append([np.float32(1)])    
    y_val_mu.append([np.float32(0),np.float32(1)])    
    
shuffle_indexes=np.arange(len(y_val))
np.random.seed=seed
np.random.shuffle(shuffle_indexes)

x_val=np.array(x_val)
y_val=np.array(y_val)
y_val_mu=np.array(y_val_mu)

x_val=x_val[shuffle_indexes]
y_val=y_val[shuffle_indexes]
y_val_mu=y_val_mu[shuffle_indexes]

In [None]:
y_test=[]

y_test_mu=[]
x_test=[]

for el in obj_ts_h:
    im=cv2.cvtColor(cv2.imread(el), cv2.COLOR_BGR2RGB)
    x_test.append(im)
    y_test.append([np.float32(0)])
    y_test_mu.append([np.float32(1),np.float32(0)])
    
for el in obj_ts_un:
    im=cv2.cvtColor(cv2.imread(el), cv2.COLOR_BGR2RGB)
    x_test.append(im)
    y_test.append([np.float32(1)])    
    y_test_mu.append([np.float32(0),np.float32(1)])    
    
shuffle_indexes=np.arange(len(y_test))
np.random.seed=seed
np.random.shuffle(shuffle_indexes)

x_test=np.array(x_test)
y_test=np.array(y_test)
y_test_mu=np.array(y_test_mu)

x_test=x_test[shuffle_indexes]
y_test=y_test[shuffle_indexes]
y_test_mu=y_test_mu[shuffle_indexes]

### Initially we splitted our dataset in train, validation and test. Then we decided to keep only train and validation and test our models with submissions, in order to exploit more data.

In [None]:
x_train = np.append(x_train, x_test, axis=0)
y_train = np.append(y_train, y_test, axis=0)

shuffle_indexes=np.arange(len(y_train))
np.random.seed=seed
np.random.shuffle(shuffle_indexes)
x_train=x_train[shuffle_indexes]
y_train=y_train[shuffle_indexes]

In [None]:
print(x_train.shape)
print(x_val.shape)
print(y_train.shape)
print(y_val.shape)

# DATA AUGMENTATION


In [None]:
AUTO = tf.data.AUTOTUNE
BATCH_SIZE = 32
IMG_SIZE = 96

In [None]:
y_train = tf.keras.utils.to_categorical(y_train, num_classes=2)
y_test = tf.keras.utils.to_categorical(y_test, num_classes=2)
y_val = tf.keras.utils.to_categorical(y_val, num_classes=2)

In [None]:
def preprocess_image(image, label):
    image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
    image = tf.image.convert_image_dtype(image, tf.float32) 
    return image, label

In [None]:
train_ds_one = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(1024)
    .map(preprocess_image, num_parallel_calls=AUTO)
)
train_ds_two = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(1024)
    .map(preprocess_image, num_parallel_calls=AUTO)
)

train_ds_simple = tf.data.Dataset.from_tensor_slices((x_train, y_train))

test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test))

val_ds=tf.data.Dataset.from_tensor_slices((x_val, y_val))

train_ds_simple = (
    train_ds_simple.map(preprocess_image, num_parallel_calls=AUTO)
    .batch(BATCH_SIZE)
    .prefetch(AUTO)
)

# Combine two shuffled datasets from the same training data.
train_ds = tf.data.Dataset.zip((train_ds_one, train_ds_two))

test_ds = (
    test_ds.map(preprocess_image, num_parallel_calls=AUTO)
    .batch(BATCH_SIZE)
    .prefetch(AUTO)
)

val_ds = (
    val_ds.map(preprocess_image, num_parallel_calls=AUTO)
    .batch(BATCH_SIZE)
    .prefetch(AUTO)
)

### We implemented both mixup and cutmix, but then we chose to use only mixup after a few experiments because it gave us better results

In [None]:
def sample_beta_distribution(size, concentration_0=0.2, concentration_1=0.2):
    gamma_1_sample = tf.random.gamma(shape=[size], alpha=concentration_1)
    gamma_2_sample = tf.random.gamma(shape=[size], alpha=concentration_0)
    return gamma_1_sample / (gamma_1_sample + gamma_2_sample)


@tf.function
def get_box(lambda_value):
    cut_rat = tf.math.sqrt(1.0 - lambda_value)

    cut_w = IMG_SIZE * cut_rat  # rw
    cut_w = tf.cast(cut_w, tf.int32)

    cut_h = IMG_SIZE * cut_rat  # rh
    cut_h = tf.cast(cut_h, tf.int32)

    cut_x = tf.random.uniform((1,), minval=0, maxval=IMG_SIZE, dtype=tf.int32)  # rx
    cut_y = tf.random.uniform((1,), minval=0, maxval=IMG_SIZE, dtype=tf.int32)  # ry

    boundaryx1 = tf.clip_by_value(cut_x[0] - cut_w // 2, 0, IMG_SIZE)
    boundaryy1 = tf.clip_by_value(cut_y[0] - cut_h // 2, 0, IMG_SIZE)
    bbx2 = tf.clip_by_value(cut_x[0] + cut_w // 2, 0, IMG_SIZE)
    bby2 = tf.clip_by_value(cut_y[0] + cut_h // 2, 0, IMG_SIZE)

    target_h = bby2 - boundaryy1
    if target_h == 0:
        target_h += 1

    target_w = bbx2 - boundaryx1
    if target_w == 0:
        target_w += 1

    return boundaryx1, boundaryy1, target_h, target_w


@tf.function
def cutmix(train_ds_one, train_ds_two):
    (image1, label1), (image2, label2) = train_ds_one, train_ds_two

    alpha = [2]
    beta = [5]

    # Get a sample from the Beta distribution
    lambda_value = sample_beta_distribution(1, alpha, beta)

    # Define Lambda
    lambda_value = lambda_value[0][0]

    # Get the bounding box offsets, heights and widths
    boundaryx1, boundaryy1, target_h, target_w = get_box(lambda_value)

    # Get a patch from the second image (`image2`)
    crop2 = tf.image.crop_to_bounding_box(
        image2, boundaryy1, boundaryx1, target_h, target_w
    )
    # Pad the `image2` patch (`crop2`) with the same offset
    image2 = tf.image.pad_to_bounding_box(
        crop2, boundaryy1, boundaryx1, IMG_SIZE, IMG_SIZE
    )
    # Get a patch from the first image (`image1`)
    crop1 = tf.image.crop_to_bounding_box(
        image1, boundaryy1, boundaryx1, target_h, target_w
    )
    # Pad the `image1` patch (`crop1`) with the same offset
    img1 = tf.image.pad_to_bounding_box(
        crop1, boundaryy1, boundaryx1, IMG_SIZE, IMG_SIZE
    )

    # Modify the first image by subtracting the patch from `image1`
    # (before applying the `image2` patch)
    image1 = image1 - img1
    # Add the modified `image1` and `image2`  together to get the CutMix image
    image = image1 + image2

    # Adjust Lambda in accordance to the pixel ration
    lambda_value = 1 - (target_w * target_h) / (IMG_SIZE * IMG_SIZE)
    lambda_value = tf.cast(lambda_value, tf.float32)

    # Combine the labels of both images
    label = lambda_value * label1 + (1 - lambda_value) * label2
    return image, label



def mixup(train_ds_one , train_ds_two):
    (image1, label1), (image2, label2) = train_ds_one, train_ds_two

    alpha = [0.2]
    beta = [0.2]

    # Get a sample from the Beta distribution
    lambda_value = sample_beta_distribution(1, alpha, beta)
    
    lambda_value = lambda_value[0][0]
    
    image=image1*lambda_value+image2*(1-lambda_value)
    label = label1 * lambda_value + label2 * (1 - lambda_value)
    
    return image,label



def cutmix_mixup(train_ds_one , train_ds_two , p=1):
    if np.random.binomial(1,p):
        return mixup(train_ds_one,train_ds_two)
        
        
    else:
        return cutmix(train_ds_one,train_ds_two)

In [None]:
train_ds_cm_mu = (
    train_ds.shuffle(1024)
    .map(cutmix_mixup, num_parallel_calls=AUTO)
    .batch(BATCH_SIZE)
    .prefetch(AUTO)
)

# Let's preview 9 samples from the dataset
image_batch, label_batch = next(iter(train_ds_cm_mu))
plt.figure(figsize=(10, 10))
for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    plt.title(np.array(label_batch[i]))
    plt.imshow(image_batch[i]/255)
    plt.axis("off")

In [None]:
for image_batch, label_batch in train_ds_cm_mu.take(1):
    plt.figure(figsize=(10, 10))
    for i in range(len(image_batch)):
        ax = plt.subplot(8, 4, i + 1)
        plt.title(np.array(label_batch[i]))
        plt.imshow(image_batch[i].numpy() / 255.)
#        print(label_batch[i])
        plt.axis("off")

In [None]:
image_batch, label_batch = next(iter(train_ds_cm_mu))
image_batch.shape

In [None]:
train_ds_cm_mu.cardinality()

# BUILD MODEL

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# train generator with augmentation
train_image_gen  = ImageDataGenerator()                                      

train_dataset = train_image_gen.flow_from_directory(directory=TRAIN_DIR,
                                                    target_size=IMG_SHAPE,
                                                   # color_mode='rgb',
                                                    classes=None,
                                                    class_mode='binary',
                                                    batch_size=BATCH_SIZE,
                                                    shuffle=True,
                                                    seed=seed,
                                                    )

test_dataset = train_image_gen.flow_from_directory(directory=TEST_DIR,
                                                    target_size=IMG_SHAPE,
                                                   # color_mode='rgb',
                                                    classes=None,
                                                    class_mode='categorical',
                                                    batch_size=BATCH_SIZE,
                                                    shuffle=False,
                                                    seed=seed,
                                                    )

In [None]:
class_weights = class_weight.compute_class_weight(class_weight='balanced',
                                                  classes=np.unique(train_dataset.classes),
                                                  y=train_dataset.classes)

class_weights = dict(zip(np.unique(train_dataset.classes), class_weights))
class_weights

In [None]:
from tensorflow.keras.applications.convnext import preprocess_input

In [None]:
pretrained_model  = tfk.applications.ConvNeXtTiny (
    weights='imagenet',
    include_top=False,
#    input_shape=(224, 224,3),
)
pretrained_model.trainable = False

In [None]:
preprocessing = tf.keras.Sequential([
  #        tfkl.RandomBrightness(0.35,value_range=(0,255)),
          tfkl.RandomFlip("horizontal"),
          tfkl.RandomFlip("vertical"),
 #         tfkl.RandomContrast(0.5),
          tfkl.RandomRotation(0.3),
          tfkl.RandomTranslation(0.05,0.05,fill_mode="reflect"),
  #        tfkl.RandomZoom((-0.15,0),(-0.15,0),fill_mode="reflect")
    ], name='preprocessing')


inputs = tfk.Input(shape=(96, 96, 3))
x = tfkl.Resizing(224, 224, interpolation='bicubic', name='resizing')(inputs)

x = preprocessing(x)
x = preprocess_input(x)

x = pretrained_model(x)
# Add a Dense layer with 2 units and softmax activation as the classifier

x=tfkl.BatchNormalization()(x)

x = tfkl.GlobalAveragePooling2D()(x)
x = tfkl.Dropout(0.2)(x)



outputs = tfkl.Dense(2, activation='softmax', kernel_initializer=tfk.initializers.GlorotNormal())(x)

# Create a Model connecting input and output
tl_model = tfk.Model(inputs=inputs, outputs=outputs, name='resnet')

# Compile the model with Categorical Cross-Entropy loss and Adam optimizer
tl_model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adam(learning_rate=0.001), metrics=['accuracy'])

# Display model summary
tl_model.summary()

In [None]:
earlyStopping = tfk.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=0, mode='min')
mcp_save = tfk.callbacks.ModelCheckpoint('.mdl_wts.hdf5', save_best_only=True, monitor='val_loss', mode='min')
reduce_lr_loss = tfk.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, verbose=1, epsilon=1e-4, mode='min')

In [None]:
tl_model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adam(learning_rate=10**-3), metrics=['accuracy'])

# TRANSFER LEARNING

In [None]:
initial_epochs = 15

history = tl_model.fit(
    x = train_ds_cm_mu,
    epochs = initial_epochs,
    validation_data=val_ds,
    class_weight = class_weights,
    callbacks=[earlyStopping, mcp_save, reduce_lr_loss]
).history

In [None]:
acc = [0.] + history['accuracy']
val_acc = [0.] + history['val_accuracy']

loss = history['loss']
val_loss = history['val_loss']

plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
plt.ylim([min(plt.ylim()),1])
plt.title('Training and Validation Accuracy')

plt.subplot(2, 1, 2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
plt.ylim([0,1.0])
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()

# FINE TUNING

In [None]:
pretrained_model.trainable = True

# Let's take a look to see how many layers are in the base model
print("Number of layers in the base model: ", len(pretrained_model.layers))

# Fine-tune from this layer onwards
fine_tune_at = 90

# Freeze all the layers before the `fine_tune_at` layer
for layer in pretrained_model.layers[:fine_tune_at]:
    layer.trainable =  False

# Define a BinaryCrossentropy loss function. Use from_logits=True
loss_function = tf.keras.losses.CategoricalCrossentropy()

# Define an Adam optimizer with a learning rate of 0.1 * base_learning_rate
optimizer = tf.keras.optimizers.Adam(0.0001)

# Use accuracy as evaluation metric
metrics = ['accuracy']

learning_rate= 1e-4 #0.1 * base_learning_rate

# Compile the model
tl_model.compile(optimizer=optimizer,
              loss=loss_function,
              metrics=metrics)

In [None]:
tl_model.summary()

In [None]:
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    '/kaggle/working/best_fine_tuned_model.h5',
    monitor='val_loss',
    mode='min',
    verbose=1,
    save_best_only=True
)

In [None]:
# tl_model.load_weights('/kaggle/working/best_fine_tuned_model.h5')

In [None]:
history_fine = tl_model.fit(train_ds_cm_mu,
                         epochs=55,
                         initial_epoch=45,
                         validation_data=val_ds,
                         callbacks=model_checkpoint,
                         class_weight = class_weights  
                        )

In [None]:
acc += history_fine.history['accuracy']
val_acc += history_fine.history['val_accuracy']

loss += history_fine.history['loss']
val_loss += history_fine.history['val_loss']

In [None]:
plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.ylim([0, 1])
plt.plot([initial_epochs-1,initial_epochs-1],
          plt.ylim(), label='Start Fine Tuning')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(2, 1, 2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.ylim([0, 1.0])
plt.plot([initial_epochs-1,initial_epochs-1],
         plt.ylim(), label='Start Fine Tuning')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()

In [None]:
tl_model.save('/kaggle/working/ConvNext2')

In [None]:
# model = tfk.models.load_model('/kaggle/working/ResNet_fin')
model = tfk.models.load_model('/kaggle/working/ConvNext2')

# Predict labels for the entire test set
predictions = model.predict(x_val, verbose=0)

# Display the shape of the predictions
print("Predictions Shape:", predictions.shape)

# Test the model
#### We computed the metrics for validation set, the real test is done with submission

In [None]:
out = model.predict(x_val, verbose=0)
# out = (pred >= 0.5).astype(float)
print(y_val[:20])
print(out[:20])

In [None]:
out2 = [np.argmax(el) for el in out]
y_val2 = [np.argmax(el) for el in y_val]
print(out2[:42])
print(y_val2[:42])

In [None]:
accuracy = tf.keras.metrics.BinaryAccuracy(
    name="binary_accuracy", dtype=None
)
accuracy.update_state(y_val2, out2)
accuracy.result().numpy()

In [None]:
precision = tf.keras.metrics.Precision(
    name="precision", dtype=None
)
precision.update_state(y_val2, out2)
precision.result().numpy()

In [None]:
recall = tf.keras.metrics.Recall(
    name="recall", dtype=None
)
recall.update_state(y_val2, out2)
recall.result().numpy()

In [None]:
f1_score = tf.keras.metrics.F1Score(
    name="f1_score", dtype=None
)
f1_score.update_state(np.reshape(y_val2, (len(y_val2),1)), np.reshape(out2, (len(out2),1)))
f1_score.result().numpy()

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
labels = ["Healthy", "Unhealthy"]

cm = confusion_matrix(y_val2, out2)

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)

disp.plot(cmap=plt.cm.Blues)
plt.show()

# Test Time Augmentation

In [None]:
def TTA(model,X):  
    test_datagen = ImageDataGenerator(
                rotation_range=45,
               # width_shift_range=0.2,
               # height_shift_range=0.2,
                #zoom_range=[0.5,1.2],

                #shear_range=0.2,
                vertical_flip=True,
                horizontal_flip=True,
                fill_mode='reflect',
                )
    y_hats=[]

    n_steps=5
    BATCH=32
    
    for i in range(n_steps):
        preds=model.predict_generator(test_datagen.flow(X,batch_size=BATCH,shuffle=False, seed=seed), 
                                      steps=len(X)/BATCH)
        y_hats.append(preds)

    y_hats=np.array(y_hats)
    pred=np.mean(y_hats,axis=0)
    print(pred.shape)
    results= np.array([np.argmax(el) for el in pred])
    results = results.flatten()
    return results

In [None]:
results=TTA(model,x_val)
y_pred=results

In [None]:
accuracy = tf.keras.metrics.BinaryAccuracy(
    name="binary_accuracy", dtype=None
)
accuracy.update_state(y_val2, y_pred)
accuracy.result().numpy()

In [None]:
precision = tf.keras.metrics.Precision(
    name="precision", dtype=None
)
precision.update_state(y_val2, y_pred)
precision.result().numpy()

In [None]:
recall = tf.keras.metrics.Recall(
    name="recall", dtype=None
)
recall.update_state(y_val2, y_pred)
recall.result().numpy()

In [None]:
f1_score = tf.keras.metrics.F1Score(
    name="f1_score", dtype=None
)
f1_score.update_state(np.reshape(y_val2, (len(y_val),1)), np.reshape(y_pred, (len(y_pred),1)))
f1_score.result().numpy()

In [None]:
labels = ["Healthy", "Unhealthy"]

cm = confusion_matrix(y_val2, y_pred)

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)

disp.plot(cmap=plt.cm.Blues)
plt.show()