<a href="https://colab.research.google.com/github/tariqramzeengit/ML-COURSEWORK-2025/blob/main/DSGP_CCTV_Behaviour_Monitoring_XGBoost.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DSGP — CCTV Passenger Behaviour Monitoring (Feature Engineering + XGBoost)

This Colab notebook implements:

- **Data preprocessing**
- **EDA**
- **CNN feature extraction** (EfficientNet embeddings)
- **Feature engineering** (brightness/contrast/blur)
- **XGBoost** classifier
- **Evaluation**
- **Basic prediction program**
- *(Optional)* Video inference (frame sampling)

## Dataset structure (recommended)

```
dataset/
  train/
    normal/
    stealing/
    fight/
    medical_emergency/
  val/
    normal/
    ...
  test/
    normal/
    ...
```

If you only have `dataset/<class>/*` without a `train/` folder, the notebook will automatically make a stratified 70/15/15 train/val/test split.


## 1) Setup

In [None]:
!pip -q install xgboost opencv-python scikit-learn matplotlib pandas tqdm joblib tensorflow

import os, glob, random
import numpy as np
import pandas as pd
import cv2
from tqdm import tqdm
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, accuracy_score

import xgboost as xgb
import joblib


## 2) Mount Google Drive (recommended)

In [None]:
# Mount Google Drive at /content/drive so the dataset and the saved model
# files referenced below (all under /content/drive/MyDrive) are reachable.
from google.colab import drive
drive.mount('/content/drive')


MessageError: Error: credential propagation was unsuccessful

Set your dataset path here:

In [None]:
# Root folder of the image dataset. It either contains a "train" sub-folder
# (optionally with "val" and "test" next to it) or one sub-folder per class.
DATASET_PATH = "/content/drive/MyDrive/dataset"  # CHANGE THIS


## 3) Helper functions (load images + engineered features)

In [None]:
# Side length in pixels that every image is resized to before feature
# extraction; it is also used as the backbone's input_shape.
IMG_SIZE = 224  # works with EfficientNetB0

def list_images_and_labels(base_dir):
    """Collect image files and their class labels from a class-per-folder tree.

    Every immediate sub-directory of ``base_dir`` is treated as one class and
    every image file directly inside it as one sample of that class.

    Args:
        base_dir: Directory whose sub-directories are the class folders.

    Returns:
        A ``(image_paths, labels)`` pair of equally long lists. Entries are
        ordered by class name and then by file name, so repeated runs (and
        any seeded split built on top of them) see the same ordering.
    """
    image_exts = (".jpg", ".jpeg", ".png", ".bmp", ".webp")
    image_paths = []
    labels = []
    classes = sorted(
        d for d in os.listdir(base_dir)
        if os.path.isdir(os.path.join(base_dir, d))
    )
    for cls in classes:
        cls_dir = os.path.join(base_dir, cls)
        # os.listdir (not glob) so that class folder names containing glob
        # metacharacters such as "[" or "?" are taken literally. Dot-files
        # are skipped explicitly because the former "*" pattern skipped them.
        names = sorted(
            name for name in os.listdir(cls_dir)
            if not name.startswith(".")
            and name.lower().endswith(image_exts)
            and os.path.isfile(os.path.join(cls_dir, name))
        )
        image_paths.extend(os.path.join(cls_dir, name) for name in names)
        labels.extend([cls] * len(names))
    return image_paths, labels

def read_image(path, img_size=224):
    """Load an image file as a square RGB array.

    Returns ``None`` when ``cv2.imread`` cannot read the file; otherwise the
    image converted from BGR to RGB and resized to ``img_size`` x ``img_size``.
    """
    bgr = cv2.imread(path)
    if bgr is not None:
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        return cv2.resize(rgb, (img_size, img_size))
    return None

