In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.layers import Dropout

# # Load training pairs
# data = pd.read_csv('train.csv')
# left_image_paths = ['train/left/' + fname + '.jpg' for fname in data['left']]
# right_image_paths = ['train/right/' + fname + '.jpg' for fname in data['right']]

# left_images = np.array([load_and_preprocess_image(path) for path in left_image_paths])
# right_images = np.array([load_and_preprocess_image(path) for path in right_image_paths])

# # Create positive pairs (assuming all pairs are positive for simplicity)
# labels = np.ones(len(left_images))



In [2]:
# def center_crop(img, desired_size):
#     """Center crop an image to the desired size."""
#     width, height = img.size
#     half_width, half_height = desired_size[0] / 2, desired_size[1] / 2

#     center_x, center_y = width / 2, height / 2

#     left = center_x - half_width
#     right = center_x + half_width
#     top = center_y - half_height
#     bottom = center_y + half_height

#     return img.crop((left, top, right, bottom))

def load_and_preprocess_image(img_path, target_size=(224, 224)):
    """Load one image from disk and prepare it as ResNet50 input.

    Args:
        img_path: Path of the image file to read.
        target_size: ``(width, height)`` the image is resized to before it
            is converted to an array. Defaults to ``(224, 224)``, which is
            the input size the model in this notebook is built with.

    Returns:
        The resized image as an array, passed through ResNet50's
        ``preprocess_input``.
    """
    img = load_img(img_path)
    # No cropping is applied: the whole image is squeezed/stretched to
    # target_size, so the aspect ratio is not preserved.
    img_resized = img.resize(tuple(target_size))
    img_array = img_to_array(img_resized)
    return preprocess_input(img_array)

In [3]:

# Load training pairs: each row of train.csv names a left image and the
# right image that matches it.
data = pd.read_csv('train.csv')
left_image_paths = ['train/left/' + fname + '.jpg' for fname in data['left']]
right_image_paths = ['train/right/' + fname + '.jpg' for fname in data['right']]

left_images = np.array([load_and_preprocess_image(path) for path in left_image_paths])
right_images = np.array([load_and_preprocess_image(path) for path in right_image_paths])

# Generate negative pairs.
# A plain random permutation can leave some right images at their original
# position, which would label a true match as a negative (0). Instead, build
# one random cycle over all row indices: every row is sent to a *different*
# row, so no left image is paired with the right image from its own row.
# NOTE(review): if the same right image appears in several rows of train.csv,
# a mismatched row can still hold a true match -- confirm against the data.
num_positive_pairs = len(left_images)
if num_positive_pairs < 2:
    raise ValueError('Need at least two training pairs to build negative pairs.')

rng = np.random.default_rng(42)  # fixed seed so a full re-run gives the same pairs
cycle_order = rng.permutation(num_positive_pairs)
negative_indices = np.empty(num_positive_pairs, dtype=np.intp)
# Row cycle_order[i] receives the right image of row cycle_order[i + 1]
# (wrapping around at the end).
negative_indices[cycle_order] = np.roll(cycle_order, -1)
negative_right_images = right_images[negative_indices]

# Stack positive and negative pairs: the left images are repeated once for
# the matching right images and once for the mismatched ones.
all_left_images = np.vstack([left_images] * 2)
all_right_images = np.vstack([right_images, negative_right_images])

# Create labels for positive (1) and negative (0) pairs, in the same order
# as the stacked images above.
labels_positive = np.ones(num_positive_pairs)
labels_negative = np.zeros(num_positive_pairs)
all_labels = np.hstack([labels_positive, labels_negative])

# Define the CosineSimilarityLayer


class CosineSimilarityLayer(tf.keras.layers.Layer):
    """Keras layer computing the cosine similarity of two batches of vectors.

    The layer is called on a pair ``[x1, x2]``. Both inputs are reduced over
    axis 1, so the output holds one similarity value per row of the batch.
    """

    def __init__(self, **kwargs):
        super(CosineSimilarityLayer, self).__init__(**kwargs)

    def call(self, inputs):
        """Return the row-wise cosine similarity of ``inputs = [x1, x2]``."""
        x1, x2 = inputs
        # l2_normalize divides by sqrt(max(sum(x**2), epsilon)), so a row of
        # all zeros yields a similarity of 0 instead of the NaN produced by
        # dividing the dot product by a zero magnitude.
        unit_x1 = tf.nn.l2_normalize(x1, axis=1)
        unit_x2 = tf.nn.l2_normalize(x2, axis=1)
        return tf.reduce_sum(tf.multiply(unit_x1, unit_x2), axis=1)

    def compute_output_shape(self, input_shape):
        """Report the output shape for ``input_shape = [shape_x1, shape_x2]``.

        ``call`` reduces away axis 1, so only the batch dimension of the
        first input remains.
        """
        return (input_shape[0][0],)





In [4]:
# Shared encoder: ResNet50 with ImageNet weights, no classification head,
# and pooling='avg' on its output. The same instance encodes both sides of a
# pair, so the two branches share weights.
base_model = ResNet50(include_top=False, weights='imagenet', pooling='avg')

# One 224x224 RGB input per side of the pair (left is created first).
input_left, input_right = Input(shape=(224, 224, 3)), Input(shape=(224, 224, 3))

