In [None]:
# Standard library
import os
import zipfile

# Third-party: plotting, numerics, data handling, metrics
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split

# TensorFlow / Keras
import tensorflow as tf
from tensorflow.keras import Input, layers, models
from tensorflow.keras.applications import EfficientNetB3
# Layer classes are exported from `tensorflow.keras.layers`; importing them from
# the top-level `tensorflow.keras` namespace raises ImportError.
from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Check TensorFlow version
print("TensorFlow version:", tf.__version__)

In [None]:
# Set the global Keras dtype policy to plain float32 (i.e. no mixed precision).
tf.keras.mixed_precision.set_global_policy('float32')

In [None]:
# Configure MirroredStrategy for multi-GPU training.
# The model is later built and compiled inside `strategy.scope()`.
strategy = tf.distribute.MirroredStrategy()

In [None]:
!pip install gdown
import gdown

In [None]:
# Fetch both dataset archives from Google Drive into the current working
# directory: PlantDoc first, then PlantVillage.
for drive_url, archive_name in (
    ("https://drive.google.com/uc?id=1GKs5BTRjrjBuVv7UqHow3XaeYZlmipHg", "plantdoc.zip"),
    ("https://drive.google.com/uc?id=1olr9AIq3XK1x3S79PcFcJSd4mG6f8ZLa", "plantvillage.zip"),
):
    gdown.download(drive_url, archive_name, quiet=False)

In [None]:
# Destination directories for the two archives
plant_doc_extract_path = '/kaggle/working/plantdoc/plantdoc'
plant_village_extract_path = '/kaggle/working/plantvillage/plantvillage'

# Unpack each downloaded archive into its destination (PlantDoc, then PlantVillage)
for archive_name, destination in (
    ("plantdoc.zip", plant_doc_extract_path),
    ("plantvillage.zip", plant_village_extract_path),
):
    with zipfile.ZipFile(archive_name, 'r') as archive:
        archive.extractall(destination)

# List the top-level entries of each destination as a sanity check
print("Extracted PlantDoc dataset files:", os.listdir(plant_doc_extract_path))
print("Extracted PlantVillage dataset files:", os.listdir(plant_village_extract_path))

In [None]:
# Locations of the split definition CSVs inside the attached Kaggle input dataset.
# PV_* files describe PlantVillage splits; PD_* describes the PlantDoc split.
train_csv_path = '/kaggle/input/plant-dataset/PV_train.csv'       # Adjust if the dataset is mounted elsewhere
test_seen_csv_path = '/kaggle/input/plant-dataset/PV_test_seen.csv'
test_unseen_csv_path = '/kaggle/input/plant-dataset/PV_test_unseen.csv'
plantdoc_unseen_csv_path = '/kaggle/input/plant-dataset/PD_test_unseen.csv'

In [None]:
# Read the four header-less split CSVs; columns are named in a later cell.
train_df, test_seen_df, test_unseen_df, plantdoc_unseen_df = (
    pd.read_csv(csv_path, header=None)
    for csv_path in (
        train_csv_path,
        test_seen_csv_path,
        test_unseen_csv_path,
        plantdoc_unseen_csv_path,
    )
)

In [None]:
# Preview the first rows of every split to confirm the CSVs loaded as expected
for heading, frame in (
    ("Training Data Sample:", train_df),
    ("Seen Testing Data Sample:", test_seen_df),
    ("Unseen Testing Data Sample:", test_unseen_df),
    ("PlantDoc Unseen Testing Data Sample:", plantdoc_unseen_df),
):
    print(heading)
    print(frame.head())

In [None]:
# Give every split the same three column names and make sure the crop label
# is stored as a string.
for frame in (train_df, test_seen_df, test_unseen_df, plantdoc_unseen_df):
    frame.columns = ['image', 'crop', 'disease']
    frame['crop'] = frame['crop'].astype(str)

In [None]:
# Distinct crop labels per split, in order of first appearance
train_crops = pd.unique(train_df['crop'])
test_seen_crops = pd.unique(test_seen_df['crop'])
test_unseen_crops = pd.unique(test_unseen_df['crop'])
plantdoc_unseen_crops = pd.unique(plantdoc_unseen_df['crop'])

# How many distinct crops each split contains
print(f"Number of unique crops in training dataset: {len(train_crops)}")
print(f"Number of unique crops in test seen dataset: {len(test_seen_crops)}")
print(f"Number of unique crops in test unseen dataset: {len(test_unseen_crops)}")
print(f"Number of unique crops in plantdoc unseen dataset: {len(plantdoc_unseen_crops)}")

