In [1]:
pip install tensorflow-addons

Note: you may need to restart the kernel to use updated packages.


In [1]:
import tensorflow as tf
import tensorflow_addons as tfa
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers


In [2]:
# --- Experiment configuration -------------------------------------------------
num_classes = 10
input_shape = (32, 32, 3)

AUTO = tf.data.AUTOTUNE
learning_rate = 0.001
batch_size = 265  # NOTE(review): possibly a typo for 256 — confirm before changing
hidden_units = 512
projection_units = 128
num_epochs = 50
dropout_rate = 0.5
temperature = 0.05

# --- Data ---------------------------------------------------------------------
# Load the train and test data splits
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()

# Display shapes of train and test datasets
print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

# Batched view of the test split, passed as `validation_data` to `fit` and to
# `evaluate` in later cells. It is deliberately NOT shuffled: evaluation
# metrics do not depend on sample order, and keeping the original order keeps
# the batches aligned with `y_test`.
test_dataset = (
    tf.data.Dataset.from_tensor_slices((x_test, y_test))
    .batch(batch_size)
    .prefetch(AUTO)
)




x_train shape: (50000, 32, 32, 3) - y_train shape: (50000, 1)
x_test shape: (10000, 32, 32, 3) - y_test shape: (10000, 1)


In [3]:
# Augmentation pipeline shared by every encoder in this notebook:
# normalization first, then the random flip / rotation / width / height layers.
data_augmentation = keras.Sequential()
data_augmentation.add(layers.Normalization())
data_augmentation.add(layers.RandomFlip("horizontal"))
data_augmentation.add(layers.RandomRotation(0.02))
data_augmentation.add(layers.RandomWidth(0.2))
data_augmentation.add(layers.RandomHeight(0.2))

# The normalization layer (index 0) learns its statistics from the training images.
data_augmentation.layers[0].adapt(x_train)


In [4]:
def create_encoder(backbone=None):
    """Wrap a feature-extraction backbone with the shared augmentation pipeline.

    Args:
        backbone: Callable Keras model/layer applied to the augmented images.
            When ``None`` (the default), a new ResNet50V2 is built for this
            call with ``include_top=False``, ``weights=None``,
            ``input_shape=input_shape`` and ``pooling="avg"``.

    Returns:
        A ``keras.Model`` named ``"cifar10-encoder"`` that maps inputs of shape
        ``input_shape`` through ``data_augmentation`` and then ``backbone``.
    """
    # Build the default lazily. A model instance used directly as the default
    # value would be constructed once, when the `def` statement runs, and then
    # shared (weights included) by every call that omits `backbone`.
    if backbone is None:
        backbone = keras.applications.ResNet50V2(
            include_top=False, weights=None, input_shape=input_shape, pooling="avg"
        )

    inputs = keras.Input(shape=input_shape)
    augmented = data_augmentation(inputs)
    outputs = backbone(augmented)
    model = keras.Model(inputs=inputs, outputs=outputs, name="cifar10-encoder")
    return model


encoder = create_encoder()
encoder.summary()



Model: "cifar10-encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 32, 32, 3)]       0         
                                                                 
 sequential (Sequential)     (None, 32, 32, 3)         7         
                                                                 
 resnet50v2 (Functional)     (None, 2048)              23564800  
                                                                 
Total params: 23,564,807
Trainable params: 23,519,360
Non-trainable params: 45,447
_________________________________________________________________


In [5]:
def create_classifier(encoder, trainable=True):
    """Stack a dropout/dense classification head on `encoder` and compile it.

    Args:
        encoder: Model applied to inputs of shape ``input_shape``.
        trainable: Value written to the ``trainable`` flag of every layer in
            ``encoder.layers`` before the classifier is assembled.

    Returns:
        A compiled ``keras.Model`` named ``"cifar10-classifier"`` (Adam
        optimizer, sparse categorical cross-entropy loss, sparse categorical
        accuracy metric).
    """
    # Freeze or unfreeze the encoder layer by layer.
    for encoder_layer in encoder.layers:
        encoder_layer.trainable = trainable

    # Head: dropout -> hidden dense -> dropout -> softmax over the classes.
    head_layers = [
        layers.Dropout(dropout_rate),
        layers.Dense(hidden_units, activation="relu"),
        layers.Dropout(dropout_rate),
        layers.Dense(num_classes, activation="softmax"),
    ]

    image_input = keras.Input(shape=input_shape)
    tensor = encoder(image_input)
    for head_layer in head_layers:
        tensor = head_layer(tensor)

    classifier_model = keras.Model(
        inputs=image_input, outputs=tensor, name="cifar10-classifier"
    )
    classifier_model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return classifier_model


