# Set up

In [None]:
# Mount Google Drive at /content/drive so the notebook can read and write files stored there.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#@title Imports
import numpy as np
import tensorflow as tf
import cv2
import matplotlib.pyplot as plt
import os
import pandas as pd
from keras.models import Sequential, Model
from keras import layers
from keras.optimizers import RMSprop
from tensorflow.keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.layers import Conv1D, GlobalMaxPooling1D, Reshape, Dropout, Dense, Input, Concatenate, Lambda, GlobalMaxPooling2D
from keras.callbacks import EarlyStopping
from tensorflow.keras.applications import ResNet50
from transformers import BertTokenizer
import tensorflow.keras.backend as K
from transformers import TFBertModel

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

import datetime

In [None]:
# Enable the TensorBoard notebook extension.
%load_ext tensorboard
# Delete any local ./logs/ directory left over from a previous run.
# NOTE(review): the TensorBoard callback defined later writes to a Drive path, not ./logs/ —
# confirm this cleanup targets the intended directory.
!rm -rf ./logs/

In [None]:
# Seed the NumPy and TensorFlow global RNGs (make_pairs draws with np.random.choice).
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
#@title Constants

# Token sequence length. The tokenizer call and siamese_model also use the literal 67.
MAX_LEN = 67
# Image input shape; process_image resizes every image to 64x64.
INPUT_DIM_IMG = (64, 64, 3)
# Shape of each tokenized text input.
INPUT_DIM_TEXT = (67,)
# Passed to siamese_model, which does not read it in the visible code.
INPUT_CONCAT_DIM = 33

# Batch size handed to model.fit.
batch_size = 8
# NOTE(review): model.fit below passes epochs=60 literally, so this value is not what training uses.
epochs = 30

# Model Definitions

In [None]:
# Turn off TensorFlow's traceback filtering so error stack traces are shown unfiltered while debugging.
tf.debugging.disable_traceback_filtering()

In [None]:
#@title Image Network

# NOTE(review): this module-level ResNet50 is not referenced anywhere else in the visible notebook;
# image_feat_network() builds its own instance. Confirm it is needed before keeping the extra load.
resnet_model = ResNet50(weights="imagenet", include_top=False)

def image_feat_network(input_shape):
    """Build the image branch: frozen ResNet50 -> Dense(2048, relu) -> global max pooling.

    Args:
        input_shape: Shape forwarded to ResNet50 as ``input_shape``.

    Returns:
        A Keras ``Model`` mapping the ResNet50 input to the pooled output.
    """
    backbone = ResNet50(weights="imagenet", include_top=False, input_shape=input_shape)

    # The ImageNet-pretrained backbone is used as a fixed feature extractor.
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False

    # The Dense layer is applied to the backbone's output before the spatial pooling.
    projected = Dense(units=2048, activation='relu')(backbone.output)
    pooled = GlobalMaxPooling2D()(projected)

    return Model(inputs=backbone.input, outputs=pooled)

In [None]:
#@title Text Network

class ExtendedBert(tf.keras.Model):
    """Text branch: pretrained ``bert-base-cased`` followed by a Dense(2048) projection."""

    def __init__(self):
        super().__init__()

        self.bert = TFBertModel.from_pretrained("bert-base-cased",trainable=False)
        self.dense_layer = tf.keras.layers.Dense(units=2048)

    def call(self, inputs):
        """Project the first token's last-layer hidden state through the dense layer.

        Args:
            inputs: Mapping with 'input_ids', 'attention_mask' and 'token_type_ids' entries.

        Returns:
            Output of ``dense_layer`` applied to the hidden state at sequence position 0.
        """
        # BERT is fed int32 tensors; every entry is cast before the call.
        bert_kwargs = {
            key: tf.cast(inputs[key], tf.int32)
            for key in ('input_ids', 'attention_mask', 'token_type_ids')
        }

        # Element 0 of the BERT output is the last layer's hidden states.
        last_hidden = self.bert(**bert_kwargs)[0]
        first_token_state = last_hidden[:, 0, :]
        return self.dense_layer(first_token_state)

