/
parser.ex
139 lines (117 loc) · 4 KB
/
parser.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
defmodule Membrane.RawVideo.Parser do
  @moduledoc """
  Simple element responsible for splitting the incoming byte stream into
  buffers, each holding exactly one raw (uncompressed) video frame of the
  desired format.

  The parser sends the proper stream_format when it moves to the playing state.
  No data analysis is done, this element simply ensures that
  the resulting packets have the proper size.

  Every output buffer gets a `pts` derived from the configured framerate:
  the first frame has `pts` equal to 0 and each following frame is later by
  one frame duration. When the framerate numerator is 0 (the default),
  the `pts` of every frame stays 0.
  """
  use Membrane.Filter

  alias Membrane.{Buffer, Payload}
  alias Membrane.{RawVideo, RemoteStream}

  def_input_pad :input,
    demand_unit: :bytes,
    demand_mode: :auto,
    accepted_format: RemoteStream

  def_output_pad :output,
    demand_mode: :auto,
    accepted_format: %RawVideo{aligned: true}

  def_options pixel_format: [
                spec: RawVideo.pixel_format_t(),
                description: """
                Format used to encode pixels of the video frame.
                """
              ],
              width: [
                spec: pos_integer(),
                description: """
                Width of a frame in pixels.
                """
              ],
              height: [
                spec: pos_integer(),
                description: """
                Height of a frame in pixels.
                """
              ],
              framerate: [
                spec: RawVideo.framerate_t(),
                default: {0, 1},
                description: """
                Framerate of video stream. Passed forward in stream_format.
                """
              ]

  @supported_formats [:I420, :I422, :I444, :RGB, :BGRA, :RGBA, :NV12, :NV21, :YV12, :AYUV, :YUY2]

  # Validates the options and builds the initial state:
  #   * `stream_format`  - the `RawVideo` format sent on the output pad,
  #   * `timestamp`      - pts of the next frame to be emitted,
  #   * `frame_duration` - amount added to `timestamp` after each frame
  #                        (0 when the framerate numerator is 0),
  #   * `frame_size`     - size in bytes of a single frame,
  #   * `queue`          - iodata received so far, newest chunk first.
  #
  # Raises when the pixel format is unsupported or when the frame size
  # cannot be computed for the given pixel format and dimensions.
  @impl true
  def handle_init(_ctx, opts) do
    if opts.pixel_format not in @supported_formats do
      raise """
      Unsupported frame pixel format: #{inspect(opts.pixel_format)}
      The elements supports: #{Enum.map_join(@supported_formats, ", ", &inspect/1)}
      """
    end

    frame_size =
      case RawVideo.frame_size(opts.pixel_format, opts.width, opts.height) do
        {:ok, frame_size} ->
          frame_size

        {:error, :invalid_dimensions} ->
          raise "Provided dimensions (#{opts.width}x#{opts.height}) are invalid for #{inspect(opts.pixel_format)} pixel format"

        # Any other error reason is reported explicitly instead of
        # surfacing as an opaque CaseClauseError.
        {:error, reason} ->
          raise "Cannot compute frame size for #{inspect(opts.pixel_format)} pixel format " <>
                  "and dimensions #{opts.width}x#{opts.height}: #{inspect(reason)}"
      end

    stream_format = %RawVideo{
      pixel_format: opts.pixel_format,
      width: opts.width,
      height: opts.height,
      framerate: opts.framerate,
      aligned: true
    }

    {num, denom} = stream_format.framerate

    # Duration of a single frame expressed in Membrane.Time units.
    # A zero numerator means no timing information, so timestamps never advance.
    frame_duration = if num == 0, do: 0, else: Ratio.new(denom * Membrane.Time.second(), num)

    {[],
     %{
       stream_format: stream_format,
       timestamp: 0,
       frame_duration: frame_duration,
       frame_size: frame_size,
       queue: []
     }}
  end

  # The output format is fully determined by the options, so it is sent
  # as soon as the element starts playing.
  @impl true
  def handle_playing(_ctx, state) do
    {[stream_format: {:output, state.stream_format}], state}
  end

  @impl true
  def handle_stream_format(:input, _stream_format, _ctx, state) do
    # Do not forward stream_format
    {[], state}
  end

  # Accumulates incoming payloads and, once at least one whole frame is
  # available, emits one buffer per complete frame. The bytes that do not
  # fill a whole frame are kept in the queue for the next call.
  #
  # NOTE(review): this module defines no end-of-stream handling, so a trailing
  # chunk shorter than `frame_size` appears to never be emitted - confirm
  # this is intended.
  @impl true
  def handle_process_list(:input, buffers, _ctx, state) do
    %{frame_size: frame_size} = state

    payload_iodata =
      Enum.map(buffers, fn %Buffer{payload: payload} -> Payload.to_binary(payload) end)

    # Chunks are prepended, so the queue holds the newest chunk first.
    queue = [payload_iodata | state.queue]
    size = IO.iodata_length(queue)

    if size < frame_size do
      {[], %{state | queue: queue}}
    else
      # Reverse only the outer list: every chunk is already in arrival order.
      data_binary = queue |> Enum.reverse() |> IO.iodata_to_binary()

      {payloads, tail} = Bunch.Binary.chunk_every_rem(data_binary, frame_size)

      {bufs, state} =
        Enum.map_reduce(payloads, state, fn payload, state_acc ->
          # The timestamp is kept exact in the state and floored only for the buffer pts.
          timestamp = Ratio.floor(state_acc.timestamp)
          {%Buffer{payload: payload, pts: timestamp}, bump_timestamp(state_acc)}
        end)

      {[buffer: {:output, bufs}], %{state | queue: [tail]}}
    end
  end

  # Advances the timestamp kept in the state by one frame duration.
  # With a zero framerate numerator the timestamp is left untouched.
  defp bump_timestamp(%{stream_format: %{framerate: {0, _}}} = state) do
    state
  end

  defp bump_timestamp(state) do
    use Ratio
    %{timestamp: timestamp, frame_duration: frame_duration} = state
    timestamp = timestamp + frame_duration
    %{state | timestamp: timestamp}
  end
end