In [6]:
class SupervisedContrastiveLoss(keras.losses.Loss):
    """Contrastive loss over all pairs of feature vectors in a batch.

    The feature vectors are L2-normalized, their pairwise dot products are
    divided by ``temperature``, and the resulting matrix is scored against the
    labels with ``tfa.losses.npairs_loss``.

    Args:
        temperature: Divisor applied to the pairwise similarity matrix.
        name: Optional name forwarded to ``keras.losses.Loss``.
    """

    def __init__(self, temperature=1, name=None):
        super().__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        """Return the n-pairs loss of `feature_vectors` given `labels`.

        Args:
            labels: Class labels, one per row of `feature_vectors`.
            feature_vectors: 2-D tensor; rows are normalized along axis 1.
            sample_weight: Accepted for signature compatibility; not used.
        """
        # Flatten the labels to one entry per sample. `tf.squeeze` without an
        # axis would also drop the batch axis when the batch holds a single
        # sample, handing a scalar to the loss.
        flat_labels = tf.reshape(labels, [-1])
        # Normalize feature vectors
        feature_vectors_normalized = tf.math.l2_normalize(feature_vectors, axis=1)
        # Compute logits: temperature-scaled pairwise similarities
        logits = tf.divide(
            tf.matmul(
                feature_vectors_normalized, tf.transpose(feature_vectors_normalized)
            ),
            self.temperature,
        )
        return tfa.losses.npairs_loss(flat_labels, logits)


def add_projection_head(encoder):
    """Return a model that applies `encoder` followed by a ReLU dense projection.

    The projection layer has ``projection_units`` units and the returned model
    is named ``"cifar-encoder_with_projection-head"``.
    """
    image_input = keras.Input(shape=input_shape)
    projection_layer = layers.Dense(projection_units, activation="relu")
    projected = projection_layer(encoder(image_input))
    return keras.Model(
        inputs=image_input,
        outputs=projected,
        name="cifar-encoder_with_projection-head",
    )


# Custom model 1: sequential
VGG-like network

In [8]:

def build_vgg_block(n_conv_layer,depth,pooling_type):
    """Return the layers of one VGG-style block as a list.

    The block is `n_conv_layer` 3x3 same-padded ReLU convolutions with `depth`
    filters, followed by one 2x2 pooling layer.

    Args:
        n_conv_layer: Number of convolution layers in the block.
        depth: Number of filters of each convolution.
        pooling_type: "MaxPooling2D" or "MeanPooling2D" (average pooling).

    Raises:
        NotImplementedError: If `pooling_type` is any other value.
    """
    # Validate first so an unsupported value fails before any layer is built.
    # The f-string keeps the message well-formed for non-string values too;
    # string concatenation would raise TypeError instead.
    if pooling_type not in ("MaxPooling2D", "MeanPooling2D"):
        raise NotImplementedError(f"pooling {pooling_type} is not implemented")

    list_layer = [
        layers.Conv2D(depth, 3, activation='relu', padding="same")
        for _ in range(n_conv_layer)
    ]

    if pooling_type == "MaxPooling2D":
        list_layer.append(layers.MaxPooling2D(pool_size=(2, 2), padding="valid"))
    else:
        list_layer.append(layers.AveragePooling2D(pool_size=(2, 2), padding="valid"))
    return list_layer
    
    
def get_vgg_backbone(input_shape,hparams):
    """
    Assemble a VGG-style convolutional backbone as a Sequential model.

    input_shape : shape handed to layers.Input
    hparams : dict with keys
        "blocks"            : dict with keys "block1" ... "blockn"; each value
                              holds the keyword arguments of build_vgg_block
        "output_dim"        : number of filters of the final 3x3 convolution
        "globalPoolingType" : "Mean" appends GlobalAveragePooling2D; any other
                              value appends no global pooling layer
    """
    stack = [layers.Input(input_shape)]
    block_specs = hparams["blocks"]

    # Blocks are looked up by name so they are added in numeric order,
    # independent of the dict's own ordering.
    for block_number in range(1, len(block_specs) + 1):
        stack.extend(build_vgg_block(**block_specs[f"block{block_number}"]))

    # Final convolution that sets the width of the output features.
    stack.append(
        layers.Conv2D(hparams["output_dim"], 3, activation='relu', padding="same")
    )

    if hparams["globalPoolingType"] == "Mean":
        stack.append(layers.GlobalAveragePooling2D())

    return tf.keras.Sequential(stack)