def simple_extra_features(img_rgb):
    """Compute three hand-crafted statistics from an RGB image.

    The image is converted to grayscale and summarised as, in order:
      0. brightness - mean gray level
      1. contrast   - standard deviation of the gray levels
      2. blur       - variance of the Laplacian response (lower = blurrier)

    Returns a float32 NumPy array holding those three values.
    """
    gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
    laplacian = cv2.Laplacian(gray, cv2.CV_64F)
    stats = [
        float(np.mean(gray)),
        float(np.std(gray)),
        float(laplacian.var()),
    ]
    return np.array(stats, dtype=np.float32)


## 4) Load dataset (supports both split and unsplit datasets)

In [None]:
# Locate the optional pre-made split folders under the dataset root.
train_dir, val_dir, test_dir = (
    os.path.join(DATASET_PATH, split) for split in ("train", "val", "test")
)
has_train, has_val, has_test = (
    os.path.exists(folder) for folder in (train_dir, val_dir, test_dir)
)

if not has_train:
    # Flat layout (dataset/<class>/*): carve out 30% of the data, then halve
    # it into validation and test, keeping class proportions in every part.
    all_paths, all_labels = list_images_and_labels(DATASET_PATH)
    train_paths, temp_paths, train_labels, temp_labels = train_test_split(
        all_paths, all_labels, test_size=0.3, random_state=42, stratify=all_labels
    )
    val_paths, test_paths, val_labels, test_labels = train_test_split(
        temp_paths, temp_labels, test_size=0.5, random_state=42, stratify=temp_labels
    )
else:
    # Pre-split layout: use whichever of train/val/test exist; a missing
    # val or test folder simply yields empty lists.
    train_paths, train_labels = list_images_and_labels(train_dir)
    val_paths, val_labels = list_images_and_labels(val_dir) if has_val else ([], [])
    test_paths, test_labels = list_images_and_labels(test_dir) if has_test else ([], [])

print("Train:", len(train_paths), "Val:", len(val_paths), "Test:", len(test_paths))
print("Classes:", sorted(set(train_labels)))


## 5) EDA (class counts + sample grid)

In [None]:
def plot_class_distribution(labels, title):
    """Draw a bar chart of the number of samples per class.

    When ``labels`` is empty, a placeholder line is printed and nothing is
    plotted. Bars are ordered by class name.
    """
    if not len(labels):
        print(title, ": (empty)")
        return
    counts = pd.Series(labels).value_counts().sort_index()
    plt.figure(figsize=(8, 4))
    plt.bar(counts.index, counts.values)
    plt.title(title)
    plt.ylabel("Count")
    plt.xticks(rotation=45, ha="right")
    plt.show()

# One chart per split; empty splits only print a placeholder line.
plot_class_distribution(train_labels, "Train class distribution")
plot_class_distribution(val_labels, "Val class distribution")
plot_class_distribution(test_labels, "Test class distribution")


In [None]:
def show_samples(paths, labels, n=12):
    """Show up to ``n`` randomly chosen images in a 4-column grid.

    Each tile is titled with the image's label. Images that cannot be read
    leave their grid slot empty. Prints a notice and returns when ``paths``
    is empty.
    """
    total = len(paths)
    if total == 0:
        print("No images to display.")
        return
    chosen = random.sample(range(total), min(n, total))
    n_cols = 4
    n_rows = int(np.ceil(len(chosen) / n_cols))
    plt.figure(figsize=(12, 3 * n_rows))
    for slot, idx in enumerate(chosen, start=1):
        picture = read_image(paths[idx], IMG_SIZE)
        if picture is None:
            continue
        plt.subplot(n_rows, n_cols, slot)
        plt.imshow(picture)
        plt.title(labels[idx])
        plt.axis("off")
    plt.show()

# Visual sanity check on a random handful of training images.
show_samples(train_paths, train_labels, n=12)


## 6) CNN Feature Extraction (EfficientNet embeddings)

In [None]:
import tensorflow as tf
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.applications.efficientnet import preprocess_input

