Skip to content

Commit

Permalink
Fix AtomicDemand tests wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed May 19, 2023
1 parent 0821fb5 commit 765adad
Showing 1 changed file with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ defmodule Membrane.Core.Element.AtomicDemandTest do
use ExUnit.Case

alias Membrane.Core.Element.AtomicDemand
alias Membrane.Core.SubprocessSupervisor

test "if AtomicDemand is implemented as :atomics for elements put on the same node" do
atomic_demand = AtomicDemand.new(:pull, self(), :buffers, self(), :output)
atomic_demand = new_atomic_demand(:pull, self(), self())
:ok = AtomicDemand.increase(atomic_demand, 10)

assert get_atomic_value(atomic_demand) == 10
Expand All @@ -17,7 +18,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do
end

test "if AtomicDemand.DistributedAtomic.Worker works properly " do
atomic_demand = AtomicDemand.new(:pull, self(), :buffers, self(), :output)
atomic_demand = new_atomic_demand(:pull, self(), self())
:ok = AtomicDemand.increase(atomic_demand, 10)

assert GenServer.call(
Expand All @@ -42,7 +43,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do
end

test "if setting receiver and sender modes works properly" do
atomic_demand = AtomicDemand.new(:pull, self(), :buffers, self(), :output)
atomic_demand = new_atomic_demand(:pull, self(), self())

:ok = AtomicDemand.set_receiver_status(atomic_demand, {:resolved, :push})

Expand Down Expand Up @@ -70,7 +71,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do
sleeping_process = spawn(fn -> Process.sleep(hour_in_millis) end)
monitor_ref = Process.monitor(sleeping_process)

atomic_demand = AtomicDemand.new(:pull, sleeping_process, :buffers, self(), :output)
atomic_demand = new_atomic_demand(:pull, sleeping_process, self())

:ok = AtomicDemand.set_sender_status(atomic_demand, {:resolved, :push})
atomic_demand = AtomicDemand.decrease(atomic_demand, 100)
Expand Down Expand Up @@ -104,7 +105,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do
test "if buffering decrementation works properly with distribution" do
another_node = setup_another_node()
pid_on_another_node = Node.spawn(another_node, fn -> :ok end)
atomic_demand = AtomicDemand.new(:push, self(), :buffers, pid_on_another_node, :output)
atomic_demand = new_atomic_demand(:push, self(), pid_on_another_node)

assert %AtomicDemand{throttling_factor: 150} = atomic_demand

Expand Down Expand Up @@ -139,4 +140,15 @@ defmodule Membrane.Core.Element.AtomicDemandTest do
atomic_demand.counter.atomic_ref
|> :atomics.get(1)
end

defp new_atomic_demand(receiver_effective_flow_control, receiver_pid, sender_pid) do
AtomicDemand.new(%{
receiver_effective_flow_control: receiver_effective_flow_control,
receiver_process: receiver_pid,
receiver_demand_unit: :buffers,
sender_process: sender_pid,
sender_pad_ref: :output,
supervisor: SubprocessSupervisor.start_link!()
})
end
end

0 comments on commit 765adad

Please sign in to comment.