# EfficientNet-B0 Classification Model Training
## Cattle Breed Recognition System - Stage 2: Breed Classification

This notebook trains an EfficientNet-B0 model for classifying 35 Indian cattle and buffalo breeds.

**Model Specifications:**
- Model: EfficientNet-B0 (pretrained on ImageNet)
- Input Size: 224x224
- Output: 35 breed classes + confidence score
- Final Size: ~8 MB (after INT8 quantization)
- Target Accuracy: 85-90%

**35 Indian Breeds:**
- Cattle (23): Gir, Sahiwal, Red Sindhi, Tharparkar, Rathi, Hariana, Kankrej, Ongole, Deoni, Hallikar, Amritmahal, Khillari, Kangayam, Bargur, Dangi, Krishna Valley, Malnad Gidda, Punganur, Vechur, Pulikulam, Umblachery, Toda, Kalahandi
- Buffalo (10): Murrah, Jaffrabadi, Nili-Ravi, Banni, Pandharpuri, Mehsana, Surti, Nagpuri, Bhadawari, Chilika
- Crossbred cattle (2): Jersey Cross, HF Cross

**Author:** SIH 2025 Team
**Problem Statement:** SIH25004 - Image-based Breed Recognition for Cattle and Buffaloes of India

## 1. Setup Environment

In [None]:
# Check GPU availability
!nvidia-smi

# Install dependencies
!pip install tensorflow==2.15.0 -q
!pip install tensorflow-hub -q
!pip install matplotlib seaborn -q
!pip install scikit-learn -q

In [None]:
import os
import json
import shutil
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from google.colab import drive

# TensorFlow imports
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import (
    ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, 
    TensorBoard, CSVLogger
)
from tensorflow.keras.utils import to_categorical

# Sklearn for metrics
from sklearn.metrics import classification_report, confusion_matrix

# Make Google Drive visible inside the Colab VM
drive.mount('/content/drive')

# Project folders on Drive (kept as plain strings; later cells build paths
# from them with f-strings)
BASE_PATH = '/content/drive/MyDrive/cattle_breed_recognition'
DATA_PATH, MODELS_PATH, LOGS_PATH = (
    f'{BASE_PATH}/{folder}' for folder in ('data', 'models', 'logs')
)

# Create any folder that does not exist yet
for path in (DATA_PATH, MODELS_PATH, LOGS_PATH):
    os.makedirs(path, exist_ok=True)

# Environment report
print(
    f"TensorFlow version: {tf.__version__}",
    f"GPU available: {tf.config.list_physical_devices('GPU')}",
    f"Base Path: {BASE_PATH}",
    sep="\n",
)

## 2. Define Breed Classes

In [None]:
# Define the 35 target classes, grouped by animal type.
# Each group is a named list so the per-group counts printed below are derived
# from the same data that builds BREEDS and cannot drift apart.
# NOTE: the integer label of each class used for training is taken from
# train_generator.class_indices in a later cell, not from the position of the
# name in this list.

# Cattle breeds (23)
CATTLE_BREEDS = [
    'Gir', 'Sahiwal', 'Red_Sindhi', 'Tharparkar', 'Rathi',
    'Hariana', 'Kankrej', 'Ongole', 'Deoni',
    'Hallikar', 'Amritmahal', 'Khillari', 'Kangayam', 'Bargur',
    'Dangi', 'Krishna_Valley', 'Malnad_Gidda', 'Punganur', 'Vechur',
    'Pulikulam', 'Umblachery', 'Toda', 'Kalahandi',
]

# Buffalo breeds (10)
BUFFALO_BREEDS = [
    'Murrah', 'Jaffrabadi', 'Nili_Ravi', 'Banni', 'Pandharpuri',
    'Mehsana', 'Surti', 'Nagpuri', 'Bhadawari', 'Chilika',
]

# Cross breeds (2)
CROSS_BREEDS = ['Jersey_Cross', 'HF_Cross']

# Same order as before: cattle, then buffalo, then cross breeds
BREEDS = CATTLE_BREEDS + BUFFALO_BREEDS + CROSS_BREEDS

NUM_CLASSES = len(BREEDS)
# A duplicated name would silently shrink the real number of classes
assert len(set(BREEDS)) == NUM_CLASSES, "Duplicate breed names in BREEDS"