# Hyper-parameters of the VGG-like backbone: three blocks of three
# convolutions each (64, 128 and 256 filters), every block closed by max
# pooling, then a 2048-filter output convolution and global mean pooling.
hparams_vgg = {
    "output_dim": 2048,
    "globalPoolingType": "Mean",
    "blocks": {
        f"block{block_number}": {
            "n_conv_layer": 3,
            "depth": block_depth,
            "pooling_type": "MaxPooling2D",
        }
        for block_number, block_depth in enumerate((64, 128, 256), start=1)
    },
}


In [8]:
# Build the VGG-like backbone from the hyper-parameters above and inspect it.
vgg_backbone = get_vgg_backbone(input_shape=input_shape, hparams=hparams_vgg)
vgg_backbone.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 32, 32, 64)        1792      
                                                                 
 conv2d_1 (Conv2D)           (None, 32, 32, 64)        36928     
                                                                 
 conv2d_2 (Conv2D)           (None, 32, 32, 64)        36928     
                                                                 
 max_pooling2d_3 (MaxPooling  (None, 16, 16, 64)       0         
 2D)                                                             
                                                                 
 conv2d_3 (Conv2D)           (None, 16, 16, 128)       73856     
                                                                 
 conv2d_4 (Conv2D)           (None, 16, 16, 128)       147584    
                                                      

In [28]:
# Stop when "sparse_categorical_accuracy" has not improved by at least 0.01
# for 3 consecutive epochs, then restore the best weights seen.
# NOTE(review): the un-prefixed name looks like the training metric rather
# than the validation one — confirm this is intended.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="sparse_categorical_accuracy",
    patience=3,
    min_delta=0.01,
    restore_best_weights=True,
)

In [29]:


# Baseline: a fresh VGG backbone wrapped as an encoder and trained together
# with the classification head (create_classifier defaults to trainable=True).
vgg_backbone = get_vgg_backbone(input_shape, hparams_vgg)

encoder = create_encoder(vgg_backbone)
classifier = create_classifier(encoder)
classifier.summary(expand_nested=True)

history = classifier.fit(
    x=x_train,
    y=y_train,
    validation_data=test_dataset,
    batch_size=batch_size,
    epochs=num_epochs,
    callbacks=[early_stop],
)

# Index 1 of the evaluation result is reported as the accuracy.
accuracy = classifier.evaluate(x_test, y_test)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")

Model: "cifar10-classifier"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_10 (InputLayer)       [(None, 32, 32, 3)]       0         
                                                                 
 cifar10-encoder (Functional  (None, 2048)             6640647   
 )                                                               
|¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯|
| input_9 (InputLayer)      [(None, 32, 32, 3)]       0         |
|                                                               |
| sequential (Sequential)   (None, 32, 32, 3)         7         |
||¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯||
|| normalization (Normalizatio  (None, 32, 32, 3)    7         ||
|| n)                                                          ||
||                                                             ||
|| random_flip (RandomFlip)  (None, 32, 32, 3)  

Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Test accuracy: 82.0%


In [31]:
# Contrastive pre-training: a fresh VGG encoder plus projection head, trained
# with SupervisedContrastiveLoss only.
vgg_backbone = get_vgg_backbone(input_shape, hparams_vgg)
encoder = create_encoder(vgg_backbone)

encoder_with_projection_head = add_projection_head(encoder)
encoder_with_projection_head.compile(
    optimizer=keras.optimizers.Adam(learning_rate),
    loss=SupervisedContrastiveLoss(temperature),
)
encoder_with_projection_head.summary()

# Stop after the first epoch in which the training loss fails to improve,
# then restore the best weights seen.
early_stop_unsup = tf.keras.callbacks.EarlyStopping(
    monitor="loss",
    patience=1,
    restore_best_weights=True,
)

history = encoder_with_projection_head.fit(
    x=x_train,
    y=y_train,
    batch_size=batch_size,
    epochs=num_epochs,
    callbacks=[early_stop_unsup],
)