# The crops themselves, for a side-by-side comparison
print(f"Unique crops in training dataset: {train_crops}")
print(f"Unique crops in test seen dataset: {test_seen_crops}")
print(f"Unique crops in test unseen dataset: {test_unseen_crops}")
print(f"Unique crops in plantdoc unseen dataset: {plantdoc_unseen_crops}")

# Compare the PlantVillage test splits against the training crop set
training_crop_set = set(train_crops)
print("Are crops in test seen the same as training crops?", training_crop_set == set(test_seen_crops))
print("Are crops in test unseen the same as training crops?", training_crop_set == set(test_unseen_crops))


In [None]:
# Ordered class list: training crops as plain strings, in first-appearance order
label_names = list(map(str, train_crops))


In [None]:
# One-hot encode the crop label of every split against the SAME ordered class
# list (label_names), so column i refers to the same crop in every label matrix.
# Each split keeps a DataFrame (for inspection) and a NumPy array (for tf.data).
train_labels_df = pd.get_dummies(train_df['crop']).reindex(columns=label_names, fill_value=False)
train_labels = train_labels_df.values

test_seen_labels_df = pd.get_dummies(test_seen_df['crop']).reindex(columns=label_names, fill_value=False)
test_seen_labels = test_seen_labels_df.values

# Crops that do not occur in label_names have no column after the reindex,
# so their rows end up all False.
test_unseen_labels_df = pd.get_dummies(test_unseen_df['crop']).reindex(columns=label_names, fill_value=False)
test_unseen_labels = test_unseen_labels_df.values

plantdoc_unseen_labels_df = pd.get_dummies(plantdoc_unseen_df['crop']).reindex(columns=label_names, fill_value=False)
plantdoc_unseen_labels = plantdoc_unseen_labels_df.values

# Verify the shape of each label matrix: (rows in split, len(label_names))
print("Train Labels Shape:", train_labels.shape)
print("Test Seen Labels Shape:", test_seen_labels.shape)
print("Test Unseen Labels Shape:", test_unseen_labels.shape)
print("PlantDoc Unseen Labels Shape:", plantdoc_unseen_labels.shape)

In [None]:
# Show the first one-hot rows of every split; later headings carry a leading
# newline so the sections are separated in the output.
for heading, label_table in (
    ("Train Labels Head:", train_labels_df),
    ("\nTest Seen Labels Head:", test_seen_labels_df),
    ("\nTest Unseen Labels Head:", test_unseen_labels_df),
    ("\nPlantdoc Unseen Labels Head:", plantdoc_unseen_labels_df),
):
    print(heading)
    print(pd.DataFrame(label_table).head())

In [None]:
# Print the column index of every split DataFrame
for caption, frame in (
    ("Columns in training DataFrame:", train_df),
    ("Columns in seen testing DataFrame:", test_seen_df),
    ("Columns in unseen testing DataFrame:", test_unseen_df),
    ("Columns in plantdoc unseen testing DataFrame:", plantdoc_unseen_df),
):
    print(caption, frame.columns)


In [None]:
# Function to update image paths with the full directory prefix
def update_image_paths(df, base_path):
    """Prefix every entry of the 'image' column with ``base_path``.

    The column is rewritten on the DataFrame that was passed in, and that same
    object is returned.

    Args:
        df: DataFrame with an 'image' column of path strings.
        base_path: Directory joined in front of each image path.

    Returns:
        The input DataFrame, with its 'image' column replaced.
    """
    def _with_prefix(image_name):
        return os.path.join(base_path, image_name)

    df['image'] = df['image'].apply(_with_prefix)
    return df

In [None]:
# Image root for PlantVillage: one 'plantvillage' directory below the extraction path
correct_base_path_plantvillage = '/kaggle/working/plantvillage/plantvillage/plantvillage'
# Image root for PlantDoc: one 'plantdoc' directory below the extraction path
correct_base_path_plantdoc = '/kaggle/working/plantdoc/plantdoc/plantdoc'

In [None]:
# Confirm the PlantVillage image root exists and show up to 10 of its entries
if not os.path.exists(correct_base_path_plantvillage):
    print("The path does not exist. Please check the path and try again.")
else:
    print("Path exists. Here are some files or folders in the directory:\n")
    for item in os.listdir(correct_base_path_plantvillage)[:10]:
        print(item)

In [None]:
# Confirm the PlantDoc image root exists and show up to 10 of its entries
if not os.path.exists(correct_base_path_plantdoc):
    print("The path does not exist. Please check the path and try again.")
