In [None]:
# Example usage:
from google.colab import drive
drive.mount('/content/drive')


In [None]:
import shutil

source_path = "/content/drive/MyDrive/FRUIT_DATASET_24_CLASS"
destination_path = '/content/FRUIT_DATASET'

shutil.copytree(source_path, destination_path)

In [None]:
# 2. Define paths
import numpy as np
import pickle
import os
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import random
#import numpy as np


base_dir = "/content/FRUIT_DATASET"

train_path = os.path.join(base_dir, "Training")
test_path = os.path.join(base_dir, "Test")


def load_images_from_folder(folder_path, image_size=(100, 100)):
    images = []
    labels = []
    class_names = sorted([d for d in os.listdir(folder_path) if not d.startswith('.') and os.path.isdir(os.path.join(folder_path, d))])
    class_to_idx = {cls_name: idx for idx, cls_name in enumerate(class_names)}

    for cls_name in class_names:
        cls_folder = os.path.join(folder_path, cls_name)
        if not os.path.isdir(cls_folder):
            continue
        for filename in os.listdir(cls_folder):
            if filename.endswith(".jpg") and not filename.startswith('.'):
                img_path = os.path.join(cls_folder, filename)
                img = Image.open(img_path).convert("L")  # convert to grayscale
                img = img.resize(image_size)
                img_array = np.asarray(img, dtype=np.float32) / 255.0       # normalize to [0, 1]
                images.append(img_array)
                labels.append(class_to_idx[cls_name])

    return np.array(images), np.array(labels), class_names

X_train, y_train, class_names = load_images_from_folder(train_path)
X_test, y_test, _ = load_images_from_folder(test_path)


print("Train set shape:", X_train.shape)
print("Train labels shape:", y_train.shape)
print("Test set shape:", X_test.shape)
print("Test labels shape:", y_test.shape)
print("Number of classes:", len(class_names))




In [None]:
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

In [None]:

