# Laundry AI Detector

This notebook implements a **Laundry AI Detector**, an intelligent system designed to monitor laundry shops through video analysis. The goal is to detect situations where employees take clothes from customers without issuing a receipt within a 10‑minute window. The detector counts how many customers enter, deliver clothing, and whether or not a receipt is printed for each transaction. When a customer does not receive a receipt within the allotted time, the system flags the event and saves a short video clip for review.

**Key features:**

- Video processing to extract frames at a specified rate (you can adjust the sampling rate to balance processing speed and detection accuracy).
- Use of AI models for object detection (YOLOv8) and person tracking (DeepSORT) to follow customers and staff over time.
- Event analysis to identify when clothing is handed over and whether a receipt is issued within 10 minutes.
- Export functionality to create short video clips of detected events and summary logs for further investigation.

This notebook is meant to serve as a starting point. You can adapt the detection logic, modify the model, or integrate your own custom dataset for receipts or specific clothing items.


In [1]:
# %pip install ultralytics deep_sort_realtime pandas tqdm opencv-python

In [2]:
# Imports
import cv2
import numpy as np
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from collections import defaultdict
import pandas as pd
from tqdm import tqdm
import os
import math
import datetime


View Ultralytics Settings with 'yolo settings' or at '/Users/naifalhajri/Library/Application Support/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [None]:

# Load the YOLOv8 model pretrained on COCO dataset. This model detects persons and common objects.
# You can choose 'yolov8n.pt' (nano), 'yolov8s.pt' (small), 'yolov8m.pt' (medium), etc. Adjust based on available compute.

model = YOLO('yolov8n.pt')  # using the lightweight nano version for speed

# Initialize DeepSORT tracker for multi-object tracking. The parameters can be tuned for your environment.
tracker = DeepSort(max_age=50, n_init=2, nms_max_overlap=1.0, max_cosine_distance=0.2)

# Define the list of YOLO class names for reference (COCO dataset)
CLASS_NAMES = model.model.names

# Define which class IDs represent clothing or bag items. This is heuristic and can be adjusted.
CLOTHING_CLASSES = {
    CLASS_NAMES.index('handbag'),
    CLASS_NAMES.index('backpack'),
    CLASS_NAMES.index('suitcase'),
    CLASS_NAMES.index('tie'),
    CLASS_NAMES.index('teddy bear'),  # use for soft items if needed
}


In [None]:

