In [None]:
import os
import sys

from time import sleep
from time import time
from random import randint

import tensorflow as tf
from tensorflow import keras
import cv2
import numpy as np

from matplotlib import pyplot as plt
%matplotlib inline
from sklearn import svm
from sklearn.utils import shuffle

NUM_CHANNELS = 3 # RGB
IMG_SIZE = (64, 64) # Size of image
PIXEL_DEPTH = 255.0  # Number of levels per pixel.

print(f"TensorFlow version: {tf.__version__}")
print(f"Keras version: {keras.__version__}")

### Dataset routines ###

In [2]:
def make_feature_vec(img):
    """Resize an 'img' to correct size and make a vector ((60, 60, 3) => (64, 64, 3))"""
    if img.shape[:2] != IMG_SIZE:
        img = cv2.resize(img, IMG_SIZE)
    return normalize(img.reshape(1, IMG_SIZE[0], IMG_SIZE[1], NUM_CHANNELS))

def load_dataset(path, dataset_size=-1, downsample=False):
    """Loads samples from image files (e.g '001.png').
    Args:
        path -- path to the images(samples) (default size: 60px X 60px)
        dataset_size -- number of samples to read
        downsample -- downsamples input sample(img) if True
    Returns:
        images -- array (n, 60, 60, 1) of sample vectors as rows
    """
    image_files = os.listdir(path)[:dataset_size]
    # Init 'images' array as samples X features matrix
    images = np.ndarray(shape=(len(image_files), IMG_SIZE[0], IMG_SIZE[1], NUM_CHANNELS), 
                        dtype=np.float32)
    image_idx = 0
    for filename in image_files:
        if NUM_CHANNELS == 3:
            color_mode = cv2.IMREAD_COLOR
        else:
            color_mode = cv2.IMREAD_GRAYSCALE
        img = cv2.imread(os.path.join(path, filename), color_mode)
        if downsample:
            # Downsample image (scale down by 50%)
            img = cv2.pyrDown(img)
        if img.shape[0] / img.shape[1] != IMG_SIZE[0] / IMG_SIZE[1]:
            continue
        # Convert img matrix to proper format vector (n, IMG_SIZE[0], IMG_SIZE[1], NUM_CHANNELS)
        # Normalize (mean=0, stdev~=0.5)
        images[image_idx, :] = make_feature_vec(img)
        image_idx += 1
    return images

def label_dataset(pos_data, neg_data):
    """Prepare dataset. Combime pos and neg examples, label it & shuffle it.
    Args:
        pos_data -- array of positive samples
        neg_data -- array of negative samples
    Returns:
        dataset -- samples array
        labels -- labels vec
    """
    num_pos_samples = pos_data.shape[0]
    num_neg_samples = neg_data.shape[0] 
    num_total = num_pos_samples + num_neg_samples
    dataset = np.vstack((pos_data, neg_data[:num_neg_samples])).astype(np.float32)
    
    # Label as 1-hot encoding 
    # ex. np.array([1, 0], dtype=np.float32) -- positive
    # ex. np.array([0, 1], dtype=np.float32) -- negative
    
    num_classes = 2 # pos, neg
    labels = np.ndarray(shape=(num_total, num_classes), dtype=np.float32)
    labels[:num_pos_samples] = np.array([1, 0]) # positive
    labels[num_pos_samples:] = np.array([0, 1]) # negative
    
    # Shuffle
    return shuffle(dataset, labels, random_state=0)

def normalize(img):
    """Normalize to have approximately zero mean and standard deviation ~0.5 
    to make training easier down the road. 
    """
    mean = PIXEL_DEPTH / 2
    return (img - mean) / PIXEL_DEPTH

def preview(sample_vec):
    %matplotlib inline
    #mpld3.enable_notebook()
    plt.imshow(sample_vec.reshape(IMG_SIZE))
    
def preview_random_examples(pos_data, neg_data):
    %matplotlib inline
    #mpld3.enable_notebook()
    plt.figure(1)
    pos_idx = randint(1, len(pos_data))
    plt.title('POSITIVE (ex. #{})'.format(pos_idx))
    plt.imshow(pos_data[pos_idx].reshape(IMG_SIZE + (NUM_CHANNELS, )))

    plt.figure(2)
    neg_idx = randint(1, len(neg_data))
    plt.title('NEGATIVE (ex.#{})'.format(neg_idx))
    plt.imshow(neg_data[neg_idx].reshape(IMG_SIZE + (NUM_CHANNELS, )))
    plt.show()

