In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import optimizers
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from itertools import combinations
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import numpy as np
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.applications import ResNet50
from sklearn.model_selection import train_test_split

In [3]:
# from google.colab import drive
# drive.mount('/content/drive')

In [4]:
def load_and_process_images(path_dir, class_labels, target_num_images):
    images = []
    labels = []
    min_images = float('inf')
    class_images = {}

    for label, class_name in enumerate(class_labels):
        class_dir = os.path.join(path_dir, class_name)
        class_images[class_name] = []

        if not os.path.exists(class_dir):
            print(f"Directory '{class_name}' not found in '{path_dir}'. Skipping...")
            continue

        for jpg in os.listdir(class_dir):
            image_path = os.path.join(class_dir, jpg)
            image_high_resolution = cv2.imread(image_path)

            if image_high_resolution is None:
                print(f"Could not read image '{jpg}' in '{class_name}' directory. Skipping...")
                continue

            image_change_color = cv2.cvtColor(image_high_resolution, cv2.COLOR_BGR2RGB)
            image_low_resolution = cv2.resize(image_change_color, (256, 256))
            class_images[class_name].append(image_low_resolution)
            min_images = min(min_images, len(class_images[class_name]))
        print(f"Class '{class_name}' has {len(class_images[class_name])} images.")

    for class_name, images_list in class_images.items():
        images.extend(images_list[:target_num_images])
        labels.extend([class_name] * min(len(images_list), target_num_images))

    return np.asarray(images), np.asarray(labels)

dataset_directory = './Datasets/DataImages/'
class_labels = ['Catla', 'Cyprinus carpio', 'Grass Carp', 'Mori', 'Rohu', 'Silver']
target_num_images = 50

# Load and preprocess data
images, labels = load_and_process_images(dataset_directory, class_labels, target_num_images)

# Split data into train and test sets
images_train_, images_test_, labels_train_, labels_test_ = train_test_split(images, labels, test_size=0.2, random_state=42,stratify=labels)


Class 'Catla' has 20 images.
Class 'Cyprinus carpio' has 50 images.
Class 'Grass Carp' has 11 images.
Class 'Mori' has 70 images.
Class 'Rohu' has 73 images.
Class 'Silver' has 47 images.


In [35]:
images_train, images_test, labels_train, labels_test = images_train_, images_test_, labels_train_, labels_test_

le = LabelEncoder()

labels_train = le.fit_transform(labels_train)

data_augmentation = keras.Sequential([

    layers.experimental.preprocessing.RandomFlip("horizontal"),
    layers.experimental.preprocessing.RandomRotation(0.2),
    layers.experimental.preprocessing.RandomZoom(0.2),
])

def create_pairs(images, labels):
  pairs_list, labels_list = [], []

  print(images.shape)
  print(labels.shape)
  num_classes = len(np.unique(labels))
  print(num_classes)

  # Loop through all classes
  for i in range(num_classes):
    # Select indices of images belonging to class i
    class_indices = np.where(labels == i)[0]
    print(len(class_indices))
    # Ensure there are at least two samples in the class
    if len(class_indices) > 1:
      # Create positive pairs using combinations
      class_pairs = list(combinations(class_indices, 2))
      pairs_list.extend([[images[i], images[j]] for i, j in class_pairs])
      labels_list.extend([1] * len(class_pairs))  # Label positive pairs as 1

      # Create negative pairs by sampling from other classes
      for j in range(num_classes):
        if i != j:  # Skip the same class
          other_class_indices = np.where(labels == j)[0]
          if len(other_class_indices) > 0:
            # Sample random negative pairs from other class
            random_indices = np.random.choice(other_class_indices, size=len(class_pairs))
            negative_pairs = list(zip(class_indices, random_indices))
            pairs_list.extend([[images[i], images[j]] for i, j in negative_pairs])
            labels_list.extend([0] * len(negative_pairs))  # Label negative pairs as 0

  return np.array(pairs_list), np.array(labels_list)


# Create positive and negative pairs for training

print('images_train.shape',images_train.shape)
print('labels_train.shape',labels_train.shape)
pairs_train, pairs_labels_train = create_pairs(images_train, labels_train)
print('pairs_train',pairs_train.shape)
print('pairs_labels_train',pairs_labels_train.shape)
# Shuffle the training pairs
pairs_train, pairs_labels_train = shuffle(pairs_train, pairs_labels_train, random_state=42)

# Split the data into training and validation sets
pairs_train, pairs_val, pairs_labels_train, labels_val = train_test_split(
    pairs_train, pairs_labels_train, test_size=0.1, random_state=42
)

images_train.shape (182, 256, 256, 3)
labels_train.shape (182,)
(182, 256, 256, 3)
(182,)
6
16
40
9
40
40
37
pairs_train (4072, 2, 256, 256, 3)
pairs_labels_train (4072,)


In [6]:
print(pairs_val[:,0].shape)
print(pairs_val[:,1].shape)
print(labels_val.shape)

(408, 256, 256, 3)
(408, 256, 256, 3)
(408,)


