In [None]:
#| default_exp camera.test_fake_cam


# Camera Fake 
> OpenCV and GStreamer (GST) fake cameras for testing


- MAVLink camera protocol: https://mavlink.io/en/services/camera.html
- MAVLink camera manager: https://github.com/mavlink/mavlink-camera-manager


In [None]:
#| hide
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
#| hide
# skip_showdoc: true to avoid running cells when rendering docs, and 
# skip_exec: true to skip this notebook when running tests. 
# this should be a raw cell 

In [None]:
#| export
import time, os, sys

from mavcom.logging import logging
from mavcom.mavlink.mavcom import MAVCom, time_since_boot_ms, time_UTC_usec, date_time_str
from mavcom.mavlink.component import Component, mavutil, mavlink, MAVLink

import cv2

from mavcom.utils.general import boot_time_str, With, find_config_dir, read_camera_dict_from_toml
from mavcom.camera.gst_cam import *

In [None]:
#| hide
from fastcore.utils import *
from nbdev.showdoc import *
from fastcore.test import *

In [None]:
# show_doc(create_toml_file)

In [None]:
import types


def get_all_methods(cls):
    """Return the names of all plain-function attributes found on `cls`.

    Names are taken from `dir(cls)`, so they come back in `dir`'s order and
    include attributes inherited from base classes. Only attributes whose
    value (as seen through `getattr` on the class) is a `types.FunctionType`
    are kept.
    """
    return [
        attr_name
        for attr_name in dir(cls)
        if isinstance(getattr(cls, attr_name), types.FunctionType)
    ]

In [None]:
# Collect and print the names of all function attributes found on GSTCamera.
methods = get_all_methods(GSTCamera)
print (methods)

['__enter__', '__exit__', '__init__', '__repr__', '__str__', '_open', 'calculate_memory_usage', 'camera_capture_status_send', 'camera_image_captured_send', 'camera_information_send', 'camera_settings_send', 'close', 'get_camera_info', 'image_capture_thread_is_running', 'image_start_capture', 'image_stop_capture', 'list_files', 'load_image_from_memoryfs', 'on_capture_image', 'on_start_image_capture', 'on_status_video_capture', 'on_stop_image_capture', 'on_video_callback', 'pause', 'play', 'save_image_to_memoryfs', 'set_source_compenent', 'show_image', 'storage_information_send', 'time_UTC_usec', 'video_start_capture', 'video_start_streaming', 'video_stop_capture', 'video_stop_streaming']


In [None]:
# Render the nbdev documentation for every method name collected in `methods`.
from IPython.display import display

for method in methods:
    s = f"GSTCamera.{method}"
    print (s)
    # `show_doc` documents a symbol, so pass the attribute itself rather than
    # its dotted name as a string. Inside a loop the returned renderer is not
    # the cell's last expression, so it must be handed to `display` explicitly
    # or nothing is shown.
    display(show_doc(getattr(GSTCamera, method)))

GSTCamera.__enter__
GSTCamera.__exit__
GSTCamera.__init__
GSTCamera.__repr__
GSTCamera.__str__
GSTCamera._open
GSTCamera.calculate_memory_usage
GSTCamera.camera_capture_status_send
GSTCamera.camera_image_captured_send
GSTCamera.camera_information_send
GSTCamera.camera_settings_send
GSTCamera.close
GSTCamera.get_camera_info
GSTCamera.image_capture_thread_is_running
GSTCamera.image_start_capture
GSTCamera.image_stop_capture
GSTCamera.list_files
GSTCamera.load_image_from_memoryfs
GSTCamera.on_capture_image
GSTCamera.on_start_image_capture
GSTCamera.on_status_video_capture
GSTCamera.on_stop_image_capture
GSTCamera.on_video_callback
GSTCamera.pause
GSTCamera.play
GSTCamera.save_image_to_memoryfs
GSTCamera.set_source_compenent
GSTCamera.show_image
GSTCamera.storage_information_send
GSTCamera.time_UTC_usec
GSTCamera.video_start_capture
GSTCamera.video_start_streaming
GSTCamera.video_stop_capture
GSTCamera.video_stop_streaming


In [None]:
# Render the documentation for GSTCamera.save_image_to_memoryfs.
show_doc(GSTCamera.save_image_to_memoryfs)

---

### GSTCamera.save_image_to_memoryfs

>      GSTCamera.save_image_to_memoryfs (data:bytes, filename:str)

