# Implementation of SimCLR: training the encoder, then the classification head

**The encoder can optionally be pretrained in an unsupervised way (as part of an autoencoder). It is then trained with the SimCLR algorithm, and finally the classification head is trained by supervised learning.**

In [None]:
import os

# The environment variables below are set before keras / tensorflow are imported,
# so they are already in place when those libraries are loaded.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"  # disable GPU devices
os.environ["TFDS_DATA_DIR"] = os.path.expanduser("~/tensorflow_datasets")  # default location of tfds database
os.environ["KERAS_BACKEND"] = "tensorflow"  # select the TensorFlow backend for Keras

import keras
from keras import layers, models, regularizers

import tensorflow as tf
import tensorflow_datasets as tfds

import librosa.display

import matplotlib.pyplot as plt

# Turn off logging for TF
import logging
tf.get_logger().setLevel(logging.ERROR)

# NOTE(review): presumably importing dpmhm.datasets is also what makes the 'CWRU'
# builder visible to tfds.load below -- confirm.
from dpmhm.datasets import preprocessing, feature, utils, transformer

# Load the 'CWRU' dataset through tensorflow_datasets, together with its metadata.
ds_all, ds_info = tfds.load(
    'CWRU',
    with_info=True,
)

# Work on the 'train' split and display the structure of its elements.
ds0 = ds_all['train']
ds0.element_spec

# Preprocessing on data

In [None]:
# Build the compacted dataset from the raw one, restricted to the listed channels
# and label keys, with resampling_rate=12000.
# NOTE(review): the exact semantics of DatasetCompactor (channel selection, label
# construction, resampling) are defined in dpmhm and not visible here -- confirm.
compactor = transformer.DatasetCompactor(ds0,
                                         channels=['DE', 'FE', 'BA'],
                                         keys=['FaultLocation', 'FaultComponent', 'FaultSize'],
                                         resampling_rate=12000)

# Feature extractor
# Spectrogram is computed on a time window of 0.025 second every 0.0125 second, then converted to decibel scale.
# Only the first item returned by feature.spectral_features is kept ([0]).
_func = lambda x, sr: feature.spectral_features(x, sr, 'spectrogram',
#                                                 n_mfcc=256,
                                                time_window=0.025, hop_step=0.0125, n_fft=512,
                                                normalize=False, to_db=True)[0]

# Apply the feature function to the compacted dataset.
extractor = transformer.FeatureExtractor(compactor.dataset, _func)

# A window of width w corresponds to w*0.0125 seconds
# Slide a (64, 64) window over the extracted features with a hop of (32, 32).
window = transformer.WindowSlider(extractor.dataset, window_size=(64,64), hop_size=(32,32))
# window = transformer.WindowSlider(extractor.dataset, window_size=(256, 80), hop_size=40)  # 1s, full bandwidth
# window = transformer.WindowSlider(extractor.dataset, window_size=64, hop_size=32)

# All labels known to the compactor, in the order of its label dictionary.
labels = list(compactor.full_label_dict.keys())

# Mapping applied to every window; the cells below consume its output as (x, y) pairs.
# NOTE(review): presumably y is an integer class index derived from `labels` -- confirm.
preproc = preprocessing.get_mapping_supervised(labels)

ds_window = window.dataset.map(preproc, num_parallel_calls=tf.data.AUTOTUNE)

# Materialise a few elements to read the concrete shape of one input sample
# (only the first of the ten elements is used for that).
eles = list(ds_window.take(10).as_numpy_iterator())
input_shape = eles[0][0].shape

# Attach that shape to every element as a static shape.
ds_window = ds_window.map(lambda x,y: (tf.ensure_shape(x, input_shape), y), num_parallel_calls=tf.data.AUTOTUNE)

# Split into train / validation / test with proportions 0.7 / 0.2 / 0.1.
# NOTE(review): ds_size is taken from ds_window.cardinality(); if the cardinality is
# unknown at this point the value passed is negative -- confirm that
# utils.split_dataset handles this.
splits = {'train':0.7, 'val':0.2, 'test':0.1}
ds_split = utils.split_dataset(ds_window, splits, ds_size=int(ds_window.cardinality()))

**Parameters**