print(f"Total breeds: {NUM_CLASSES}")
print(f"\nCattle breeds: {len(CATTLE_BREEDS)}")
print(f"Buffalo breeds: {len(BUFFALO_BREEDS)}")
print(f"Cross breeds: {len(CROSS_BREEDS)}")

## 3. Prepare Dataset

### Dataset Structure Expected:
```
data/
├── train/
│   ├── Gir/
│   ├── Sahiwal/
│   └── ... (35 breed folders)
├── val/
│   ├── Gir/
│   └── ...
└── test/
    ├── Gir/
    └── ...
```

In [None]:
# Configuration
IMG_SIZE = 224
BATCH_SIZE = 32
EPOCHS = 50
LEARNING_RATE = 0.001

# Data paths
TRAIN_DIR = f'{DATA_PATH}/train'
VAL_DIR = f'{DATA_PATH}/val'
TEST_DIR = f'{DATA_PATH}/test'

# File extensions counted as images in the report below.
# NOTE(review): chosen to mirror the formats Keras' flow_from_directory loads;
# confirm against the Keras version in use.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.ppm', '.tif', '.tiff')


def count_images(directory):
    """Return the number of image files (by extension) under `directory`, recursively."""
    return sum(
        1
        for _root, _dirs, files in os.walk(directory)
        for filename in files
        if filename.lower().endswith(IMAGE_EXTENSIONS)
    )


# Check if data exists and report how many images each split contains
if os.path.isdir(TRAIN_DIR):
    print("Dataset found!")
    for split in ['train', 'val', 'test']:
        split_path = f'{DATA_PATH}/{split}'
        if os.path.isdir(split_path):
            total_images = count_images(split_path)
            print(f"  {split}: {total_images} images")
        else:
            # Later cells read this split, so report the gap here
            print(f"  {split}: MISSING ({split_path})")
else:
    print("Dataset not found. Please upload data to Google Drive.")
    print(f"Expected path: {TRAIN_DIR}")

### Option: Download Sample Dataset from Kaggle

In [None]:
# Option: Download dataset from Kaggle
# First, upload your kaggle.json to Colab

# !pip install kaggle -q
# !mkdir -p ~/.kaggle
# !cp kaggle.json ~/.kaggle/
# !chmod 600 ~/.kaggle/kaggle.json

# Download cattle breed dataset
# !kaggle datasets download -d dataset-name -p {DATA_PATH}/raw
# !unzip {DATA_PATH}/raw/dataset-name.zip -d {DATA_PATH}/raw

## 4. Data Augmentation & Preprocessing

In [None]:
# Data augmentation for training (pixels are also scaled to [0, 1])
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=15,          # ±15 degrees
    width_shift_range=0.1,      # Horizontal shift
    height_shift_range=0.1,     # Vertical shift
    shear_range=0.1,            # Shear
    zoom_range=0.1,             # Zoom (0.9-1.1x)
    horizontal_flip=True,       # Random horizontal flip
    brightness_range=[0.8, 1.2], # Brightness variation
    fill_mode='nearest'
)

# Only rescaling for validation and test
val_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)


def _flow_split(datagen, directory, shuffle, seed=None):
    """Create a one-hot ('categorical') directory iterator for one dataset split.

    Args:
        datagen: ImageDataGenerator that supplies preprocessing/augmentation.
        directory: Root folder of the split, one sub-folder per class.
        shuffle: Whether to shuffle the sample order.
        seed: Optional random seed for shuffling/augmentation.

    Returns:
        The iterator returned by `datagen.flow_from_directory`.
    """
    return datagen.flow_from_directory(
        directory,
        target_size=(IMG_SIZE, IMG_SIZE),
        batch_size=BATCH_SIZE,
        class_mode='categorical',
        shuffle=shuffle,
        seed=seed
    )


# Create data generators. Validation and test stay unshuffled so predictions
# line up with `generator.classes`.
train_generator = _flow_split(train_datagen, TRAIN_DIR, shuffle=True, seed=42)
val_generator = _flow_split(val_datagen, VAL_DIR, shuffle=False)
test_generator = _flow_split(test_datagen, TEST_DIR, shuffle=False)

# Get class indices (class name -> label index) and the reverse lookup
class_indices = train_generator.class_indices
idx_to_class = {v: k for k, v in class_indices.items()}