Save image to memory filesystem.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| data | bytes | jpeg encoded image to save |
| filename | str | filename to save image |

In [None]:
import inspect
from nbdev.showdoc import show_doc

# Minimal stand-in class: two documented no-op methods used to try out `show_doc`.
class Example:
    def method_a(self):
        "This is method A"

    def method_b(self):
        "This is method B"

# To show docs for all methods in a class
from IPython.display import display

for name, func in inspect.getmembers(Example, inspect.isfunction):
    print(name)
    # A bare `show_doc(func)` inside a loop is evaluated and thrown away: only
    # the last expression of a cell is auto-rendered. Pass the renderer to
    # `display` so each method's documentation actually appears.
    display(show_doc(func))


method_a
method_b


In [None]:
from nbdev.doclinks import NbdevLookup
from nbdev.showdoc import _fmt_sig


def _html_link(url, txt): return f'{txt}'

class BasicHtmlRenderer(ShowDocRenderer):
    "Simple HTML renderer for `show_doc`"
    def _repr_html_(self):
        "Build the HTML fragment: rule, heading, signature block and (if present) the docstring."
        from html import escape
        # NOTE(review): `title_level` is assumed to be provided by `ShowDocRenderer`;
        # fall back to 3 if the installed version does not set it.
        level = getattr(self, 'title_level', 3)
        # Escape everything interpolated into markup: signatures and docstrings
        # can contain `<`, `>` or `&` (e.g. default values, type hints).
        name = escape(str(self.nm))
        doc = '<hr/>\n'
        doc += f'<h{level}>{name}</h{level}>\n'
        doc += f'<blockquote><pre><code>{name}{escape(str(_fmt_sig(self.sig)))}</code></pre></blockquote>'
        if self.docs: doc += f"<p><i>{escape(str(self.docs))}</i></p>"
        return doc

    def doc(self):
        "Show `show_doc` info along with link to docs"
        from IPython.display import display,HTML
        res = self._repr_html_()
        # NOTE(review): `self.fn` is not assigned anywhere in this notebook; confirm that
        # `ShowDocRenderer` provides it in the installed nbdev version.
        docs = NbdevLookup().doc(self.fn)
        # Only append the link paragraph when the lookup found a docs URL.
        if docs is not None: res += '\n<p>' + _html_link(docs, "Show in docs") + '</p>'
        display(HTML(res))

In [None]:

def doc(elt):
    "Display the `BasicHtmlRenderer` view of `elt`, plus a docs link when one is found"
    renderer = BasicHtmlRenderer(elt)
    renderer.doc()

In [None]:
# Display the method through the custom `doc` helper (BasicHtmlRenderer) defined in this notebook.
doc(GSTCamera.save_image_to_memoryfs)


In [None]:
# Render the default `show_doc` output for the same method.
show_doc(GSTCamera.save_image_to_memoryfs)

---

### GSTCamera.save_image_to_memoryfs

>      GSTCamera.save_image_to_memoryfs (data:bytes, filename:str)

Save image to memory filesystem.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| data | bytes | jpeg encoded image to save |
| filename | str | filename to save image |

In [None]:
# config_path = Path("../../config")
# create_toml_file(find_config_dir()/"____test_camera_info.toml")

In [None]:
# assert False, "stop here"

In [None]:
# assert False, "stop here"

In [None]:

from gstreamer import GstVidSrcValve,  GstVideoSave, GstJpegEnc
import gstreamer.utils as gst_utils

In [None]:
# Render the class-level documentation (constructor signature and parameters) for GSTCamera.
show_doc(GSTCamera)

---

### GSTCamera

>      GSTCamera (camera_dict=None, udp_encoder='H264', loglevel=20)

Create a fake camera component for testing using GStreamer

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| camera_dict | NoneType | None | camera_info dict |
| udp_encoder | str | H264 | encoder for video streaming |
| loglevel | int | 20 | log flag |

In [None]:
# Render the documentation for GSTCamera.image_start_capture.
show_doc(GSTCamera.image_start_capture)

---

### GSTCamera.image_start_capture

>      GSTCamera.image_start_capture (interval, count)

Start image capture sequence.

|    | **Details** |
| -- | ----------- |
| interval | Image capture interval |
| count | Number of images to capture (0 for unlimited) |

In [None]:
# Render the documentation for camera_information_send; the recorded output titles it
# `BaseCamera.camera_information_send`, not `GSTCamera.…`.
show_doc(GSTCamera.camera_information_send)

