Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CAN-SKIP-UNTIL and delta generation #88

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 76 additions & 28 deletions lib/membrane_http_adaptive_stream/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Membrane.HTTPAdaptiveStream.HLS do
alias Membrane.Time

@version 7
@delta_version 9

@master_playlist_header """
#EXTM3U
Expand All @@ -28,6 +29,16 @@ defmodule Membrane.HTTPAdaptiveStream.HLS do
@keep_latest_n_segment_parts 4
@min_segments_in_delta_playlist 6

# See https://github.com/membraneframework/membrane_http_adaptive_stream_plugin/pull/88
# and https://github.com/erlang/otp/issues/7624
@dialyzer {:nowarn_function,
[
add_serialized_track: 2,
build_media_playlist_path: 2,
serialize_segments: 2,
can_skip_until: 1
]}

defmodule SegmentAttribute do
@moduledoc """
Implementation of `Membrane.HTTPAdaptiveStream.Manifest.SegmentAttribute` behaviour for HTTP Live Streaming
Expand Down Expand Up @@ -115,20 +126,58 @@ defmodule Membrane.HTTPAdaptiveStream.HLS do
end

defp add_serialized_track(tracks_map, track) do
target_duration = calculate_target_duration(track)
playlist_path = build_media_playlist_path(track)
serialized_track = {playlist_path, serialize_track(track)}
tracks_map = Map.put(tracks_map, track.id, serialized_track)

should_generate_delta_playlist? =
Track.supports_partial_segments?(track) &&
Enum.count(track.segments) > @min_segments_in_delta_playlist
case maybe_calculate_delta_params(track, target_duration) do
{:create_delta, delta_ctx} ->
serialized_track =
{playlist_path, serialize_track(track, target_duration, %{delta_ctx | skip_count: 0})}

delta_path = build_media_playlist_path(track, delta?: true)
serialized_delta_track = {delta_path, serialize_track(track, target_duration, delta_ctx)}

tracks_map
|> Map.put(track.id, serialized_track)
|> Map.put(:"#{track.id}_delta", serialized_delta_track)

:dont_create_delta ->
serialized_track = {playlist_path, serialize_track(track, target_duration)}
Map.put(tracks_map, track.id, serialized_track)
end
end

defp calculate_target_duration(track) do
Ratio.ceil(track.segment_duration / Time.second()) |> trunc()
end

if should_generate_delta_playlist? do
delta_path = build_media_playlist_path(track, delta?: true)
serialized_delta_track = {delta_path, serialize_track(track, delta?: true)}
Map.put(tracks_map, :"#{track.id}_delta", serialized_delta_track)
defp maybe_calculate_delta_params(track, target_duration) do
min_duration = Time.seconds(@min_segments_in_delta_playlist * target_duration)

with true <- Track.supports_partial_segments?(track),
latest_full_segments <-
track.segments
|> Qex.reverse()
|> Enum.drop_while(&(&1.type == :partial)),
{skip_count, skip_duration} <-
latest_full_segments
|> Enum.with_index()
|> Enum.reduce_while(0, fn {segment, idx}, duration ->
duration = duration + segment.duration

if duration >= min_duration,
do: {:halt, {Enum.count(latest_full_segments) - idx - 1, duration}},
else: {:cont, duration}
end),
true <- skip_count > 0 do
delta_ctx = %{
skip_count: skip_count,
skip_duration: Ratio.to_float(skip_duration / Time.second())
}

{:create_delta, delta_ctx}
else
tracks_map
_any -> :dont_create_delta
end
end

Expand Down Expand Up @@ -232,41 +281,46 @@ defmodule Membrane.HTTPAdaptiveStream.HLS do
end
end

defp serialize_track(%Track{} = track, [delta?: delta?] \\ [delta?: false]) do
target_duration = Ratio.ceil(track.segment_duration / Time.second()) |> trunc()
defp serialize_track(
%Track{} = track,
target_duration,
delta_ctx \\ %{skip_count: 0, skip_duration: 0}
) do
supports_ll_hls? = Track.supports_partial_segments?(track)

