Skip to content

Commit

Permalink
#697 add FPS meter to kvs source adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh committed Apr 11, 2024
1 parent 5b990eb commit cb0fb36
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 16 deletions.
36 changes: 28 additions & 8 deletions adapters/gst/sources/kvs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,53 @@
from distutils.util import strtobool
from pathlib import Path

from savant.utils.config import opt_config

TIME_DELTAS = {
's': lambda x: timedelta(seconds=x),
'm': lambda x: timedelta(minutes=x),
}


class AwsConfig:
def __init__(self):
self.region = os.environ['AWS_REGION']
self.access_key = os.environ['AWS_ACCESS_KEY']
self.secret_key = os.environ['AWS_SECRET_KEY']


class FpsMeterConfig:
def __init__(self):
self.period_seconds = opt_config('FPS_PERIOD_SECONDS', None, float)
self.period_frames = opt_config('FPS_PERIOD_FRAMES', 1000, int)
self.output = opt_config('FPS_OUTPUT', 'stdout')
assert self.output in [
'stdout',
'logger',
], 'FPS_OUTPUT must be "stdout" or "logger"'


class Config:
def __init__(self):
self.source_id = os.environ['SOURCE_ID']
self.stream_name = os.environ['STREAM_NAME']
timestamp = os.environ.get('TIMESTAMP')
self.timestamp = parse_timestamp(timestamp) if timestamp else datetime.utcnow()
self.aws_region = os.environ['AWS_REGION']
self.access_key = os.environ['AWS_ACCESS_KEY']
self.secret_key = os.environ['AWS_SECRET_KEY']

self.zmq_endpoint = os.environ['ZMQ_ENDPOINT']
self.sync_output = bool(strtobool(os.environ.get('SYNC_OUTPUT', 'False')))
self.playing = bool(strtobool(os.environ.get('PLAYING', 'True')))
self.api_port = int(os.environ.get('API_PORT', 18367))
self.sync_output = opt_config('SYNC_OUTPUT', False, strtobool)
self.playing = opt_config('PLAYING', True, strtobool)
self.api_port = opt_config('API_PORT', 18367, int)

self.save_state = bool(strtobool(os.environ.get('SAVE_STATE', 'False')))
self.save_state = opt_config('SAVE_STATE', False, strtobool)
if self.save_state:
self.state_path = Path(os.environ.get('STATE_PATH', 'state.json'))
self.state_path = opt_config('STATE_PATH', Path('state.json'), Path)
else:
self.state_path = None

self.aws: AwsConfig = AwsConfig()
self.fps_meter: FpsMeterConfig = FpsMeterConfig()


def parse_timestamp(ts: str) -> datetime:
"""Parse a timestamp string into a datetime object.
Expand Down
13 changes: 13 additions & 0 deletions adapters/gst/sources/kvs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,19 @@ def add_sink(self, pad: Gst.Pad, caps: Gst.Caps):
parser.set_property('config-interval', -1)
gst_elements.append(parser)

if self.config.fps_meter.period_seconds or self.config.fps_meter.period_frames:
fps_meter: Gst.Element = Gst.ElementFactory.make('fps_meter')
fps_meter.set_property('output', self.config.fps_meter.output)
if self.config.fps_meter.period_seconds:
fps_meter.set_property(
'period-seconds', self.config.fps_meter.period_seconds
)
else:
fps_meter.set_property(
'period-frames', self.config.fps_meter.period_frames
)
gst_elements.append(fps_meter)

capsfilter: Gst.Element = Gst.ElementFactory.make('capsfilter')
capsfilter.set_property(
'caps',
Expand Down
6 changes: 3 additions & 3 deletions adapters/gst/sources/kvs/stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ def start(self):
source_id=self.config.source_id,
timestamp=self.config.timestamp,
credentials=AwsCredentials(
region=self.config.aws_region,
access_key=self.config.access_key,
secret_key=self.config.secret_key,
region=self.config.aws.region,
access_key=self.config.aws.access_key,
secret_key=self.config.aws.secret_key,
),
is_playing=self.config.playing,
)
Expand Down
14 changes: 9 additions & 5 deletions scripts/run_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,7 @@ def kafka_redis_source(
)
@source_id_option(required=True)
@sync_option
@fps_meter_options
@adapter_docker_image_option('gstreamer')
def kvs_source(
source_id: str,
Expand All @@ -1039,6 +1040,9 @@ def kvs_source(
save_state: bool,
state_path: str,
mount_state_path: Optional[str],
fps_period_frames: Optional[int],
fps_period_seconds: Optional[float],
fps_output: Optional[str],
docker_image: str,
):
"""Read video stream from Kinesis Video Stream.
Expand All @@ -1053,10 +1057,9 @@ def kvs_source(
zmq_endpoint=out_endpoint,
zmq_type=None,
zmq_bind=None,
fps_period_frames=None,
fps_period_seconds=None,
fps_output=None,
use_absolute_timestamps=None,
fps_period_frames=fps_period_frames,
fps_period_seconds=fps_period_seconds,
fps_output=fps_output,
) + [
f'AWS_REGION={aws_region}',
f'AWS_ACCESS_KEY={aws_access_key}',
Expand All @@ -1082,7 +1085,8 @@ def kvs_source(
f'source-kvs-{source_id}',
zmq_endpoints=[out_endpoint],
sync=sync,
entrypoint='/opt/savant/adapters/gst/sources/kvs.sh',
entrypoint='python',
args=['-m', 'adapters.gst.sources.kvs'],
envs=envs,
volumes=volumes,
docker_image=docker_image,
Expand Down

0 comments on commit cb0fb36

Please sign in to comment.