else:
    print("Path exists. Here are some files or folders in the directory:\n")
    for item in os.listdir(correct_base_path_plantdoc)[:10]:
        print(item)

In [None]:
# Prefix image paths with their dataset root: the three PlantVillage splits
# share one root, the PlantDoc split uses its own.
train_df = update_image_paths(df=train_df, base_path=correct_base_path_plantvillage)
test_seen_df = update_image_paths(df=test_seen_df, base_path=correct_base_path_plantvillage)
test_unseen_df = update_image_paths(df=test_unseen_df, base_path=correct_base_path_plantvillage)
plantdoc_unseen_df = update_image_paths(df=plantdoc_unseen_df, base_path=correct_base_path_plantdoc)

In [None]:
# Collect the rows of each split whose image file is not present on disk
missing_train_files = train_df.loc[~train_df['image'].map(os.path.exists)]
missing_test_seen_files = test_seen_df.loc[~test_seen_df['image'].map(os.path.exists)]
missing_test_unseen_files = test_unseen_df.loc[~test_unseen_df['image'].map(os.path.exists)]
missing_test_field_files = plantdoc_unseen_df.loc[~plantdoc_unseen_df['image'].map(os.path.exists)]

# Report how many files are missing per split
for split_name, missing_rows in (
    ("training", missing_train_files),
    ("test seen", missing_test_seen_files),
    ("test unseen", missing_test_unseen_files),
    ("test field", missing_test_field_files),
):
    print(f"Number of missing files in {split_name} set: {len(missing_rows)}")


In [None]:
# Hold out 20% of the training rows for validation. The split is stratified on
# the crop label and seeded (random_state=42) so it is repeatable.
train_data, val_data = train_test_split(train_df, test_size=0.2, stratify=train_df['crop'], random_state=42)

In [None]:
# Image paths for the training and validation subsets
train_image_paths = train_data['image'].values
val_image_paths = val_data['image'].values

# One-hot labels for both subsets, aligned to label_names. A bare get_dummies()
# orders columns alphabetically and only creates columns for classes present in
# that subset; the test label matrices are laid out in label_names order, so
# without the reindex the class indices the model learns would not match the
# ones used at evaluation time (and train/val widths could differ).
train_labels = (
    pd.get_dummies(train_data['crop'])
    .reindex(columns=label_names, fill_value=False)
    .values.astype('float32')  # explicit float targets for the crossentropy loss
)
val_labels = (
    pd.get_dummies(val_data['crop'])
    .reindex(columns=label_names, fill_value=False)
    .values.astype('float32')
)


In [None]:
# Image path arrays for the three test splits
test_seen_image_paths = test_seen_df['image'].values
test_unseen_image_paths = test_unseen_df['image'].values
plantdoc_unseen_image_paths = plantdoc_unseen_df['image'].values

# Shapes of the path arrays ...
for caption, path_array in (
    ("Test Seen - Image Paths Shape:", test_seen_image_paths),
    ("Test Unseen - Image Paths Shape:", test_unseen_image_paths),
    ("Test Field - Image Paths Shape:", plantdoc_unseen_image_paths),
):
    print(caption, path_array.shape)

# ... and of the matching label matrices, which must already have been created
# by the one-hot encoding cell.
for caption, label_matrix in (
    ("Test Seen Labels Shape:", test_seen_labels),
    ("Test Unseen Labels Shape:", test_unseen_labels),
    ("Test Field Labels Shape:", plantdoc_unseen_labels),
):
    print(caption, label_matrix.shape)


In [None]:
# Define a function to process the images and labels
def process_data(image_path, label, is_training=True):
    """Load one image from disk and prepare it as EfficientNet input.

    The file is read, decoded to 3 channels and resized to 300x300. Pixel
    values are deliberately left on the 0-255 scale: Keras EfficientNet models
    contain their own rescaling/normalization layers and expect raw pixel
    values, so standardizing the image here would hand the pretrained backbone
    inputs far outside the range it was trained on.

    Args:
        image_path: Scalar string tensor (or str) with the image file path.
        label: Label for the image; passed through unchanged.
        is_training: Accepted for call-site compatibility; not used.

    Returns:
        Tuple ``(image, label)`` where image is a float32 tensor of shape
        (300, 300, 3) with values on the 0-255 scale.
    """
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)
    # tf.image.resize returns float32 and keeps the 0-255 value range.
    image = tf.image.resize(image, [300, 300])
    image = tf.cast(image, tf.float32)

    return image, label

