In [18]:
import sys
import time
import shlex
import subprocess

import imagingcontrol4 as ic4


def find_device_by_serial(serial: str) -> ic4.DeviceInfo:
    """Return the first enumerated device whose ``serial`` equals ``serial``.

    Devices are taken from ``ic4.DeviceEnum.devices()`` in the order the
    enumerator yields them. A device without a ``serial`` attribute is
    compared as ``None``.

    Args:
        serial: Serial number string to look for.

    Returns:
        The matching device info object.

    Raises:
        RuntimeError: If no enumerated device has the requested serial.
    """
    matching = (
        candidate
        for candidate in ic4.DeviceEnum.devices()
        if getattr(candidate, "serial", None) == serial
    )
    found = next(matching, None)
    if found is None:
        raise RuntimeError(f"Camera with serial {serial!r} not found")
    return found


def _try_set_property(prop_map, prop_id, value, *, warn: bool = True) -> bool:
    """Set one property best-effort; return True on success, False on IC4Exception.

    When ``warn`` is true a failure is reported on stderr instead of being
    silently discarded, so a partially configured camera is visible to the user.
    """
    try:
        prop_map.set_value(prop_id, value)
    except ic4.IC4Exception as exc:
        if warn:
            print(
                f"Warning: could not set {prop_id} = {value!r}: {exc}",
                file=sys.stderr,
            )
        return False
    return True


def configure_camera_for_bayer_gr8(
    grabber: ic4.Grabber, width: int, height: int, fps: float
) -> str | None:
    """Configure resolution, pixel format, frame rate and software triggering.

    NOTE: despite the function name, no Bayer format is selected here. The
    pixel format is set to "BGR8", falling back to "RGB8".

    Width, height, frame rate and the trigger properties are applied
    best-effort: a rejected value produces a warning on stderr and
    configuration continues. Only the pixel format is mandatory.

    Args:
        grabber: Grabber with an opened device.
        width: Requested image width.
        height: Requested image height.
        fps: Requested acquisition frame rate (converted with ``float()``).

    Returns:
        The pixel format name that was applied ("BGR8" or "RGB8").

    Raises:
        RuntimeError: If neither "BGR8" nor "RGB8" could be set.
    """
    dmap = grabber.device_property_map

    # Geometry first, then pixel format, matching the original ordering.
    _try_set_property(dmap, ic4.PropId.WIDTH, width)
    _try_set_property(dmap, ic4.PropId.HEIGHT, height)

    applied_format = None
    for candidate in ("BGR8", "RGB8"):
        # A rejected candidate is expected on some cameras, so stay quiet here;
        # total failure is reported by the RuntimeError below.
        if _try_set_property(dmap, ic4.PropId.PIXEL_FORMAT, candidate, warn=False):
            applied_format = candidate
            break
    if applied_format is None:
        raise RuntimeError("Camera does not support BGR8/RGB8 PixelFormat")

    _try_set_property(dmap, ic4.PropId.ACQUISITION_FRAME_RATE, float(fps))

    # NOTE(review): the trigger properties are written to the *driver* property
    # map, not the device property map used above. If they actually live on the
    # device map these writes fail (now visibly, via the warning) and the camera
    # stays in whatever trigger mode it was in -- confirm against the ic4 docs
    # before changing, because the capture loop looks up TRIGGER_SOFTWARE on
    # the same driver map.
    drmap = grabber.driver_property_map
    trigger_settings = [
        (ic4.PropId.TRIGGER_SELECTOR, "FrameStart"),
        (ic4.PropId.TRIGGER_SOURCE, "Software"),
        (ic4.PropId.TRIGGER_MODE, "On"),
    ]
    for pid, val in trigger_settings:
        _try_set_property(drmap, pid, val)

    print(f"Configured PixelFormat: {applied_format}")
    return applied_format


class _RawQueueSinkListener(ic4.QueueSinkListener):
    """Minimal queue-sink listener: accepts every connection, ignores frame events.

    Frames are not consumed from these callbacks; the capture loop in this
    file pulls them by polling ``sink.try_pop_output_buffer()``.
    """

    def sink_connected(
        self, sink: ic4.QueueSink, image_type: ic4.ImageType, min_buffers_required: int
    ) -> bool:
        """Accept the connection unconditionally, whatever the image type."""
        accept_connection = True
        return accept_connection

    def frames_queued(self, sink: ic4.QueueSink) -> None:
        """Do nothing when frames are queued."""
        return None


def allocate_queue_sink(
    grabber: ic4.Grabber, width: int, height: int, num_buffers: int = 1000
) -> tuple[ic4.QueueSink, _RawQueueSinkListener]:
    """Create a BGR8 queue sink, attach it to ``grabber`` and queue buffers.

    The stream is set up with ``DEFER_ACQUISITION_START``, so no frames are
    acquired until the caller starts acquisition explicitly.

    Args:
        grabber: Grabber with an opened, configured device.
        width: Currently unused; kept for interface compatibility.
        height: Currently unused; kept for interface compatibility.
        num_buffers: Number of image buffers to allocate and queue. The
            default of 1000 preserves the previous hard-coded value; at
            1920x1080 with 3 bytes per pixel that is roughly 6 GB, so pass a
            smaller value on memory-constrained hosts.

    Returns:
        ``(sink, listener)``. The listener is returned so the caller can keep
        a reference to it alongside the sink.

    Raises:
        ValueError: If ``num_buffers`` is less than 1.
    """
    if num_buffers < 1:
        # Fail before touching the grabber so a bad argument leaves no
        # half-configured stream behind.
        raise ValueError(f"num_buffers must be >= 1, got {num_buffers!r}")

    listener = _RawQueueSinkListener()
    sink = ic4.QueueSink(
        listener,
        # Translated from the original comment: "already handles environments
        # without RGB8" -- only BGR8 is accepted by the sink.
        accepted_pixel_formats=[ic4.PixelFormat.BGR8],
    )
    grabber.stream_setup(
        sink,
        setup_option=ic4.StreamSetupOption.DEFER_ACQUISITION_START,
    )
    sink.alloc_and_queue_buffers(num_buffers)
    return sink, listener


def record_raw_frames(
    grabber: ic4.Grabber,
    sink: ic4.QueueSink,
    fps: float = 30.0,
    width: int = 1920,
    height: int = 1080,
    pixel_format: str | None = None,
    output_device: str = "/dev/video1",
) -> None:
    """Stream frames from ``sink`` into ffmpeg, which writes to a V4L2 device.

    Starts acquisition, then loops until interrupted: optionally fires a
    software trigger, waits up to two seconds for a buffer, pipes its bytes
    to ffmpeg's stdin and sleeps for the remainder of the frame period.
    The loop also ends if ffmpeg exits or its pipe breaks.

    On exit (normal, interrupted or failed) acquisition and the stream are
    stopped, ffmpeg's stdin is closed and the process is reaped.
    NOTE(review): because the stream is stopped here, ``sink`` presumably
    cannot be reused without a new ``stream_setup`` -- confirm.

    Args:
        grabber: Grabber whose stream has been set up with ``sink``.
        sink: Queue sink that delivers the image buffers.
        fps: Target frame rate; used for ffmpeg's ``-framerate`` and for
            pacing the loop. Values <= 0 disable pacing.
        width: Frame width passed to ffmpeg.
        height: Frame height passed to ffmpeg.
        pixel_format: Camera pixel format name. Anything containing "BGR"
            (case-insensitive) maps to ffmpeg ``bgr24``; otherwise ``rgb24``.
        output_device: V4L2 device path ffmpeg writes to.

    Raises:
        RuntimeError: If ffmpeg's stdin pipe is unavailable.
    """
    buffer_timeout_s = 2.0  # max wait for one frame before retrying
    poll_interval_s = 0.001  # sleep between sink polls
    ffmpeg_exit_timeout_s = 5.0  # grace period for ffmpeg after stdin closes

    ffmpeg_pix_fmt = (
        "bgr24" if pixel_format and "BGR" in str(pixel_format).upper() else "rgb24"
    )
    # Build argv directly instead of formatting a string and shlex-splitting
    # it, so a device path containing spaces stays a single argument.
    ffmpeg_argv = [
        "ffmpeg", "-loglevel", "error",
        "-f", "rawvideo", "-pix_fmt", ffmpeg_pix_fmt, "-s", f"{width}x{height}",
        "-framerate", f"{fps}", "-i", "-",
        "-vf", "format=yuv420p",
        "-f", "v4l2", output_device,
    ]

    proc = subprocess.Popen(ffmpeg_argv, stdin=subprocess.PIPE)
    if proc.stdin is None:
        proc.kill()
        proc.wait()
        raise RuntimeError("ffmpeg stdin unavailable")

    try:
        grabber.acquisition_start()

        # NOTE(review): the trigger command is looked up on the driver property
        # map; if it is not found there, no software trigger is issued and the
        # loop relies on the camera delivering frames on its own.
        trigger_cmd = None
        try:
            prop = grabber.driver_property_map.find(ic4.PropId.TRIGGER_SOFTWARE)
            if isinstance(prop, ic4.PropCommand):
                trigger_cmd = prop
        except ic4.IC4Exception:
            trigger_cmd = None

        inter_trigger = 1.0 / fps if fps > 0 else 0.0

        while True:
            # Stop instead of spinning forever once ffmpeg has gone away.
            if proc.poll() is not None:
                print(
                    f"ffmpeg exited with code {proc.returncode}; stopping capture",
                    file=sys.stderr,
                )
                break

            t0 = time.perf_counter()

            if trigger_cmd is not None:
                try:
                    trigger_cmd.execute()
                except ic4.IC4Exception:
                    # Best-effort: a failed trigger just means this iteration
                    # may time out waiting for a frame.
                    pass

            buf = None
            deadline = time.perf_counter() + buffer_timeout_s
            while buf is None and time.perf_counter() < deadline:
                buf = sink.try_pop_output_buffer()
                if buf is None:
                    time.sleep(poll_interval_s)

            if buf is None:
                continue

            try:
                # tobytes() copies the pixels, so the buffer can be released
                # as soon as the write call returns.
                proc.stdin.write(buf.numpy_wrap().tobytes())
            except BrokenPipeError:
                print("ffmpeg closed its input; stopping capture", file=sys.stderr)
                break
            finally:
                # Always hand the buffer back, even if the write failed or
                # the user interrupted mid-write.
                buf.release()

            rest = inter_trigger - (time.perf_counter() - t0)
            if rest > 0:
                time.sleep(rest)

    except KeyboardInterrupt:
        print("Stopping capture due to KeyboardInterrupt", file=sys.stderr)

    finally:
        try:
            grabber.acquisition_stop()
        except ic4.IC4Exception:
            pass
        try:
            grabber.stream_stop()
        except ic4.IC4Exception:
            pass
        try:
            # Closing flushes buffered data; that can fail if ffmpeg is gone.
            proc.stdin.close()
        except (OSError, ValueError):
            pass
        try:
            proc.wait(timeout=ffmpeg_exit_timeout_s)
        except subprocess.TimeoutExpired:
            # ffmpeg did not exit after EOF on stdin; do not hang the kernel.
            proc.kill()
            proc.wait()


