In [None]:
# Import standard libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2

# Import TensorFlow and Keras modules
import tensorflow as tf
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout, Input, Lambda
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Import scikit-learn utilities
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split

# Import Keras backend
import tensorflow.keras.backend as K

# Suppress TensorFlow warnings for cleaner output (optional)
import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)

# Set random seeds for reproducibility.
# Python's built-in `random` module keeps its own generator state that the
# NumPy and TensorFlow seeding calls do not touch, so it is seeded explicitly
# for any code in this notebook that draws from it.
import random

seed = 42
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)


In [None]:
# Root folder that holds the image datasets and the CSV split files
base_dir = r'C:\Users\bheja\OneDrive\Desktop\Dataset'

# Image folders of the PlantVillage and PlantDoc datasets
plant_vil_dir, plant_doc_dir = (
    os.path.join(base_dir, dataset_folder)
    for dataset_folder in ('plantvillage', 'plantdoc')
)

# CSV files describing each split
train_csv_path, test_seen_csv_path, test_unseen_csv_path, doc_unseen_csv_path = (
    os.path.join(base_dir, csv_name)
    for csv_name in ('PV_train.csv', 'PV_test_seen.csv', 'PV_test_unseen.csv', 'PD_test_unseen.csv')
)

# Location of the trained ResNet50V2 model
trained_model_path = r'C:\Users\bheja\OneDrive\Desktop\models\resnet50v2_crop.h5'

# Read every split; the files carry no header row, so the column names are supplied here
train_data, test_seen_data, test_unseen_data, doc_unseen_data = (
    pd.read_csv(csv_path, header=None, names=["image_name", "crop_class", "disease_class"])
    for csv_path in (train_csv_path, test_seen_csv_path, test_unseen_csv_path, doc_unseen_csv_path)
)

# Show the first rows of the training split as a sanity check
print("Sample Training Data:", train_data.head(), sep="\n")

# Show the column dtypes of the training split
print("\nData Types:", train_data.dtypes, sep="\n")


In [None]:
# Load the trained ResNet50V2 model (weights/architecture only, no optimizer state)
try:
    model = load_model(trained_model_path, compile=False)
except Exception as e:
    # Report the failure and re-raise: every later cell needs `model`, and
    # continuing would either hit a NameError or silently reuse a stale
    # `model` left in the kernel by an earlier run.
    print(f"Error loading model: {e}")
    raise
else:
    print("ResNet50V2 model loaded successfully.")

# Display the model summary
model.summary()


In [None]:
# Print one line per layer: "<index>: <name> - <output shape>"
layer_position = 0
for current_layer in model.layers:
    print(f"{layer_position}: {current_layer.name} - {current_layer.output_shape}")
    layer_position += 1


In [None]:
# Define the feature extractor

In [None]:
from tensorflow.keras.models import Model

# Layer whose output is used as the feature vector
target_layer_name = 'global_average_pooling2d_2'

# Look the layer up by name; if it is missing, list the available layer
# names to help pick the right one and then propagate the error.
try:
    target_layer_output = model.get_layer(name=target_layer_name).output
except ValueError:
    print(f"Layer {target_layer_name} not found. Please check the layer name.")
    for available_layer in model.layers:
        print(available_layer.name)
    raise
else:
    print(f"Successfully retrieved the output of layer: {target_layer_name}")

# Sub-model mapping the original input to the target layer's output
feature_extractor = Model(inputs=model.input, outputs=target_layer_output)

print("Feature extractor model created successfully.")


In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Generator used for feature extraction: pixel values are scaled by 1/255 and
# no augmentation is applied.
# NOTE(review): assumes the loaded model was trained on inputs scaled the same way - confirm.
feature_datagen = ImageDataGenerator(rescale=1.0 / 255)

def create_feature_generator_optimized(dataframe, directory, target_size=(224, 224), batch_size=128, shuffle=False):
    """
    Creates an unlabeled image iterator over the files listed in a DataFrame.

    The iterator is built from the module-level ``feature_datagen`` and reads
    file names from the ``image_name`` column. No labels are produced
    (``class_mode=None``), so it is suitable for ``model.predict``.

    Parallel loading is not configured here: ``workers`` and
    ``use_multiprocessing`` are not options of ``flow_from_dataframe`` (they
    were previously passed and ignored), so they have been removed.

    Args:
        dataframe (pd.DataFrame): DataFrame with an ``image_name`` column.
        directory (str): Directory where the images are stored.
        target_size (tuple): Size the images are resized to.
        batch_size (int): Number of images per batch.
        shuffle (bool): Whether to shuffle the data. Keep this False when the
            extracted features must stay aligned with the DataFrame rows.

    Returns:
        Iterator: Keras iterator yielding batches of images.

    Raises:
        ValueError: If the iterator found fewer images than the DataFrame has
            rows, which would misalign features with the DataFrame's labels.
    """
    generator = feature_datagen.flow_from_dataframe(
        dataframe=dataframe,
        directory=directory,
        x_col="image_name",
        y_col=None,  # No labels needed for feature extraction
        target_size=target_size,
        batch_size=batch_size,
        class_mode=None,
        shuffle=shuffle,
        seed=42,
    )

    # flow_from_dataframe drops rows whose file is missing or invalid. The
    # notebook later pairs feature row i with DataFrame row i, so any dropped
    # row would silently shift every following label.
    if generator.n != len(dataframe):
        raise ValueError(
            f"Only {generator.n} of {len(dataframe)} images listed in the DataFrame were found in "
            f"'{directory}'; extracted features would not line up with the DataFrame rows."
        )
    return generator

# Build one non-shuffled generator per split, 64 images per batch.
# The first three splits read from the PlantVillage folder, the last from PlantDoc.
(
    train_feature_gen_optimized,
    test_seen_feature_gen_optimized,
    test_unseen_feature_gen_optimized,
    doc_unseen_feature_gen_optimized,
) = [
    create_feature_generator_optimized(
        dataframe=split_frame,
        directory=image_folder,
        batch_size=64,
        shuffle=False
    )
    for split_frame, image_folder in (
        (train_data, plant_vil_dir),
        (test_seen_data, plant_vil_dir),
        (test_unseen_data, plant_vil_dir),
        (doc_unseen_data, plant_doc_dir),
    )
]

print("Optimized feature data generators created successfully.")


