Skip to content

Commit

Permalink
Support metadata in the KVS source adapter (#737)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh committed Apr 19, 2024
1 parent 33875e7 commit c8916dd
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 3 deletions.
15 changes: 15 additions & 0 deletions adapters/gst/gst_plugins/python/zeromq_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from savant.api.enums import ExternalFrameType
from savant.gstreamer import GObject, Gst, GstBase
from savant.gstreamer.codecs import CODEC_BY_CAPS_NAME, Codec
from savant.gstreamer.event import parse_savant_frame_tags_event
from savant.gstreamer.utils import (
gst_post_library_settings_error,
gst_post_stream_failed_error,
Expand Down Expand Up @@ -232,6 +233,7 @@ def __init__(self):
self.writer: BlockingWriter = None
self.receive_timeout = Defaults.SENDER_RECEIVE_TIMEOUT
self.receive_retries = Defaults.RECEIVE_RETRIES
self.savant_frame_tags: Dict[str, str] = {}
self.set_sync(False)

def do_get_property(self, prop):
Expand Down Expand Up @@ -496,6 +498,11 @@ def do_event(self, event: Gst.Event):
)
self.frame_num = 0

elif event.type == Gst.EventType.CUSTOM_DOWNSTREAM:
tags = parse_savant_frame_tags_event(event)
if tags is not None:
self.savant_frame_tags = tags

# Cannot use `super()` since it is `self`
return GstBase.BaseSink.do_event(self, event)

Expand Down Expand Up @@ -537,6 +544,14 @@ def build_video_frame(
name='location',
values=[AttributeValue.string(str(self.location))],
)
for tag_name, tag_value in self.savant_frame_tags.items():
if tag_name == 'location':
continue
video_frame.set_persistent_attribute(
namespace=DEFAULT_NAMESPACE,
name=tag_name,
values=[AttributeValue.string(tag_value)],
)

return video_frame

Expand Down
34 changes: 33 additions & 1 deletion adapters/gst/sources/kvs/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from collections import deque
from queue import Empty, Queue
from typing import Optional

from adapters.gst.sinks.video_files import GstPipelineRunner
from adapters.shared.thread import BaseThreadWorker
from savant.gstreamer import Gst, GstApp
from savant.gstreamer.codecs import CODEC_BY_CAPS_NAME, Codec
from savant.gstreamer.event import build_savant_frame_tags_event
from savant.gstreamer.utils import on_pad_event

from . import LOGGER_PREFIX
Expand Down Expand Up @@ -41,6 +43,7 @@ def __init__(
self.runner: Optional[GstPipelineRunner] = None
self.first_ts: Optional[float] = None
self.last_ts: Optional[float] = None
self.fragment_uuids = deque()

def workload(self):
"""Create and run the pipeline."""
Expand Down Expand Up @@ -83,9 +86,19 @@ def process_fragment(self):
return

self.logger.debug(
'Processing fragment %r with %d bytes',
'Processing fragment %r with %d bytes. Timestamp: %s. First frame UUID: %s. Last frame UUID: %s.',
fragment.fragment_number,
len(fragment.content),
fragment.timestamp,
fragment.first_frame_uuid,
fragment.last_frame_uuid,
)
self.fragment_uuids.append(
(
int(fragment.timestamp * 1000) * Gst.MSECOND,
fragment.first_frame_uuid,
fragment.last_frame_uuid,
)
)
self.appsrc.emit('push-buffer', Gst.Buffer.new_wrapped(fragment.content))
self.queue.task_done()
Expand Down Expand Up @@ -206,6 +219,7 @@ def add_sink(self, pad: Gst.Pad, caps: Gst.Caps):

parser_pad: Gst.Pad = parser.get_static_pad('sink')
self.logger.debug('Linking %r to %r', pad.get_name(), parser_pad.get_name())
parser_pad.add_probe(Gst.PadProbeType.BUFFER, self.on_parser_src_buffer)

for element in gst_elements:
element.sync_state_with_parent()
Expand All @@ -219,3 +233,21 @@ def build_stream_name_event(self) -> Gst.Event:
tag_list: Gst.TagList = Gst.TagList.new_empty()
tag_list.add_value(Gst.TagMergeMode.APPEND, Gst.TAG_LOCATION, self.stream.name)
return Gst.Event.new_tag(tag_list)

def on_parser_src_buffer(self, pad: Gst.Pad, info: Gst.PadProbeInfo):
buffer: Gst.Buffer = info.get_buffer()
self.logger.debug('Processing buffer %s from parser', buffer.pts)
if not self.fragment_uuids:
return Gst.PadProbeReturn.OK
if buffer.pts < self.fragment_uuids[0][0]:
return Gst.PadProbeReturn.OK
ts, first_uuid, last_uuid = self.fragment_uuids.popleft()

tags = {
'first-frame-uuid': first_uuid,
'last-frame-uuid': last_uuid,
}
self.logger.debug('Sending frame tags %s', tags)
pad.send_event(build_savant_frame_tags_event(tags))

return Gst.PadProbeReturn.OK
14 changes: 13 additions & 1 deletion adapters/gst/sources/kvs/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class Fragment:
fragment_number: str
timestamp: float
content: bytearray
first_frame_uuid: Optional[str] = None
last_frame_uuid: Optional[str] = None


class FragmentsPoller(BaseThreadWorker):
Expand Down Expand Up @@ -133,8 +135,12 @@ def on_fragment_arrived(
fragment_receive_duration,
)
fragment_tags = self.fragment_processor.get_fragment_tags(fragment_dom)
self.logger.debug('Fragment tags: %s', fragment_tags)
fragment_number = fragment_tags.get('AWS_KINESISVIDEO_FRAGMENT_NUMBER')
timestamp = float(fragment_tags.get('AWS_KINESISVIDEO_PRODUCER_TIMESTAMP'))
first_frame_uuid = fragment_tags.get('first-frame-uuid')
last_frame_uuid = fragment_tags.get('last-frame-uuid')

self.logger.debug(
'Processing fragment %r with timestamp %s from stream %r.',
fragment_number,
Expand All @@ -144,7 +150,13 @@ def on_fragment_arrived(
while self.is_running:
try:
self.queue.put(
Fragment(fragment_number, timestamp, fragment_bytes),
Fragment(
fragment_number=fragment_number,
timestamp=timestamp,
content=fragment_bytes,
first_frame_uuid=first_frame_uuid,
last_frame_uuid=last_frame_uuid,
),
timeout=self.queue_put_timeout,
)
self.last_fragment_number = fragment_number
Expand Down
42 changes: 41 additions & 1 deletion savant/gstreamer/event.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from typing import Optional
from typing import Dict, Optional

from savant.gstreamer import Gst

SAVANT_EOS_EVENT_NAME = 'savant-eos'
SAVANT_EOS_EVENT_SOURCE_ID_PROPERTY = 'source-id'

SAVANT_FRAME_TAGS_EVENT_NAME = 'savant-frame-tags'


def build_savant_eos_event(source_id: str):
"""Build a savant-eos event.
Expand Down Expand Up @@ -33,3 +35,41 @@ def parse_savant_eos_event(event: Gst.Event) -> Optional[str]:
return None

return struct.get_string(SAVANT_EOS_EVENT_SOURCE_ID_PROPERTY)


def build_savant_frame_tags_event(tags: Dict[str, str]):
"""Build a savant-frame-tags event.
:param tags: Tags to set.
:returns: The savant-frame-tags event.
"""

structure: Gst.Structure = Gst.Structure.new_empty(SAVANT_FRAME_TAGS_EVENT_NAME)
for name, value in tags.items():
structure.set_value(name, value)

return Gst.Event.new_custom(Gst.EventType.CUSTOM_DOWNSTREAM, structure)


def parse_savant_frame_tags_event(event: Gst.Event) -> Optional[Dict[str, str]]:
"""Parse a savant-frame-tags event.
:param event: The event to parse.
:returns: Tags if the event is a savant-frame-tags event, otherwise None.
"""

if event.type != Gst.EventType.CUSTOM_DOWNSTREAM:
return None

struct: Gst.Structure = event.get_structure()
if not struct.has_name(SAVANT_FRAME_TAGS_EVENT_NAME):
return None

tags = {}
for i in range(struct.n_fields()):
name = struct.nth_field_name(i)
value = struct.get_string(name)
if value:
tags[name] = value

return tags

0 comments on commit c8916dd

Please sign in to comment.