In [None]:
#@title Loss and Distances

def eucl_dist_output_shape(shapes):
    """Return the distance layer's output shape: (first dim of the first input shape, 1).

    Args:
        shapes: Pair of input shapes; only the first one is read.
    """
    first_shape, _ = shapes
    return (first_shape[0], 1)


def euclidean_distance(vectors):
    """Euclidean distance between two batches of vectors, computed along axis 1.

    Args:
        vectors: Pair ``(x, y)`` of tensors.

    Returns:
        Tensor of distances with axis 1 kept (``keepdims=True``).
    """
    left, right = vectors
    squared_diff = K.square(left - right)
    sum_square = K.sum(squared_diff, axis=1, keepdims=True)
    # The sum is floored at epsilon before taking the square root.
    return K.sqrt(K.maximum(sum_square, K.epsilon()))


def contrastive_loss(y_true, y_pred):
    """Contrastive loss with margin 1 on predicted distances.

    Pairs with ``y_true == 1`` are penalised by the squared distance; pairs with
    ``y_true == 0`` are penalised by the squared shortfall of the distance below the margin.

    Args:
        y_true: Pair labels (1 or 0); cast to the dtype of ``y_pred``.
        y_pred: Predicted distances.

    Returns:
        Element-wise loss tensor.
    """
    margin = 1
    y_true = K.cast(y_true, y_pred.dtype)

    similar_term = 0.5 * y_true * K.square(y_pred)
    dissimilar_term = 0.5 * (1 - y_true) * K.square(K.maximum(margin - y_pred, 0))
    return (similar_term + dissimilar_term)


def compute_accuracy(predictions, labels):
    """Fraction of pairs whose thresholded distance agrees with the label.

    A distance below 0.5 counts as a predicted 1; anything else as 0.

    Args:
        predictions: Predicted distances.
        labels: Ground-truth pair labels; cast to int32 for the comparison.

    Returns:
        Scalar float32 tensor with the mean agreement.
    """
    predicted = tf.cast(predictions < 0.5, dtype=tf.int32)
    expected = tf.cast(labels, dtype=tf.int32)

    hits = tf.cast(tf.equal(predicted, expected), dtype=tf.float32)
    return tf.reduce_mean(hits)

def compute_acc(prediction, label):
    """Print ``prediction`` and return whether it is below the 0.5 threshold.

    ``label`` is accepted but not used.
    """
    print(prediction)
    below_threshold = prediction < 0.5
    return below_threshold



In [None]:
#@title Siamese Modal Network

from tensorflow.keras.layers import Input, Concatenate, Lambda
from tensorflow.keras.models import Model
import tensorflow as tf