In [19]:
# ---- settings ----
# Serial number of the camera to open, and the stream geometry / rate that is
# requested from the camera and later passed to the capture function.
SERIAL_NUMBER = "05520125"
WIDTH, HEIGHT = 1920, 1080
FRAME_RATE = 30.0

# ---- init & open ----
# The library is initialised before any other ic4 object is created; the
# cleanup cell calls the matching ic4.Library.exit().
ic4.Library.init()
grabber = ic4.Grabber()

# Raises RuntimeError if no enumerated camera has this serial number.
device_info = find_device_by_serial(SERIAL_NUMBER)
grabber.device_open(device_info)

# Despite its name, this helper selects "BGR8" (falling back to "RGB8") and
# returns the format name that was applied.
pixel_format = configure_camera_for_bayer_gr8(grabber, WIDTH, HEIGHT, FRAME_RATE)

# Sets up the stream with acquisition start deferred; the listener is kept in a
# variable alongside the sink until the cleanup cell drops both.
sink, sink_listener = allocate_queue_sink(grabber, WIDTH, HEIGHT)

print("Setup done. Ready to stream.")


Configured PixelFormat: BGR8
Setup done. Ready to stream.


In [20]:
# Run the capture loop until it is interrupted (e.g. Kernel -> Interrupt).
# record_raw_frames catches KeyboardInterrupt around its own loop, so the
# handler below only fires for an interrupt raised outside that inner try
# block (for example while ffmpeg is being spawned or during its cleanup).
try:
    record_raw_frames(
        grabber,
        sink,
        fps=FRAME_RATE,
        width=WIDTH,
        height=HEIGHT,
        pixel_format=pixel_format,
    )
except KeyboardInterrupt:
    print("Interrupted by user.")


Stopping capture due to KeyboardInterrupt


In [21]:
# ---- cleanup ----
# Close the device if it is still open; an ic4 error here is ignored so the
# rest of the teardown still runs.
try:
    if grabber.is_device_open:
        grabber.device_close()
except ic4.IC4Exception:
    pass

# Drop every notebook-level reference to ic4 objects before shutting the
# library down -- presumably so nothing outlives ic4.Library.exit(); confirm
# against the ic4 documentation.
# NOTE(review): re-running this cell will fail on `grabber.is_device_open`
# because `grabber` is None after the first run.
grabber = None
sink = None
sink_listener = None
device_info = None

ic4.Library.exit()
print("Clean shutdown.")


Clean shutdown.