In [None]:
# Hyper-parameters shared by the cells below.
batch_size = 32
tau = 0.1               # temperature used by the contrastive loss
n_embedding = 128       # width of the encoder's final dense layer
projection_dim = 128    # output width of the SimCLR projection head
kernel_size = (3, 3)    # convolution kernel size of the autoencoder

# Number of windows in the full dataset, obtained by iterating over it once.
ds_size = sum(1 for _ in ds_window)

# With an encoder from scratch

**Autoencoder**

Adapt data for the autoencoder

In [None]:
# The autoencoder reconstructs its own input: the label is dropped and every
# element becomes the pair (x, x).
ds_train = (
    ds_split['train']
    .map(lambda x, l: (x, x))
    .shuffle(ds_size, reshuffle_each_iteration=True)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)

ds_val = (
    ds_split['val']
    .map(lambda x, l: (x, x))
    .batch(batch_size)
)

Create the autoencoder

In [None]:
@keras.utils.register_keras_serializable()
class Autoencoder(models.Model):
    """Convolutional auto-encoder made of three conv / pool / batch-norm blocks.

    The encoder maps a channel-last input of shape ``input_shape`` to a vector of
    size ``n_embedding``; the decoder mirrors it with upsampling and transposed
    convolutions back to ``input_shape``.

    Parameters
    ----------
    input_shape : tuple
        Shape ``(H, W, C)`` of one input sample.
    n_embedding : int
        Number of units of the encoder's final dense layer.
    kernel_size : tuple
        Kernel size used by every (transposed) convolution.
    a_reg : float, optional
        Factor of the L1 activity regulariser applied to the two dense layers
        ``fc1_enc`` and ``fc1_dec``. The default ``0.`` disables regularisation,
        which is the behaviour of the three-argument call.

    Attributes
    ----------
    encoder : keras.Sequential
        Input -> embedding.
    decoder : keras.Sequential
        Embedding -> reconstruction.
    autoencoder : keras.Sequential
        Encoder and decoder layers chained in a single model (same layer objects).

    Notes
    -----
    The encoder halves H and W three times and the decoder doubles them three
    times starting from ``H // 8`` and ``W // 8``, so H and W must be divisible
    by 8 for the reconstruction to have the same shape as the input.
    """
    def __init__(self, input_shape, n_embedding, kernel_size, a_reg=0.):
        self.input_shape = input_shape
        activation = 'relu'
        padding = 'same'
        strides = (2,2)
        pool_size = (2,2)

        super(Autoencoder, self).__init__()

        def dense_regularizer():
            # A fresh L1 regulariser per dense layer, or None when disabled.
            return regularizers.L1(a_reg) if a_reg > 0 else None

        # Use more blocks and larger kernel size to get more smoothing in the reconstruction.
        input_layer = layers.Input(shape=input_shape, name='input_enc')

        # Encoder: blocks 1..3 (conv -> max-pool -> batch-norm), then flatten + dense.
        layers_encoder = []
        for block, n_filters in enumerate((32, 64, 128), start=1):
            layers_encoder += [
                layers.Conv2D(n_filters, kernel_size=kernel_size, activation=activation,
                              padding=padding, name=f'conv{block}_enc'),
                layers.MaxPooling2D(pool_size=pool_size, strides=strides, name=f'pool{block}_enc'),
                layers.BatchNormalization(name=f'bn{block}_enc'),  # by default axis=-1 for channel-last
            ]
        layers_encoder += [
            layers.Flatten(name='flatten'),
            layers.Dense(n_embedding, activation=activation,
                         activity_regularizer=dense_regularizer(), name='fc1_enc'),
        ]

        self.encoder = models.Sequential([input_layer] + layers_encoder, name='encoder')

        # Decoder: dense + reshape to the encoder's last feature-map size, then
        # blocks 3..1 (batch-norm -> upsampling -> transposed conv).
        output_layer = layers.Input(shape=(n_embedding,), name='input_dec')
        reduced_h, reduced_w = input_shape[0] // 8, input_shape[1] // 8
        layers_decoder = [
            layers.Dense(128 * reduced_h * reduced_w, activation=activation,
                         activity_regularizer=dense_regularizer(), name='fc1_dec'),
            layers.Reshape((reduced_h, reduced_w, 128), name='reshape'),
        ]
        # The last block restores the input's channel count and has no activation.
        decoder_blocks = (
            (3, 64, activation),
            (2, 32, activation),
            (1, input_shape[-1], None),
        )
        for block, n_filters, block_activation in decoder_blocks:
            layers_decoder += [
                layers.BatchNormalization(name=f'bn{block}_dec'),
                layers.UpSampling2D(strides, name=f'ups{block}_dec'),
                layers.Conv2DTranspose(n_filters, kernel_size=kernel_size, activation=block_activation,
                                       padding=padding, name=f'tconv{block}_dec'),
            ]

        self.decoder = models.Sequential([output_layer] + layers_decoder, name='decoder')

        # Same layer objects as encoder + decoder, exposed as one sequential model.
        self.autoencoder = models.Sequential([input_layer] + layers_encoder + layers_decoder, name='auto-encoder')

    def call(self, x):
        """Encode ``x`` and return its reconstruction."""
        return self.decoder(self.encoder(x))

