# Phase 3: Transfer Learning with Three Pretrained Models

In [18]:
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.applications import ResNet50, MobileNetV2, EfficientNetB0
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.metrics import (classification_report, confusion_matrix, 
                            precision_score, recall_score, f1_score, 
                            roc_curve, auc, roc_auc_score)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import label_binarize
from tensorflow.keras import regularizers
import tensorflow as tf
import time
import warnings
# Silence every Python warning for the rest of the process. Note that this
# also hides deprecation notices from the imported libraries.
warnings.filterwarnings('ignore')

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

# Enable mixed precision for GPU acceleration. The policy is only switched on
# when TensorFlow can see a GPU; on CPU-only machines the default float32
# policy is kept, because float16 compute brings no speed-up there.
if tf.config.list_physical_devices('GPU'):
    tf.keras.mixed_precision.set_global_policy('mixed_float16')

class OptimizedTransferLearning:
    """Transfer-learning helper around ResNet50, MobileNetV2 and EfficientNetB0.

    The class bundles dataset loading (with an on-disk ``.npz`` cache),
    train/val/test splitting, model construction from locally stored
    weights, training, evaluation and plotting.
    """

    # File extensions (lower-case) that are treated as images when scanning
    # a class directory.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    # Dataset names understood by load_and_cache_dataset.
    SUPPORTED_DATASETS = ('caltech101', 'horse_cat_dog')

    # Upper bound for the tf.data shuffle buffer, so shuffling does not hold
    # a second full copy of a large dataset in memory.
    SHUFFLE_BUFFER = 2048

    def __init__(self, input_shape=(128, 128, 3)):
        """Store the model input shape and the per-architecture lookup tables.

        Args:
            input_shape: ``(height, width, channels)`` passed to every
                backbone constructor in ``create_model``.
        """
        self.input_shape = input_shape
        self.models_config = {
            'ResNet50': ResNet50,
            'MobileNetV2': MobileNetV2,
            'EfficientNetB0': EfficientNetB0
        }
        # Local weight paths
        self.weight_paths = {
            'ResNet50': 'pretrained_weights/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5',
            'MobileNetV2': 'pretrained_weights/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5',
            'EfficientNetB0': 'pretrained_weights/efficientnetb0_notop.h5'
        }
        # Layer names per architecture. Not read by any method of this class;
        # kept because external code may rely on the attribute.
        self.model_blocks = {
            'ResNet50': ['conv5_block3_out', 'conv4_block6_out', 'conv3_block4_out'],
            'MobileNetV2': ['block_16_project_BN', 'block_12_project_BN'],
            'EfficientNetB0': ['top_activation', 'block6a_expand_activation']
        }

    def _load_class_images(self, class_dir, target_size):
        """Return the readable images of one class directory, scaled to [0, 1].

        Files are visited in sorted order so that the resulting cache does
        not depend on the directory listing order. Files that cannot be
        decoded are reported and skipped instead of aborting the whole load.
        """
        loaded = []
        for file_name in sorted(os.listdir(class_dir)):
            if not file_name.lower().endswith(self.IMAGE_EXTENSIONS):
                continue
            img_path = os.path.join(class_dir, file_name)
            try:
                img = load_img(img_path, target_size=target_size)
                img_array = img_to_array(img) / 255.0
            except Exception as exc:
                # Best effort: one corrupt file must not stop the load. Using
                # `Exception` (not a bare except) lets KeyboardInterrupt through.
                print(f"Skipping unreadable image {img_path}: {exc}")
                continue
            loaded.append(img_array)
        return loaded

    def load_and_cache_dataset(self, data_dir, dataset_name, target_size=(128, 128)):
        """Load a dataset from disk, caching the decoded arrays as ``.npz``.

        Args:
            data_dir: Directory containing one sub-directory per class.
            dataset_name: ``'caltech101'`` or ``'horse_cat_dog'``. It also
                determines the cache file name ``{dataset_name}_cache.npz``
                in the current working directory.
            target_size: ``(height, width)`` every image is resized to.

        Returns:
            ``(images, labels, class_names)`` where ``images`` and ``labels``
            are numpy arrays and ``class_names`` is a list of strings.

        Raises:
            ValueError: If ``dataset_name`` is not supported, or if no image
                could be loaded from ``data_dir``.
        """
        if dataset_name not in self.SUPPORTED_DATASETS:
            raise ValueError(
                f"Unknown dataset_name {dataset_name!r}; "
                f"expected one of {self.SUPPORTED_DATASETS}"
            )

        cache_path = f"{dataset_name}_cache.npz"

        if os.path.exists(cache_path):
            print(f"Loading cached {dataset_name} dataset...")
            # The context manager closes the underlying file handle.
            with np.load(cache_path) as cache:
                images = cache['images']
                labels = cache['labels']
                # Always hand back a list, matching the uncached code path.
                class_names = cache['class_names'].tolist()

            # Only reuse the cache when it was built for the requested size.
            if images.ndim == 4 and tuple(images.shape[1:3]) == tuple(target_size):
                return images, labels, class_names
            print(f"Cached images do not match target_size={tuple(target_size)}; "
                  f"rebuilding cache...")

        print(f"Processing and caching {dataset_name} dataset...")

        if dataset_name == 'caltech101':
            # Every sub-directory except the background class, at most 101.
            class_names = sorted(
                d for d in os.listdir(data_dir)
                if os.path.isdir(os.path.join(data_dir, d)) and d != 'BACKGROUND_Google'
            )[:101]
        else:
            class_names = ['cat', 'dog', 'horse']

        images = []
        labels = []
        for class_idx, class_name in enumerate(class_names):
            class_dir = os.path.join(data_dir, class_name)
            if not os.path.isdir(class_dir):
                # A missing class keeps its index; it just contributes no images.
                continue
            class_images = self._load_class_images(class_dir, target_size)
            images.extend(class_images)
            labels.extend([class_idx] * len(class_images))

        if not images:
            # Refuse to write an empty cache that later runs would pick up.
            raise ValueError(f"No images found for {dataset_name} in {data_dir!r}")

        images = np.array(images)
        labels = np.array(labels)

        print(f"Caching {len(images)} images...")
        np.savez(cache_path, images=images, labels=labels, class_names=class_names)

        return images, labels, class_names

    def split_data(self, X, y, num_classes):
        """Split data into stratified train/val/test sets (70/10/20).

        Args:
            X: Image array.
            y: Integer class labels aligned with ``X``.
            num_classes: Width of the one-hot encoding.

        Returns:
            ``(X_train, X_val, X_test, y_train_cat, y_val_cat, y_test_cat,
            y_test)`` where the ``*_cat`` arrays are one-hot encoded and the
            last element holds the integer test labels.
        """
        # First split: 70% train, 30% temp
        X_train, X_temp, y_train, y_temp = train_test_split(
            X, y, test_size=0.3, random_state=42, stratify=y
        )

        # Second split of the 30% remainder: one third becomes validation
        # (10% of the original), two thirds become test (20% of the original).
        val_ratio = 0.1 / 0.3
        X_val, X_test, y_val, y_test = train_test_split(
            X_temp, y_temp, test_size=1 - val_ratio, random_state=42, stratify=y_temp
        )

        # Convert to categorical
        y_train_cat = to_categorical(y_train, num_classes)
        y_val_cat = to_categorical(y_val, num_classes)
        y_test_cat = to_categorical(y_test, num_classes)

        print(f"Train: {X_train.shape[0]}, Val: {X_val.shape[0]}, Test: {X_test.shape[0]}")

        return X_train, X_val, X_test, y_train_cat, y_val_cat, y_test_cat, y_test

    def create_model(self, model_name, num_classes, unfreeze_depth=0, unfreeze_layers=5):
        """Create a classifier on top of a backbone loaded from local weights.

        Args:
            model_name: Key of ``self.models_config``.
            num_classes: Number of softmax outputs.
            unfreeze_depth: When greater than 0, the last ``unfreeze_layers``
                backbone layers are made trainable; otherwise the whole
                backbone stays frozen. Only the sign of the value is used.
            unfreeze_layers: How many trailing backbone layers to unfreeze
                when ``unfreeze_depth > 0``.

        Returns:
            An uncompiled Keras ``Model``.
        """
        # NOTE(review): the loader in this class scales images to [0, 1] and
        # no model-specific preprocessing is applied here. The Keras
        # application models each document their own expected input range --
        # confirm that [0, 1] inputs are appropriate for all three backbones.

        # Load model without top layers
        base_model = self.models_config[model_name](
            weights=None,
            include_top=False,
            input_shape=self.input_shape
        )

        # Load local weights; a missing file only produces a warning, in
        # which case the backbone keeps its initial (untrained) weights.
        weight_path = self.weight_paths[model_name]
        if os.path.exists(weight_path):
            print(f"Loading weights from {weight_path}")
            base_model.load_weights(weight_path)
        else:
            print(f"Warning: Weight file not found at {weight_path}")

        # Freeze all layers initially
        base_model.trainable = False

        # Unfreeze the last `unfreeze_layers` layers if requested
        if unfreeze_depth > 0:
            num_layers = len(base_model.layers)
            # Clamp so that oversized or negative counts cannot wrap around.
            unfreeze_from = min(num_layers, max(0, num_layers - unfreeze_layers))

            for layer in base_model.layers[unfreeze_from:]:
                layer.trainable = True
            print(f"Unfroze {num_layers - unfreeze_from} top layers")

        # Build the classification head
        x = base_model.output
        x = GlobalAveragePooling2D()(x)
        x = Dense(128, activation='relu',
                  kernel_regularizer=regularizers.l2(0.001))(x)  # L2 weight regularization
        x = BatchNormalization()(x)
        x = Dropout(0.5)(x)
        # The output layer is pinned to float32 regardless of the global policy.
        predictions = Dense(num_classes, activation='softmax', dtype='float32')(x)

        model = Model(inputs=base_model.input, outputs=predictions)
        return model

    @staticmethod
    def _augment(image, label):
        """Apply the random flip / colour / JPEG-quality augmentations to one image."""
        image = tf.image.random_flip_left_right(image)
        image = tf.image.random_flip_up_down(image)
        image = tf.image.random_brightness(image, 0.15)
        image = tf.image.random_saturation(image, 0.8, 1.2)
        image = tf.image.random_hue(image, 0.1)
        image = tf.image.random_jpeg_quality(image, min_jpeg_quality=75, max_jpeg_quality=95)
        return image, label

    def create_data_pipeline(self, X, y, batch_size=64, augment=False, shuffle=None):
        """Create a batched, prefetched ``tf.data`` pipeline.

        Args:
            X: Image array.
            y: Labels aligned with ``X``.
            batch_size: Batch size.
            augment: Apply the random augmentations of ``_augment``.
            shuffle: Reshuffle the samples on every iteration. ``None``
                (default) follows ``augment``, so training pipelines are
                shuffled while validation/test pipelines keep their order,
                which evaluation relies on.

        Returns:
            A ``tf.data.Dataset`` yielding ``(images, labels)`` batches.
        """
        if shuffle is None:
            shuffle = augment

        ds = tf.data.Dataset.from_tensor_slices((X, y))

        if shuffle:
            buffer_size = max(1, min(len(X), self.SHUFFLE_BUFFER))
            ds = ds.shuffle(buffer_size=buffer_size, reshuffle_each_iteration=True)

        # Apply augmentation if requested
        if augment:
            ds = ds.map(self._augment, num_parallel_calls=tf.data.AUTOTUNE)

        ds = ds.batch(batch_size)
        ds = ds.prefetch(buffer_size=tf.data.AUTOTUNE)
        return ds

    def train_model(self, model, train_ds, val_ds, model_name, dataset_name, epochs=20, lr=0.001):
        """Compile and fit ``model``.

        The best model by validation accuracy is written to
        ``best_{model_name}_{dataset_name}.h5`` in the working directory.

        Returns:
            ``(history, train_time)`` -- the Keras history object and the
            wall-clock training time in seconds.
        """
        # Compile model
        model.compile(
            optimizer=Adam(learning_rate=lr),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )

        # Callbacks
        callbacks = [
            EarlyStopping(monitor='val_accuracy', patience=8, restore_best_weights=True, verbose=1),
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, min_lr=1e-6, verbose=1),
            ModelCheckpoint(f"best_{model_name}_{dataset_name}.h5",
                            save_best_only=True, monitor='val_accuracy')
        ]

        print(f"Training {model_name} on {dataset_name}...")
        start_time = time.time()

        history = model.fit(
            train_ds,
            validation_data=val_ds,
            epochs=epochs,
            callbacks=callbacks,
            verbose=1
        )

        train_time = time.time() - start_time
        print(f"Training completed in {train_time:.1f} seconds")
        return history, train_time

    def evaluate_model(self, model, test_ds, y_test, class_names):
        """Compute test metrics for ``model``.

        Args:
            model: Compiled Keras model.
            test_ds: Test pipeline; its sample order must match ``y_test``.
            y_test: Integer test labels.
            class_names: Class names; only their count is used here.

        Returns:
            Dict with keys ``accuracy``, ``loss``, ``precision``, ``recall``,
            ``f1``, ``auc``, ``cm``, ``y_true``, ``y_pred``, ``y_pred_probs``.
            Precision, recall and F1 are weighted averages.
        """
        # Get predictions
        y_pred_probs = model.predict(test_ds, verbose=0)
        y_pred_labels = np.argmax(y_pred_probs, axis=1)

        # Calculate metrics
        test_loss, test_acc = model.evaluate(test_ds, verbose=0)
        precision = precision_score(y_test, y_pred_labels, average='weighted')
        recall = recall_score(y_test, y_pred_labels, average='weighted')
        f1 = f1_score(y_test, y_pred_labels, average='weighted')

        # ROC/AUC: one-vs-rest on binarized labels for more than two classes,
        # otherwise the score of the positive class.
        if len(class_names) > 2:
            y_test_bin = label_binarize(y_test, classes=np.arange(len(class_names)))
            auc_score = roc_auc_score(y_test_bin, y_pred_probs, multi_class='ovr')
        else:
            auc_score = roc_auc_score(y_test, y_pred_probs[:, 1])

        # Confusion matrix
        cm = confusion_matrix(y_test, y_pred_labels)

        return {
            'accuracy': test_acc,
            'loss': test_loss,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'auc': auc_score,
            'cm': cm,
            'y_true': y_test,
            'y_pred': y_pred_labels,
            'y_pred_probs': y_pred_probs
        }

    def visualize_results(self, results, model_name, dataset_name, class_names):
        """Save the confusion-matrix, ROC and metric-bar plots as PNG files.

        Files are written to the working directory as
        ``{model_name}_{dataset_name}_confusion.png``, ``..._roc.png`` (only
        for at most 10 classes) and ``..._metrics.png``.

        Args:
            results: Dict as returned by ``evaluate_model``.
            model_name: Used in titles and file names.
            dataset_name: Used in titles and file names.
            class_names: Class names for tick labels and legends.
        """
        # Confusion matrix; tick labels are dropped for 15 or more classes.
        plt.figure(figsize=(10, 8))
        sns.heatmap(results['cm'], annot=True, fmt='d', cmap='Blues',
                    xticklabels=class_names if len(class_names) < 15 else [],
                    yticklabels=class_names if len(class_names) < 15 else [])
        plt.title(f'{model_name} - {dataset_name} Confusion Matrix')
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.tight_layout()
        plt.savefig(f'{model_name}_{dataset_name}_confusion.png', dpi=150)
        plt.close()

        # ROC Curve (for binary or small multi-class)
        if len(class_names) <= 10:
            plt.figure(figsize=(8, 6))

            if len(class_names) > 2:
                # One-vs-Rest ROC
                y_test_bin = label_binarize(results['y_true'], classes=np.arange(len(class_names)))

                for i, class_name in enumerate(class_names):
                    fpr, tpr, _ = roc_curve(y_test_bin[:, i], results['y_pred_probs'][:, i])
                    roc_auc = auc(fpr, tpr)
                    plt.plot(fpr, tpr, label=f'{class_name} (AUC = {roc_auc:.2f})')
            else:
                # Binary ROC
                fpr, tpr, _ = roc_curve(results['y_true'], results['y_pred_probs'][:, 1])
                roc_auc = auc(fpr, tpr)
                plt.plot(fpr, tpr, label=f'ROC curve (AUC = {roc_auc:.2f})')

            plt.plot([0, 1], [0, 1], 'k--')
            plt.xlim([0.0, 1.0])
            plt.ylim([0.0, 1.05])
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title(f'{model_name} - {dataset_name} ROC Curve')
            plt.legend(loc="lower right")
            plt.savefig(f'{model_name}_{dataset_name}_roc.png', dpi=150)
            plt.close()

        # Metrics comparison
        metrics = ['accuracy', 'precision', 'recall', 'f1', 'auc']
        values = [results[m] for m in metrics]

        plt.figure(figsize=(10, 6))
        bars = plt.bar(metrics, values, color=['blue', 'green', 'red', 'purple', 'orange'])
        plt.ylim(0, 1.05)
        plt.title(f'{model_name} - {dataset_name} Performance Metrics')
        plt.ylabel('Score')

        # Add values on bars
        for bar in bars:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width()/2, height + 0.02,
                     f'{height:.3f}', ha='center', va='bottom')

        plt.tight_layout()
        plt.savefig(f'{model_name}_{dataset_name}_metrics.png', dpi=150)
        plt.close()

    def visualize_predictions(self, model, X_test, y_test, class_names, num_samples=9,
                              save_path='sample_predictions.png'):
        """Plot randomly chosen test images with true and predicted labels.

        Args:
            model: Trained Keras model.
            X_test: Test images.
            y_test: Integer test labels aligned with ``X_test``.
            class_names: Class names indexed by label.
            num_samples: Number of images to draw; capped at ``len(X_test)``.
            save_path: File the figure is written to. The same path is
                overwritten on every call, so pass a distinct path per model
                to keep all figures. A falsy value skips saving.
        """
        num_samples = min(num_samples, len(X_test))
        if num_samples <= 0:
            print("No test samples available to visualize.")
            return

        indices = np.random.choice(len(X_test), num_samples, replace=False)
        sample_images = X_test[indices]
        sample_labels = y_test[indices]

        preds = model.predict(sample_images, verbose=0)
        pred_labels = np.argmax(preds, axis=1)
        probs = np.max(preds, axis=1)

        # Up to three columns; as many rows as needed (ceiling division).
        n_cols = min(3, num_samples)
        n_rows = -(-num_samples // n_cols)

        plt.figure(figsize=(4 * n_cols, (8 / 3) * n_rows))
        for i in range(num_samples):
            plt.subplot(n_rows, n_cols, i + 1)
            plt.imshow(sample_images[i])
            true_label = class_names[sample_labels[i]]
            pred_label = class_names[pred_labels[i]]
            color = 'green' if sample_labels[i] == pred_labels[i] else 'red'
            plt.title(f"True: {true_label}\nPred: {pred_label} ({probs[i]:.2f})", color=color)
            plt.axis('off')
        plt.tight_layout()
        # Persist the figure before closing it; closing alone discards it.
        if save_path:
            plt.savefig(save_path, dpi=150)
        plt.close()

def _load_dataset_splits(transfer, data_dir, cache_name, display_name, kind):
    """Load one dataset via the cache and split it; return its record or None.

    Returns None when ``data_dir`` does not exist or when loading/splitting
    fails (the error is printed), so that the other datasets still run.
    """
    if not os.path.exists(data_dir):
        return None

    try:
        images, labels, class_names = transfer.load_and_cache_dataset(
            data_dir, cache_name, target_size=(128, 128))
        splits = transfer.split_data(images, labels, len(class_names))
    except Exception as e:
        print(f"Error loading {display_name}: {e}")
        return None

    return {
        'name': display_name,
        'data': splits,
        'classes': class_names,
        'type': kind
    }


def _train_and_evaluate(transfer, model_name, dataset, pipelines, config):
    """Run head training, fine-tuning and evaluation for one model/dataset pair.

    Any failing stage prints its error and ends the run for this pair only.
    """
    train_ds, val_ds, test_ds = pipelines
    _, _, X_test, _, _, _, y_test_raw = dataset['data']
    class_names = dataset['classes']
    num_classes = len(class_names)

    print(f"\n>>>>> TRAINING {model_name} <<<<<")

    # Step 1: Train with frozen base
    print("\nSTEP 1: Training classifier head")
    try:
        model = transfer.create_model(model_name, num_classes, unfreeze_depth=0)
        transfer.train_model(
            model, train_ds, val_ds, model_name, dataset['name'],
            epochs=config['initial_epochs'], lr=config['initial_lr']
        )
    except Exception as e:
        print(f"Error in step 1: {e}")
        return

    # Step 2: Fine-tune with unfrozen top layers
    print("\nSTEP 2: Fine-tuning top layers")
    try:
        model = transfer.create_model(model_name, num_classes,
                                      unfreeze_depth=config['unfreeze_depth'])

        # Continue from the best step-1 checkpoint written by train_model.
        weight_path = f"best_{model_name}_{dataset['name']}.h5"
        if os.path.exists(weight_path):
            model.load_weights(weight_path)
        else:
            # Without the checkpoint the new model's head is untrained.
            print(f"Warning: checkpoint {weight_path} not found; "
                  f"fine-tuning starts from an untrained classifier head")

        transfer.train_model(
            model, train_ds, val_ds, model_name, dataset['name'],
            epochs=config['finetune_epochs'], lr=config['finetune_lr']
        )
    except Exception as e:
        print(f"Error in step 2: {e}")
        return

    # Evaluate final model
    print("\nEVALUATING MODEL...")
    try:
        results = transfer.evaluate_model(model, test_ds, y_test_raw, class_names)

        # Print results
        print(f"\n{model_name} on {dataset['name']} Results:")
        print(f"Accuracy: {results['accuracy']:.4f}")
        print(f"Precision: {results['precision']:.4f}")
        print(f"Recall: {results['recall']:.4f}")
        print(f"F1-Score: {results['f1']:.4f}")
        print(f"AUC: {results['auc']:.4f}")

        # Generate visualizations
        transfer.visualize_results(results, model_name, dataset['name'], class_names)
        transfer.visualize_predictions(model, X_test, y_test_raw, class_names)

        # Save classification report
        report = classification_report(y_test_raw, results['y_pred'],
                                       target_names=class_names, digits=4)
        with open(f"{model_name}_{dataset['name']}_report.txt", "w") as f:
            f.write(report)
    except Exception as e:
        print(f"Evaluation error: {e}")


def main():
    """Train and evaluate every configured model on every available dataset.

    Datasets whose directory is missing or fails to load are skipped. For
    each remaining dataset, each model is trained in two stages (frozen
    backbone, then fine-tuning) and evaluated on the test split.
    """
    print("="*70)
    print("TRANSFER LEARNING WITH LOCAL PRETRAINED WEIGHTS")
    print("="*70)

    # 128x128 RGB inputs, matching the target_size the datasets are loaded with.
    transfer = OptimizedTransferLearning(input_shape=(128, 128, 3))

    # Create weights directory if not exists
    os.makedirs('pretrained_weights', exist_ok=True)

    # Dataset paths - UPDATE THESE
    # (directory, cache name, display name, type tag)
    dataset_sources = [
        ("caltech-101", "caltech101", "Caltech-101", "caltech"),
        ("Classification_Horse,Cat,Dog", "horse_cat_dog", "Horse-Cat-Dog", "hcd"),
    ]

    # Load and cache datasets
    print("\nLoading datasets...")
    datasets = []
    for data_dir, cache_name, display_name, kind in dataset_sources:
        record = _load_dataset_splits(transfer, data_dir, cache_name, display_name, kind)
        if record is not None:
            datasets.append(record)

    if not datasets:
        print("No datasets found! Check your paths.")
        return

    # Models to use
    models = ['MobileNetV2', 'EfficientNetB0', 'ResNet50']

    # Training configuration
    config = {
        'batch_size': 32,
        'initial_epochs': 15,
        'finetune_epochs': 10,
        'initial_lr': 0.001,
        'finetune_lr': 0.0001,
        'unfreeze_depth': 1
    }

    # Process each dataset
    for dataset in datasets:
        print(f"\n{'='*70}")
        print(f"PROCESSING {dataset['name']}")
        print(f"{'='*70}")

        X_train, X_val, X_test, y_train, y_val, y_test, _ = dataset['data']

        # Create data pipelines once per dataset; they are shared by all models.
        train_ds = transfer.create_data_pipeline(X_train, y_train,
                                                 batch_size=config['batch_size'],
                                                 augment=True)
        val_ds = transfer.create_data_pipeline(X_val, y_val,
                                               batch_size=config['batch_size'])
        test_ds = transfer.create_data_pipeline(X_test, y_test,
                                                batch_size=config['batch_size'])

        # Process each model
        for model_name in models:
            _train_and_evaluate(transfer, model_name, dataset,
                                (train_ds, val_ds, test_ds), config)

    print("\nTRAINING COMPLETED!")

if __name__ == "__main__":
    # Time the complete run, from dataset loading to the last evaluation.
    started_at = time.time()

    main()

    # Report the elapsed wall-clock time in minutes.
    elapsed_minutes = (time.time() - started_at) / 60
    print(f"\nTotal execution time: {elapsed_minutes:.1f} minutes")

TRANSFER LEARNING WITH LOCAL PRETRAINED WEIGHTS

Loading datasets...
Loading cached caltech101 dataset...
Train: 6073, Val: 868, Test: 1736
Loading cached horse_cat_dog dataset...
Train: 424, Val: 60, Test: 122

PROCESSING Caltech-101

>>>>> TRAINING MobileNetV2 <<<<<

STEP 1: Training classifier head
Loading weights from pretrained_weights/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5
Training MobileNetV2 on Caltech-101...
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 13: ReduceLROnPlateau reducing learning rate to 0.0005000000237487257.
Epoch 14/15
Epoch 15/15
Training completed in 888.6 seconds

STEP 2: Fine-tuning top layers
Loading weights from pretrained_weights/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5
Unfroze 5 top layers
Training MobileNetV2 on Caltech-101...
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoc