diff --git a/lib/membrane_h264_plugin/parser.ex b/lib/membrane_h264_plugin/parser.ex index 977bd29..df8c360 100644 --- a/lib/membrane_h264_plugin/parser.ex +++ b/lib/membrane_h264_plugin/parser.ex @@ -21,8 +21,11 @@ defmodule Membrane.H264.Parser do * Receiving `%Membrane.H264.RemoteStream{alignment: :au}` results in the parser mode being set to `:au_aligned` The distinguishment between parser modes was introduced to eliminate the redundant operations and to provide a reliable way - for timestamps rewritting: - * in the `:bytestream` mode, the output buffers have their `:pts` and `:dts` set to nil + for rewriting of timestamps: + * in the `:bytestream` mode: + * if option `:framerate` is set to nil, the output buffers have their `:pts` and `:dts` set to nil + * if framerate is specified, `:pts` and `:dts` will be generated automatically, based on that framerate, starting from 0 + This may only be used with h264 profiles `:baseline` and `:constrained_baseline`, where `PTS==DTS`. * in the `:nalu_aligned` mode, the output buffers have their `:pts` and `:dts` set to `:pts` and `:dts` of the input buffer that was holding the first NAL unit making up given access unit (that is being sent inside that output buffer). * in the `:au_aligned` mode, the output buffers have their `:pts` and `:dts` set to `:pts` and `:dts` of the input buffer @@ -72,7 +75,7 @@ defmodule Membrane.H264.Parser do description: """ Framerate of the video, represented as a tuple consisting of a numerator and the denominator. - It's value will be sent inside the output Membrane.H264 caps. + Its value will be sent inside the output Membrane.H264 stream format. """ ] @@ -83,8 +86,10 @@ defmodule Membrane.H264.Parser do nalu_parser: NALuParser.new(), au_splitter: AUSplitter.new(), mode: nil, + profile: nil, previous_timestamps: {nil, nil}, - framerate: opts.framerate + framerate: opts.framerate, + au_counter: 0 } {[], state} @@ -138,21 +143,7 @@ defmodule Membrane.H264.Parser do {access_units, au_splitter} end - {pts, dts} = - case state.mode do - :bytestream -> {nil, nil} - :nalu_aligned -> state.previous_timestamps - :au_aligned -> {buffer.pts, buffer.dts} - end - - state = - if state.mode == :nalu_aligned and state.previous_timestamps != {buffer.pts, buffer.dts} do - %{state | previous_timestamps: {buffer.pts, buffer.dts}} - else - state - end - - actions = prepare_actions_for_aus(access_units, pts, dts, state) + {actions, state} = prepare_actions_for_aus(access_units, state, buffer.pts, buffer.dts) state = %{ state @@ -184,13 +175,7 @@ defmodule Membrane.H264.Parser do {remaining_nalus, au_splitter} = AUSplitter.flush(au_splitter) maybe_improper_aus = access_units ++ [remaining_nalus] - {pts, dts} = - case state.mode do - :bytestream -> {nil, nil} - :nalu_aligned -> state.previous_timestamps - end - - actions = prepare_actions_for_aus(maybe_improper_aus, pts, dts, state) + {actions, state} = prepare_actions_for_aus(maybe_improper_aus, state) actions = if stream_format_sent?(actions, ctx), do: actions, else: [] state = %{ @@ -208,19 +193,70 @@ defmodule Membrane.H264.Parser do {[end_of_stream: :output], state} end - defp prepare_actions_for_aus(aus, pts, dts, state) do - Enum.reduce(aus, [], fn au, acc -> - sps_actions = - case Enum.find(au, &(&1.type == :sps)) do - nil -> - [] + defp prepare_actions_for_aus(aus, state, buffer_pts \\ nil, buffer_dts \\ nil) do + {actions, au_counter, profile} = + Enum.reduce(aus, {[], state.au_counter, state.profile}, fn au, + {actions_acc, cnt, profile} -> + {sps_actions, profile} = maybe_parse_sps(au, state, profile) + {pts, dts} = prepare_timestamps(buffer_pts, buffer_dts, state, profile, cnt) - sps_nalu -> - [stream_format: {:output, Format.from_sps(sps_nalu, framerate: state.framerate)}] - end + {actions_acc ++ sps_actions ++ [{:buffer, {:output, wrap_into_buffer(au, pts, dts)}}], + cnt + 1, profile} + end) + + state = %{state | profile: profile, au_counter: au_counter} - acc ++ sps_actions ++ [{:buffer, {:output, wrap_into_buffer(au, pts, dts)}}] - end) + state = + if state.mode == :nalu_aligned and state.previous_timestamps != {buffer_pts, buffer_dts} do + %{state | previous_timestamps: {buffer_pts, buffer_dts}} + else + state + end + + {actions, state} + end + + defp maybe_parse_sps(au, state, profile) do + case Enum.find(au, &(&1.type == :sps)) do + nil -> + {[], profile} + + sps_nalu -> + fmt = Format.from_sps(sps_nalu, framerate: state.framerate) + {[stream_format: {:output, fmt}], fmt.profile} + end + end + + defp prepare_timestamps(_buffer_pts, _buffer_dts, state, profile, frame_order_number) + when state.mode == :bytestream do + cond do + state.framerate == nil or profile == nil -> + {nil, nil} + + h264_profile_tsgen_supported?(profile) -> + generate_ts_with_constant_framerate( + state.framerate, + frame_order_number, + frame_order_number + ) + + true -> + raise("Timestamp generation for H264 profile `#{inspect(profile)}` is unsupported") + end + end + + defp prepare_timestamps(buffer_pts, buffer_dts, state, _profile, _frame_order_number) + when state.mode == :nalu_aligned do + if state.previous_timestamps == {nil, nil} do + {buffer_pts, buffer_dts} + else + state.previous_timestamps + end + end + + defp prepare_timestamps(buffer_pts, buffer_dts, state, _profile, _frame_order_number) + when state.mode == :au_aligned do + {buffer_pts, buffer_dts} end defp wrap_into_buffer(access_unit, pts, dts) do @@ -281,4 +317,17 @@ defmodule Membrane.H264.Parser do do: Enum.any?(actions, &match?({:stream_format, _stream_format}, &1)) defp stream_format_sent?(_actions, _ctx), do: true + + defp h264_profile_tsgen_supported?(profile), + do: profile in [:baseline, :constrained_baseline] + + defp generate_ts_with_constant_framerate( + {frames, seconds} = _framerate, + presentation_order_number, + decoding_order_number + ) do + pts = div(presentation_order_number * seconds * Membrane.Time.second(), frames) + dts = div(decoding_order_number * seconds * Membrane.Time.second(), frames) + {pts, dts} + end end diff --git a/mix.exs b/mix.exs index d469a0a..ef2006d 100644 --- a/mix.exs +++ b/mix.exs @@ -66,7 +66,7 @@ defmodule Membrane.H264.TODO.Mixfile do licenses: ["Apache-2.0"], links: %{ "GitHub" => @github_url, - "Membrane Framework Homepage" => "https://membraneframework.org" + "Membrane Framework Homepage" => "https://membrane.stream" } ] end diff --git a/test/fixtures/input-10-720p-baseline.h264 b/test/fixtures/input-10-720p-baseline.h264 new file mode 100644 index 0000000..311d19c Binary files /dev/null and b/test/fixtures/input-10-720p-baseline.h264 differ diff --git a/test/integration/modes_test.exs b/test/integration/modes_test.exs index 334aed8..a31c2d6 100644 --- a/test/integration/modes_test.exs +++ b/test/integration/modes_test.exs @@ -7,6 +7,7 @@ defmodule Membrane.H264.ModesTest do alias Membrane.Buffer alias Membrane.H264.Parser alias Membrane.H264.Parser.{AUSplitter, NALuParser, NALuSplitter} + alias Membrane.H264.Support.TestSource alias Membrane.Testing.{Pipeline, Sink} @h264_input_file "test/fixtures/input-10-720p.h264" @@ -54,43 +55,6 @@ defmodule Membrane.H264.ModesTest do |> elem(0) end - defmodule ModeTestSource do - use Membrane.Source - - def_options mode: [] - - def_output_pad :output, - demand_mode: :auto, - mode: :push, - accepted_format: - any_of( - %Membrane.RemoteStream{type: :bytestream}, - %Membrane.H264.RemoteStream{alignment: alignment} when alignment in [:au, :nalu] - ) - - @impl true - def handle_init(_ctx, opts) do - {[], %{mode: opts.mode}} - end - - @impl true - def handle_parent_notification(actions, _ctx, state) do - {actions, state} - end - - @impl true - def handle_playing(_ctx, state) do - stream_format = - case state.mode do - :bytestream -> %Membrane.RemoteStream{type: :bytestream} - :nalu_aligned -> %Membrane.H264.RemoteStream{alignment: :nalu} - :au_aligned -> %Membrane.H264.RemoteStream{alignment: :au} - end - - {[stream_format: {:output, stream_format}], state} - end - end - test "if the pts and dts are set to nil in :bytestream mode" do binary = File.read!(@h264_input_file) mode = :bytestream @@ -99,7 +63,7 @@ defmodule Membrane.H264.ModesTest do {:ok, _supervisor_pid, pid} = Pipeline.start_supervised( structure: [ - child(:source, %ModeTestSource{mode: mode}) + child(:source, %TestSource{mode: mode}) |> child(:parser, Parser) |> child(:sink, Sink) ] @@ -127,7 +91,7 @@ defmodule Membrane.H264.ModesTest do {:ok, _supervisor_pid, pid} = Pipeline.start_supervised( structure: [ - child(:source, %ModeTestSource{mode: mode}) + child(:source, %TestSource{mode: mode}) |> child(:parser, Parser) |> child(:sink, Sink) ] @@ -157,7 +121,7 @@ defmodule Membrane.H264.ModesTest do {:ok, _supervisor_pid, pid} = Pipeline.start_supervised( structure: [ - child(:source, %ModeTestSource{mode: mode}) + child(:source, %TestSource{mode: mode}) |> child(:parser, Parser) |> child(:sink, Sink) ] diff --git a/test/integration/timestamp_generation_test.exs b/test/integration/timestamp_generation_test.exs new file mode 100644 index 0000000..44d4f90 --- /dev/null +++ b/test/integration/timestamp_generation_test.exs @@ -0,0 +1,156 @@ +defmodule Membrane.H264.TimestampGenerationTest do + @moduledoc false + + use ExUnit.Case + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Buffer + alias Membrane.H264.Parser + alias Membrane.H264.Parser.{AUSplitter, NALuParser, NALuSplitter} + alias Membrane.H264.Support.TestSource + alias Membrane.Testing.{Pipeline, Sink} + + defmodule EnhancedPipeline do + use Membrane.Pipeline + + @impl true + def handle_init(_ctx, args) do + {[spec: args], %{}} + end + + @impl true + def handle_call({:get_child_pid, child}, ctx, state) do + child_pid = Map.get(ctx.children, child).pid + {[reply: child_pid], state} + end + end + + @h264_input_file "test/fixtures/input-10-720p.h264" + @h264_input_file_baseline "test/fixtures/input-10-720p-baseline.h264" + defp prepare_buffers(binary, :bytestream) do + buffers = + :binary.bin_to_list(binary) |> Enum.chunk_every(400) |> Enum.map(&:binary.list_to_bin(&1)) + + Enum.map(buffers, &%Membrane.Buffer{payload: &1}) + end + + defp prepare_buffers(binary, :au_aligned) do + {nalus_payloads, nalu_splitter} = NALuSplitter.split(binary, NALuSplitter.new()) + {last_nalu_payload, _nalu_splitter} = NALuSplitter.flush(nalu_splitter) + nalus_payloads = nalus_payloads ++ [last_nalu_payload] + + {nalus, _nalu_parser} = + Enum.map_reduce(nalus_payloads, NALuParser.new(), &NALuParser.parse(&1, &2)) + + {aus, au_splitter} = nalus |> AUSplitter.split(AUSplitter.new()) + {last_au, _au_splitter} = AUSplitter.flush(au_splitter) + aus = aus ++ [last_au] + + Enum.map_reduce(aus, 0, fn au, ts -> + {%Membrane.Buffer{payload: Enum.map_join(au, & &1.payload), pts: ts, dts: ts}, ts + 1} + end) + |> elem(0) + end + + test "if the pts and dts are set to nil in :bytestream mode when framerate isn't given" do + binary = File.read!(@h264_input_file_baseline) + mode = :bytestream + input_buffers = prepare_buffers(binary, mode) + + {:ok, _supervisor_pid, pid} = + Pipeline.start_supervised( + structure: [ + child(:source, %TestSource{mode: mode}) + |> child(:parser, Parser) + |> child(:sink, Sink) + ] + ) + + assert_pipeline_play(pid) + send_buffers_actions = for buffer <- input_buffers, do: {:buffer, {:output, buffer}} + Pipeline.message_child(pid, :source, send_buffers_actions ++ [end_of_stream: :output]) + + output_buffers = prepare_buffers(binary, :au_aligned) + + Enum.each(output_buffers, fn buf -> + payload = buf.payload + assert_sink_buffer(pid, :sink, %Buffer{payload: ^payload, pts: nil, dts: nil}) + end) + + Pipeline.terminate(pid, blocking?: true) + end + + test """ + if the pts and dts are generated correctly for profiles :baseline and :constrained_baseline + in :bytestream mode when framerate is given + """ do + binary = File.read!(@h264_input_file_baseline) + mode = :bytestream + input_buffers = prepare_buffers(binary, mode) + + framerate = {30, 1} + + {:ok, _supervisor_pid, pid} = + Pipeline.start_supervised( + structure: [ + child(:source, %TestSource{mode: mode}) + |> child(:parser, %Parser{framerate: framerate}) + |> child(:sink, Sink) + ] + ) + + assert_pipeline_play(pid) + send_buffers_actions = for buffer <- input_buffers, do: {:buffer, {:output, buffer}} + Pipeline.message_child(pid, :source, send_buffers_actions ++ [end_of_stream: :output]) + + output_buffers = prepare_buffers(binary, :au_aligned) + + {frames, seconds} = framerate + + Enum.reduce(output_buffers, 0, fn buf, order_number -> + payload = buf.payload + timestamp = div(seconds * Membrane.Time.second() * order_number, frames) + assert_sink_buffer(pid, :sink, %Buffer{payload: ^payload, pts: ^timestamp, dts: ^timestamp}) + order_number + 1 + end) + + Pipeline.terminate(pid, blocking?: true) + end + + test "if an error is raised when framerate is given for profiles other than :baseline and :constrained_baseline" do + binary = File.read!(@h264_input_file) + mode = :bytestream + input_buffers = prepare_buffers(binary, mode) + + {:ok, _supervisor_pid, pid} = + Pipeline.start_supervised( + custom_args: [ + child(:source, %TestSource{mode: mode}) + |> child(:parser, %Parser{framerate: {30, 1}}) + |> child(:sink, Sink) + ], + module: EnhancedPipeline + ) + + Pipeline.execute_actions(pid, playback: :playing) + assert_pipeline_play(pid) + parser_pid = Membrane.Pipeline.call(pid, {:get_child_pid, :parser}) + send_buffers_actions = for buffer <- input_buffers, do: {:buffer, {:output, buffer}} + + Process.monitor(parser_pid) + Pipeline.message_child(pid, :source, send_buffers_actions ++ [end_of_stream: :output]) + + error = + receive do + {:DOWN, _ref, :process, ^parser_pid, {%RuntimeError{message: msg}, _stacktrace}} -> msg + after + 2000 -> nil + end + + assert error =~ ~r/timestamp.*generation.*unsupported/i + + Pipeline.terminate(pid, blocking?: true) + end +end diff --git a/test/parser/caps_test.exs b/test/parser/stream_format_test.exs similarity index 100% rename from test/parser/caps_test.exs rename to test/parser/stream_format_test.exs diff --git a/test/support/test_source.ex b/test/support/test_source.ex new file mode 100644 index 0000000..9eace62 --- /dev/null +++ b/test/support/test_source.ex @@ -0,0 +1,38 @@ +defmodule Membrane.H264.Support.TestSource do + @moduledoc false + + use Membrane.Source + + def_options mode: [] + + def_output_pad :output, + demand_mode: :auto, + mode: :push, + accepted_format: + any_of( + %Membrane.RemoteStream{type: :bytestream}, + %Membrane.H264.RemoteStream{alignment: alignment} when alignment in [:au, :nalu] + ) + + @impl true + def handle_init(_ctx, opts) do + {[], %{mode: opts.mode}} + end + + @impl true + def handle_parent_notification(actions, _ctx, state) do + {actions, state} + end + + @impl true + def handle_playing(_ctx, state) do + stream_format = + case state.mode do + :bytestream -> %Membrane.RemoteStream{type: :bytestream} + :nalu_aligned -> %Membrane.H264.RemoteStream{alignment: :nalu} + :au_aligned -> %Membrane.H264.RemoteStream{alignment: :au} + end + + {[stream_format: {:output, stream_format}], state} + end +end