Skip to content

Commit

Permalink
Support metadata in the KVS Sink adapter (#735)
Browse files Browse the repository at this point in the history
* #714 add frame UUIDs to fragment metadata
* #714 make deepstream in gstsavantframemeta optional
* #714 add gstsavantframemeta to gstreamer adapters
* #714 add frame UUIDs to fragment metadata in KVS
  • Loading branch information
tomskikh committed Apr 18, 2024
1 parent d5463af commit 33875e7
Show file tree
Hide file tree
Showing 20 changed files with 563 additions and 337 deletions.
73 changes: 53 additions & 20 deletions adapters/gst/sinks/multistream_kvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@
from fractions import Fraction
from typing import Dict, Optional

from pygstsavantframemeta import (
gst_buffer_add_savant_frame_meta,
gst_buffer_get_savant_frame_meta,
)
from pykvssdk import KvsWrapper, configure_logging
from savant_rs.pipeline2 import (
StageFunction,
VideoPipeline,
VideoPipelineConfiguration,
VideoPipelineStagePayloadType,
)
from savant_rs.primitives import EndOfStream, VideoFrame

from adapters.python.sinks.chunk_writer import ChunkWriter
Expand Down Expand Up @@ -60,11 +70,6 @@ class FpsMeterConfig:
def __init__(self):
self.period_seconds = opt_config('FPS_PERIOD_SECONDS', None, float)
self.period_frames = opt_config('FPS_PERIOD_FRAMES', 1000, int)
self.output = opt_config('FPS_OUTPUT', 'stdout')
assert self.output in [
'stdout',
'logger',
], 'FPS_OUTPUT must be "stdout" or "logger"'


class Config:
Expand All @@ -84,11 +89,13 @@ def __init__(
source_id: str,
kvs_name: str,
frame_params: FrameParams,
video_pipeline: VideoPipeline,
config: Config,
):
self.source_id = source_id
self.kvs_name = kvs_name
self.frame_params = frame_params
self.video_pipeline = video_pipeline
self.config = config

self.kvs: KvsWrapper = KvsWrapper(
Expand Down Expand Up @@ -143,6 +150,18 @@ def _on_buffer(self, sink: GstApp.AppSink) -> Gst.FlowReturn:
self.stream_started = True

buffer: Gst.Buffer = sample.get_buffer()
savant_frame_meta = gst_buffer_get_savant_frame_meta(buffer)
if savant_frame_meta is None:
self.logger.warning(
'No Savant Frame Metadata found on buffer with PTS %s, skipping.',
buffer.pts,
)
return Gst.FlowReturn.OK
video_frame, _ = self.video_pipeline.get_independent_frame(
savant_frame_meta.idx
)
self.video_pipeline.delete(savant_frame_meta.idx)

frame_data: bytes = buffer.extract_dup(0, buffer.get_size())
self.logger.debug(
'Sending frame with pts=%s to %s',
Expand All @@ -152,11 +171,12 @@ def _on_buffer(self, sink: GstApp.AppSink) -> Gst.FlowReturn:
self.kvs.put_frame(
frame_data,
len(frame_data),
next(self.frame_idx_gen),
savant_frame_meta.idx,
buffer.pts,
buffer.dts,
buffer.duration,
not buffer.has_flags(Gst.BufferFlags.DELTA_UNIT),
video_frame.uuid,
)

return Gst.FlowReturn.OK
Expand Down Expand Up @@ -197,6 +217,8 @@ def _write_video_frame(
buffer.duration = frame.duration
if not frame.keyframe:
buffer.set_flags(Gst.BufferFlags.DELTA_UNIT)
frame_idx = self.video_pipeline.add_frame('kvs-sink', frame)
gst_buffer_add_savant_frame_meta(buffer, frame_idx)

ret = self.appsrc.push_buffer(buffer)
self.logger.debug(
Expand All @@ -221,25 +243,12 @@ def _open(self):
)
time.sleep(0.1)

if self.config.fps_meter.period_seconds:
fps_period = f'period-seconds={self.config.fps_meter.period_seconds}'
elif self.config.fps_meter.period_frames:
fps_period = f'period-frames={self.config.fps_meter.period_frames}'
else:
fps_period = None

elements = [
'appsrc name=appsrc emit-signals=false format=time max-buffers=1 block=true',
f'{self.frame_params.codec.value.parser} config-interval=-1',
CODEC_TO_CAPS[self.frame_params.codec],
'appsink name=appsink emit-signals=true sync=false max-buffers=1',
]
if fps_period is not None:
elements.append(
f'fps_meter {fps_period} output={self.config.fps_meter.output} measurer-name={self.source_id}'
)
elements.append(
'appsink name=appsink emit-signals=true sync=false max-buffers=1'
)

self.pipeline = Gst.parse_launch(' ! '.join(elements))
self.appsrc = self.pipeline.get_by_name('appsrc')
Expand All @@ -263,6 +272,29 @@ def __init__(self, config: Config):
self.config = config
self.logger = get_logger(f'{LOGGER_PREFIX}.{self.__class__.__name__}')
self.writers: Dict[str, ChunkWriter] = {}
self.video_pipeline: VideoPipeline = self.build_video_pipeline()

def build_video_pipeline(self) -> VideoPipeline:
conf = VideoPipelineConfiguration()
conf.frame_period = self.config.fps_meter.period_frames
conf.timestamp_period = (
int(self.config.fps_meter.period_seconds * 1000)
if self.config.fps_meter.period_seconds
else None
)

return VideoPipeline(
'kvs-sink',
[
(
'kvs-sink',
VideoPipelineStagePayloadType.Frame,
StageFunction.none(),
StageFunction.none(),
),
],
conf,
)

def write(self, zmq_message: ZeroMQMessage):
message = zmq_message.message
Expand Down Expand Up @@ -321,6 +353,7 @@ def _write_video_frame(self, frame: VideoFrame, content: Optional[bytes]) -> boo
source_id=frame.source_id,
kvs_name=self.config.stream_name_prefix + frame.source_id,
frame_params=frame_params,
video_pipeline=self.video_pipeline,
config=self.config,
)
self.writers[frame.source_id] = writer
Expand Down
53 changes: 53 additions & 0 deletions docker/Dockerfile.adapters-gstreamer
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,31 @@ RUN apt-get update \
pkg-config \
python3-dev \
python3-pip \
wget \
&& rm -rf /var/lib/apt/lists/*

# install/upgrade pip and builders
RUN python3 -m pip install --no-cache-dir --upgrade \
build \
ninja \
pip \
scikit-build \
setuptools \
wheel

# install cmake
ARG TARGETARCH
ARG CMAKE_VER=3.27.7
RUN if [ "$TARGETARCH" = "amd64" ]; then \
wget -nv -O /tmp/cmake.sh https://github.com/Kitware/CMake/releases/download/v$CMAKE_VER/cmake-$CMAKE_VER-linux-x86_64.sh \
&& sh /tmp/cmake.sh --skip-license --prefix=/usr/local \
&& rm -f /tmp/cmake.sh; \
elif [ "$TARGETARCH" = "arm64" ]; then \
wget -nv -O /tmp/cmake.sh https://github.com/Kitware/CMake/releases/download/v$CMAKE_VER/cmake-$CMAKE_VER-linux-aarch64.sh \
&& sh /tmp/cmake.sh --skip-license --prefix=/usr/local \
&& rm -f /tmp/cmake.sh; \
fi


FROM base-builder AS aravis-builder

Expand Down Expand Up @@ -112,6 +135,30 @@ RUN git clone \
&& rm -rf .git


FROM base-builder AS savant-meta-builder

RUN apt-get update \
&& apt-get install -y \
libglib2.0-dev \
libgstreamer1.0-dev \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /libs/gstsavantframemeta
COPY libs/gstsavantframemeta/requirements.txt .
RUN python3 -m pip install --no-cache-dir -r requirements.txt

# install savant-rs
ARG SAVANT_RS_VERSION
ARG SAVANT_RS_GH_REPO=insight-platform/savant-rs
ARG SAVANT_RS_GH_TOKEN
COPY utils/install_savant_rs.py utils/install_savant_rs.py
RUN ./utils/install_savant_rs.py $SAVANT_RS_VERSION ./utils && \
rm -rf ./utils

COPY libs/gstsavantframemeta .
RUN python3 -m build --wheel --no-isolation && rm -rf _skbuild


FROM base

ARG PROJECT_PATH=/opt/savant
Expand Down Expand Up @@ -141,14 +188,20 @@ RUN if [ "$(arch)" = "aarch64" ]; then \
fi

COPY --from=aravis-builder /build/aravis/dist /usr

COPY --from=kvs-sdk-builder /opt/amazon-kvs-sdk/dist/lib /opt/amazon-kvs-sdk/lib
ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/amazon-kvs-sdk/lib"

COPY --from=kvs-sdk-builder /opt/pykvssdk/dist /opt/pykvssdk/dist
RUN python -m pip install --no-cache-dir /opt/pykvssdk/dist/*.whl

COPY --from=kvs-consumer-builder /opt/amazon-kinesis-video-streams-consumer-library-for-python /opt/amazon-kinesis-video-streams-consumer-library-for-python
RUN python -m pip install --no-cache-dir -r /opt/amazon-kinesis-video-streams-consumer-library-for-python/requirements.txt
ENV PYTHONPATH="${PYTHONPATH}:/opt/amazon-kinesis-video-streams-consumer-library-for-python"

COPY --from=savant-meta-builder /libs/gstsavantframemeta/dist /libs/gstsavantframemeta/dist
RUN python -m pip install --no-cache-dir /libs/gstsavantframemeta/dist/*.whl

# install savant-rs
ARG SAVANT_RS_VERSION
ARG SAVANT_RS_GH_REPO=insight-platform/savant-rs
Expand Down
4 changes: 0 additions & 4 deletions docs/source/savant_101/10_adapters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1507,10 +1507,6 @@ The Multistream Kinesis Video Stream Sink Adapter sends video frames to Kinesis
- A number of seconds between FPS reports.
- Unset
- ``10``
* - ``FPS_OUTPUT``
- Where to output FPS reports; one of: ``stdout``, ``logger``.
- ``stdout``
- ``logger``

Running the adapter with Docker:

Expand Down
83 changes: 67 additions & 16 deletions libs/gstsavantframemeta/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
cmake_minimum_required(VERSION 3.12)
include(CheckLanguage)

project(pygstsavantframemeta VERSION "0.0.3" LANGUAGES CXX)
check_language(CUDA)
if (CMAKE_CUDA_COMPILER)
set(SAVANT_NVDS_ENABLED ON)
enable_language(CUDA)
else()
set(SAVANT_NVDS_ENABLED OFF)
endif()

project(pygstsavantframemeta VERSION "0.0.2" LANGUAGES CXX CUDA)
set(python_module_name pygstsavantframemeta)

set(CMAKE_CXX_STANDARD 14)
Expand All @@ -14,8 +23,14 @@ endif()
set(CMAKE_CXX_FLAGS "-O3")
set(CMAKE_CXX_FLAGS_RELEASE "-O3")

if (SAVANT_NVDS_ENABLED)
add_definitions(-DSAVANT_NVDS_ENABLED=${SAVANT_NVDS_ENABLED})
endif()

find_package(PkgConfig REQUIRED)
find_package(CUDA REQUIRED)
if (SAVANT_NVDS_ENABLED)
find_package(CUDA REQUIRED)
endif()
find_package(Python3 COMPONENTS Interpreter Development REQUIRED)
find_package(pybind11 CONFIG REQUIRED)

Expand All @@ -40,7 +55,7 @@ message(STATUS "Found core-py library for savant-rs: ${SAVANT_RS_CORE_PY_LIB_FIL
file(GLOB SAVANT_RS_LIB_FILE "${SAVANT_RS_LIB_DIR}/savant_rs.cpython*.so")
message(STATUS "Found library for savant-rs: ${SAVANT_RS_LIB_FILE}")

if(NOT DEFINED DeepStream_DIR)
if(SAVANT_NVDS_ENABLED AND NOT DEFINED DeepStream_DIR)
set(DeepStream_DIR /opt/nvidia/deepstream/deepstream)
endif()

Expand All @@ -51,27 +66,59 @@ include_directories(
${Python3_INCLUDE_DIRS}
${GLIB_INCLUDE_DIRS}
${GSTREAMER_INCLUDE_DIRS}
${CUDA_INCLUDE_DIRS}
${DeepStream_DIR}/sources/includes
${SAVANT_RS_LIB_DIR}/include
${CMAKE_SOURCE_DIR}/gstsavantframemeta/include
${CMAKE_SOURCE_DIR}/pygstsavantframemeta
)
)
if (SAVANT_NVDS_ENABLED)
include_directories(
${CUDA_INCLUDE_DIRS}
${DeepStream_DIR}/sources/includes
)
endif()

link_directories(
${Python3_LIBRARY_DIRS}
${GLIB_LIBRARY_DIRS}
${GSTREAMER_LIBRARY_DIRS}
${CUDA_LIBRARY_DIRS}
${DeepStream_DIR}/lib
${SAVANT_RS_CORE_LIB_DIR}
${SAVANT_RS_LIB_DIR}
)
)
if (SAVANT_NVDS_ENABLED)
link_directories(
${CUDA_LIBRARY_DIRS}
${DeepStream_DIR}/lib
)
endif()

add_subdirectory(gstsavantframemeta ../build/gstsavantframemeta)

file (GLOB SOURCE_FILES "gstsavantframemeta/src/*.cpp")
file (GLOB HEADER_FILES "gstsavantframemeta/include/*.h")
set(SOURCE_FILES
"gstsavantframemeta/src/gstsavantframemeta.cpp"
)
if (SAVANT_NVDS_ENABLED)
set(SOURCE_FILES
${SOURCE_FILES}
"gstsavantframemeta/src/gstsavantbatchmeta.cpp"
"gstsavantframemeta/src/nvdssavantframemeta.cpp"
"gstsavantframemeta/src/savantrsprobes.cpp"
"gstsavantframemeta/src/savantnvprobes.cpp"
)
endif()

set(HEADER_FILES
"gstsavantframemeta/include/gstsavantframemeta.h"
)
if (SAVANT_NVDS_ENABLED)
set(HEADER_FILES
${HEADER_FILES}
"gstsavantframemeta/include/gstsavantbatchmeta.h"
"gstsavantframemeta/include/nvdssavantframemeta.h"
"gstsavantframemeta/include/savantrsprobes.h"
"gstsavantframemeta/include/savantnvprobes.h"
)
endif()

file (GLOB PYTHON_FILES "pygstsavantframemeta/*.cpp" "pygstsavantframemeta/*.h")

source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} FILES ${SOURCE_FILES} ${HEADER_FILES} ${PYTHON_FILES} )
Expand All @@ -87,20 +134,24 @@ pybind11_add_module(${python_module_name} SHARED
${SOURCE_FILES}
${HEADER_FILES}
${PYTHON_FILES}
)
)

target_link_libraries(${python_module_name} PRIVATE
${Python3_LIBRARIES}
${GLIB_LIBRARIES}
${GSTREAMER_LIBRARIES}
${CUDA_LIBRARIES}
nvds_meta
nvdsgst_meta
savant_rs
pybind11::module
pybind11::lto
gstsavantframemeta
)
)
if (SAVANT_NVDS_ENABLED)
target_link_libraries(${python_module_name} PRIVATE
${CUDA_LIBRARIES}
nvds_meta
nvdsgst_meta
)
endif()

pybind11_extension(${python_module_name})
pybind11_strip(${python_module_name})
Expand Down

0 comments on commit 33875e7

Please sign in to comment.