Model: "cifar-encoder_with_projection-head"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_15 (InputLayer)       [(None, 32, 32, 3)]       0         
                                                                 
 cifar10-encoder (Functional  (None, 2048)             6640647   
 )                                                               
                                                                 
 dense_11 (Dense)            (None, 128)               262272    
                                                                 
Total params: 6,902,919
Trainable params: 6,902,912
Non-trainable params: 7
_________________________________________________________________
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
E

In [32]:
# Classification head on top of the `encoder` currently in scope, with every
# encoder layer frozen (trainable=False).
classifier = create_classifier(encoder, trainable=False)

history = classifier.fit(
    x=x_train,
    y=y_train,
    validation_data=test_dataset,
    batch_size=batch_size,
    epochs=num_epochs,
    callbacks=[early_stop],
)

# Index 1 of the evaluation result is reported as the accuracy.
accuracy = classifier.evaluate(x_test, y_test)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Test accuracy: 82.32%


In [35]:
# Same frozen-encoder experiment, this time stopping on the training loss:
# stop when "loss" has not improved by at least 0.05 for 2 epochs.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="loss",
    patience=2,
    min_delta=0.05,
    restore_best_weights=True,
)

# A new classification head on the `encoder` in scope, encoder layers frozen.
classifier = create_classifier(encoder, trainable=False)

history = classifier.fit(
    x=x_train,
    y=y_train,
    validation_data=test_dataset,
    batch_size=batch_size,
    epochs=num_epochs,
    callbacks=[early_stop],
)

# Index 1 of the evaluation result is reported as the accuracy.
accuracy = classifier.evaluate(x_test, y_test)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Test accuracy: 83.25%


## Model with loss = cross-entropy loss + contrastive loss



In [10]:
class partial_contrastive_model(tf.keras.Model):
    """Classifier trained jointly on a contrastive and a cross-entropy loss.

    A single optimizer step minimizes
    ``(contrastive + cross_entropy) / 2 + regularization`` over the backbone,
    the classification head and the projection layer. Validation reports
    accuracy only.
    """

    def __init__(self, backbone):
        super().__init__()

        self.backbone = backbone

        # Classification head applied to the backbone embedding.
        self.classification = tf.keras.Sequential(
            [
                layers.Dropout(dropout_rate),
                layers.Dense(hidden_units, activation="relu"),
                layers.Dropout(dropout_rate),
                layers.Dense(num_classes, activation="softmax"),
            ]
        )

        # Projection layer; it is only used by the contrastive loss in train_step.
        self.projection = layers.Dense(projection_units, activation="relu")
        self.loss_contrastive_fn = SupervisedContrastiveLoss(temperature)
        self.loss_classification_fn = keras.losses.SparseCategoricalCrossentropy()

        # Running means of each loss term.
        self.loss_class_tracker = tf.keras.metrics.Mean(name="loss_classification")
        self.loss_contr_tracker = tf.keras.metrics.Mean(name="loss_contrastive")
        self.loss_regu_tracker = tf.keras.metrics.Mean(name="loss_regu")

        # Separate accuracy trackers for train_step and test_step.
        self.accuracy_tracker = tf.keras.metrics.SparseCategoricalAccuracy()
        self.val_accuracy_tracker = tf.keras.metrics.SparseCategoricalAccuracy()

    @property
    def metrics(self):
        """All trackers owned by this model (loss means and both accuracies)."""
        return [
            self.loss_class_tracker,
            self.loss_contr_tracker,
            self.loss_regu_tracker,
            self.accuracy_tracker,
            self.val_accuracy_tracker,
        ]

    def call(self, inputs, training=False):
        """Return the classification head's output for `inputs`."""
        return self.classification(
            self.backbone(inputs, training=training), training=training
        )

    def train_step(self, inputs):
        """Run one joint optimization step and return the running metrics."""
        images, labels = inputs

        with tf.GradientTape() as tape:
            features = self.backbone(images, training=True)
            class_pred = self.classification(features, training=True)
            projected = self.projection(features, training=True)
            con_loss = self.loss_contrastive_fn(labels, projected)
            class_loss = self.loss_classification_fn(labels, class_pred)
            regu_loss = sum(self.losses)
            # Average of the two task losses, plus the model's own losses.
            loss = (con_loss + class_loss) / 2 + regu_loss

        learnable_params = [
            *self.backbone.trainable_variables,
            *self.classification.trainable_variables,
            *self.projection.trainable_variables,
        ]
        gradients = tape.gradient(loss, learnable_params)
        self.optimizer.apply_gradients(zip(gradients, learnable_params))

        for tracker, value in (
            (self.loss_class_tracker, class_loss),
            (self.loss_contr_tracker, con_loss),
            (self.loss_regu_tracker, regu_loss),
        ):
            tracker.update_state(value)
        self.accuracy_tracker.update_state(labels, class_pred)

        return {
            "class_loss": self.loss_class_tracker.result(),
            "con_loss": self.loss_contr_tracker.result(),
            "accuracy": self.accuracy_tracker.result(),
            "loss_regu": self.loss_regu_tracker.result(),
        }

    def test_step(self, inputs):
        """Update and return the validation accuracy for one batch."""
        images, labels = inputs
        self.val_accuracy_tracker.update_state(labels, self(images, training=False))
        return {"accuracy": self.val_accuracy_tracker.result()}

