## Combined

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder,StandardScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras import backend as K
import numpy as np
from keras.models import Sequential
from keras.layers import Conv1D, BatchNormalization, Dropout, Flatten, Dense,Conv2D
from keras.regularizers import l2
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping



# Custom cosine similarity metric
def cosine_similarity_metric(y_true, y_pred):
    """Batch-mean cosine similarity between targets and predictions.

    The classifier in this notebook is compiled with
    ``sparse_categorical_crossentropy``, so the metric receives integer class
    ids instead of one-hot rows.  L2-normalising a lone class id and
    broadcasting it against the softmax output does not measure agreement
    with the true class, so sparse targets are expanded to one-hot vectors
    before the similarity is computed.  Targets that already have the same
    trailing dimension as ``y_pred`` are used unchanged.

    Args:
        y_true: Class ids (one rank lower than ``y_pred``, or with a trailing
            axis of size 1) or dense target vectors shaped like ``y_pred``.
        y_pred: Model outputs whose last axis enumerates the classes.

    Returns:
        Scalar tensor holding the mean per-sample cosine similarity.
    """
    y_pred = tf.convert_to_tensor(y_pred)
    y_true = tf.convert_to_tensor(y_true)

    true_rank = y_true.shape.rank
    pred_rank = y_pred.shape.rank
    is_sparse = False
    if true_rank is not None and pred_rank is not None and pred_rank >= 1:
        if true_rank == pred_rank - 1:
            # e.g. y_true: (batch,), y_pred: (batch, n_classes)
            is_sparse = True
        elif (
            true_rank == pred_rank
            and y_true.shape[-1] == 1
            and y_pred.shape[-1] not in (None, 1)
        ):
            # e.g. y_true: (batch, 1), y_pred: (batch, n_classes)
            y_true = tf.squeeze(y_true, axis=-1)
            is_sparse = True

    if is_sparse:
        y_true = tf.one_hot(
            tf.cast(y_true, tf.int32),
            depth=tf.shape(y_pred)[-1],
            dtype=y_pred.dtype,
        )
    else:
        # Keep both operands in the same dtype for the product below.
        y_true = tf.cast(y_true, y_pred.dtype)

    y_true = K.l2_normalize(y_true, axis=-1)
    y_pred = K.l2_normalize(y_pred, axis=-1)
    return K.mean(K.sum(y_true * y_pred, axis=-1))

# Read the pre-computed image and text feature tables
image_df = pd.read_csv('/content/Image_Transformed - Rasti.csv')
text_df = pd.read_csv('/content/Text_Transformed - Rasti.csv')

# (disabled) coerce non-numeric feature cells to NaN before the drop below
# image_df.iloc[:, :-1] = image_df.iloc[:, :-1].apply(pd.to_numeric, errors='coerce')
# text_df.iloc[:, :-1] = text_df.iloc[:, :-1].apply(pd.to_numeric, errors='coerce')


# Remove incomplete rows from both tables, in place
image_df.dropna(inplace=True)
text_df.dropna(inplace=True)

# One encoder for both tables: the last column of each holds the image name,
# and fitting on their union gives the same name the same id in both tables
all_image_names = np.concatenate(
    [frame.iloc[:, -1].to_numpy() for frame in (image_df, text_df)]
)
encoder = LabelEncoder().fit(all_image_names)

# Append the integer class id as a new trailing column of each table
image_df['image_name_encoded'] = encoder.transform(image_df.iloc[:, -1].to_numpy())
text_df['image_name_encoded'] = encoder.transform(text_df.iloc[:, -1].to_numpy())

In [None]:
# Feature matrices: every column except the trailing pair
# (original image name, encoded image name)
image_features = image_df.iloc[:, :-2].to_numpy()
text_features = text_df.iloc[:, :-2].to_numpy()

In [None]:
# Standardise the stacked image + text rows column by column.
# NOTE(review): np.vstack needs both matrices to have the same number of
# columns; the padded cell further down exists for the case where they differ.
scaler = StandardScaler()
combined_features = scaler.fit_transform(
    np.vstack((image_features, text_features))
)

In [None]:
# Stack image rows on top of text rows to form one feature matrix.
# NOTE(review): this rebinds `combined_features` to the raw stack, replacing
# the standardised array assigned by the preceding cell.
combined_features = np.concatenate((image_features, text_features), axis=0)

### Padded variant: zero-pad image and text features to a common width

In [None]:
# Determine the maximum size for the second dimension
max_dim = max(image_features.shape[1], text_features.shape[1])

# Zero-pad the narrower matrix on the right so both have `max_dim` columns
padded_image_features = np.pad(image_features, ((0, 0), (0, max_dim - image_features.shape[1])), 'constant')
padded_text_features = np.pad(text_features, ((0, 0), (0, max_dim - text_features.shape[1])), 'constant')

# Combine the features and standardise them here: this is the last cell that
# assigns `combined_features`, so scaling done in an earlier cell would
# otherwise be overwritten and the model would train on raw values.
# NOTE(review): the scaler is fitted before the train/test split, so test rows
# contribute to the mean/variance; fit on the training rows only if that matters.
scaler = StandardScaler()
combined_features = scaler.fit_transform(
    np.vstack((padded_image_features, padded_text_features))
)

# Verify the shape: one row per image row plus one per text row, `max_dim` columns
expected_shape = (len(image_features) + len(text_features), max_dim)
assert combined_features.shape == expected_shape, (
    f"unexpected combined shape {combined_features.shape}, expected {expected_shape}"
)
print(combined_features.shape)

## Continue: labels, split, and model

In [None]:
# Labels in the same order as the stacked features: image rows first, then text rows
combined_labels = np.concatenate(
    [
        image_df['image_name_encoded'].to_numpy(),
        text_df['image_name_encoded'].to_numpy(),
    ]
)

In [None]:
# Hold out 20% of the rows for testing; the fixed seed keeps the split reproducible
X_train, X_test, y_train, y_test = train_test_split(
    combined_features,
    combined_labels,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Conv1D expects (samples, steps, channels): add a single-channel trailing axis.
# Using the first two dimensions explicitly keeps the cell safe to re-run.
X_train = np.reshape(X_train, (*X_train.shape[:2], 1))
X_test = np.reshape(X_test, (*X_test.shape[:2], 1))

In [None]:
# Define the model
def _conv_block(filters, **conv_kwargs):
    """Return one Conv1D -> BatchNormalization -> Dropout stage as a layer list."""
    return [
        Conv1D(filters, 3, activation='relu', kernel_regularizer=l2(0.01), **conv_kwargs),
        BatchNormalization(),
        Dropout(0.3),
    ]


# Three convolutional stages of growing width, then a dense head that emits
# one softmax probability per encoded image name
model = Sequential(
    _conv_block(32, input_shape=(combined_features.shape[1], 1))
    + _conv_block(64)
    + _conv_block(128)
    + [
        Flatten(),
        Dense(64, activation='relu', kernel_regularizer=l2(0.01)),
        BatchNormalization(),
        Dropout(0.3),
        Dense(len(encoder.classes_), activation='softmax'),
    ]
)


In [None]:
# Stop once validation loss has not improved for 10 epochs and roll the
# weights back to the best epoch seen
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
)

# Integer labels -> sparse categorical cross-entropy; the custom cosine
# similarity is tracked as the only metric
optimizer = Adam(learning_rate=0.001)
model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=[cosine_similarity_metric],
)

In [None]:
# Fit for up to 100 epochs; early stopping may end training sooner.
# NOTE(review): the test split doubles as the validation set here, so the
# early-stopping decision has seen the data that is evaluated afterwards.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=100,
    batch_size=32,
    callbacks=[early_stopping],
)

In [None]:
# Score the held-out split; evaluate returns the loss followed by the compiled metric
loss, cosine_sim = model.evaluate(x=X_test, y=y_test)
print(f"Loss: {loss}, Cosine Similarity: {cosine_sim}")

In [None]:
import matplotlib.pyplot as plt
fig, (loss_ax, similarity_ax) = plt.subplots(1, 2, figsize=(12, 4))

# Left panel: training & validation loss per epoch
loss_ax.plot(history.history['loss'])
loss_ax.plot(history.history['val_loss'])
loss_ax.set_title('Model loss')
loss_ax.set_ylabel('Loss')
loss_ax.set_xlabel('Epoch')
loss_ax.legend(['Train', 'Validation'], loc='upper right')

# Right panel: training & validation cosine similarity per epoch
similarity_ax.plot(history.history['cosine_similarity_metric'])
similarity_ax.plot(history.history['val_cosine_similarity_metric'])
similarity_ax.set_title('Model Cosine Similarity')
similarity_ax.set_ylabel('Cosine Similarity')
similarity_ax.set_xlabel('Epoch')
similarity_ax.legend(['Train', 'Validation'], loc='upper left')

fig.tight_layout()
plt.show()


In [None]:
def calculate_recall_at_k_with_names(model, X_test, y_test, encoder, k=1):
    """Fraction of test samples whose true name is among the top-``k`` predictions.

    Args:
        model: Object with a ``predict(X)`` method returning one row of class
            scores per sample.
        X_test: Inputs passed straight to ``model.predict``.
        y_test: Encoded true labels, one per sample.
        encoder: Fitted label encoder exposing ``classes_`` and
            ``inverse_transform``; column ``j`` of the scores is decoded as
            ``encoder.classes_[j]``.
        k: Number of highest-scoring classes to consider per sample.

    Returns:
        Mean over samples of "true name appears in the top-k names".

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError(f"k must be a positive integer, got {k!r}")

    # Class scores for every test sample
    probabilities = np.asarray(model.predict(X_test))

    # Column indices of the k highest-scoring classes, best first
    top_k_indices = np.argsort(-probabilities, axis=1)[:, :k]

    # Decode all predictions with one fancy-indexing lookup instead of a
    # per-element Python call
    top_k_labels = np.asarray(encoder.classes_)[top_k_indices]

    # Decode the true labels to names
    true_labels_names = np.asarray(encoder.inverse_transform(y_test))

    # A sample is a hit when its true name equals any of its top-k names
    matches = (top_k_labels == true_labels_names[:, np.newaxis]).any(axis=1)

    return np.mean(matches)

In [None]:
# Recall@K of the classifier for K = 1, 5, 10
k_values = [1, 5, 10]
recalls = {}
for k in k_values:
    recalls[f"Recall@{k}"] = calculate_recall_at_k_with_names(model, X_test, y_test, encoder, k)

# Report each value as a percentage
for k, recall in recalls.items():
    print(f"{k}: {recall:.2%}")


## Triplet loss

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense,BatchNormalization, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K
from sklearn.preprocessing import LabelEncoder, StandardScaler
from tensorflow.keras.regularizers import l2

In [None]:
# Re-read both feature tables so this section starts from the raw CSV contents
image_df, text_df = (
    pd.read_csv(csv_path)
    for csv_path in (
        '/content/Image_Transformed - Rasti.csv',
        '/content/Text_Transformed - Rasti.csv',
    )
)

In [None]:
# Remove incomplete rows from both tables, in place
image_df.dropna(inplace=True)
text_df.dropna(inplace=True)

# Last column = image name, everything before it = feature values
image_feature_values, image_image_name_values = (
    image_df.iloc[:, :-1].to_numpy(),
    image_df.iloc[:, -1].to_numpy(),
)
text_feature_values, text_image_name_values = (
    text_df.iloc[:, :-1].to_numpy(),
    text_df.iloc[:, -1].to_numpy(),
)

In [None]:
# Rebuild each table with positional (0..n-1) feature column labels and a
# trailing 'image_name' column
image_df = pd.DataFrame(image_feature_values).assign(image_name=image_image_name_values)
text_df = pd.DataFrame(text_feature_values).assign(image_name=text_image_name_values)

In [None]:
# Sorted, de-duplicated image names that occur in both tables
common_image_names = np.intersect1d(
    image_image_name_values,
    text_image_name_values,
)

In [None]:
# Keep only the rows whose image name is present in both tables
image_df = image_df.loc[image_df['image_name'].isin(common_image_names)]
text_df = text_df.loc[text_df['image_name'].isin(common_image_names)]

In [None]:
# Order both tables by image name
image_df = image_df.sort_values('image_name')
text_df = text_df.sort_values('image_name')

In [None]:
# Join the two tables on image name so each output row pairs one image row
# with one text row; feature columns whose labels clash receive the
# '_image' / '_text' suffixes
aligned_df = image_df.merge(
    text_df,
    on='image_name',
    suffixes=('_image', '_text'),
)

In [None]:
# Split the merged table back into per-modality matrices via the column suffixes,
# and keep the image name of every paired row
aligned_image_features = aligned_df.filter(regex='_image$').to_numpy()
aligned_text_features = aligned_df.filter(regex='_text$').to_numpy()
aligned_image_names = aligned_df['image_name'].to_numpy()

In [None]:
# Standardise each modality on its own. The same scaler object is re-fitted
# for the second matrix, so afterwards it holds the text statistics only.
scaler = StandardScaler()
aligned_image_features, aligned_text_features = (
    scaler.fit_transform(feature_matrix)
    for feature_matrix in (aligned_image_features, aligned_text_features)
)

In [None]:
# Report the row counts of the three aligned arrays
print(f"Aligned image features length: {len(aligned_image_features)}")
print(f"Aligned text features length: {len(aligned_text_features)}")
print(f"Aligned image names length: {len(aligned_image_names)}")
# All three must describe the same set of paired rows
assert len(
    {len(aligned_image_features), len(aligned_text_features), len(aligned_image_names)}
) == 1, "Lengths of aligned features and names do not match"

In [None]:
# Map every aligned image name to an integer class id in a single step
encoder = LabelEncoder()
labels_encoded = encoder.fit_transform(aligned_image_names)

In [None]:
# One table for splitting: image feature columns first, then a column holding
# each row's text feature vector, then the encoded label
combined_df = pd.DataFrame(aligned_image_features).assign(
    text_features=list(aligned_text_features),
    label=labels_encoded,
)

In [None]:
# 80/20 train/test split of the paired rows, with a fixed seed
train_df, test_df = train_test_split(
    combined_df,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Extract features and labels for training and testing sets
def _split_arrays(split_df):
    """Return (image matrix, text matrix, labels) for one split of the combined table."""
    # All columns except the trailing ('text_features', 'label') pair are image features
    image_part = np.array(split_df.iloc[:, :-2].values.tolist())
    # Each cell of 'text_features' holds one row's feature vector
    text_part = np.array(split_df['text_features'].tolist())
    return image_part, text_part, split_df['label'].values


image_train, text_train, label_train = _split_arrays(train_df)
image_test, text_test, label_test = _split_arrays(test_df)

The triplet loss function is defined as follows:

$$
L = \max \left( d(a, p) - d(a, n) + \text{margin}, 0 \right)
$$

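Where:
- $L$ is the triplet loss value.
- $d(a, p)$ is the distance between the anchor and positive samples.
- $d(a, n)$ is the distance between the anchor and negative samples.
- $\text{margin}$ is a margin hyperparameter (`alpha` in the code below).

In the implementation below, $d$ is the squared Euclidean distance, and the per-triplet losses are averaged over the batch.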


In [None]:
# Define Triplet Loss
def triplet_loss(alpha=0.4):  # alpha is the margin
    """Build a Keras loss implementing mean(max(d(a,p) - d(a,n) + alpha, 0)).

    The returned function ignores ``y_true``. It expects ``y_pred`` to hold
    the anchor, positive and negative embeddings concatenated, in that order,
    along the last axis.
    """
    def loss(y_true, y_pred):
        width = y_pred.shape[-1]
        first_cut, second_cut = width // 3, 2 * width // 3

        # Split the concatenated output back into its three embeddings
        anchor = y_pred[:, :first_cut]
        positive = y_pred[:, first_cut:second_cut]
        negative = y_pred[:, second_cut:]

        def squared_distance(left, right):
            # Squared Euclidean distance per sample (no square root is taken)
            return tf.reduce_sum(tf.square(left - right), axis=-1)

        hinge_input = squared_distance(anchor, positive) - squared_distance(anchor, negative) + alpha
        # Clamp at zero per triplet, then average over the batch
        return tf.reduce_mean(tf.maximum(hinge_input, 0.0))
    return loss

In [None]:
# Enhanced Model Architecture
# Dimensionality of the embedding space; values tried: 10, 16, 32, 64, 128.
# NOTE(review): no other cell visible here reads `embedding_dim` — the
# embedding network ends in a fixed 128-unit layer.
embedding_dim = 32

# Width of one input vector, taken from the aligned image feature matrix
input_dim = aligned_image_features.shape[1]

In [None]:
# One symbolic input per triplet role, all of width `input_dim`
anchor_input, positive_input, negative_input = (
    Input(shape=(input_dim,), name=input_name)
    for input_name in ("anchor_input", "positive_input", "negative_input")
)

### Siamese neural network

In [None]:
# Embedding tower with BatchNormalization and Dropout
def create_embedding_network(input):
    """Apply a freshly created 512 -> 256 -> 128 dense tower to ``input``.

    Every stage is Dense(relu, L2 0.01) followed by BatchNormalization; the
    first two stages add Dropout(0.4). New layers are instantiated on every
    call, so separate calls do not share weights.
    """
    hidden = input
    for units, dropout_rate in ((512, 0.4), (256, 0.4), (128, None)):
        hidden = Dense(units, activation='relu', kernel_regularizer=l2(0.01))(hidden)
        hidden = BatchNormalization()(hidden)
        if dropout_rate is not None:
            hidden = Dropout(dropout_rate)(hidden)
    return hidden

In [None]:
# Build the embedding tower ONCE and apply that single model to all three
# inputs. Calling create_embedding_network separately for each input would
# create three independent sets of layers, so anchor, positive and negative
# would be embedded by unrelated networks instead of a shared (Siamese) one.
_embedding_source = Input(shape=(input_dim,), name="embedding_input")
embedding_network = Model(
    inputs=_embedding_source,
    outputs=create_embedding_network(_embedding_source),
    name="shared_embedding",
)

anchor_embedding = embedding_network(anchor_input)
positive_embedding = embedding_network(positive_input)
negative_embedding = embedding_network(negative_input)

In [None]:
# Concatenate the anchor, positive and negative embeddings along the feature
# axis so the loss receives all three in one tensor.
# A Concatenate layer is used instead of tf.concat: raw TensorFlow ops cannot
# be applied to symbolic Keras tensors in Keras 3, while the layer works in
# both Keras 2 and Keras 3.
combined_embeddings = tf.keras.layers.Concatenate(axis=-1, name="combined_embeddings")(
    [anchor_embedding, positive_embedding, negative_embedding]
)

# Build the model: three inputs in, one concatenated embedding out
model = Model(inputs=[anchor_input, positive_input, negative_input], outputs=combined_embeddings)
model.compile(optimizer=Adam(learning_rate=0.001), loss=triplet_loss())

In [None]:
# Data preparation for triplets
def create_triplets(image_features, text_features, image_labels, num_triplets, seed=None):
    """Sample (anchor image, positive text, negative text) triplets.

    For each triplet a random row is chosen as the anchor. The positive is the
    text vector of a random row with the same label (possibly the anchor's own
    row); the negative is the text vector of a random row with another label.

    Args:
        image_features: Array of shape (n_samples, image_dim).
        text_features: Array of shape (n_samples, text_dim), row-aligned with
            ``image_features``.
        image_labels: One label per row.
        num_triplets: Number of triplets to draw.
        seed: Optional seed for reproducible sampling. When None, NumPy's
            global random state is used.

    Returns:
        Tuple ``(anchors, positives, negatives)``, each with ``num_triplets`` rows.

    Raises:
        ValueError: If there are no rows, or fewer than two distinct labels
            (no negative can be drawn).
    """
    image_features = np.asarray(image_features)
    text_features = np.asarray(text_features)
    image_labels = np.asarray(image_labels)

    n_samples = len(image_labels)
    if n_samples == 0:
        raise ValueError("cannot create triplets from an empty dataset")
    if np.all(image_labels == image_labels[0]):
        raise ValueError("at least two distinct labels are required to draw negatives")

    rng = np.random if seed is None else np.random.RandomState(seed)

    # Draw every anchor row at once
    anchor_idx = rng.randint(0, n_samples, size=num_triplets)
    anchor_labels = image_labels[anchor_idx]

    positive_idx = np.empty(num_triplets, dtype=np.intp)
    negative_idx = np.empty(num_triplets, dtype=np.intp)

    # Compute the same-label / other-label candidate rows once per label
    # rather than once per triplet
    for label in np.unique(anchor_labels):
        slots = np.flatnonzero(anchor_labels == label)
        same_label_rows = np.flatnonzero(image_labels == label)
        other_label_rows = np.flatnonzero(image_labels != label)
        positive_idx[slots] = rng.choice(same_label_rows, size=len(slots))
        negative_idx[slots] = rng.choice(other_label_rows, size=len(slots))

    # Index each modality separately so image and text widths may differ
    return image_features[anchor_idx], text_features[positive_idx], text_features[negative_idx]

In [None]:
# Sample the training triplets (other sizes tried: 5000, 10000, 20000)
triplets_train = create_triplets(
    image_features=image_train,
    text_features=text_train,
    image_labels=label_train,
    num_triplets=20000,
)

In [None]:
# Train on the triplets. The loss ignores its targets, so a zero column with
# one row per triplet is passed as a placeholder.
history = model.fit(
    list(triplets_train),
    np.zeros((len(triplets_train[0]), 1)),
    epochs=20,
    batch_size=32,
)

In [None]:
import matplotlib.pyplot as plt

# Sample held-out triplets from the test split
triplets_test = create_triplets(image_test, text_test, label_test, num_triplets=5000)

# Score them; the targets are a placeholder zero column, as in training
test_loss = model.evaluate(
    list(triplets_test),
    np.zeros((len(triplets_test[0]), 1)),
)

print(f"Test Loss: {test_loss}")

# Training-loss curve over the epochs
fig, ax = plt.subplots()
ax.plot(history.history['loss'])
ax.set_title('Model Loss')
ax.set_ylabel('Loss')
ax.set_xlabel('Epoch')
ax.legend(['Train'], loc='upper right')
plt.show()


In [None]:
# Generate embeddings for test data.
# The model outputs anchor | positive | negative embeddings concatenated along
# the last axis, so only the first third is the anchor (image) embedding.
# Without the slice, the text embeddings (fed twice) would be part of every
# vector used for the distance computations.
combined_test_embeddings = model.predict([image_test, text_test, text_test])
anchor_width = combined_test_embeddings.shape[1] // 3
anchor_embeddings_test = combined_test_embeddings[:, :anchor_width]

In [None]:
# Define recall@k function
def recall_at_k(embeddings, labels, k):
    """Mean fraction of each row's ``k`` nearest neighbours that share its label.

    Neighbours are ranked by Euclidean distance. The query row is excluded by
    index, not by assuming it sorts first, so another row at distance 0
    cannot push the query itself into the neighbour list.

    Note that each per-row score is ``matches / k`` (the share of retrieved
    neighbours with the query's label), not a 0/1 hit indicator.

    Args:
        embeddings: Array of shape (n_samples, dim).
        labels: Array of n_samples labels, row-aligned with ``embeddings``.
        k: Number of neighbours to retrieve per row.

    Returns:
        Mean per-row score as a float.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError(f"k must be a positive integer, got {k!r}")

    embeddings = np.asarray(embeddings)
    labels = np.asarray(labels)

    recalls = []
    for i, anchor_embedding in enumerate(embeddings):
        distances = np.linalg.norm(embeddings - anchor_embedding, axis=1)  # Euclidean distance
        # A stable sort keeps the ranking of tied rows deterministic
        sorted_indices = np.argsort(distances, kind="stable")
        # Remove the query row explicitly, then keep the k closest others
        top_k_indices = sorted_indices[sorted_indices != i][:k]
        true_positives = np.sum(labels[top_k_indices] == labels[i])
        recalls.append(true_positives / k)
    return np.mean(recalls)

# Evaluate Recall@1, Recall@5, and Recall@10 on test data
for k in [1, 5, 10]:
    recall_at_k_test = recall_at_k(anchor_embeddings_test, label_test, k)
    # recall_at_k returns a fraction in [0, 1]; scale it to match the '%' suffix
    print("Recall@{} on test data: {:.2f}%".format(k, recall_at_k_test * 100))

The Euclidean distance between two points $P = (p_1, p_2, \ldots, p_n)$ and $Q = (q_1, q_2, \ldots, q_n)$ in $n$-dimensional space is calculated using the following formula:

$$
d(P, Q) = \sqrt{\sum_{i=1}^{n} (q_i - p_i)^2}
$$

In this formula:
- $n$ represents the number of dimensions (or features) in the space.
- $p_i$ and $q_i$ are the $i$-th components of points $P$ and $Q$, respectively.

This formula computes the square root of the sum of the squared differences between corresponding components of the two points. It represents the straight-line distance between the two points in the $ n $-dimensional space.


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.layers import Input

def prepare_triplet_data(image_csv, text_csv, input_dim):
    """Load image/text feature CSVs, pair rows by image name, and build Keras inputs.

    Both CSVs are expected to hold feature columns followed by a final column
    with the image name. Rows are paired through an inner join on that name,
    so the returned image and text matrices always have the same number of
    rows and row ``i`` of one belongs to the same image as row ``i`` of the
    other, even when a name occurs more than once in either file. When every
    name is unique, the result is simply the common rows sorted by name.

    Args:
        image_csv: Path of the image feature CSV.
        text_csv: Path of the text feature CSV.
        input_dim: Width of the anchor/positive/negative ``Input`` layers.

    Returns:
        Tuple ``(anchor_input, positive_input, negative_input,
        aligned_image_features, aligned_text_features, labels_encoded)``.
    """
    # Load the data and drop any rows with NaN values
    image_df = pd.read_csv(image_csv).dropna()
    text_df = pd.read_csv(text_csv).dropna()

    # Extract features and image names (last column = name)
    image_feature_values = image_df.iloc[:, :-1].values
    image_image_name_values = image_df.iloc[:, -1].values

    text_feature_values = text_df.iloc[:, :-1].values
    text_image_name_values = text_df.iloc[:, -1].values

    # Pair rows by joining small (name, row position) tables; the inner join
    # keeps only names present in both files. Sorting by name and then by row
    # position makes the output order deterministic.
    image_index = pd.DataFrame({
        'image_name': image_image_name_values,
        'image_row': np.arange(len(image_image_name_values)),
    })
    text_index = pd.DataFrame({
        'image_name': text_image_name_values,
        'text_row': np.arange(len(text_image_name_values)),
    })
    pairs = pd.merge(image_index, text_index, on='image_name').sort_values(
        ['image_name', 'image_row', 'text_row']
    )

    # Extract aligned features and names via the paired row positions
    aligned_image_features = image_feature_values[pairs['image_row'].to_numpy()]
    aligned_text_features = text_feature_values[pairs['text_row'].to_numpy()]
    aligned_image_names = pairs['image_name'].to_numpy()

    # Initialize and fit the label encoder
    encoder = LabelEncoder()
    encoder.fit(aligned_image_names)

    # Transform the image names to encoded labels
    labels_encoded = encoder.transform(aligned_image_names)

    # Define anchor, positive, and negative inputs
    anchor_input = Input(shape=(input_dim,), name="anchor_input")
    positive_input = Input(shape=(input_dim,), name="positive_input")
    negative_input = Input(shape=(input_dim,), name="negative_input")

    return anchor_input, positive_input, negative_input, aligned_image_features, aligned_text_features, labels_encoded

def recall_at_k(embeddings, labels, k):
    """Mean fraction of each row's ``k`` nearest neighbours that share its label.

    Neighbours are ranked by Euclidean distance. The query row is excluded by
    index, not by assuming it sorts first, so another row at distance 0
    cannot push the query itself into the neighbour list.

    Note that each per-row score is ``matches / k`` (the share of retrieved
    neighbours with the query's label), not a 0/1 hit indicator.

    Args:
        embeddings: Array of shape (n_samples, dim).
        labels: Array of n_samples labels, row-aligned with ``embeddings``.
        k: Number of neighbours to retrieve per row.

    Returns:
        Mean per-row score as a float.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError(f"k must be a positive integer, got {k!r}")

    embeddings = np.asarray(embeddings)
    labels = np.asarray(labels)

    recalls = []
    for i, anchor_embedding in enumerate(embeddings):
        distances = np.linalg.norm(embeddings - anchor_embedding, axis=1)  # Euclidean distance
        # A stable sort keeps the ranking of tied rows deterministic
        sorted_indices = np.argsort(distances, kind="stable")
        # Remove the query row explicitly, then keep the k closest others
        top_k_indices = sorted_indices[sorted_indices != i][:k]
        true_positives = np.sum(labels[top_k_indices] == labels[i])
        recalls.append(true_positives / k)
    return np.mean(recalls)

# Example usage
# Each path variable now points at the file of the modality it is named
# after; previously the image variable held the text CSV and vice versa, so
# the "image" features evaluated below were read from the text file.
# NOTE(review): confirm these are the intended files for this evaluation.
image_csv_path = '/content/Image_Rasti_test.csv'
text_csv_path = '/content/Text_Rasti.csv'
input_dim = 16  # Example input dimension

anchor_input, positive_input, negative_input, aligned_image_features, aligned_text_features, labels_encoded = prepare_triplet_data(image_csv_path, text_csv_path, input_dim)

# Evaluate Recall@1, Recall@5, and Recall@10 on the aligned image features
for k in [1, 5, 10]:
    recall_at_k_test = recall_at_k(aligned_image_features, labels_encoded, k)
    print("Recall@{} on test data: {:.2f}%".format(k, recall_at_k_test * 100))
