Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream.segmented: add --stream-segmented-duration #5601

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/streamlink/session/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ class StreamlinkOptions(Options):
- ``float``
- ``10.0``
- Segment connect and read timeout
* - stream-segmented-duration
- ``float | None``
- ``None``
- Limit the playback duration of segmented streams, rounded to the nearest segment
* - stream-timeout
- ``float``
- ``60.0``
Expand All @@ -154,10 +158,10 @@ class StreamlinkOptions(Options):
- ``0.0``
- Number of seconds to skip from the beginning of the HLS stream,
interpreted as a negative offset for livestreams
* - hls-duration
* - hls-duration *(deprecated)*
- ``float | None``
- ``None``
- Limit the HLS stream playback duration, rounded to the nearest HLS segment
- See ``stream-segmented-duration``
* - hls-playlist-reload-attempts
- ``int``
- ``3``
Expand Down Expand Up @@ -312,6 +316,7 @@ def __init__(self, session: "Streamlink") -> None:
"stream-segment-attempts": 3,
"stream-segment-threads": 1,
"stream-segment-timeout": 10.0,
"stream-segmented-duration": None,
"stream-timeout": 60.0,
"hls-live-edge": 3,
"hls-live-restart": False,
Expand Down Expand Up @@ -487,4 +492,5 @@ def inner(self: "StreamlinkOptions", key: str, value: Any) -> None:
"dash-timeout": _factory_set_deprecated("stream-timeout", float),
"hls-timeout": _factory_set_deprecated("stream-timeout", float),
"http-stream-timeout": _factory_set_deprecated("stream-timeout", float),
"hls-duration": _factory_set_deprecated("stream-segmented-duration", float),
}
4 changes: 4 additions & 0 deletions src/streamlink/stream/dash/dash.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def __init__(self, *args, **kwargs):
self.mpd = self.stream.mpd

self.manifest_reload_retries = self.session.options.get("dash-manifest-reload-attempts")
self.duration = self.stream.duration or self.duration

@contextmanager
def sleeper(self, duration):
Expand Down Expand Up @@ -198,20 +199,23 @@ def __init__(
mpd: MPD,
video_representation: Optional[Representation] = None,
audio_representation: Optional[Representation] = None,
duration: Optional[float] = None,
**kwargs,
):
"""
:param session: Streamlink session instance
:param mpd: Parsed MPD manifest
:param video_representation: Video representation
:param audio_representation: Audio representation
:param duration: Number of seconds until ending the stream
:param kwargs: Additional keyword arguments passed to :meth:`requests.Session.request`
"""

super().__init__(session)
self.mpd = mpd
self.video_representation = video_representation
self.audio_representation = audio_representation
self.duration = duration
self.args = session.http.valid_request_args(**kwargs)

def __json__(self): # noqa: PLW3201
Expand Down
13 changes: 4 additions & 9 deletions src/streamlink/stream/hls/hls.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,10 @@ def __init__(self, *args, **kwargs) -> None:
self.playlist_reload_retries = self.session.options.get("hls-playlist-reload-attempts")
self.segment_queue_timing_threshold_factor = self.session.options.get("hls-segment-queue-threshold")
self.live_edge = self.session.options.get("hls-live-edge")

self.duration_offset_start = int(self.stream.start_offset + (self.session.options.get("hls-start-offset") or 0))
self.duration_limit = self.stream.duration or (
int(self.session.options.get("hls-duration")) if self.session.options.get("hls-duration") else None)
self.duration = self.stream.duration or self.duration or self.session.options.get("hls-duration")

self.hls_live_restart = self.stream.force_restart or self.session.options.get("hls-live-restart")

if str(self.playlist_reload_time_override).isnumeric() and float(self.playlist_reload_time_override) >= 2:
Expand Down Expand Up @@ -453,12 +454,11 @@ def iter_segments(self):
]))
log.debug("; ".join([
f"Start offset: {self.duration_offset_start}",
f"Duration: {self.duration_limit}",
f"Duration: {self.duration}",
f"Start Sequence: {self.playlist_sequence}",
f"End Sequence: {self.playlist_end}",
]))

total_duration = 0
while not self.closed:
queued = False
for segment in self.playlist_segments:
Expand All @@ -480,11 +480,6 @@ def iter_segments(self):
yield segment
queued = True