In [None]:
# Define a function to apply data augmentation
def augmentation(image):
    """Return a randomly perturbed version of ``image``.

    Applies, in this order: a random left-right flip, a random brightness
    shift (max_delta=0.2) and a random contrast change with a factor drawn
    from [0.8, 1.2].
    """
    flipped = tf.image.random_flip_left_right(image)
    brightened = tf.image.random_brightness(flipped, max_delta=0.2)
    return tf.image.random_contrast(brightened, lower=0.8, upper=1.2)

In [None]:
# Create training and validation datasets
def create_dataset(image_paths, labels, batch_size, is_training=True):
    """Build a batched, prefetched tf.data pipeline from paths and labels.

    Args:
        image_paths: Sequence of image file paths.
        labels: Labels aligned element-wise with ``image_paths``.
        batch_size: Number of examples per batch.
        is_training: When True, the (path, label) pairs are shuffled with a
            1000-element buffer and each loaded image goes through
            ``augmentation``; when False, images are only loaded.

    Returns:
        A ``tf.data.Dataset`` yielding ``(image_batch, label_batch)``.
    """
    def _load_and_augment(path, label):
        image, _ = process_data(path, label)
        return augmentation(image), label

    def _load_only(path, label):
        return process_data(path, label)

    pipeline = tf.data.Dataset.from_tensor_slices((image_paths, labels))

    # Shuffling happens on the cheap (path, label) pairs, before decoding.
    if is_training:
        pipeline = pipeline.shuffle(buffer_size=1000)

    per_example_fn = _load_and_augment if is_training else _load_only
    pipeline = pipeline.map(per_example_fn, num_parallel_calls=tf.data.AUTOTUNE)

    return pipeline.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
# Build the input pipelines (batch size 32): the training pipeline shuffles and
# augments, the validation pipeline only loads the images.
train_dataset = create_dataset(train_image_paths, train_labels, batch_size=32, is_training=True)
val_dataset = create_dataset(val_image_paths, val_labels, batch_size=32, is_training=False)

In [None]:
def create_test_dataset(image_paths, labels, batch_size=32):
    """Build an evaluation pipeline: no shuffling and no augmentation.

    Args:
        image_paths: Sequence of image file paths.
        labels: Labels aligned element-wise with ``image_paths``.
        batch_size: Number of examples per batch (default 32).

    Returns:
        A batched, prefetched ``tf.data.Dataset`` of ``(image, label)`` pairs
        in the same order as the inputs.
    """
    examples = tf.data.Dataset.from_tensor_slices((image_paths, labels))
    loaded = examples.map(process_data, num_parallel_calls=tf.data.AUTOTUNE)
    return loaded.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
# Build one evaluation pipeline per test split (PlantVillage seen, PlantVillage
# unseen, PlantDoc unseen), each with batch size 32.
test_seen_dataset = create_test_dataset(test_seen_image_paths, test_seen_labels, batch_size=32)
test_unseen_dataset = create_test_dataset(test_unseen_image_paths, test_unseen_labels, batch_size=32)
plantdoc_unseen_dataset = create_test_dataset(plantdoc_unseen_image_paths, plantdoc_unseen_labels, batch_size=32)


In [None]:
# Pull a single batch from each pipeline and print the image / label shapes
for image_caption, label_caption, pipeline in (
    ("Training Batch - Image Shape:", "Training Batch - Label Shape:", train_dataset),
    ("Validation Batch - Image Shape:", "Validation Batch - Label Shape:", val_dataset),
):
    for image_batch, label_batch in pipeline.take(1):
        print(image_caption, image_batch.shape)
        print(label_caption, label_batch.shape)


In [None]:
# Stage 1 (feature extraction): build and compile the classifier inside the
# distribution strategy scope. The EfficientNetB3 backbone is frozen, so only
# the new head layers are trained.
with strategy.scope():
    
    # Input size; matches the 300x300 resize done in process_data
    input_shape = (300, 300, 3)

    # Create an Input layer
    inputs = tf.keras.layers.Input(shape=input_shape, dtype=tf.float32)

    # Load EfficientNetB3 with ImageNet weights and without its classification top
    base_model = EfficientNetB3(
        include_top=False,
        weights='imagenet',
        input_tensor=inputs  # Connect the input layer to EfficientNetB3
    )

    # Freeze the base model layers
    base_model.trainable = False

    # Classification head: global pooling -> Dense(256, relu) -> Dropout(0.5) -> softmax
    x = base_model.output
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    # One output unit per column of the one-hot training labels
    outputs = layers.Dense(len(train_labels[0]), activation='softmax')(x)

    # Create the complete model by specifying the inputs and outputs
    model = tf.keras.models.Model(inputs, outputs)

    # Compile with Adam; track top-1 and top-5 accuracy
    model.compile(optimizer='adam', 
              loss='categorical_crossentropy', 
              metrics=['accuracy', tf.keras.metrics.TopKCategoricalAccuracy(k=5, name='top_5_accuracy')])