def siamese_model(input_dim_img, input_dim_text, input_concat_dim):
    """Build the two-sided (image + text) siamese network and its optimizer.

    Each side feeds an image through the shared image network and a tokenized text
    through the shared text network, concatenates the two feature vectors, and the
    model outputs the Euclidean distance between the two sides.

    Args:
        input_dim_img: Image input shape, e.g. ``(64, 64, 3)``.
        input_dim_text: Shape of each tokenized text input, e.g. ``(67,)``. A bare
            int sequence length is also accepted. Previously ignored in favour of a
            hard-coded length of 67.
        input_concat_dim: Unused; kept so existing callers keep working.

    Returns:
        Tuple ``(optimizer, model)``: an Adam optimizer (learning rate 1e-4) and the
        uncompiled Keras ``Model``. Model inputs are ordered
        ``[img_a, ids_a, mask_a, type_ids_a, img_b, ids_b, mask_b, type_ids_b]``.
    """
    # Honour the caller-supplied text shape instead of a hard-coded sequence length.
    if isinstance(input_dim_text, int):
        text_shape = (input_dim_text,)
    else:
        text_shape = tuple(input_dim_text)

    img_a = Input(shape=input_dim_img)
    img_b = Input(shape=input_dim_img)

    text_a_input_ids = Input(shape=text_shape, name='input_ids')
    text_a_attention_mask = Input(shape=text_shape, name='attention_mask')
    text_a_token_type_ids = Input(shape=text_shape, name='token_type_ids')

    text_a = {'input_ids': text_a_input_ids,
              'attention_mask': text_a_attention_mask,
              'token_type_ids': text_a_token_type_ids}

    text_b_input_ids = Input(shape=text_shape, name='input_ids_b')
    text_b_attention_mask = Input(shape=text_shape, name='attention_mask_b')
    text_b_token_type_ids = Input(shape=text_shape, name='token_type_ids_b')

    text_b = {'input_ids': text_b_input_ids,
              'attention_mask': text_b_attention_mask,
              'token_type_ids': text_b_token_type_ids}

    # One network instance per modality, applied to both sides so weights are shared.
    img_network = image_feat_network(input_dim_img)
    text_network = ExtendedBert()

    feat_img_a = img_network(img_a)
    feat_img_b = img_network(img_b)

    # The text network takes the dictionary of token tensors directly.
    feat_text_a = text_network(text_a)
    feat_text_b = text_network(text_b)

    # Joint representation of each side: image features followed by text features.
    concat_a = Concatenate(axis=-1)([feat_img_a, feat_text_a])
    concat_b = Concatenate(axis=-1)([feat_img_b, feat_text_b])

    distance = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)([concat_a, concat_b])

    model = Model(inputs=[img_a, text_a_input_ids, text_a_attention_mask, text_a_token_type_ids,
                          img_b, text_b_input_ids, text_b_attention_mask, text_b_token_type_ids],
                  outputs=distance)

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)

    return optimizer, model

In [None]:
#@title Getting Input
def getInput():
    """Load images, captions and class labels listed in the dataset CSV.

    Reads 'path/to/data.csv' and uses its "Image Path", "Better Captions" and
    "Class" columns. Every image is loaded through ``process_image``.

    Returns:
        Tuple ``(img_inputs, text_inputs, labels)`` of NumPy arrays with one entry
        per CSV row. ``img_inputs`` is a float32 array.

    Raises:
        ValueError: If any image could not be loaded. Continuing would leave a
            ``None`` entry among the images, which cannot be stacked into an array.
    """
    df = pd.read_csv('path/to/data.csv')

    image_paths = df["Image Path"].to_list()
    images = [process_image(image_path) for image_path in image_paths]

    # process_image returns None for unreadable files; fail here with a clear message.
    failed_paths = [path for path, image in zip(image_paths, images) if image is None]
    if failed_paths:
        raise ValueError(
            f"{len(failed_paths)} image(s) could not be loaded: {failed_paths}"
        )

    # A real numeric dtype: dtype=np.ndarray is silently treated as `object`.
    img_inputs = np.array(images, dtype=np.float32)
    text_inputs = np.array(df["Better Captions"].to_list())
    labels = np.array(df["Class"].to_list())

    print("Image inputs shape:", img_inputs.shape)
    print("Text inputs shape:", text_inputs.shape)
    print("Labels shape:", labels.shape)

    return img_inputs, text_inputs, labels

In [None]:
#@title Data Processing

def process_image(img_path, target_size=(64, 64)):
    """Load an image from disk, resize it and scale pixel values by 1/255.

    Args:
        img_path: Path of the image file.
        target_size: Size passed to ``cv2.resize``. Defaults to ``(64, 64)``, the
            size previously hard-coded here.

    Returns:
        The resized image divided by 255.0, or ``None`` if the image could not be
        read or resized. On failure the offending path is printed.
    """
    # NOTE(review): OpenCV loads channels in BGR order — confirm that is what the
    # downstream image network expects.
    image = None
    try:
        raw = cv2.imread(img_path)
        # cv2.imread signals an unreadable file by returning None rather than raising.
        if raw is not None:
            image = cv2.resize(raw, target_size) / 255.0
    except Exception:
        # Unlike a bare `except:`, this lets KeyboardInterrupt/SystemExit propagate.
        image = None

    if image is None:
        print(img_path)
    return image


def process_image_input(img_input):
    """Convert each image into a float32 tensor with a leading axis of size 1.

    Args:
        img_input: Iterable of array-like images.

    Returns:
        List of tensors, one per image, in input order.
    """
    def _to_batched_tensor(image):
        # float32 array -> add a leading axis -> tensor
        as_float = np.asarray(image, dtype=np.float32)
        return tf.convert_to_tensor(np.expand_dims(as_float, axis=0))

    return [_to_batched_tensor(image) for image in img_input]

def process_text_input(text_inputs):
    """Tokenize captions with the 'bert-base-cased' tokenizer.

    Sequences are padded and truncated to a length of 67 and returned as
    TensorFlow tensors.

    Args:
        text_inputs: Array of caption strings (must provide ``.tolist()``).

    Returns:
        Tuple ``(input_ids, attention_mask, token_type_ids)``.
    """
    bert_tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

    encoded = bert_tokenizer(
        text_inputs.tolist(),
        padding='max_length',
        truncation=True,
        max_length=67,
        return_tensors='tf',
    )

    return encoded['input_ids'], encoded['attention_mask'], encoded.get('token_type_ids')

In [None]:
#@title Make Pairs
def make_pairs(labels, images, input_ids, attention_masks, token_type_ids):
    """Build one positive and one negative (image, text) pair for every sample.

    For each anchor sample a partner is drawn at random from the same class
    (pair label ``[1]``; the anchor itself may be drawn) and another partner
    from a different class (pair label ``[0]``).

    The image and all three text tensors of a partner come from the SAME
    randomly chosen sample. Previously the negative partner's image, input ids,
    attention mask and token type ids were each drawn from independently chosen
    samples. Class lookup also no longer assumes labels are exactly
    ``0..num_classes-1``.

    Args:
        labels: 1-D sequence of class labels, one per sample.
        images: Per-sample images, indexable by position.
        input_ids: Per-sample token ids, indexable by position.
        attention_masks: Per-sample attention masks, indexable by position.
        token_type_ids: Per-sample token type ids, indexable by position.

    Returns:
        Tuple ``(imga, txta, imgb, txtb, pairLabels)`` of lists with
        ``2 * len(images)`` entries each. Every text entry is the list
        ``[input_ids, attention_mask, token_type_ids]`` of one sample.

    Raises:
        ValueError: If a sample has no candidate from a different class.
    """
    labels = np.asarray(labels)

    def text_features(sample_idx):
        # All three token tensors of one sample, in the order the model consumes them.
        return [input_ids[sample_idx], attention_masks[sample_idx], token_type_ids[sample_idx]]

    # Candidate indices per class, computed once instead of once per sample.
    classes = np.unique(labels)
    same_class_idx = {cls: np.where(labels == cls)[0] for cls in classes}
    other_class_idx = {cls: np.where(labels != cls)[0] for cls in classes}

    imga = []
    imgb = []
    txta = []
    txtb = []
    pairLabels = []

    for idxA in range(len(images)):
        label = labels[idxA]
        currentImage = images[idxA]
        currentText = text_features(idxA)

        negative_candidates = other_class_idx[label]
        if negative_candidates.size == 0:
            raise ValueError(
                f"Cannot build a negative pair: no sample has a label other than {label!r}"
            )

        # Positive pair: partner drawn from the anchor's own class.
        idxB = np.random.choice(same_class_idx[label])
        imga.append(currentImage)
        imgb.append(images[idxB])
        txta.append(currentText)
        txtb.append(text_features(idxB))
        pairLabels.append([1])

        # Negative pair: ONE partner index selects both the image and its text.
        idxN = np.random.choice(negative_candidates)
        imga.append(currentImage)
        imgb.append(images[idxN])
        txta.append(currentText)
        txtb.append(text_features(idxN))
        pairLabels.append([0])

    return imga, txta, imgb, txtb, pairLabels

# Training

In [None]:
# Load the images, captions and class labels listed in the dataset CSV.
img_inputs, text_inputs, labels = getInput()

Image inputs shape: (130, 64, 64, 3)
Text inputs shape: (130,)
Labels shape: (130,)


In [None]:
# Turn every image into a float32 tensor with a leading axis, and tokenize the captions.
img_data_list = process_image_input(img_inputs)
input_ids, attention_mask, token_type_ids = process_text_input(text_inputs)

