Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timestamps for B frames #38

Merged
merged 14 commits into from
Aug 16, 2023
273 changes: 120 additions & 153 deletions lib/membrane_h264_plugin/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ defmodule Membrane.H264.Parser do
require Membrane.Logger

alias Membrane.{Buffer, H264, RemoteStream}
alias Membrane.H264.Parser.{AUSplitter, Format, NALuParser, NALuSplitter}

alias __MODULE__.DecoderConfigurationRecord
alias Membrane.H264.Parser.{
AUSplitter,
AUTimestampGenerator,
DecoderConfigurationRecord,
Format,
NALuParser,
NALuSplitter
}

@prefix_code <<0, 0, 0, 1>>

Expand Down Expand Up @@ -75,15 +81,6 @@ defmodule Membrane.H264.Parser do
be provided via this option.
"""
],
framerate: [
spec: {pos_integer(), pos_integer()} | nil,
default: nil,
description: """
Framerate of the video, represented as a tuple consisting of a numerator and the
denominator.
Its value will be sent inside the output Membrane.H264 stream format.
"""
],
output_alignment: [
spec: :au | :nalu,
default: :au,
Expand All @@ -94,9 +91,9 @@ defmodule Membrane.H264.Parser do
Defaults to `:au`.
"""
],
skip_until_keyframe?: [
skip_until_keyframe: [
spec: boolean(),
default: false,
default: true,
description: """
Determines whether to drop the stream until the first key frame is received.

Expand All @@ -115,27 +112,57 @@ defmodule Membrane.H264.Parser do
* Decoder Configuration Record, sent as decoder_configuration_record
in `Membrane.H264.RemoteStream` stream format
"""
]
],
framerate: [
spec: {pos_integer(), pos_integer()} | nil,
default: nil,
description: """
Framerate of the video, represented as a tuple consisting of a numerator and the
denominator.
Its value will be sent inside the output Membrane.H264 stream format.
"""
],
generate_best_effort_timestamps: [
spec: boolean(),
default: false,
description: """
Generates timestamps based on given `framerate`.

This option works only when `Membrane.RemoteStream` format arrives on
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could log a warning when a format other than `Membrane.RemoteStream` arrives while `generate_best_effort_timestamps: true` is set, saying that the option has no effect.

Copy link
Member Author

@mat-hek mat-hek Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but then the warning would also be emitted for anybody who reuses the same parser configuration for different kinds of input.

input pad and requires the `framerate` option to be specified.

Keep in mind that the generated timestamps may be inaccurate and lead
to video getting out of sync with other media, therefore h264 should
be kept in a container that stores the timestamps alongside.
"""
],
# TODO: remove this if possible
max_frame_reorder: [default: nil]

@impl true
def handle_init(_ctx, opts) do
state = %{
nalu_splitter: NALuSplitter.new(maybe_add_prefix(opts.sps) <> maybe_add_prefix(opts.pps)),
nalu_parser: NALuParser.new(),
au_splitter: AUSplitter.new(),
mode: nil,
profile: nil,
previous_timestamps: {nil, nil},
framerate: opts.framerate,
au_counter: 0,
output_alignment: opts.output_alignment,
frame_prefix: <<>>,
parameter_sets_present?: byte_size(opts.sps) > 0 or byte_size(opts.pps) > 0,
skip_until_keyframe?: opts.skip_until_keyframe?,
repeat_parameter_sets?: opts.repeat_parameter_sets,
cached_sps: %{},
cached_pps: %{}
}
{sps, opts} = Map.pop!(opts, :sps)
{pps, opts} = Map.pop!(opts, :pps)

if opts.generate_best_effort_timestamps and opts.framerate == nil do
raise "Invalid options: `generate_best_effort_timestamps` requires `framerate` to be specified"
end

state =
%{
nalu_splitter: NALuSplitter.new(maybe_add_prefix(sps) <> maybe_add_prefix(pps)),
nalu_parser: NALuParser.new(),
au_splitter: AUSplitter.new(),
au_timestamp_generator: AUTimestampGenerator.new(),
mode: nil,
profile: nil,
previous_buffer_timestamps: nil,
frame_prefix: <<>>,
parameter_sets_present?: byte_size(sps) > 0 or byte_size(pps) > 0,
cached_sps: %{},
cached_pps: %{}
}
|> Map.merge(Map.from_struct(opts))

{[], state}
end
Expand Down Expand Up @@ -177,37 +204,17 @@ defmodule Membrane.H264.Parser do
prefix -> {prefix <> buffer.payload, %{state | frame_prefix: <<>>}}
end

{nalus_payloads_list, nalu_splitter} = NALuSplitter.split(payload, state.nalu_splitter)

{nalus_payloads_list, nalu_splitter} =
if state.mode != :bytestream do
{last_nalu_payload, nalu_splitter} = NALuSplitter.flush(nalu_splitter)

if last_nalu_payload != <<>> do
{nalus_payloads_list ++ [last_nalu_payload], nalu_splitter}
else
{nalus_payloads_list, nalu_splitter}
end
else
{nalus_payloads_list, nalu_splitter}
end

{nalus, nalu_parser} =
Enum.map_reduce(nalus_payloads_list, state.nalu_parser, fn nalu_payload, nalu_parser ->
NALuParser.parse(nalu_payload, nalu_parser)
end)

{access_units, au_splitter} = nalus |> AUSplitter.split(state.au_splitter)
is_nalu_aligned = state.mode != :bytestream

{access_units, au_splitter} =
if state.mode == :au_aligned do
{last_au, au_splitter} = AUSplitter.flush(au_splitter)
{access_units ++ [last_au], au_splitter}
else
{access_units, au_splitter}
end
{nalus_payloads, nalu_splitter} =
NALuSplitter.split(payload, is_nalu_aligned, state.nalu_splitter)

{actions, state} = prepare_actions_for_aus(access_units, state, buffer.pts, buffer.dts)
timestamps = if state.mode == :bytestream, do: {nil, nil}, else: {buffer.pts, buffer.dts}
{nalus, nalu_parser} = NALuParser.parse_nalus(nalus_payloads, timestamps, state.nalu_parser)
is_au_aligned = state.mode == :au_aligned
{access_units, au_splitter} = AUSplitter.split(nalus, is_au_aligned, state.au_splitter)
{access_units, state} = skip_improper_aus(access_units, state)
{actions, state} = prepare_actions_for_aus(access_units, state)

state = %{
state
Expand All @@ -221,20 +228,12 @@ defmodule Membrane.H264.Parser do

@impl true
def handle_end_of_stream(:input, ctx, state) when state.mode != :au_aligned do
{last_nalu_payload, nalu_splitter} = NALuSplitter.flush(state.nalu_splitter)
{last_nalu_payload, nalu_splitter} = NALuSplitter.split(<<>>, true, state.nalu_splitter)
{last_nalu, nalu_parser} = NALuParser.parse_nalus(last_nalu_payload, state.nalu_parser)
{maybe_improper_aus, au_splitter} = AUSplitter.split(last_nalu, true, state.au_splitter)
varsill marked this conversation as resolved.
Show resolved Hide resolved
{aus, state} = skip_improper_aus(maybe_improper_aus, state)
{actions, state} = prepare_actions_for_aus(aus, state)

{{access_units, au_splitter}, nalu_parser} =
if last_nalu_payload != <<>> do
{last_nalu, nalu_parser} = NALuParser.parse(last_nalu_payload, state.nalu_parser)
{AUSplitter.split([last_nalu], state.au_splitter), nalu_parser}
else
{{[], state.au_splitter}, state.nalu_parser}
end

{remaining_nalus, au_splitter} = AUSplitter.flush(au_splitter)
maybe_improper_aus = access_units ++ [remaining_nalus]

{actions, state} = prepare_actions_for_aus(maybe_improper_aus, state)
actions = if stream_format_sent?(actions, ctx), do: actions, else: []

state = %{
Expand All @@ -261,56 +260,45 @@ defmodule Membrane.H264.Parser do
end
end

defp prepare_actions_for_aus(aus, state, buffer_pts \\ nil, buffer_dts \\ nil) do
{actions, state} =
Enum.flat_map_reduce(aus, state, fn au, state ->
cnt = state.au_counter
profile = state.profile
{sps_actions, profile} = maybe_parse_sps(au, state, profile)

au = maybe_add_parameter_sets(au, state) |> delete_duplicate_parameter_sets()
state = cache_parameter_sets(state, au)
defp skip_improper_aus(aus, state) do
Enum.flat_map_reduce(aus, state, fn au, state ->
has_seen_keyframe? =
Enum.all?(au, &(&1.status == :valid)) and Enum.any?(au, &(&1.type == :idr))

{pts, dts} = prepare_timestamps(buffer_pts, buffer_dts, state, profile, cnt)
state = %{
state
| skip_until_keyframe: state.skip_until_keyframe and not has_seen_keyframe?
}

state = %{
state
| profile: profile,
au_counter: cnt + 1
}
if Enum.any?(au, &(&1.status == :error)) or state.skip_until_keyframe do
{[], state}
else
{[au], state}
end
end)
end

has_seen_keyframe? =
Enum.all?(au, &(&1.status == :valid)) and Enum.any?(au, &(&1.type == :idr))
defp prepare_actions_for_aus(aus, state) do
Enum.flat_map_reduce(aus, state, fn au, state ->
{sps_actions, state} = maybe_parse_sps(au, state)

state = %{
state
| skip_until_keyframe?: state.skip_until_keyframe? and not has_seen_keyframe?
}
au = maybe_add_parameter_sets(au, state) |> delete_duplicate_parameter_sets()
state = cache_parameter_sets(state, au)

buffers_actions =
if Enum.any?(au, &(&1.status == :error)) or state.skip_until_keyframe? do
[]
else
[{:buffer, {:output, wrap_into_buffer(au, pts, dts, state.output_alignment)}}]
end
{{pts, dts}, state} = prepare_timestamps(au, state)

{sps_actions ++ buffers_actions, state}
end)
buffers_actions = [
{:buffer, {:output, wrap_into_buffer(au, pts, dts, state.output_alignment)}}
]

state =
if state.mode == :nalu_aligned and state.previous_timestamps != {buffer_pts, buffer_dts} do
%{state | previous_timestamps: {buffer_pts, buffer_dts}}
else
state
end

{actions, state}
{sps_actions ++ buffers_actions, state}
end)
end

defp maybe_parse_sps(au, state, profile) do
defp maybe_parse_sps(au, state) do
case Enum.find(au, &(&1.type == :sps)) do
nil ->
{[], profile}
{[], state}

sps_nalu ->
fmt =
Expand All @@ -319,43 +307,35 @@ defmodule Membrane.H264.Parser do
output_alignment: state.output_alignment
)

{[stream_format: {:output, fmt}], fmt.profile}
max_frame_reorder = if fmt.profile in [:baseline, :constrained_baseline], do: 0, else: 15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

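As discussed, this can cause problems when the profile changes in the middle of the stream: `max_frame_reorder` is set from the profile of the first SPS and then kept (`state.max_frame_reorder || max_frame_reorder`), so a later profile change would not update it.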


state = %{
state
| profile: fmt.profile,
max_frame_reorder: state.max_frame_reorder || max_frame_reorder
}

{[stream_format: {:output, fmt}], state}
end
end

defp prepare_timestamps(_buffer_pts, _buffer_dts, state, profile, frame_order_number)
when state.mode == :bytestream do
cond do
state.framerate == nil or profile == nil ->
{nil, nil}

h264_profile_tsgen_supported?(profile) ->
generate_ts_with_constant_framerate(
defp prepare_timestamps(au, state) do
if state.mode == :bytestream and state.generate_best_effort_timestamps do
{timestamps, timestamp_generator} =
AUTimestampGenerator.generate_ts_with_constant_framerate(
au,
state.framerate,
frame_order_number,
frame_order_number
state.max_frame_reorder,
state.au_timestamp_generator
)

true ->
raise("Timestamp generation for H264 profile `#{inspect(profile)}` is unsupported")
end
end

defp prepare_timestamps(buffer_pts, buffer_dts, state, _profile, _frame_order_number)
when state.mode == :nalu_aligned do
if state.previous_timestamps == {nil, nil} do
{buffer_pts, buffer_dts}
{timestamps, %{state | au_timestamp_generator: timestamp_generator}}
else
state.previous_timestamps
{hd(au).timestamps, state}
end
end

defp prepare_timestamps(buffer_pts, buffer_dts, state, _profile, _frame_order_number)
when state.mode == :au_aligned do
{buffer_pts, buffer_dts}
end

defp maybe_add_parameter_sets(au, %{repeat_parameter_sets?: false}), do: au
defp maybe_add_parameter_sets(au, %{repeat_parameter_sets: false}), do: au

defp maybe_add_parameter_sets(au, state) do
if idr_au?(au),
Expand All @@ -367,7 +347,7 @@ defmodule Membrane.H264.Parser do
if idr_au?(au), do: Enum.uniq(au), else: au
end

defp cache_parameter_sets(%{repeat_parameter_sets?: false} = state, _au), do: state
defp cache_parameter_sets(%{repeat_parameter_sets: false} = state, _au), do: state

defp cache_parameter_sets(state, au) do
sps =
Expand Down Expand Up @@ -468,19 +448,6 @@ defmodule Membrane.H264.Parser do

defp stream_format_sent?(_actions, _ctx), do: true

defp h264_profile_tsgen_supported?(profile),
do: profile in [:baseline, :constrained_baseline]

defp generate_ts_with_constant_framerate(
{frames, seconds} = _framerate,
presentation_order_number,
decoding_order_number
) do
pts = div(presentation_order_number * seconds * Membrane.Time.second(), frames)
dts = div(decoding_order_number * seconds * Membrane.Time.second(), frames)
{pts, dts}
end

defp get_frame_prefix!(dcr, state) do
cond do
dcr == nil ->
Expand Down
Loading