# Display the model summary
model.summary()

In [None]:
# Train the frozen-backbone model on the tf.data training pipeline for 20
# epochs, validating on val_dataset after each epoch.
history = model.fit(
    train_dataset,
    epochs=20,
    validation_data=val_dataset
)

In [None]:
# Save the model weights after training; the fine-tuning cell reloads this file.
model.save_weights('/kaggle/working/efficientnet_b3_baseline.weights.h5')

In [None]:
# Define an SGD optimizer with momentum.
# NOTE(review): this object is not referenced by the visible cells below; the
# fine-tuning compile call constructs its own SGD instance with the same settings.
sgd_optimizer = tf.keras.optimizers.SGD(learning_rate=1e-4, momentum=0.9)

In [None]:
# Define an RMSProp optimizer.
# NOTE(review): not referenced by any of the visible cells below.
rmsprop_optimizer = tf.keras.optimizers.RMSprop(learning_rate=1e-4)


In [None]:
# Create a fresh MirroredStrategy; the fine-tuning model below is built under it.
strategy = tf.distribute.MirroredStrategy()


In [None]:
# Stage 2 (fine-tuning): rebuild the same architecture, load the stage-1
# weights, unfreeze only the top of the backbone and recompile with SGD.
with strategy.scope():
    # Recreate the model structure, ensuring the input shape and layers match the original
    input_shape = (300, 300, 3)
    inputs = tf.keras.layers.Input(shape=input_shape, dtype=tf.float32)

    # Load the base EfficientNetB3 model
    base_model = EfficientNetB3(
        include_top=False,
        weights='imagenet',
        input_tensor=inputs
    )

    # Add custom layers on top of EfficientNetB3
    x = layers.GlobalAveragePooling2D()(base_model.output)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(len(train_labels[0]), activation='softmax')(x)

    # Create the complete model by specifying the inputs and outputs
    model = tf.keras.models.Model(inputs, outputs)

    # Load the previously saved weights (matching the current strategy)
    model.load_weights('/kaggle/working/efficientnet_b3_baseline.weights.h5')

    # A freshly constructed backbone has every layer trainable, so it must be
    # frozen first; otherwise "unfreezing" the last 30 layers changes nothing
    # and the whole network is fine-tuned.
    for layer in base_model.layers:
        layer.trainable = False

    # Unfreeze the last 30 backbone layers for fine-tuning. BatchNormalization
    # layers stay frozen so their pretrained statistics are not disturbed.
    for layer in base_model.layers[-30:]:
        if not isinstance(layer, layers.BatchNormalization):
            layer.trainable = True

    # Compile after changing the trainable flags so the change takes effect.
    # A low learning rate keeps the pretrained weights from being overwritten.
    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=1e-4, momentum=0.9),
                  loss='categorical_crossentropy',
                  metrics=['accuracy', tf.keras.metrics.TopKCategoricalAccuracy(k=5, name='top_5_accuracy')])


In [None]:
# Train the fine-tuned model with SGD. EarlyStopping ends the run once val_loss
# has not improved for 3 consecutive epochs and restores the best weights.
fine_tuning_history_sgd = model.fit(
    train_dataset,
    epochs=20,  # Upper bound; EarlyStopping below may end training sooner
    validation_data=val_dataset,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
    ]
)

In [None]:
# Persist the complete fine-tuned model to a single HDF5 file.
model.save('/kaggle/working/fine_tuned_model.h5')

In [None]:
import pickle

# Persist the per-epoch metric dictionaries of both training stages so the
# curves can be re-plotted without retraining.

# Save the initial training history
with open('/kaggle/working/initial_training_history.pkl', 'wb') as file:
    pickle.dump(history.history, file)

# Save the fine-tuning history
with open('/kaggle/working/fine_tuning_history.pkl', 'wb') as file:
    pickle.dump(fine_tuning_history_sgd.history, file)

In [None]:
import matplotlib.pyplot as plt

