<a href="https://colab.research.google.com/github/rodrigobivarazevedo/machine_learning/blob/main/cnn_google_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
# Ask TensorFlow for the first visible GPU and stop the notebook right away
# when the runtime was started without one.
device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
  print('Found GPU at: {}'.format(device_name))
else:
  raise SystemError('GPU device not found')

Found GPU at: /device:GPU:0


In [2]:
import tensorflow as tf
import timeit

# The CPU-vs-GPU benchmark below is meaningless without a GPU, so bail out
# with a hint on how to enable one in the notebook settings.
device_name = tf.test.gpu_device_name()
if not device_name == '/device:GPU:0':
  print(
      '\n\nThis error most likely means that this notebook is not configured '
      'to use a GPU.  Change this in Notebook Settings via the command '
      'palette (cmd/ctrl-shift-P) or the Edit menu.\n\n')
  raise SystemError('GPU device not found')

def cpu():
  """Convolve a random image batch on the CPU and return the summed output.

  A (100, 100, 100, 3) normal-distributed tensor is passed through a freshly
  created Conv2D layer (32 filters, 7x7 kernel) pinned to '/cpu:0'.
  """
  with tf.device('/cpu:0'):
    images = tf.random.normal((100, 100, 100, 3))
    conv_layer = tf.keras.layers.Conv2D(32, 7)
    return tf.math.reduce_sum(conv_layer(images))

def gpu():
  """Convolve a random image batch on the GPU and return the summed output.

  Same workload as the CPU benchmark function, but pinned to '/device:GPU:0'.
  """
  with tf.device('/device:GPU:0'):
    images = tf.random.normal((100, 100, 100, 3))
    conv_layer = tf.keras.layers.Conv2D(32, 7)
    return tf.math.reduce_sum(conv_layer(images))

# Warm-up pass so one-off initialisation cost is excluded from the timings;
# see: https://stackoverflow.com/a/45067900
cpu()
gpu()

# Time ten consecutive runs of each benchmark and report the totals.
print('Time (s) to convolve 32x7x7x3 filter over random 100x100x100x3 images '
      '(batch x height x width x channel). Sum of ten runs.')

print('CPU (s):')
cpu_time = timeit.timeit(cpu, number=10)
print(cpu_time)

print('GPU (s):')
gpu_time = timeit.timeit(gpu, number=10)
print(gpu_time)

speedup = int(cpu_time/gpu_time)
print('GPU speedup over CPU: {}x'.format(speedup))

Time (s) to convolve 32x7x7x3 filter over random 100x100x100x3 images (batch x height x width x channel). Sum of ten runs.
CPU (s):
4.140263399999981
GPU (s):
0.11985618400001385
GPU speedup over CPU: 34x


In [3]:
# Run the `nvidia-smi` shell command to show which GPU (if any) is attached.
# Are we using a GPU? If not: go to Runtime -> Change runtime type -> Hardware accelerator: GPU
!nvidia-smi

Sun Mar 30 11:44:06 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   34C    P0             25W /   70W |    1206MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

# Helper Functions

#### Load Data

In [5]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow import keras
from tensorflow.keras.layers import RandomFlip, RandomRotation, RandomZoom, RandomHeight, RandomWidth
import numpy as np

def data_augmentation_object():
    """Build the augmentation pipeline that is embedded in the models.

    Returns:
        A ``keras.Sequential`` named ``"data_augmentation"`` chaining, in
        order: horizontal RandomFlip, RandomRotation(0.2), RandomZoom(0.2),
        RandomHeight(0.2) and RandomWidth(0.2).
    """
    augmentation_layers = [
        RandomFlip("horizontal"),
        RandomRotation(0.2),
        RandomZoom(0.2),
        RandomHeight(0.2),
        RandomWidth(0.2),
    ]
    return keras.Sequential(augmentation_layers, name="data_augmentation")