"""
#EXTM3U
#EXT-X-VERSION:#{@version}
#EXT-X-VERSION:#{if delta_ctx.skip_count > 0, do: @delta_version, else: @version}
#EXT-X-TARGETDURATION:#{target_duration}
""" <>
serialize_ll_hls_tags(track) <>
serialize_ll_hls_tags(track, segments_to_skip_duration: delta_ctx.skip_duration) <>
"""
#EXT-X-MEDIA-SEQUENCE:#{track.current_seq_num}
#EXT-X-DISCONTINUITY-SEQUENCE:#{track.current_discontinuity_seq_num}
#EXT-X-MAP:URI="#{track.header_name}"
#{serialize_segments(track.segments, supports_ll_hls?: supports_ll_hls?, delta?: delta?)}
#{serialize_segments(track.segments, supports_ll_hls?: supports_ll_hls?, segments_to_skip_count: delta_ctx.skip_count)}
#{if track.finished?, do: "#EXT-X-ENDLIST", else: serialize_preload_hint_tag(supports_ll_hls?, track)}
"""
end

defp serialize_segments(segments, supports_ll_hls?: supports_ll_hls?, delta?: true) do
segments_to_skip_count = Enum.count(segments) - @min_segments_in_delta_playlist

defp serialize_segments(segments,
supports_ll_hls?: supports_ll_hls?,
segments_to_skip_count: segments_to_skip_count
)
when segments_to_skip_count > 0 do
prefix = """
#EXT-X-SKIP:SKIPPED-SEGMENTS=#{segments_to_skip_count}
"""

serialized_segments =
segments
|> Enum.drop(segments_to_skip_count)
|> serialize_segments(supports_ll_hls?: supports_ll_hls?, delta?: false)
|> serialize_segments(supports_ll_hls?: supports_ll_hls?, segments_to_skip_count: 0)

prefix <> serialized_segments
end

defp serialize_segments(segments, supports_ll_hls?: supports_ll_hls?, delta?: false) do
defp serialize_segments(segments, supports_ll_hls?: supports_ll_hls?, segments_to_skip_count: 0) do
segments
|> Enum.split(-@keep_latest_n_segment_parts)
|> then(fn {regular_segments, ll_segments} ->
Expand Down Expand Up @@ -314,21 +368,15 @@ defmodule Membrane.HTTPAdaptiveStream.HLS do
end)
end

defp serialize_ll_hls_tags(track) do
defp serialize_ll_hls_tags(track, segments_to_skip_duration: segments_to_skip_duration) do
supports_ll_hls? = Track.supports_partial_segments?(track)

if supports_ll_hls? do
can_skip_segments_duration =
track.segments
|> Enum.drop(-@min_segments_in_delta_playlist)
|> Enum.reduce(0, &(&1.duration + &2))
|> then(&Ratio.to_float(&1 / Time.second()))

target_partial_duration =
Float.ceil(Ratio.to_float(track.partial_segment_duration / Time.second()), 3)

"""
#EXT-X-SERVER-CONTROL:CAN-BLOCK-RELOAD=YES,PART-HOLD-BACK=#{3 * target_partial_duration}#{can_skip_until(can_skip_segments_duration)}
#EXT-X-SERVER-CONTROL:CAN-BLOCK-RELOAD=YES,PART-HOLD-BACK=#{3 * target_partial_duration}#{can_skip_until(segments_to_skip_duration)}
#EXT-X-PART-INF:PART-TARGET=#{target_partial_duration}
"""
else
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane_http_adaptive_stream/manifest/track.ex
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,6 @@ defmodule Membrane.HTTPAdaptiveStream.Manifest.Track do
do: {msn, %{track | segment_sequencer: {msn + 1, 0}}}