# Fail early with a clear message instead of a shape error during training:
# the model's output layer is sized with NUM_CLASSES, and every split must
# use the same name -> index mapping.
if len(class_indices) != NUM_CLASSES:
    raise ValueError(
        f"Found {len(class_indices)} class folders in {TRAIN_DIR}, "
        f"but NUM_CLASSES is {NUM_CLASSES}."
    )
for split_name, split_generator in (('val', val_generator), ('test', test_generator)):
    if split_generator.class_indices != class_indices:
        raise ValueError(
            f"Class folders of the '{split_name}' split do not match the train split."
        )

print(f"\nClass indices loaded!")
print(f"Number of classes: {len(class_indices)}")

In [None]:
# Persist the class-name -> index mapping next to the models so it can be
# reloaded at inference time
class_indices_path = f'{MODELS_PATH}/class_indices.json'
with open(class_indices_path, 'w') as f:
    json.dump(class_indices, f, indent=2)

# Echo the mapping ordered by label index
print("Class indices:")
for idx, breed in sorted((index, name) for name, index in class_indices.items()):
    print(f"  {idx}: {breed}")

In [None]:
# Visualize augmented samples
def plot_augmented_samples(generator, num_samples=5):
    """Plot the first images of the next batch drawn from `generator`.

    Args:
        generator: Iterator yielding `(images, one_hot_labels)` batches.
        num_samples: Maximum number of images to show; capped at the number
            of images actually present in the batch.
    """
    images, labels = next(generator)

    # Never index past the end of the batch
    num_samples = min(num_samples, len(images))
    if num_samples < 1:
        print("No images available to plot.")
        return

    # squeeze=False keeps `axes` two-dimensional even for a single image
    fig, axes = plt.subplots(1, num_samples, figsize=(15, 3), squeeze=False)
    for ax, image, label in zip(axes[0], images, labels):
        # Clip to the [0, 1] range imshow expects for float images
        ax.imshow(np.clip(image, 0.0, 1.0))
        class_idx = int(np.argmax(label))
        ax.set_title(idx_to_class[class_idx])
        ax.axis('off')
    plt.tight_layout()
    plt.show()

print("Sample augmented training images:")
plot_augmented_samples(train_generator)

## 5. Build EfficientNet-B0 Model

In [None]:
def build_efficientnet_b0(num_classes, img_size=224, fine_tune=False):
    """
    Build an EfficientNet-B0 model for breed classification.

    The returned model takes images with pixel values in [0, 1]. Keras'
    EfficientNet applies its own preprocessing internally and expects pixel
    values in [0, 255], so the inputs are scaled back up by 255 before they
    reach the backbone.

    Args:
        num_classes: Number of breed classes
        img_size: Input image size (height and width in pixels)
        fine_tune: If True, the last 20% of the backbone layers are left
            trainable; if False (default) the whole backbone is frozen

    Returns:
        Tuple `(model, base_model)`: the full, uncompiled Keras model and the
        EfficientNet-B0 backbone it wraps. The caller is responsible for
        calling `model.compile`.
    """
    # Load EfficientNet-B0 with ImageNet weights
    base_model = EfficientNetB0(
        weights='imagenet',
        include_top=False,
        input_shape=(img_size, img_size, 3)
    )

    # Freeze base model initially
    base_model.trainable = False

    if fine_tune:
        # Unfreeze the backbone, then re-freeze its first 80% of layers
        base_model.trainable = True
        freeze_until = int(len(base_model.layers) * 0.8)
        for layer in base_model.layers[:freeze_until]:
            layer.trainable = False

    # Build custom head
    inputs = keras.Input(shape=(img_size, img_size, 3))

    # Map [0, 1] inputs back to the [0, 255] range the backbone expects
    x = layers.Rescaling(255.0)(inputs)

    # Pass through base model; training=False keeps its BatchNormalization
    # layers in inference mode
    x = base_model(x, training=False)

    # Global average pooling
    x = layers.GlobalAveragePooling2D()(x)

    # Batch normalization
    x = layers.BatchNormalization()(x)

    # Dropout for regularization
    x = layers.Dropout(0.3)(x)

    # Dense layer
    x = layers.Dense(256, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.3)(x)

    # Output layer
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    # Create model
    model = keras.Model(inputs, outputs)

    return model, base_model

# Instantiate the classifier and keep a handle on its backbone
model, base_model = build_efficientnet_b0(num_classes=NUM_CLASSES, img_size=IMG_SIZE)