# Sanity-check the tokenizer output.
print("Padded input IDs shape:", input_ids.shape)
print("Padded attention mask shape:", attention_mask.shape)
print("Padded input IDs type:", type(input_ids))

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/49.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/213k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/436k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Padded input IDs shape: (130, 67)
Padded attention mask shape: (130, 67)
Padded input IDs type: <class 'tensorflow.python.framework.ops.EagerTensor'>


In [None]:
# NOTE(review): these standalone networks, and the compiled text_network, are not referenced
# again in the visible notebook — siamese_model() constructs its own image and text branches.
# Confirm this cell is still needed (it loads the pretrained ResNet50 and BERT weights again).
img_network = image_feat_network(INPUT_DIM_IMG)

text_network = ExtendedBert()

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
text_network.compile(optimizer=optimizer, loss=loss_fn)

model.safetensors:   0%|          | 0.00/436M [00:00<?, ?B/s]

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFBertModel: ['cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing TFBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions w

In [None]:
# Enable NumPy-style methods on tensors; the next cell calls `.tolist()` on the token tensors.
tf.experimental.numpy.experimental_enable_numpy_behavior()

In [None]:
# Fraction of samples assigned to the training split.
train_size = 0.8

# Encode the class labels as integers.
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(labels)

# Convert the token tensors to Python lists before splitting.
input_ids_list = input_ids.tolist()
attention_mask_list = attention_mask.tolist()
token_type_ids_list = token_type_ids.tolist()

# Split labels, images and the three token lists together so they stay aligned per sample.
labels_train, labels_test, img_data_train, img_data_test, input_ids_train, input_ids_test, attention_mask_train, attention_mask_test, token_type_ids_train, token_type_ids_test = train_test_split(encoded_labels, img_data_list, input_ids_list, attention_mask_list, token_type_ids_list, train_size=train_size, random_state=42)

In [None]:
# Build one positive and one negative pair per training sample.
img_a, text_a, img_b, text_b, lab = make_pairs(labels_train, img_data_train, input_ids_train, attention_mask_train, token_type_ids_train)


In [None]:
# Run tf.function-decorated code eagerly (the model is also compiled with run_eagerly=True below).
tf.config.run_functions_eagerly(True)

In [None]:
# process_image_input gives every image a leading size-1 axis, which becomes axis 1 once the
# list is stacked. Remove only that axis: a bare tf.squeeze would also drop any other size-1
# dimension (e.g. the batch axis when there is a single pair). Matches the inference cell.
img_a = tf.squeeze(img_a, axis=1)
img_b = tf.squeeze(img_b, axis=1)

In [None]:
# Convert the pair lists to tensors. Each text entry is an [input_ids, attention_mask,
# token_type_ids] triple; the fit call below slices them apart again along axis 1.
img_a = tf.convert_to_tensor(img_a)
text_a = tf.convert_to_tensor(text_a)
img_b = tf.convert_to_tensor(img_b)
text_b = tf.convert_to_tensor(text_b)
lab = tf.convert_to_tensor(lab)

In [None]:
# Multiply the learning rate by 0.1 when val_loss has not improved for 5 epochs, down to 1e-6.
# NOTE(review): the EarlyStopping callback below uses patience=4, which is shorter than this
# patience of 5 — confirm the learning-rate reduction can trigger before training stops.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.1,
    patience=5,
    verbose=0,
    min_lr=0.000001
)

In [None]:
# Stop training once val_loss has failed to improve for 4 epochs.
es_loss = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=4)

In [None]:
#@title compile

# Build the siamese network and compile it with the contrastive loss, executing eagerly.
opt,model = siamese_model (INPUT_DIM_IMG ,INPUT_DIM_TEXT , INPUT_CONCAT_DIM)
model.compile(loss=contrastive_loss, optimizer=opt,run_eagerly=True)
# model.summary()

In [None]:
# Write TensorBoard logs to a timestamped directory on the shared Drive.
log_dir = "/content/drive/Shareddrives/Naan Mudhalvan /Disease Detection/siamese/logs/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