In [None]:
import numpy as np

def extract_features_direct(generator, model, num_samples):
    """
    Run ``model.predict`` over a batch generator and return its output.

    The number of prediction steps is the number of batches needed to cover
    ``num_samples`` items at ``generator.batch_size`` items per batch
    (rounded up, so a final partial batch is included).

    Args:
        generator (Iterator): Batch iterator exposing a ``batch_size`` attribute.
        model (tf.keras.Model): Feature extractor model.
        num_samples (int): Total number of samples to process.

    Returns:
        np.array: Whatever ``model.predict`` returns for the generator.
    """
    batches_needed = np.ceil(num_samples / generator.batch_size)
    return model.predict(generator, steps=int(batches_needed), verbose=1)


In [None]:
# Number of images each generator will yield
num_train, num_test_seen, num_test_unseen, num_doc_unseen = (
    train_feature_gen_optimized.n,
    test_seen_feature_gen_optimized.n,
    test_unseen_feature_gen_optimized.n,
    doc_unseen_feature_gen_optimized.n,
)

# Training split
print("Extracting Training Features...")
train_features = extract_features_direct(train_feature_gen_optimized, feature_extractor, num_train)
print(f"Training features extracted: {train_features.shape}")

# Seen test split
print("\nExtracting Seen Test Features...")
test_seen_features = extract_features_direct(test_seen_feature_gen_optimized, feature_extractor, num_test_seen)
print(f"Seen Test features extracted: {test_seen_features.shape}")

# Unseen test split
print("\nExtracting Unseen Test Features...")
test_unseen_features = extract_features_direct(test_unseen_feature_gen_optimized, feature_extractor, num_test_unseen)
print(f"Unseen Test features extracted: {test_unseen_features.shape}")

# PlantDoc unseen test split
print("\nExtracting PlantDoc Unseen Test Features...")
doc_unseen_features = extract_features_direct(doc_unseen_feature_gen_optimized, feature_extractor, num_doc_unseen)
print(f"PlantDoc Unseen Test features extracted: {doc_unseen_features.shape}")


In [None]:
import os
import numpy as np

# Folder that receives the extracted feature arrays (created on demand)
features_dir = r'C:\Users\bheja\OneDrive\Desktop\features'
os.makedirs(features_dir, exist_ok=True)

# Write each feature array to its own .npy file
for npy_name, feature_array in (
    ('train_features.npy', train_features),
    ('test_seen_features.npy', test_seen_features),
    ('test_unseen_features.npy', test_unseen_features),
    ('doc_unseen_features.npy', doc_unseen_features),
):
    np.save(os.path.join(features_dir, npy_name), feature_array)

print("All features saved successfully.")


In [None]:
import os
import numpy as np

# Folder the feature arrays were saved to
features_dir = r'C:\Users\bheja\OneDrive\Desktop\features'

# Read the four feature arrays back from their .npy files
train_features, test_seen_features, test_unseen_features, doc_unseen_features = (
    np.load(os.path.join(features_dir, npy_name))
    for npy_name in (
        'train_features.npy',
        'test_seen_features.npy',
        'test_unseen_features.npy',
        'doc_unseen_features.npy',
    )
)

print("All features loaded successfully.")


In [None]:
# Report the shape of every feature array
print(
    f"Training Features Shape: {train_features.shape}",
    f"Seen Test Features Shape: {test_seen_features.shape}",
    f"Unseen Test Features Shape: {test_unseen_features.shape}",
    f"PlantDoc Unseen Test Features Shape: {doc_unseen_features.shape}",
    sep="\n",
)

# Peek at the first five components of the first training feature vector
print("\nSample Training Feature Vector (first 5 elements):")
first_training_vector = train_features[0]
print(first_training_vector[:5])


In [None]:
from sklearn.preprocessing import LabelEncoder

# Fit an encoder on the training crop classes, then store the integer codes
# in a new 'encoded_crop_class' column
label_encoder = LabelEncoder()
label_encoder.fit(train_data['crop_class'])
train_data['encoded_crop_class'] = label_encoder.transform(train_data['crop_class'])

# Show the original labels next to their codes, then the set of codes in use
print("\nEncoded Labels:")
crop_label_preview = train_data[['crop_class', 'encoded_crop_class']].head()
print(crop_label_preview)
print("\nUnique Encoded Classes:", train_data['encoded_crop_class'].unique())


In [None]:
# Encode the seen-test crop classes with the encoder fitted on the training
# data; this split is used as the validation set below
seen_crop_codes = label_encoder.transform(test_seen_data['crop_class'])
test_seen_data['encoded_crop_class'] = seen_crop_codes

# Show the original labels next to their codes
print("\nEncoded Validation Labels:")
seen_label_preview = test_seen_data[['crop_class', 'encoded_crop_class']].head()
print(seen_label_preview)

# Show which codes occur in this split
print("\nUnique Encoded Classes in Validation Data:", test_seen_data['encoded_crop_class'].unique())


In [None]:
import numpy as np
from itertools import combinations
import random

def _draw_two_distinct(n):
    """Return two different integers drawn uniformly from range(n); requires n >= 2."""
    first = np.random.randint(n)
    # Draw from the n - 1 remaining values and step over `first`
    second = np.random.randint(n - 1)
    if second >= first:
        second += 1
    return first, second