In [17]:
def create_simclr_model(base_model, input_shape):
    inputs_1 = keras.Input(shape=input_shape, name="input_1")
    inputs_2 = keras.Input(shape=input_shape, name="input_2")

    x_1 = base_model(inputs_1)
    x_2 = base_model(inputs_2)

    x_1 = layers.GlobalAveragePooling2D()(x_1)
    x_2 = layers.GlobalAveragePooling2D()(x_2)

    x_1 = layers.Dense(256, activation='relu')(x_1)
    x_2 = layers.Dense(256, activation='relu')(x_2)

    x_1 = layers.Lambda(lambda x: K.l2_normalize(x, axis=1))(x_1)
    x_2 = layers.Lambda(lambda x: K.l2_normalize(x, axis=1))(x_2)

    model = Model([inputs_1, inputs_2], [x_1, x_2])
    return model

# ResNet50 is just an example; you can use ResNet150 if available
base_model = keras.applications.ResNet50(weights='imagenet', include_top=False, input_shape=(256, 256, 3))
base_model.trainable = False

simclr_model = create_simclr_model(base_model, (256, 256, 3))

def contrastive_loss(y_true, y_pred):
    margin = 1.0
    y_true = K.cast(y_true, 'float32')  # Cast y_true to float32
    square_pred = K.square(y_pred[0] - y_pred[1])
    margin_square = K.square(K.maximum(margin - square_pred, 0))
    return K.mean(y_true * K.cast(square_pred, 'float32') + (1 - y_true) * K.cast(margin_square, 'float32'), axis=-1)


In [11]:
print("Training data shapes:")
augmentation_train_0 = data_augmentation(pairs_train[:, 0])
augmentation_train_1 = data_augmentation(pairs_train[:, 1])

print("\nValidation data shapes:")
augmentation_val_0 = data_augmentation(pairs_val[:, 0])
augmentation_val_1 = data_augmentation(pairs_val[:, 1])


Training data shapes:

Validation data shapes:


In [18]:
# Compile the model
simclr_model.compile(optimizer=optimizers.Adam(learning_rate=0.001), loss=contrastive_loss)

# Train the model
simclr_model.fit(
    [augmentation_train_0, augmentation_train_1],
    [np.zeros_like(pairs_labels_train), np.zeros_like(pairs_labels_train)],
    epochs=2,
    batch_size=32,
    validation_data=([augmentation_val_0, augmentation_val_1], [np.zeros_like(labels_val), np.zeros_like(labels_val)])
)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x2b2e6968c40>

In [34]:
# Evaluate on the test set
pairs_test, pairs_labels_test = create_pairs(images_test, labels_test)
print('pairs_test',pairs_test.shape)
print('pairs_labels_test',pairs_labels_test.shape)

# Apply data augmentation on test pairs
augmentation_test_0 = data_augmentation(pairs_test[:, 0])
augmentation_test_1 = data_augmentation(pairs_test[:, 1])

# Predict embeddings for the test pairs
embeddings_test_0, embeddings_test_1 = simclr_model.predict([augmentation_test_0, augmentation_test_1])

# Calculate cosine similarity between embeddings
cosine_similarity = np.sum(embeddings_test_0 * embeddings_test_1, axis=-1)

# Threshold for considering pairs as matching
threshold = 0.5

# Predict binary labels (1 for matching pairs, 0 for non-matching pairs)
predicted_labels = (cosine_similarity > threshold).astype(int)

# Calculate accuracy
accuracy = accuracy_score(pairs_labels_test, predicted_labels)

print(f"Test Accuracy: {accuracy}")


(46, 256, 256, 3)
(46,)
6
0
0
0
0
0
0
pairs_test (0,)
pairs_labels_test (0,)


  class_indices = np.where(labels == i)[0]


IndexError: too many indices for array: array is 1-dimensional, but 2 were indexed

In [22]:
from sklearn.metrics import accuracy_score

# Predict the labels for test data
predictions = simclr_model.predict(images_test)
predicted_labels = np.argmax(predictions, axis=1)

# Convert predicted labels back to original class labels
predicted_labels_original = label_encoder.inverse_transform(predicted_labels)

# Calculate and print the accuracy
accuracy = accuracy_score(labels_test, predicted_labels_original)
print(f'Test Accuracy: {accuracy}')

ValueError: in user code:

    File "C:\Users\POOJYANTH REDDY\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\engine\training.py", line 2137, in predict_function  *
        return step_function(self, iterator)
    File "C:\Users\POOJYANTH REDDY\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\engine\training.py", line 2123, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\Users\POOJYANTH REDDY\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\engine\training.py", line 2111, in run_step  **
        outputs = model.predict_step(data)
    File "C:\Users\POOJYANTH REDDY\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\engine\training.py", line 2079, in predict_step
        return self(x, training=False)
    File "C:\Users\POOJYANTH REDDY\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "C:\Users\POOJYANTH REDDY\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\engine\input_spec.py", line 216, in assert_input_compatibility
        raise ValueError(

    ValueError: Layer "model_4" expects 2 input(s), but it received 1 input tensors. Inputs received: [<tf.Tensor 'IteratorGetNext:0' shape=(None, 256, 256, 3) dtype=uint8>]
