# Introduction

This notebook showcases a PyQt5 application to record camera snapshots. It demonstrates how you can:

1. Capture images from a webcam.
2. Detect objects using a Transformer-based model (Hugging Face DETR).
3. Optionally generate captions with ViT-GPT2 or Salesforce BLIP.
4. Save data and annotated snapshots in structured folders.

Note: Desktop GUIs in notebooks require event-loop integration. We'll use `%gui qt5` to help run Qt inside the Jupyter environment.

## Setup Instructions

1. Make sure you have **PyQt5** installed:
   ```bash
   pip install pyqt5
   ```
2. In Jupyter, run `%gui qt5` (or use a dedicated cell) so that Qt and Jupyter cooperate.
3. Run the notebook cells to launch the GUI.
4. Depending on your environment, you may need to run this locally rather than in a hosted notebook environment.

In [1]:
# Environment and Imports

%gui qt5  # Integrate Qt event loop in Jupyter
import sys
import json
import threading
import cv2
import numpy as np
import time
import os
import torch
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QLabel, QLineEdit, QPushButton,
    QVBoxLayout, QWidget, QFileDialog, QComboBox
)
from PyQt5.QtGui import QImage, QPixmap
from PyQt5.QtCore import QTimer
from transformers import (
    SuperPointForKeypointDetection, AutoImageProcessor,
    DetrImageProcessor, DetrForObjectDetection,
    VisionEncoderDecoderModel, ViTImageProcessor, AutoTokenizer,
    BlipProcessor, BlipForConditionalGeneration
)
from transformers import pipeline
from PIL import Image

# Load Hugging Face DETR model and processor
processor = DetrImageProcessor.from_pretrained("facebook/detr-resnet-50")
model = DetrForObjectDetection.from_pretrained("facebook/detr-resnet-50")

# Pose detection model
pose_processor = AutoImageProcessor.from_pretrained("magic-leap-community/superpoint")
pose_model = SuperPointForKeypointDetection.from_pretrained("magic-leap-community/superpoint")

pipe = pipeline(task="depth-estimation", model="depth-anything/Depth-Anything-V2-Small-hf")

