diff --git a/CHANGELOG.md b/CHANGELOG.md index 13e45d83a..1f0f39529 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,23 +1,18 @@ # Changelog -## 1.0.0 +## 0.12.1 * Introduce `:remove_link` action in pipelines and bins. * Add children groups - a mechanism that allows refering to multiple children with a single identifier. - * Rename `remove_child` action into `remove_children` and allow for removing a children group with a single action. * Add an ability to spawn anonymous children. - * Replace `Membrane.Time.round_to_` with `Membrane.Time.as_/2` with second argument equal `:round`. Rename `Membrane.Time.round_to_timebase` to `Membrane.Time.divide_by_timebase/2`. [#494](https://github.com/membraneframework/membrane_core/pull/494) * Remove `:playback` action. Introduce `:setup` action. [#496](https://github.com/membraneframework/membrane_core/pull/496) * Add `Membrane.Testing.Pipeline.get_child_pid/2`. [#497](https://github.com/membraneframework/membrane_core/pull/497) * Make callback contexts to be maps. [#504](https://github.com/membraneframework/membrane_core/pull/504) * All Membrane Elements can be compatible till now on - pads working in `:pull` mode, handling different `demand_units`, can be now linked. * Output pads working in `:pull` mode should have their `demand_unit` specified. If case it's not available, it's assumed that the pad handles demands in both `:bytes` and `:buffers` units. - * Rename callbacks `handle_process/4` and `handle_write/4` to `handle_buffer/4` in [#506](https://github.com/membraneframework/membrane_core/pull/506) - * The flow control of the pad is now set with a single `:flow_control` option instead of `:mode` and `:demand_mode` options. * Remove _t suffix from types [#509](https://github.com/membraneframework/membrane_core/pull/509) * Implement automatic demands in Membrane Sinks and Endpoints. [#512](https://github.com/membraneframework/membrane_core/pull/512) * Add `handle_child_pad_removed/4` callback in Bins and Pipelines. [#513](https://github.com/membraneframework/membrane_core/pull/513) * Introduce support for crash groups in Bins. [#521](https://github.com/membraneframework/membrane_core/pull/521) - * Remove `assert_pipeline_play/2` from `Membrane.Testing.Assertions`. [#528](https://github.com/membraneframework/membrane_core/pull/528) * Make sure enumerable with all elements being `Membrane.Buffer.t()`, passed as `:output` parameter for `Membrane.Testing.Source` won't get rewrapped in `Membrane.Buffer.t()` struct. * Implement `Membrane.Debug.Filter` and `Membrane.Debug.Sink`. [#552](https://github.com/membraneframework/membrane_core/pull/552) diff --git a/README.md b/README.md index 2a71cf546..4d2d7f8a0 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ This package provides core of the [Membrane Multimedia Framework](https://membra Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`. ```elixir -{:membrane_core, "~> 0.11.0"} +{:membrane_core, "~> 0.12.1"} ``` Or, if you'd like to try the latest release candidate, use this version: diff --git a/benchmark/run/branched_filter.ex b/benchmark/run/branched_filter.ex index d7669fe83..952f980b0 100644 --- a/benchmark/run/branched_filter.ex +++ b/benchmark/run/branched_filter.ex @@ -4,8 +4,8 @@ defmodule Benchmark.Run.BranchedFilter do alias Benchmark.Run.Reductions - def_input_pad :input, accepted_format: _any, availability: :on_request - def_output_pad :output, accepted_format: _any, availability: :on_request + def_input_pad :input, accepted_format: _any, availability: :on_request, flow_control: :auto + def_output_pad :output, accepted_format: _any, availability: :on_request, flow_control: :auto def_options number_of_reductions: [spec: integer()], generator: [spec: (integer() -> integer())], diff --git a/benchmark/run/linear_filter.ex b/benchmark/run/linear_filter.ex index 4d47f09be..f41e56700 100644 --- a/benchmark/run/linear_filter.ex +++ b/benchmark/run/linear_filter.ex @@ -4,8 +4,8 @@ defmodule Benchmark.Run.LinearFilter do alias Benchmark.Run.Reductions - def_input_pad :input, accepted_format: _any - def_output_pad :output, accepted_format: _any + def_input_pad :input, accepted_format: _any, flow_control: :auto + def_output_pad :output, accepted_format: _any, flow_control: :auto def_options number_of_reductions: [spec: integer()], generator: [spec: (integer() -> integer())] diff --git a/guides/upgrading/v0.11.md b/guides/upgrading/v0.11.md index ed6334187..0c40ee201 100644 --- a/guides/upgrading/v0.11.md +++ b/guides/upgrading/v0.11.md @@ -1,6 +1,6 @@ # Upgrading to v0.11 -Improvements in v0.11 required some breaking changes, so here comes the guide that will help you adjust your code to the new API. See the [changelog](https://github.com/membraneframework/membrane_core/releases/tag/v0.11.0) for details. +Improvements in v0.11 required some breaking changes, so here comes the guide that will help you adjust your code to the new API. See the [release notes](https://github.com/membraneframework/membrane_core/releases/tag/v0.11.0) for details. ## Deps upgrade diff --git a/guides/upgrading/v0.12.md b/guides/upgrading/v0.12.md new file mode 100644 index 000000000..d1bda611f --- /dev/null +++ b/guides/upgrading/v0.12.md @@ -0,0 +1,65 @@ +# Upgrading to v0.12 + +Between v0.11 and v0.12 some breaking changes have occurred, so here comes the guide that will help you adjust your code to the new API. See the [release notes](https://github.com/membraneframework/membrane_core/releases/tag/v0.12.1) for details. + +## Deps upgrade + +Upgrade `membrane_core` to `v0.12.1`. + +```elixir +defp deps do + [ + {:membrane_core, "~> 0.12.1"}, + ... + ] +end +``` + +## Implement `handle_child_pad_removed/4` callback in bins and pipelines, if it is needed + +Now, if bin removes its pad (e.g. by removing an element linked to the bin's inner pad), bin's parent has to have implemented proper `handle_child_pad_removed/4` callback, to handle it. If there is no such a callback, default behaviour is to raise an error. + +```elixir +@impl true +def handle_child_pad_removed(:rtp, Pad.ref(:rtp_input, _ssrc), _ctx, state) do + # ... +end +``` + +## Remove `:playback` action + +Now, membrane pipelines enter the playing playback by default and they don't have to return a `:playback` action to do it. + +```diff +- @impl true +- def handle_setup(_ctx, state) do +- {[playback: :playing], state} +- end +``` +Instead of it, there is a new action introduced in `membrane_core` v0.12, `setup: :incomplete | :complete`. If you want to defer a moment when a component enters the playing playback, you can return `{:setup, :incomplete}` action from `handle_setup` callback. If you do that, a component will enter the playing playback only when you return `{:setup, :complete}` action from another callback, e.g. `handle_info`. + +```diff +- @impl true +- def handle_setup(_ctx, state) do +- Process.send_after(self(), :play, 1000) +- {[], state} +- end +- +- @impl true +- def handle_info(:play, _ctx, state) do +- {[playback: :playing], state} +- end + ++ @impl true ++ def handle_setup(_ctx, state) do ++ Process.send_after(self(), :play, 1000) ++ {[setup: :incomplete], state} ++ end ++ ++ @impl true ++ def handle_info(:play, _ctx, state) do ++ {[setup: :complete], state} ++ end +``` + +`:setup` action is available not only in pipelines but in bins and elements as well. diff --git a/guides/upgrading/v1.0.0-rc0.md b/guides/upgrading/v1.0.0-rc0.md index c617e0f97..97b146c29 100644 --- a/guides/upgrading/v1.0.0-rc0.md +++ b/guides/upgrading/v1.0.0-rc0.md @@ -1,6 +1,6 @@ # Upgrading to v1.0.0-rc0 -Between v0.11 and v1.0.0-rc0 some breaking changes have occurred, so here comes the guide that will help you adjust your code to the new API. See the [changelog](https://github.com/membraneframework/membrane_core/releases/tag/v1.0.0-rc0) for detailed description of the changes. +Between v0.11 and v1.0.0-rc0 some breaking changes have occurred, so here comes the guide that will help you adjust your code to the new API. See the [release notes](https://github.com/membraneframework/membrane_core/releases/tag/v1.0.0-rc0) for detailed description of the changes. ### Deps upgrade diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index aaf2620f3..852ccba08 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -50,7 +50,7 @@ defmodule Membrane.Bin do Callback that is called when new pad has been added to bin. Executed ONLY for dynamic pads. - Context passed to this callback contains additional field `:pad_options`. + Context passed to this callback contains additional field `:options`. """ @callback handle_pad_added( pad :: Pad.ref(), @@ -62,7 +62,7 @@ defmodule Membrane.Bin do Callback that is called when some pad of the bin has been removed. Executed ONLY for dynamic pads. - Context passed to this callback contains additional field `:pad_options`. + Context passed to this callback contains additional field `:options`. """ @callback handle_pad_removed( pad :: Pad.ref(), @@ -93,7 +93,7 @@ defmodule Membrane.Bin do Callback invoked when a child removes its pad. The callback won't be invoked, when you have initiated the pad removal, - eg. when you have returned `t:Membrane.Bin.Action.remove_link()` action + e.g. when you have returned `t:Membrane.Bin.Action.remove_link()` action which made one of your children's pads be removed. """ @callback handle_child_pad_removed( diff --git a/lib/membrane/bin/action.ex b/lib/membrane/bin/action.ex index f813861d5..c3e848adf 100644 --- a/lib/membrane/bin/action.ex +++ b/lib/membrane/bin/action.ex @@ -53,6 +53,13 @@ defmodule Membrane.Bin.Action do | Membrane.Child.group() | [Membrane.Child.group()]} + @type remove_child :: + {:remove_child, + Child.name() + | [Child.name()] + | Membrane.Child.group() + | [Membrane.Child.group()]} + @typedoc """ Action that removes link, which relates to specified child and pad. @@ -134,6 +141,7 @@ defmodule Membrane.Bin.Action do | notify_parent | spec | remove_children + | remove_child | remove_link | start_timer | timer_interval diff --git a/lib/membrane/bin/callback_context.ex b/lib/membrane/bin/callback_context.ex index e5c61ecf0..c9d9e7fca 100644 --- a/lib/membrane/bin/callback_context.ex +++ b/lib/membrane/bin/callback_context.ex @@ -6,7 +6,7 @@ defmodule Membrane.Bin.CallbackContext do @typedoc """ Type describing context passed to the `Membrane.Bin` callbacks. - Field `:pad_options` is present only in `c:Membrane.Bin.handle_pad_added/3` + Field `:options` is present only in `c:Membrane.Bin.handle_pad_added/3` and `c:Membrane.Bin.handle_pad_removed/3`. Fields `:members` and `:crash_initiator` are present only in @@ -21,7 +21,7 @@ defmodule Membrane.Bin.CallbackContext do :playback => Membrane.Playback.t(), :resource_guard => Membrane.ResourceGuard.t(), :utility_supervisor => Membrane.UtilitySupervisor.t(), - optional(:pad_options) => map(), + optional(:options) => map(), optional(:members) => [Membrane.Child.name()], optional(:crash_initiator) => Membrane.Child.name() } diff --git a/lib/membrane/clock.ex b/lib/membrane/clock.ex index bc42e406f..c8c231032 100644 --- a/lib/membrane/clock.ex +++ b/lib/membrane/clock.ex @@ -50,7 +50,6 @@ defmodule Membrane.Clock do non_neg_integer | ratio | {numerator :: non_neg_integer, denominator :: pos_integer()}} - @typedoc """ Ratio message sent by the Clock to all its subscribers. It contains the ratio of the custom clock time to the reference time. @@ -119,7 +118,7 @@ defmodule Membrane.Clock do state = %{ - ratio: Ratio.new(1), + ratio: 1, subscribers: %{}, time_provider: options |> Keyword.get(:time_provider, fn -> Time.monotonic_time() end) } @@ -141,7 +140,7 @@ defmodule Membrane.Clock do subscribe(proxy_for) state else - broadcast_and_update_ratio(Ratio.new(1), state) + broadcast_and_update_ratio(1, state) end {:noreply, state} @@ -201,7 +200,7 @@ defmodule Membrane.Clock do defp get_proxy_options(true, _proxy_for), do: %{proxy: true, proxy_for: nil} defp get_proxy_options(_proxy, _proxy_for), - do: %{init_time: nil, clock_time: Ratio.new(0), till_next: nil, proxy: false} + do: %{init_time: nil, clock_time: 0, till_next: nil, proxy: false} defp handle_unsubscribe(pid, state) do Process.demonitor(state.subscribers[pid].monitor, [:flush]) @@ -213,13 +212,13 @@ defmodule Membrane.Clock do end defp handle_clock_update(till_next, state) do - till_next = Ratio.new(till_next) + use Ratio - if Ratio.lt?(till_next, 0) do + if till_next < 0 do raise "Clock update time cannot be negative, received: #{inspect(till_next)}" end - till_next = Ratio.mult(till_next, Ratio.new(Time.millisecond())) + till_next = till_next * Time.millisecond() case state.init_time do nil -> %{state | init_time: state.time_provider.(), till_next: till_next} @@ -228,9 +227,10 @@ defmodule Membrane.Clock do end defp do_handle_clock_update(till_next, state) do + use Ratio %{till_next: from_previous, clock_time: clock_time} = state - clock_time = Ratio.add(clock_time, from_previous) - ratio = Ratio.div(clock_time, Ratio.new(state.time_provider.() - state.init_time)) + clock_time = clock_time + from_previous + ratio = clock_time / (state.time_provider.() - state.init_time) state = %{state | clock_time: clock_time, till_next: till_next} broadcast_and_update_ratio(ratio, state) end diff --git a/lib/membrane/core/bin/action_handler.ex b/lib/membrane/core/bin/action_handler.ex index a4888c529..986fd0486 100644 --- a/lib/membrane/core/bin/action_handler.ex +++ b/lib/membrane/core/bin/action_handler.ex @@ -43,6 +43,11 @@ defmodule Membrane.Core.Bin.ActionHandler do Parent.ChildLifeController.handle_remove_children(children, state) end + @impl CallbackHandler + def handle_action({:remove_child, children}, _cb, _params, state) do + Parent.ChildLifeController.handle_remove_children(children, state) + end + @impl CallbackHandler def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state) diff --git a/lib/membrane/core/bin/callback_context.ex b/lib/membrane/core/bin/callback_context.ex index 0659d9c12..0a6b4e8c8 100644 --- a/lib/membrane/core/bin/callback_context.ex +++ b/lib/membrane/core/bin/callback_context.ex @@ -1,7 +1,7 @@ defmodule Membrane.Core.Bin.CallbackContext do @moduledoc false - @type optional_fields :: [pad_options: map()] + @type optional_fields :: [options: map()] @spec from_state(Membrane.Core.Bin.State.t(), optional_fields()) :: Membrane.Bin.CallbackContext.t() diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index 946c127a9..a72be100b 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -330,7 +330,7 @@ defmodule Membrane.Core.Bin.PadController do %{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref) if Pad.availability_mode(availability) == :dynamic do - context = &CallbackContext.from_state(&1, pad_options: pad_opts) + context = &CallbackContext.from_state(&1, options: pad_opts) CallbackHandler.exec_and_handle_callback( :handle_pad_added, diff --git a/lib/membrane/core/child/pads_specs.ex b/lib/membrane/core/child/pads_specs.ex index 02b7907c0..9d44f2a41 100644 --- a/lib/membrane/core/child/pads_specs.ex +++ b/lib/membrane/core/child/pads_specs.ex @@ -188,44 +188,82 @@ defmodule Membrane.Core.Child.PadsSpecs do |> Bunch.Config.parse( availability: [in: [:always, :on_request], default: :always], accepted_formats_str: [], - flow_control: fn _config -> - cond do - component == :bin -> - nil - - direction == :output and component != :filter -> - [in: [:manual, :push]] - - direction == :input or component == :filter -> - [in: [:auto, :manual, :push], default: :auto] - end - end, - demand_unit: - &cond do - component == :bin or &1[:flow_control] != :manual -> - nil - - direction == :input -> - [in: [:buffers, :bytes]] - - direction == :output -> - [in: [:buffers, :bytes, nil], default: nil] - - true -> - nil - end, - options: [default: nil] + options: [default: nil], + demand_unit: [in: [:buffers, :bytes, nil], default: nil], + flow_control: flow_control_parsing_options(config, direction, component), + mode: mode_parsing_options(config, component), + demand_mode: demand_mode_parsing_options(config, direction, component) ) do - config - |> Map.put(:direction, direction) - |> Map.put(:name, name) - ~> {:ok, {name, &1}} + case config do + _config when component == :bin -> + config + |> Map.drop([:demand_unit, :mode, :demand_mode, :flow_control]) + + %{mode: :push} -> + config + |> Map.drop([:mode, :demand_mode]) + |> Map.put(:flow_control, :push) + + %{mode: :pull, demand_mode: demand_mode} -> + config + |> Map.drop([:mode, :demand_mode]) + |> Map.put(:flow_control, demand_mode) + + %{flow_control: _flow_control} -> + config + end + |> Map.merge(%{direction: direction, name: name}) + |> then(&{:ok, {name, &1}}) else spec: spec -> {:error, {:invalid_pad_spec, spec}} config: {:error, reason} -> {:error, {reason, pad: name}} end end + defp mode_parsing_options(config, component) do + old? = old_api?(config) + + fn _config -> + if old? or component == :bin do + [in: [:pull, :push], default: :pull] + else + nil + end + end + end + + defp demand_mode_parsing_options(config, direction, component) do + old? = old_api?(config) + + fn _config -> + cond do + not old? -> nil + auto_allowed?(direction, component) -> [in: [:manual, :auto], default: :manual] + true -> [in: [:manual], default: :manual] + end + end + end + + defp flow_control_parsing_options(config, direction, component) do + old? = old_api?(config) + + fn _config -> + cond do + old? -> nil + auto_allowed?(direction, component) -> [in: [:auto, :manual, :push], default: :manual] + true -> [in: [:manual, :push], default: :manual] + end + end + end + + defp old_api?(config) do + Keyword.has_key?(config, :mode) or Keyword.has_key?(config, :demand_mode) + end + + defp auto_allowed?(direction, component) do + direction == :input or component in [:filter, :bin] + end + @doc """ Generates docs describing pads based on pads specification. """ diff --git a/lib/membrane/core/element/callback_context.ex b/lib/membrane/core/element/callback_context.ex index 4b1ba6af5..ef8dd83b7 100644 --- a/lib/membrane/core/element/callback_context.ex +++ b/lib/membrane/core/element/callback_context.ex @@ -3,7 +3,7 @@ defmodule Membrane.Core.Element.CallbackContext do @type optional_fields :: [incoming_demand: non_neg_integer()] - | [pad_options: map()] + | [options: map()] | [old_stream_format: Membrane.StreamFormat.t()] @spec from_state(Membrane.Core.Element.State.t(), optional_fields()) :: diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 173c33c38..eb7308b6f 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -446,7 +446,7 @@ defmodule Membrane.Core.Element.PadController do %{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref) if Pad.availability_mode(availability) == :dynamic do - context = &CallbackContext.from_state(&1, pad_options: pad_opts) + context = &CallbackContext.from_state(&1, options: pad_opts) CallbackHandler.exec_and_handle_callback( :handle_pad_added, diff --git a/lib/membrane/core/options_specs.ex b/lib/membrane/core/options_specs.ex index 1487d967f..3a3ee1e48 100644 --- a/lib/membrane/core/options_specs.ex +++ b/lib/membrane/core/options_specs.ex @@ -16,7 +16,7 @@ defmodule Membrane.Core.OptionsSpecs do * `default:` default value for option. If not present, value for this option will have to be provided each time options struct is created * `inspector:` function converting fields' value to a string. Used when - creating documentation instead of `inspect/1`, eg. `inspector: &Membrane.Time.inspect/1` + creating documentation instead of `inspect/1`, e.g. `inspector: &Membrane.Time.inspect/1` * `description:` string describing an option. It will be used for generating the docs """ end diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 2bed81d9d..148876aa5 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -57,6 +57,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do @children_spec_options_fields_specs [ group: [require?: false], crash_group_mode: [require?: false], + crash_group: [require?: false], stream_sync: [require?: false], clock_provider: [require?: false], node: [require?: false], @@ -208,6 +209,13 @@ defmodule Membrane.Core.Parent.ChildLifeController do {:ok, options} = Bunch.Config.parse(options_keywords_list, @children_spec_options_fields_specs) + options = + with %{crash_group: {group_name, :temporary}} <- options do + options + |> Map.delete(:crash_group) + |> Map.merge(%{group: group_name, crash_group_mode: :temporary}) + end + options = Map.merge(defaults, options) options_to_pass_to_nested = diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index ea1196486..712a5ff0f 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -9,7 +9,8 @@ defmodule Membrane.Core.Pipeline.ActionHandler do alias Membrane.Core.Pipeline.State @impl CallbackHandler - def handle_action({action, _args}, :handle_init, _params, _state) when action != :spec do + def handle_action({action, _args}, :handle_init, _params, _state) + when action not in [:spec, :playback] do raise ActionError, action: action, reason: {:invalid_callback, :handle_init} end @@ -30,6 +31,11 @@ defmodule Membrane.Core.Pipeline.ActionHandler do Core.LifecycleController.handle_setup_operation(operation, state) end + @impl true + def handle_action({:playback, _playback}, _cb, _params, state) do + state + end + @impl CallbackHandler def handle_action({:notify_child, notification}, _cb, _params, state) do Parent.ChildLifeController.handle_notify_child(notification, state) @@ -46,6 +52,11 @@ defmodule Membrane.Core.Pipeline.ActionHandler do Parent.ChildLifeController.handle_remove_children(children, state) end + @impl CallbackHandler + def handle_action({:remove_child, children}, _cb, _params, state) do + Parent.ChildLifeController.handle_remove_children(children, state) + end + @impl CallbackHandler def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state) diff --git a/lib/membrane/core/timer.ex b/lib/membrane/core/timer.ex index 5a23c957c..a304d7375 100644 --- a/lib/membrane/core/timer.ex +++ b/lib/membrane/core/timer.ex @@ -19,7 +19,12 @@ defmodule Membrane.Core.Timer do } @enforce_keys [:interval, :clock, :init_time, :id] - defstruct @enforce_keys ++ [next_tick_time: 0, ratio: Ratio.new(1), timer_ref: nil] + defstruct @enforce_keys ++ + [ + next_tick_time: 0, + ratio: %Ratio{denominator: 1, numerator: 1}, + timer_ref: nil + ] @spec start(id, interval, Clock.t()) :: t def start(id, interval, clock) do @@ -48,6 +53,8 @@ defmodule Membrane.Core.Timer do end def tick(timer) do + use Ratio + %__MODULE__{ id: id, interval: interval, @@ -56,7 +63,7 @@ defmodule Membrane.Core.Timer do ratio: ratio } = timer - next_tick_time = Ratio.add(Ratio.new(next_tick_time), Ratio.new(interval)) + next_tick_time = next_tick_time + interval # Next tick time converted to BEAM clock time beam_next_tick_time = @@ -67,7 +74,7 @@ defmodule Membrane.Core.Timer do timer_ref = Process.send_after(self(), Message.new(:timer_tick, id), beam_next_tick_time, abs: true) - %__MODULE__{timer | next_tick_time: next_tick_time |> Ratio.floor(), timer_ref: timer_ref} + %__MODULE__{timer | next_tick_time: next_tick_time, timer_ref: timer_ref} end @spec set_interval(t, interval) :: t diff --git a/lib/membrane/element/base.ex b/lib/membrane/element/base.ex index ef3e16d48..d0c5205e8 100644 --- a/lib/membrane/element/base.ex +++ b/lib/membrane/element/base.ex @@ -86,7 +86,7 @@ defmodule Membrane.Element.Base do Callback that is called when new pad has beed added to element. Executed ONLY for dynamic pads. - Context passed to this callback contains additional field `:pad_options`. + Context passed to this callback contains additional field `:options`. """ @callback handle_pad_added( pad :: Pad.ref(), @@ -98,7 +98,7 @@ defmodule Membrane.Element.Base do Callback that is called when some pad of the element has beed removed. Executed ONLY for dynamic pads. - Context passed to this callback contains additional field `:pad_options`. + Context passed to this callback contains additional field `:options`. """ @callback handle_pad_removed( pad :: Pad.ref(), diff --git a/lib/membrane/element/callback_context.ex b/lib/membrane/element/callback_context.ex index d1b4cd7e1..f93404063 100644 --- a/lib/membrane/element/callback_context.ex +++ b/lib/membrane/element/callback_context.ex @@ -9,7 +9,7 @@ defmodule Membrane.Element.CallbackContext do Field `:incoming_demand` is present only in `c:Membrane.Element.WithOutputPads.handle_demand/5`. - Field `:pad_options` is present only in `c:Membrane.Element.Base.handle_pad_added/3` + Field `:options` is present only in `c:Membrane.Element.Base.handle_pad_added/3` and `c:Membrane.Element.Base.handle_pad_removed/3`. Field `:old_stream_format` is present only in @@ -24,7 +24,7 @@ defmodule Membrane.Element.CallbackContext do :resource_guard => Membrane.ResourceGuard.t(), :utility_supervisor => Membrane.UtilitySupervisor.t(), optional(:incoming_demand) => non_neg_integer(), - optional(:pad_options) => map(), + optional(:options) => map(), optional(:old_stream_format) => Membrane.StreamFormat.t() } end diff --git a/lib/membrane/element/with_input_pads.ex b/lib/membrane/element/with_input_pads.ex index 257bf11f9..65f0302fb 100644 --- a/lib/membrane/element/with_input_pads.ex +++ b/lib/membrane/element/with_input_pads.ex @@ -76,7 +76,39 @@ defmodule Membrane.Element.WithInputPads do state :: Element.state() ) :: Membrane.Element.Base.callback_return() - @optional_callbacks handle_buffer: 4, handle_stream_format: 4 + @callback handle_write( + pad :: Pad.ref(), + buffer :: Buffer.t(), + context :: CallbackContext.t(), + state :: Element.state() + ) :: Membrane.Element.Base.callback_return() + + @callback handle_write_list( + pad :: Pad.ref(), + buffers :: list(Buffer.t()), + context :: CallbackContext.t(), + state :: Element.state() + ) :: Membrane.Element.Base.callback_return() + + @callback handle_process( + pad :: Pad.ref(), + buffer :: Buffer.t(), + context :: CallbackContext.t(), + state :: Element.state() + ) :: Membrane.Element.Base.callback_return() + + @callback handle_process_list( + pad :: Pad.ref(), + buffers :: list(Buffer.t()), + context :: CallbackContext.t(), + state :: Element.state() + ) :: Membrane.Element.Base.callback_return() + + @optional_callbacks handle_stream_format: 4, + handle_write: 4, + handle_write_list: 4, + handle_process: 4, + handle_process_list: 4 @doc PadsSpecs.def_pad_docs(:input, :element) defmacro def_input_pad(name, spec) do diff --git a/lib/membrane/endpoint.ex b/lib/membrane/endpoint.ex index df060d153..3ef1fcbd8 100644 --- a/lib/membrane/endpoint.ex +++ b/lib/membrane/endpoint.ex @@ -37,6 +37,26 @@ defmodule Membrane.Endpoint do @doc false @spec membrane_element_type() :: Membrane.Element.type() def membrane_element_type, do: :endpoint + + @impl true + def handle_buffer(pad, buffer, ctx, state) do + apply(__MODULE__, :handle_write, [pad, buffer, ctx, state]) + end + + @impl true + def handle_buffers_batch(pad, buffers, ctx, state) do + apply(__MODULE__, :handle_write_list, [pad, buffers, ctx, state]) + end + + @impl true + def handle_write_list(pad, buffers, ctx, state) do + args_list = buffers |> Enum.map(&[pad, &1]) + {[split: {:handle_buffer, args_list}], state} + end + + defoverridable handle_buffer: 4, + handle_buffers_batch: 4, + handle_write_list: 4 end end diff --git a/lib/membrane/filter.ex b/lib/membrane/filter.ex index 45de242c4..2ebc69ad7 100644 --- a/lib/membrane/filter.ex +++ b/lib/membrane/filter.ex @@ -39,19 +39,42 @@ defmodule Membrane.Filter do def membrane_element_type, do: :filter @impl true - def handle_stream_format(_pad, stream_format, _context, state), - do: {[forward: stream_format], state} + def handle_stream_format(_pad, stream_format, _context, state) do + {[forward: stream_format], state} + end @impl true - def handle_event(_pad, event, _context, state), do: {[forward: event], state} + def handle_event(_pad, event, _context, state) do + {[forward: event], state} + end @impl true - def handle_end_of_stream(pad, _context, state), - do: {[forward: :end_of_stream], state} + def handle_end_of_stream(pad, _context, state) do + {[forward: :end_of_stream], state} + end + + @impl true + def handle_buffer(pad, buffer, ctx, state) do + apply(__MODULE__, :handle_process, [pad, buffer, ctx, state]) + end + + @impl true + def handle_buffers_batch(pad, buffers, ctx, state) do + apply(__MODULE__, :handle_process_list, [pad, buffers, ctx, state]) + end + + @impl true + def handle_process_list(pad, buffers, ctx, state) do + args_list = buffers |> Enum.map(&[pad, &1]) + {[split: {:handle_buffer, args_list}], state} + end defoverridable handle_stream_format: 4, handle_event: 4, - handle_end_of_stream: 3 + handle_end_of_stream: 3, + handle_buffer: 4, + handle_buffers_batch: 4, + handle_process_list: 4 end end diff --git a/lib/membrane/pad.ex b/lib/membrane/pad.ex index 3e054239f..7a2a7b1e2 100644 --- a/lib/membrane/pad.ex +++ b/lib/membrane/pad.ex @@ -94,7 +94,7 @@ defmodule Membrane.Pad do Can be a module name, pattern describing struct, or call to `any_of` function, which arguments are such patterns or modules names. If a module name is passed to the `:accepted_format` option or is passed to `any_of`, - it will be converted to the match on a struct defined in that module, eg. + it will be converted to the match on a struct defined in that module, e.g. `accepted_format: My.Format` will have this same effect, as `accepted_format: %My.Format{}` and `accepted_format: any_of(My.Format, %My.Another.Format{field: value} when value in [:some, :enumeration])` will have this same effect, as `accepted_format: any_of(%My.Format{}, diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index af5d5d950..042ab1949 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -139,7 +139,7 @@ defmodule Membrane.Pipeline do Callback invoked when a child removes its pad. The callback won't be invoked, when you have initiated the pad removal, - eg. when you have returned `t:Membrane.Pipeline.Action.remove_link()` + e.g. when you have returned `t:Membrane.Pipeline.Action.remove_link()` action which made one of your children's pads be removed. """ @callback handle_child_pad_removed( @@ -332,6 +332,14 @@ defmodule Membrane.Pipeline do @spec terminate(pipeline :: pid, timeout: timeout(), force?: boolean(), asynchronous?: boolean()) :: :ok | {:ok, pid()} | {:error, :timeout} def terminate(pipeline, opts \\ []) do + opts = + if Keyword.has_key?(opts, :blocking?) do + {blocking?, opts} = Keyword.pop!(opts, :blocking?) + Keyword.put(opts, :asynchronous?, not blocking?) + else + opts + end + [asynchronous?: asynchronous?] ++ opts = Keyword.validate!(opts, asynchronous?: false, diff --git a/lib/membrane/pipeline/action.ex b/lib/membrane/pipeline/action.ex index f41809f6e..f551b5d78 100644 --- a/lib/membrane/pipeline/action.ex +++ b/lib/membrane/pipeline/action.ex @@ -21,6 +21,8 @@ defmodule Membrane.Pipeline.Action do """ @type setup :: {:setup, :incomplete | :complete} + @type playback :: {:playback, :playing, :stopped, :prepared} + @typedoc """ Action that sends a message to a child identified by name. """ @@ -41,7 +43,18 @@ defmodule Membrane.Pipeline.Action do as an argument. """ @type remove_children :: - {:remove_children, Child.name() | [Child.name()]} + {:remove_children, + Child.name() + | [Child.name()] + | Child.group() + | [Child.group()]} + + @type remove_child :: + {:remove_child, + Child.name() + | [Child.name()] + | Child.group() + | [Child.group()]} @typedoc """ Action that removes link, which relates to specified child and pad. @@ -132,9 +145,11 @@ defmodule Membrane.Pipeline.Action do """ @type t :: setup + | playback | notify_child | spec | remove_children + | remove_child | remove_link | start_timer | timer_interval diff --git a/lib/membrane/sink.ex b/lib/membrane/sink.ex index 6d1c79cd1..cdbca6f52 100644 --- a/lib/membrane/sink.ex +++ b/lib/membrane/sink.ex @@ -32,6 +32,26 @@ defmodule Membrane.Sink do @doc false @spec membrane_element_type() :: Membrane.Element.type() def membrane_element_type, do: :sink + + @impl true + def handle_buffer(pad, buffer, ctx, state) do + apply(__MODULE__, :handle_write, [pad, buffer, ctx, state]) + end + + @impl true + def handle_buffers_batch(pad, buffers, ctx, state) do + apply(__MODULE__, :handle_write_list, [pad, buffers, ctx, state]) + end + + @impl true + def handle_write_list(pad, buffers, ctx, state) do + args_list = buffers |> Enum.map(&[pad, &1]) + {[split: {:handle_buffer, args_list}], state} + end + + defoverridable handle_buffer: 4, + handle_buffers_batch: 4, + handle_write_list: 4 end end diff --git a/lib/membrane/testing/assertions.ex b/lib/membrane/testing/assertions.ex index 30f5f14f3..28f4f111b 100644 --- a/lib/membrane/testing/assertions.ex +++ b/lib/membrane/testing/assertions.ex @@ -171,6 +171,10 @@ defmodule Membrane.Testing.Assertions do assert_receive_from_pipeline(pipeline, :setup, timeout) end + defmacro assert_pipeline_play(pipeline, timeout \\ @default_timeout) do + assert_receive_from_pipeline(pipeline, :play, timeout) + end + @doc """ Asserts that pipeline received or will receive a message matching `message_pattern` from another process within the `timeout` period specified diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index 11cc337fb..d1f029916 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -145,6 +145,14 @@ defmodule Membrane.Testing.Pipeline do end defp do_start(type, options) do + options = + if Keyword.has_key?(options, :structure) do + {spec, options} = Keyword.pop(options, :structure) + [spec: spec] ++ options + else + options + end + {process_options, options} = Keyword.split(options, [:name]) options = Keyword.put_new(options, :test_process, self()) apply(Pipeline, type, [__MODULE__, options, process_options]) diff --git a/lib/membrane/time.ex b/lib/membrane/time.ex index 798247939..455024c5a 100644 --- a/lib/membrane/time.ex +++ b/lib/membrane/time.ex @@ -11,8 +11,6 @@ defmodule Membrane.Time do that do not touch hardware clock, you should use Membrane units for consistency. """ - require Ratio - @compile {:inline, native_units: 1, native_unit: 0, nanoseconds: 1, nanosecond: 0, second: 0, seconds: 1} @@ -35,7 +33,7 @@ defmodule Membrane.Time do # Difference between 01.01.1900 (start of NTP epoch) and 01.01.1970 (start of Unix epoch) in seconds @ntp_unix_epoch_diff 2_208_988_800 - @two_to_pow_32 Ratio.pow(Ratio.new(2, 1), 32) |> Ratio.trunc() + @two_to_pow_32 Ratio.pow(2, 32) @doc """ Checks whether a value is `Membrane.Time.t`. @@ -245,6 +243,21 @@ defmodule Membrane.Time do Ratio.new(timestamp, timebase) |> round_rational() end + @doc """ + Divides timestamp by a timebase. The result is rounded to the nearest integer. + Works this same as `divide_by_timebase/2`. + + ## Examples: + iex> timestamp = 10 |> Membrane.Time.seconds() + iex> timebase = Ratio.new(Membrane.Time.second(), 30) + iex> Membrane.Time.round_to_timebase(timestamp, timebase) + 300 + """ + @spec round_to_timebase(number | Ratio.t(), number | Ratio.t()) :: integer + def round_to_timebase(timestamp, timebase) do + divide_by_timebase(timestamp, timebase) + end + Enum.map(@units, fn unit -> @doc """ Returns one #{unit.singular} in `#{inspect(__MODULE__)}` units. @@ -265,8 +278,12 @@ defmodule Membrane.Time do end # credo:disable-for-next-line Credo.Check.Readability.Specs - def unquote(unit.plural)(number) when Ratio.is_rational(number) do - Ratio.mult(number, Ratio.new(unquote(unit.duration))) + def unquote(unit.plural)(number) do + if not Ratio.is_rational?(number) do + raise "Only integers and rationals can be converted with Membrane.Time.#{unquote(unit.plural)}" + end + + Ratio.*(number, unquote(unit.duration)) |> round_rational() end @@ -287,6 +304,17 @@ defmodule Membrane.Time do :round -> Ratio.new(time, unquote(unit.duration)) |> round_rational() end end + + round_fun_name = :"round_to_#{unit.plural}" + + @doc """ + Works as #{as_fun_name}/2 with `mode` argument set to `:round`. + """ + @spec unquote(round_fun_name)(t) :: integer + # credo:disable-for-next-line Credo.Check.Readability.Specs + def unquote(round_fun_name)(time) when is_time(time) do + unquote(as_fun_name)(time, :round) + end end) defp best_unit(time) do @@ -295,6 +323,7 @@ defmodule Membrane.Time do end defp round_rational(ratio) do + ratio = make_rational(ratio) trunced = Ratio.trunc(ratio) if 2 * sign_of_rational(ratio) * @@ -304,6 +333,14 @@ defmodule Membrane.Time do else: trunced end + defp make_rational(number) do + if Ratio.is_rational?(number) do + number + else + %Ratio{numerator: number, denominator: 1} + end + end + defp sign_of_rational(ratio) do if ratio.numerator == 0, do: 0, else: Ratio.sign(ratio) end diff --git a/mix.exs b/mix.exs index 70c5ff039..f50fe2b14 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.Mixfile do use Mix.Project - @version "1.0.0-rc0" + @version "0.12.1" @source_ref "v#{@version}" def project do @@ -63,6 +63,7 @@ defmodule Membrane.Mixfile do "README.md", "CHANGELOG.md", "CONTRIBUTING.md", + "guides/upgrading/v0.12.md", "guides/upgrading/v0.11.md", "guides/upgrading/v1.0.0-rc0.md", LICENSE: [title: "License"] @@ -141,7 +142,7 @@ defmodule Membrane.Mixfile do {:qex, "~> 0.3"}, {:telemetry, "~> 1.0"}, {:bunch, "~> 1.6"}, - {:ratio, "~> 3.0"}, + {:ratio, "~> 2.0"}, # Development {:ex_doc, "~> 0.28", only: :dev, runtime: false}, {:makeup_diff, "~> 0.1", only: :dev, runtime: false}, diff --git a/mix.lock b/mix.lock index f37530893..fa42bf80d 100644 --- a/mix.lock +++ b/mix.lock @@ -7,7 +7,7 @@ "dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"}, "earmark_parser": {:hex, :earmark_parser, "1.4.29", "149d50dcb3a93d9f3d6f3ecf18c918fb5a2d3c001b5d3305c926cddfbd33355b", [:mix], [], "hexpm", "4902af1b3eb139016aed210888748db8070b8125c2342ce3dcae4f38dcc63503"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.29.1", "b1c652fa5f92ee9cf15c75271168027f92039b3877094290a75abcaac82a9f77", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "b7745fa6374a36daf484e2a2012274950e084815b936b1319aeebcf7809574f6"}, + "ex_doc": {:hex, :ex_doc, "0.29.0", "4a1cb903ce746aceef9c1f9ae8a6c12b742a5461e6959b9d3b24d813ffbea146", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "f096adb8bbca677d35d278223361c7792d496b3fc0d0224c9d4bc2f651af5db1"}, "excoveralls": {:hex, :excoveralls, "0.15.0", "ac941bf85f9f201a9626cc42b2232b251ad8738da993cf406a4290cacf562ea4", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "9631912006b27eca30a2f3c93562bc7ae15980afb014ceb8147dc5cdd8f376f1"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"}, @@ -25,7 +25,7 @@ "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, - "ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"}, + "ratio": {:hex, :ratio, "2.4.2", "c8518f3536d49b1b00d88dd20d49f8b11abb7819638093314a6348139f14f9f9", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "441ef6f73172a3503de65ccf1769030997b0d533b1039422f1e5e0e0b4cbf89e"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, "telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, diff --git a/test/membrane/api_backcompability_test.exs b/test/membrane/api_backcompability_test.exs new file mode 100644 index 000000000..ae145f558 --- /dev/null +++ b/test/membrane/api_backcompability_test.exs @@ -0,0 +1,52 @@ +defmodule Membrane.APIBackCompabilityTest do + # this module tests if API in membrane_core v0.12 has no breaking changes comparing to api in v0.11 + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + + alias Membrane.Testing + + test "if action :remove_child works" do + defmodule Filter do + use Membrane.Filter + end + + pipeline = Testing.Pipeline.start_link_supervised!(spec: child(:filter, Filter)) + Process.sleep(100) + filter_pid = Testing.Pipeline.get_child_pid!(pipeline, :filter) + monitor_ref = Process.monitor(filter_pid) + Testing.Pipeline.execute_actions(pipeline, remove_child: :filter) + + assert_receive {:DOWN, ^monitor_ref, _process, _pid, :normal} + + Testing.Pipeline.terminate(pipeline) + end + + test "if `Membrane.Time.round_to_*` functions work" do + module = Membrane.Time + + for {old_function, new_function} <- [ + round_to_days: :as_days, + round_to_hours: :as_hours, + round_to_minutes: :as_minutes, + round_to_seconds: :as_seconds, + round_to_milliseconds: :as_milliseconds, + round_to_microseconds: :as_microseconds, + round_to_nanoseconds: :as_nanoseconds + ], + timestamp_generator <- [:days, :microseconds] do + timestamp = apply(module, timestamp_generator, [3]) + + old_function_result = apply(module, old_function, [timestamp]) + new_function_result = apply(module, new_function, [timestamp, :round]) + + assert old_function_result == new_function_result + end + + timestamp = Membrane.Time.days(15) + Membrane.Time.nanoseconds(13) + timebase = Membrane.Time.milliseconds(2) + + assert Membrane.Time.round_to_timebase(timestamp, timebase) == + Membrane.Time.divide_by_timebase(timestamp, timebase) + end +end diff --git a/test/membrane/clock_test.exs b/test/membrane/clock_test.exs index 2ec21c895..0557df515 100644 --- a/test/membrane/clock_test.exs +++ b/test/membrane/clock_test.exs @@ -3,7 +3,7 @@ defmodule Membrane.ClockTest do @module Membrane.Clock - @initial_ratio Ratio.new(1) + @initial_ratio 1 test "should calculate proper ratio and send it to subscribers on each (but the first) update" do {:ok, clock} = @@ -16,8 +16,7 @@ defmodule Membrane.ClockTest do refute_receive {:membrane_clock_ratio, ^clock, _ratio} send(clock, {:membrane_clock_update, 2}) send(clock, time: 13) - two = Ratio.new(2) - assert_receive {:membrane_clock_ratio, ^clock, ^two} + assert_receive {:membrane_clock_ratio, ^clock, 2} send(clock, {:membrane_clock_update, random_time()}) send(clock, time: 33) ratio = Ratio.new(20 + 2, 33 - 3) @@ -25,6 +24,8 @@ defmodule Membrane.ClockTest do end test "should handle different ratio formats" do + use Ratio + {:ok, clock} = @module.start_link(time_provider: fn -> receive do: (time: t -> ms_to_ns(t)) end) @@ -37,9 +38,7 @@ defmodule Membrane.ClockTest do send(clock, {:membrane_clock_update, 5}) send(clock, time: 20) @module.subscribe(clock) - two = Ratio.new(2) - five = Ratio.new(5) - ratio = Ratio.div(Ratio.add(five, Ratio.mult(Ratio.new(1, 3), two)), Ratio.new(20 - 5)) + ratio = (5 + Ratio.new(1, 3) * 2) / (20 - 5) assert_receive {:membrane_clock_ratio, ^clock, ^ratio} end diff --git a/test/membrane/core/element_test.exs b/test/membrane/core/element_test.exs index bc9acd169..2860ae615 100644 --- a/test/membrane/core/element_test.exs +++ b/test/membrane/core/element_test.exs @@ -340,10 +340,9 @@ defmodule Membrane.Core.ElementTest do {:ok, clock} = Membrane.Clock.start_link() state = Membrane.Core.TimerController.start_timer(:timer, 1000, clock, get_state()) - assert {:noreply, state} = - Element.handle_info({:membrane_clock_ratio, clock, Ratio.new(123)}, state) + assert {:noreply, state} = Element.handle_info({:membrane_clock_ratio, clock, 123}, state) - assert state.synchronization.timers.timer.ratio == Ratio.new(123) + assert state.synchronization.timers.timer.ratio == 123 end test "should set stream sync" do diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index a46bbf6f9..b2ca7fb87 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -9,8 +9,8 @@ defmodule Membrane.Integration.AutoDemandsTest do defmodule AutoDemandFilter do use Membrane.Filter - def_input_pad :input, accepted_format: _any - def_output_pad :output, accepted_format: _any + def_input_pad :input, accepted_format: _any, flow_control: :auto + def_output_pad :output, accepted_format: _any, flow_control: :auto def_options factor: [default: 1], direction: [default: :up] @@ -38,8 +38,8 @@ defmodule Membrane.Integration.AutoDemandsTest do defmodule AutoDemandTee do use Membrane.Filter - def_input_pad :input, accepted_format: _any - def_output_pad :output, accepted_format: _any, availability: :on_request + def_input_pad :input, accepted_format: _any, flow_control: :auto + def_output_pad :output, accepted_format: _any, availability: :on_request, flow_control: :auto @impl true def handle_buffer(:input, buffer, _ctx, state), do: {[forward: buffer], state} @@ -156,7 +156,7 @@ defmodule Membrane.Integration.AutoDemandsTest do |> Enum.map(fn opts -> test "buffers pass to auto-demand #{opts.name}" do %{name: name, module: module} = unquote(Macro.escape(opts)) - payloads = Enum.map(1..1000, &inspect/1) + payloads = Enum.map(1..100_000, &inspect/1) pipeline = Pipeline.start_link_supervised!( diff --git a/test/membrane/integration/effective_flow_control_resolution_test.exs b/test/membrane/integration/effective_flow_control_resolution_test.exs index 5c303a949..b4fbac78c 100644 --- a/test/membrane/integration/effective_flow_control_resolution_test.exs +++ b/test/membrane/integration/effective_flow_control_resolution_test.exs @@ -9,10 +9,10 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do defmodule AutoFilter do use Membrane.Filter - def_input_pad :input, availability: :on_request, accepted_format: _any - def_output_pad :output, availability: :on_request, accepted_format: _any + def_input_pad :input, availability: :on_request, accepted_format: _any, flow_control: :auto + def_output_pad :output, availability: :on_request, accepted_format: _any, flow_control: :auto - def_options lazy?: [spec: boolean(), default: false] + def_options sleep_on_handle_buffer?: [spec: boolean(), default: false] @impl true def handle_playing(_ctx, state) do @@ -21,7 +21,7 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do @impl true def handle_buffer(_pad, buffer, _ctx, state) do - if state.lazy?, do: Process.sleep(100) + if state.sleep_on_handle_buffer?, do: Process.sleep(100) {[forward: buffer], state} end end @@ -192,7 +192,7 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do test "Toilet overflows, when it should" do spec = { child(:pull_source, PullSource) - |> child(:filter, %AutoFilter{lazy?: true}), + |> child(:filter, %AutoFilter{sleep_on_handle_buffer?: true}), group: :group, crash_group_mode: :temporary } diff --git a/test/membrane/integration/linking_test.exs b/test/membrane/integration/linking_test.exs index a20621121..0f122770a 100644 --- a/test/membrane/integration/linking_test.exs +++ b/test/membrane/integration/linking_test.exs @@ -518,8 +518,7 @@ defmodule Membrane.Integration.LinkingTest do defmodule Sink do use Membrane.Sink - def_input_pad :input, - accepted_format: _any + def_input_pad :input, accepted_format: _any, flow_control: :auto @impl true def handle_init(_ctx, _opts) do diff --git a/test/membrane/integration/links_validation_test.exs b/test/membrane/integration/links_validation_test.exs index 84050b118..d678373d8 100644 --- a/test/membrane/integration/links_validation_test.exs +++ b/test/membrane/integration/links_validation_test.exs @@ -96,27 +96,29 @@ defmodule Membrane.LinksValidationTest do describe "returning a spec with links to already used" do test "static pads" do - spec = child(:source, StaticPads.Source) |> child(:sink, StaticPads.Sink) - - pipeline = Pipeline.start_supervised!(spec: spec) + pipeline = Pipeline.start_supervised!() ref = Process.monitor(pipeline) - spec = get_child(:source) |> get_child(:sink) + spec = child(:source, StaticPads.Source) |> child(:sink, StaticPads.Sink) + Pipeline.execute_actions(pipeline, spec: spec) + spec = get_child(:source) |> get_child(:sink) Pipeline.execute_actions(pipeline, spec: spec) assert_receive({:DOWN, ^ref, :process, ^pipeline, {%Membrane.LinkError{}, _stacktrace}}) end test "dynamic pads" do + pipeline = Pipeline.start_supervised!() + ref = Process.monitor(pipeline) + spec = child(:source, DynamicPads.Source) |> via_out(Pad.ref(:output, 1)) |> via_in(Pad.ref(:input, 1)) |> child(:sink, DynamicPads.Sink) - pipeline = Pipeline.start_supervised!(spec: spec) - ref = Process.monitor(pipeline) + Pipeline.execute_actions(pipeline, spec: spec) spec = get_child(:source)