def detect_receipt_region(frame, debug=False):
    """
    Detects potential receipt regions in the frame based on simple colour and shape heuristics.
    A receipt is usually a small, bright (mostly white) rectangular piece of paper. This function
    applies thresholding to isolate bright regions and then filters by aspect ratio and size.

    Parameters:
        frame (np.ndarray): BGR image frame from video.
        debug (bool): If True, displays intermediate images for tuning (requires interactive environment).

    Returns:
        bool: True if a receipt-like region is detected, False otherwise.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Focus on bright areas; adjust threshold as needed
    _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY)
    # Morphological operations to remove noise and close gaps
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel, iterations=2)
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        area = w * h
        if area < 500 or area > 50000:
            continue  # discard too small or too large regions
        aspect_ratio = w / float(h)
        # Typical receipt is roughly tall and narrow or long and thin; adjust thresholds as needed
        if 0.3 < aspect_ratio < 3.0:
            # This region may be a receipt
            if debug:
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
                cv2.imshow('Receipt candidate', frame)
                cv2.waitKey(0)
            return True
    return False


In [None]:

def process_video(
    video_path: str,
    output_dir: str,
    receipt_timeout: int = 600,
    frame_sample_rate: int = 3,
    debug: bool = False
):
    """
    Processes a video file to detect clothing drop-off events and checks for receipt issuance.

    Parameters:
        video_path (str): Path to input MP4 video file.
        output_dir (str): Directory where event clips and logs will be saved.
        receipt_timeout (int): Time window (in seconds) to wait for a receipt after clothing is handed over.
        frame_sample_rate (int): Process every Nth frame for efficiency.
        debug (bool): If True, shows intermediate frames and debugging information.

    Returns:
        pd.DataFrame: A DataFrame summarising detected events.
    """
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    video_basename = os.path.basename(video_path).rsplit('.', 1)[0]

    # Dictionaries to track state per person ID
    person_last_seen = {}
    clothing_event_time = {}
    receipt_received = {}

    # List to collect event logs
    event_logs = []

    frame_idx = 0
    pbar = tqdm(total=total_frames, desc='Processing video')
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_idx += 1
        pbar.update(1)

        # Skip frames to speed up processing
        if frame_idx % frame_sample_rate != 0:
            continue

        timestamp = frame_idx / fps

        # Run YOLO detection
        results = model(frame, verbose=False)
        detections = []
        for box in results[0].boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            if conf < 0.4:
                continue  # filter low confidence
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            # Detections formatted for DeepSort: [x1, y1, x2, y2, confidence, class_id]
            detections.append([x1, y1, x2, y2, conf, cls_id])

        # Update tracker
        tracks = tracker.update_tracks(detections, frame=frame)

        # For each tracked person, update states
        for track in tracks:
            if not track.is_confirmed():
                continue
            track_id = track.track_id
            l, t, r, b = track.to_ltrb()
            class_id = track.det_class

            # Only care about person class
            if class_id != CLASS_NAMES.index('person'):
                continue

            # Update last seen time
            person_last_seen[track_id] = timestamp

            # Check if person is carrying clothes based on detection of a clothing-like object nearby
            carrying_clothes = False
            for det in detections:
                dx1, dy1, dx2, dy2, dconf, dcls = det
                if int(dcls) in CLOTHING_CLASSES:
                    # Compute IoU with person box
                    inter_x1 = max(l, dx1)
                    inter_y1 = max(t, dy1)
                    inter_x2 = min(r, dx2)
                    inter_y2 = min(b, dy2)
                    inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)
                    person_area = (r - l) * (b - t)
                    if person_area > 0 and (inter_area / person_area) > 0.1:
                        carrying_clothes = True
                        break

            # Record time of clothing drop-off
            if carrying_clothes and track_id not in clothing_event_time:
                clothing_event_time[track_id] = timestamp
                receipt_received[track_id] = False
                if debug:
                    print(f"Person {track_id} dropped off clothes at {timestamp:.2f}s")

            # Detect receipt issuance after drop-off
            if track_id in clothing_event_time and not receipt_received[track_id]:
                if detect_receipt_region(frame):
                    receipt_received[track_id] = True
                    if debug:
                        print(f"Receipt issued to person {track_id} at {timestamp:.2f}s")

        # Check for timeouts
        to_remove = []
        for pid, drop_time in clothing_event_time.items():
            if receipt_received[pid]:
                to_remove.append(pid)
                continue
            # If enough time has passed without receipt, flag event
            if timestamp - drop_time > receipt_timeout:
                # Save video clip around the event (10 seconds before and after drop)
                clip_start = max(0, drop_time - 5)
                clip_end = min(timestamp + 5, total_frames / fps)
                clip_filename = f"{video_basename}_person{pid}_no_receipt_{int(drop_time)}.mp4"
                clip_path = os.path.join(output_dir, clip_filename)
                save_clip(video_path, clip_start, clip_end, clip_path)
                event_logs.append({
                    'person_id': pid,
                    'drop_time_sec': drop_time,
                    'clip_path': clip_path,
                    'receipt_received': False
                })
                to_remove.append(pid)
                if debug:
                    print(f"No receipt for person {pid} (drop at {drop_time:.2f}s)")

        # Clean up processed IDs
        for pid in to_remove:
            clothing_event_time.pop(pid, None)
            receipt_received.pop(pid, None)

    cap.release()
    pbar.close()

    # Save event logs to CSV
    df_events = pd.DataFrame(event_logs)
    csv_path = os.path.join(output_dir, f"{video_basename}_event_log.csv")
    df_events.to_csv(csv_path, index=False)
    print(f"Processed {video_path}. Events logged to {csv_path}.")
    return df_events


In [None]:

def save_clip(video_path: str, start_time: float, end_time: float, output_path: str):
    """
    Saves a segment of the input video between start_time and end_time to output_path.

    Parameters:
        video_path (str): Path to the original video.
        start_time (float): Start time in seconds.
        end_time (float): End time in seconds.
        output_path (str): Path to save the extracted clip.
    """
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Compute frame ranges
    start_frame = int(start_time * fps)
    end_frame = int(end_time * fps)

    current_frame = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if current_frame >= start_frame and current_frame <= end_frame:
            out.write(frame)
        current_frame += 1
        if current_frame > end_frame:
            break

    cap.release()
    out.release()


In [None]:

# Example usage:
# Provide the path to your input MP4 video and an output directory to store results.
# Adjust receipt_timeout (seconds) as needed. Here we set it to 600s (10 minutes).

# Uncomment and set your paths accordingly
# video_file = 'path/to/your/laundry_shop_video.mp4'
# output_directory = 'output_events'
# events_df = process_video(
#     video_path=video_file,
#     output_dir=output_directory,
#     receipt_timeout=600,
#     frame_sample_rate=3,
#     debug=True
# )
# events_df
