In [None]:
#| default_exp camera.test_fake_cam


# Camera Fake 
> Opencv and GST Fake cameras for testing 


https://mavlink.io/en/services/camera.html
https://github.com/mavlink/mavlink-camera-manager


In [None]:
#| hide
%load_ext autoreload
%autoreload 2

In [None]:
#| hide
# skip_showdoc: true to avoid running cells when rendering docs, and 
# skip_exec: true to skip this notebook when running tests. 
# this should be a raw cell 

In [None]:
#| export
import time, os, sys

from UAV.logging import logging
from UAV.mavlink.mavcom import MAVCom, time_since_boot_ms, time_UTC_usec, boot_time_str, date_time_str
from UAV.mavlink.component import Component, mavutil, mavlink, MAVLink

import threading
import cv2
import numpy as np
# try:
#     # https://hackernoon.com/how-to-manage-configurations-easily-using-toml-files
#     import tomllib   # Python 3.11+
# except ModuleNotFoundError:
#     import tomli as tomllib
# import tomli_w
import toml

# from UAV.imports import *   # TODO why is this relative import on nbdev_export?
from fs.memoryfs import MemoryFS
from dataclasses import dataclass

from UAV.camera.fake_cam import *

In [None]:
#| hide
from fastcore.utils import *
from nbdev.showdoc import *
from fastcore.test import *

In [None]:
show_doc(create_toml_file)

---

### create_toml_file

>      create_toml_file (filename)

Create a TOML file for testing.

In [None]:
config_path = Path("../../config")
create_toml_file(config_path/"____test_camera_info.toml")

TypeError: a bytes-like object is required, not 'str'

In [None]:
# assert False, "stop here"

In [None]:
#| export

def read_camera_dict_from_toml(toml_file_path # path to TOML file
                               )->dict: # camera_info dict
    """Read MAVLink camera info from a TOML file."""
    camera_dict = toml.load(toml_file_path)
    return camera_dict

@dataclass
class CameraCaptureStatus:
    time_boot_ms: int = 0
    image_status: int = 0
    video_status: int = 0
    image_interval: int = 0
    recording_time_ms: int = 0
    available_capacity: int = 0
    image_count: int = 0

    def __str__(self) -> str:
        return self.__class__.__name__

    def __repr__(self) -> str:
        return "<{}>".format(self)

    # def update(self, image_status, video_status, image_interval, recording_time_ms, available_capacity, image_count):
    #     self.time_boot_ms = time_since_boot_ms()
    #     self.image_status = image_status
    #     self.video_status = video_status
    #     self.image_interval = image_interval
    #     self.recording_time_ms = recording_time_ms
    #     self.available_capacity = available_capacity
    #     self.image_count = image_count


In [None]:
#| export