def create_pairs(features, labels, num_pairs):
    """
    Create positive and negative pairs of feature vectors.

    Each pair is positive (both samples share a label) or negative (samples
    come from two different labels) with probability 0.5. Positive pairs are
    only drawn from classes with at least two samples, so a class with a
    single sample no longer aborts the sampling. If only one kind of pair can
    be built from the data, every pair is of that kind.

    All randomness comes from NumPy's global generator, so calling
    ``np.random.seed`` beforehand makes the result reproducible.

    Args:
        features (np.array): Feature vectors, one row per sample.
        labels (np.array): Label of each row of ``features``.
        num_pairs (int): Number of pairs to create.

    Returns:
        pairs (np.array): Array of shape (num_pairs, 2, ...) holding the two
            feature vectors of every pair.
        pair_labels (np.array): Array of labels (1 for similar, 0 for dissimilar).

    Raises:
        ValueError: If ``features`` and ``labels`` differ in length, or if
            neither a positive nor a negative pair can be formed.
    """
    features = np.asarray(features)
    labels = np.asarray(labels)
    if len(features) != len(labels):
        raise ValueError(
            f"features has {len(features)} rows but labels has {len(labels)} entries"
        )

    # Row indices of every class; positive pairs need a class with >= 2 rows
    class_indices = [np.flatnonzero(labels == cls) for cls in np.unique(labels)]
    positive_pools = [indices for indices in class_indices if len(indices) >= 2]
    can_draw_positive = len(positive_pools) > 0
    can_draw_negative = len(class_indices) >= 2
    if num_pairs > 0 and not (can_draw_positive or can_draw_negative):
        raise ValueError("Cannot form any pair: need at least two samples")

    # Collect row indices first and gather the feature vectors once at the end
    left = np.empty(num_pairs, dtype=np.intp)
    right = np.empty(num_pairs, dtype=np.intp)
    pair_labels = np.empty(num_pairs, dtype=np.int64)

    for k in range(num_pairs):
        want_positive = np.random.random() < 0.5
        if (want_positive and can_draw_positive) or not can_draw_negative:
            # Positive pair: two different samples of one class
            pool = positive_pools[np.random.randint(len(positive_pools))]
            pos_a, pos_b = _draw_two_distinct(len(pool))
            left[k], right[k] = pool[pos_a], pool[pos_b]
            pair_labels[k] = 1
        else:
            # Negative pair: one sample from each of two different classes
            cls_a, cls_b = _draw_two_distinct(len(class_indices))
            pool_a, pool_b = class_indices[cls_a], class_indices[cls_b]
            left[k] = pool_a[np.random.randint(len(pool_a))]
            right[k] = pool_b[np.random.randint(len(pool_b))]
            pair_labels[k] = 0

    pairs = np.stack([features[left], features[right]], axis=1)
    return pairs, pair_labels


In [None]:
# How many training pairs to sample (tune to dataset size and requirements)
num_train_pairs = 50000

# Encoded crop labels of the training split as a NumPy array
train_labels_array = train_data['encoded_crop_class'].values

# Sample the training pairs and their similar/dissimilar labels
train_pair_features, train_pair_labels = create_pairs(
    features=train_features,
    labels=train_labels_array,
    num_pairs=num_train_pairs,
)

# Report how many pairs and labels were produced
print(
    f"Total Training Pairs Created: {len(train_pair_features)}",
    f"Total Training Labels Created: {len(train_pair_labels)}",
    sep="\n",
)


In [None]:
# How many validation pairs to sample (tune to dataset size and requirements)
num_val_pairs = 10000

# Encoded crop labels of the seen-test split as a NumPy array
val_labels_array = test_seen_data['encoded_crop_class'].values

# Sample the validation pairs and their similar/dissimilar labels
val_pair_features, val_pair_labels = create_pairs(
    features=test_seen_features,
    labels=val_labels_array,
    num_pairs=num_val_pairs,
)

# Report how many pairs and labels were produced
print(
    f"Total Validation Pairs Created: {len(val_pair_features)}",
    f"Total Validation Labels Created: {len(val_pair_labels)}",
    sep="\n",
)


In [None]:
# Each training entry holds the two feature vectors of one pair; collect the
# first members and the second members into separate arrays
train_pair_features_1 = np.array([first_member for first_member, _ in train_pair_features])
train_pair_features_2 = np.array([second_member for _, second_member in train_pair_features])
train_pair_labels = np.array(train_pair_labels)

# Shapes of the training arrays
print(
    f"Training Pair Features 1 Shape: {train_pair_features_1.shape}",
    f"Training Pair Features 2 Shape: {train_pair_features_2.shape}",
    f"Training Pair Labels Shape: {train_pair_labels.shape}",
    sep="\n",
)

# Same split for the validation pairs
val_pair_features_1 = np.array([first_member for first_member, _ in val_pair_features])
val_pair_features_2 = np.array([second_member for _, second_member in val_pair_features])
val_pair_labels = np.array(val_pair_labels)

# Shapes of the validation arrays
print(
    f"Validation Pair Features 1 Shape: {val_pair_features_1.shape}",
    f"Validation Pair Features 2 Shape: {val_pair_features_2.shape}",
    f"Validation Pair Labels Shape: {val_pair_labels.shape}",
    sep="\n",
)


In [None]:
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model

def create_base_network(input_dim):
    """
    Build the embedding network applied to each feature vector.

    The network is a stack of three ReLU dense layers with 512, 256 and 128
    units, so its output is a 128-dimensional vector.

    Args:
        input_dim (int): Dimension of the input feature vector.

    Returns:
        keras.Model: Base network model named 'Base_Network'.
    """
    base_input = Input(shape=(input_dim,), name='Base_Input')

    hidden = base_input
    for units, layer_name in ((512, 'Dense_512'), (256, 'Dense_256'), (128, 'Dense_128')):
        hidden = Dense(units, activation='relu', name=layer_name)(hidden)

    return Model(inputs=base_input, outputs=hidden, name='Base_Network')

# Width of one feature vector (second axis of the training feature array)
input_dim = train_features.shape[1]  # 2048

# Instantiate the base network for that width and print its layer summary
base_network = create_base_network(input_dim=input_dim)
base_network.summary()


In [None]:
from tensorflow.keras.layers import Input, Lambda, Dense
from tensorflow.keras.models import Model
import tensorflow.keras.backend as K

def euclidean_distance(vects):
    """
    Euclidean distance between two batches of vectors, row by row.

    The squared distance is clamped from below at ``K.epsilon()`` before the
    square root is taken, so the result is never exactly zero.

    Args:
        vects (list): List containing two tensors.

    Returns:
        tensor: Distance per row, with the reduced axis kept (axis 1).
    """
    first, second = vects
    squared_diff = K.square(first - second)
    squared_norm = K.sum(squared_diff, axis=1, keepdims=True)
    clamped = K.maximum(squared_norm, K.epsilon())
    return K.sqrt(clamped)