Compile and train the autoencoder on unlabeled data given as pairs (img, img)

In [None]:
# Instantiate the autoencoder; its encoder is the one reused by the SimCLR model.
autoencoder = Autoencoder(
    input_shape=input_shape,
    n_embedding=n_embedding,
    kernel_size=kernel_size,
)

# Reconstruction pre-training of the autoencoder is currently disabled.
# Uncomment the two statements below to enable it.

# autoencoder.compile(
#     optimizer=keras.optimizers.Adam(),
#     loss=keras.losses.MeanSquaredError(),
#     # metrics=['accuracy'],
# )

# history = autoencoder.fit(
#     ds_train,
#     validation_data=ds_val,
#     epochs=10,
#     callbacks=keras.callbacks.EarlyStopping(verbose=1, patience=3),
#     steps_per_epoch=int((0.7*ds_size) // batch_size)
# )

In [None]:
# autoencoder.save_weights('model_weights.weights.h5')

In [None]:
# autoencoder.load_weights('model_weights.weights.h5')

**Contrastive learning on the encoder**

In [None]:
# Prepare the training data
# cache() is placed before shuffle(): a cache replays exactly the elements it saw
# on the first pass, so caching after the shuffle would freeze the first shuffled
# order and make reshuffle_each_iteration ineffective.
ds_train = (
    ds_split['train']
    .cache()
    .shuffle(ds_size, reshuffle_each_iteration=True)
    .batch(batch_size, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE)
)
# Validation batches also drop the remainder so that every batch has batch_size samples.
ds_val = ds_split['val'].batch(batch_size, drop_remainder=True)
# The test set is evaluated one sample at a time.
ds_test = ds_split['test'].batch(1)

Contrastive loss function

In [None]:
def contrastive_loss_fn(z_i, z_j, tau=0.5):
    """Contrastive (InfoNCE / NT-Xent style) loss between two batches of projections.

    Row ``k`` of ``z_i`` and row ``k`` of ``z_j`` form a positive pair; every other
    row of ``z_j`` is a negative for row ``k`` of ``z_i``.

    Parameters
    ----------
    z_i, z_j : tensors of shape (batch, dim)
        Projections of the two views. They are L2-normalised along axis 1 here.
    tau : float
        Temperature dividing the cosine similarities.

    Returns
    -------
    Scalar tensor: mean over the batch of
    ``-log( exp(s_kk) / sum_j exp(s_kj) )`` with ``s = z_i @ z_j.T / tau``.

    Notes
    -----
    The denominator sums over all columns, positive pair included. Replacing the
    diagonal by 0 before exponentiating (instead of excluding it) would add a
    constant ``exp(0) = 1`` to the denominator and leave the positive term out.
    """
    z_i = tf.math.l2_normalize(z_i, axis=1)
    z_j = tf.math.l2_normalize(z_j, axis=1)

    # Pairwise cosine similarities scaled by the temperature; since the inputs are
    # normalised, every entry lies in [-1/tau, 1/tau].
    similarity_matrix = tf.matmul(z_i, z_j, transpose_b=True) / tau

    # Similarity of each positive pair (the diagonal).
    positive_similarity = tf.linalg.diag_part(similarity_matrix)

    # Log of the softmax denominator for each row.
    log_denominator = tf.math.log(tf.reduce_sum(tf.exp(similarity_matrix), axis=1))

    # -log(numerator / denominator), averaged over the batch.
    loss = tf.reduce_mean(log_denominator - positive_similarity)

    return loss