# Phase-1 compilation: Adam at the base learning rate, cross-entropy on
# one-hot labels, tracking accuracy and top-k accuracy
phase1_optimizer = optimizers.Adam(learning_rate=LEARNING_RATE)
model.compile(
    loss='categorical_crossentropy',
    optimizer=phase1_optimizer,
    metrics=['accuracy', 'top_k_categorical_accuracy'],
)

# Print the layer-by-layer overview
model.summary()

In [None]:
# Report how many parameters the model has
def count_parameters(model):
    """Return `(trainable, non_trainable)` parameter counts of `model`."""
    size_of = tf.keras.backend.count_params
    n_trainable = sum(size_of(weight) for weight in model.trainable_weights)
    n_frozen = sum(size_of(weight) for weight in model.non_trainable_weights)
    return n_trainable, n_frozen

trainable, non_trainable = count_parameters(model)
print(
    f"\nModel Parameters:",
    f"  Trainable: {trainable:,}",
    f"  Non-trainable: {non_trainable:,}",
    f"  Total: {trainable + non_trainable:,}",
    sep="\n",
)

## 6. Training Callbacks

In [None]:
# Define callbacks
# The same callback list is passed to both training phases. CSVLogger is
# therefore opened in append mode so the second fit() call does not overwrite
# the log of the first; a log left over from an earlier run is removed first.
training_log_path = f'{LOGS_PATH}/training_log.csv'
if os.path.exists(training_log_path):
    os.remove(training_log_path)

callbacks = [
    # Save best model (highest validation accuracy seen so far)
    ModelCheckpoint(
        filepath=f'{MODELS_PATH}/best_model.keras',
        monitor='val_accuracy',
        save_best_only=True,
        mode='max',
        verbose=1
    ),

    # Early stopping: stop after 10 epochs without val_accuracy improvement
    EarlyStopping(
        monitor='val_accuracy',
        patience=10,
        restore_best_weights=True,
        verbose=1
    ),

    # Reduce learning rate on plateau: halve it after 5 stagnant epochs
    ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-7,
        verbose=1
    ),

    # CSV logger (appends, see note above)
    CSVLogger(training_log_path, append=True),

    # TensorBoard
    TensorBoard(log_dir=f'{LOGS_PATH}/tensorboard')
]

print("Callbacks configured!")

## 7. Phase 1: Train with Frozen Backbone

In [None]:
# Calculate steps
# Round up so the final, partially filled batch is included; floor division
# would skip those samples and yields 0 steps when a split has fewer than
# BATCH_SIZE images.
steps_per_epoch = (train_generator.samples + BATCH_SIZE - 1) // BATCH_SIZE
validation_steps = (val_generator.samples + BATCH_SIZE - 1) // BATCH_SIZE

print(f"Training samples: {train_generator.samples}")
print(f"Validation samples: {val_generator.samples}")
print(f"Steps per epoch: {steps_per_epoch}")
print(f"Validation steps: {validation_steps}")

In [None]:
# Phase 1: only the classification head learns; the backbone stays frozen
print("="*60, "Phase 1: Training with frozen EfficientNet backbone", "="*60, sep="\n")

history_phase1 = model.fit(
    train_generator,
    validation_data=val_generator,
    steps_per_epoch=steps_per_epoch,
    validation_steps=validation_steps,
    epochs=20,
    callbacks=callbacks,
    verbose=1,
)

## 8. Phase 2: Fine-Tuning

In [None]:
# Make the backbone trainable, then re-freeze everything except its last 20%
base_model.trainable = True

num_layers = len(base_model.layers)
freeze_until = int(num_layers * 0.8)  # index of the first layer left trainable

for frozen_layer in base_model.layers[:freeze_until]:
    frozen_layer.trainable = False

print(
    f"Total layers in base model: {num_layers}",
    f"Frozen layers: {freeze_until}",
    f"Trainable layers: {num_layers - freeze_until}",
    sep="\n",
)

In [None]:
# Changing `trainable` flags only takes effect after compiling again; use a
# 10x smaller learning rate for fine-tuning
fine_tune_lr = LEARNING_RATE / 10
model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizers.Adam(learning_rate=fine_tune_lr),
    metrics=['accuracy', 'top_k_categorical_accuracy'],
)

# Show how the trainable / frozen split changed
trainable, non_trainable = count_parameters(model)
print(
    f"\nAfter unfreezing:",
    f"  Trainable: {trainable:,}",
    f"  Non-trainable: {non_trainable:,}",
    sep="\n",
)