# ImageNet-pretrained EfficientNetB0 used as a fixed feature extractor:
# the classification head is left out (include_top=False) and the backbone
# output is average-pooled (pooling="avg"). The rest of the notebook assumes
# the resulting embedding has 1280 columns.
base_model = EfficientNetB0(
    weights="imagenet",
    include_top=False,
    pooling="avg",
    input_shape=(IMG_SIZE, IMG_SIZE, 3)
)
# The backbone is never trained in this notebook; it is only used for predict().
base_model.trainable = False

def extract_embeddings(paths, labels, batch_size=32):
    """Turn image files into CNN embeddings plus engineered features.

    Images are read in batches of ``batch_size``, passed through
    ``preprocess_input`` and ``base_model.predict`` to get embeddings, and
    summarised with ``simple_extra_features``. Files that cannot be read are
    left out of all three outputs, so the outputs stay aligned row by row.

    Args:
        paths: Image file paths.
        labels: Label for each path; must be as long as ``paths``.
        batch_size: Number of images sent to the backbone per predict call.

    Returns:
        ``(X_embed, X_extra, y_out)``: the stacked embeddings, the stacked
        3-column engineered features, and the list of labels of the images
        that were actually processed.

    Raises:
        ValueError: If ``paths`` and ``labels`` differ in length.
    """
    if len(paths) != len(labels):
        # zip() would silently drop the unmatched tail and hide the problem.
        raise ValueError(
            f"paths and labels differ in length: {len(paths)} != {len(labels)}"
        )

    X_embed = []
    X_extra = []
    y_out = []
    skipped = 0

    for start in tqdm(range(0, len(paths), batch_size)):
        stop = start + batch_size
        batch_imgs = []
        batch_extras = []
        batch_y = []

        for p, lab in zip(paths[start:stop], labels[start:stop]):
            img = read_image(p, IMG_SIZE)
            if img is None:
                skipped += 1
                continue
            batch_imgs.append(img)
            # Engineered features are computed on the raw RGB pixels, before
            # the backbone-specific preprocessing is applied.
            batch_extras.append(simple_extra_features(img))
            batch_y.append(lab)

        if not batch_imgs:
            continue

        batch_arr = preprocess_input(np.array(batch_imgs, dtype=np.float32))
        X_embed.append(base_model.predict(batch_arr, verbose=0))
        X_extra.append(np.array(batch_extras))
        y_out.extend(batch_y)

    print(f"Embeddings: {len(y_out)} samples processed")
    if skipped:
        print(f"Skipped {skipped} unreadable image(s)")

    if X_embed:
        return np.vstack(X_embed), np.vstack(X_extra), y_out

    # Nothing was readable: return correctly shaped empty arrays, taking the
    # embedding width from the backbone instead of hard-coding it.
    embed_dim = int(base_model.output_shape[-1])
    return np.empty((0, embed_dim)), np.empty((0, 3)), y_out


Extract embeddings for train/val/test

In [None]:
# Training features are always extracted; validation and test features only
# when those splits contain images, otherwise empty placeholders are used.
Xtr_emb, Xtr_extra, ytr = extract_embeddings(train_paths, train_labels)

if len(val_paths) > 0:
    Xva_emb, Xva_extra, yva = extract_embeddings(val_paths, val_labels)
else:
    Xva_emb, Xva_extra, yva = np.empty((0, 1280)), np.empty((0, 3)), []

if len(test_paths) > 0:
    Xte_emb, Xte_extra, yte = extract_embeddings(test_paths, test_labels)
else:
    Xte_emb, Xte_extra, yte = np.empty((0, 1280)), np.empty((0, 3)), []

print("Shapes:")
print("Train:", Xtr_emb.shape, Xtr_extra.shape)
print("Val  :", Xva_emb.shape, Xva_extra.shape)
print("Test :", Xte_emb.shape, Xte_extra.shape)


Combine CNN embeddings + engineered features, encode labels

In [None]:
# Final feature matrix per split: CNN embedding columns followed by the
# engineered feature columns. Splits without samples are represented as None.
Xtr = np.hstack([Xtr_emb, Xtr_extra])
Xva = np.hstack([Xva_emb, Xva_extra]) if len(yva) > 0 else None
Xte = np.hstack([Xte_emb, Xte_extra]) if len(yte) > 0 else None