# Dropout applied to each encoding; adjust the rate as needed.
dropout_rate = 0.1
encoded_left = Dropout(dropout_rate)(base_model(input_left))
encoded_right = Dropout(dropout_rate)(base_model(input_right))

# The network's output is the cosine similarity between the two encodings.
cosine_similarity = CosineSimilarityLayer()([encoded_left, encoded_right])
siamese_net = Model(inputs=[input_left, input_right], outputs=cosine_similarity)

# Train the similarity towards the pair label with mean squared error.
siamese_net.compile(loss='mean_squared_error', optimizer=Adam())

In [13]:
# Train on the stacked left/right image pairs and their labels.
siamese_net.fit(
    x=[all_left_images, all_right_images],
    y=all_labels,
    batch_size=32,
    epochs=5,
)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x20b43003eb0>

In [6]:


# class CosineSimilarityLayer(tf.keras.layers.Layer):
#     def __init__(self, **kwargs):
#         super(CosineSimilarityLayer, self).__init__(**kwargs)

#     def call(self, inputs):
#         x1, x2 = inputs
#         dot_product = tf.reduce_sum(tf.multiply(x1, x2), axis=1)
#         magnitude_x1 = tf.sqrt(tf.reduce_sum(tf.square(x1), axis=1))
#         magnitude_x2 = tf.sqrt(tf.reduce_sum(tf.square(x2), axis=1))
#         cosine_similarity = dot_product / (magnitude_x1 * magnitude_x2)
#         return cosine_similarity

#     def compute_output_shape(self, input_shape):
#         return (input_shape[0][0], 1)

# base_model = ResNet50(weights='imagenet', include_top=False, pooling='avg')

# input_left = Input(shape=(224, 224, 3))
# input_right = Input(shape=(224, 224, 3))

# encoded_left = base_model(input_left)
# encoded_right = base_model(input_right)

# # Adding dropout after the base model's output
# dropout_rate = 0.1  # You can adjust this value based on your needs
# encoded_left = Dropout(dropout_rate)(encoded_left)
# encoded_right = Dropout(dropout_rate)(encoded_right)

# # Compute the cosine similarity between the two encodings
# cosine_similarity = CosineSimilarityLayer()([encoded_left, encoded_right])

# siamese_net = Model(inputs=[input_left, input_right], outputs=cosine_similarity)

# # Use mean squared error as the loss function
# siamese_net.compile(optimizer=Adam(), loss='mean_squared_error')


In [7]:
# siamese_net.fit([left_images, right_images], labels, epochs=16, batch_size=32)

In [14]:
import os
import pandas as pd
import tensorflow as tf

# Read the test candidates table. Note that this rebinds `data`, which held
# the training pairs in the earlier cell.
data = pd.read_csv('test_candidates.csv')

# Folders holding the left and right test images.
left_dir, right_dir = (os.path.join('test', side) for side in ('left', 'right'))


In [15]:
def extract_features_batch(image_paths):
    """Extract features for a batch of images with ``base_model``.

    Args:
        image_paths: Iterable of image file paths. Each image is loaded with
            ``load_and_preprocess_image``.

    Returns:
        The result of ``base_model.predict`` on the stacked images, with one
        entry per path, in the same order as ``image_paths``.

    Raises:
        ValueError: If ``image_paths`` is empty.
    """
    image_paths = list(image_paths)
    if not image_paths:
        # np.stack would also raise ValueError here, but with a message that
        # does not point at the actual problem.
        raise ValueError('image_paths must contain at least one image path.')

    # Load every image and stack them into a single batch array.
    batch_array = np.stack(
        [load_and_preprocess_image(img_path) for img_path in image_paths],
        axis=0,
    )

    # verbose=0: this function is called once per row of the candidates
    # table, and a progress bar per call would flood the cell output.
    return base_model.predict(batch_array, verbose=0)


In [16]:
# Score every row of the candidates table.
# Result rows are collected in a list and turned into a DataFrame once at the
# end, rather than growing the DataFrame with .loc on every iteration.
result_rows = []

for index, row in data.iterrows():
    # The 'left' column names the query image; every remaining column names
    # one candidate right image.
    left_image_name = row['left']
    right_image_names = row.iloc[1:].tolist()

    # One batch per row: the left image first, followed by its candidates.
    image_paths = [os.path.join(left_dir, left_image_name + '.jpg')]
    image_paths.extend(os.path.join(right_dir, name + '.jpg') for name in right_image_names)

    features_batch = extract_features_batch(image_paths)

    # Compare the left features (kept 2-D so they broadcast) against all
    # candidate features in a single call. The Keras loss returns the
    # *negative* cosine similarity, hence the leading minus sign.
    similarities = -tf.keras.losses.cosine_similarity(
        features_batch[:1], features_batch[1:], axis=-1
    ).numpy()

    result_rows.append([left_image_name, *similarities])

# Same columns and row labels as the candidates table, with each candidate
# name replaced by its similarity to the left image.
similarity_results = pd.DataFrame(result_rows, columns=data.columns, index=data.index)






In [17]:
# Write the similarity table to results.csv, leaving out the index column.
similarity_results.to_csv(path_or_buf='results.csv', index=False)