<a href="https://colab.research.google.com/github/jared-ni/cs2241-final/blob/main/4_25_2025_DogCat_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import torch
import matplotlib.pyplot as plt
from scipy.ndimage import gaussian_filter
import os

In [2]:
    from google.colab import drive
    drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# --- 1. Setup and Imports ---
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
import kagglehub
import os
import pathlib
import matplotlib.pyplot as plt
import time

print("TensorFlow Version:", tf.__version__)

# --- Check for GPU ---
gpu_devices = tf.config.list_physical_devices('GPU')
if gpu_devices:
    print(f"Found GPU: {gpu_devices[0].name}. Training will be accelerated.")
    tf.config.experimental.set_memory_growth(gpu_devices[0], True) # Prevent TensorFlow from allocating all GPU memory at once
else:
    print("No GPU found. Training will run on CPU (might be slow).")
    print("Runtime > Change runtime type > Hardware accelerator > GPU")

TensorFlow Version: 2.18.0
Found GPU: /physical_device:GPU:0. Training will be accelerated.


In [4]:
import kagglehub
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
import matplotlib.pyplot as plt
import pathlib

# Download latest version
data_root = kagglehub.dataset_download("bhavikjikadara/dog-and-cat-classification-dataset")

data_dir = os.path.join(data_root, 'PetImages')

print("Path to dataset files:", data_dir)

Path to dataset files: /kaggle/input/dog-and-cat-classification-dataset/PetImages


In [5]:
from tensorflow import keras
from tensorflow.keras import Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [6]:
import os
import shutil
import random
from tqdm import tqdm

output_base_dir = "/kaggle/split_data"  # You can change this to your desired location

# Output folders
output_dirs = {
    'train': os.path.join(output_base_dir, 'train'),
    'val': os.path.join(output_base_dir, 'val'),
    'test': os.path.join(output_base_dir, 'test')
}

# Classes (folder names in PetImages/)
classes = ['Cat', 'Dog']
split_ratio = [0.8, 0.1, 0.1]

# Create output folders
for split in output_dirs:
    for cls in classes:
        os.makedirs(os.path.join(output_dirs[split], cls), exist_ok=True)