class BaseCamera:
    def __init__(self,
                 camera_dict=None,  # camera_info dict
                 debug=False):  # debug log flag
        self.mav: MAVLink = None
        camera_dict = {'camera_info':{
                'vendor_name': 'UAV',
                'model_name': 'FakeCamera',
                'firmware_version': 1,
                'focal_length': 2.8,
                'sensor_size_h': 3.2,
                'sensor_size_v': 2.4,
                'resolution_h': 640,
                'resolution_v': 480,
                'lens_id': 0,
                'flags': 0,
                'cam_definition_version': 1,
                'cam_definition_uri': '',
                }}

        self.camera_info = self.get_camera_info(camera_dict)


    def get_camera_info(self, camera_dict):
        """get  MAVLink camera info from a TOML dict."""
        def make_length_32(s: str) -> str:
            if len(s) > 32:
                return s[:32]
            return s.ljust(32)  # pad with spaces to the right to make length 32

        camera_info = camera_dict['camera_info']
        # vender name and model name must be 32 bytes long
        camera_info['vendor_name'] = make_length_32(camera_info['vendor_name'])
        camera_info['model_name'] = make_length_32(camera_info['model_name'])
        camera_info['vendor_name'] = [int(b) for b in camera_info['vendor_name'].encode()]
        camera_info['model_name'] = [int(b) for b in camera_info['model_name'].encode()]

        return camera_info

    def camera_information_send(self):
        """ Information about a camera. Can be requested with a
            MAV_CMD_REQUEST_MESSAGE command."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_REQUEST_CAMERA_INFORMATION
        self.mav.camera_information_send(time_since_boot_ms(),  # time_boot_ms
                                            self.camera_info['vendor_name'],         # vendor name
                                            self.camera_info['model_name'],          # model name
                                            self.camera_info['firmware_version'],    # firmware version
                                            self.camera_info['focal_length'],        # focal length
                                            self.camera_info['sensor_size_h'],       # sensor size h
                                            self.camera_info['sensor_size_v'],       # sensor size v
                                            self.camera_info['resolution_h'],        # resolution h
                                            self.camera_info['resolution_v'],        # resolution v
                                            self.camera_info['lens_id'],             # lend_id
                                            self.camera_info['flags'],               # flags
                                            self.camera_info['cam_definition_version'],          # cam definition version
                                            bytes(self.camera_info['cam_definition_uri'], 'utf-8'), # cam definition uri
                                         )
    def close(self) :
        pass


class CaptureThread():
    """Managed the Capture of images or video in a separate thread."""
    def __init__(self, interval=1, max_count=1, on_timer=None, on_stop= None):
        self._thread = None
        self._stop_event = threading.Event()
        self.interval = interval
        self.max_count = sys.maxsize if max_count is None else max_count
        self.on_timer = on_timer
        self.on_stop = on_stop

    def _run(self):
        current_img_cnt = 0
        while not self._stop_event.is_set():
            if self.on_timer is not None:
                self.on_timer(data=None)
            if current_img_cnt >= self.max_count:  # quit the thread
                self._stop_event.set()
                break
            current_img_cnt += 1
            time.sleep(self.interval)
            print("%%%%%%%%%%%%%%%%")

        if self.on_stop is not None:
            self.on_stop()


    def start(self):
        if self._thread is None or not self._thread.is_alive():
            self._stop_event.clear()
            self._thread = threading.Thread(target=self._run)
            self._thread.start()
        else:
            print("Thread is already running.")

    def stop(self):
        if self._thread and self._thread.is_alive():
            self._stop_event.set()
            self._thread.join()

    def is_running(self):
        return self._thread.is_alive() if self._thread else False


class CV2Camera(BaseCamera):
    """Create a fake camera component for testing"""
    def __init__(self, mav=None, # MAVLink connection
                 camera_dict=None, # camera_info dict
                 debug=False): # debug log flag
        super().__init__(mav, camera_dict)
        self.mav:MAVLink = mav
        if camera_dict is not None:
            self.camera_info = self.get_camera_info(camera_dict)   # camera_info dict
        else:
            self.camera_info = None

        assert self.camera_info is not None and len(self.camera_info) > 0, "camera_info is empty"

        self.camera_capture_status = CameraCaptureStatus()
        # self.interval = 1
        # self.max_count = 0
        self.current_img_cnt = 0
        self._image_capture_thread = None
        self._video_capture_thread = None

        # self.image_filename = ""
        self._log = logging.getLogger("uav.{}".format(self.__class__.__name__))
        self._log.setLevel(logging.DEBUG if debug else logging.INFO)

        self.last_image = None
        self.mem_fs = MemoryFS()
        self.fs_size = 100000000  # 100MB

        self.log.info(f"{self.__class__.__name__} Started")
        # todo add settings file
        # read parameters from settings file  # todo add settings file

    def __str__(self) -> str:
        return self.__class__.__name__

    def __repr__(self) -> str:
        return "<{}>".format(self)

    @property
    def log(self) -> logging.Logger:
        return self._log



    def save_image_to_memoryfs(self, img, filename):
        """Save image to memory filesystem."""
        # Convert OpenCV image to JPEG byte stream
        success, buffer = cv2.imencode(".jpg", img)
        if not success:
            raise ValueError("Failed to encode image")

        # Write to PyFilesystem's Memory Filesystem
        with self.mem_fs.open(filename, "wb") as f:
            f.write(buffer.tobytes())

        print(f"Image saved to memory filesystem with name: {filename}")
        # return mem_fs

    def calculate_memory_usage(self):
        """Calculate total memory used by the MemoryFS."""
        total_memory = 0
        for path in self.mem_fs.walk.files():
            with self.mem_fs.open(path, "rb") as f:
                total_memory += len(f.read())
        return total_memory


    def camera_settings_send(self):
        """ Information about a camera. Can be requested with a
            MAV_CMD_REQUEST_MESSAGE command."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_REQUEST_CAMERA_SETTINGS
        self.mav.camera_settings_send(time_since_boot_ms(),  # time_boot_ms
                                            0,   # mode_id (int)
                                            0,    # zoomLevel (float)
                                            0,    # focusLevel (float)
                                         )

    def storage_information_send(self):
        """ Information about a camera. Can be requested with a
            MAV_CMD_REQUEST_MESSAGE command."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_REQUEST_STORAGE_INFORMATION
        self.mav.storage_information_send(time_since_boot_ms(),  # time_boot_ms
                                            0,   # storage_id
                                            1,    # storage_count
                                            0,    # status
                                            self.fs_size,    # total_capacity
                                            self.calculate_memory_usage(),    # used_capacity
                                            self.fs_size-self.calculate_memory_usage(),    # available_capacity
                                            0,    # read_speed
                                            0,    # write_speed
                                         )

    def camera_capture_status_send(self):
        """ Information about a camera. Can be requested with a
            MAV_CMD_REQUEST_MESSAGE command."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_REQUEST_CAMERA_CAPTURE_STATUS
        ccs = self.camera_capture_status
        self.mav.camera_capture_status_send(time_since_boot_ms(),  # time_boot_ms
                                            ccs.image_status,   # image_status
                                            ccs.video_status,    # video_status
                                            ccs.image_interval,    # image_interval
                                            ccs.recording_time_ms,    # recording_time_ms
                                            ccs.video_status,    # available_capacity
                                            ccs.image_count,    # image_count
                                         )

    def camera_image_captured_send(self):
        if self.mav is not None:
            self.mav.camera_image_captured_send(time_since_boot_ms(),  # time_boot_ms
                                                time_UTC_usec(),  # time_utc
                                                0,  # camera_id
                                                0,  # lat
                                                0,  # lon
                                                0,  # alt
                                                0,  # relative_alt
                                                [0, 0, 0, 0],  # q
                                                self.camera_capture_status.image_count,  # image_index
                                                1,  # capture_result
                                                bytes(self.image_filename, 'utf-8'),  # file_url
                                                )

    def image_start_capture(self, interval, # Image capture interval
                            count, # Number of images to capture (0 for unlimited)
                            ):
        """Start image capture sequence."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_IMAGE_START_CAPTURE
        # self.interval = interval
        self.camera_capture_status.image_status = 1
        self.camera_capture_status.image_interval = interval
        self.max_count = count
        self._image_capture_thread = CaptureThread(interval=interval, max_count=count, on_timer=self.on_capture_image, on_stop= self.on_stop_image_capture)
        self._image_capture_thread.start()
        self.on_start_image_capture()


    def on_capture_image(self, data):
        """Call back function for Get next image from camera. Simulate an image capture using OpenCV"""
        self.image_filename = f"{date_time_str()}_{self.camera_capture_status.image_count:04d}.jpg"
        image = np.zeros((512, 512, 3), dtype=np.uint8)
        cv2.putText(image, "Fake Image", (50, 256), cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 2)
        self.save_image_to_memoryfs(image, self.image_filename)
        self.camera_capture_status.image_count += 1
        self.camera_image_captured_send()
        self.last_image = image

    def on_start_image_capture(self):
        """Call back function when image capture thread is started."""
        pass


    def on_stop_image_capture(self):
        """Call back function when image capture thread is stopped."""
        pass

    def image_stop_capture(self):
        """Stop image capture sequence."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_IMAGE_STOP_CAPTURE
        self.camera_capture_status.image_status = 0
        self._image_capture_thread.stop()


    def time_UTC_usec(self):
        return int(time.time() * 1e6)


    def image_capture_thread_is_running(self):
        return self._image_capture_thread.is_running()


    def close(self):
        if self._image_capture_thread is not None:
            self._image_capture_thread.stop()
        if self._video_capture_thread is not None:
            self._video_capture_thread.stop()

        self.log.info(f"{self.__class__.__name__} closed")

    def __enter__(self):
        """ Context manager entry point for with statement."""
        return self  # This value is assigned to the variable after 'as' in the 'with' statement

    def __exit__(self, exc_type, exc_value, traceback):
        """Context manager exit point."""
        self.close()
        return False  # re-raise any exceptions