def create_siamese_network(base_network):
    """
    Build a Siamese model around a shared base network.

    Both inputs go through the same ``base_network`` instance (shared
    weights); the Euclidean distance between the two outputs is fed to a
    single sigmoid unit.

    Args:
        base_network (keras.Model): Base network to process input features.

    Returns:
        keras.Model: Two-input model named 'Siamese_Network'.
    """
    feature_dim = base_network.input_shape[1]
    input_a = Input(shape=(feature_dim,), name='Input_A')
    input_b = Input(shape=(feature_dim,), name='Input_B')

    # Same network applied to both sides
    embeddings = [base_network(input_a), base_network(input_b)]

    # Distance between the embeddings, mapped to a similarity score
    distance = Lambda(euclidean_distance, name='Euclidean_Distance')(embeddings)
    similarity = Dense(1, activation='sigmoid', name='Similarity')(distance)

    return Model(inputs=[input_a, input_b], outputs=similarity, name='Siamese_Network')

# Wrap the base network in a Siamese model and print its layer summary
siamese_network = create_siamese_network(base_network=base_network)
siamese_network.summary()


In [None]:
from tensorflow.keras.optimizers import Adam

# Binary cross-entropy on the similarity output, Adam at learning rate 1e-3
crop_optimizer = Adam(learning_rate=0.001)
siamese_network.compile(optimizer=crop_optimizer, loss='binary_crossentropy', metrics=['accuracy'])

print("Siamese Network compiled successfully.")


In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# File that receives the best model seen during training
checkpoint_path = os.path.join(features_dir, 'siamese_network_best.h5')

# Stop after 5 epochs without val_loss improvement and restore the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)

# Halve the learning rate after 3 epochs without val_loss improvement (floor 1e-6)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6, verbose=1)

# Save the model only when val_loss improves
model_checkpoint = ModelCheckpoint(filepath=checkpoint_path, monitor='val_loss', save_best_only=True, verbose=1)

# Callbacks handed to fit()
callbacks = [early_stopping, reduce_lr, model_checkpoint]

print("Callbacks defined successfully.")


In [None]:
# Training length and batch size (tune to requirements / available memory)
epochs = 20
batch_size = 128

# Fit on the training pairs and validate on the seen-test pairs
train_inputs = [train_pair_features_1, train_pair_features_2]
val_inputs = [val_pair_features_1, val_pair_features_2]
history = siamese_network.fit(
    x=train_inputs,
    y=train_pair_labels,
    validation_data=(val_inputs, val_pair_labels),
    epochs=epochs,
    batch_size=batch_size,
    callbacks=callbacks,
    verbose=1,  # 1 shows a progress bar, 2 prints one line per epoch
)

print("Training completed successfully.")


In [None]:
from tensorflow.keras.models import load_model
import os

# Path of the checkpoint written during training
best_model_path = os.path.join(features_dir, 'siamese_network_best.h5')

# Start from a known state so a failed load never leaves a model from an
# earlier run (or no name at all) bound to `best_siamese_model`.
best_siamese_model = None

if os.path.exists(best_model_path):
    print(f"Best model found at: {best_model_path}")
    # Load the best saved Siamese Network model
    try:
        best_siamese_model = load_model(best_model_path, compile=False)
    except Exception as e:
        print(f"Error loading the best Siamese Network model: {e}")
    else:
        print("Best Siamese Network model loaded successfully.")
else:
    # Nothing to load: skip the load attempt instead of triggering a second error
    print(f"Best model not found at: {best_model_path}. Please ensure the model was saved correctly.")


In [None]:
import numpy as np

def create_class_prototypes(features, labels, num_classes):
    """
    Compute one prototype per class as the mean of that class's feature vectors.

    Class ids ``0 .. num_classes - 1`` are visited in order; a class without
    any sample triggers a printed warning and gets no entry in the result.

    Args:
        features (np.array): Array of feature vectors.
        labels (np.array): Array of encoded class labels.
        num_classes (int): Total number of classes.

    Returns:
        prototypes (dict): Dictionary mapping class labels to prototype vectors.
    """
    prototypes = {}
    for class_id in range(num_classes):
        members = features[labels == class_id]
        if len(members) > 0:
            prototypes[class_id] = np.mean(members, axis=0)
        else:
            print(f"Warning: No samples for class {class_id}")
    return prototypes

# One prototype per crop class known to the encoder, built from the training features
num_classes = len(label_encoder.classes_)
train_prototypes = create_class_prototypes(
    features=train_features,
    labels=train_labels_array,
    num_classes=num_classes,
)

# Show the first five components of the prototype of class 0
first_class = label_encoder.inverse_transform([0])[0]
first_prototype = train_prototypes[0]
print(f"Prototype for class '{first_class}': {first_prototype[:5]}...")


In [None]:
from scipy.spatial.distance import cosine

def classify_image(feature, prototypes, top_k=5):
    """
    Rank class prototypes by cosine similarity to a feature vector.

    Class ids are converted back to class names with the module-level
    ``label_encoder``.

    Args:
        feature (np.array): Feature vector of the image.
        prototypes (dict): Dictionary of class prototypes.
        top_k (int): Number of top similar classes to consider.

    Returns:
        top_classes (list): List of top_k predicted class labels.
        similarity_scores (list): List of corresponding similarity scores.
    """
    # Cosine similarity = 1 - cosine distance, one score per class
    scored = [(cls, 1 - cosine(feature, prototype)) for cls, prototype in prototypes.items()]

    # Most similar first, truncated to the requested number of classes
    ranked = sorted(scored, key=lambda entry: entry[1], reverse=True)[:top_k]

    top_classes = [label_encoder.inverse_transform([cls])[0] for cls, _ in ranked]
    top_scores = [score for _, score in ranked]
    return top_classes, top_scores

