Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
import pickle
import socket
import subprocess
import re

from jsonschema import validate, ValidationError
from typing import Any, Iterable, List, Mapping, Tuple, Union
from typing import Any, cast, Iterable, List, Mapping, Tuple, Union

import mpf_component_api as mpf
import mpf_component_util as mpf_util
Expand All @@ -45,7 +46,6 @@ class LlamaVideoSummarizationComponent:
def __init__(self):
self.child_process = ChildProcess(['/llama/venv/bin/python3', '/llama/summarize_video.py', str(log.getEffectiveLevel())])


def get_detections_from_video(self, job: mpf.VideoJob) -> Iterable[mpf.VideoTrack]:
try:
log.info('Received video job.')
Expand All @@ -62,6 +62,15 @@ def get_detections_from_video(self, job: mpf.VideoJob) -> Iterable[mpf.VideoTrac
segment_stop_time = (job.stop_frame + 1) / float(job.media_properties['FPS'])

job_config = _parse_properties(job.job_properties, segment_start_time)

if job_config['timeline_check_target_threshold'] < 0 and \
job_config['timeline_check_acceptable_threshold'] >= 0:
log.warning('TIMELINE_CHECK_ACCEPTABLE_THRESHOLD will be ignored since TIMELINE_CHECK_TARGET_THRESHOLD < 0.')

if job_config['timeline_check_acceptable_threshold'] < job_config['timeline_check_target_threshold']:
raise mpf.DetectionError.INVALID_PROPERTY.exception(
'TIMELINE_CHECK_ACCEPTABLE_THRESHOLD must be >= TIMELINE_CHECK_TARGET_THRESHOLD.')

job_config['video_path'] = job.data_uri
job_config['segment_start_time'] = segment_start_time
job_config['segment_stop_time'] = segment_stop_time
Expand Down Expand Up @@ -89,124 +98,146 @@ def _get_response_from_subprocess(self, job_config: dict) -> dict:

max_attempts = job_config['generation_max_attempts']
timeline_check_target_threshold = job_config['timeline_check_target_threshold']
timeline_check_acceptable_threshold = job_config['timeline_check_acceptable_threshold']
segment_start_time = job_config['segment_start_time']
segment_stop_time = job_config['segment_stop_time']

response_json = {}
response_json = None
acceptable_json = None
error = None
while max(attempts.values()) < max_attempts:
response = self.child_process.send_job_get_response(job_config)
response_json, error = self._check_response(attempts, max_attempts, schema_json, response)
if error is not None:
continue

# if no error, then response_json should be valid
event_timeline = response_json['video_event_timeline'] # type: ignore

if timeline_check_target_threshold != -1:
error = self._check_timeline(
timeline_check_target_threshold, attempts, max_attempts, segment_start_time, segment_stop_time, event_timeline)
if timeline_check_target_threshold >= 0:
acceptable, error = self._check_timeline(
timeline_check_target_threshold, timeline_check_acceptable_threshold,
attempts, max_attempts, segment_start_time, segment_stop_time, cast(dict, response_json))
if acceptable:
acceptable_json = response_json
if error is not None:
continue

break

if error:
raise mpf.DetectionError.DETECTION_FAILED.exception(f'Subprocess failed: {error}')
if acceptable_json is not None:
log.info('Couldn\'t satisfy target threshold. Falling back to response that satisfies acceptable threshold.')
return acceptable_json
else:
raise mpf.DetectionError.DETECTION_FAILED.exception(f'Subprocess failed: {error}')

# if no error, then response_json should be valid
# if no error, then response_json should be valid and meet target criteria
return response_json # type: ignore


def _check_response(self, attempts: dict, max_attempts: int, schema_json: dict, response: str
) -> Tuple[Union[dict, None], Union[str, None]]:
error = None
response_json = None

if not response:
error = 'Empty response.'
log.warning(error)
log.warning(f'Failed {attempts["base"] + 1} of {max_attempts} base attempts.')
attempts['base'] += 1
return None, error

try:
response_json = json.loads(response)
except ValueError as ve:
error = 'Response is not valid JSON.'
if not error:
try:
response_json = json.loads(response)
except ValueError as ve:
error = f'Response is not valid JSON. {str(ve)}'

if not error and response_json:
try:
validate(response_json, schema_json)
except ValidationError as ve:
error = f'Response JSON is not in the desired format. {str(ve)}'

if not error and response_json:
try:
event_timeline = response_json['video_event_timeline']
for event in event_timeline:
# update values for later use
event["timestamp_start"] = _get_timestamp_value(event["timestamp_start"])
event["timestamp_end"] = _get_timestamp_value(event["timestamp_end"])
except ValueError as ve:
error = f'Response JSON is not in the desired format. {str(ve)}'

if error:
log.warning(error)
log.warning(str(ve))
log.warning(f'Failed {attempts["base"] + 1} of {max_attempts} base attempts.')
attempts['base'] += 1
return response_json, error

try:
validate(response_json, schema_json)
except ValidationError as ve:
error = 'Response JSON is not in the desired format.'
log.warning(error)
log.warning(str(ve))
log.warning(f'Failed {attempts["base"] + 1} of {max_attempts} base attempts.')
attempts['base'] += 1
return response_json, error

return response_json, None
return response_json, error


def _check_timeline(self, threshold: float, attempts: dict, max_attempts: int,
segment_start_time: float, segment_stop_time: float, event_timeline: list
) -> Union[str, None]:
def _check_timeline(self, target_threshold: float, accept_threshold: float, attempts: dict, max_attempts: int,
segment_start_time: float, segment_stop_time: float, response_json: dict
) -> Tuple[bool, Union[str, None]]:

error = None
event_timeline = response_json['video_event_timeline'] # type: ignore

acceptable_checks = dict(
near_seg_start = False,
near_seg_stop = False)

hard_error = None
soft_error = None
for event in event_timeline:
timestamp_start = _get_timestamp_value(event["timestamp_start"])
timestamp_end = _get_timestamp_value(event["timestamp_end"])
timestamp_start = event["timestamp_start"]
timestamp_end = event["timestamp_end"]

if timestamp_start < 0:
error = (f'Timeline event start time of {timestamp_start} < 0.')
hard_error = (f'Timeline event start time of {timestamp_start} < 0.')
break

if timestamp_end < 0:
error = (f'Timeline event end time of {timestamp_end} < 0.')
hard_error = (f'Timeline event end time of {timestamp_end} < 0.')
break

if timestamp_end < timestamp_start:
error = (f'Timeline event end time is less than event start time. '
hard_error = (f'Timeline event end time is less than event start time. '
f'{timestamp_end} < {timestamp_start}.')
break

if (segment_start_time - timestamp_start) > threshold:
error = (f'Timeline event start time occurs too soon before segment start time. '
f'({segment_start_time} - {timestamp_start}) > {threshold}.')
break

if (timestamp_end - segment_stop_time) > threshold:
error = (f'Timeline event end time occurs too late after segment stop time. '
f'({timestamp_end} - {segment_stop_time}) > {threshold}.')
break

if not error:
minmax_errors = []
if not hard_error:
min_event_start = min(list(map(lambda d: _get_timestamp_value(d.get('timestamp_start')),
filter(lambda d: 'timestamp_start' in d, event_timeline))))

if abs(segment_start_time - min_event_start) > threshold:
error = (f'Min timeline event start time not close enough to segment start time. '
f'abs({segment_start_time} - {min_event_start}) > {threshold}.')

if not error:

max_event_end = max(list(map(lambda d: _get_timestamp_value(d.get('timestamp_end')),
filter(lambda d: 'timestamp_end' in d, event_timeline))))

if abs(max_event_end - segment_stop_time) > threshold:
error = (f'Max timeline event end time not close enough to segment stop time. '
f'abs({max_event_end} - {segment_stop_time}) > {threshold}.')
if abs(segment_start_time - min_event_start) > target_threshold:
minmax_errors.append((f'Min timeline event start time not close enough to segment start time. '
f'abs({segment_start_time} - {min_event_start}) > {target_threshold}.'))

if abs(max_event_end - segment_stop_time) > target_threshold:
minmax_errors.append((f'Max timeline event end time not close enough to segment stop time. '
f'abs({max_event_end} - {segment_stop_time}) > {target_threshold}.'))

if accept_threshold >= 0:
acceptable_checks['near_seg_start'] = abs(segment_start_time - min_event_start) <= accept_threshold

acceptable_checks['near_seg_stop'] = abs(max_event_end - segment_stop_time) <= accept_threshold

acceptable = not hard_error and all(acceptable_checks.values())

if len(minmax_errors) > 0:
soft_error = minmax_errors.pop()

error = None
if hard_error:
error = hard_error
elif soft_error:
error = soft_error

if error:
log.warning(error)
log.warning(f'Failed {attempts["timeline"] + 1} of {max_attempts} timeline attempts.')
attempts['timeline'] += 1
return error

return None

return acceptable, error


def _create_segment_summary_track(self, job: mpf.VideoJob, response_json: dict) -> mpf.VideoTrack:
Expand Down Expand Up @@ -263,8 +294,8 @@ def _create_tracks(self, job: mpf.VideoJob, response_json: dict) -> Iterable[mpf

for event in response_json['video_event_timeline']:
# get offset start/stop times in milliseconds
event_start_time = int(_get_timestamp_value(event['timestamp_start']) * 1000)
event_stop_time = int(_get_timestamp_value(event['timestamp_end']) * 1000)
event_start_time = int(event['timestamp_start'] * 1000)
event_stop_time = int(event['timestamp_end'] * 1000)

offset_start_frame = int((event_start_time * video_fps) / 1000)
offset_stop_frame = int((event_stop_time * video_fps) / 1000) - 1
Expand Down Expand Up @@ -331,13 +362,18 @@ def _create_tracks(self, job: mpf.VideoJob, response_json: dict) -> Iterable[mpf
log.info('Processing complete. Video segment %s summarized in %d tracks.' % (segment_id, len(tracks)))
return tracks


def _get_timestamp_value(seconds: Any) -> float:
if isinstance(seconds, str):
secval = float(seconds.replace('s', ''))
if re.match(r"^\s*\d+(\.\d*)?\s*[Ss]?$", seconds):
secval = float(re.sub('s', '', seconds, flags=re.IGNORECASE))
else:
raise mpf.DetectionError.DETECTION_FAILED.exception(f'Invalid timestamp: {seconds}')
else:
secval = float(seconds)
return secval


def _parse_properties(props: Mapping[str, str], segment_start_time: float) -> dict:
process_fps = mpf_util.get_property(
props, 'PROCESS_FPS', 1)
Expand All @@ -356,6 +392,8 @@ def _parse_properties(props: Mapping[str, str], segment_start_time: float) -> di
props, 'GENERATION_MAX_ATTEMPTS', 5)
timeline_check_target_threshold = mpf_util.get_property(
props, 'TIMELINE_CHECK_TARGET_THRESHOLD', 10)
timeline_check_acceptable_threshold = mpf_util.get_property(
props, 'TIMELINE_CHECK_ACCEPTABLE_THRESHOLD', 30)

generation_prompt = _read_file(generation_prompt_path) % (segment_start_time)

Expand All @@ -373,7 +411,8 @@ def _parse_properties(props: Mapping[str, str], segment_start_time: float) -> di
generation_json_schema = generation_json_schema,
system_prompt = system_prompt,
generation_max_attempts = generation_max_attempts,
timeline_check_target_threshold = timeline_check_target_threshold
timeline_check_target_threshold = timeline_check_target_threshold,
timeline_check_acceptable_threshold = timeline_check_acceptable_threshold
)


Expand All @@ -400,13 +439,15 @@ def __init__(self, start_cmd: List[str]):
env=env)
self._socket = parent_socket.makefile('rwb')


def __del__(self):
print("Terminating subprocess...")
self._socket.close()
self._proc.terminate()
self._proc.wait()
print("Subprocess terminated")


def send_job_get_response(self, config: dict):
job_bytes = pickle.dumps(config)
self._socket.write(len(job_bytes).to_bytes(4, 'little'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,16 @@
},
{
"name": "TIMELINE_CHECK_TARGET_THRESHOLD",
"description": "Specifies the number of seconds that video events can occur before or after video segment bounds. If exceeded, another attempt will be made to generate the output. Set to -1 to disable check.",
"description": "Specifies the number of seconds that video events can occur before or after video segment bounds. If exceeded, another attempt will be made to generate the output. See also the TIMELINE_CHECK_THRESHOLD_ACCEPTABLE property. Set to < 0 to disable check (e.g. -1).",
"type": "INT",
"defaultValue": "10"
},
{
"name": "TIMELINE_CHECK_ACCEPTABLE_THRESHOLD",
"description": "A secondary timeline validation threshold, in seconds that specifies the number of seconds video events can occur before or after video segment bounds, which will result in an \"acceptable\" timeline. Additional attempts will be made to generate a timeline within the \"desired\" range of TIMELINE_CHECK_TARGET_THRESHOLD, until GENERATION_MAX_ATTEMPTS is reached, after which the \"acceptable\" timeline is returned, or the component responds with an error. Set to < 0 to disable check (e.g. -1).",
"type": "INT",
"defaultValue": "30"
},
{
"name": "TARGET_SEGMENT_LENGTH",
"description": "Default segment length is 180 seconds. Set to -1 to disable segmenting the video.",
Expand Down
Loading