In [None]:
#| export
from gstreamer import GstVidSrcValve,  GstVideoSave, GstJpegEnc
import gstreamer.utils as gst_utils

In [None]:
#| export
class GSTCamera(CV2Camera):

    def __init__(self, mav=None,  # MAVLink connection
                 camera_dict=None,  # camera_info dict
                 debug=False):  # debug log flag
        super().__init__(mav, camera_dict, debug)

        self.debug = debug

        pipeline = gst_utils.to_gst_string(camera_dict['gstreamer']['pipeline'])
        self.pipeline = GstVidSrcValve(pipeline, leaky=True, debug=debug)
        self.pipeline.startup()
        self.last_image = None
        pass

    def save_image_to_memoryfs(self, data, filename):
        """Save image to memory filesystem."""
        with self.mem_fs.open(filename, "wb") as f:
            f.write(data) # Write to PyFilesystem's Memory Filesystem
        print(f"Image saved to memory filesystem with name: {filename}")


    def on_capture_image(self, data):
        """Call back function from the CaptureThread (images). Gets the next image from camera using GStreamer."""
        self.image_filename = f"{date_time_str()}_{self.camera_capture_status.image_count:04d}.jpg"
        self.save_image_to_memoryfs(data, self.image_filename)
        self.last_image = data
        self.camera_capture_status.image_count += 1
        self.camera_image_captured_send()


    def image_start_capture(self, interval, # Image capture interval
                            count, # Number of images to capture (0 for unlimited)
                            ):
        """Start image capture sequence."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_IMAGE_START_CAPTURE
        # self.interval = interval
        self.camera_capture_status.image_status = 1
        self.camera_capture_status.image_interval = interval
        self.max_count = count

        command = gst_utils.to_gst_string([
            'intervideosrc channel=channel_1  ',
            # 'videotestsrc pattern=ball num-buffers={num_buffers}',
            'videoconvert ! videoscale ! video/x-raw,width={width},height={height},framerate={fps}/1',
            'jpegenc quality={quality}',  # Quality of encoding, default is 85
            # "queue",
            'appsink name=mysink emit-signals=True max-buffers=1 drop=True',
        ])
        MAX_FPS = 10
        interval = 1/MAX_FPS if interval < 1/MAX_FPS else interval
        fps = int(1/interval)
        command = gst_utils.fstringify(command, quality=85, num_buffers=100, width=640, height=480, fps=fps)
        self._gst_image_save = GstJpegEnc(command, max_count=5,
                                          on_jpeg_capture=self.on_capture_image,
                                          debug=self.debug).startup()

    def image_stop_capture(self):
        """Stop image capture sequence."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_IMAGE_STOP_CAPTURE
        self.camera_capture_status.image_status = 0
        try:
            self._gst_image_save.stop()
        except:
            pass

    def video_start_capture(self, stream_id, # Stream ID (0 for all streams)
                            frequency): # Frequency CAMERA_CAPTURE_STATUS messages sent (0 for no messages, otherwise frequency)
        """Start video capture sequence."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_VIDEO_START_CAPTURE
        self.camera_capture_status.video_status = 1
        interval = None if frequency == 0 else max(1/(frequency+0.000000001), 1) # reporting interval in seconds
        i= 1
        self._gst_vid_save = GstVideoSave(f'file{i:03d}.mp4', 1280, 720, status_interval=interval, on_status_video_capture=self.on_status_video_capture, debug=self.debug ).startup()


    def on_status_video_capture(self):
        """Call back function when video capture thread ontimer"""
        self.log.info(f'camera_capture_status_send')
        self.camera_capture_status_send()
        pass

    def video_stop_capture(self):
        """Stop video capture sequence."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_VIDEO_STOP_CAPTURE
        self.camera_capture_status.video_status = 0
        self._gst_vid_save.stop()
        # self.pipeline.set_valve_state("video_valve", False)

    def video_start_streaming(self, stream_id): # Stream ID (0 for all streams
        """Start video streaming."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_VIDEO_START_STREAMING

        self.pipeline.set_valve_state("stream_valve", False)

    def video_stop_streaming(self, stream_id): # Stream ID (0 for all streams
        """Stop video streaming."""
        # https://mavlink.io/en/messages/common.html#MAV_CMD_VIDEO_STOP_STREAMING
        self.pipeline.set_valve_state("stream_valve", True)


    def close(self):
        self.pipeline.shutdown(eos=True) # send EOS event to all sinks
        super().close()


In [None]:
show_doc(GSTCamera)

In [None]:
show_doc(GSTCamera.image_start_capture)

In [None]:
show_doc(GSTCamera.camera_information_send)

In [None]:
show_doc(GSTCamera.camera_settings_send)

In [None]:
show_doc(read_camera_dict_from_toml)

In [None]:
print (f"{boot_time_str =}")
# connection_string = 'udp:127.0.0.1:14550'
# mav = mavutil.mavlink_connection(connection_string)

config_path = Path("../../config")
with  GSTCamera( camera_dict=read_camera_dict_from_toml(config_path/"test_camera_info.toml")) as cam_gst_1:
    cam_gst_1.image_start_capture(0.1, 5)
    while cam_gst_1.capture_thread.is_alive():
        if cam_gst_1.last_image is not None:
            cv2.imshow('image', cam_gst_1.last_image)
            cam_gst_1.last_image = None
            cv2.waitKey(10)
            
    cv2.waitKey(500)
    cv2.destroyAllWindows()


In [None]:
def gstreamerCamera(camera_name):
    """
    Set up streaming pipeline for Gstreamer camera.
    
    """
    return True

In [None]:
def airsimCamera(camera_name):
    """
    Set up streaming pipeline for Airsim camera
    """
    return True


In [None]:
def videoCamera(camera_name):
    """
    Set up streaming pipeline for Video camera
    """
    return True


In [None]:
def videoCamera(camera_name):
    """
    Set up streaming pipeline for Video camera
    """
    return True


In [None]:
#| export
def airsimCamera(camera_name):
    """
    Set up streaming pipeline for Airsim camera
    """
    return True


In [None]:
#| export
def videoCamera(camera_name):
    """
    Set up streaming pipeline for Video camera
    """
    return True


In [None]:
#| export
def videoCamera(camera_name):
    """
    Set up streaming pipeline for Video camera
    """
    return True


In [None]:
#| export
def videoCamera(camera_name):
    """
    Set up streaming pipeline for Video camera
    """
    return True


In [None]:
test_eq(1,1)

In [None]:
#| hide
# from nbdev import nbdev_export
# nbdev_export()