def evaluate_model_on_dataset(features, labels, prototypes, top_k=5):
    """
    Evaluates nearest-prototype classification on a given dataset.

    Every feature vector is compared with every class prototype by cosine
    similarity. A sample counts as a top-1 hit when its true label is the most
    similar class, and as a top-k hit when its true label is among the
    ``top_k`` most similar classes.

    Encoded labels are compared directly with the prototype keys, so labels
    that match no prototype (for example the -1 sentinel used for classes the
    label encoder has never seen) are simply counted as misses instead of
    raising inside ``label_encoder.inverse_transform``.

    Args:
        features (np.array): Array of feature vectors, one row per sample.
        labels (np.array): Array of true encoded class labels.
        prototypes (dict): Dictionary mapping encoded class labels to prototype vectors.
        top_k (int): Number of top similar classes to consider.

    Returns:
        top1_accuracy (float): Top-1 accuracy.
        top5_accuracy (float): Top-k accuracy (top-5 with the default ``top_k``).

    Raises:
        ValueError: If ``features`` and ``labels`` differ in length.
    """
    features = np.asarray(features)
    labels = np.asarray(labels)
    total = len(labels)
    if len(features) != total:
        raise ValueError(f"features has {len(features)} rows but labels has {total} entries")
    # Nothing to score, or nothing to score against: report zero accuracy
    # rather than dividing by zero or indexing an empty prediction list.
    if total == 0 or len(prototypes) == 0:
        return 0.0, 0.0

    class_ids = np.array(list(prototypes.keys()))
    prototype_matrix = np.stack([np.asarray(prototypes[cls]) for cls in prototypes])

    # Cosine similarity of every sample with every prototype. Norms are
    # clamped away from zero so an all-zero vector yields similarity 0.
    tiny = np.finfo(float).tiny
    prototype_norms = np.maximum(np.linalg.norm(prototype_matrix, axis=1), tiny)
    feature_norms = np.maximum(np.sqrt(np.einsum('ij,ij->i', features, features)), tiny)
    similarities = (features @ prototype_matrix.T) / feature_norms[:, None] / prototype_norms[None, :]

    # Stable sort keeps the prototype order for equal scores
    ranking = np.argsort(-similarities, axis=1, kind='stable')[:, :top_k]
    hits = class_ids[ranking] == labels[:, None]

    top1_accuracy = float(hits[:, :1].any(axis=1).mean())
    top5_accuracy = float(hits.any(axis=1).mean())

    return top1_accuracy, top5_accuracy

print("Inference functions defined successfully.")


In [None]:
# Encoded crop labels of the training split
train_labels_eval = train_data['encoded_crop_class'].values

# Score the training features against the training prototypes
train_top1_acc, train_top5_acc = evaluate_model_on_dataset(
    train_features, train_labels_eval, train_prototypes, top_k=5
)

print(
    f"Training Data - Top-1 Accuracy: {train_top1_acc * 100:.2f}%",
    f"Training Data - Top-5 Accuracy: {train_top5_acc * 100:.2f}%",
    sep="\n",
)


In [None]:
# Encoded crop labels of the seen-test (validation) split
val_labels_eval = test_seen_data['encoded_crop_class'].values

# Score the seen-test features against the training prototypes
val_top1_acc, val_top5_acc = evaluate_model_on_dataset(
    test_seen_features, val_labels_eval, train_prototypes, top_k=5
)

print(
    f"Test Seen Data (Validation) - Top-1 Accuracy: {val_top1_acc * 100:.2f}%",
    f"Test Seen Data (Validation) - Top-5 Accuracy: {val_top5_acc * 100:.2f}%",
    sep="\n",
)


In [None]:
# Encode the 'crop_class' labels in test_unseen_data with the encoder fitted
# on the training data. A single vectorised lookup replaces the previous
# try/except plus per-row `label_encoder.transform` fallback and produces the
# same column: known classes get their encoder index, all others get -1.
crop_class_to_code = {crop_name: code for code, crop_name in enumerate(label_encoder.classes_)}

# Report classes the encoder has never seen
unseen_classes = set(test_unseen_data['crop_class']) - set(crop_class_to_code)
if unseen_classes:
    print(f"Unseen classes in test_unseen_data: {unseen_classes}")

# -1 is a sentinel, not a valid encoder index: it must not be passed to
# label_encoder.inverse_transform.
test_unseen_data['encoded_crop_class'] = (
    test_unseen_data['crop_class'].map(crop_class_to_code).fillna(-1).astype(int)
)

print("\nEncoded Test Unseen Labels:")
print(test_unseen_data[['crop_class', 'encoded_crop_class']].head())

# Display unique encoded classes in test_unseen_data
print("\nUnique Encoded Classes in Test Unseen Data:", test_unseen_data['encoded_crop_class'].unique())


In [None]:
# Encoded crop labels of the unseen-test split
test_unseen_labels_eval = test_unseen_data['encoded_crop_class'].values

# Score the unseen-test features against the training prototypes
test_unseen_top1_acc, test_unseen_top5_acc = evaluate_model_on_dataset(
    test_unseen_features, test_unseen_labels_eval, train_prototypes, top_k=5
)

print(
    f"Test Unseen Data - Top-1 Accuracy: {test_unseen_top1_acc * 100:.2f}%",
    f"Test Unseen Data - Top-5 Accuracy: {test_unseen_top5_acc * 100:.2f}%",
    sep="\n",
)


In [None]:
# Encode the 'crop_class' labels in doc_unseen_data with the encoder fitted on
# the training data. A single vectorised lookup replaces the previous
# try/except plus per-row `label_encoder.transform` fallback and produces the
# same column: known classes get their encoder index, all others get -1.
doc_crop_class_to_code = {crop_name: code for code, crop_name in enumerate(label_encoder.classes_)}

# Report classes the encoder has never seen
unseen_classes = set(doc_unseen_data['crop_class']) - set(doc_crop_class_to_code)
if unseen_classes:
    print(f"Unseen classes in doc_unseen_data: {unseen_classes}")

# -1 is a sentinel, not a valid encoder index: it must not be passed to
# label_encoder.inverse_transform.
doc_unseen_data['encoded_crop_class'] = (
    doc_unseen_data['crop_class'].map(doc_crop_class_to_code).fillna(-1).astype(int)
)

print("\nEncoded PlantDoc Unseen Labels:")
print(doc_unseen_data[['crop_class', 'encoded_crop_class']].head())

# Display unique encoded classes in doc_unseen_data
print("\nUnique Encoded Classes in PlantDoc Unseen Data:", doc_unseen_data['encoded_crop_class'].unique())


In [None]:
# Encoded crop labels of the PlantDoc unseen-test split
doc_unseen_labels_eval = doc_unseen_data['encoded_crop_class'].values

# Score the PlantDoc features against the training prototypes
doc_unseen_top1_acc, doc_unseen_top5_acc = evaluate_model_on_dataset(
    doc_unseen_features, doc_unseen_labels_eval, train_prototypes, top_k=5
)

print(
    f"PlantDoc Unseen Test Data - Top-1 Accuracy: {doc_unseen_top1_acc * 100:.2f}%",
    f"PlantDoc Unseen Test Data - Top-5 Accuracy: {doc_unseen_top5_acc * 100:.2f}%",
    sep="\n",
)