In [None]:
# Phase 2: Fine-tuning
print("="*60)
print("Phase 2: Fine-tuning EfficientNet layers")
print("="*60)

# Number of epochs to run in this phase
FINE_TUNE_EPOCHS = 30

# Continue the epoch numbering where phase 1 actually stopped (it may have
# ended early because of EarlyStopping)
phase2_initial_epoch = history_phase1.epoch[-1] + 1 if history_phase1.epoch else 0

# Keras treats `epochs` as the index of the FINAL epoch, not as a number of
# additional epochs, so the phase-1 offset must be added to get
# FINE_TUNE_EPOCHS more epochs.
history_phase2 = model.fit(
    train_generator,
    epochs=phase2_initial_epoch + FINE_TUNE_EPOCHS,
    initial_epoch=phase2_initial_epoch,
    steps_per_epoch=steps_per_epoch,
    validation_data=val_generator,
    validation_steps=validation_steps,
    callbacks=callbacks,
    verbose=1
)

## 9. Plot Training History

In [None]:
# Combine histories
def combine_histories(h1, h2):
    """Concatenate the per-epoch metric lists of two training histories.

    Args:
        h1: History-like object of the first phase (has a `.history` dict
            mapping metric name -> list of per-epoch values).
        h2: History-like object of the second phase.

    Returns:
        A new dict mapping each metric name to the values of `h1` followed by
        the values of `h2`. A metric recorded in only one of the two phases
        is kept with just the values that exist, instead of raising KeyError.
        The input histories are not modified.
    """
    # Keys of h1 first (original order), then keys that only h2 has
    keys = list(h1.history) + [k for k in h2.history if k not in h1.history]
    return {
        key: list(h1.history.get(key, [])) + list(h2.history.get(key, []))
        for key in keys
    }

history = combine_histories(history_phase1, history_phase2)

# Plot training history: one panel per metric, train vs. validation curves.
# The model is compiled with the string metric 'top_k_categorical_accuracy',
# whose Keras default is k=5, so the third panel is labelled Top-5.
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

panels = [
    # (history key, panel title, y-axis label)
    ('accuracy', 'Model Accuracy', 'Accuracy'),
    ('loss', 'Model Loss', 'Loss'),
    ('top_k_categorical_accuracy', 'Top-5 Accuracy', 'Accuracy'),
]

for ax, (metric, title, ylabel) in zip(axes, panels):
    ax.plot(history[metric], label='Train')
    ax.plot(history[f'val_{metric}'], label='Validation')
    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(ylabel)
    ax.legend()
    ax.grid(True)

plt.tight_layout()
plt.savefig(f'{LOGS_PATH}/training_history.png', dpi=150)
plt.show()

## 10. Evaluate on Test Set

In [None]:
# Load best model (the checkpoint with the highest validation accuracy)
best_model = keras.models.load_model(f'{MODELS_PATH}/best_model.keras')

# Evaluate on test set; reset so iteration starts at the first batch
test_generator.reset()
test_results = best_model.evaluate(test_generator, verbose=1)

# test_results follows the compile order: [loss, accuracy, top-k accuracy].
# The string metric 'top_k_categorical_accuracy' uses Keras' default k=5.
print("\n" + "="*60)
print("Test Results:")
print("="*60)
print(f"Loss: {test_results[0]:.4f}")
print(f"Accuracy: {test_results[1]:.4f}")
print(f"Top-5 Accuracy: {test_results[2]:.4f}")

In [None]:
# Get predictions (test_generator is unshuffled, so row i of `predictions`
# corresponds to test_generator.classes[i])
test_generator.reset()
predictions = best_model.predict(test_generator, verbose=1)
predicted_classes = np.argmax(predictions, axis=1)
true_classes = test_generator.classes

# Classification report
# Pass the full label list explicitly: without it scikit-learn infers the
# labels from the data, which no longer matches `target_names` when a class
# has no test images and is never predicted.
report_labels = list(range(len(idx_to_class)))
print("\nClassification Report:")
print(classification_report(
    true_classes,
    predicted_classes,
    labels=report_labels,
    target_names=[idx_to_class[i] for i in report_labels],
    zero_division=0  # classes without predictions score 0 instead of warning
))

