In [1]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split

In [9]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split

class CustomCNNClassifier:
    def __init__(
        self,
        img_size=(224, 224),
        batch_size=32,
        learning_rate=0.001,
        dropout_rate=0.3,
        l2_lambda=0.01
    ):
        self.img_size = img_size
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.dropout_rate = dropout_rate
        self.l2_lambda = l2_lambda
        self.model = None
        self.class_names = None
        
        # Add model save path as instance variable
        self.model_save_path = 'best_cnn_model.keras'

    def create_dataset(self, data_dir):
        """Create train, validation, and test datasets."""
        if not os.path.exists(data_dir):
            raise ValueError(f"Data directory {data_dir} does not exist")

        # Create file paths and labels
        filepaths = []
        labels = []
        for class_name in os.listdir(data_dir):
            class_path = os.path.join(data_dir, class_name)
            if os.path.isdir(class_path):
                for filename in os.listdir(class_path):
                    if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
                        filepath = os.path.join(class_path, filename)
                        filepaths.append(filepath)
                        labels.append(class_name)

        if not filepaths:
            raise ValueError("No valid image files found in the data directory")

        # Create dataframe
        df = pd.DataFrame({'filepath': filepaths, 'label': labels})

        # Split data
        train_df, temp_df = train_test_split(
            df, train_size=0.8, stratify=df['label'], random_state=42
        )
        val_df, test_df = train_test_split(
            temp_df, train_size=0.5, stratify=temp_df['label'], random_state=42
        )

        # Create data generators
        try:
            train_gen = self._create_augmented_generator(train_df)
            val_gen = self._create_basic_generator(val_df)
            test_gen = self._create_basic_generator(test_df)
        except Exception as e:
            raise RuntimeError(f"Error creating data generators: {str(e)}")

        self.class_names = sorted(list(set(labels)))
        return train_gen, val_gen, test_gen

    def _create_augmented_generator(self, dataframe):
        """Create generator with augmentation for training."""
        datagen = ImageDataGenerator(
            rescale=1./255,
            rotation_range=20,
            width_shift_range=0.2,
            height_shift_range=0.2,
            horizontal_flip=True,
            vertical_flip=False,
            shear_range=0.2,
            zoom_range=0.2,
            brightness_range=[0.8, 1.2],
            fill_mode='nearest'
        )
        return self._create_generator(datagen, dataframe)

    def _create_basic_generator(self, dataframe):
        """Create generator without augmentation."""
        datagen = ImageDataGenerator(rescale=1./255)
        return self._create_generator(datagen, dataframe, shuffle=False)

    def _create_generator(self, datagen, dataframe, shuffle=True):
        """Helper method to create data generator with error handling."""
        try:
            return datagen.flow_from_dataframe(
                dataframe,
                x_col='filepath',
                y_col='label',
                target_size=self.img_size,
                batch_size=self.batch_size,
                class_mode='categorical',
                shuffle=shuffle
            )
        except Exception as e:
            raise RuntimeError(f"Error creating data generator: {str(e)}")

    def build_model(self, num_classes):
        """Build custom CNN model."""
        if num_classes < 2:
            raise ValueError("Number of classes must be at least 2")

        reg = regularizers.l2(self.l2_lambda)
        
        try:
            model = models.Sequential([
                # First Convolutional Block
                layers.Conv2D(32, (3, 3), padding='same', kernel_regularizer=reg,
                             input_shape=(*self.img_size, 3)),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.Conv2D(32, (3, 3), padding='same', kernel_regularizer=reg),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.MaxPooling2D((2, 2)),
                layers.Dropout(0.25),

                # Second Convolutional Block
                layers.Conv2D(64, (3, 3), padding='same', kernel_regularizer=reg),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.Conv2D(64, (3, 3), padding='same', kernel_regularizer=reg),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.MaxPooling2D((2, 2)),
                layers.Dropout(0.25),

                # Third Convolutional Block
                layers.Conv2D(128, (3, 3), padding='same', kernel_regularizer=reg),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.Conv2D(128, (3, 3), padding='same', kernel_regularizer=reg),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.MaxPooling2D((2, 2)),
                layers.Dropout(0.25),

                # Fourth Convolutional Block
                layers.Conv2D(256, (3, 3), padding='same', kernel_regularizer=reg),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.Conv2D(256, (3, 3), padding='same', kernel_regularizer=reg),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.MaxPooling2D((2, 2)),
                layers.Dropout(0.25),

                # Dense Layers
                layers.Flatten(),
                layers.Dense(512, kernel_regularizer=reg),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.Dropout(self.dropout_rate),
                
                layers.Dense(256, kernel_regularizer=reg),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.Dropout(self.dropout_rate),

                layers.Dense(num_classes, activation='softmax')
            ])

            # Compile model with learning rate schedule
            initial_learning_rate = self.learning_rate
            lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
                initial_learning_rate,
                decay_steps=1000,
                decay_rate=0.9,
                staircase=True
            )

            optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
            
            model.compile(
                optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=['accuracy']
            )

            self.model = model
            return model

        except Exception as e:
            raise RuntimeError(f"Error building model: {str(e)}")

    def train(self, train_gen, val_gen, epochs=50):
        """Train the model with callbacks."""
        if self.model is None:
            raise ValueError("Model has not been built. Call build_model first.")

        callbacks = [
            tf.keras.callbacks.ModelCheckpoint(
                self.model_save_path,  # Using .keras extension
                monitor='val_accuracy',
                save_best_only=True,
                mode='max'
            ),
            tf.keras.callbacks.EarlyStopping(
                monitor='val_accuracy',
                patience=10,
                restore_best_weights=True
            ),
            tf.keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=5,
                min_lr=1e-6
            ),
            # Add TensorBoard callback for better monitoring
            tf.keras.callbacks.TensorBoard(
                log_dir='./logs',
                histogram_freq=1
            )
        ]

        try:
            history = self.model.fit(
                train_gen,
                epochs=epochs,
                validation_data=val_gen,
                callbacks=callbacks,
                verbose=1
            )
            return history
        except Exception as e:
            raise RuntimeError(f"Error during training: {str(e)}")

    def evaluate(self, test_gen):
        """Evaluate the model on test data."""
        if self.model is None:
            raise ValueError("Model has not been built or loaded.")
            
        try:
            return self.model.evaluate(test_gen)
        except Exception as e:
            raise RuntimeError(f"Error during evaluation: {str(e)}")

    def predict(self, image_path):
        """Predict class for a single image."""
        if self.model is None:
            raise ValueError("Model has not been built or loaded.")
            
        if not os.path.exists(image_path):
            raise ValueError(f"Image file {image_path} does not exist")

        try:
            img = tf.keras.preprocessing.image.load_img(
                image_path, target_size=self.img_size
            )
            img_array = tf.keras.preprocessing.image.img_to_array(img)
            img_array = np.expand_dims(img_array, 0) / 255.0

            predictions = self.model.predict(img_array)
            predicted_class = self.class_names[np.argmax(predictions[0])]
            confidence = float(np.max(predictions[0]))  # Convert to Python float

            return predicted_class, confidence
        except Exception as e:
            raise RuntimeError(f"Error during prediction: {str(e)}")

    def save_model(self, filepath=None):
        """Save the model with error handling."""
        if self.model is None:
            raise ValueError("No model to save")
            
        try:
            save_path = filepath or self.model_save_path
            self.model.save(save_path)
            print(f"Model saved successfully to {save_path}")
        except Exception as e:
            raise RuntimeError(f"Error saving model: {str(e)}")

    def load_model(self, filepath=None):
        """Load a saved model with error handling."""
        try:
            load_path = filepath or self.model_save_path
            self.model = tf.keras.models.load_model(load_path)
            print(f"Model loaded successfully from {load_path}")
        except Exception as e:
            raise RuntimeError(f"Error loading model: {str(e)}")