def print_class_distribution(dataset, dataset_name):
    """Print absolute and relative class frequencies of a dataset.

    Args:
        dataset: Object exposing ``classes`` (one integer label per sample)
            and ``class_indices`` (printed verbatim in the report).
        dataset_name: Label used in the report header.
    """
    labels = dataset.classes
    total = len(labels)
    # Pair every distinct label with the number of times it occurs.
    per_class = list(zip(*np.unique(labels, return_counts=True)))

    print(f"Class distribution for {dataset_name}:")
    for label, n_samples in per_class:
        print(f"  Class {label}: {n_samples} samples")

    print(f"  Class indices: {dataset.class_indices}")

    print(f"  Class distribution (as percentages):")
    for label, n_samples in per_class:
        share = (n_samples / total) * 100
        print(f"    Class {label}: {share:.2f}%")

    print("-")


def data_preprocessing(train_dataset_path="dataset_dogs_vs_cats/train",
                       test_dataset_path="dataset_dogs_vs_cats/test",
                       img_size=(128, 128),
                       batch_size=32,
                       class_mode="binary"):
    """Create the train/validation/test directory iterators.

    Pixel values are rescaled by 1/255 in every split. The training directory
    is divided 80/20 into training and validation subsets; the test iterator
    is created with ``shuffle=False``. The class distribution of each split is
    printed.

    Args:
        train_dataset_path: Directory with one sub-folder per class.
        test_dataset_path: Directory with one sub-folder per class.
        img_size: Target size every image is resized to.
        batch_size: Number of images per batch.
        class_mode: Label encoding passed to ``flow_from_directory``.

    Returns:
        Tuple ``(train_data, val_data, test_data, data_augmentation)`` where
        the last element is a fresh ``data_augmentation_object()``.
    """
    # Options shared by all three iterators.
    flow_options = dict(
        target_size=img_size,
        batch_size=batch_size,
        class_mode=class_mode,
    )

    train_datagen = ImageDataGenerator(rescale=1./255, validation_split=0.2)
    train_data = train_datagen.flow_from_directory(
        train_dataset_path, subset="training", **flow_options)
    val_data = train_datagen.flow_from_directory(
        train_dataset_path, subset="validation", **flow_options)

    test_datagen = ImageDataGenerator(rescale=1./255)
    test_data = test_datagen.flow_from_directory(
        test_dataset_path, shuffle=False, **flow_options)

    for split, split_name in ((train_data, "Training Set"),
                              (val_data, "Validation Set"),
                              (test_data, "Test Set")):
        print_class_distribution(split, split_name)

    return train_data, val_data, test_data, data_augmentation_object()


#### Prepare Image and Predict

In [6]:
import matplotlib.pyplot as plt
import tensorflow as tf
import os

# Create a function to import an image and resize it to be able to be used with our model
def load_and_prep_image(filename, img_shape=224, scale=False):
  """
  Reads an image from filename, decodes it into a 3-channel tensor and resizes
  it to the requested size.

  Args:
    filename: Path of the image file to read.
    img_shape: Either a single side length (square output) or a
      (height, width) pair.
    scale: When True, the resized image is divided by 255.

  Returns:
    Tuple (img_array, original) where img_array is the resized (and optionally
    rescaled) float tensor of shape (height, width, 3) and original is the
    decoded, un-resized image tensor (useful for plotting).
  """
  # Accept either one side length or an explicit (height, width) pair.
  if isinstance(img_shape, (tuple, list)):
    target_size = list(img_shape)
  else:
    target_size = [img_shape, img_shape]

  # Read in the image
  raw_bytes = tf.io.read_file(filename)
  # Decode it into a tensor; channels=3 also expands grayscale files to RGB
  original = tf.image.decode_jpeg(raw_bytes, channels=3)
  # Resize the DECODED tensor (resizing the raw byte string is an error)
  img_array = tf.image.resize(original, target_size)
  # Rescale the image (get all values between 0 and 1)
  if scale:
    img_array = img_array/255.
  return img_array, original