---

### BaseCamera.camera_information_send

>      BaseCamera.camera_information_send ()

Information about a camera. Can be requested with a
MAV_CMD_REQUEST_MESSAGE command.

In [None]:
# Render the documentation for camera_settings_send; the recorded output titles it
# `CV2Camera.camera_settings_send`, not `GSTCamera.…`.
show_doc(GSTCamera.camera_settings_send)

---

### CV2Camera.camera_settings_send

>      CV2Camera.camera_settings_send ()

Information about a camera. Can be requested with a
MAV_CMD_REQUEST_MESSAGE command.

In [None]:
# Render the documentation for the TOML camera-info loader.
show_doc(read_camera_dict_from_toml)

---

### read_camera_dict_from_toml

>      read_camera_dict_from_toml (toml_file_path)

Read MAVLink camera info from a TOML file.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| toml_file_path |  | path to TOML file |
| **Returns** | **dict** | **camera_info dict** |

In [None]:
# with GstContext(loglevel=LogLevels.CRITICAL):  # GST main loop in thread
#     with GstPipeline(command_h264_display, loglevel=LogLevels.INFO) as disp_pipeline:
#         with GstPipeline(command_src, loglevel=10) as src_pipeline:
#             with GstJpegEnc(command_jpg, max_count=5, on_jpeg_capture=on_capture, loglevel=LogLevels.INFO) as jpg_pipeline:
#                 while not jpg_pipeline.is_done:
#                     time.sleep(.1)
# 
#             with GstStreamUDP(command_udp, on_callback=on_video_callback, loglevel=LogLevels.INFO) as udp_pipeline:
#                 time.sleep(5)

In [None]:
# Smoke test: open a GSTCamera configured from the test TOML file and request
# a short image-capture sequence (5 images, 0.1 s apart).
print(f"{boot_time_str =}")

with GSTCamera(
    camera_dict=read_camera_dict_from_toml(find_config_dir() / "test_camera_info.toml"),
) as cam_gst_1:
    cam_gst_1.image_start_capture(interval=.1, count=5)
    # Fixed wait before leaving the context (presumably long enough for the
    # capture sequence to finish). Polling `cam_gst_1.pipeline.is_done` is an
    # alternative that is not enabled here.
    time.sleep(1)


[32mINFO   | mavcom.GSTCamera   | 29.670 | gst_cam.py:350 | MainThread         | GSTCamera Started[0m
[32mINFO   | pygst.GstPipeli | 29.673 | gst_tools.py:223 | MainThread         | Starting GstPipeline: videotestsrc pattern=ball is-live=true ! capsfilter caps=video/x-raw,format=RGB,width=640,height=480,framerate=10/1 ! tee name=t t. ! queue ! videoconvert ! autovideosink t. ! queue leaky=2 ! intervideosink channel=channel_0  sync=false t. ! queue leaky=2 ! intervideosink channel=channel_1  sync=false t. ! queue leaky=2 ! intervideosink channel=channel_2  sync=false[0m
[32mINFO   | pygst.GstJpegEn | 29.686 | gst_tools.py:223 | MainThread         | Starting GstJpegEnc: intervideosrc channel=channel_1   ! videoconvert ! videoscale ! video/x-raw,width=640,height=480,framerate=10/1 ! queue ! jpegenc quality=85 ! appsink name=mysink emit-signals=True max-buffers=1 drop=True[0m
[32mINFO   | mavcom.GSTCamera   | 29.710 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory files

boot_time_str ='2023-10-21|10:29:09'
John Doe                        


[32mINFO   | mavcom.GSTCamera   | 29.910 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:29_0002.jpg[0m
[32mINFO   | mavcom.GSTCamera   | 30.011 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:30_0003.jpg[0m
[32mINFO   | mavcom.GSTCamera   | 30.110 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:30_0004.jpg[0m
[32mINFO   | pygst.GstJpegEn | 30.111 | gst_tools.py:884 | Thread-13 (_launch | Sending EOS event, to trigger shutdown of pipeline[0m
[32mINFO   | pygst.GstPipeli | 30.829 | gst_tools.py:306 | MainThread         | GstPipeline Shutdown[0m
[32mINFO   | mavcom.GSTCamera   | 30.830 | gst_cam.py:518 | MainThread         | GSTCamera closed[0m


In [None]:
#| hide
# from nbdev import nbdev_export
# nbdev_export()