defp update_segment_duration(track, duration) do
Map.update!(track, :segment_duration, &if(&1 > duration, do: &1, else: duration))
Map.update!(track, :segment_duration, &max(&1, duration))
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ defmodule Membrane.HTTPAdaptiveStream.MixProject do
{:membrane_aac_plugin, "~> 0.16.0"},
{:membrane_h264_plugin, "~> 0.7.1"},
{:bunch, "~> 1.5"},
{:qex, "~> 0.5"},
{:membrane_hackney_plugin, "~> 0.10.0", only: :test},
{:credo, "~> 1.6.1", only: :dev, runtime: false},
{:ex_doc, "~> 0.25", only: :dev, runtime: false},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.HTTPAdaptiveStream.SinkBinIntegrationTest do
# The boolean flag below controls whether reference HLS content in fixtures directory will be created simultaneously with test content.
# It should be set only when developing new HLS features that are expected to introduce changes to reference HLS files. Nevertheless it should
# be done only locally to create and push new reference HLS files and this flag must not be set in remote repository. There is unit test in code below
# that will cause CI to fail if this flag happens to be set on remote repository. Every new version of reference HSL content must
# that will cause CI to fail if this flag happens to be set on remote repository. Every new version of reference HLS content must
# be manually verified by its creator by using some player e.g. ffplay command.

@pipeline_config %{
Expand All @@ -18,7 +18,7 @@ defmodule Membrane.HTTPAdaptiveStream.SinkBinIntegrationTest do
}
@create_fixtures false

@expected_number_of_segments_in_delta_playlist 6
@min_number_of_segments_in_delta_playlist 6