ERROR:root:Invalid GUI request 'qt5 # Integrate Qt event loop in Jupyter', valid ones are:dict_keys(['inline', 'nbagg', 'webagg', 'notebook', 'ipympl', 'widget', None, 'qt', 'qt5', 'qt6', 'wx', 'tk', 'gtk', 'gtk3', 'osx', 'asyncio'])
Some weights of the model checkpoint at facebook/detr-resnet-50 were not used when initializing DetrForObjectDetection: ['model.backbone.conv_encoder.model.layer1.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing DetrForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DetrForObjectDetection from the checkpoin

## DataRecorderApp Class

Below is the main class that:

1. Creates a PyQt5 window with:
   - Camera source selector
   - Feed display
   - Fields for `instruction` and `intent`
   - Interval setting for snapshots
2. Handles camera reading and timed snapshots.
3. Performs object detection, pose/keypoint detection, and (optionally) caption generation.
4. Saves images and JSON files in a structured directory.

In [2]:
class DataRecorderApp(QMainWindow):
    def __init__(self):
        super().__init__()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.caption_model = VisionEncoderDecoderModel.from_pretrained("nlpconnect/vit-gpt2-image-captioning")
        self.feature_extractor = ViTImageProcessor.from_pretrained("nlpconnect/vit-gpt2-image-captioning")
        self.caption_tokenizer = AutoTokenizer.from_pretrained("nlpconnect/vit-gpt2-image-captioning")

        self.sf_caption_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large")
        self.sf_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")

        self.gen_kwargs = {"max_length": 16, "num_beams": 4}

        self.setWindowTitle("Data Recorder")
        self.base_save_dir = "recordings"
        os.makedirs(self.base_save_dir, exist_ok=True)

        self.recording = False
        self.session_dir = None
        self.timer = QTimer(self)
        self.running = False
        self.frame = None
        self.lock = threading.Lock()

        self.central_widget = QWidget()
        self.setCentralWidget(self.central_widget)
        self.main_layout = QVBoxLayout(self.central_widget)

        self.camera_label = QLabel("Select Camera:")
        self.main_layout.addWidget(self.camera_label)
        self.camera_selector = QComboBox()
        self.camera_selector.addItems(["0", "1", "2"])
        self.main_layout.addWidget(self.camera_selector)

        self.feed_label = QLabel("Camera Feed")
        self.feed_label.setFixedSize(640, 480)
        self.main_layout.addWidget(self.feed_label)

        self.instruction_label = QLabel("Instruction:")
        self.main_layout.addWidget(self.instruction_label)
        self.instruction_input = QLineEdit()
        self.main_layout.addWidget(self.instruction_input)

        self.intent_label = QLabel("Intent:")
        self.main_layout.addWidget(self.intent_label)
        self.intent_input = QLineEdit()
        self.main_layout.addWidget(self.intent_input)

        self.interval_label = QLabel("Snapshot Interval (ms):")
        self.main_layout.addWidget(self.interval_label)
        self.interval_input = QLineEdit("100")
        self.main_layout.addWidget(self.interval_input)

        self.save_dir_button = QPushButton("Set Save Directory")
        self.save_dir_button.clicked.connect(self.set_save_directory)
        self.main_layout.addWidget(self.save_dir_button)

        self.record_button = QPushButton("Record")
        self.record_button.clicked.connect(self.start_recording)
        self.main_layout.addWidget(self.record_button)

        self.stop_button = QPushButton("Stop")
        self.stop_button.clicked.connect(self.stop_recording)
        self.main_layout.addWidget(self.stop_button)

        self.feedback_label = QLabel("")
        self.main_layout.addWidget(self.feedback_label)

        self.cap = None
        self.start_camera()

    def pose_detection(self, image):
        if image.shape[2] == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        image = np.expand_dims(image, axis=-1)
        image = np.repeat(image, 3, axis=-1)
        pil_image = Image.fromarray(image)
        inputs = pose_processor(images=pil_image, return_tensors="pt", padding="max_length")
        outputs = pose_model(**inputs)
        keypoints = outputs["pred_keypoints"][0].cpu().numpy()[:, :2]
        return keypoints

    def generate_image_caption(self, image):
        if image.mode != "RGB":
            image = image.convert(mode="RGB")
        pixel_values = self.feature_extractor(images=image, return_tensors="pt").pixel_values.to(self.device)
        output_ids = self.caption_model.generate(
            pixel_values.to(self.device),
            max_length=self.gen_kwargs["max_length"],
            num_beams=self.gen_kwargs["num_beams"]
        )
        return self.caption_tokenizer.decode(output_ids[0], skip_special_tokens=True).strip()

    def get_depth_map(self, image):
        depth_map = pipe(image)
        return depth_map

    def salesforce_caption(self, image):
        if image.mode != "RGB":
            image = image.convert("RGB")
        inputs = self.sf_processor(image, return_tensors="pt")
        with torch.no_grad():
            caption_ids = self.sf_caption_model.generate(**inputs, **self.gen_kwargs)
        return self.sf_processor.decode(caption_ids[0], skip_special_tokens=True)

    def start_camera(self):
        if self.cap and self.cap.isOpened():
            self.feedback_label.setText("Camera already running.")
            return
        camera_index = int(self.camera_selector.currentText())
        self.cap = cv2.VideoCapture(camera_index)
        if not self.cap.isOpened():
            self.feedback_label.setText("Error: Could not open selected webcam.")
            return
        self.running = True
        self.camera_thread = threading.Thread(target=self.camera_loop, daemon=True)
        self.camera_thread.start()
        self.camera_timer = QTimer(self)
        self.camera_timer.timeout.connect(self.update_camera_feed)
        self.camera_timer.start(30)

    def camera_loop(self):
        while self.running:
            if self.cap:
                ret, frame = self.cap.read()
                if ret:
                    with self.lock:
                        self.frame = frame.copy()
                time.sleep(0.01)

    def start_recording(self):
        if self.recording:
            self.feedback_label.setText("Already recording.")
            return
        try:
            interval = int(self.interval_input.text())
        except ValueError:
            self.feedback_label.setText("Invalid interval value. Please enter a valid number.")
            return
        timestamp = int(time.time() * 1000)
        self.session_dir = os.path.join(self.base_save_dir, f"session_{timestamp}")
        os.makedirs(self.session_dir, exist_ok=True)
        self.recording = True
        self.feedback_label.setText(f"Recording started. Saving to {self.session_dir}.")
        self.timer.timeout.connect(self.take_snapshot)
        self.timer.start(interval)

    def stop_recording(self):
        if not self.recording:
            self.feedback_label.setText("Not currently recording.")
            return
        self.recording = False
        self.timer.stop()
        self.feedback_label.setText(f"Recording stopped. Session data saved in {self.session_dir}.")

    def set_save_directory(self):
        directory = QFileDialog.getExistingDirectory(self, "Select Save Directory")
        if directory:
            self.base_save_dir = directory
            self.feedback_label.setText(f"Base save directory set to {self.base_save_dir}.")

    def update_camera_feed(self):
        if self.frame is not None:
            with self.lock:
                display_frame = self.frame.copy()
            detected_objects = self.detect_objects_with_huggingface(display_frame)
            joint_outputs = self.simulate_joint_outputs()
            for obj in detected_objects:
                x1, y1, x2, y2 = map(int, obj["box"])
                label = f"{obj['label']} ({obj['score']:.2f})"
                cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(display_frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
            for joint in joint_outputs:
                x, y, z = (joint * np.array([display_frame.shape[1], display_frame.shape[0], 1])).astype(int)
                cv2.circle(display_frame, (x, y), 5, (255, 0, 0), -1)
            display_frame = cv2.resize(display_frame, (640, 480))
            cv2.flip(display_frame, 1, display_frame)
            rgb_frame = cv2.cvtColor(display_frame, cv2.COLOR_BGR2RGB)
            h, w, ch = rgb_frame.shape
            bytes_per_line = ch * w
            q_image = QImage(rgb_frame.data, w, h, bytes_per_line, QImage.Format_RGB888)
            self.feed_label.setPixmap(QPixmap.fromImage(q_image))

    def take_snapshot(self):
        if self.frame is None:
            return
        with self.lock:
            snapshot_frame = self.frame.copy()
        snapshot_frame = cv2.resize(snapshot_frame, (640, 480))
        timestamp = int(time.time() * 1000)
        if self.session_dir is None:
            self.feedback_label.setText("Error: Session directory is not set.")
            return
        snapshot_subdir = os.path.join(self.session_dir, f"snapshot_{timestamp}")
        os.makedirs(snapshot_subdir, exist_ok=True)
        snapshot_filename = os.path.join(snapshot_subdir, "image.jpg")
        detected_objects = self.detect_objects_with_huggingface(snapshot_frame)
        joint_outputs = self.simulate_joint_outputs()
        annotated_image = snapshot_frame.copy()
        for obj in detected_objects:
            x1, y1, x2, y2 = map(int, obj["box"])
            label = f"{obj['label']} ({obj['score']:.2f})"
            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(annotated_image, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        for joint in joint_outputs:
            x, y, z = (joint * np.array([640, 480, 1])).astype(int)
            cv2.circle(annotated_image, (x, y), 5, (255, 0, 0), -1)
        cv2.imwrite(snapshot_filename, annotated_image)
        depth_map = self.get_depth_map(Image.fromarray(cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB)))
        depth_map.save(os.path.join(snapshot_subdir, "depth_map.jpg"))
        data_json = {
            "timestamp": timestamp,
            "instruction": self.instruction_input.text(),
            "intent": self.intent_input.text(),
            "detected_objects": detected_objects,
            "joint_outputs": joint_outputs.tolist(),
        }
        json_file = os.path.join(snapshot_subdir, f"data_{timestamp}.json")
        with open(json_file, "w") as f:
            json.dump(data_json, f, indent=4)
        self.feedback_label.setText(f"Snapshot saved to {snapshot_filename}")

    def detect_objects_with_huggingface(self, image):
        pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        inputs = processor(images=pil_image, return_tensors="pt")
        outputs = model(**inputs)
        target_sizes = [pil_image.size[::-1]]
        results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.5)[0]
        detected_objects = []
        for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
            label_name = model.config.id2label[label.item()]
            box = [round(i, 2) for i in box.tolist()]
            detected_objects.append({
                "label": label_name,
                "score": round(score.item(), 2),
                "box": box
            })
        return detected_objects

    @staticmethod
    def simulate_joint_outputs():
        return np.random.rand(360, 3)

    def closeEvent(self, event):
        if self.cap:
            self.cap.release()
        if self.timer:
            self.timer.stop()
        event.accept()

## Launch the Application

Calling `launch_data_recorder()` will start the PyQt5 event loop, creating the GUI window inside the notebook environment (provided `%gui qt5` is active and your environment supports desktop GUIs).

In [3]:
def launch_data_recorder():
    app = QApplication(sys.argv)
    window = DataRecorderApp()
    window.show()
    app.exec_()  # Blocking call to start the Qt event loop

In [None]:
# Run this cell to launch the GUI
launch_data_recorder()

Config of the encoder: <class 'transformers.models.vit.modeling_vit.ViTModel'> is overwritten by shared encoder config: ViTConfig {
  "architectures": [
    "ViTModel"
  ],
  "attention_probs_dropout_prob": 0.0,
  "encoder_stride": 16,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.0,
  "hidden_size": 768,
  "image_size": 224,
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "layer_norm_eps": 1e-12,
  "model_type": "vit",
  "num_attention_heads": 12,
  "num_channels": 3,
  "num_hidden_layers": 12,
  "patch_size": 16,
  "qkv_bias": true,
  "transformers_version": "4.47.1"
}

Config of the decoder: <class 'transformers.models.gpt2.modeling_gpt2.GPT2LMHeadModel'> is overwritten by shared decoder config: GPT2Config {
  "activation_function": "gelu_new",
  "add_cross_attention": true,
  "architectures": [
    "GPT2LMHeadModel"
  ],
  "attn_pdrop": 0.1,
  "bos_token_id": 50256,
  "decoder_start_token_id": 50256,
  "embd_pdrop": 0.1,
  "eos_token_id": 50256,
  "initializer_rang

NotImplementedError: The operator 'aten::upsample_bicubic2d.out' is not currently implemented for the MPS device. If you want this op to be added in priority during the prototype phase of this feature, please comment on https://github.com/pytorch/pytorch/issues/77764. As a temporary fix, you can set the environment variable `PYTORCH_ENABLE_MPS_FALLBACK=1` to use the CPU as a fallback for this op. WARNING: this will be slower than running natively on MPS.

NotImplementedError: The operator 'aten::upsample_bicubic2d.out' is not currently implemented for the MPS device. If you want this op to be added in priority during the prototype phase of this feature, please comment on https://github.com/pytorch/pytorch/issues/77764. As a temporary fix, you can set the environment variable `PYTORCH_ENABLE_MPS_FALLBACK=1` to use the CPU as a fallback for this op. WARNING: this will be slower than running natively on MPS.

NotImplementedError: The operator 'aten::upsample_bicubic2d.out' is not currently implemented for the MPS device. If you want this op to be added in priority during the prototype phase of this feature, please comment on https://github.com/pytorch/pytorch/issues/77764. As a temporary fix, you can set the environment variable `PYTORCH_ENABLE_MPS_FALLBACK=1` to use the CPU as a fallback for this op. WARNING: this will be slower than running natively on MPS.

NotImplementedError: The operator 'aten::upsample_bicubic2d.out' is not currently implemented for the MPS device. If you want this op to be added in priority during the prototype phase of this feature, please comment on https://github.com/pytorch/pytorch/issues/77764. As a temporary fix, you can set the environment variable `PYTORCH_ENABLE_MPS_FALLBACK=1` to use the CPU as a fallback for this op. WARNING: this will be slower than running natively on MPS.

NotImplementedError: The operator 'aten::upsample_bicubic2d.out' is not currently implemented for the MPS device. If you want this op to be added in priority during the prototype phase of this feature, please comment on https://github.com/pytorch/pytorch/issues/77764. As a temporary fix, you can set the environment variable `PYTORCH_ENABLE_MPS_FALLBACK=1` to use the CPU as a fallback for this op. WARNING: this will be slower than running natively on MPS.

KeyboardInterrupt: 