# Learning-rate sweep: starts at 1e-5 and grows by a factor of 10 every 20 epochs.
# NOTE(review): lr_schedule is not included in the callbacks list passed to model.fit below.
lr_schedule = tf.keras.callbacks.LearningRateScheduler(lambda epoch: 1e-5 * 10**(epoch / 20))

In [None]:
# tf.summary.scalar('learning_rate', learning_rate)

In [None]:
# Train on the pairs. Inputs follow the model's input order: image A, then A's input_ids,
# attention_mask and token_type_ids (slices 0, 1, 2 along axis 1), then the same for B.
# NOTE(review): epochs is the literal 60 here, not the `epochs` constant defined at the top.
history = model.fit([img_a,  text_a[:,0,:],text_a[:,1,:],text_a[:,2,:],img_b,text_b[:,0,:],text_b[:,1,:],text_b[:,2,:]],
                    lab,
                    validation_split=.25,
                    batch_size=batch_size,
                    verbose=2,
                    epochs=60,
                    callbacks=[tensorboard_callback, reduce_lr,es_loss],
                    )

In [None]:
# plt.semilogx(history.history["lr"], history.history["loss"])
# plt.axis([1e-6, 1e-1, 0, 0.3])

In [None]:
# List the metrics recorded during training.
history.history.keys()

In [None]:

# Per-epoch training loss recorded by model.fit.
train_loss = history.history['loss']


# Per-epoch loss on the validation split.
val_loss= history.history['val_loss']

In [None]:
import matplotlib.pyplot as plt