Create the SimCLR model

In [None]:
# Execute tf.function-wrapped code eagerly. `tf.config.run_functions_eagerly` is
# the non-deprecated replacement of `tf.config.experimental_run_functions_eagerly`.
tf.config.run_functions_eagerly(True)

# Define the contrastive model with model-subclassing
class SimCLRModel(keras.Model):
    """SimCLR-style contrastive model: augmenter -> encoder -> projection head.

    The encoder is the one of the notebook-level ``autoencoder``; the temperature
    and the projection width are read from the notebook-level ``tau`` and
    ``projection_dim``. A classification head is created as well but is not used
    by ``train_step`` / ``test_step``.

    Two metrics are tracked: ``c_loss`` (the contrastive loss) and
    ``mean_cosine_similarity`` (mean cosine similarity between the projections of
    the two views of each sample; higher means the views are closer).
    """
    def __init__(self):
        super().__init__()

        self.tau = tau

        # Random augmentations producing the two views of every sample.
        self.contrastive_augmenter = keras.Sequential([
            layers.RandomFlip("horizontal_and_vertical"),
            layers.RandomZoom(0.2),
            layers.RandomTranslation(height_factor=0.2, width_factor=0.2),
        ], name='Data_augmentation')

        self.encoder = autoencoder.encoder

        self.projection_head = keras.Sequential([
                layers.Dense(256, activation='relu'),
                layers.BatchNormalization(),
                layers.Dense(128, activation='relu'),
                layers.BatchNormalization(),
                layers.Dense(projection_dim),
            ], name='Projection_head')

        # Input width (128) and number of outputs (30) are hard-coded here.
        self.classification_head = keras.Sequential([
                layers.Dense(128, activation='relu', input_shape=(128,)),
                layers.BatchNormalization(),
                layers.Dense(30) # number of labels
            ], name='Classification_head')

    def compile(self, contrastive_optimizer, **kwargs):
        """Store the optimizer of the contrastive task and create the metric trackers."""
        super().compile(**kwargs)

        self.contrastive_optimizer = contrastive_optimizer

        self.contrastive_loss_tracker = keras.metrics.Mean(name="c_loss")
        self.mean_cosine_similarity = keras.metrics.Mean(name="mean_cosine_similarity")

    @property
    def metrics(self):
        """Metrics reported by ``train_step`` and ``test_step``."""
        return [
            self.contrastive_loss_tracker,
            self.mean_cosine_similarity,
        ]

    def _make_views(self, images):
        """Return two independently augmented versions of ``images``."""
        view_1 = self.contrastive_augmenter(images, training=True)
        view_2 = self.contrastive_augmenter(images, training=True)
        return view_1, view_2

    def _project_views(self, view_1, view_2, training):
        """Run both views through the encoder and the projection head."""
        projections_1 = self.projection_head(self.encoder(view_1, training=training), training=training)
        projections_2 = self.projection_head(self.encoder(view_2, training=training), training=training)
        return projections_1, projections_2

    def _record_contrastive_metrics(self, contrastive_loss, projections_1, projections_2):
        """Update both trackers and return their current values keyed by name."""
        self.contrastive_loss_tracker.update_state(contrastive_loss)

        # keras.losses.cosine_similarity is a loss: it returns the NEGATIVE cosine
        # similarity. The sign is flipped so the metric matches its name.
        cosine_similarity = -tf.reduce_mean(keras.losses.cosine_similarity(projections_1, projections_2))
        self.mean_cosine_similarity.update_state(cosine_similarity)

        return {m.name: m.result() for m in self.metrics}

    def train_step(self, data):
        """One contrastive update of the encoder and the projection head; labels are ignored."""
        train_image, _ = data

        # Each set of unlabeled images is augmented separately
        augmented_image_1, augmented_image_2 = self._make_views(train_image)

        # Sanity check on the augmentation. It is a Python-level `if` / `print` on a
        # tensor value.
        # NOTE(review): presumably this is why eager execution is enabled for the
        # notebook -- confirm before removing either.
        if tf.reduce_all(augmented_image_1 == augmented_image_2):
            print("Augmented train images are identical!")

        with tf.GradientTape() as tape:
            projections_1, projections_2 = self._project_views(
                augmented_image_1, augmented_image_2, training=True
            )
            contrastive_loss = contrastive_loss_fn(projections_1, projections_2, self.tau)

        # Compute gradients and apply updates for the contrastive loss
        contrastive_weights = self.encoder.trainable_weights + self.projection_head.trainable_weights
        gradients = tape.gradient(contrastive_loss, contrastive_weights)
        self.contrastive_optimizer.apply_gradients(zip(gradients, contrastive_weights))

        return self._record_contrastive_metrics(contrastive_loss, projections_1, projections_2)

    def test_step(self, data):
        """Contrastive loss and similarity on a validation batch; labels are ignored."""
        test_image, _ = data

        # The augmenter stays in training mode so that two different views are
        # produced; encoder and projection head run in inference mode.
        augmented_image_1, augmented_image_2 = self._make_views(test_image)
        projections_1, projections_2 = self._project_views(
            augmented_image_1, augmented_image_2, training=False
        )

        contrastive_loss = contrastive_loss_fn(projections_1, projections_2, self.tau)

        return self._record_contrastive_metrics(contrastive_loss, projections_1, projections_2)