# The encoder is fitted on the training labels only; validation and test
# labels are mapped with that same encoding.
le = LabelEncoder()
ytr_enc = le.fit_transform(ytr)
yva_enc = le.transform(yva) if len(yva) > 0 else None
yte_enc = le.transform(yte) if len(yte) > 0 else None

print("Classes:", list(le.classes_))
print("Train X/y:", Xtr.shape, ytr_enc.shape)


## 7) Train XGBoost model

In [None]:
num_classes = len(le.classes_)

# Multi-class gradient-boosted trees on the combined feature matrix.
clf = xgb.XGBClassifier(
    objective="multi:softprob",
    num_class=num_classes,
    eval_metric="mlogloss",
    tree_method="hist",
    n_estimators=600,
    max_depth=6,
    learning_rate=0.05,
    subsample=0.9,
    colsample_bytree=0.9,
    random_state=42,
)

# The validation split, when present, is passed as an evaluation set; no
# early stopping is configured, so all 600 rounds are always trained.
eval_sets = [(Xva, yva_enc)] if Xva is not None else None
clf.fit(Xtr, ytr_enc, eval_set=eval_sets, verbose=False)

print("Training done.")


## 8) Evaluate

In [None]:
def evaluate(X, y_enc, title="Evaluation"):
    """Print accuracy and a per-class report, and plot the confusion matrix.

    Uses the module-level classifier ``clf`` for predictions and the label
    encoder ``le`` for class names.

    Args:
        X: Feature matrix to predict on.
        y_enc: Integer-encoded true labels for ``X``.
        title: Prefix for the accuracy line and title of the plot.
    """
    pred = clf.predict(X)
    print(title, "Accuracy:", accuracy_score(y_enc, pred))

    # Pin the label set to every class the encoder knows. Without this, a
    # split that lacks one of the training classes makes the number of
    # observed labels differ from len(le.classes_), which breaks the
    # target_names / display_labels pairing.
    all_labels = np.arange(len(le.classes_))

    print("\nClassification Report:")
    print(
        classification_report(
            y_enc,
            pred,
            labels=all_labels,
            target_names=le.classes_,
            zero_division=0,  # classes absent from this split score 0 without warnings
        )
    )

    cm = confusion_matrix(y_enc, pred, labels=all_labels)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=le.classes_)
    disp.plot(values_format='d')
    plt.title(title)
    plt.xticks(rotation=45, ha="right")
    plt.show()

# Evaluate only the splits that actually contain samples.
if Xva is not None and len(yva) > 0:
    evaluate(Xva, yva_enc, "Validation")

if Xte is not None and len(yte) > 0:
    evaluate(Xte, yte_enc, "Test")


## 9) Save model + label encoder

In [None]:
MODEL_OUT = "/content/drive/MyDrive/cctv_xgb_model.pkl"
ENCODER_OUT = "/content/drive/MyDrive/cctv_label_encoder.pkl"

# Persist the classifier and its label encoder together: the encoder is
# needed later to map predicted class indices back to class names.
for artifact, destination in ((clf, MODEL_OUT), (le, ENCODER_OUT)):
    joblib.dump(artifact, destination)

for destination in (MODEL_OUT, ENCODER_OUT):
    print("Saved:", destination)


## 10) Basic Program: Predict on a new image (ALERT system)

In [None]:
# Reload the saved classifier and label encoder so the prediction helpers
# below use exactly what was written to disk.
# NOTE(review): joblib.load unpickles the file - only load files you trust.
clf = joblib.load(MODEL_OUT)
le  = joblib.load(ENCODER_OUT)

