-
Notifications
You must be signed in to change notification settings - Fork 23
Open
Description
I am trying to integrate a camera into a bigger system, and I am running into issues starting and stopping the camera. Specficially, I can't seem to delete the object and then recreate the object without running into SeekCameraInvalidParameterErrors on the re-created object. I have example code here, running seekcamera-python-1.3.0 on SDK 4.4.2.20. It should run from cli with no arguments. The output is here, with the code afterwards.
sherman:$ python bad_seek.py
Initializing camera E65609010C11
Frame callback registered.
Camera connected: E65609010C11
SeekCameraManager initialized and event callback registered.
Camera stream started in a separate thread. Delaying for 5000 ms
Camera started. You can now call `get_array` independently.
Captured frame at 2025-01-22T14:13:18.533407 -- [1.000, 120.841, 255.000]
Captured frame at 2025-01-22T14:13:18.644191 -- [1.000, 120.841, 255.000]
Captured frame at 2025-01-22T14:13:18.755333 -- [1.000, 121.800, 255.000]
...
Captured frame at 2025-01-22T14:13:23.320362 -- [1.000, 121.994, 255.000]
Captured frame at 2025-01-22T14:13:23.431807 -- [1.000, 122.118, 255.000]
Captured frame at 2025-01-22T14:13:23.543239 -- [1.000, 121.874, 255.000]
Thermal camera stream stopped.
Camera stopped.
Initializing camera E65609010C11
SeekCameraManager initialized and event callback registered.
Unhandled exception:
Traceback (most recent call last):
File "/home/sherman/bad_seek.py", line 74, in start
if hasattr(self.renderer.camera, "color_palette"):
File "/home/sherman/SeekThermal/seekcamera-python-1.3.0/seekcamera-python-1.3.0/seekcamera/camera.py", line 1381, in color_palette
raise error_from_status(status)
seekcamera.error.SeekCameraInvalidParameterError
Camera started. You can now call `get_array` independently.
Dropped 10 frames
Dropped 20 frames
Dropped 30 frames
...
Dropped 1000 frames
Dropped too many frames, stopping
Camera stopped.
Source code
#seekcameras.py
from collections import deque
from datetime import datetime
from threading import Condition, Thread
import numpy as np
import time
import sys
import traceback
from seekcamera import (
SeekCameraIOType,
SeekCameraManager,
SeekCameraManagerEvent,
SeekCameraFrameFormat,
SeekCameraColorPalette,
SeekCamera,
SeekFrame,
)
SERIAL = "E65609010C11"
class SeekCameraS(object):
def __init__(self, serial, *args, **kwargs):
super().__init__(*args, **kwargs)
self.serial = serial # Target serial number for filtering
self.camera = None
self.manager = None
self.renderer = None
self.ready = False
self.is_init = False
self.is_start = False
self.frame_queue = deque(maxlen=100) # Queue to hold up to 100 frames
self.thread = None
self.running = False
self.config_default = {
"start_delay": 5000, # ms
"acquisition_rate": 9, # Hz
}
self.config_active = self.config_default.copy()
self.last_frame_time = time.time()
self.dropped_frames = 0
def initialize(self, **config):
"""Initialize the Seek camera and set up callbacks."""
self.config_active.update(config)
self.frame_interval = 1.0 / self.config_active["acquisition_rate"]
if self.is_init:
print(f"Device {self.serial} already initialized")
self.close()
else:
print(f"Initializing camera {self.serial}")
try:
self.renderer = Renderer()
self.manager = SeekCameraManager(SeekCameraIOType.USB)
self.manager.register_event_callback(self._on_event, self.renderer)
print("SeekCameraManager initialized and event callback registered.")
self.ready = True
self.is_init = True
except Exception as e:
print(f"Unhandled exception: {e}")
traceback.print_exc()
self.ready = False
def start(self):
"""Start the camera stream."""
if self.ready and self.renderer.camera:
try:
if hasattr(self.renderer.camera, "color_palette"):
if self.renderer.camera.color_palette != SeekCameraColorPalette.TYRIAN:
self.renderer.camera.color_palette = SeekCameraColorPalette.TYRIAN
self.renderer.camera.capture_session_start(SeekCameraFrameFormat.COLOR_ARGB8888)
self.is_start = True
self.running = True
self.thread = Thread(target=self._run_camera, daemon=True)
self.thread.start()
print(
"Camera stream started in a separate thread. Delaying for {} ms".format(
self.config_active["start_delay"]
)
)
time.sleep(self.config_active["start_delay"] / 1000.0)
except Exception as e:
print(f"Unhandled exception: {e}")
traceback.print_exc()
def stop(self):
"""Stop the camera stream."""
if self.is_start and self.renderer.camera:
try:
self.running = False
if self.thread:
self.thread.join()
self.renderer.camera.capture_session_stop()
self.is_start = False
print("Thermal camera stream stopped.")
except Exception as e:
print(f"Unhandled exception: {e}")
traceback.print_exc()
def close(self):
"""Release camera resources."""
if self.manager:
try:
del self.manager # Ensure the manager is properly released
print("SeekCameraManager released.")
self.is_init = False
self.is_start = False
self.running = False
except Exception as e:
print(f"Unhandled exception: {e}")
traceback.print_exc()
self.ready = False
def get_array(self, force=False):
"""Retrieve the most recent frame from the queue as a NumPy array."""
# Sleep if the frame interval has not passed, this is our way of throttling the frame rate
if not force and time.time() - self.last_frame_time < self.frame_interval:
# print(f"Sleeping for {self.frame_interval - (time.time() - self.last_frame_time)} seconds")
time.sleep(self.frame_interval - (time.time() - self.last_frame_time))
if self.frame_queue:
# print(f"Getting frame from queue")
frame = self.frame_queue[-1]
self.last_frame_time = time.time()
return np.array(frame.data)
self.dropped_frames += 1
if self.dropped_frames > 1 and self.dropped_frames % 10 == 0:
print(f"Dropped {self.dropped_frames} frames")
return None
def _run_camera(self):
"""Background thread to process frames."""
while self.running:
with self.renderer.frame_condition:
if self.renderer.frame_condition.wait(10 * self.frame_interval):
frame = self.renderer.frame
if frame is not None:
self.frame_queue.append(frame)
def _on_event(self, camera, event_type, event_status, renderer):
"""Callback for handling camera events."""
if event_type == SeekCameraManagerEvent.CONNECT:
if renderer.busy:
return
if self.serial and camera.chipid != self.serial:
print(f"Ignoring camera with serial: {camera.chipid}")
return
renderer.busy = True
renderer.camera = camera
renderer.first_frame = True
try:
camera.register_frame_available_callback(self._on_frame, renderer)
print("Frame callback registered.")
except Exception as e:
print(f"Failed to register frame callback: {e}")
print(f"Camera connected: {camera.chipid}")
elif event_type == SeekCameraManagerEvent.DISCONNECT:
if renderer.camera == camera:
camera.capture_session_stop()
renderer.camera = None
renderer.frame = None
renderer.busy = False
print("Camera disconnected.")
elif event_type == SeekCameraManagerEvent.ERROR:
print(f"Camera error: {event_status}")
def _on_frame(self, _camera, camera_frame, renderer):
"""Callback for handling new frames."""
if camera_frame.color_argb8888 is not None:
with renderer.frame_condition:
renderer.frame = camera_frame.color_argb8888
renderer.frame_condition.notify()
class Renderer:
"""Renderer class to manage camera and frame data."""
def __init__(self):
self.busy = False
self.frame = SeekFrame() # Match initialization with the original logic
self.camera = SeekCamera()
self.frame_condition = Condition()
self.first_frame = True
def demo0(duration=None):
seek_cam = SeekCameraS(serial=SERIAL)
seek_cam.initialize()
dropped_frames = 0
if seek_cam.ready:
seek_cam.start()
t0 = time.time()
print("Camera started. You can now call `get_array` independently.")
try:
while duration is None or time.time() - t0 < duration:
frame = seek_cam.get_array()
if frame is None:
dropped_frames += 1
if dropped_frames > 0 and dropped_frames % 100 == 0:
print(f"Dropped {dropped_frames} frames")
if dropped_frames > 1000:
print("Dropped too many frames, stopping")
break
else:
print(
"Captured frame at {} -- [{:4.3f}, {:4.3f}, {:4.3f}]".format(
datetime.now().isoformat(), frame.min(), frame.mean(), frame.max()
)
)
# time.sleep(0.1) # Simulate periodic access
except KeyboardInterrupt:
print("Keyboard interrupt detected.")
finally:
seek_cam.stop()
print("Camera stopped.")
return seek_cam
def demo1():
print("demo1; running for 5s, deleting seek_cam, then running for 5s again")
seek_cam = demo0(duration=5)
del seek_cam
seek_cam = demo0(duration=5)
if __name__ == "__main__":
demo1() # RUN THIS FOR BAD
Metadata
Metadata
Assignees
Labels
No labels