# Backbone for the joint model: the shared augmentation pipeline followed by a
# fresh VGG stack.
vgg_backbone = get_vgg_backbone(input_shape, hparams_vgg)

inputs = layers.Input(input_shape)
x = data_augmentation(inputs)
x = vgg_backbone(x)
vgg_backbone_with_data_aug = tf.keras.Model(inputs, x)
model = partial_contrastive_model(vgg_backbone_with_data_aug)

# Only the optimizer is configured here; the losses live inside the model's
# custom train_step.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate)
)

# Stop once "con_loss" (the contrastive loss reported by train_step) has not
# improved for 3 epochs, then restore the best weights seen. The callback is
# defined once, directly before the fit call that uses it.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="con_loss", patience=3, restore_best_weights=True
)
history = model.fit(
    x=x_train,
    y=y_train,
    validation_data=test_dataset,
    batch_size=batch_size,
    epochs=num_epochs,
    callbacks=[early_stop],
)



Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50


In [12]:

# Continue training the `model` already in scope for up to 22 more epochs,
# with a freshly created early-stopping callback on "con_loss".
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="con_loss",
    patience=3,
    restore_best_weights=True,
)
history = model.fit(
    x=x_train,
    y=y_train,
    validation_data=test_dataset,
    batch_size=batch_size,
    epochs=22,
    callbacks=[early_stop],
)


Epoch 1/22
Epoch 2/22
Epoch 3/22
Epoch 4/22
Epoch 5/22
Epoch 6/22
Epoch 7/22
Epoch 8/22
Epoch 9/22
Epoch 10/22
Epoch 11/22
Epoch 12/22
Epoch 13/22
Epoch 14/22
Epoch 15/22
Epoch 16/22
Epoch 17/22
Epoch 18/22
Epoch 19/22
Epoch 20/22
Epoch 21/22
Epoch 22/22


In [13]:
# Evaluate on the CIFAR-10 test arrays. The result is used as a scalar
# accuracy below; the model's custom test_step reports only an "accuracy" entry.
accuracy = model.evaluate(x_test, y_test)
print(f"Test accuracy: {round(accuracy * 100, 2)}%")


Test accuracy: 84.6%


In [14]:
# Repeat of the preceding evaluation cell: same call on the same test arrays.
accuracy = model.evaluate(x_test, y_test)
print(f"Test accuracy: {round(accuracy * 100, 2)}%")


Test accuracy: 84.6%


In [15]:
# Evaluate through the batched `test_dataset` pipeline instead of the raw
# test arrays; the result is again used as a scalar accuracy.
accuracy = model.evaluate(test_dataset)
print(f"Test accuracy: {round(accuracy * 100, 2)}%")


Test accuracy: 84.6%


In [16]:
# Repeat of the preceding evaluation cell: same call on `test_dataset`.
accuracy = model.evaluate(test_dataset)
print(f"Test accuracy: {round(accuracy * 100, 2)}%")

Test accuracy: 84.6%


## Custom model 2: one training step = a contrastive training step, then a classification training step