def predict_image(image_path, alert_threshold=0.65):
    """Classify a single image and decide whether it should raise an alert.

    Args:
        image_path: Path of the image to classify.
        alert_threshold: Minimum confidence of a non-"normal" top class for
            the alert flag to be set.

    Returns:
        ``None`` if the image cannot be read, otherwise a dict with keys
        ``"label"`` (top class name), ``"confidence"`` (its probability),
        ``"alert"`` and ``"all_probs"`` (class name -> probability).
    """
    img = read_image(image_path, IMG_SIZE)
    if img is None:
        return None

    # Same feature layout as in training: embedding columns, then the
    # engineered features computed on the un-preprocessed RGB image.
    x_extra = simple_extra_features(img).reshape(1, -1)
    batch = np.array([img], dtype=np.float32)
    emb = base_model.predict(preprocess_input(batch), verbose=0)
    probs = clf.predict_proba(np.hstack([emb, x_extra]))[0]

    top_idx = int(np.argmax(probs))
    top_label = le.inverse_transform([top_idx])[0]
    top_conf = float(probs[top_idx])

    result = {"label": top_label, "confidence": top_conf}
    result["alert"] = (top_label != "normal") and (top_conf >= alert_threshold)
    result["all_probs"] = {le.classes_[i]: float(p) for i, p in enumerate(probs)}
    return result

def pretty_print_result(res):
    """Print a human-readable summary of a ``predict_image`` result.

    Args:
        res: The dict returned by ``predict_image`` (keys ``"label"``,
            ``"confidence"``, ``"alert"``, ``"all_probs"``), or ``None`` when
            the image could not be read.

    Prints the predicted label with its confidence, an alert / no-alert line,
    and all class probabilities sorted from most to least likely.
    """
    if res is None:
        print("Could not read image.")
        return
    print("Predicted:", res["label"], f"({res['confidence']:.2f})")
    if res["alert"]:
        print("🚨 ALERT: Potential incident detected! Notify conductor/security.")
    else:
        print("✅ No alert triggered.")
    ranked = pd.Series(res["all_probs"]).sort_values(ascending=False)
    # `display` is only injected by IPython/Colab; fall back to print so the
    # function also works when run as a plain Python script.
    try:
        show = display
    except NameError:
        show = print
    show(ranked)

# Example usage (change path):
# res = predict_image("/content/drive/MyDrive/test.jpg")
# pretty_print_result(res)


## 11) Optional: Video inference (sample frames)

In [None]:
def predict_video(video_path, every_n_frames=10, alert_threshold=0.65):
    """Classify sampled frames of a video and return a per-frame timeline.

    Args:
        video_path: Path of the video file to analyse.
        every_n_frames: Only frames whose index is a multiple of this value
            are classified.
        alert_threshold: Minimum confidence of a non-"normal" top class for
            a frame to be flagged as an alert.

    Returns:
        A list of ``(frame_id, label, confidence, alert)`` tuples, one per
        classified frame. The list is empty when the video cannot be opened
        or contains no frames.
    """
    cap = cv2.VideoCapture(video_path)
    timeline = []

    # try/finally so the capture handle is released even if decoding or
    # inference raises part-way through the video.
    try:
        if not cap.isOpened():
            # Keep returning an empty timeline, but say why it is empty.
            print("Could not open video:", video_path)
            return timeline

        frame_id = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_id % every_n_frames == 0:
                # Same preparation as for still images: BGR -> RGB, resize,
                # engineered features on raw pixels, embedding on
                # preprocessed pixels.
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                rgb = cv2.resize(rgb, (IMG_SIZE, IMG_SIZE))

                x_extra = simple_extra_features(rgb).reshape(1, -1)
                x = preprocess_input(np.array([rgb], dtype=np.float32))
                emb = base_model.predict(x, verbose=0)
                feats = np.hstack([emb, x_extra])

                probs = clf.predict_proba(feats)[0]
                top_idx = int(np.argmax(probs))
                label = le.inverse_transform([top_idx])[0]
                conf = float(probs[top_idx])
                alert = (label != "normal") and (conf >= alert_threshold)

                timeline.append((frame_id, label, conf, alert))

            frame_id += 1
    finally:
        cap.release()

    return timeline

# Example:
# timeline = predict_video("/content/drive/MyDrive/bus_incident.mp4")
# timeline[:10]
