Automatic demands #313
Conversation
Here is a first pass through this. I still don't have a full understanding of the link lifecycle and consequences of internal vs external links, so will try to get through that later today.
```elixir
defp auto_demands_positive?(pad_ref, state) do
  PadModel.get_data!(state, pad_ref, :demand_pads)
  |> Enum.all?(&(PadModel.get_data!(state, &1, :demand) > 0))
end
```
As a `defp` only used in this file, where the entire pad data is retrieved unchanged in the caller, it would be safe and slightly more efficient to just pass in the pad data directly.
```elixir
%{demand: demand, toilet: toilet} = data = PadModel.get_data!(state, pad_ref)
demand_size = state.demand_size

if demand <= div(demand_size, 2) and auto_demands_positive?(pad_ref, state) do
```
```diff
- if demand <= div(demand_size, 2) and auto_demands_positive?(pad_ref, state) do
+ if demand <= div(demand_size, 2) and auto_demands_positive?(data, state) do
```
```elixir
defp auto_demands_positive?(pad_ref, state) do
  PadModel.get_data!(state, pad_ref, :demand_pads)
```
```diff
- defp auto_demands_positive?(pad_ref, state) do
-   PadModel.get_data!(state, pad_ref, :demand_pads)
+ defp auto_demands_positive?(pad_data, state) do
+   Map.get(pad_data, :demand_pads)
```
```elixir
Core.Child.LifecycleController.unlink(state)

if state.__struct__ == Membrane.Core.Bin.State do
  case {old, new} do
    {:stopped, :perpared} -> Core.Child.PadController.check_for_unlinked_static_pads(state)
```
```diff
- {:stopped, :perpared} -> Core.Child.PadController.check_for_unlinked_static_pads(state)
+ {:stopped, :prepared} -> Core.Child.PadController.check_for_unlinked_static_pads(state)
```
```elixir
defp check_for_auto_demands(%{mode: :pull, demand_mode: :auto} = pad_data, state) do
  state =
    Enum.reduce(pad_data.demand_pads, state, fn pad, state ->
      PadModel.update_data!(state, pad, :demand_pads, &List.delete(&1, pad_data.ref))
    end)

  if pad_data.direction == :output do
    Enum.reduce(pad_data.demand_pads, state, &DemandController.check_auto_demand/2)
  else
    state
  end
end
```
Is there any way this could be collapsed into one reduce? I'm also curious if `PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)` can be moved into `DemandController`... it seems like every caller of `DemandController.check_auto_demand/2` needs to update demand, then call `check_auto_demand`, which also updates demand. Could the demand logic account for this so the demand size is only updated once?
I suppose it could, but not sure how to do it to make it look better...

> it seems like every caller of `DemandController.check_auto_demand/2` needs to update demand

That's actually not true :P

> then call check_auto_demand, which also updates demand

It doesn't always do so

> Could the demand logic account for this so the demand size is only updated once?

Given the above facts, it's hard to achieve

> I'm also curious if `PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)` can be moved into `DemandController`

That's a good idea, I moved it
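For illustration, the two `Enum.reduce` passes in the quoted `check_for_auto_demands/2` could in principle be merged into a single pass. This is only a sketch, assuming the order of detaching the pad and re-checking the demand does not matter:

```elixir
# Hypothetical restructuring: one Enum.reduce that both detaches the removed
# pad from each associated pad and, for output pads, immediately re-checks
# the auto demand of that pad.
defp check_for_auto_demands(%{mode: :pull, demand_mode: :auto} = pad_data, state) do
  Enum.reduce(pad_data.demand_pads, state, fn pad, state ->
    state = PadModel.update_data!(state, pad, :demand_pads, &List.delete(&1, pad_data.ref))

    if pad_data.direction == :output do
      DemandController.check_auto_demand(pad, state)
    else
      state
    end
  end)
end
```

Whether this reads better than two explicit passes is a matter of taste; the single pass avoids traversing `demand_pads` twice.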
lib/membrane/child_entry.ex
Outdated
```
@@ -33,7 +33,8 @@ defmodule Membrane.ChildEntry do
  :pid,
  :clock,
  :sync,
  playback_synced?: false,
  :spec,
```
There's no such key in the type spec
This is almost always used as `spec_ref`. Shouldn't that be the name? Plus I'm missing the docs about what it does
```diff
  children_log_metadata: [],
- links: []
+ links: [],
+ pending_specs: %{}
```
Not reflected in typespec
lib/membrane/core/element.ex
Outdated
```elixir
  _from,
  state
) do
  PadController.handle_link(direction, this, other, other_info, state) |> reply(state)
  Membrane.Logger.debug("handle link")
```
If this is going to stay (and is not a leftover from testing) I think you should add more context to it i.e. what is connecting to what
lib/membrane/core/element.ex
Outdated
```elixir
  Message.new(:link_request, [_pad_ref, _direction, link_id, _pad_props]),
  state
) do
  Membrane.Logger.debug("link request")
```
Same as above
lib/membrane/pad_data.ex
Outdated
```diff
@@ -62,5 +62,9 @@ defmodule Membrane.Pad.Data do
   sticky_messages: nil,
   input_buf: nil,
   demand: nil,
-  options: %{}
+  options: %{},
+  demand_pads: [],
```
WDYT about `:demanding_pads`, or simply `:associated_output_pads`? I had a hard time connecting the name of this field with its purpose.
```elixir
  end
end

@spec check_auto_demand(Pad.ref_t(), State.t()) :: State.t()
def check_auto_demand(pad_ref, demand_decrease \\ 0, state) do
  %{demand: demand, toilet: toilet, demand_pads: demand_pads} =
```
We need to handle EOS, probably by removing the pad from `demand_pads`, to make the `auto_demands_positive?` check work properly when one of the pads receives EOS.
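As a sketch of the kind of cleanup meant here (the handler name is hypothetical; the `PadModel` calls follow the pattern used elsewhere in this PR):

```elixir
# Hypothetical EOS cleanup: detach the finished pad from the demand_pads
# of every pad it was associated with, so auto_demands_positive? no longer
# waits for demand on a pad that will never carry data again.
defp remove_pad_from_auto_demands(pad_ref, state) do
  state
  |> PadModel.get_data!(pad_ref, :demand_pads)
  |> Enum.reduce(state, fn related_pad, state ->
    PadModel.update_data!(state, related_pad, :demand_pads, &List.delete(&1, pad_ref))
  end)
end
```

This would run from the end-of-stream handling path, before the pad data itself is dropped.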
```elixir
@spec check_auto_demand(Pad.ref_t(), State.t()) :: State.t()
def check_auto_demand(pad_ref, demand_decrease \\ 0, state) do
```
I think a function that allows decreasing the demand and has side effects shouldn't be called `check_X`
```elixir
    when is_reference(toilet) do
  %{other_demand_unit: other_demand_unit, pid: pid} = data
  buf_size = Buffer.Metric.from_unit(other_demand_unit).buffers_size(buffers)
  toilet_size = :atomics.add_get(toilet, 1, buf_size)
```
I had to check the docs to find out what `1` means here. I'd add a module attribute naming it.

EDIT: or even wrap the atomics with a very simple module hiding the implementation details, although that would be an additional call
Erlang API great as always ;) Not sure what is simpler though - this or having a wrapper (which could be a macro)
Anything is better than this. Either do

```diff
- toilet_size = :atomics.add_get(toilet, 1, buf_size)
+ toilet_size = :atomics.add_get(toilet, @toilet_size_idx, buf_size)
```

or create a wrapper

```diff
- toilet_size = :atomics.add_get(toilet, 1, buf_size)
+ toilet_size = AtomicCounter.add_get(toilet, buf_size)
```
EDIT: Another idea - create a wrapper over `Toilet` and hide the ASCII art logging there
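For reference, such a wrapper can stay very small (a sketch; the `AtomicCounter` module name and the single-cell layout are assumptions, not code from this PR):

```elixir
defmodule AtomicCounter do
  @moduledoc "Thin wrapper over a one-cell :atomics array, hiding the index."

  # :atomics arrays are 1-indexed; this wrapper always uses the cell at index 1.
  @idx 1

  @spec new() :: :atomics.atomics_ref()
  def new(), do: :atomics.new(1, signed: true)

  @spec add_get(:atomics.atomics_ref(), integer()) :: integer()
  def add_get(ref, value), do: :atomics.add_get(ref, @idx, value)

  @spec sub(:atomics.atomics_ref(), integer()) :: :ok
  def sub(ref, value), do: :atomics.sub(ref, @idx, value)

  @spec get(:atomics.atomics_ref()) :: integer()
  def get(ref), do: :atomics.get(ref, @idx)
end
```

Call sites would then read `AtomicCounter.add_get(toilet, buf_size)` instead of `:atomics.add_get(toilet, 1, buf_size)`.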
```elixir
buf_size = Buffer.Metric.from_unit(other_demand_unit).buffers_size(buffers)
toilet_size = :atomics.add_get(toilet, 1, buf_size)

if toilet_size > 200 do
```
Module attribute? BTW, isn't it too low for demands in bytes?
We're going to replace that with some smart algorithm
I don't see any smart algorithm right now. This value will break the toilet for byte-pushing elements and I don't think it can be merged like this
```elixir
  state
) do
  buf_size = Buffer.Metric.from_unit(other_demand_unit).buffers_size(buffers)
  PadModel.update_data!(state, pad_ref, :demand, &(&1 - buf_size))
```
The pair of `get_in` & `put_in` seems to be more than 2 times faster than `update_in`:
```elixir
map = %{
  list: [],
  counter: 0,
  pad: %{
    :input => %{cnt: 1},
    {:input, 0} => %{cnt: 1}
  },
  and_some: :atom
}

range = 1..10_000

cases = %{
  put: fn ->
    range
    |> Enum.reduce(map, fn i, acc ->
      cnt = acc |> Bunch.Access.get_in([:pad, :input, :cnt])
      acc |> Bunch.Access.put_in([:pad, :input, :cnt], cnt + i)
    end)
  end,
  update: fn ->
    range
    |> Enum.reduce(map, fn i, acc ->
      acc |> Bunch.Access.update_in([:pad, :input, :cnt], &(&1 + i))
    end)
  end
}

Benchee.run(cases)
```
Operating System: Linux
CPU Information: AMD Ryzen 7 PRO 4750U with Radeon Graphics
Number of Available Cores: 16
Available memory: 29.11 GB
Elixir 1.12.3
Erlang 24.1
Benchmark suite executing with the following configuration:
warmup: 2 s
time: 5 s
memory time: 0 ns
parallel: 1
inputs: none specified
Estimated total run time: 14 s
Benchmarking put...
Benchmarking update...
| Name   | ips   | average  | deviation | median   | 99th %   |
| ------ | ----- | -------- | --------- | -------- | -------- |
| put    | 27.62 | 36.21 ms | ±2.88%    | 35.68 ms | 40.21 ms |
| update | 11.25 | 88.85 ms | ±2.56%    | 87.69 ms | 95.41 ms |

Comparison:
put 27.62
update 11.25 - 2.45x slower +52.65 ms
I'd optimize the PadModel itself, but in a separate PR
lib/membrane/core/element/state.ex
Outdated
```diff
@@ -81,7 +82,8 @@ defmodule Membrane.Core.Element.State do
     clock: nil,
     stream_sync: options.sync,
     latency: 0
-  }
+  },
+  demand_size: 4000
```
Have you considered making it configurable?
I believe it ultimately should somehow automatically adapt.
Right now it doesn't, so it should be configurable to provide a workaround if the value isn't appropriate
test/membrane/core/element_test.exs
Outdated
```elixir
    accepted_caps: :any,
    availability: :always,
    direction: :output,
    mode: :pull,
    name: :output,
    options: nil
  }
} = reply
```
Maybe you should rather change the asserted result
```elixir
    },
    sink: Testing.Sink
  ]
})

Process.sleep(2000)
```
Don't forget to remove
lib/membrane/parent_spec.ex
Outdated
```
to `#{Membrane.Core.Element.InputQueue.default_demand_excess_factor()}` (the default may change in the future).
- `min_demand_factor` - A factor by which the minimal demand is multiplied by. Used only for pads working in pull
in a queue called toilet. If the toilet size grows above its capacity, it overflows by raising an error.
- `demand_excess` - Used only for pads working in pull mode with manual demands. See `t:Membrane.Pad.mode_t/0`
```
The `mode_t` has an invalid link in its description (missing `/0` for `demand_mode_t`)
lib/membrane/parent_spec.ex
Outdated
```
- `demand_excess` - Used only for pads working in pull mode with manual demands. See `t:Membrane.Pad.mode_t/0`
  and `t:Membrane.Pad.demand_mode_t/0` for more info.
- `min_demand_factor` - A factor used to calculate minimal demand (`minimal_demand = demand_excess * min_demand_factor`)
  by which `demand_excess` is multiplied by to calculate the minimal demand. Used only for pads working in pull
  mode with manual demands. See `t:Membrane.Pad.mode_t/0` and `t:Membrane.Pad.demand_mode_t/0` for more info. Defaults
  to `#{Membrane.Core.Element.InputQueue.default_min_demand_factor()}` (the default may change in the future).
- `auto_demand_size_factor` - A factor by which the auto demand size multiplied by. Used only for pads working in pull
  mode with automatic demands. See `t:Membrane.Pad.mode_t/0` and `t:Membrane.Pad.demand_mode_t/0` for more info. Defaults
  to `#{Membrane.Core.Element.DemandHandler.default_auto_demand_size_factor()}` (the default may change in the future).
- `auto_demand_size` - Used only for pads working in pull mode with automatic demands. See `t:Membrane.Pad.mode_t/0`
  and `t:Membrane.Pad.demand_mode_t/0` for more info.
```
The docs miss the explanation of what those values do
lib/membrane/parent_spec.ex
Outdated
```
- `demand_excess` - Demand that will be generated automatically by Membrane to allow smooth, concurrent processing.
  All buffers received in response to that demand will be queued internally until they are actually demanded by user.
  Used only for pads working in pull mode with manual demands. See `t:Membrane.Pad.mode_t/0`
```
This still doesn't explain everything; I could only get it by remembering the old `preffered_size`. I'd modify the docs and change the name of the option:

```diff
- - `demand_excess` - Demand that will be generated automatically by Membrane to allow smooth, concurrent processing.
-   All buffers received in response to that demand will be queued internally until they are actually demanded by user.
-   Used only for pads working in pull mode with manual demands. See `t:Membrane.Pad.mode_t/0`
+ - `initial_demand` - The initial demand sent to the linked output pad. It is used to fill the queue of the input pad.
+   That allows for fulfilling the demands of the element by taking data from the queue while the actual sending of demands
+   is done asynchronously, smoothing the processing. Used only for pads working in pull mode with manual demands.
+   See `t:Membrane.Pad.mode_t/0`
```
Ideas for alternative names: `warm_up_demand`, `starting_demand`. This may however affect the way `min_demand_factor` is defined - I'm not sure if it makes sense to define the `min_demand` as a function of this demand, calculated using a factor
`initial_demand` seems like it was only sent initially and didn't matter afterwards. I refactored the description but left `demand_excess`
LGTM! 🎉
closes #285
The idea
This PR adds support for the automatic handling of demands in filters. The idea is to specify relationships between input and output pads and, instead of calling the `handle_demand` callback and handling the `demand` and `redemand` actions, to automatically calculate and forward the demands between the related pads. The benefits of that approach are the following:

The implementation

The implementation does not introduce any breaking changes to the API.
Relaying demands
Let's assume we have the following pipeline:
A -> B -> C
working in pull mode, and B has auto demands turned on. The flow from B's perspective goes as follows:
a) initially, B sends demand of arbitrary size (hardcoded, at least for now) to A
b) when B receives a demand from C, it stores it in state and checks whether there's some demand on A remaining
c) when B receives buffers from A, it subtracts their size from the demand on A and checks if it's still positive
Because the output demand is checked before buffers are processed, we can avoid making the processing synchronous. On the other hand, by sending buffers immediately, we avoid using the input buffer, which brings some overhead. That may theoretically lead to generating many buffers before slowing down the stream in some specific cases. If that becomes a problem, we may want to try falling back to the input buffer in such cases.
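The steps above can be sketched with plain maps in place of the actual pad data. This is a simplification: the field names, the hardcoded demand size, and the half-size threshold are illustrative, not the PR's exact values.

```elixir
# Simplified model of B's bookkeeping in the A -> B -> C scenario.
# `send_demand` stands for whatever actually delivers a demand message to A.
defmodule AutoDemandSketch do
  @initial_demand 4000

  # (a) on start, send a demand of arbitrary size to A
  def init(send_demand) do
    send_demand.(@initial_demand)
    %{demand_on_a: @initial_demand, demand_from_c: 0}
  end

  # (b) a demand arrived from C: store it, then see if A should be re-demanded
  def handle_demand_from_c(state, size, send_demand) do
    state = update_in(state.demand_from_c, &(&1 + size))
    maybe_redemand(state, send_demand)
  end

  # (c) buffers arrived from A: subtract their size from the demand on A
  def handle_buffers_from_a(state, buf_size, send_demand) do
    state = update_in(state.demand_on_a, &(&1 - buf_size))
    maybe_redemand(state, send_demand)
  end

  # top up the demand on A once it falls to half the target, provided
  # there is still outstanding demand from C
  defp maybe_redemand(state, send_demand) do
    if state.demand_on_a <= div(@initial_demand, 2) and state.demand_from_c > 0 do
      send_demand.(@initial_demand - state.demand_on_a)
      %{state | demand_on_a: @initial_demand}
    else
      state
    end
  end
end
```

In the real element the two counters live in the pad data of the linked input and output pads rather than in one flat map.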
Atomic toilet
Since the new demanding mode doesn't use the input buffer, we had to solve the toilet separately. On the other hand, the toilet in the input buffer had a serious problem. Let's consider a pipeline:
A -> [toilet] B -> C
If C slows down and B doesn't receive enough demands from it while A still sends data, the toilet will overflow. However, if B slows down a lot and hangs on processing a few buffers for a long time, the new buffers can get stuck in the mailbox and the toilet won't be aware of them.
The only way of solving that I have figured out was to move the responsibility to the sender - A in this case. Each time the sender sends a buffer, it increments the toilet size and checks whether it exceeds the limit. If it does, A kills the receiver - B. On the receiver side, the toilet size is decremented upon each received buffer. That's possible without any message exchange, thanks to Erlang's atomics.
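A minimal model of that mechanism with Erlang's `:atomics` (a sketch only; the capacity value and the overflow callback are placeholders, not the PR's actual behaviour):

```elixir
defmodule ToiletSketch do
  @capacity 200
  @idx 1

  # The atomics ref is shared by sender and receiver; no messages needed.
  def new(), do: :atomics.new(1, signed: true)

  # Sender side: bump the counter by the size of what was just sent and
  # check the limit. In the PR, exceeding it means killing the receiver.
  def fill(toilet, buf_size, on_overflow) do
    size = :atomics.add_get(toilet, @idx, buf_size)
    if size > @capacity, do: on_overflow.(size)
    :ok
  end

  # Receiver side: decrement by the size of each consumed buffer.
  def drain(toilet, buf_size), do: :atomics.sub(toilet, @idx, buf_size)
end
```

Because `:atomics` operations are lock-free and process-independent, the sender sees overflow even when the receiver's mailbox is backed up, which is exactly the failure mode the old input-buffer toilet missed.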
Toilet overflow detection algorithm
That brings us to the toilet overflow detection algorithm. Currently, the toilet has a hard size limit, above which it crashes. It's difficult to provide a good default for it though. It's also not easy to configure it properly. We should think over how to solve it better - possibly we should see how the size behaves in some small period of time and decide whether to fail based on that. We could also make it possible to configure whether to kill the sender or the receiver in case of overflow.
Bin linking strategy
Sharing the toilet atomics and configuration between elements within different bins turned out to be difficult and error-prone in the current state of things. It was a clear sign that it's about time to refactor that, possibly avoiding bins proxying media across their boundaries. Unfortunately, it turned out to be no easy task. Here's how it's solved now:
The link is established via a `GenServer.call` to the owner of the output pad, which in turn calls the owner of the input pad. Bins proxy those calls according to the bindings from the link request phase.
This solution allows bins to spawn children or do any asynchronous work when a pad is added and makes the links established directly between elements. Thanks to that, we could get rid of the bin's linking buffer and media proxying in bins.
To do
Further work (issues to be created):