In [46]:
class partial_contrastive_model2(tf.keras.Model):
    """Classifier whose training step alternates two optimizer updates.

    Each `train_step` first updates the backbone and projection layer on the
    contrastive loss, then updates the backbone and classification head on the
    cross-entropy loss. Validation reports accuracy only.
    """

    def __init__(self, backbone):
        super().__init__()
        self.backbone = backbone

        # Classification head applied to the backbone embedding.
        self.classification = tf.keras.Sequential(
            [
                layers.Dropout(dropout_rate),
                layers.Dense(hidden_units, activation="relu"),
                layers.Dropout(dropout_rate),
                layers.Dense(num_classes, activation="softmax"),
            ]
        )

        # Projection layer; it is only used by the contrastive phase of train_step.
        self.projection = layers.Dense(projection_units, activation="relu")
        self.loss_contrastive_fn = SupervisedContrastiveLoss(temperature)
        self.loss_classification_fn = keras.losses.SparseCategoricalCrossentropy()

        # Running means of each loss, plus separate train/validation accuracies.
        self.loss_class_tracker = tf.keras.metrics.Mean(name="loss_classification")
        self.loss_contr_tracker = tf.keras.metrics.Mean(name="loss_contrastive")
        self.accuracy_tracker = tf.keras.metrics.SparseCategoricalAccuracy()
        self.val_accuracy_tracker = tf.keras.metrics.SparseCategoricalAccuracy()

    @property
    def metrics(self):
        """All trackers owned by this model (loss means and both accuracies)."""
        return [
            self.loss_class_tracker,
            self.loss_contr_tracker,
            self.accuracy_tracker,
            self.val_accuracy_tracker,
        ]

    def call(self, inputs, training=False):
        """Return the classification head's output for `inputs`."""
        return self.classification(
            self.backbone(inputs, training=training), training=training
        )

    def train_step(self, inputs):
        """Run the contrastive update, then the classification update."""
        images, labels = inputs

        # Phase 1: contrastive loss -> backbone + projection layer.
        with tf.GradientTape() as contrastive_tape:
            features = self.backbone(images, training=True)
            projected = self.projection(features, training=True)
            con_loss = self.loss_contrastive_fn(labels, projected)

        contrastive_params = (
            self.backbone.trainable_variables + self.projection.trainable_variables
        )
        contrastive_grads = contrastive_tape.gradient(con_loss, contrastive_params)
        self.optimizer.apply_gradients(zip(contrastive_grads, contrastive_params))

        # Phase 2: cross-entropy loss -> backbone + classification head. The
        # forward pass is repeated so it sees the weights updated in phase 1.
        with tf.GradientTape() as classification_tape:
            features = self.backbone(images, training=True)
            class_pred = self.classification(features, training=True)
            class_loss = self.loss_classification_fn(labels, class_pred)

        classification_params = (
            self.backbone.trainable_variables
            + self.classification.trainable_variables
        )
        classification_grads = classification_tape.gradient(
            class_loss, classification_params
        )
        self.optimizer.apply_gradients(
            zip(classification_grads, classification_params)
        )

        self.loss_class_tracker.update_state(class_loss)
        self.loss_contr_tracker.update_state(con_loss)
        self.accuracy_tracker.update_state(labels, class_pred)

        return {
            "class_loss": self.loss_class_tracker.result(),
            "con_loss": self.loss_contr_tracker.result(),
            "accuracy": self.accuracy_tracker.result(),
        }

    def test_step(self, inputs):
        """Update and return the validation accuracy for one batch."""
        images, labels = inputs
        self.val_accuracy_tracker.update_state(labels, self(images, training=False))
        return {"accuracy": self.val_accuracy_tracker.result()}

# Stop when "accuracy" (as reported by train_step) has not improved by at
# least 0.01 for 3 epochs, then restore the best weights seen.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="accuracy",
    patience=3,
    min_delta=0.01,
    restore_best_weights=True,
)

# Backbone for the alternating model: the shared augmentation pipeline
# followed by a fresh VGG stack.
vgg_backbone = get_vgg_backbone(input_shape, hparams_vgg)

inputs = layers.Input(input_shape)
x = vgg_backbone(data_augmentation(inputs))
vgg_backbone_with_data_aug = tf.keras.Model(inputs, x)
model = partial_contrastive_model2(vgg_backbone_with_data_aug)

# Only the optimizer is configured; both losses live inside train_step.
model.compile(optimizer=keras.optimizers.Adam(learning_rate))

history = model.fit(
    x=x_train,
    y=y_train,
    validation_data=test_dataset,
    batch_size=batch_size,
    epochs=num_epochs,
    callbacks=[early_stop],
)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50


In [47]:
# Evaluate the `model` currently in scope on the CIFAR-10 test arrays. The
# result is used as a scalar accuracy below; the model's custom test_step
# reports only an "accuracy" entry.
accuracy = model.evaluate(x_test, y_test)
print(f"Test accuracy: {round(accuracy * 100, 2)}%")

Test accuracy: 85.34%