DISEASE

In [None]:
from sklearn.preprocessing import LabelEncoder

# Fit a separate encoder on the training disease classes, then store the
# integer codes in a new 'encoded_disease_class' column
disease_label_encoder = LabelEncoder()
disease_label_encoder.fit(train_data['disease_class'])
train_data['encoded_disease_class'] = disease_label_encoder.transform(train_data['disease_class'])

# Show the original labels next to their codes
print("\nEncoded Disease Labels (Training Data):")
disease_label_preview = train_data[['disease_class', 'encoded_disease_class']].head()
print(disease_label_preview)

# Show which codes occur in the training split
print("\nUnique Encoded Disease Classes:", train_data['encoded_disease_class'].unique())


In [None]:
import numpy as np

# Number of disease training and validation pairs
num_train_disease_pairs = 50000
num_val_disease_pairs = 10000

# Encoded disease labels of the training split as a NumPy array
train_disease_labels_array = train_data['encoded_disease_class'].values

# Only the training split has been given an 'encoded_disease_class' column
# so far; encode the seen-test split here with the same encoder, otherwise
# the column lookup below raises a KeyError.
test_seen_data['encoded_disease_class'] = disease_label_encoder.transform(test_seen_data['disease_class'])
val_disease_labels_array = test_seen_data['encoded_disease_class'].values

# Create disease training pairs
train_disease_pair_features, train_disease_pair_labels = create_pairs(
    features=train_features,
    labels=train_disease_labels_array,
    num_pairs=num_train_disease_pairs
)

print(f"Total Disease Training Pairs Created: {len(train_disease_pair_features)}")
print(f"Total Disease Training Labels Created: {len(train_disease_pair_labels)}")

# Create disease validation pairs
val_disease_pair_features, val_disease_pair_labels = create_pairs(
    features=test_seen_features,
    labels=val_disease_labels_array,
    num_pairs=num_val_disease_pairs
)

print(f"Total Disease Validation Pairs Created: {len(val_disease_pair_features)}")
print(f"Total Disease Validation Labels Created: {len(val_disease_pair_labels)}")


In [None]:
# Each training entry holds the two feature vectors of one pair; collect the
# first members and the second members into separate arrays
train_disease_pair_features_1 = np.array([first_member for first_member, _ in train_disease_pair_features])
train_disease_pair_features_2 = np.array([second_member for _, second_member in train_disease_pair_features])
train_disease_pair_labels = np.array(train_disease_pair_labels)

# Shapes of the training arrays
print(
    f"Disease Training Pair Features 1 Shape: {train_disease_pair_features_1.shape}",
    f"Disease Training Pair Features 2 Shape: {train_disease_pair_features_2.shape}",
    f"Disease Training Pair Labels Shape: {train_disease_pair_labels.shape}",
    sep="\n",
)

# Same split for the validation pairs
val_disease_pair_features_1 = np.array([first_member for first_member, _ in val_disease_pair_features])
val_disease_pair_features_2 = np.array([second_member for _, second_member in val_disease_pair_features])
val_disease_pair_labels = np.array(val_disease_pair_labels)

# Shapes of the validation arrays
print(
    f"Disease Validation Pair Features 1 Shape: {val_disease_pair_features_1.shape}",
    f"Disease Validation Pair Features 2 Shape: {val_disease_pair_features_2.shape}",
    f"Disease Validation Pair Labels Shape: {val_disease_pair_labels.shape}",
    sep="\n",
)


In [None]:
from tensorflow.keras.layers import Input, Lambda, Dense
from tensorflow.keras.models import Model
import tensorflow.keras.backend as K
from tensorflow.keras.optimizers import Adam

def euclidean_distance(vects):
    """
    Euclidean distance between two batches of vectors, row by row.

    This redefines the function of the same name declared earlier in the
    notebook, with the same computation. The squared distance is clamped from
    below at ``K.epsilon()`` before the square root is taken.

    Args:
        vects (list): List containing two tensors.

    Returns:
        tensor: Distance per row, with the reduced axis kept (axis 1).
    """
    left_batch, right_batch = vects
    delta = left_batch - right_batch
    squared_norm = K.sum(K.square(delta), axis=1, keepdims=True)
    floor = K.epsilon()
    return K.sqrt(K.maximum(squared_norm, floor))

def create_siamese_network_shared_base(base_network):
    """
    Build the disease Siamese model around an existing base network.

    Both inputs go through the ``base_network`` instance that is passed in,
    so its weights are shared with any other model built on the same
    instance. The Euclidean distance between the two outputs is fed to a
    single sigmoid unit.

    Args:
        base_network (keras.Model): Base network to process input features.

    Returns:
        keras.Model: Two-input model named 'Siamese_Network_Disease'.
    """
    feature_dim = base_network.input_shape[1]
    input_a = Input(shape=(feature_dim,), name='Disease_Input_A')
    input_b = Input(shape=(feature_dim,), name='Disease_Input_B')

    # Same network applied to both sides
    embeddings = [base_network(input_a), base_network(input_b)]

    # Distance between the embeddings, mapped to a similarity score
    distance = Lambda(euclidean_distance, name='Disease_Euclidean_Distance')(embeddings)
    similarity = Dense(1, activation='sigmoid', name='Disease_Similarity')(distance)

    return Model(inputs=[input_a, input_b], outputs=similarity, name='Siamese_Network_Disease')

# Build the disease model on top of the existing base network and print its summary
siamese_network_disease = create_siamese_network_shared_base(base_network=base_network)
siamese_network_disease.summary()

# Binary cross-entropy on the similarity output, Adam at learning rate 1e-3
disease_optimizer = Adam(learning_rate=0.001)
siamese_network_disease.compile(optimizer=disease_optimizer, loss='binary_crossentropy', metrics=['accuracy'])

print("Disease Siamese Network compiled successfully.")


In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
import os

# Folder that holds every saved disease-model file (created if missing)
disease_model_dir = os.path.join(features_dir, 'disease_model')
os.makedirs(disease_model_dir, exist_ok=True)

# File that ModelCheckpoint overwrites whenever val_loss improves
checkpoint_path_disease = os.path.join(disease_model_dir, 'siamese_network_disease_best.h5')

