In [1]:
# =========================
# IMPORTS
# =========================
import os
import cv2
import time
import numpy as np
import xml.etree.ElementTree as ET
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split

In [None]:
# =========================
# CONFIG
# =========================
IMAGE_SIZE = 320
GRID_SIZE = 10
CLASSES = ["bottle", "person"]
NUM_CLASSES = len(CLASSES)

IMG_DIR = r"dataset\datasetsss"
ANN_DIR = r"dataset\datasetannotations"

In [3]:
# =========================
# CHECK ENVIRONMENT
# =========================
print("TensorFlow:", tf.__version__)
print("GPUs:", tf.config.list_physical_devices("GPU"))
print("Images dir exists:", os.path.exists(IMG_DIR))
print("Annotations dir exists:", os.path.exists(ANN_DIR))

TensorFlow: 2.10.1
GPUs: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
Images dir exists: True
Annotations dir exists: True


In [4]:
# =========================
# XML PARSER
# =========================
def parse_xml(xml_file, img_w, img_h):
    tree = ET.parse(xml_file)
    root = tree.getroot()

    label = np.zeros((GRID_SIZE, GRID_SIZE, 5 + NUM_CLASSES), dtype=np.float32)

    for obj in root.findall("object"):
        cls = obj.find("name").text
        if cls not in CLASSES:
            continue

        cls_id = CLASSES.index(cls)
        box = obj.find("bndbox")

        xmin = int(box.find("xmin").text)
        ymin = int(box.find("ymin").text)
        xmax = int(box.find("xmax").text)
        ymax = int(box.find("ymax").text)

        xc = ((xmin + xmax) / 2) / img_w
        yc = ((ymin + ymax) / 2) / img_h
        bw = (xmax - xmin) / img_w
        bh = (ymax - ymin) / img_h

        gx = min(int(xc * GRID_SIZE), GRID_SIZE - 1)
        gy = min(int(yc * GRID_SIZE), GRID_SIZE - 1)

        label[gy, gx, 0:4] = [xc, yc, bw, bh]
        label[gy, gx, 4] = 1.0
        label[gy, gx, 5 + cls_id] = 1.0

    return label


In [5]:
# =========================
# DATASET LOADER (CLASS LIMITS)
# =========================
def load_dataset_custom_limits():
    images, labels = [], []

    CLASS_LIMITS = {"bottle": 800, "person": 500}
    class_count = {cls: 0 for cls in CLASS_LIMITS}

    files = [f for f in os.listdir(ANN_DIR) if f.endswith(".xml")]
    np.random.shuffle(files)

    for file in files:
        if all(class_count[c] >= CLASS_LIMITS[c] for c in CLASS_LIMITS):
            break

        xml_path = os.path.join(ANN_DIR, file)
        img_path = os.path.join(IMG_DIR, file.replace(".xml", ".jpg"))

        if not os.path.exists(img_path):
            continue

        tree = ET.parse(xml_path)
        root = tree.getroot()

        present = set()
        for obj in root.findall("object"):
            cls = obj.find("name").text
            if cls in CLASS_LIMITS:
                present.add(cls)

        if not any(class_count[c] < CLASS_LIMITS[c] for c in present):
            continue

        img = cv2.imread(img_path)
        img = cv2.resize(img, (IMAGE_SIZE, IMAGE_SIZE))
        img = img.astype(np.float32) / 255.0

        label = parse_xml(xml_path, IMAGE_SIZE, IMAGE_SIZE)

        images.append(img)
        labels.append(label)

        for cls in present:
            if class_count[cls] < CLASS_LIMITS[cls]:
                class_count[cls] += 1

    print("Final class distribution:", class_count)
    return np.array(images), np.array(labels)

In [6]:
# =========================
# MODEL
# =========================
def build_model():
    model = models.Sequential([
        layers.Conv2D(32, 3, activation="relu", input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3)),
        layers.MaxPooling2D(),

        layers.Conv2D(64, 3, activation="relu"),
        layers.MaxPooling2D(),

        layers.Conv2D(128, 3, activation="relu"),
        layers.MaxPooling2D(),

        layers.Flatten(),
        layers.Dense(512, activation="relu"),
        layers.Dense(GRID_SIZE * GRID_SIZE * (5 + NUM_CLASSES)),
        layers.Reshape((GRID_SIZE, GRID_SIZE, 5 + NUM_CLASSES))
    ])
    return model

