<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [2]:
import numpy as np
from sklearn.model_selection import KFold
from keras.datasets import imdb
from keras.models import Sequential
import pandas as pd
import numpy as np
from tensorflow import keras
from keras import layers
import tensorflow_addons as tfa
import matplotlib.pyplot as plt
%matplotlib inline 
import seaborn as sns
from tqdm import tqdm

import cv2
import os
import tensorflow as tf
import sklearn as sk

import warnings
warnings.filterwarnings('ignore')
tf.config.list_physical_devices(
    device_type=None
)



[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]

In [3]:
#Import data_labels_mainData.csv into a DataFrame
main_data = pd.read_csv('data_labels_mainData.csv')

#Import data_labels_extraData.csv into a DataFrame
extra_data = pd.read_csv('data_labels_extraData.csv')

In [4]:
from sklearn.model_selection import train_test_split

train_task, val_task = train_test_split(main_data[['ImageName','cellType']], 
                                              test_size=0.2, random_state=9)                                        


In [5]:
from PIL import Image
def GetImage(directory):
    images=[]
    for name in tqdm(directory, desc="Adding images"):
        image = cv2.imread("patch_images/"+name)
        dim = (32,32)
        image = cv2.resize(image, dim)
        image = Image.fromarray(image,'RGB')
        images.append(np.array(image))
    result = np.array(images)
    print("\ngetImage COMPLETED!")
    return result

#Create a function to generate sample to fix the Imblance of the dataset
from imblearn.over_sampling import RandomOverSampler
def GenerateSample(X,Y):
    ros = RandomOverSampler(random_state = 1)
    x, y = ros.fit_resample(X.values.reshape(-1,1), Y)
    x = x.flatten()
    return x,y

In [6]:
x_train = train_task['ImageName']
y_train = train_task['cellType']

#Generate sample
x_train, y_train = GenerateSample(x_train,y_train)
x_train = GetImage(x_train)

x_test = val_task['ImageName']
x_test = GetImage(x_test)

y_test = val_task['cellType']

Adding images: 100%|██████████| 13128/13128 [00:02<00:00, 5297.09it/s]



getImage COMPLETED!


Adding images: 100%|██████████| 1980/1980 [00:00<00:00, 4872.41it/s]


getImage COMPLETED!





In [7]:
num_classes = 4
input_shape = (32, 32, 3)


# Display shapes of train and test datasets
print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

x_train shape: (13128, 32, 32, 3) - y_train shape: (13128,)
x_test shape: (1980, 32, 32, 3) - y_test shape: (1980,)


In [8]:
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.02),
        layers.RandomWidth(0.2),
        layers.RandomHeight(0.2),
    ]
)

# Setting the state of the normalization layer.
data_augmentation.layers[0].adapt(x_train)

In [9]:
def create_encoder():
    resnet = keras.applications.ResNet50V2(
        include_top=False, weights=None, input_shape=input_shape, pooling="avg"
    )

    inputs = keras.Input(shape=input_shape)
    augmented = data_augmentation(inputs)
    outputs = resnet(augmented)
    model = keras.Model(inputs=inputs, outputs=outputs, name="cifar10-encoder")
    return model


encoder = create_encoder()
encoder.summary()

learning_rate = 0.001
batch_size = 265
hidden_units = 512
projection_units = 128
num_epochs = 50
dropout_rate = 0.5
temperature = 0.05

Model: "cifar10-encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 32, 32, 3)]       0         
                                                                 
 sequential (Sequential)     (None, 32, 32, 3)         7         
                                                                 
 resnet50v2 (Functional)     (None, 2048)              23564800  
                                                                 
Total params: 23,564,807
Trainable params: 23,519,360
Non-trainable params: 45,447
_________________________________________________________________


In [10]:
def create_classifier(encoder, trainable=True):

    for layer in encoder.layers:
        layer.trainable = trainable

    inputs = keras.Input(shape=input_shape)
    features = encoder(inputs)
    features = layers.Dropout(dropout_rate)(features)
    features = layers.Dense(hidden_units, activation="relu")(features)
    features = layers.Dropout(dropout_rate)(features)
    outputs = layers.Dense(num_classes, activation="softmax")(features)

    model = keras.Model(inputs=inputs, outputs=outputs, name="cellType-classifier")
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return model

In [13]:
class SupervisedContrastiveLoss(keras.losses.Loss):
    def __init__(self, temperature=1, name=None):
        super(SupervisedContrastiveLoss, self).__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        # Normalize feature vectors
        feature_vectors_normalized = tf.math.l2_normalize(feature_vectors, axis=1)
        # Compute logits
        logits = tf.divide(
            tf.matmul(
                feature_vectors_normalized, tf.transpose(feature_vectors_normalized)
            ),
            self.temperature,
        )
        return tfa.losses.npairs_loss(tf.squeeze(labels), logits)


def add_projection_head(encoder):
    inputs = keras.Input(shape=input_shape)
    features = encoder(inputs)
    outputs = layers.Dense(projection_units, activation="relu")(features)
    model = keras.Model(
        inputs=inputs, outputs=outputs, name="cellType-encoder_with_projection-head"
    )
    return model

In [14]:
encoder = create_encoder()

encoder_with_projection_head = add_projection_head(encoder)
encoder_with_projection_head.compile(
    optimizer=keras.optimizers.Adam(learning_rate),
    loss=SupervisedContrastiveLoss(temperature),
)

encoder_with_projection_head.summary()

history = encoder_with_projection_head.fit(
    x=x_train, y=y_train, batch_size=batch_size, epochs=num_epochs
)

Model: "cellType-encoder_with_projection-head"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_8 (InputLayer)        [(None, 32, 32, 3)]       0         
                                                                 
 cifar10-encoder (Functional  (None, 2048)             23564807  
 )                                                               
                                                                 
 dense_1 (Dense)             (None, 128)               262272    
                                                                 
Total params: 23,827,079
Trainable params: 23,781,632
Non-trainable params: 45,447
_________________________________________________________________
Epoch 1/50
 7/50 [===>..........................] - ETA: 3:12 - loss: 6.3665

KeyboardInterrupt: 

In [None]:
classifier = create_classifier(encoder, trainable=False)

history = classifier.fit(x=x_train, y=y_train, batch_size=batch_size, epochs=num_epochs)

accuracy = classifier.evaluate(x_test, y_test)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")