From 765adad00ea815288213900c5d692433d7828eca Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 19 May 2023 16:45:17 +0200 Subject: [PATCH] Fix AtomicDemand tests wip --- ...ounter_test.exs => atomic_demand_test.exs} | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) rename test/membrane/core/element/{demand_counter_test.exs => atomic_demand_test.exs} (86%) diff --git a/test/membrane/core/element/demand_counter_test.exs b/test/membrane/core/element/atomic_demand_test.exs similarity index 86% rename from test/membrane/core/element/demand_counter_test.exs rename to test/membrane/core/element/atomic_demand_test.exs index 0628f8a69..b6490b219 100644 --- a/test/membrane/core/element/demand_counter_test.exs +++ b/test/membrane/core/element/atomic_demand_test.exs @@ -2,9 +2,10 @@ defmodule Membrane.Core.Element.AtomicDemandTest do use ExUnit.Case alias Membrane.Core.Element.AtomicDemand + alias Membrane.Core.SubprocessSupervisor test "if AtomicDemand is implemented as :atomics for elements put on the same node" do - atomic_demand = AtomicDemand.new(:pull, self(), :buffers, self(), :output) + atomic_demand = new_atomic_demand(:pull, self(), self()) :ok = AtomicDemand.increase(atomic_demand, 10) assert get_atomic_value(atomic_demand) == 10 @@ -17,7 +18,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do end test "if AtomicDemand.DistributedAtomic.Worker works properly " do - atomic_demand = AtomicDemand.new(:pull, self(), :buffers, self(), :output) + atomic_demand = new_atomic_demand(:pull, self(), self()) :ok = AtomicDemand.increase(atomic_demand, 10) assert GenServer.call( @@ -42,7 +43,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do end test "if setting receiver and sender modes works properly" do - atomic_demand = AtomicDemand.new(:pull, self(), :buffers, self(), :output) + atomic_demand = new_atomic_demand(:pull, self(), self()) :ok = AtomicDemand.set_receiver_status(atomic_demand, {:resolved, :push}) @@ -70,7 +71,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do sleeping_process = spawn(fn -> Process.sleep(hour_in_millis) end) monitor_ref = Process.monitor(sleeping_process) - atomic_demand = AtomicDemand.new(:pull, sleeping_process, :buffers, self(), :output) + atomic_demand = new_atomic_demand(:pull, sleeping_process, self()) :ok = AtomicDemand.set_sender_status(atomic_demand, {:resolved, :push}) atomic_demand = AtomicDemand.decrease(atomic_demand, 100) @@ -104,7 +105,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do test "if buffering decrementation works properly with distribution" do another_node = setup_another_node() pid_on_another_node = Node.spawn(another_node, fn -> :ok end) - atomic_demand = AtomicDemand.new(:push, self(), :buffers, pid_on_another_node, :output) + atomic_demand = new_atomic_demand(:push, self(), pid_on_another_node) assert %AtomicDemand{throttling_factor: 150} = atomic_demand @@ -139,4 +140,15 @@ defmodule Membrane.Core.Element.AtomicDemandTest do atomic_demand.counter.atomic_ref |> :atomics.get(1) end + + defp new_atomic_demand(receiver_effective_flow_control, receiver_pid, sender_pid) do + AtomicDemand.new(%{ + receiver_effective_flow_control: receiver_effective_flow_control, + receiver_process: receiver_pid, + receiver_demand_unit: :buffers, + sender_process: sender_pid, + sender_pad_ref: :output, + supervisor: SubprocessSupervisor.start_link!() + }) + end end