In [7]:
# =========================
# LOSS & METRIC
# =========================
def detection_loss(y_true, y_pred):
    box_loss = tf.reduce_mean(tf.square(y_true[..., :4] - y_pred[..., :4]))
    obj_loss = tf.reduce_mean(tf.square(y_true[..., 4] - y_pred[..., 4]))
    cls_loss = tf.reduce_mean(tf.square(y_true[..., 5:] - y_pred[..., 5:]))
    return box_loss + obj_loss + cls_loss

def objectness_accuracy(y_true, y_pred):
    y_true_obj = tf.cast(y_true[..., 4] > 0.5, tf.float32)
    y_pred_obj = tf.cast(y_pred[..., 4] > 0.5, tf.float32)
    return tf.reduce_mean(tf.cast(tf.equal(y_true_obj, y_pred_obj), tf.float32))

# =========================
# IOU + mAP
# =========================
def iou(box1, box2):
    x1_min, y1_min = box1[0]-box1[2]/2, box1[1]-box1[3]/2
    x1_max, y1_max = box1[0]+box1[2]/2, box1[1]+box1[3]/2
    x2_min, y2_min = box2[0]-box2[2]/2, box2[1]-box2[3]/2
    x2_max, y2_max = box2[0]+box2[2]/2, box2[1]+box2[3]/2

    inter = max(0, min(x1_max,x2_max)-max(x1_min,x2_min)) * \
            max(0, min(y1_max,y2_max)-max(y1_min,y2_min))

    union = (x1_max-x1_min)*(y1_max-y1_min) + (x2_max-x2_min)*(y2_max-y2_min) - inter
    return inter / (union + 1e-6)

def decode_grid(output, conf=0.5):
    boxes = []
    for y in range(GRID_SIZE):
        for x in range(GRID_SIZE):
            cell = output[y, x]
            if cell[4] > conf:
                cls = np.argmax(cell[5:])
                boxes.append([cell[0], cell[1], cell[2], cell[3], cls])
    return boxes

def compute_map(model, X_val, y_val):
    aps = []
    for cls in range(NUM_CLASSES):
        TP=FP=FN=0
        for img, gt in zip(X_val, y_val):
            pred = model.predict(img[None], verbose=0)[0]
            p = [b for b in decode_grid(pred) if b[4]==cls]
            g = [b for b in decode_grid(gt, 0.1) if b[4]==cls]

            matched=set()
            for pb in p:
                ok=False
                for i,gb in enumerate(g):
                    if i not in matched and iou(pb[:4],gb[:4])>0.5:
                        TP+=1; matched.add(i); ok=True; break
                if not ok: FP+=1
            FN+=len(g)-len(matched)
        aps.append(TP/(TP+FP+1e-6))
    return np.mean(aps)


In [8]:
# =========================
# FPS
# =========================
def measure_fps(model, X, runs=100):
    start=time.time()
    for i in range(runs):
        model.predict(X[i%len(X)][None], verbose=0)
    return runs/(time.time()-start)

# =========================
# TRAINING
# =========================
X, y = load_dataset_custom_limits()
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)


Final class distribution: {'bottle': 800, 'person': 500}


In [9]:
model = build_model()
model.compile(optimizer="adam", loss=detection_loss, metrics=[objectness_accuracy])

model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=10,
    batch_size=4)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x1ba1e377df0>

In [10]:

# =========================
# EVALUATION
# =========================
map_score = compute_map(model, X_val, y_val)
fps = measure_fps(model, X_val[:10])

model.save("cnn_detector2.h5")
size_mb = os.path.getsize("cnn_detector2.h5") / (1024*1024)

print("\n===== FINAL RESULTS =====")
print(f"mAP@0.5   : {map_score:.4f}")
print(f"FPS       : {fps:.2f}")
print(f"Model Size: {size_mb:.2f} MB")



===== FINAL RESULTS =====
mAP@0.5   : 0.0000
FPS       : 18.30
Model Size: 1088.24 MB
