diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index f78be3d60..e3da30050 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -1,29 +1,25 @@ defmodule Membrane.Pipeline do @moduledoc """ - Module containing functions for constructing and supervising pipelines. + A behaviour module for implementing pipelines. - Pipelines are units that make it possible to instantiate, link and manage - elements and bins in convenient way (actually they should always be used inside - a pipeline). Linking pipeline children together enables them to pass data to one - another, and process it in different ways. - - To create a pipeline, use the `__using__/1` macro and implement callbacks - of `Membrane.Pipeline` behaviour. For details on instantiating and linking - children, see `Membrane.ChildrenSpec`. + `Membrane.Pipeline` contains the callbacks and functions for constructing and supervising pipelines. + Pipelines facilitate the convenient instantiation, linking, and management of elements and bins.\\ + Linking pipeline children together enables them to pass and process data. + To create a pipeline, use `use Membrane.Pipeline` and implement callbacks of `Membrane.Pipeline` behaviour. + See `Membrane.ChildrenSpec` for details on instantiating and linking children. ## Starting and supervision - Pipeline can be started with `start_link/2` or `start/2` functions. They both - return `{:ok, supervisor_pid, pipeline_pid}` in case of success, because the pipeline - is always spawned under a dedicated supervisor. The supervisor never restarts the - pipeline, but it makes sure that the pipeline and its children terminate properly. - If the pipeline needs to be restarted in case of failure, it should be spawned under - another supervisor with a proper strategy. + Start a pipeline with `start_link/2` or `start/2`. Pipelines always spawn under a dedicated supervisor, so + in the case of success, either function will return `{:ok, supervisor_pid, pipeline_pid}` . + + The supervisor never restarts the pipeline, but it does ensure that the pipeline and its children terminate properly. + If the pipeline needs to be restarted, it should be spawned under a different supervisor with the appropriate strategy. ### Starting under a supervision tree - The pipeline can be spawned under a supervision tree like any `GenServer`. Also, - `__using__/1` macro injects a `child_spec/1` function. A simple scenario can look like: + A pipeline can be spawned under a supervision tree like any other `GenServer`.\\ + `use Membrane.Pipeline` injects a `child_spec/1` function. A simple scenario could look like this: defmodule MyPipeline do use Membrane.Pipeline @@ -40,22 +36,22 @@ defmodule Membrane.Pipeline do ### Starting outside of a supervision tree - When starting a pipeline outside a supervision tree and willing to interact with - the pipeline by pid, `pipeline_pid` returned from `start_link` can be used, for example + When starting a pipeline outside a supervision tree, use the `pipeline_pid` pid to interact with the pipeline. + A simple scenario could look like this: {:ok, _supervisor_pid, pipeline_pid} = Membrane.Pipeline.start_link(MyPipeline, option: :value) send(pipeline_pid, :message) - ### Visualizing pipeline's supervision tree + ### Visualizing the supervision tree - Pipeline's internal supervision tree can be looked up with Applications tab of Erlang's Stalker - or with Livebook's `Kino` library. - For debugging (and ONLY for debugging) purposes, you may use the following configuration: + Use the [Applications tab](https://www.erlang.org/doc/apps/observer/observer_ug#applications-tab) in Erlang's Observer GUI + (or the `Kino` library in Livebook) to visualize a pipeline's internal supervision tree. Use the following configuration for debugging purposes only: config :membrane_core, unsafely_name_processes_for_stalker: [:components] - that makes the stalker's process tree graph more readable by naming pipeline's descendants, for example: - ![Stalker graph](assets/images/stalker_graph.png). + This improves the readability of the Observer's process tree graph by naming the pipeline descendants, as demonstrated here: + + ![Observer graph](assets/images/observer_graph.png). """ use Bunch @@ -67,20 +63,30 @@ defmodule Membrane.Pipeline do require Membrane.Core.Message, as: Message @typedoc """ - Defines options that can be passed to `start/3` / `start_link/3` and received - in `c:handle_init/2` callback. + Defines options passed to the `start/3` and `start_link/3` and subsequently received + in the `c:handle_init/2` callback. """ @type pipeline_options :: any + @typedoc "The Pipeline name" @type name :: GenServer.name() + @typedoc "List of configurations used by `start/3` and `start_link/3`." @type config :: [config_entry()] + + @typedoc "Defines configuration value used by the `start/3` and `start_link/3`." @type config_entry :: {:name, name()} + @typedoc """ + Defines the return value of the `start/3` and `start_link/3`." + """ @type on_start :: {:ok, supervisor_pid :: pid, pipeline_pid :: pid} | {:error, {:already_started, pid()} | term()} + @typedoc """ + The pipeline state. + """ @type state :: any() @typedoc """ @@ -88,29 +94,27 @@ defmodule Membrane.Pipeline do ## Return values - * `{[action], state}` - Return a list of actions that will be performed within the - pipeline. This can be used to start new children, or to send messages to specific children, - for example. Actions are a tuple of `{type, arguments}`, so may be written in the - form a keyword list. See `Membrane.Pipeline.Action` for more info. + * `{[action], state}` - Returns a list of actions that will be performed within the + pipeline, e.g., starting new children, sending messages to specific children, etc. + Actions are tuples of `{type, arguments}`, so they can be expressed as a keyword list. + See `Membrane.Pipeline.Action` for more info. """ @type callback_return :: {[Action.t()], state} @doc """ - Callback invoked on initialization of pipeline. + Callback invoked on initialization of the pipeline. - This callback is synchronous: the process which started the pipeline waits until `handle_init` - finishes. For that reason, it's important to do any long-lasting or complex work in `c:handle_setup/2`, - while `handle_init` should be used for things like parsing options, initializing state or spawning - children. - By default, it converts the `opts` to a map if they're a struct and sets them as the pipeline state. + This callback is synchronous: the process that started the pipeline waits until `handle_init` + finishes, so it's important to do any long-lasting or complex work in `c:handle_setup/2`. + `handle_init` should be used for things, like parsing options, initializing state, or spawning + children. By default, `handle_init` converts `opts` to a map if they're a struct and sets them as the pipeline state. """ @callback handle_init(context :: CallbackContext.t(), options :: pipeline_options) :: {[Action.common_actions()], state()} @doc """ - Callback invoked when pipeline is requested to terminate with `terminate/2`. - + Callback invoked when the pipeline is requested to terminate with `terminate/2`. By default, it returns `t:Membrane.Pipeline.Action.terminate/0` with reason `:normal`. """ @callback handle_terminate_request(context :: CallbackContext.t(), state) :: @@ -129,7 +133,7 @@ defmodule Membrane.Pipeline do {[Action.common_actions()], state()} @doc """ - Callback invoked when pipeline switches the playback to `:playing`. + Callback invoked when the pipeline switches the playback to `:playing`. By default, it does nothing. """ @callback handle_playing( @@ -166,10 +170,10 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ - Callback invoked when pipeline receives a message that is not recognized - as an internal membrane message. + Callback invoked when the pipeline receives a message that is not recognized + as an internal Membrane message. - Useful for receiving data sent from NIFs or other stuff. + Useful for receiving data sent from NIFs or other external sources. By default, it logs and ignores the received message. """ @callback handle_info( @@ -180,7 +184,7 @@ defmodule Membrane.Pipeline do {[Action.common_actions()], state()} @doc """ - Callback invoked when a child element starts processing stream via given pad. + Callback invoked when a child element starts processing a stream via the given pad. By default, it does nothing. """ @@ -192,7 +196,7 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ - Callback invoked when a child element finishes processing stream via given pad. + Callback invoked when a child element finishes processing a stream via the given pad. By default, it does nothing. """ @@ -225,7 +229,7 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ - Callback invoked when crash of the crash group happens. + Callback invoked when a crash group crashes. Context passed to this callback contains 2 additional fields: `:members` and `:crash_initiator`. By default, it does nothing. @@ -237,9 +241,9 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ - Callback invoked when pipeline is called using a synchronous call. + Callback invoked when the pipeline is called using a synchronous call. - Context passed to this callback contains additional field `:from`. + Context passed to this callback contains an additional field `:from`. By default, it does nothing. """ @callback handle_call( @@ -264,10 +268,10 @@ defmodule Membrane.Pipeline do handle_child_pad_removed: 4 @doc """ - Starts the Pipeline based on given module and links it to the current - process. + Starts the pipeline based on the given module and links it to the current process. + - Pipeline options are passed to module's `c:handle_init/2` callback. + Pipeline options are passed to the `c:handle_init/2` callback. Note that this function returns `{:ok, supervisor_pid, pipeline_pid}` in case of success. Check the 'Starting and supervision' section of the moduledoc for details. """ @@ -276,7 +280,7 @@ defmodule Membrane.Pipeline do do: do_start(:start_link, module, pipeline_options, process_options) @doc """ - Does the same as `start_link/3` but starts process outside of supervision tree. + Starts the pipeline outside a supervision tree. Compare to `start_link/3`. """ @spec start(module, pipeline_options, config) :: on_start def start(module, pipeline_options \\ nil, process_options \\ []), @@ -328,21 +332,21 @@ defmodule Membrane.Pipeline do Terminates the pipeline. Accepts three options: - * `asynchronous?` - if set to `true`, pipline termination won't be blocking and - will be executed in the process, which pid is returned as function result. If - set to `false`, pipeline termination will be blocking and will be executed in - the process that called this function. Defaults to `false`. - * `timeout` - tells how much time (ms) to wait for pipeline to get gracefully - terminated. Defaults to 5000. - * `force?` - if set to `true` and pipeline is still alive after `timeout`, - pipeline will be killed using `Process.exit/2` with reason `:kill`, and function - will return `{:error, :timeout}`. If set to `false` and pipeline is still alive - after `timeout`, function will raise an error. Defaults to `false`. + * `asynchronous?` - if set to `true`, pipeline termination won't be blocking and + will be executed in the process whose pid is returned as a function result. + If set to `false`, pipeline termination will be blocking and will be executed in + the process that called this function. Defaults to `false`. + * `timeout` - specifies how much time (ms) to wait for the pipeline to gracefully + terminate. Defaults to 5000. + * `force?` - determines how to handle a pipeline still alive after `timeout`. + If set to `true`, `Process.exit/2` kills the pipeline with reason `:kill` and returns + `{:error, :timeout}`. + If set to `false`, it raises an error. Defaults to `false`. Returns: - * `{:ok, pid}` - if option `asynchronous?: true` was passed. - * `:ok` - if pipeline was gracefully terminated within `timeout`. - * `{:error, :timeout}` - if pipeline was killed after a `timeout`. + * `{:ok, pid}` - option `asynchronous?: true` was passed. + * `:ok` - pipeline gracefully terminated within `timeout`. + * `{:error, :timeout}` - pipeline was killed after `timeout`. """ @spec terminate(pipeline :: pid, timeout: timeout(), @@ -393,13 +397,18 @@ defmodule Membrane.Pipeline do end end + @doc """ + Calls the pipeline with a message. + + Returns the result of the pipeline call. + """ @spec call(pid, any, timeout()) :: term() def call(pipeline, message, timeout \\ 5000) do GenServer.call(pipeline, message, timeout) end @doc """ - Checks whether module is a pipeline. + Checks whether the module is a pipeline. """ @spec pipeline?(module) :: boolean def pipeline?(module) do @@ -407,9 +416,9 @@ defmodule Membrane.Pipeline do end @doc """ - Lists PIDs of all the pipelines currently running on the current node. + Returns list of pipeline PIDs currently running on the current node. - Use only for debugging purposes. + Use for debugging only. """ @spec list_pipelines() :: [pid] def list_pipelines() do @@ -423,7 +432,8 @@ defmodule Membrane.Pipeline do end @doc """ - Like `list_pipelines/0`, but allows to pass a node. + Returns list of pipeline PIDs currently running on the passed node. \\ + Compare to `list_pipelines/0`. """ @spec list_pipelines(node()) :: [pid] def list_pipelines(node) do