class DeepCNN:
    def __init__(self, input_shape, num_classes, learning_rate=0.01):
        self.input_shape = input_shape  # (height, width)
        self.num_classes = num_classes
        self.lr = learning_rate

        # Conv layer 1: 8 filters
        self.input_depth = 1  # For grayscale images
        self.filters1 = np.random.randn(8, 3, 3, self.input_depth) / 9  # 8 filters of size 3x3

        # Conv layer 2: 16 filters
        self.filters2 = np.random.randn(16, 3, 3, 8) / 9  # 16 filters of size 3x3 with input depth 8

        # Calculate the output sizes after each layer
        h, w = input_shape

        # Conv1 output size
        self.conv1_out_h = h - 2  # valid padding
        self.conv1_out_w = w - 2

        # Pool1 output size
        self.pool1_out_h = self.conv1_out_h // 2
        self.pool1_out_w = self.conv1_out_w // 2

        # Conv2 output size
        self.conv2_out_h = self.pool1_out_h - 2  # valid padding
        self.conv2_out_w = self.pool1_out_w - 2

        # Pool2 output size
        self.pool2_out_h = self.conv2_out_h // 2
        self.pool2_out_w = self.conv2_out_w // 2

        # Compute final flattened size after conv + pooling
        self.flattened_size = 16 * self.pool2_out_h * self.pool2_out_w

        # Fully connected layer
        self.W = np.random.randn(self.num_classes, self.flattened_size) * 0.01
        self.b = np.zeros((self.num_classes, 1))

        print(f"Network Architecture:")
        print(f"Input: {input_shape}")
        print(f"Conv1: {8}x{self.conv1_out_h}x{self.conv1_out_w}")
        print(f"Pool1: {8}x{self.pool1_out_h}x{self.pool1_out_w}")
        print(f"Conv2: {16}x{self.conv2_out_h}x{self.conv2_out_w}")
        print(f"Pool2: {16}x{self.pool2_out_h}x{self.pool2_out_w}")
        print(f"Flattened size: {self.flattened_size}")
        print(f"Output: {self.num_classes}")

        # For storing intermediate values during forward pass (needed for backprop)
        self.cache = {}

    def conv_forward(self, x, filters, stride=1):
        """
        Perform forward pass through convolutional layer.

        Args:
            x: Input of shape (H, W, C) or (H, W) for the first layer
            filters: Filters of shape (F, Kh, Kw, C)
            stride: Stride for convolution

        Returns:
            Output of shape (H', W', F)
        """
        # Add channel dimension for first layer if needed
        if x.ndim == 2:
            x = x[..., np.newaxis]  # More efficient than reshape

        h, w, c = x.shape
        num_filters, kh, kw, _ = filters.shape

        # Calculate output dimensions
        out_h = (h - kh) // stride + 1
        out_w = (w - kw) // stride + 1

        # Initialize output
        output = np.zeros((out_h, out_w, num_filters))

        # Vectorized implementation
        for f in range(num_filters):
            for i in range(out_h):
                for j in range(out_w):
                    h_start = i * stride
                    h_end = h_start + kh
                    w_start = j * stride
                    w_end = w_start + kw

                    # Extract patch and compute convolution
                    patch = x[h_start:h_end, w_start:w_end, :]
                    output[i, j, f] = np.sum(patch * filters[f])

        # Apply ReLU activation
        return np.maximum(0, output)

    def pool_forward(self, x, pool_size=2, stride=2):
        """
        Perform max pooling.

        Args:
            x: Input of shape (H, W, C)
            pool_size: Size of pooling window
            stride: Stride for pooling

        Returns:
            Output of shape (H/stride, W/stride, C)
        """
        h, w, c = x.shape

        # Output dimensions
        out_h = h // stride
        out_w = w // stride

        # Initialize output
        output = np.zeros((out_h, out_w, c))

        # Store pooling indices for backpropagation
        pool_indices = np.zeros((out_h, out_w, c, 2), dtype=int)

        # Perform max pooling
        for i in range(out_h):
            for j in range(out_w):
                for k in range(c):
                    h_start, w_start = i * stride, j * stride
                    h_end, w_end = h_start + pool_size, w_start + pool_size

                    # Extract the patch
                    patch = x[h_start:h_end, w_start:w_end, k]

                    # Find the max value and its indices
                    max_idx = np.unravel_index(np.argmax(patch), patch.shape)
                    output[i, j, k] = patch[max_idx]

                    # Store indices for backpropagation
                    pool_indices[i, j, k] = [h_start + max_idx[0], w_start + max_idx[1]]
                    #pool_indices[i, j, k, 0] = h_start + max_idx[0]
                    #pool_indices[i, j, k, 1] = w_start + max_idx[1]
        return output, pool_indices

    def softmax(self, z):
        """
        Apply softmax activation.

        Args:
            z: Input logits

        Returns:
            Probability distribution
        """
        exp_z = np.exp(z - np.max(z))  # Subtract max for numerical stability
        return exp_z / np.sum(exp_z)

    def forward(self, x):
        """
        Perform forward pass through the entire network.

        Args:
            x: Input image of shape (H, W) for grayscale

        Returns:
            Probability distribution over classes
        """
        # Store the input
        self.cache['input'] = x
        # First convolutional layer
        conv1_output = self.conv_forward(x, self.filters1)
        self.cache['conv1_output'] = conv1_output
        # First max pooling layer
        pool1_output, pool1_indices = self.pool_forward(conv1_output)
        self.cache['pool1_output'] = pool1_output
        self.cache['pool1_indices'] = pool1_indices

        # Second convolutional layer
        conv2_output = self.conv_forward(pool1_output, self.filters2)
        self.cache['conv2_output'] = conv2_output

        # Second max pooling layer
        pool2_output, pool2_indices = self.pool_forward(conv2_output)
        self.cache['pool2_output'] = pool2_output
        self.cache['pool2_indices'] = pool2_indices
        # Flatten output
        flattened = pool2_output.reshape(1, -1)
        self.cache['flattened'] = flattened

        # Fully connected layer
        fc_output = np.dot(self.W, flattened.T) + self.b
        self.cache['fc_output'] = fc_output

        # Softmax
        probs = self.softmax(fc_output.flatten())
        self.cache['probs'] = probs

        return probs

    def conv_backward(self, dout, cache, filters):
        """
        Backward pass for convolutional layer.

        Args:
            dout: Gradient from output
            cache: Cached values from forward pass
            filters: Filters used in forward pass

        Returns:
            Gradients for filters
        """
        x = cache['input']
        if len(x.shape) == 2:
            x = x.reshape(x.shape[0], x.shape[1], 1)

        h, w, c = x.shape
        num_filters, kh, kw, _ = filters.shape
        dfilters = np.zeros_like(filters)

        # Calculate dfilters
        for f in range(num_filters):
            for i in range(dout.shape[0]):
                for j in range(dout.shape[1]):
                    # Extract patch from input
                    patch = x[i:i+kh, j:j+kw, :]
                    # Update filter gradients
                    dfilters[f] += patch * dout[i, j, f]

        return dfilters

    def pool_backward(self, dout, cache, pool_indices):
        """
        Backward pass for max pooling layer.

        Args:
            dout: Gradient from output
            cache: Cached values from forward pass
            pool_indices: Indices of max values from forward pass

        Returns:
            Gradient for input to pooling layer
        """
        dpool = np.zeros_like(cache)

        # Unpool (distribute gradients to the max locations)
        for i in range(dout.shape[0]):
            for j in range(dout.shape[1]):
                for k in range(dout.shape[2]):
                    h_idx, w_idx = pool_indices[i, j, k]
                    dpool[h_idx, w_idx, k] = dout[i, j, k]

        return dpool
    def save_weights(self, filepath):
        """
        Save model weights to a file.

        Args:
            filepath: Path to save the weights (should end with .pkl or .npz)
        """
        weights = {
            'filters1': self.filters1,
            'filters2': self.filters2,
            'W': self.W,
            'b': self.b,
            'input_shape': self.input_shape,
            'num_classes': self.num_classes
        }

        if filepath.endswith('.pkl'):
            with open(filepath, 'wb') as f:
                pickle.dump(weights, f)
        elif filepath.endswith('.npz'):
            np.savez(filepath, **weights)
        else:
            raise ValueError("Filepath should end with .pkl or .npz")

        print(f"Weights saved successfully to {filepath}")

    def load_weights(self, filepath):
        """
        Load model weights from a file.

        Args:
            filepath: Path to load the weights from
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Weights file not found at {filepath}")

        if filepath.endswith('.pkl'):
            with open(filepath, 'rb') as f:
                weights = pickle.load(f)
        elif filepath.endswith('.npz'):
            weights = np.load(filepath)
            weights = {key: weights[key] for key in weights.files}
        else:
            raise ValueError("Filepath should end with .pkl or .npz")

        # Verify architecture matches
        if tuple(weights['input_shape']) != self.input_shape:
            raise ValueError("Input shape doesn't match model architecture")
        if weights['num_classes'] != self.num_classes:
            raise ValueError("Number of classes doesn't match model architecture")

        # Load weights
        self.filters1 = weights['filters1']
        self.filters2 = weights['filters2']
        self.W = weights['W']
        self.b = weights['b']

        print(f"Weights loaded successfully from {filepath}")

    def get_weights_dict(self):
        """
        Return a dictionary of all trainable weights.

        Returns:
            Dictionary containing all weights and biases
        """
        return {
            'filters1': self.filters1,
            'filters2': self.filters2,
            'W': self.W,
            'b': self.b
        }

    def set_weights(self, weights_dict):
        """
        Set weights from a dictionary.

        Args:
            weights_dict: Dictionary containing weights to load
        """
        self.filters1 = weights_dict.get('filters1', self.filters1)
        self.filters2 = weights_dict.get('filters2', self.filters2)
        self.W = weights_dict.get('W', self.W)
        self.b = weights_dict.get('b', self.b)
    def backward(self, y_true):
        """
        Perform backpropagation and update weights.

        Args:
            y_true: True class index
        """
        # Initialize gradients
        dW = np.zeros_like(self.W)
        db = np.zeros_like(self.b)
        dfilters1 = np.zeros_like(self.filters1)
        dfilters2 = np.zeros_like(self.filters2)

        # Retrieve cached values
        probs = self.cache['probs']
        fc_output = self.cache['fc_output']
        flattened = self.cache['flattened']
        pool2_output = self.cache['pool2_output']
        pool2_indices = self.cache['pool2_indices']
        conv2_output = self.cache['conv2_output']
        pool1_output = self.cache['pool1_output']
        pool1_indices = self.cache['pool1_indices']
        conv1_output = self.cache['conv1_output']
        input_img = self.cache['input']

        # Softmax gradient - Cross-entropy loss
        dout = probs.copy()
        dout[y_true] -= 1

        # Gradient for fully connected layer
        dfc = dout.reshape(-1, 1)
        dW = np.dot(dfc, flattened)
        db = dfc

        # Gradient for flattened output
        dflattened = np.dot(self.W.T, dfc)
        dpool2 = dflattened.reshape(pool2_output.shape)

        # Gradient through second max pooling layer
        dconv2 = self.pool_backward(dpool2, conv2_output, pool2_indices)

        # Gradient through ReLU in second conv layer
        dconv2_relu = dconv2 * (conv2_output > 0)

        # Gradient for second conv filters
        self.cache['input'] = pool1_output  # Set input for conv_backward
        dfilters2 = self.conv_backward(dconv2_relu, self.cache, self.filters2)

        # Gradient to first pooling output
        dpool1 = np.zeros_like(pool1_output)
        for f in range(self.filters2.shape[0]):
            for i in range(dconv2_relu.shape[0]):
                for j in range(dconv2_relu.shape[1]):
                    patch = pool1_output[i:i+3, j:j+3, :]
                    for c in range(patch.shape[2]):
                        dpool1[i:i+3, j:j+3, c] += self.filters2[f, :, :, c] * dconv2_relu[i, j, f]

        # Gradient through first max pooling layer
        dconv1 = self.pool_backward(dpool1, conv1_output, pool1_indices)

        # Gradient through ReLU in first conv layer
        dconv1_relu = dconv1 * (conv1_output > 0)

        # Gradient for first conv filters
        self.cache['input'] = input_img  # Set input for conv_backward
        dfilters1 = self.conv_backward(dconv1_relu, self.cache, self.filters1)

        # Update weights and biases
        self.W -= self.lr * dW
        self.b -= self.lr * db
        self.filters1 -= self.lr * dfilters1
        self.filters2 -= self.lr * dfilters2

    def evaluate(self, X, y):
        """
        Evaluate the model on the given data.

        Args:
            X: Input data
            y: Ground truth labels

        Returns:
            Tuple of (predictions, loss, accuracy)
        """
        loss = 0
        correct = 0
        predictions = []

        for i in range(len(X)):
            probs = self.forward(X[i])
            predictions.append(np.argmax(probs))  # Store the predicted class
            loss += -np.log(probs[y[i]] + 1e-10)
            correct += int(np.argmax(probs) == y[i])

        return np.array(predictions), loss / len(X), correct / len(X)

    def visualize_filters(self, layer=1, figsize=(10, 5)):
        """
        Visualize the learned convolutional filters.

        Args:
            layer: Which convolutional layer to visualize (1 or 2)
            figsize: Size of the figure
        """
        if layer == 1:
            filters = self.filters1
            title = "First Convolutional Layer Filters"
        elif layer == 2:
            filters = self.filters2
            title = "Second Convolutional Layer Filters"
        else:
            raise ValueError("Layer must be 1 or 2")

        num_filters = filters.shape[0]
        filter_size = filters.shape[1]
        input_channels = filters.shape[3]

        # For first layer with grayscale input, we can show all filters in one row
        if layer == 1:
            plt.figure(figsize=figsize)
            for i in range(num_filters):
                plt.subplot(1, num_filters, i+1)
                plt.imshow(filters[i, :, :, 0], cmap='gray')
                plt.axis('off')
                plt.title(f'Filter {i+1}')
            plt.suptitle(title)
            plt.tight_layout()
            plt.show()

        # For second layer with multiple input channels, we show each filter's channels separately
        elif layer == 2:
            # Calculate grid size
            rows = num_filters
            cols = input_channels
            fig, axes = plt.subplots(rows, cols, figsize=(cols*2, rows*1.5))

            if num_filters == 1:
                axes = [axes]  # Handle case with single filter

            for i in range(num_filters):
                for j in range(input_channels):
                    if num_filters > 1:
                        ax = axes[i, j]
                    else:
                        ax = axes[j]

                    ax.imshow(filters[i, :, :, j], cmap='gray')
                    ax.axis('off')

                    # Only show filter number on first column
                    if j == 0:
                        ax.set_ylabel(f'Filter {i+1}', rotation=0, ha='right', va='center')

                    # Only show channel number on first row
                    if i == 0:
                        ax.set_title(f'Ch {j+1}')

            plt.suptitle(f"{title} (Each row shows one filter's channels)")
            plt.tight_layout()
            plt.show()

    def visualize_all_filters(self, figsize=(15, 8)):
        """
        Visualize filters from both convolutional layers.
        """
        plt.figure(figsize=figsize)

        # First layer filters
        plt.subplot(1, 2, 1)
        for i in range(self.filters1.shape[0]):
            plt.subplot(1, self.filters1.shape[0], i+1)
            plt.imshow(self.filters1[i, :, :, 0], cmap='gray')
            plt.axis('off')
            plt.title(f'L1 Filter {i+1}')
        plt.suptitle('First Layer Filters')

        # Second layer filters (show first filter's channels as example)
        plt.figure(figsize=figsize)
        for j in range(self.filters2.shape[3]):
            plt.subplot(1, self.filters2.shape[3], j+1)
            plt.imshow(self.filters2[0, :, :, j], cmap='gray')
            plt.axis('off')
            plt.title(f'L2 Filter 1 Ch{j+1}')
        plt.suptitle('Second Layer Filter 1 Channels')

        plt.tight_layout()
        plt.show()

    def train(self, X, y, X_test, y_test, epochs=5, batch_size=32):
        """
        Train the CNN model.

        Args:
            X: Training data
            y: Training labels
            X_test: Test data
            y_test: Test labels
            epochs: Number of epochs
            batch_size: Batch size
        """
        for epoch in range(epochs):
            indices = np.arange(len(X))
            np.random.shuffle(indices)
            X, y = X[indices], y[indices]
            loss = 0
            acc = 0

            for start in range(0, len(X), batch_size):
                end = min(start + batch_size, len(X))
                batch_X, batch_y = X[start:end], y[start:end]

                batch_loss = 0
                batch_acc = 0

                for i in range(len(batch_X)):
                    x = batch_X[i]
                    y_true = batch_y[i]

                    # Forward pass
                    probs = self.forward(x)
                    #print("Forward pass done")

                    # Compute loss
                    batch_loss += -np.log(probs[y_true] + 1e-10)
                    batch_acc += int(np.argmax(probs) == y_true)
                    #print("Loss and accuracy computed")

                    # Backward pass
                    self.backward(y_true)
                    #print("Backward pass done")

                loss += batch_loss
                acc += batch_acc

                # Print progress
                if (start // batch_size) % 1 == 0:
                      print(f"Epoch {epoch+1}, Batch {start//batch_size + 1}/{len(X)//batch_size + 1}, " +
                      f"Loss: {float(batch_loss)/len(batch_X):.4f}, Acc: {float(batch_acc)/len(batch_X):.4f}")

            # Evaluate on test set
            _, _, epoch_test_acc = self.evaluate(X_test, y_test)
            print(f"Epoch {epoch+1} complete - Loss: {float(loss)/len(X):.4f}, Train Acc: {float(acc)/len(X):.4f}, Test Acc: {epoch_test_acc:.4f}")

    def plot_confusion_matrix(self, y_true, y_pred, class_names):
        cm = confusion_matrix(y_true, y_pred)
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=class_names, yticklabels=class_names)
        plt.xlabel("Predicted")
        plt.ylabel("True")
        plt.title("Confusion Matrix")
        plt.show()


In [None]:
epochs = 5
learning_rate = 0.01
batch_size = 32
cnn = DeepCNN(input_shape=(100, 100), num_classes=len(class_names), learning_rate=learning_rate)
cnn.train(X_train, y_train, X_test, y_test, epochs=epochs, batch_size=batch_size)
# Visualize filters from the first convolutional layer
cnn.visualize_filters(layer=1, figsize=(10, 5))

# Visualize filters from the second convolutional layer
cnn.visualize_filters(layer=2, figsize=(10, 5))
weights_filepath = rf"C:\Users\kingm\Downloads\CNN_weights\CNN_weights_epochs{epochs}_lr{learning_rate}_bs{batch_size}.npz"

cnn.save_weights(weights_filepath)
preds ,loss , test_acc = cnn.evaluate(X_test, y_test)

cnn.plot_confusion_matrix(y_test, preds, class_names)