# Stop after 5 epochs without a val_loss improvement and restore the best weights
early_stopping_disease = EarlyStopping(
    monitor='val_loss',
    restore_best_weights=True,
    patience=5,
    verbose=1,
)

# Halve the learning rate after 3 stagnant epochs, never going below 1e-6
reduce_lr_disease = ReduceLROnPlateau(
    monitor='val_loss',
    patience=3,
    factor=0.5,
    min_lr=1e-6,
    verbose=1,
)

# Keep only the model with the lowest val_loss on disk
model_checkpoint_disease = ModelCheckpoint(
    filepath=checkpoint_path_disease,
    save_best_only=True,
    monitor='val_loss',
    verbose=1,
)

# Callback list handed to fit()
callbacks_disease = [early_stopping_disease, reduce_lr_disease, model_checkpoint_disease]

print("Disease Callbacks defined successfully.")


In [None]:

# Training budget: maximum number of epochs and mini-batch size
epochs_disease = 50
batch_size_disease = 128

# Fit the Disease Siamese Network on the training pairs, validating on the
# validation pairs after every epoch
history_disease = siamese_network_disease.fit(
    x=[train_disease_pair_features_1, train_disease_pair_features_2],
    y=train_disease_pair_labels,
    batch_size=batch_size_disease,
    epochs=epochs_disease,
    validation_data=(
        [val_disease_pair_features_1, val_disease_pair_features_2],
        val_disease_pair_labels,
    ),
    callbacks=callbacks_disease,
    verbose=1,
)

print("Disease Training completed successfully.")


In [None]:
# Destination file for the model as it stands once fit() has returned
final_disease_model_path = os.path.join(disease_model_dir, 'disease_final_model.h5')

# Write the current in-memory Disease Siamese Network to that file
siamese_network_disease.save(filepath=final_disease_model_path)

print(f"Disease Siamese Network model saved at: {final_disease_model_path}")


In [None]:
def create_class_prototypes(features, labels, num_classes):
    """
    Average the feature vectors of each class into one prototype per class.

    Classes ``0 .. num_classes - 1`` are visited in order. A class with no
    samples triggers a printed warning and is left out of the result.

    Args:
        features (np.array): Array of feature vectors, one row per sample.
        labels (np.array): Encoded class label of each row in `features`.
        num_classes (int): Number of class ids to scan.

    Returns:
        prototypes (dict): Maps each class id that has samples to the mean
            of its feature vectors (taken over axis 0).
    """
    prototypes = {}
    for cls in range(num_classes):
        members = features[labels == cls]
        if len(members):
            prototypes[cls] = np.mean(members, axis=0)
        else:
            print(f"Warning: No samples for class {cls}")
    return prototypes

# One prototype slot per class known to the disease label encoder
num_disease_classes = len(disease_label_encoder.classes_)

# Mean training feature vector of every disease class
disease_train_prototypes = create_class_prototypes(
    train_features,
    train_disease_labels_array,
    num_disease_classes,
)

# Preview the first five components of the prototype for encoded class 0
first_disease_class = disease_label_encoder.inverse_transform([0])[0]
print(f"Prototype for disease class '{first_disease_class}': {disease_train_prototypes[0][:5]}...")


In [None]:
from scipy.spatial.distance import cosine

def _rank_prototypes(feature, prototypes):
    """
    Rank encoded class ids by cosine similarity between `feature` and each prototype.

    Args:
        feature (np.array): Feature vector to compare.
        prototypes (dict): Maps encoded class id to prototype vector.

    Returns:
        list: ``(encoded_class, similarity)`` tuples, most similar first.
    """
    scored = [(cls, 1 - cosine(feature, prototype)) for cls, prototype in prototypes.items()]
    # sorted() is stable, so tied scores keep the prototypes' iteration order.
    return sorted(scored, key=lambda item: item[1], reverse=True)


def classify_image(feature, prototypes, label_encoder, top_k=5):
    """
    Classifies an image feature by finding the top_k closest class prototypes based on cosine similarity.

    Args:
        feature (np.array): Feature vector of the image.
        prototypes (dict): Dictionary of class prototypes keyed by encoded class id.
        label_encoder (LabelEncoder): LabelEncoder to inverse transform class labels.
        top_k (int): Number of top similar classes to consider.

    Returns:
        top_classes (list): List of top_k predicted (decoded) class labels, best first.
        similarity_scores_sorted (list): List of corresponding similarity scores.
    """
    ranked = _rank_prototypes(feature, prototypes)[:top_k]

    top_classes = [label_encoder.inverse_transform([cls])[0] for cls, _ in ranked]
    similarity_scores_sorted = [score for _, score in ranked]

    return top_classes, similarity_scores_sorted


def evaluate_model_on_dataset(features, labels, prototypes, label_encoder, top_k=5):
    """
    Evaluates prototype-based classification accuracy on a given dataset.

    True and predicted classes are compared as encoded ids, so a label the
    encoder does not know (for example a ``-1`` sentinel for an unseen class)
    is counted as a miss instead of raising during decoding.

    Args:
        features (np.array): Array of feature vectors, one per sample.
        labels (np.array): Array of true encoded class labels, aligned with `features`.
        prototypes (dict): Dictionary of class prototypes keyed by encoded class id.
        label_encoder (LabelEncoder): Kept for interface compatibility; it is not
            needed because the comparison is done on encoded ids.
        top_k (int): Number of top similar classes to consider.

    Returns:
        top1_accuracy (float): Fraction of samples whose true class is ranked first.
        top5_accuracy (float): Fraction of samples whose true class is among the
            first `top_k` ranked classes (top-5 with the default `top_k`).

    Raises:
        ValueError: If `features` and `labels` have different lengths.
    """
    total = len(labels)
    if len(features) != total:
        raise ValueError(
            f"features and labels must have the same length, got {len(features)} and {total}"
        )
    if total == 0:
        # Nothing to score: report zeros instead of dividing by zero.
        print("Warning: Empty dataset, reporting 0.0 accuracy")
        return 0.0, 0.0

    correct_top1 = 0
    correct_topk = 0

    for feature, true_label in zip(features, labels):
        predicted_topk = [cls for cls, _ in _rank_prototypes(feature, prototypes)[:top_k]]
        if not predicted_topk:
            # No candidates (no prototypes or top_k == 0): the sample is a miss.
            continue

        if true_label == predicted_topk[0]:
            correct_top1 += 1
            correct_topk += 1
        elif true_label in predicted_topk:
            correct_topk += 1

    top1_accuracy = correct_top1 / total
    top5_accuracy = correct_topk / total

    return top1_accuracy, top5_accuracy