print("Path to output files:", output_dirs)
# Split images
for cls in classes:
    src_folder = os.path.join(data_dir, cls)
    all_files = [f for f in os.listdir(src_folder) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    random.shuffle(all_files)

    train_cutoff = int(len(all_files) * split_ratio[0])
    val_cutoff = int(len(all_files) * (split_ratio[0] + split_ratio[1]))

    splits = {
        'train': all_files[:train_cutoff],
        'val': all_files[train_cutoff:val_cutoff],
        'test': all_files[val_cutoff:]
    }

    for split, files in splits.items():
        for f in tqdm(files, desc=f"Copying {cls} to {split}"):
            src = os.path.join(src_folder, f)
            dst = os.path.join(output_dirs[split], cls, f)
            try:
                shutil.copyfile(src, dst)
            except Exception as e:
                pass  # Skip corrupted files


Path to output files: {'train': '/kaggle/split_data/train', 'val': '/kaggle/split_data/val', 'test': '/kaggle/split_data/test'}


Copying Cat to train: 100%|██████████| 9999/9999 [00:59<00:00, 168.14it/s]
Copying Cat to val: 100%|██████████| 1250/1250 [00:07<00:00, 165.82it/s]
Copying Cat to test: 100%|██████████| 1250/1250 [00:07<00:00, 162.57it/s]
Copying Dog to train: 100%|██████████| 9999/9999 [01:02<00:00, 160.31it/s]
Copying Dog to val: 100%|██████████| 1250/1250 [00:06<00:00, 180.02it/s]
Copying Dog to test: 100%|██████████| 1250/1250 [00:07<00:00, 163.08it/s]


In [7]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

input_size = (128, 128)
batch_size = 128

augmentor = ImageDataGenerator(samplewise_center=True,
                               samplewise_std_normalization=True)

train_generator = augmentor.flow_from_directory(
    '/kaggle/split_data/train',
    target_size=input_size,
    batch_size=batch_size,
    shuffle=True
)

val_generator = augmentor.flow_from_directory(
    '/kaggle/split_data/val',
    target_size=input_size,
    batch_size=batch_size,
    shuffle=False
)

test_generator = augmentor.flow_from_directory(
    '/kaggle/split_data/test',
    target_size=input_size,
    batch_size=batch_size,
    shuffle=False
)

Found 19998 images belonging to 2 classes.
Found 2500 images belonging to 2 classes.
Found 2500 images belonging to 2 classes.


In [28]:
num_classes = len(train_generator.class_indices)
input_shape = (128,128,3)
print(num_classes)

2


In [29]:
model = Sequential([
    Input(shape = input_shape),
    Conv2D (32, kernel_size = (3,3), activation= "relu"),
    Conv2D (32, kernel_size = (3,3), activation= "relu"),
    MaxPooling2D(pool_size=(2,2)),
    Dropout(0.3),


    Conv2D (64, kernel_size = (3,3), activation= "relu"),
    Conv2D (64, kernel_size = (3,3), activation= "relu"),
    MaxPooling2D(pool_size=(2,2)),
    Dropout(0.3),


    Conv2D (128, kernel_size = (3,3), activation= "relu"),
    Conv2D (128, kernel_size = (3,3), activation= "relu"),
    MaxPooling2D(pool_size=(2,2)),
    Dropout(0.3),

    Conv2D (256, kernel_size = (3,3), activation= "relu"),
    Conv2D (256, kernel_size = (3,3), activation= "relu"),
    MaxPooling2D(pool_size=(2,2), name='final_pool_layer'),
    Dropout(0.3),

    Flatten(),
    Dense(256, activation = 'relu'),
    Dense(128 , activation = 'relu'),
    Dropout(0.3),
    Dense(num_classes, activation='softmax')
], name="original_sequential_model")

model.build(input_shape=(None,) + input_shape)

model.compile(optimizer='adam',
             loss= 'categorical_crossentropy',
             metrics =["accuracy"])
# model.summary()

print("Model Layers:")
for i, layer in enumerate(model.layers):
    print(i, layer.name, getattr(layer, 'output_shape', 'N/A'))

Model Layers:
0 conv2d_56 N/A
1 conv2d_57 N/A
2 max_pooling2d_23 N/A
3 dropout_35 N/A
4 conv2d_58 N/A
5 conv2d_59 N/A
6 max_pooling2d_24 N/A
7 dropout_36 N/A
8 conv2d_60 N/A
9 conv2d_61 N/A
10 max_pooling2d_25 N/A
11 dropout_37 N/A
12 conv2d_62 N/A
13 conv2d_63 N/A
14 final_pool_layer N/A
15 dropout_38 N/A
16 flatten_7 N/A
17 dense_21 N/A
18 dense_22 N/A
19 dropout_39 N/A
20 dense_23 N/A


In [30]:
model.fit(train_generator,
          epochs=15,
          validation_data=val_generator)

Epoch 1/15
[1m  6/157[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m25s[0m 168ms/step - accuracy: 0.4882 - loss: 0.7747



[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 252ms/step - accuracy: 0.5164 - loss: 0.6986

  self._warn_if_super_not_called()


[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m67s[0m 316ms/step - accuracy: 0.5167 - loss: 0.6984 - val_accuracy: 0.6452 - val_loss: 0.6299
Epoch 2/15
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 230ms/step - accuracy: 0.6764 - loss: 0.5975 - val_accuracy: 0.6960 - val_loss: 0.5981
Epoch 3/15
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 231ms/step - accuracy: 0.7258 - loss: 0.5476 - val_accuracy: 0.7588 - val_loss: 0.5115
Epoch 4/15
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 230ms/step - accuracy: 0.7601 - loss: 0.4934 - val_accuracy: 0.7696 - val_loss: 0.4738
Epoch 5/15
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 227ms/step - accuracy: 0.7958 - loss: 0.4378 - val_accuracy: 0.8096 - val_loss: 0.4096
Epoch 6/15
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 227ms/step - accuracy: 0.8291 -

<keras.src.callbacks.history.History at 0x7f287069d210>

In [41]:
try:
    # Try accessing the model's input tensor if build worked correctly
    # (This is what *should* work)
    model_input_tensor = model.input
    if model_input_tensor is None: # Check if it's None
         raise AttributeError # Force fallback if None
    print("Using model.input")
except AttributeError:
    try:
        # Fallback: If Input() is the first layer *in the list*:
        model_input_tensor = model.layers[0].output
        print("Using model.layers[0].output")
         # This might actually get the output of the *first Conv layer* if Input isn't counted
         # Need to be careful. Let's try model.inputs list
        if isinstance(model.inputs, list) and len(model.inputs) > 0:
             model_input_tensor = model.inputs[0]
             print("Using model.inputs[0]")
        else:
             # If the above fail, maybe the input layer NAME works?
             model_input_tensor = model.get_layer(index=0).output # Try getting layer 0 output by index
             print("Using model.get_layer(index=0).output")

    except Exception as e:
        print(f"Error getting input tensor: {e}")
        print("Could not reliably determine the input tensor. Consider defining Input() outside the Sequential list.")
        # Handle error appropriately - maybe exit or raise
        raise ValueError("Cannot find the input tensor for the feature extractor.") from e


# Identify the target layer by name (more robust)
target_layer_name = 'final_pool_layer'
target_output = model.get_layer(target_layer_name).output

# Create a new Model using the input tensor we found
feature_extractor_model = Model(inputs=model_input_tensor, outputs=target_output, name="feature_extractor")

print("\n--- Feature Extractor Model Summary ---")
# Need to build the extractor model too before summary sometimes
feature_extractor_model.build(input_shape=(None,) + input_shape)
feature_extractor_model.summary()

# --- How to Use It (remains the same) ---

def load_and_preprocess_image(image_path, target_shape):
    img = tf.keras.preprocessing.image.load_img(image_path, target_size=target_shape[:2])
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)
    img_array = img_array / 255.0
    return img_array

image_path = '/kaggle/split_data/test/Cat/100.jpg'
try:
    # Create a dummy image if needed for testing:
    dummy_data = np.random.rand(input_shape[0], input_shape[1], input_shape[2]) * 255
    tf.keras.preprocessing.image.save_img(image_path, dummy_data)
except Exception as e:
    print(f"Could not create dummy image: {e}. Please provide a real image path.")
    exit()


preprocessed_image = load_and_preprocess_image(image_path, input_shape)

# Use the feature_extractor_model to get the feature map
feature_map = feature_extractor_model.predict(preprocessed_image)

print(f"\nShape of extracted feature map: {feature_map.shape}")

# Clean up dummy image if created
import os
if os.path.exists(image_path):
    try:
        os.remove(image_path)
    except OSError as e:
        print(f"Error removing dummy image: {e}")
"""
0 conv2d_56 N/A
1 conv2d_57 N/A
2 max_pooling2d_23 N/A
3 dropout_35 N/A
4 conv2d_58 N/A
5 conv2d_59 N/A
6 max_pooling2d_24 N/A
7 dropout_36 N/A
8 conv2d_60 N/A
9 conv2d_61 N/A
10 max_pooling2d_25 N/A
11 dropout_37 N/A
12 conv2d_62 N/A
13 conv2d_63 N/A
14 final_pool_layer N/A
"""

Using model.layers[0].output
Using model.inputs[0]

--- Feature Extractor Model Summary ---


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 386ms/step

Shape of extracted feature map: (1, 4, 4, 256)


'\n0 conv2d_56 N/A\n1 conv2d_57 N/A\n2 max_pooling2d_23 N/A\n3 dropout_35 N/A\n4 conv2d_58 N/A\n5 conv2d_59 N/A\n6 max_pooling2d_24 N/A\n7 dropout_36 N/A\n8 conv2d_60 N/A\n9 conv2d_61 N/A\n10 max_pooling2d_25 N/A\n11 dropout_37 N/A\n12 conv2d_62 N/A\n13 conv2d_63 N/A\n14 final_pool_layer N/A\n'

In [45]:
print(f"\nExtracted feature map: {feature_map}")

import numpy as np
import os # For checking file existence
import sys

log_filename = "inference_log.txt"
print(f"Attempting to write feature map details to '{log_filename}'...")


# Make sure feature_map exists before proceeding
if 'feature_map' not in locals() and 'feature_map' not in globals():
    print("Error: 'feature_map' variable not found. Please ensure it was generated.")
else:
    # --- WARNING ---
    total_elements = np.prod(feature_map.shape)
    print(f"[WARNING] Feature map has {total_elements} elements.")
    if total_elements > 10000: # Arbitrary threshold for warning
        print(f"[WARNING] Writing all elements might create a very large log file (> {total_elements * 5 // 1024} KB approx) and could be slow.")
    # -------------

    try:
        with open(log_filename, 'w') as log_file: # Use 'w' to overwrite each time for full logs
            log_file.write("--- Full Extracted Feature Map ---\n")
            log_file.write(f"Timestamp: {np.datetime64('now')}\n")
            log_file.write(f"Shape: {feature_map.shape}\n")
            log_file.write(f"Data Type: {feature_map.dtype}\n")
            log_file.write(f"Min value: {np.min(feature_map)}\n")
            log_file.write(f"Max value: {np.max(feature_map)}\n")
            log_file.write(f"Mean value: {np.mean(feature_map)}\n")
            log_file.write(f"Std Dev: {np.std(feature_map)}\n")
            log_file.write("\nFeature Map Content (Full):\n")

            # Option 2 (Modified): Write the FULL array content
            # Use threshold=sys.maxsize or np.inf to disable summarization
            feature_map_string = np.array2string(
                feature_map,
                precision=6,          # Increase precision if needed
                suppress_small=False, # Show small numbers accurately
                max_line_width=150,   # Wider lines might help for large arrays
                threshold=sys.maxsize # Force printing the entire array
                # threshold=np.inf   # Alternative way to force printing all
            )
            log_file.write(feature_map_string)

            log_file.write("\n--- End Full Feature Map ---\n")

        print(f"Successfully wrote FULL feature map to '{log_filename}'")

    except Exception as e:
        print(f"Error writing to log file '{log_filename}': {e}")


Extracted feature map: [[[[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]]]
Attempting to write feature map details to 'inference_log.txt'...
Successfully wrote FULL feature map to 'inference_log.txt'


In [37]:
# --- 1. Create the "Classifier Head" Model ---

# Find the index of the layer RIGHT AFTER our feature extraction layer ('final_pool_layer')
# This is where the classifier head starts.
feature_layer_name = 'final_pool_layer'
try:
    # Find the index of the layer by name
    feature_layer_index = [i for i, layer in enumerate(model.layers) if layer.name == feature_layer_name][0]
    classifier_start_index = feature_layer_index + 1
    print(f"Feature extraction layer ('{feature_layer_name}') index: {feature_layer_index}")
    print(f"Classifier head starts at layer index: {classifier_start_index}")
except IndexError:
    print(f"Error: Could not find layer named '{feature_layer_name}' in the original model.")
    # Handle error appropriately
    exit()
except Exception as e:
    print(f"An error occurred finding the layer index: {e}")
    exit()


# Define an Input layer matching the shape of the feature map (excluding batch dimension)
feature_map_shape = feature_map.shape[1:] # e.g., (8, 8, 256)
classifier_input = Input(shape=feature_map_shape, name="feature_map_input")

# Sequentially connect the *remaining layers* from the original model
# Start from the layer AFTER the feature extraction layer
x = classifier_input
for i in range(classifier_start_index, len(model.layers)):
    # Apply the original layer (which has the trained weights) to the current tensor 'x'
    original_layer = model.layers[i]
    x = original_layer(x, training=False)
    print(f"Added layer {i}: {original_layer.name}")

# Create the new model
classifier_model = Model(inputs=classifier_input, outputs=x, name="classifier_head")

print("\n--- Classifier Head Model Summary ---")
classifier_model.summary()

"""
15 dropout_33 N/A
16 flatten_6 N/A
17 dense_18 N/A
18 dense_19 N/A
19 dropout_34 N/A
20 dense_20 N/A
"""

Feature extraction layer ('final_pool_layer') index: 14
Classifier head starts at layer index: 15
Added layer 15: dropout_38
Added layer 16: flatten_7
Added layer 17: dense_21
Added layer 18: dense_22
Added layer 19: dropout_39
Added layer 20: dense_23

--- Classifier Head Model Summary ---


'\n15 dropout_33 N/A\n16 flatten_6 N/A\n17 dense_18 N/A\n18 dense_19 N/A\n19 dropout_34 N/A\n20 dense_20 N/A\n'

In [40]:
# --- 2. Use the Classifier Head for Inference ---

# IMPORTANT NOTE: In a real application, you would:
#    a) Get feature_map from feature_extractor_model.predict()
#    b) COMPRESS feature_map and store it.
#    c) Later, LOAD the compressed data and DECOMPRESS it back into a numpy array (let's call it 'loaded_feature_map').
#    d) Pass 'loaded_feature_map' to classifier_model.predict()

# For this *test*, we'll use the 'feature_map' directly without the compress/decompress step
# to verify the pipeline works.
final_predictions = classifier_model.predict(feature_map)

print(f"\nShape of final predictions: {final_predictions.shape}") # Should be (1, num_classes)
print(f"Raw predictions (probabilities):\n{final_predictions}")

# Get the predicted class index
predicted_class_index = np.argmax(final_predictions, axis=1)[0]
print(f"\nPredicted class index: {predicted_class_index}")

# If you have a list or dictionary mapping indices to class names:
# class_names = ['cat', 'dog', ...] # Example
# predicted_class_name = class_names[predicted_class_index]
# print(f"Predicted class name: {predicted_class_name}")

# --- Optional: Compare with original model's prediction ---
# This helps verify that splitting the model didn't break anything (before compression)
print("\n--- Verifying with original model ---")

# (Need to load the image again if not available)
image_path = '/kaggle/split_data/test/Cat/11530.jpg'

if not os.path.exists(image_path):
     # Create a dummy image if needed for testing:
    try:
        print("Creating dummy image for verification...")
        dummy_data = np.random.rand(input_shape[0], input_shape[1], input_shape[2]) * 255
        tf.keras.preprocessing.image.save_img(image_path, dummy_data)
    except Exception as e:
        print(f"Could not create dummy image: {e}.")
        # Cannot verify without an image
else:
    def load_and_preprocess_image(image_path, target_shape):
        img = tf.keras.preprocessing.image.load_img(image_path, target_size=target_shape[:2])
        img_array = tf.keras.preprocessing.image.img_to_array(img)
        img_array = np.expand_dims(img_array, axis=0)
        img_array = img_array / 255.0
        return img_array

    preprocessed_image_for_full_model = load_and_preprocess_image(image_path, input_shape)
    original_model_predictions = model.predict(preprocessed_image_for_full_model)
    original_predicted_class_index = np.argmax(original_model_predictions, axis=1)[0]

    print(f"Original model predicted index: {original_predicted_class_index}")
    print(f"Feature map pipeline predicted index: {predicted_class_index}")

    # Check if predictions are numerically close (they should be almost identical)
    if np.allclose(final_predictions, original_model_predictions, atol=1e-6):
        print("SUCCESS: Predictions from split model match original model (before compression).")
    else:
        print("WARNING: Predictions from split model DO NOT match original model.")
        print("Original:", original_model_predictions)
        print("Split:", final_predictions)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step

Shape of final predictions: (1, 2)
Raw predictions (probabilities):
[[0.75012386 0.24987611]]

Predicted class index: 0

--- Verifying with original model ---
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
Original model predicted index: 0
Feature map pipeline predicted index: 0
Original: [[0.7234268  0.27657315]]
Split: [[0.75012386 0.24987611]]