In [None]:
# Confusion matrix
# Fix the label set so the matrix always has one row/column per class and
# lines up with the tick labels, even if a class is absent from the test data.
cm_labels = list(range(len(idx_to_class)))
cm_names = [idx_to_class[i] for i in cm_labels]
cm = confusion_matrix(true_classes, predicted_classes, labels=cm_labels)

plt.figure(figsize=(20, 16))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=cm_names,
    yticklabels=cm_names
)
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.xticks(rotation=90)
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig(f'{LOGS_PATH}/confusion_matrix.png', dpi=150)
plt.show()

## 11. Export to TFLite with INT8 Quantization

In [None]:
# Export to TFLite with INT8 quantization
def export_to_tflite_int8(model, train_generator, output_path,
                          num_calibration_samples=100):
    """
    Export Keras model to TFLite with full INT8 quantization.

    Args:
        model: Trained Keras model
        train_generator: Training data generator; its image batches are used
            as the representative (calibration) dataset
        output_path: Path to save TFLite model
        num_calibration_samples: Number of single images fed to the converter
            for calibration

    Returns:
        The serialized TFLite model (bytes), which is also written to
        `output_path`.
    """
    # Convert to TFLite
    converter = tf.lite.TFLiteConverter.from_keras_model(model)

    # Enable default optimizations (includes INT8)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    # Representative dataset for calibration: yields one image at a time,
    # shaped (1, H, W, C), until `num_calibration_samples` images were produced
    def representative_dataset():
        produced = 0
        while produced < num_calibration_samples:
            images, _ = next(train_generator)
            if len(images) == 0:
                break  # nothing to calibrate on; avoid looping forever
            for img in images:
                if produced >= num_calibration_samples:
                    return
                yield [np.expand_dims(img, axis=0).astype(np.float32)]
                produced += 1

    converter.representative_dataset = representative_dataset

    # Full INT8 quantization with uint8 input and output tensors
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.uint8  # or tf.int8
    converter.inference_output_type = tf.uint8

    # Convert
    tflite_model = converter.convert()

    # Save
    with open(output_path, 'wb') as f:
        f.write(tflite_model)

    print(f"TFLite model saved to: {output_path}")
    print(f"Size: {len(tflite_model) / (1024 * 1024):.2f} MB")

    return tflite_model

# Export
tflite_path = f'{MODELS_PATH}/efficientnet_b0_int8.tflite'
tflite_model = export_to_tflite_int8(best_model, train_generator, tflite_path)

In [None]:
# Alternative export: float16 weights instead of INT8
def export_to_tflite_float16(model, output_path):
    """Convert `model` to a float16-quantized TFLite file at `output_path`."""
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.target_spec.supported_types = [tf.float16]
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    flatbuffer = converter.convert()

    with open(output_path, 'wb') as out_file:
        out_file.write(flatbuffer)

    size_mb = len(flatbuffer) / (1024 * 1024)
    print(f"TFLite Float16 model saved to: {output_path}")
    print(f"Size: {size_mb:.2f} MB")

tflite_float16_path = f'{MODELS_PATH}/efficientnet_b0_float16.tflite'
export_to_tflite_float16(best_model, tflite_float16_path)

## 12. Test TFLite Model