### Load positive and negative examples from files into feature vectors ###

In [3]:
pos_path = './dataset/positive/'
neg_path = './dataset/negative/'
pos_data = load_dataset(path=pos_path)
neg_data = load_dataset(path=neg_path) #dataset_size=700)
dataset, labels = label_dataset(pos_data, neg_data)

In [None]:
print(f'Number of POSITIVE samples: {len(pos_data)}\n')
print(f'Number of NEGATIVE samples: {len(neg_data)}\n')
preview_random_examples(pos_data, neg_data)

### Spilt data into training, validation and test sets (60%, 20%, 20%)

In [5]:
idx_train = int(dataset.shape[0] * 0.6)
idx_cv = idx_train + int(dataset.shape[0] * 0.2)
train_dataset = dataset[:idx_train]
train_labels = labels[:idx_train]
valid_dataset = dataset[idx_train:idx_cv]
valid_labels = labels[idx_train:idx_cv]
test_dataset = dataset[idx_cv:]
test_labels = labels[idx_cv:]

del dataset
del labels

#print('No samples missed during spliting: {}'.format(len(labels) == len(train_labels) + len(valid_labels) + len(test_labels)))
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)

('Training set', (2004, 64, 64, 3), (2004, 2))
('Validation set', (668, 64, 64, 3), (668, 2))
('Test set', (669, 64, 64, 3), (669, 2))


### Convolutional Neural Network ###
Create CNN with custom architecture.
- CONV1
- RELU1
- POOL1
- CONV2
- RELU2
- POOL2
- FC1
- RELU3
- FC2

In [None]:
### ConvNet with max pooling using Keras API

batch_size = 16
patch_size = 5
depth = 16
num_hidden = 64

log_dir = './log'
num_labels = 2

# Build the model using Keras functional API
def create_model():
    """Create the CNN model using Keras."""
    inputs = keras.Input(shape=(IMG_SIZE[0], IMG_SIZE[1], NUM_CHANNELS))
    
    # Layer 1: Conv + ReLU + MaxPool
    x = keras.layers.Conv2D(filters=depth, kernel_size=patch_size, 
                            padding='same', activation='relu',
                            kernel_initializer=keras.initializers.TruncatedNormal(stddev=0.1))(inputs)
    x = keras.layers.MaxPooling2D(pool_size=(4, 4), strides=(4, 4), padding='same')(x)
    
    # Layer 2: Conv + ReLU + MaxPool
    x = keras.layers.Conv2D(filters=depth, kernel_size=patch_size,
                            padding='same', activation='relu',
                            kernel_initializer=keras.initializers.TruncatedNormal(stddev=0.1),
                            bias_initializer=keras.initializers.Constant(1.0))(x)
    x = keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(x)
    
    # Flatten
    x = keras.layers.Flatten()(x)
    
    # Layer 3: Fully Connected + ReLU
    x = keras.layers.Dense(num_hidden, activation='relu',
                          kernel_initializer=keras.initializers.TruncatedNormal(stddev=0.1),
                          bias_initializer=keras.initializers.Constant(1.0))(x)
    
    # Layer 4: Output layer
    outputs = keras.layers.Dense(num_labels, activation='softmax',
                                kernel_initializer=keras.initializers.TruncatedNormal(stddev=0.1),
                                bias_initializer=keras.initializers.Constant(1.0))(x)
    
    model = keras.Model(inputs=inputs, outputs=outputs, name='bee_brood_cnn')
    
    # Compile the model with optimizer and loss
    # Using exponential decay learning rate schedule
    lr_schedule = keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.005,
        decay_steps=100,
        decay_rate=0.96,
        staircase=True
    )
    
    optimizer = keras.optimizers.SGD(learning_rate=lr_schedule)
    
    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    return model

# Create the model
model = create_model()

# Print model summary
model.summary()

### Skip this step and go to 'Restore saved model.' if you have your model saved. ###
### =================================================== ###

### Train CNN  ###
Train & save trained model to file './models/model_[accuracy].ckpt'

In [None]:
num_steps = 1001
num_fetch_results = 50

# Calculate number of epochs based on num_steps and dataset size
steps_per_epoch = train_labels.shape[0] // batch_size
num_epochs = (num_steps * batch_size) // train_labels.shape[0]