print("Classification functions defined successfully.")


In [None]:
# Aliases for the encoded disease labels used in the two evaluations below
disease_train_labels_eval = train_disease_labels_array
disease_val_labels_eval = val_disease_labels_array

# Score the training features against the training prototypes
train_disease_top1_acc, train_disease_top5_acc = evaluate_model_on_dataset(
    train_features,
    disease_train_labels_eval,
    disease_train_prototypes,
    disease_label_encoder,
    top_k=5,
)

print(f"Disease Training Data - Top-1 Accuracy: {train_disease_top1_acc * 100:.2f}%")
print(f"Disease Training Data - Top-5 Accuracy: {train_disease_top5_acc * 100:.2f}%")

# Score the validation set against the same training prototypes.
# NOTE(review): the labels come from val_disease_labels_array while the
# features are test_seen_features — confirm both describe the same samples
# in the same order.
val_disease_top1_acc, val_disease_top5_acc = evaluate_model_on_dataset(
    test_seen_features,
    disease_val_labels_eval,
    disease_train_prototypes,
    disease_label_encoder,
    top_k=5,
)

print(f"Disease Validation Data - Top-1 Accuracy: {val_disease_top1_acc * 100:.2f}%")
print(f"Disease Validation Data - Top-5 Accuracy: {val_disease_top5_acc * 100:.2f}%")

# Encode the 'disease_class' labels in test_unseen_data using the same LabelEncoder.
# Either path adds (or overwrites) the 'encoded_disease_class' column on
# test_unseen_data in place.
try:
    # Fast path: transform() succeeds only if every class is known to the encoder
    test_unseen_data['encoded_disease_class'] = disease_label_encoder.transform(test_unseen_data['disease_class'])
    print("\nEncoded Test Unseen Disease Labels:")
    print(test_unseen_data[['disease_class', 'encoded_disease_class']].head())
    
    # Display unique encoded classes in test_unseen_data
    print("\nUnique Encoded Classes in Test Unseen Data:", test_unseen_data['encoded_disease_class'].unique())
except ValueError as e:
    # Fallback: a ValueError was raised inside the try block (presumably by
    # transform() meeting a class the encoder was not fitted on)
    print(f"Error encoding test_unseen_data: {e}")
    
    # Identify classes not in label_encoder
    unseen_classes = set(test_unseen_data['disease_class']) - set(disease_label_encoder.classes_)
    print(f"Unseen classes in test_unseen_data: {unseen_classes}")
    
    # Encode row by row: known classes get their encoder index, unknown ones
    # get the sentinel -1.
    # NOTE(review): -1 is not a class id the encoder or the prototypes know,
    # so such rows can never be predicted correctly — make sure the
    # downstream evaluation tolerates this sentinel.
    test_unseen_data['encoded_disease_class'] = test_unseen_data['disease_class'].apply(
        lambda x: disease_label_encoder.transform([x])[0] if x in disease_label_encoder.classes_ else -1
    )
    print("\nAfter handling unseen classes:")
    print(test_unseen_data[['disease_class', 'encoded_disease_class']].head())

# Encoded labels of the unseen test split, taken from the column's .values
test_unseen_labels_eval = test_unseen_data['encoded_disease_class'].values

# Score the unseen test features against the training prototypes
test_unseen_top1_acc, test_unseen_top5_acc = evaluate_model_on_dataset(
    test_unseen_features,
    test_unseen_labels_eval,
    disease_train_prototypes,
    disease_label_encoder,
    top_k=5,
)

print(f"Test Unseen Data - Top-1 Accuracy: {test_unseen_top1_acc * 100:.2f}%")
print(f"Test Unseen Data - Top-5 Accuracy: {test_unseen_top5_acc * 100:.2f}%")

# Encoded labels of the PlantDoc unseen split.
# NOTE(review): relies on doc_unseen_data already carrying an
# 'encoded_disease_class' column from an earlier cell — confirm.
doc_unseen_labels_eval = doc_unseen_data['encoded_disease_class'].values

# Score the PlantDoc features against the training prototypes
doc_unseen_top1_acc, doc_unseen_top5_acc = evaluate_model_on_dataset(
    doc_unseen_features,
    doc_unseen_labels_eval,
    disease_train_prototypes,
    disease_label_encoder,
    top_k=5,
)

print(f"PlantDoc Unseen Test Data - Top-1 Accuracy: {doc_unseen_top1_acc * 100:.2f}%")
print(f"PlantDoc Unseen Test Data - Top-5 Accuracy: {doc_unseen_top5_acc * 100:.2f}%")


In [None]:

# Second, explicitly named copy of the Disease Siamese Network.
# NOTE(review): this writes whatever weights the in-memory model holds right
# now. The file selected by lowest val_loss is the one ModelCheckpoint wrote
# ('siamese_network_disease_best.h5'); confirm which file downstream code
# should treat as "best".
best_disease_model_path = os.path.join(disease_model_dir, 'disease_best_model.h5')

siamese_network_disease.save(filepath=best_disease_model_path)

print(f"Disease Siamese Network model saved explicitly at: {best_disease_model_path}")


In [None]:
# Confusion matrix 1/3: validation ("seen") data
evaluate_confusion_matrix(
    title='Confusion Matrix - Disease Classification (Seen)',
    label_encoder=disease_label_encoder,
    true_labels=val_disease_true_labels_decoded,
    predicted_labels=val_disease_predicted_labels_decoded,
)

# Confusion matrix 2/3: test unseen data
evaluate_confusion_matrix(
    title='Confusion Matrix - Disease Classification (Test Unseen Data)',
    label_encoder=disease_label_encoder,
    true_labels=test_unseen_true_labels_decoded,
    predicted_labels=test_unseen_predicted_labels_decoded,
)

# Confusion matrix 3/3: PlantDoc unseen test data
evaluate_confusion_matrix(
    title='Confusion Matrix - Disease Classification (PlantDoc Unseen Test Data)',
    label_encoder=disease_label_encoder,
    true_labels=doc_unseen_true_labels_decoded,
    predicted_labels=doc_unseen_predicted_labels_decoded,
)