def train_and_evaluate_cnn(data_dir, epochs=50):
    """Complete training pipeline with error handling."""
    try:
        # Initialize classifier
        classifier = CustomCNNClassifier(
            img_size=(224, 224),
            batch_size=32,
            learning_rate=0.001,
            dropout_rate=0.3,
            l2_lambda=0.01
        )

        # Create datasets
        train_gen, val_gen, test_gen = classifier.create_dataset(data_dir)

        # Build and train model
        num_classes = len(train_gen.class_indices)
        model = classifier.build_model(num_classes)
        print("Model Architecture:")
        model.summary()

        # Train model
        print("\nTraining model...")
        history = classifier.train(train_gen, val_gen, epochs=epochs)

        # Evaluate model
        print("\nEvaluating model...")
        test_loss, test_accuracy = classifier.evaluate(test_gen)
        print(f"Test accuracy: {test_accuracy:.4f}")

        return classifier, history, test_gen

    except Exception as e:
        print(f"Error in training pipeline: {str(e)}")
        raise

In [10]:
def visualize_results(classifier, history, test_gen):
    """Complete visualization pipeline."""
    visualizer = ModelVisualizer(classifier.model, history, classifier.class_names)
    
    print("1. Training History")
    visualizer.plot_training_history()
    
    print("\n2. Learning Rate Schedule")
    visualizer.plot_learning_rate()
    
    print("\n3. Confusion Matrix and Classification Report")
    visualizer.plot_confusion_matrix(test_gen)
    
    print("\n4. Sample Predictions")
    visualizer.plot_sample_predictions(test_gen)
    
    return visualizer

In [None]:
data_dir = "./color"
classifier, history, test_gen = train_and_evaluate_cnn(data_dir, epochs=50)


model = tf.keras.models.load_model('best_cnn_model.keras')

Found 43444 validated image filenames belonging to 38 classes.
Found 5430 validated image filenames belonging to 38 classes.
Found 5431 validated image filenames belonging to 38 classes.


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Model Architecture:



Training model...


  self._warn_if_super_not_called()


Epoch 1/50
[1m  80/1358[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m1:14:43[0m 4s/step - accuracy: 0.1802 - loss: 25.0949

In [None]:
# Visualize results
visualizer = visualize_results(classifier, history, test_gen)

In [None]:
# Make predictions on new images
class_name, confidence = classifier.predict("path/to/new/image.jpg")
print(f"Predicted class: {class_name} with confidence: {confidence:.2f}")