print(f'Training for {num_epochs} epochs ({num_steps} steps total)')
print(f'Steps per epoch: {steps_per_epoch}')

# Create model
model = create_model()

# Train the model
history = model.fit(
    train_dataset, train_labels,
    batch_size=batch_size,
    epochs=num_epochs,
    validation_data=(valid_dataset, valid_labels),
    verbose=1
)

# Evaluate on test set
test_loss, test_accuracy = model.evaluate(test_dataset, test_labels, verbose=0)
print(f'\nTest accuracy: {test_accuracy * 100:.1f}%')

# Save the model
model_filename = f"model_{test_accuracy * 100:.2f}.keras"
model_save_path = os.path.join("./models/", model_filename)
os.makedirs("./models/", exist_ok=True)
model.save(model_save_path)
print(f"Model saved in file: {model_save_path}")

### Plot training curves ###

In [None]:
%matplotlib inline

# Plot training and validation accuracy
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], '-o', label='Training Accuracy')
plt.plot(history.history['val_accuracy'], '-o', label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.title('Training and Validation Accuracy')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], '-o', label='Training Loss')
plt.plot(history.history['val_loss'], '-o', label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

### ===================================================
# Predict on unknown data.

### Detect capped brood cells on WHOLE image ###
Use classifier on each single sample subimage, acquired with sliding window.

In [7]:
# Image to detect on.
FILENAME = '003.png'
PATH = '/home/chip/Dropbox/LITS/ML-003/dataset/processed_dataset/prespective_correction'

### Restore saved model.
Change the 'tf_model_filename' to your saved model filename.

In [None]:
tf_model_filename = 'model_95.98.keras'  # Model name file to restore (use .keras extension for TF 2.x)
tf_model_path = os.path.join('./models/', tf_model_filename)

PRED_THRESHOLD = 0.90  # Prediction threshold

### Manually select a single CELL with a bounding box.
<img src="how_to_select.png">
### (Use 's' key to save measurements.)

In [None]:
import bee_frame
from nms import non_max_suppression_fast

frame = bee_frame.BeeFrame()
frame.load_image(PATH, FILENAME)

### PRE-PROCESSING ###
# Normalize histogram and smooth img.
frame.image.hitogram_normalization()
frame.image.blur()
######################


### PRESS 'S' key to save measurements. ###
frame.get_cell_size()
#frame.step_size = 10
#frame.cell_size = 55
print('cell size: {}\n'
      'step size: {}'.format(frame.cell_size, frame.step_size))

# Create sliding window generator.
samples_gen = frame.sliding_window()
boxes_list = []  # Detected bounding boxes coordinates.

#### Load the Keras model and use it for prediction. ###
# Load the model
if os.path.exists(tf_model_path):
    model = keras.models.load_model(tf_model_path)
    print("Model loaded from:", tf_model_path)
else:
    print(f"Model file not found: {tf_model_path}")
    print("Please train a model first or update the model filename.")
    raise FileNotFoundError(f"Model not found: {tf_model_path}")

# Iterate through all the samples
start = time()
for x, y, window in samples_gen:
    # Prepare the sample for prediction
    sample = make_feature_vec(window)
    
    # Make prediction
    prediction = model.predict(sample, verbose=0)
    
    # Check if prediction exceeds threshold (first element is positive class probability)
    if prediction[0][0] > PRED_THRESHOLD:
        end_x = x + frame.cell_size
        end_y = y + frame.cell_size
        boxes_list.append([x, y, end_x, end_y])

print('elapsed: {:0.2f}'.format(time() - start))

# Use NMS on detection results, to suppress duplicates in detection.
boxes_np = np.array(boxes_list)
if len(boxes_np) > 0:
    boxes_np_nms = non_max_suppression_fast(boxes_np, 0.3)
else:
    boxes_np_nms = boxes_np
############################

### Show Results

In [11]:
preview_img = frame.image._img.copy()
for box_coordinates in boxes_np_nms:
        frame.image.draw_circle(box_coordinates, preview_img)

cv2.namedWindow(frame.WIN_NAME, cv2.WINDOW_NORMAL)
frame.preview(preview_img)
cv2.destroyAllWindows()

print('Capped brood cells detected: {}'.format(boxes_np_nms.shape[0]))

Capped brood cells detected: 1365
