## Install Dependencies and Import Libraries

In [1]:
# Python version must be 3.6 ~ 3.10, which is officially supported for pyrealsense2 library
!pip install -r requirements.txt



In [2]:
import pyrealsense2 as rs
import numpy as np
import cv2
import os
import glob
import matplotlib.pyplot as plt
from ultralytics import YOLO
from datetime import datetime
import json
import shutil
import random
from pathlib import Path

## Real-Time Defect Detection using Intel RealSense D435i and YOLOv8

In [46]:
# Relative directory path (with trailing slash) for captured videos.
# NOTE(review): presumably the folder where YOLO detection recordings are kept; it is not
# referenced by the cells visible here, so confirm which cell consumes it.
YOLO_DETECT_VID_PATH = "capturedVideos/"

In [15]:
import cv2
import pyrealsense2 as rs
import numpy as np
from ultralytics import YOLO
from datetime import datetime

# Real-time two-stage inspection loop.
#
# For every color frame read from the RealSense pipeline:
#   1. `object_model` detects objects on the full frame (drawn in green on the left view).
#   2. `defect_model` runs on each detected object's region of interest and its boxes are
#      mapped back to full-frame coordinates (drawn in yellow on the right view).
# The two views are shown side by side in an OpenCV window. SPACE toggles recording of the
# combined view to an .avi file in the current working directory; ESC exits.


def _class_label(result, class_ids, index, fallback=None):
    """Return the class name of detection `index`, or `fallback` when it cannot be resolved.

    Args:
        result: A prediction result exposing a `names` mapping (class id -> name).
        class_ids: Sequence of class ids, one per detected box (may be empty).
        index: Position of the detection whose label is wanted.
        fallback: Value returned when `names` is empty or `index` has no class id.
    """
    names = result.names
    if names and index < len(class_ids):
        return names[int(class_ids[index])]
    return fallback


# Load models
object_model = YOLO("object_det_best.pt")
defect_model = YOLO("defect_det_best.pt")

# Video recording setup
recording = False
video_writer = None
video_filename = None
fps = 30
# Expected size of the side-by-side view (two 640x480 frames). The value actually handed to
# the writer is re-derived from the frame being recorded when recording starts.
frame_size = (1280, 480)

# Key codes returned by cv2.waitKey and the preview window title.
KEY_SPACE = 32
KEY_ESC = 27
WINDOW_TITLE = "Object Detection (Left) | Defect Detection (Right)"

# RealSense setup
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
# Started immediately before the try block so the pipeline is always stopped in `finally`.
pipeline.start(config)

print("Press SPACE to start/stop recording, ESC to exit.")

try:
    while True:
        frames = pipeline.wait_for_frames()
        color_frame = frames.get_color_frame()
        if not color_frame:
            continue

        frame = np.asanyarray(color_frame.get_data())
        frame_h, frame_w = frame.shape[:2]

        # Create two copies of the frame for different outputs
        object_frame = frame.copy()
        defect_frame = frame.copy()

        # Step 1: Detect object of interest
        object_results = object_model.predict(source=frame, save=False, stream=False, show=False, verbose=False)
        object_boxes = object_results[0].boxes.xyxy.cpu().numpy().astype(int)
        object_classes = object_results[0].boxes.cls.cpu().numpy().astype(int) if object_results[0].boxes.cls is not None else []

        for i, object_box in enumerate(object_boxes):
            # Plain Python ints keep the OpenCV drawing calls independent of the numpy int dtype.
            x1, y1, x2, y2 = (int(v) for v in object_box)

            # Draw object detection on left frame
            label = _class_label(object_results[0], object_classes, i, fallback="Object")
            cv2.rectangle(object_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(object_frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

            # Step 2: Run defect detection within object ROI.
            # Clamp the box to the frame first: a negative coordinate would otherwise be
            # interpreted by numpy slicing as an index counted from the end of the axis.
            rx1, ry1 = max(x1, 0), max(y1, 0)
            rx2, ry2 = min(x2, frame_w), min(y2, frame_h)
            if rx2 <= rx1 or ry2 <= ry1:
                continue  # empty ROI, nothing to inspect
            roi = frame[ry1:ry2, rx1:rx2]

            defect_results = defect_model.predict(source=roi, save=False, stream=False, show=False, verbose=False)
            defect_boxes = defect_results[0].boxes.xyxy.cpu().numpy().astype(int)
            defect_classes = defect_results[0].boxes.cls.cpu().numpy().astype(int) if defect_results[0].boxes.cls is not None else []

            for j, defect_box in enumerate(defect_boxes):
                dx1, dy1, dx2, dy2 = (int(v) for v in defect_box)
                # Defect boxes are relative to the ROI; shift them back into full-frame coordinates.
                gx1, gy1, gx2, gy2 = rx1 + dx1, ry1 + dy1, rx1 + dx2, ry1 + dy2
                cv2.rectangle(defect_frame, (gx1, gy1), (gx2, gy2), (0, 255, 255), 2)

                defect_label = _class_label(defect_results[0], defect_classes, j)
                if defect_label is not None:
                    cv2.putText(defect_frame, defect_label, (gx1, gy1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)

        # Concatenate and show the frames side by side
        combined_view = cv2.hconcat([object_frame, defect_frame])
        cv2.imshow(WINDOW_TITLE, combined_view)

        # Record if needed
        if recording and video_writer is not None:
            video_writer.write(combined_view)

        key = cv2.waitKey(1) & 0xFF

        # SPACE: Toggle recording
        if key == KEY_SPACE:
            if not recording:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                video_filename = f"output_{timestamp}.avi"
                # The writer expects every frame to have the size it was opened with, so take
                # the size from the frame that will actually be written (width, height).
                frame_size = (combined_view.shape[1], combined_view.shape[0])
                video_writer = cv2.VideoWriter(
                    video_filename,
                    cv2.VideoWriter_fourcc(*'XVID'),
                    fps,
                    frame_size
                )
                if video_writer.isOpened():
                    recording = True
                    print(f"Recording started: {video_filename}")
                else:
                    # Codec unavailable or path not writable: report it instead of
                    # pretending to record while nothing is saved.
                    video_writer.release()
                    video_writer = None
                    print(f"Could not open video writer for {video_filename}; recording not started.")
            else:
                recording = False
                if video_writer is not None:
                    video_writer.release()
                    print(f"Recording saved: {video_filename}")
                    video_writer = None

        # ESC: Exit
        elif key == KEY_ESC:
            print("ESC pressed. Exiting...")
            break

finally:
    # Finalize any recording first so the file is closed even if stopping the camera fails,
    # and make sure the preview window is destroyed regardless of earlier cleanup errors.
    try:
        if video_writer is not None:
            video_writer.release()
            if recording:
                print(f"Recording saved: {video_filename}")
            video_writer = None
        recording = False
    finally:
        try:
            pipeline.stop()
        finally:
            cv2.destroyAllWindows()
    print("Cleanup complete.")

Press SPACE to start/stop recording, ESC to exit.
Recording started: output_20250528_154547.avi
Recording saved: output_20250528_154547.avi
ESC pressed. Exiting...
Cleanup complete.