# Function to predict a single image
def predict_image(img_path, model, img_size=(128, 128)):
    """Classify one image file as "Cat" or "Dog" and plot it with the result.

    Args:
        img_path: Path of the image file.
        model: Model whose ``predict`` returns one sigmoid probability per
            sample; values above 0.5 are reported as "Dog".
        img_size: (height, width) the image is resized to before prediction.

    Returns:
        Tuple ``(result, confidence)``, or ``None`` (after printing a message)
        when ``img_path`` does not exist.
    """
    if not os.path.exists(img_path):
        print("Test image not found!")
        return

    # Load and preprocess the image; `img` is the decoded original for display.
    img_array, img = load_and_prep_image(img_path, img_shape=img_size, scale=True)

    # Add a batch axis: (height, width, 3) -> (1, height, width, 3)
    img_expanded = tf.expand_dims(img_array, axis=0)

    # Make a prediction
    prediction = model.predict(img_expanded)
    dog_probability = prediction[0][0]
    if dog_probability > 0.5:
        result, confidence = "Dog", dog_probability
    else:
        result, confidence = "Cat", 1 - dog_probability

    print(f"Prediction: {result} with {confidence:.2%} confidence")

    plt.figure()
    plt.imshow(img)
    plt.title(f"{result}, prob: {confidence:.2%}")
    plt.axis(False);

    return result, confidence


def pred_and_plot(model, image, class_names):
  """Predict the class of a single image tensor and plot it with the result.

  Works for both kinds of model heads: a single sigmoid output (binary) and a
  multi-unit output (one probability per class).

  Args:
    model: Model exposing ``predict``.
    image: Image tensor without batch axis, already preprocessed for the model.
    class_names: Sequence mapping class index -> display name.
  """
  pred_probs = model.predict(tf.expand_dims(image, axis=0))[0]

  if len(pred_probs) == 1:
    # Single sigmoid unit: argmax would always be 0, so threshold instead.
    positive_prob = float(pred_probs[0])
    class_index = int(positive_prob > 0.5)
    prob = positive_prob if class_index == 1 else 1 - positive_prob
  else:
    class_index = int(tf.argmax(pred_probs))
    prob = float(tf.reduce_max(pred_probs))

  pred_class = class_names[class_index]

  plt.figure()
  plt.imshow(image)
  plt.title(f"{pred_class}, prob: {prob:.2f}")
  plt.axis(False);



#### Model Training Function

In [7]:
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

def train_model(model, train_data, val_data, epochs=10):
    """Fit ``model`` with early stopping and learning-rate scheduling.

    Both callbacks monitor ``val_loss``: training stops after 5 epochs without
    improvement (restoring the best weights) and the learning rate is halved
    after 2 such epochs, down to a floor of 1e-6.

    Args:
        model: Compiled Keras model.
        train_data: Training data passed to ``model.fit``.
        val_data: Validation data passed to ``model.fit``.
        epochs: Maximum number of epochs.

    Returns:
        The history object returned by ``model.fit``.
    """
    callbacks = [
        EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', patience=2, factor=0.5, min_lr=1e-6),
    ]

    return model.fit(
        train_data,
        validation_data=val_data,
        epochs=epochs,
        callbacks=callbacks,
    )

#### Evaluate Model Functions

In [8]:
import matplotlib.pyplot as plt

def plot_train_validation_history(history):
    """Plot training vs. validation accuracy and loss side by side.

    Args:
        history: Object whose ``history`` dict holds the per-epoch series
            'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
    """
    # (metric key, panel title, y-axis label, legend position)
    panels = (
        ('accuracy', 'Model Accuracy', 'Accuracy', 'lower right'),
        ('loss', 'Model Loss', 'Loss', 'upper right'),
    )

    plt.figure(figsize=(16, 6))

    for position, (metric, title, y_label, legend_loc) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[metric])
        plt.plot(history.history['val_' + metric])
        plt.title(title, fontsize=14)
        plt.ylabel(y_label, fontsize=12)
        plt.xlabel('Epoch', fontsize=12)
        plt.legend(['Train', 'Validation'], loc=legend_loc)
        plt.grid(True, linestyle='--', alpha=0.6)

    plt.tight_layout()
    plt.show()



from sklearn.metrics import confusion_matrix, classification_report, precision_score, recall_score
import seaborn as sns