# Function to plot accuracy and loss curves
def plot_accuracy_loss_curves(history):
    """Show training vs. validation accuracy and loss in a two-panel figure.

    Args:
        history: Object whose ``.history`` dict contains the per-epoch series
            'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
    """
    metrics = history.history
    _, (accuracy_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: accuracy
    accuracy_ax.plot(metrics['accuracy'], label='Train Accuracy')
    accuracy_ax.plot(metrics['val_accuracy'], label='Validation Accuracy')
    accuracy_ax.set_title('Model Accuracy')
    accuracy_ax.set_xlabel('Epoch')
    accuracy_ax.set_ylabel('Accuracy')
    accuracy_ax.legend(loc='lower right')

    # Right panel: loss
    loss_ax.plot(metrics['loss'], label='Train Loss')
    loss_ax.plot(metrics['val_loss'], label='Validation Loss')
    loss_ax.set_title('Model Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend(loc='upper right')

    plt.tight_layout()
    plt.show()


In [None]:
# Plot the curves of both stages: initial (frozen backbone) training first,
# then the SGD fine-tuning run.
plot_accuracy_loss_curves(history)
plot_accuracy_loss_curves(fine_tuning_history_sgd)

In [None]:
def evaluate_dataset(model, dataset, label_names):
    """Evaluate ``model`` on a labelled dataset.

    Computes Top-1 and Top-5 accuracy, a confusion matrix and a per-class
    classification report. The matrix and report always cover every class in
    ``label_names``, including classes that do not occur in this dataset.

    Args:
        model: Object with a Keras-style ``predict(batch, verbose=0)`` method
            returning one score per class for each example.
        dataset: Iterable of ``(image_batch, one_hot_label_batch)`` pairs.
        label_names: Class names; position i names class index i.

    Returns:
        Tuple ``(top1_accuracy, top5_accuracy, confusion_matrix, report)``.

    Raises:
        ValueError: If ``dataset`` yields no batches.
    """
    true_labels = []
    predictions = []

    for image_batch, label_batch in dataset:
        # verbose=0: otherwise predict() prints a progress bar for every batch
        batch_predictions = model.predict(image_batch, verbose=0)

        # NOTE(review): a one-hot row with no True entry (a crop missing from
        # label_names) is mapped to class 0 by argmax - confirm that the
        # evaluated split only contains crops from the training label set.
        true_labels.extend(np.argmax(label_batch, axis=1))  # one-hot -> class index
        predictions.extend(batch_predictions)

    if not predictions:
        raise ValueError("evaluate_dataset received an empty dataset")

    predictions = np.array(predictions)
    true_labels = np.array(true_labels)

    # Top-1: highest scoring class. Top-5: true class among the 5 highest scores.
    top1_predictions = np.argmax(predictions, axis=1)
    top5_predictions = np.argsort(predictions, axis=1)[:, -5:]

    top1_accuracy = np.mean(top1_predictions == true_labels)
    top5_correct = np.any(top5_predictions == true_labels.reshape(-1, 1), axis=1)
    top5_accuracy = np.mean(top5_correct)

    # Pass the full class index list explicitly. Without it scikit-learn infers
    # the classes from the data, so a split that lacks some classes yields a
    # smaller matrix and makes classification_report reject target_names.
    class_indices = np.arange(len(label_names))
    cm = confusion_matrix(true_labels, top1_predictions, labels=class_indices)
    report = classification_report(
        true_labels,
        top1_predictions,
        labels=class_indices,
        target_names=label_names,
        zero_division=0,  # classes with no samples/predictions report 0 without warnings
    )

    return top1_accuracy, top5_accuracy, cm, report

In [None]:
# Evaluate the current `model` on the PlantVillage "seen" test split and print
# accuracies (as percentages), the confusion matrix and the per-class report.
seen_top1_acc, seen_top5_acc, seen_cm, seen_report = evaluate_dataset(
    model, test_seen_dataset, label_names
)
print(f"Seen Dataset - Top-1 Accuracy: {seen_top1_acc * 100:.2f}%")
print(f"Seen Dataset - Top-5 Accuracy: {seen_top5_acc * 100:.2f}%")
print("Confusion Matrix (Seen):\n", seen_cm)
print("Classification Report (Seen):\n", seen_report)

In [None]:
# Calculate Top-1 and Top-5 accuracy on the PlantVillage "unseen" test split;
# the confusion matrix and report returned by evaluate_dataset are discarded.
unseen_top1_acc, unseen_top5_acc, _, _ = evaluate_dataset(
    model, test_unseen_dataset, label_names
)