total_duration += segment.duration
if self.duration_limit and total_duration >= self.duration_limit:
log.info(f"Stopping stream early after {self.duration_limit}")
return

if self.closed: # pragma: no cover
return

Expand Down
9 changes: 9 additions & 0 deletions src/streamlink/stream/segmented/segmented.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ def __init__(self, reader: SegmentedStreamReader, **kwargs) -> None:
self.stream = reader.stream
self.session = reader.session

self.duration: Optional[float] = self.session.options.get("stream-segmented-duration")

def close(self) -> None:
"""
Shuts down the thread.
Expand All @@ -212,9 +214,16 @@ def iter_segments(self) -> Generator[TSegment, None, None]:
yield

def run(self) -> None:
duration = 0.0
for segment in self.iter_segments():
if self.closed: # pragma: no cover
break

duration += segment.duration
if self.duration is not None and duration > self.duration:
log.info(f"Stopping stream early after {self.duration:.2f}s")
break

self.writer.put(segment)

# End of stream, tells the writer to exit
Expand Down
18 changes: 14 additions & 4 deletions src/streamlink_cli/argparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,17 @@ def build_parser():
Default is 10.0.
""",
)
transport_hls.add_argument(
"--stream-segmented-duration",
type=hours_minutes_seconds_float,
metavar="[[XX:]XX:]XX[.XX] | [XXh][XXm][XX[.XX]s]",
help="""
Limit the playback duration of segmented streams, like HLS and DASH.
The actual duration may be slightly longer, as it is rounded to the nearest segment.

Default is unlimited.
""",
)
transport.add_argument(
"--stream-timeout",
type=num(float, gt=0),
Expand Down Expand Up @@ -1060,9 +1071,7 @@ def build_parser():
type=hours_minutes_seconds_float,
metavar="[[XX:]XX:]XX[.XX] | [XXh][XXm][XX[.XX]s]",
help="""
Limit the playback duration, useful for watching segments of a stream.
The actual duration may be slightly longer, as it is rounded to the
nearest HLS segment.
Deprecated in favor of --stream-segmented-duration.

Default is unlimited.
""",
Expand Down Expand Up @@ -1395,6 +1404,7 @@ def build_parser():
("hls_segment_threads", "hls-segment-threads", None),
("hls_segment_timeout", "hls-segment-timeout", None),
("hls_timeout", "hls-timeout", None),
("hls_duration", "hls-duration", None),
("http_stream_timeout", "http-stream-timeout", None),

# stream transport arguments
Expand All @@ -1403,11 +1413,11 @@ def build_parser():
("stream_segment_attempts", "stream-segment-attempts", None),
("stream_segment_threads", "stream-segment-threads", None),
("stream_segment_timeout", "stream-segment-timeout", None),
("stream_segmented_duration", "stream-segmented-duration", None),
("stream_timeout", "stream-timeout", None),
("hls_live_edge", "hls-live-edge", None),
("hls_live_restart", "hls-live-restart", None),
("hls_start_offset", "hls-start-offset", None),
("hls_duration", "hls-duration", None),
("hls_playlist_reload_attempts", "hls-playlist-reload-attempts", None),
("hls_playlist_reload_time", "hls-playlist-reload-time", None),
("hls_segment_queue_threshold", "hls-segment-queue-threshold", None),
Expand Down
81 changes: 60 additions & 21 deletions tests/stream/dash/test_dash.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from streamlink.exceptions import PluginError
from streamlink.session import Streamlink
from streamlink.stream.dash import MPD, DASHStream, DASHStreamWorker, MPDParsingError
from streamlink.stream.dash import MPD, DASHSegment, DASHStream, DASHStreamReader, DASHStreamWorker, MPDParsingError
from streamlink.stream.dash.dash import log
from streamlink.utils.parse import parse_xml as original_parse_xml
from tests.resources import text, xml
Expand Down Expand Up @@ -370,11 +370,12 @@ def mock_wait(self, monkeypatch: pytest.MonkeyPatch) -> Mock:
return mock

@pytest.fixture()
def segments(self) -> List[Mock]:
def segments(self) -> List[DASHSegment]:
return [
Mock(url="init_segment"),
Mock(url="first_segment"),
Mock(url="second_segment"),
DASHSegment(uri="init_segment", num=-1, duration=0.0),
DASHSegment(uri="first_segment", num=0, duration=2.0),
DASHSegment(uri="second_segment", num=1, duration=3.0),
DASHSegment(uri="third_segment", num=2, duration=5.0),
]

@pytest.fixture()
Expand Down Expand Up @@ -406,28 +407,34 @@ def representation(self, mpd) -> Mock:
return mpd.periods[0].adaptationSets[0].representations[0]

@pytest.fixture()
def worker(self, timestamp: datetime, mpd: Mock):
stream = Mock(
mpd=mpd,
period=0,
args={},
)
reader = Mock(
def stream(self, request: pytest.FixtureRequest, monkeypatch: pytest.MonkeyPatch, session: Streamlink, mpd: Mock):
options = getattr(request, "param", {})

monkeypatch.setattr(session.http, "request", Mock())
monkeypatch.setattr(session.http, "xml", Mock())

return DASHStream(session, mpd, **options)

@pytest.fixture()
def reader(self, session: Streamlink, stream: DASHStream, timestamp: datetime):
return Mock(
session=session,
stream=stream,
ident=(None, None, "1"),
timestamp=timestamp,
)
worker = DASHStreamWorker(reader)

return worker
@pytest.fixture()
def worker(self, reader: DASHStreamReader):
return DASHStreamWorker(reader)

def test_dynamic_reload(
self,
monkeypatch: pytest.MonkeyPatch,
timestamp: datetime,
worker: DASHStreamWorker,
representation: Mock,
segments: List[Mock],
segments: List[DASHSegment],
mpd: Mock,
):
mpd.dynamic = True
Expand All @@ -443,7 +450,7 @@ def test_dynamic_reload(

representation.segments.reset_mock()
representation.segments.return_value = segments[1:]
assert [next(segment_iter), next(segment_iter)] == segments[1:]
assert [next(segment_iter), next(segment_iter), next(segment_iter)] == segments[1:]
assert representation.segments.call_args_list == [call(), call(init=False, timestamp=None)]
assert not worker._wait.is_set()

Expand All @@ -452,7 +459,7 @@ def test_static(
worker: DASHStreamWorker,
timestamp: datetime,
representation: Mock,
segments: List[Mock],
segments: List[DASHSegment],
mpd: Mock,
):
mpd.dynamic = False
Expand All @@ -464,7 +471,7 @@ def test_static(
assert worker._wait.is_set()

# Verify the fix for https://github.com/streamlink/streamlink/issues/2873
@pytest.mark.parametrize("duration", [
@pytest.mark.parametrize("period_duration", [
0,
204.32,
])
Expand All @@ -475,16 +482,48 @@ def test_static_refresh_wait(
mock_time: Mock,
worker: DASHStreamWorker,
representation: Mock,
segments: List[Mock],
segments: List[DASHSegment],
mpd: Mock,
duration: float,
period_duration: float,
):
mpd.dynamic = False
mpd.type = "static"
mpd.periods[0].duration.total_seconds.return_value = duration
mpd.periods[0].duration.total_seconds.return_value = period_duration

representation.segments.return_value = segments
assert list(worker.iter_segments()) == segments
assert representation.segments.call_args_list == [call(init=True, timestamp=timestamp)]
assert mock_wait.call_args_list == [call(5)]
assert worker._wait.is_set()

@pytest.mark.parametrize(("stream", "session"), [
pytest.param({"duration": 5.0}, {}, id="duration keyword"),
pytest.param({}, {"stream-segmented-duration": 5.0}, id="stream-segmented-duration session option"),
pytest.param({"duration": 5.0}, {"stream-segmented-duration": 2.0}, id="duration keyword priority"),
], indirect=["stream", "session"])
def test_duration(
self,
caplog: pytest.LogCaptureFixture,
reader: Mock,
worker: DASHStreamWorker,
timestamp: datetime,
representation: Mock,
segments: List[DASHSegment],
mpd: Mock,
stream: DASHStream,
session: Streamlink,
):
caplog.set_level("INFO", "streamlink")

mpd.dynamic = False
mpd.type = "static"

representation.segments.return_value = segments
worker.run()

assert [call_arg.args[0] for call_arg in reader.writer.put.call_args_list] == segments[0:-1] + [None]
assert representation.segments.call_args_list == [call(init=True, timestamp=timestamp)]
assert worker._wait.is_set()
assert [(record.name, record.levelname, record.message) for record in caplog.records] == [
("streamlink.stream.segmented", "info", "Stopping stream early after 5.00s"),
]
Loading