def get_model_metrics(model, test_data, class_names=None):
    """Print a classification report and plot the confusion matrix.

    Args:
        model: Trained Keras model. A last output dimension of 1 is treated as
            a sigmoid (binary) head, anything else as one probability per class.
        test_data: Iterator exposing ``classes`` (true integer labels). It must
            yield samples in that same order (i.e. created with
            ``shuffle=False``), otherwise labels and predictions do not align.
        class_names: Display names ordered by class index. Defaults to
            ``['Cat', 'Dog']``.
    """
    if class_names is None:
        class_names = ['Cat', 'Dog']
    class_names = list(class_names)
    label_ids = list(range(len(class_names)))

    # Get true labels and predictions
    y_true = test_data.classes  # True labels from test set
    y_pred_probs = model.predict(test_data)  # Probabilities

    # output_shape is (batch, units) for both head types, so its length cannot
    # tell them apart; the number of output units can.
    if model.output_shape[-1] == 1:
        y_pred = (y_pred_probs > 0.5).astype(int).flatten()  # Threshold the sigmoid output
    else:
        y_pred = np.argmax(y_pred_probs, axis=1)  # Most probable class per sample

    # Passing the label ids keeps matrix/report shapes stable even when a class
    # is missing from both the labels and the predictions.
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    # Print classification report (includes precision, recall, F1-score)
    print("Classification Report:\n",
          classification_report(y_true, y_pred, labels=label_ids, target_names=class_names))

    # Binary averaging only works for two classes; use macro averaging otherwise.
    average = 'binary' if len(class_names) == 2 else 'macro'
    precision = precision_score(y_true, y_pred, average=average)
    recall = recall_score(y_true, y_pred, average=average)
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")

    # Plot confusion matrix
    plt.figure(figsize=(6, 5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()


#### Efficient Net Model

1. Real-time Augmentation During Training
On-the-fly augmentation: Each time an image is passed through the model during training, the augmentation will be applied. This means your model sees different versions of the same image during each epoch, helping it generalize better.
Efficiency: This method allows augmentation to be part of the model graph, enabling efficient GPU computation and possibly reducing memory usage since augmented images aren't saved on disk but created in memory during training.
2. Applied Only to the Training Data
No augmentation for validation and test data: The augmentation layer will be applied only during training, so the validation and test datasets remain unchanged (no augmentation). This ensures that model evaluation is based on the original data, providing an accurate assessment of its performance.

In [9]:
from tensorflow import keras
from tensorflow.keras import layers

# Create a function to build a model
def create_EfficientNet_model(data_augmentation,
                              input_shape=(224, 224, 3),
                              base_model=None,
                              num_classes=2):
  """Build and compile a frozen-backbone transfer-learning classifier.

  Args:
    data_augmentation: Layer/model applied to the inputs before the backbone.
    input_shape: Shape of one input image (height, width, channels).
    base_model: Feature extractor without classification head. When None, a
      fresh ``EfficientNetB0(include_top=False)`` is created on each call
      (instead of once at definition time, shared by every call). The passed
      model is frozen in place (``trainable = False``).
    num_classes: More than 2 builds a softmax head with categorical
      cross-entropy; otherwise a single sigmoid unit with binary cross-entropy.

  Returns:
    The compiled ``keras.Model``.
  """
  # NOTE(review): the Keras EfficientNet models rescale inputs internally and
  # expect pixel values in [0, 255]; confirm the data pipeline does not also
  # rescale by 1/255 before using this model.
  if base_model is None:
    base_model = tf.keras.applications.EfficientNetB0(include_top=False)

  # Feature extraction only: keep the pretrained weights fixed.
  base_model.trainable = False

  # Create input layer
  inputs = layers.Input(shape=input_shape, name="input_layer")

  # Add in data augmentation Sequential model as a layer
  x = data_augmentation(inputs)

  # training=False keeps the backbone's BatchNorm layers in inference mode
  x = base_model(x, training=False)

  # Pool output features of base model
  x = layers.GlobalAveragePooling2D(name="global_average_pooling_layer")(x)

  if num_classes > 2:
    units, activation, loss = num_classes, "softmax", "categorical_crossentropy"
  else:
    # Binary labels are a single 0/1 value per sample, so the head must be a
    # single sigmoid unit to match binary_crossentropy.
    units, activation, loss = 1, "sigmoid", "binary_crossentropy"

  outputs = layers.Dense(units, activation=activation, name="output_layer")(x)

  # Make a model with inputs and outputs
  model = keras.Model(inputs, outputs)

  model.compile(loss=loss,
                optimizer=tf.keras.optimizers.Adam(),
                metrics=["accuracy", tf.keras.metrics.Precision(), tf.keras.metrics.Recall()])

  return model

Downloading data from https://storage.googleapis.com/keras-applications/efficientnetb0_notop.h5
[1m16705208/16705208[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


#### Binary CNN model

In [10]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout


def create_binary_cnn_model(data_augmentation, input_shape=(128, 128, 3)):
    """Build and compile a small three-block CNN for binary classification.

    Args:
        data_augmentation: Layer/model applied to the inputs inside the model.
        input_shape: Shape of one input image (height, width, channels).

    Returns:
        The compiled ``Sequential`` model (its summary is printed).
    """
    model = Sequential([
        # Declare the input explicitly: `input_shape` on a layer that is not
        # the first one is ignored, which leaves the model unbuilt.
        tf.keras.Input(shape=input_shape),

        # Apply Augmentation as a Preprocessing Layer in the Model
        data_augmentation,

        # Augmentations that change height/width leave those dimensions
        # undefined; resize back so Flatten/Dense see a fixed feature size.
        tf.keras.layers.Resizing(input_shape[0], input_shape[1]),

        # First convolutional block
        Conv2D(32, (3,3), activation='relu'),
        MaxPooling2D(pool_size=(2,2)),

        # Second convolutional block
        Conv2D(64, (3,3), activation='relu'),
        MaxPooling2D(pool_size=(2,2)),

        # Third convolutional block
        Conv2D(128, (3,3), activation='relu'),
        MaxPooling2D(pool_size=(2,2)),

        # Fully connected layers
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])

    # Compile the model for binary classification
    model.compile(loss="binary_crossentropy",
                 optimizer='adam',
                 metrics=["accuracy", tf.keras.metrics.Precision(), tf.keras.metrics.Recall()])

    model.summary()

    return model

# Model 2 - larger images

# Model 3 - MobileNet transfer learning

MobileNetV2 is a convolutional neural network architecture designed for mobile and edge devices. It was introduced by Google researchers in 2018 as an improvement over the original MobileNet architecture.
Key characteristics of MobileNetV2:

Efficiency: It's specifically designed to be lightweight and computationally efficient, making it suitable for mobile devices, embedded systems, and edge computing.
Architecture Features:

Uses depthwise separable convolutions (which split standard convolutions into depthwise and pointwise convolutions)
Introduces an inverted residual structure where the residual connections are between the bottleneck layers
Implements linear bottlenecks between layers to prevent information loss


Performance: Despite being lightweight, it achieves good accuracy on image classification tasks. It strikes an excellent balance between model size, speed, and accuracy.
Pre-trained Weights: It comes with pre-trained weights on the ImageNet dataset, which makes it excellent for transfer learning (where you take a pre-trained model and fine-tune it for your specific task).
Use Cases: Common applications include image classification, object detection, and segmentation on resource-constrained devices.

#### Create MobileNet Model

In [12]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization, GlobalAveragePooling2D
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.applications import MobileNetV2

# Transfer learning with MobileNetV2
def create_transfer_learning_model(data_augmentation, img_size=(128, 128)):
    """Build and compile a binary classifier on a frozen MobileNetV2 backbone.

    Args:
        data_augmentation: Layer/model applied to the inputs inside the model.
        img_size: (height, width) of the RGB input images.

    Returns:
        The compiled ``Sequential`` model (its summary is printed).
    """
    input_shape = (*img_size, 3)

    # Load the pretrained model
    base_model = MobileNetV2(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape
    )

    # Freeze the base model (no fine-tuning)
    base_model.trainable = False

    # NOTE(review): MobileNetV2's preprocess_input scales pixels to [-1, 1];
    # confirm the input pipeline matches what the pretrained weights expect.
    model = Sequential([
        # Declare the input so the model is built before summary() is called.
        tf.keras.Input(shape=input_shape),
        data_augmentation,
        base_model,
        GlobalAveragePooling2D(),
        Dense(512, activation='relu'),
        BatchNormalization(),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])

    # Compile model with a lower learning rate
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        loss='binary_crossentropy',
        metrics=['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
    )

    # Model summary
    model.summary()

    return model



In [None]:
# Load the data here so the cell works on a fresh kernel; the image size must
# match the input size the model is built with.
IMG_SIZE = (128, 128)
train_data, val_data, test_data, data_augmentation = data_preprocessing(img_size=IMG_SIZE)

# Option 2: Transfer learning (better performance)
model = create_transfer_learning_model(data_augmentation, img_size=IMG_SIZE)

# Define callbacks for better training
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=8,
    restore_best_weights=True,
    verbose=1
)

learning_rate_reduction = ReduceLROnPlateau(
    monitor='val_loss',
    patience=3,
    factor=0.5,
    min_lr=1e-7,
    verbose=1
)

checkpoint = ModelCheckpoint(
    'best_model.h5',
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    verbose=1
)

history = model.fit(
    train_data,
    validation_data=val_data,
    epochs=5,
    # The checkpoint callback only saves anything if it is passed to fit().
    callbacks=[early_stopping, learning_rate_reduction, checkpoint]
)

# evaluate() returns [loss, accuracy, precision, recall], following the order
# of the metrics given to compile().
test_results = model.evaluate(test_data, verbose=1)
test_precision, test_recall = test_results[2], test_results[3]
# F1 is undefined when precision and recall are both zero; report 0 instead
# of raising ZeroDivisionError.
precision_recall_sum = test_precision + test_recall
test_f1 = 2 * (test_precision * test_recall) / precision_recall_sum if precision_recall_sum > 0 else 0.0

print(f"Test Loss: {test_results[0]:.4f}")
print(f"Test Accuracy: {test_results[1]:.4f}")
print(f"Test Precision: {test_precision:.4f}")
print(f"Test Recall: {test_recall:.4f}")
print(f"Test F1-Score: {test_f1:.4f}")

# Model 4 - more layers

In [14]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization

# Advanced CNN model from scratch
def create_custom_model(data_augmentation, img_size=(224, 224)):
    """Build a deeper CNN (four double-conv blocks) for binary classification.

    The model is returned uncompiled.

    Args:
        data_augmentation: Layer/model applied to the inputs inside the model.
        img_size: (height, width) of the RGB input images.

    Returns:
        The ``Sequential`` model ending in a single sigmoid unit.
    """
    # Four blocks of Conv-BN-Conv-BN-Pool-Dropout with doubling filter counts.
    conv_blocks = []
    for filters in (64, 128, 256, 512):
        conv_blocks += [
            Conv2D(filters, (3, 3), activation='relu', padding='same'),
            BatchNormalization(),
            Conv2D(filters, (3, 3), activation='relu', padding='same'),
            BatchNormalization(),
            MaxPooling2D(pool_size=(2, 2)),
            Dropout(0.25),
        ]

    model = Sequential([
        # Declare the input explicitly: `input_shape` on a layer that is not
        # the first one is ignored, which leaves the model unbuilt.
        tf.keras.Input(shape=(*img_size, 3)),

        # Apply augmentation before the convolutional stack
        data_augmentation,

        # Augmentations that change height/width leave those dimensions
        # undefined; resize back so Flatten/Dense see a fixed feature size.
        tf.keras.layers.Resizing(img_size[0], img_size[1]),

        *conv_blocks,

        # Fully connected layers
        Flatten(),
        Dense(512, activation='relu'),
        BatchNormalization(),
        Dropout(0.5),
        Dense(256, activation='relu'),
        BatchNormalization(),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])
    return model