# Take the x-axis from the recorded history rather than a fixed range(1, 61): EarlyStopping can
# end training before epoch 60, and plt.plot raises when x and y lengths differ. A separate name
# is used so the `epochs` training constant is not overwritten.
epoch_axis = range(1, len(train_loss) + 1)
plt.plot(epoch_axis, train_loss, 'b', label='Training loss')
plt.plot(epoch_axis, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.show()

In [None]:
# Save the trained model's weights. The path below is a placeholder.
checkpoint_path = "/path/"
model.save_weights(checkpoint_path)


In [None]:
# Save the original (unencoded) labels returned by getInput. The path is a placeholder.
np.save('/path/', labels)

# Inference

In [None]:
# Build positive/negative pairs from the held-out split (overwrites the training pair variables).
img_a,text_a,img_b,text_b,lab =  make_pairs(labels_test, img_data_test, input_ids_test, attention_mask_test, token_type_ids_test)

In [None]:
# Drop the size-1 axis that process_image_input added to every image (axis 1 after stacking).
img_a = tf.squeeze(img_a, axis=1)
img_b = tf.squeeze(img_b, axis=1)

In [None]:
# Convert the test pair lists to tensors. Each text entry is an [input_ids, attention_mask,
# token_type_ids] triple that the predict call below slices apart along axis 1.
img_a = tf.convert_to_tensor(img_a)
text_a = tf.convert_to_tensor(text_a)
img_b = tf.convert_to_tensor(img_b)
text_b = tf.convert_to_tensor(text_b)
lab = tf.convert_to_tensor(lab)

In [None]:
# Predicted distance for every test pair; inputs follow the same order used for training.
output= model.predict([img_a,  text_a[:,0,:],text_a[:,1,:],text_a[:,2,:],img_b,text_b[:,0,:],text_b[:,1,:],text_b[:,2,:]])

In [None]:
# Pair accuracy: a distance below 0.5 counts as a predicted positive pair (see compute_accuracy).
test_acc = compute_accuracy(output, lab)
print(test_acc.numpy())

In [None]:
#@title Load Model

# Placeholder path; must point at the weights written by `model.save_weights(...)` above.
CKPT_PATH = "/path/"

# Rebuild the architecture, then load the trained weights into it.
opt,loaded_model = siamese_model (INPUT_DIM_IMG ,INPUT_DIM_TEXT , INPUT_CONCAT_DIM)

# The weights were saved with Model.save_weights, so restore them with the matching
# Model.load_weights. Restoring through tf.train.Checkpoint(model=...).restore(...)
# .expect_partial() mixes two checkpoint APIs and silences any variables that fail to
# match, so a model left with freshly initialised weights would go unnoticed.
loaded_model.load_weights(CKPT_PATH)

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFBertModel: ['cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing TFBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions w

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x7ef0c52cf590>

In [None]:
# Displays the model's last layer (the distance Lambda).
# NOTE(review): if `layers` returns a fresh list on each access, this pop does not change the
# model itself — the next cell reaches the Lambda's input via `layers[-1]` either way. Confirm.
loaded_model.layers.pop()

<Lambda name=lambda_3, built=True>

In [None]:
# Feature extractor: same inputs as the siamese model, but it outputs what feeds the final
# layer (the distance Lambda, which is called on the two concatenated feature tensors).
model2= Model(inputs=loaded_model.input, outputs=loaded_model.layers[-1].input)

In [None]:
# Number of samples in the held-out split.
len(input_ids_test)

26

In [None]:
# Prepare the held-out token lists as (num_samples, max_seq_length) tensors.
# Note: this rebinds batch_size, input_ids, attention_mask and token_type_ids.
max_seq_length = 67
batch_size = len(input_ids_test)
print(batch_size)

target_shape = (batch_size, max_seq_length)

# list -> tensor -> reshape -> cast, one expression per token field.
input_ids = tf.cast(
    tf.reshape(tf.convert_to_tensor(input_ids_test), target_shape), tf.int32)
attention_mask = tf.cast(
    tf.reshape(tf.convert_to_tensor(attention_mask_test), target_shape), tf.float32)
token_type_ids = tf.cast(
    tf.reshape(tf.convert_to_tensor(token_type_ids_test), target_shape), tf.int32)

text = dict(
    input_ids=input_ids,
    attention_mask=attention_mask,
    token_type_ids=token_type_ids,
)

In [None]:
# Run the feature extractor on the test pairs (overwrites `output` from the distance prediction).
output= model2.predict([img_a,  text_a[:,0,:],text_a[:,1,:],text_a[:,2,:],img_b,text_b[:,0,:],text_b[:,1,:],text_b[:,2,:]])

In [None]:
# Check the shape of the stacked held-out images once the size-1 axis is removed.
tf.squeeze(tf.convert_to_tensor(img_data_test), axis=1).shape

In [None]:
# Feed every held-out sample to BOTH sides of the feature extractor.
test_images = tf.squeeze(tf.convert_to_tensor(img_data_test), axis=1)
single_side_inputs = [test_images, text['input_ids'], text['attention_mask'], text['token_type_ids']]
op = model2.predict(single_side_inputs + single_side_inputs)



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 7s/step


In [None]:
# Keep the first of model2's outputs. Both sides were fed the same samples in the cell above.
data = np.array(op[0])

In [None]:
# Display the extracted feature array.
data

array([[0.01309842, 0.        , 0.06618597, ..., 0.13646635, 0.7361431 ,
        0.5808059 ],
       [0.        , 0.        , 0.11341009, ..., 0.14186475, 0.6511147 ,
        0.6087172 ],
       [0.        , 0.        , 0.18411045, ..., 0.07568753, 0.8258367 ,
        0.570537  ],
       ...,
       [0.        , 0.        , 0.05287366, ..., 0.10643341, 0.695094  ,
        0.58238477],
       [0.01019871, 0.        , 0.20670238, ..., 0.09621214, 0.6533444 ,
        0.57769173],
       [0.        , 0.        , 0.20093586, ..., 0.1282919 , 0.64091295,
        0.49844763]], dtype=float32)

In [None]:
# Encoded class labels of the held-out split.
labels_test

array([9, 3, 5, 2, 6, 9, 8, 6, 7, 2, 1, 2, 8, 4, 1, 6, 3, 7, 7, 7, 5, 4,
       6, 4, 0, 3])

In [None]:
# Save the held-out features and their labels. The paths are placeholders.
# NOTE(review): as written, both calls use the same path, so the second save overwrites the first.
np.save('/path/', data)

np.save('/path/', labels_test)