### Dataset preprocessing

In [1]:
import os, zipfile, shutil

# Working root on the Colab VM; every other path in this cell hangs off it.
BASE = "/content"

# Uploaded dataset archives (YOLOv8 exports).
ZIP_SWP = f"{BASE}/student with phone.v1i.yolov8.zip"
ZIP_SAFE = f"{BASE}/real-time-exam-hall-monitoring.v1i.yolov8.zip"
ZIP_HP = f"{BASE}/HeadPose.v1i.yolov8.zip"  # untouched for now

# Scratch area for extracted archives, and the merged dataset root.
EXTRACT = f"{BASE}/_extracted"
FINAL = f"{BASE}/exam_malpractice_dataset"

os.makedirs(EXTRACT, exist_ok=True)

# Create FINAL/{images,labels}/{train,val,test}.
for split in ("train", "val", "test"):
    for kind in ("images", "labels"):
        os.makedirs(f"{FINAL}/{kind}/{split}", exist_ok=True)

print("‚úÖ Base folders ready")


‚úÖ Base folders ready


In [None]:
def unzip(zip_path, name):
    """Extract the archive at ``zip_path`` into ``EXTRACT/<name>``.

    The target folder is created if it does not exist yet; a confirmation
    line is printed once extraction finishes.
    """
    target_dir = f"{EXTRACT}/{name}"
    os.makedirs(target_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(target_dir)
    print(f"‚úÖ Unzipped {name}")

# "hp" is ONLY unzipped in this cell, no processing.
for archive_path, folder in ((ZIP_SWP, "swp"), (ZIP_SAFE, "safe"), (ZIP_HP, "hp")):
    unzip(archive_path, folder)


In [3]:
def normalize(root):
    """Rename the ``valid`` split folder under ``root`` to ``val``.

    Does nothing when ``root/valid`` is absent. When ``root/val`` already
    exists (for example the archive was extracted again after a previous
    run), the contents of ``valid`` are merged into ``val`` and ``valid`` is
    removed, instead of letting ``os.rename`` fail on a non-empty target.
    """
    src = f"{root}/valid"
    dst = f"{root}/val"

    if not os.path.exists(src):
        return

    if not os.path.exists(dst):
        os.rename(src, dst)
        return

    # Both folders exist: fold `valid` into `val`, overwriting same-named files.
    shutil.copytree(src, dst, dirs_exist_ok=True)
    shutil.rmtree(src)

# Only the two datasets merged so far are normalised here.
for dataset in ("swp", "safe"):
    normalize(f"{EXTRACT}/{dataset}")

print("‚úÖ valid ‚Üí val fixed (where applicable)")


‚úÖ valid ‚Üí val fixed (where applicable)


In [4]:
def _copy_prefixed(src_dir, dst_dir, prefix):
    """Copy every regular file in ``src_dir`` to ``dst_dir`` as ``<prefix>_<name>``."""
    os.makedirs(dst_dir, exist_ok=True)
    for fname in os.listdir(src_dir):
        src_path = f"{src_dir}/{fname}"
        # shutil.copy cannot copy directories; skip anything that is not a file.
        if os.path.isfile(src_path):
            shutil.copy(src_path, f"{dst_dir}/{prefix}_{fname}")


def copy_dataset(src, prefix):
    """Copy one extracted dataset into the merged dataset under ``FINAL``.

    src:    dataset root containing ``<split>/images`` and ``<split>/labels``
    prefix: string prepended (with ``_``) to every copied file name so files
            from different datasets cannot overwrite each other

    A split whose ``images`` folder is missing is skipped entirely. A missing
    ``labels`` folder no longer raises; only the images are copied.
    """
    for split in ["train", "val", "test"]:
        img_src = f"{src}/{split}/images"
        lbl_src = f"{src}/{split}/labels"

        if not os.path.exists(img_src):
            continue

        _copy_prefixed(img_src, f"{FINAL}/images/{split}", prefix)

        if os.path.isdir(lbl_src):
            _copy_prefixed(lbl_src, f"{FINAL}/labels/{split}", prefix)

copy_dataset(f"{EXTRACT}/swp", "swp")
print("‚úÖ Student-with-phone merged")


‚úÖ Student-with-phone merged


In [5]:
# Merge the exam-hall dataset; copied files get the "safe_" prefix.
copy_dataset(src=f"{EXTRACT}/safe", prefix="safe")
print("‚úÖ Safe exam hall merged")


‚úÖ Safe exam hall merged


In [6]:
def check(split):
    """Print how many entries ``FINAL`` holds under images/ and labels/ for ``split``."""
    counts = {
        kind: len(os.listdir(f"{FINAL}/{kind}/{split}"))
        for kind in ("images", "labels")
    }
    print(f"{split}: images={counts['images']}, labels={counts['labels']}")

for split_name in ("train", "val", "test"):
    check(split_name)


train: images=630, labels=630
val: images=76, labels=76
test: images=37, labels=37


In [7]:
import os, random, shutil

HP_BASE = "/content/_extracted/hp"
HP_TRAIN_IMG = f"{HP_BASE}/train/images"
HP_TRAIN_LBL = f"{HP_BASE}/train/labels"

# Fixed seed so a fresh run sends the same images to val/test every time.
SPLIT_SEED = 42

# create val/test folders
for split in ["val", "test"]:
    os.makedirs(f"{HP_BASE}/{split}/images", exist_ok=True)
    os.makedirs(f"{HP_BASE}/{split}/labels", exist_ok=True)

# Sort before shuffling so the seeded shuffle starts from a stable order,
# independent of the order os.listdir happens to return.
images = sorted(os.listdir(HP_TRAIN_IMG))
random.Random(SPLIT_SEED).shuffle(images)

# 10% of the train images go to val and another 10% to test.
# NOTE: re-running this cell takes a further 10% / 10% out of what is left in train.
n = len(images)
val_n  = int(0.1 * n)
test_n = int(0.1 * n)

val_imgs  = images[:val_n]
test_imgs = images[val_n:val_n + test_n]

def move(img_list, split):
    """Move each image in ``img_list`` and its same-stem ``.txt`` label from train into ``split``.

    An image whose label file does not exist is still moved; the missing
    label is skipped instead of aborting the split half-way through.
    """
    for img in img_list:
        shutil.move(
            f"{HP_TRAIN_IMG}/{img}",
            f"{HP_BASE}/{split}/images/{img}"
        )
        lbl = img.rsplit(".", 1)[0] + ".txt"
        lbl_path = f"{HP_TRAIN_LBL}/{lbl}"
        if os.path.exists(lbl_path):
            shutil.move(
                lbl_path,
                f"{HP_BASE}/{split}/labels/{lbl}"
            )

move(val_imgs, "val")
move(test_imgs, "test")

print("‚úÖ HeadPose split into train / val / test")


‚úÖ HeadPose split into train / val / test


In [8]:
def remap_headpose(label_dir, mapping=None):
    """Rewrite the class id (first token) of every label line in ``label_dir``, in place.

    label_dir: folder of YOLO ``.txt`` label files
    mapping:   dict of old id -> new id, both as strings; defaults to
               ``{"0": "2", "1": "3"}``

    Ids not present in ``mapping`` are written back unchanged. Blank lines are
    dropped (they previously raised IndexError). Each kept line is re-joined
    with single spaces and terminated with a newline.

    NOTE(review): because unmapped ids pass through, a source label that
    already uses id 2 or 3 becomes indistinguishable from a remapped one.
    Confirm the HeadPose export only contains classes 0 and 1.
    """
    if mapping is None:
        mapping = {"0": "2", "1": "3"}

    for fname in os.listdir(label_dir):
        path = os.path.join(label_dir, fname)
        with open(path, "r") as file:
            lines = file.readlines()

        new_lines = []
        for line in lines:
            parts = line.split()
            if not parts:
                continue  # blank line: nothing to remap
            parts[0] = mapping.get(parts[0], parts[0])
            new_lines.append(" ".join(parts) + "\n")

        with open(path, "w") as file:
            file.writelines(new_lines)

# Apply the remap to the labels of every HeadPose split.
for split in ("train", "val", "test"):
    remap_headpose(label_dir=f"{HP_BASE}/{split}/labels")

print("‚úÖ HeadPose class IDs remapped to {2,3}")


‚úÖ HeadPose class IDs remapped to {2,3}


In [9]:
def prefix_headpose(split):
    """Rename every image and label file of the HeadPose ``split`` to ``hp_<name>``.

    Files whose name already starts with ``hp_`` are left alone, so running
    the cell a second time does not produce ``hp_hp_...`` names.
    """
    for kind in ("images", "labels"):
        folder = f"{HP_BASE}/{split}/{kind}"
        for fname in os.listdir(folder):
            if fname.startswith("hp_"):
                continue
            os.rename(f"{folder}/{fname}", f"{folder}/hp_{fname}")

for split in ["train", "val", "test"]:
    prefix_headpose(split)

print("‚úÖ HeadPose files prefixed")


‚úÖ HeadPose files prefixed


In [10]:
FINAL = "/content/exam_malpractice_dataset"

def merge_headpose(split):
    """Copy the HeadPose images, then labels, of ``split`` into the matching ``FINAL`` folders.

    File names are kept as they are in the HeadPose folders.
    """
    for kind in ("images", "labels"):
        source_dir = f"{HP_BASE}/{split}/{kind}"
        target_dir = f"{FINAL}/{kind}/{split}"
        for fname in os.listdir(source_dir):
            shutil.copy(f"{source_dir}/{fname}", f"{target_dir}/{fname}")

for split in ("train", "val", "test"):
    merge_headpose(split)

print("‚úÖ HeadPose merged into final dataset")


‚úÖ HeadPose merged into final dataset


In [11]:
# Dataset config text: dataset root, the image folder of each split,
# the class count, and the id -> name table for the four classes.
data_yaml = """
path: /content/exam_malpractice_dataset

train: images/train
val: images/val
test: images/test

nc: 4
names:
  0: person
  1: mobile_phone
  2: head_left
  3: head_right
"""

# The surrounding blank lines of the template are stripped before writing.
with open("/content/exam_malpractice_dataset/data.yaml", "w") as f:
    f.write(data_yaml.strip())

print("‚úÖ Final data.yaml created")


‚úÖ Final data.yaml created


In [12]:
def check_classes(label_dir):
    """Return the set of integer class ids found in the label files of ``label_dir``.

    The class id is the first whitespace-separated token of each line.
    Blank lines are ignored (they previously raised IndexError).
    """
    classes = set()
    for fname in os.listdir(label_dir):
        with open(os.path.join(label_dir, fname)) as file:
            for line in file:
                fields = line.split()
                if fields:
                    classes.add(int(fields[0]))
    return classes

# Report which class ids occur in each split of the merged dataset.
for split in ("train", "val", "test"):
    cls = check_classes(label_dir=f"{FINAL}/labels/{split}")
    print(f"{split} classes:", cls)


train classes: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
val classes: {0, 1, 2, 3, 4, 5, 6, 7, 10}
test classes: {0, 2, 3, 4, 5, 6, 7, 10}


In [13]:
import os

FINAL = "/content/exam_malpractice_dataset"
# Class ids to keep. They are strings because they are compared against the
# first whitespace-separated token of each label line.
ALLOWED = {"0", "1", "2", "3"}

def clean_labels(label_dir, allowed=None):
    """Drop every label line in ``label_dir`` whose class id is not allowed, in place.

    label_dir: folder of YOLO ``.txt`` label files
    allowed:   collection of class ids (as strings) to keep; defaults to the
               module-level ``ALLOWED`` set

    Kept lines are written back unchanged. Blank lines are removed (they
    previously raised IndexError). A file may end up empty.
    """
    if allowed is None:
        allowed = ALLOWED

    for file in os.listdir(label_dir):
        path = os.path.join(label_dir, file)
        with open(path, "r") as f:
            lines = f.readlines()

        new_lines = []
        for line in lines:
            fields = line.split()
            # A blank line has no class token, so it can never be allowed.
            if fields and fields[0] in allowed:
                new_lines.append(line)

        # overwrite file (empty is allowed)
        with open(path, "w") as f:
            f.writelines(new_lines)

# Filter the label files of every split in the merged dataset.
for split in ("train", "val", "test"):
    clean_labels(f"{FINAL}/labels/{split}")

print("‚úÖ Removed all unwanted classes (kept only 0‚Äì3)")


‚úÖ Removed all unwanted classes (kept only 0‚Äì3)


In [14]:
def check_classes(label_dir):
    """Return the set of integer class ids found in the label files of ``label_dir``.

    The class id is the first whitespace-separated token of each line.
    Blank lines are ignored (they previously raised IndexError).
    """
    classes = set()
    for fname in os.listdir(label_dir):
        with open(os.path.join(label_dir, fname)) as file:
            for line in file:
                fields = line.split()
                if fields:
                    classes.add(int(fields[0]))
    return classes

# Re-check the class ids per split after cleaning.
for split in ("train", "val", "test"):
    print(split, check_classes(label_dir=f"{FINAL}/labels/{split}"))


train {0, 1, 2, 3}
val {0, 1, 2, 3}
test {0, 2, 3}


## Data preprocessing is complete. The final classes are:
- 0 → person
- 1 → mobile_phone
- 2 → head_left
- 3 → head_right


### Rules for YOLO model

In [15]:
# Tunable values for the rule-based decision step.
# NOTE(review): presumably minimum detection confidences for the phone and
# head-pose classes — confirm where these names are consumed.
PHONE_CONF_THRESHOLD = 0.5
HEAD_CONF_THRESHOLD  = 0.5
HEAD_FRAME_COUNT     = 10   # frames


In [16]:
from collections import defaultdict

# track head movement persistence: class id -> number of consecutive frames
# in which that head class was detected with sufficient confidence
head_counter = defaultdict(int)

def malpractice_decision(detections, phone_conf=0.5, head_conf=0.5, head_frames=10):
    """
    Classify one frame and update the per-class head-turn streaks.

    detections:  list of tuples (class_id, confidence) for a single frame
    phone_conf:  minimum confidence for a class-1 detection to count
    head_conf:   minimum confidence for a class-2/3 detection to count
    head_frames: streak length at which a head class becomes suspicious
    returns: "MALPRACTICE", "SUSPICIOUS", or "SAFE"

    Call it once per frame, in order. A head class extends its streak by one
    per frame in which it is seen (no matter how many boxes), and the streak
    is reset to zero in any frame where it is not seen.
    """
    # Head classes confidently present in this frame; a set so that several
    # boxes of the same class count once per frame.
    turned = {cls for cls, conf in detections if cls in (2, 3) and conf >= head_conf}

    suspicious = False
    for head_cls in (2, 3):
        if head_cls in turned:
            head_counter[head_cls] += 1
            if head_counter[head_cls] >= head_frames:
                suspicious = True
        else:
            # Absent (or low-confidence) this frame: the streak is broken.
            head_counter[head_cls] = 0

    # Streaks are updated before the phone check so a phone frame does not
    # freeze the head-turn bookkeeping.
    if any(cls == 1 and conf >= phone_conf for cls, conf in detections):
        return "MALPRACTICE"

    if suspicious:
        return "SUSPICIOUS"

    return "SAFE"


In [17]:
# %pip installs into the running kernel's environment; the version is pinned
# to the release this notebook was run with.
%pip install ultralytics==8.3.239


Collecting ultralytics
  Downloading ultralytics-8.3.239-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.239-py3-none-any.whl (1.1 MB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.1/1.1 MB[0m [31m32.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.239 ultralytics-thop-2.0.18


In [18]:
# List the top level of the merged dataset folder.
!ls /content/exam_malpractice_dataset


data.yaml  images  labels


### Model Training

In [19]:
from ultralytics import YOLO

# start small, fast: begin from the yolov8n.pt checkpoint
model = YOLO("yolov8n.pt")

# Training settings gathered in one place; passed unchanged to model.train.
train_settings = dict(
    data="/content/exam_malpractice_dataset/data.yaml",
    epochs=50,
    imgsz=640,
    batch=16,
    device=0,
    patience=10,
    workers=2,
    project="exam_malpractice",
    name="yolov8n_v1",
)

model.train(**train_settings)


Creating new Ultralytics Settings v0.0.6 file ‚úÖ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ 6.2MB 106.6MB/s 0.1s
Ultralytics 8.3.239 üöÄ Python-3.12.12 torch-2.9.0+cu126 CUDA:0 (Tesla T4, 15095MiB)
[34m[1mengine/trainer: [0magnostic_nms=False, amp=True, augment=False, auto_augment=randaugment, batch=16, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, compile=False, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=/content/exam_malpractice_dataset/data.yaml, degrees=0.0, deterministic=True, device=0, dfl=1.5, dnn=False, dropout=0.0, dynamic=False, embed=None

ultralytics.utils.metrics.DetMetrics object with attributes:

ap_class_index: array([0, 1, 2, 3])
box: ultralytics.utils.metrics.Metric object
confusion_matrix: <ultralytics.utils.metrics.ConfusionMatrix object at 0x7da2421a0d10>
curves: ['Precision-Recall(B)', 'F1-Confidence(B)', 'Precision-Confidence(B)', 'Recall-Confidence(B)']
curves_results: [[array([          0,    0.001001,    0.002002,    0.003003,    0.004004,    0.005005,    0.006006,    0.007007,    0.008008,    0.009009,     0.01001,    0.011011,    0.012012,    0.013013,    0.014014,    0.015015,    0.016016,    0.017017,    0.018018,    0.019019,     0.02002,    0.021021,    0.022022,    0.023023,
          0.024024,    0.025025,    0.026026,    0.027027,    0.028028,    0.029029,     0.03003,    0.031031,    0.032032,    0.033033,    0.034034,    0.035035,    0.036036,    0.037037,    0.038038,    0.039039,     0.04004,    0.041041,    0.042042,    0.043043,    0.044044,    0.045045,    0.046046,    0.047047,
          0

### WEBCAM PREDICTION

In [21]:
from IPython.display import display, Javascript
from google.colab.output import eval_js
import cv2
import numpy as np


In [22]:
# Register a browser-side `captureFrame()` helper. It opens the webcam via
# getUserMedia, draws one video frame onto a canvas sized to the video,
# stops the stream, and returns the image as a JPEG data URL (quality 0.8).
display(Javascript("""
async function captureFrame() {
  const video = document.createElement('video');
  const stream = await navigator.mediaDevices.getUserMedia({video: true});
  video.srcObject = stream;
  await video.play();

  const canvas = document.createElement('canvas');
  canvas.width = video.videoWidth;
  canvas.height = video.videoHeight;
  canvas.getContext('2d').drawImage(video, 0, 0);

  stream.getTracks().forEach(track => track.stop());
  return canvas.toDataURL('image/jpeg', 0.8);
}
"""))


<IPython.core.display.Javascript object>

In [23]:
def get_frame():
    """Capture one webcam frame in the browser and return it as a decoded image array.

    Calls the browser-side ``captureFrame()`` helper, which returns a data URL
    of the form ``<header>,<base64 JPEG>``. The base64 payload is decoded to
    bytes and handed to ``cv2.imdecode``.

    Raises RuntimeError when the browser returns nothing or the bytes cannot
    be decoded as an image.
    """
    import base64  # local import: the import cell for this section does not provide it

    data = eval_js("captureFrame()")
    if data is None:
        raise RuntimeError("Camera capture failed")

    # The payload after the comma is base64 text; it must be decoded to raw
    # JPEG bytes before OpenCV can read it.
    encoded = data.split(',')[1]
    img_array = np.frombuffer(base64.b64decode(encoded), dtype=np.uint8)

    frame = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    if frame is None:
        raise RuntimeError("Camera frame could not be decoded")
    return frame


In [25]:
from ultralytics import YOLO
# Load weights/best.pt from the exam_malpractice/yolov8n_v1 run folder.
model = YOLO("/content/exam_malpractice/yolov8n_v1/weights/best.pt")

In [27]:
from collections import defaultdict
# class id -> number of consecutive frames in which that head class was seen
head_counter = defaultdict(int)

def malpractice_decision(detections, phone_conf=0.5, head_conf=0.5, head_frames=10):
    """Classify one frame and update the per-class head-turn streaks.

    detections:  list of (class_id, confidence) tuples for a single frame
    phone_conf:  minimum confidence for a class-1 detection to count
    head_conf:   minimum confidence for a class-2/3 detection to count
    head_frames: streak length at which a head class becomes suspicious
    returns: "MALPRACTICE", "SUSPICIOUS", or "SAFE"

    Call once per frame, in order. A head class gains one streak count per
    frame in which it is seen; its streak resets in any frame without it.
    """
    # A set, so several boxes of one class count once per frame.
    turned = {cls for cls, conf in detections if cls in (2, 3) and conf >= head_conf}

    suspicious = False
    for head_cls in (2, 3):
        if head_cls in turned:
            head_counter[head_cls] += 1
            if head_counter[head_cls] >= head_frames:
                suspicious = True
        else:
            # Not seen this frame: the streak is broken.
            head_counter[head_cls] = 0

    # Checked after the streak update so phone frames keep the bookkeeping current.
    if any(cls == 1 and conf >= phone_conf for cls, conf in detections):
        return "MALPRACTICE"

    return "SUSPICIOUS" if suspicious else "SAFE"


In [33]:
from IPython.display import display, Javascript

# Register a browser-side `takePhoto()` helper. It attaches a temporary video
# element to the page, opens the webcam, draws one frame scaled to 640x480,
# stops the stream, removes the element, and returns a JPEG data URL.
display(Javascript("""
async function takePhoto() {
  const div = document.createElement('div');
  const video = document.createElement('video');
  const stream = await navigator.mediaDevices.getUserMedia({video: true});

  document.body.appendChild(div);
  div.appendChild(video);
  video.srcObject = stream;
  await video.play();

  // Resize for YOLO
  const canvas = document.createElement('canvas');
  canvas.width = 640;
  canvas.height = 480;
  const ctx = canvas.getContext('2d');
  ctx.drawImage(video, 0, 0, canvas.width, canvas.height);

  stream.getTracks().forEach(track => track.stop());
  div.remove();

  return canvas.toDataURL('image/jpeg', 0.8);
}
"""))


<IPython.core.display.Javascript object>

In [34]:
import base64
import cv2
import numpy as np
from google.colab.output import eval_js

def get_frame():
    """Grab one webcam frame through the browser-side ``takePhoto()`` helper.

    The helper's data URL is split at the comma, the base64 part is decoded,
    and the resulting bytes are passed to ``cv2.imdecode``; whatever that
    returns is the result. Raises RuntimeError if the browser returns nothing.
    """
    data_url = eval_js("takePhoto()")
    if data_url is None:
        raise RuntimeError("Camera capture failed. Refresh and allow camera.")

    payload = data_url.split(',')[1]
    pixels = np.frombuffer(base64.b64decode(payload), dtype=np.uint8)
    return cv2.imdecode(pixels, cv2.IMREAD_COLOR)


In [36]:
from IPython.display import display, Javascript

# Register the browser-side `takePhoto()` helper in this cell's output, then
# capture one frame and print its array shape.
display(Javascript("""
async function takePhoto() {
  const div = document.createElement('div');
  const video = document.createElement('video');
  const stream = await navigator.mediaDevices.getUserMedia({video: true});

  document.body.appendChild(div);
  div.appendChild(video);
  video.srcObject = stream;
  await video.play();

  // Resize for YOLO
  const canvas = document.createElement('canvas');
  canvas.width = 640;
  canvas.height = 480;
  const ctx = canvas.getContext('2d');
  ctx.drawImage(video, 0, 0, canvas.width, canvas.height);

  stream.getTracks().forEach(track => track.stop());
  div.remove();

  return canvas.toDataURL('image/jpeg', 0.8);
}
"""))

frame = get_frame()
print(frame.shape)

<IPython.core.display.Javascript object>

(480, 640, 3)


In [38]:
from ultralytics import YOLO
# Load weights/best.pt from the exam_malpractice/yolov8n_v1 run folder.
model = YOLO("/content/exam_malpractice/yolov8n_v1/weights/best.pt")

In [39]:
# Run the model on the captured frame, keeping boxes with confidence >= 0.5.
results = model(frame, conf=0.5)

# Flatten every box of every result into (class_id, confidence) pairs.
detections = [
    (int(box.cls[0]), float(box.conf[0]))
    for result in results
    for box in result.boxes
]

print("Detections:", detections)



0: 480x640 1 person, 52.3ms
Speed: 1.8ms preprocess, 52.3ms inference, 2.1ms postprocess per image at shape (1, 3, 480, 640)
Detections: [(0, 0.923618495464325)]


In [45]:
from collections import defaultdict

# class id -> number of consecutive frames in which that head class was seen
head_counter = defaultdict(int)

def malpractice_decision(detections, phone_conf=0.25, head_conf=0.5, head_frames=5):
    """Classify one frame and update the per-class head-turn streaks.

    detections:  list of (class_id, confidence) tuples for a single frame
    phone_conf:  minimum confidence for a class-1 detection to count
                 (LOWERED threshold: 0.25)
    head_conf:   minimum confidence for a class-2/3 detection to count
    head_frames: streak length at which a head class becomes suspicious
    returns: "MALPRACTICE", "SUSPICIOUS", or "SAFE"

    Call once per frame, in order. A head class gains one streak count per
    frame in which it is seen; its streak resets in any frame without it.
    """
    # Weak signal: head movement (temporal).
    # A set, so several boxes of one class count once per frame.
    turned = {cls for cls, conf in detections if cls in (2, 3) and conf >= head_conf}

    suspicious = False
    for head_cls in (2, 3):
        if head_cls in turned:
            head_counter[head_cls] += 1
            if head_counter[head_cls] >= head_frames:
                suspicious = True
        else:
            # Not seen this frame: the streak is broken.
            head_counter[head_cls] = 0

    # Strong signal: phone. Checked after the streak update so phone frames
    # keep the head-turn bookkeeping current.
    if any(cls == 1 and conf >= phone_conf for cls, conf in detections):
        return "MALPRACTICE"

    return "SUSPICIOUS" if suspicious else "SAFE"


In [50]:
from IPython.display import display, Javascript

# Register a browser-side `takePhoto()` helper. It attaches a temporary video
# element to the page, opens the webcam, draws one frame scaled to 640x480,
# stops the stream, removes the element, and returns a JPEG data URL.
display(Javascript("""
async function takePhoto() {
  const div = document.createElement('div');
  const video = document.createElement('video');
  const stream = await navigator.mediaDevices.getUserMedia({video: true});

  document.body.appendChild(div);
  div.appendChild(video);
  video.srcObject = stream;
  await video.play();

  const canvas = document.createElement('canvas');
  canvas.width = 640;
  canvas.height = 480;
  const ctx = canvas.getContext('2d');
  ctx.drawImage(video, 0, 0, canvas.width, canvas.height);

  stream.getTracks().forEach(track => track.stop());
  div.remove();

  return canvas.toDataURL('image/jpeg', 0.8);
}
"""))


<IPython.core.display.Javascript object>

In [47]:
import base64
import cv2
import numpy as np
from google.colab.output import eval_js

def get_frame():
    """Grab one webcam frame through the browser-side ``takePhoto()`` helper.

    The helper's data URL is split at the comma, the base64 part is decoded,
    and the bytes are decoded into an image array with ``cv2.imdecode``.

    Raises RuntimeError when the browser returns nothing or the bytes cannot
    be decoded as an image.
    """
    data = eval_js("takePhoto()")
    if data is None:
        raise RuntimeError("Camera capture failed")

    encoded = data.split(",")[1]
    img_bytes = base64.b64decode(encoded)
    img_array = np.frombuffer(img_bytes, dtype=np.uint8)
    frame = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    # imdecode reports an undecodable buffer by returning None; fail here
    # rather than handing None to the model.
    if frame is None:
        raise RuntimeError("Camera frame could not be decoded")
    return frame


In [48]:
from ultralytics import YOLO
# Load weights/best.pt from the exam_malpractice/yolov8n_v1 run folder.
model = YOLO("/content/exam_malpractice/yolov8n_v1/weights/best.pt")

In [53]:
from IPython.display import display, Javascript
from google.colab.output import eval_js # Ensure eval_js is imported here if get_frame is used directly

# Register the browser-side `takePhoto()` helper in this cell's output: it
# opens the webcam, draws one frame scaled to 640x480, stops the stream, and
# returns a JPEG data URL.
display(Javascript("""
async function takePhoto() {
  const div = document.createElement('div');
  const video = document.createElement('video');
  const stream = await navigator.mediaDevices.getUserMedia({video: true});

  document.body.appendChild(div);
  div.appendChild(video);
  video.srcObject = stream;
  await video.play();

  const canvas = document.createElement('canvas');
  canvas.width = 640;
  canvas.height = 480;
  const ctx = canvas.getContext('2d');
  ctx.drawImage(video, 0, 0, canvas.width, canvas.height);

  stream.getTracks().forEach(track => track.stop());
  div.remove();

  return canvas.toDataURL('image/jpeg', 0.8);
}
"""))

# Capture 15 frames one after another and print the decision for each.
for i in range(15):
    frame = get_frame()

    # Run YOLO with LOWER conf
    results = model(frame, conf=0.1, verbose=False)

    # Flatten every box into a (class_id, confidence) pair.
    detections = [
        (int(b.cls[0]), float(b.conf[0]))
        for result in results
        for b in result.boxes
    ]

    print("RAW DETECTIONS:", detections)
    print(f"Frame {i} \u2192 STATUS:", malpractice_decision(detections))

<IPython.core.display.Javascript object>

RAW DETECTIONS: []
Frame 0 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.3514969050884247)]
Frame 1 ‚Üí STATUS: SAFE
RAW DETECTIONS: []
Frame 2 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.3540010154247284)]
Frame 3 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.3037441074848175)]
Frame 4 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.2697446942329407), (0, 0.18851129710674286)]
Frame 5 ‚Üí STATUS: SAFE
RAW DETECTIONS: []
Frame 6 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.1480173021554947)]
Frame 7 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.4047459065914154), (3, 0.1902294158935547)]
Frame 8 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.7258846759796143)]
Frame 9 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.843117892742157)]
Frame 10 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.6907045245170593)]
Frame 11 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.5563725233078003), (3, 0.20718754827976227)]
Frame 12 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.631601870059967)]
Frame 13 ‚Üí STATUS: SAFE
RAW DETECTIONS: [(0, 0.7954272031784058)]
Frame 14 ‚

In [54]:
from google.colab import files
# Send the run's weights/best.pt file to the browser as a download.
files.download("/content/exam_malpractice/yolov8n_v1/weights/best.pt")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>