In [None]:
# Test TFLite model
def test_tflite_inference(tflite_path, test_image_path, idx_to_class):
    """
    Test TFLite model inference on a single image.

    Args:
        tflite_path: Path to the .tflite model file
        test_image_path: Path to the image to classify
        idx_to_class: Dict mapping label index -> class name

    Returns:
        Model output for the image as an array of shape (1, num_classes);
        dequantized to float when the model emits an 8-bit integer tensor.
    """
    import time

    # Load TFLite model
    interpreter = tf.lite.Interpreter(model_path=tflite_path)
    interpreter.allocate_tensors()

    # Get input/output details
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    input_info = input_details[0]
    output_info = output_details[0]
    input_dtype = input_info['dtype']
    output_dtype = output_info['dtype']

    print(f"Input shape: {input_info['shape']}")
    print(f"Input dtype: {input_dtype}")
    print(f"Output shape: {output_info['shape']}")

    # Load and preprocess image at the size the model itself declares
    # NOTE(review): assumes an NHWC input tensor (batch, height, width, channels)
    target_height = int(input_info['shape'][1])
    target_width = int(input_info['shape'][2])
    img = tf.keras.preprocessing.image.load_img(
        test_image_path,
        target_size=(target_height, target_width)
    )
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    img_array = img_array / 255.0  # same [0, 1] scaling as the data generators

    # Quantize input if needed: q = round(x / scale + zero_point), clipped to
    # the integer range so values cannot wrap around on the cast
    if input_dtype in (np.uint8, np.int8):
        input_scale = input_info['quantization_parameters']['scales'][0]
        input_zero_point = input_info['quantization_parameters']['zero_points'][0]
        int_range = np.iinfo(input_dtype)
        img_array = np.clip(
            np.round(img_array / input_scale + input_zero_point),
            int_range.min,
            int_range.max
        )

    img_array = np.expand_dims(img_array, axis=0).astype(input_dtype)

    # Run inference (perf_counter is a monotonic, high-resolution timer)
    start_time = time.perf_counter()

    interpreter.set_tensor(input_info['index'], img_array)
    interpreter.invoke()
    output = interpreter.get_tensor(output_info['index'])

    inference_time = (time.perf_counter() - start_time) * 1000

    # Dequantize output if needed: x = (q - zero_point) * scale
    if output_dtype in (np.uint8, np.int8):
        output_scale = output_info['quantization_parameters']['scales'][0]
        output_zero_point = output_info['quantization_parameters']['zero_points'][0]
        output = (output.astype(np.float32) - output_zero_point) * output_scale

    # Get top predictions (indices of the 3 largest scores, best first)
    top_3_idx = np.argsort(output[0])[-3:][::-1]

    print(f"\nInference time: {inference_time:.2f} ms")
    print(f"\nTop 3 Predictions:")
    for idx in top_3_idx:
        confidence = output[0][idx] * 100
        print(f"  {idx_to_class[int(idx)]}: {confidence:.2f}%")

    return output

# Test on sample image
test_image_path = f'{TEST_DIR}/Gir/sample_image.jpg'  # Update with actual image
if os.path.exists(test_image_path):
    test_tflite_inference(tflite_path, test_image_path, idx_to_class)
else:
    print(f"Sample image not found, skipping TFLite test: {test_image_path}")

## 13. Save Final Models

In [None]:
# Collect every deliverable in one folder
OUTPUT_DIR = f'{MODELS_PATH}/final'
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Keras single-file format
best_model.save(f'{OUTPUT_DIR}/efficientnet_b0_classifier.keras')

# Extension-less path (intended as SavedModel export for TensorFlow Serving)
best_model.save(f'{OUTPUT_DIR}/efficientnet_b0_classifier_savedmodel')

# Copy the TFLite exports and the class-index mapping alongside
artifacts_to_copy = (
    (tflite_path, 'efficientnet_b0_int8.tflite'),
    (tflite_float16_path, 'efficientnet_b0_float16.tflite'),
    (f'{MODELS_PATH}/class_indices.json', 'class_indices.json'),
)
for source_path, target_name in artifacts_to_copy:
    shutil.copy(source_path, f'{OUTPUT_DIR}/{target_name}')

# List the regular files that ended up in the folder, with sizes
print(f"Final models saved to: {OUTPUT_DIR}")
print("\nFiles:")
for entry in os.listdir(OUTPUT_DIR):
    entry_path = os.path.join(OUTPUT_DIR, entry)
    if not os.path.isfile(entry_path):
        continue
    entry_mb = os.path.getsize(entry_path) / (1024 * 1024)
    print(f"  {entry}: {entry_mb:.2f} MB")

## 14. Model Summary

In [None]:
# Final recap of the run. `test_results` is [loss, accuracy, top-k accuracy];
# the top-k metric was compiled with Keras' default k=5.
print("="*60)
print("EfficientNet-B0 Classification Training Complete!")
print("="*60)
print(f"\nModel: EfficientNet-B0")
print(f"Task: 35 Indian Breed Classification")
print(f"Input Size: 224x224")
print(f"\nPerformance Metrics:")
print(f"  Test Accuracy: {test_results[1]:.4f}")
print(f"  Test Top-5 Accuracy: {test_results[2]:.4f}")
print(f"\nModel Files:")
print(f"  Keras: efficientnet_b0_classifier.keras")
print(f"  TFLite INT8: efficientnet_b0_int8.tflite (~8 MB)")
print(f"  TFLite Float16: efficientnet_b0_float16.tflite (~16 MB)")
print(f"\nReady for Integration with YOLOv8 Detection Pipeline!")
print("="*60)