Compile and train SimCLR model

In [None]:
# Build the SimCLR model and attach the optimizer of the contrastive task.
pretraining_model = SimCLRModel()
pretraining_model.compile(contrastive_optimizer=keras.optimizers.Adam())

# The training set is repeated indefinitely, so the length of an epoch is given
# explicitly; 0.7 is the training fraction used when splitting the dataset.
pretraining_history = pretraining_model.fit(
    ds_train.repeat(),
    validation_data=ds_val,
    epochs=15,
    steps_per_epoch=int((0.7 * ds_size) // batch_size),
)

pretraining_model.summary()

In [None]:
def _plot_metric(axis, metric_name, colour):
    """Draw one metric of `pretraining_history` on `axis`, labelled and coloured with `colour`."""
    axis.set_ylabel(metric_name, color=colour)
    axis.plot(pretraining_history.history[metric_name], color=colour)
    axis.tick_params(axis='y', labelcolor=colour)


fig, ax1 = plt.subplots()
ax1.set_xlabel('Epochs')
_plot_metric(ax1, 'c_loss', 'tab:red')

# Second y-axis sharing the same x-axis, for the metric with a different scale.
ax2 = ax1.twinx()
_plot_metric(ax2, 'mean_cosine_similarity', 'tab:blue')

fig.tight_layout()
plt.title('Evolution of metrics')
plt.show()

Train the classification head

In [None]:
# Freeze the encoder weights
pretraining_model.encoder.trainable = False

# Create a new classification model: frozen encoder followed by the classification head.
encoder_input = pretraining_model.encoder.layers[0].input
encoder_output = pretraining_model.encoder(encoder_input)
classification_model = keras.Model(
    inputs=encoder_input,
    outputs=pretraining_model.classification_head(encoder_output)
)
classification_model.summary(show_trainable=True, expand_nested=True)

# The head outputs raw scores, hence from_logits=True.
classification_model.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

# steps_per_epoch is given explicitly, so the dataset must be able to supply
# steps_per_epoch * epochs batches: it is repeated, as in the pretraining call.
# Without repeat() the finite dataset runs out of data after the first epoch.
classification_history = classification_model.fit(
    ds_train.repeat(),
    epochs=5,
    validation_data=ds_val,
    steps_per_epoch=int((0.7*ds_size) // batch_size),
)

Evaluate the model 

In [None]:
# Evaluate on the held-out test set; the result holds the loss followed by the
# compiled metric (accuracy).
evaluation = classification_model.evaluate(ds_test)
test_loss, test_accuracy = evaluation[0], evaluation[1]

# Print the evaluation metrics
print("Evaluation Loss:", test_loss)
print("Evaluation Accuracy: {:.2f}%".format(test_accuracy*100))