@audio_video_tracks_sources [
{"http://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/test-audio.aac",
Expand Down Expand Up @@ -54,6 +54,15 @@ defmodule Membrane.HTTPAdaptiveStream.SinkBinIntegrationTest do
]
@muxed_av_ref_path "./test/membrane_http_adaptive_stream/integration_test/fixtures/muxed_av/"

@delta_test_sources [
{"http://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s_480x270_120s.h264",
:H264, :high, "long_video"},
{"http://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s_120s.aac",
:AAC, :LC, "long_audio"}
]

@eps 1.0e-8

defmodule TestPipeline do
use Membrane.Pipeline

Expand Down Expand Up @@ -148,7 +157,7 @@ defmodule Membrane.HTTPAdaptiveStream.SinkBinIntegrationTest do
refute @create_fixtures
end

describe "Test HLS content creation for " do
describe "Test HLS content creation for" do
@tag :tmp_dir
test "audio and video tracks", %{tmp_dir: tmp_dir} do
test_pipeline(
Expand Down Expand Up @@ -212,7 +221,7 @@ defmodule Membrane.HTTPAdaptiveStream.SinkBinIntegrationTest do
alias Membrane.HTTPAdaptiveStream.Storages.FileStorage

hackney_sources =
@audio_multiple_video_tracks_sources
@delta_test_sources
|> Enum.map(fn {path, encoding, profile, name} ->
{%Membrane.Hackney.Source{location: path, hackney_opts: [follow_redirect: true]},
encoding, profile, name}
Expand All @@ -225,7 +234,7 @@ defmodule Membrane.HTTPAdaptiveStream.SinkBinIntegrationTest do
sources: hackney_sources,
hls_mode: @pipeline_config.hls_mode,
partial_segments: true,
target_window_duration: @pipeline_config.target_window_duration,
target_window_duration: :infinity,
persist?: @pipeline_config.persist?,
storage: %FileStorage{
directory: tmp_dir
Expand All @@ -241,42 +250,69 @@ defmodule Membrane.HTTPAdaptiveStream.SinkBinIntegrationTest do
|> Enum.each(fn manifest_filename ->
manifest_file = File.read!(Path.join(tmp_dir, manifest_filename))

segments_in_manifest =
Regex.scan(~r/#EXTINF:\d+\.\d+,\s\w+segment_\d+_.+.m4s/, manifest_file)

number_of_segments_in_manifest = Enum.count(segments_in_manifest)
# delta manifest will be generated when plailist is longer then 6 segments
if number_of_segments_in_manifest > @expected_number_of_segments_in_delta_playlist do
# check if manifest contains CAN-SKIP-UNTIL tag
assert Regex.match?(~r/CAN-SKIP-UNTIL=\d+\.*\d*/, manifest_file)

# check if delta file exists
delta_manifest_filename = String.replace(manifest_filename, ".m3u8", "_delta.m3u8")
assert File.exists?(Path.join(tmp_dir, delta_manifest_filename))

delta_manifest_file = File.read!(Path.join(tmp_dir, delta_manifest_filename))

segments_in_delta_manifest =
Regex.scan(~r/#EXTINF:\d+\.\d+,\s\w+segment_\d+_.+.m4s/, delta_manifest_file)

number_of_segments_in_delta_manifest = Enum.count(segments_in_delta_manifest)
# check if delta manifest contains exected number of segments
assert number_of_segments_in_delta_manifest ==
@expected_number_of_segments_in_delta_playlist

# check if delta manifest contains last 6 segments from manifest
assert Enum.take(segments_in_manifest, -@expected_number_of_segments_in_delta_playlist) ==
segments_in_delta_manifest

# check if delta manifest contains #EXT-X-SKIP tag with correct value
[_match, skipped_segments] =
Regex.run(~r/EXT-X-SKIP:SKIPPED-SEGMENTS=(\d+)/, delta_manifest_file)

{skipped_segments, _rest} = Integer.parse(skipped_segments)

assert skipped_segments + number_of_segments_in_delta_manifest ==
number_of_segments_in_manifest
end
target_duration =
manifest_file
|> then(&Regex.run(~r/#EXT-X-TARGETDURATION:(\d+)/, &1, capture: :all_but_first))
|> hd()
|> String.to_integer()

{segments_in_manifest, segment_durations} =
manifest_file
|> then(&Regex.scan(~r/#EXTINF:(\d+\.\d+),\s\w+segment_\d+_.+.m4s/, &1))
|> Enum.map(&List.to_tuple/1)
|> Enum.unzip()

# delta manifest will be generated when the sum of full (finished) segment durations
# is greater than 6 * target duration
# AND there is at least one segment that can be skipped
segment_durations_sum =
segment_durations
|> Enum.drop(1)
|> Enum.reduce(0, &(String.to_float(&1) + &2))

# for the fixtures used in this test, the following condition will be true,
# so delta must be generated
assert segment_durations_sum > @min_number_of_segments_in_delta_playlist * target_duration

delta_manifest_filename = String.replace_suffix(manifest_filename, ".m3u8", "_delta.m3u8")

# check if manifest contains #CAN-SKIP-UNTIL tag
can_skip_until =
manifest_file
|> then(&Regex.run(~r/CAN-SKIP-UNTIL=(\d+\.*\d*)/, &1, capture: :all_but_first))
|> hd()
|> String.to_float()

# check if delta file exists
assert File.exists?(Path.join(tmp_dir, delta_manifest_filename))

delta_manifest_file = File.read!(Path.join(tmp_dir, delta_manifest_filename))

{segments_in_delta_manifest, delta_durations} =
delta_manifest_file
|> then(&Regex.scan(~r/#EXTINF:(\d+\.\d+),\s\w+segment_\d+_.+.m4s/, &1))
|> Enum.map(&List.to_tuple/1)
|> Enum.unzip()

# check if #CAN-SKIP-UNTIL tag has the correct value
delta_durations_sum = Enum.reduce(delta_durations, 0, &(String.to_float(&1) + &2))
assert_in_delta can_skip_until, delta_durations_sum, @eps

number_of_segments_in_delta_manifest = Enum.count(segments_in_delta_manifest)

# check if segments in delta manifest are the same as last segments from regular manifest
assert Enum.take(segments_in_manifest, -number_of_segments_in_delta_manifest) ==
segments_in_delta_manifest

# check if delta manifest contains #EXT-X-SKIP tag with correct value
skipped_segments =
delta_manifest_file
|> then(&Regex.run(~r/EXT-X-SKIP:SKIPPED-SEGMENTS=(\d+)/, &1, capture: :all_but_first))
|> hd()
|> String.to_integer()

assert skipped_segments + number_of_segments_in_delta_manifest ==
Enum.count(segments_in_manifest)
end)
end

Expand Down