Skip to content

Commit

Permalink
Update event producer from gen stage queue broadcaster
Browse files Browse the repository at this point in the history
  • Loading branch information
simonprev committed Apr 21, 2019
1 parent d345b5a commit 15ba320
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions lib/hook/event_producer.ex
Expand Up @@ -11,21 +11,27 @@ defmodule Accent.Hook.EventProducer do
{:producer, {:queue.new(), 0}, dispatcher: GenStage.BroadcastDispatcher}
end

def handle_call({_, event}, from, {queue, demand}) do
dispatch_events(:queue.in({from, event}, queue), demand, [])
def handle_call({_, event}, from, {queue, pending_demand}) do
queue = :queue.in({from, event}, queue)
dispatch_events(queue, pending_demand, [])
end

def handle_demand(incoming_demand, {queue, demand}) do
dispatch_events(queue, incoming_demand + demand, [])
def handle_demand(incoming_demand, {queue, pending_demand}) do
dispatch_events(queue, incoming_demand + pending_demand, [])
end

defp dispatch_events(queue, 0, events) do
{:noreply, Enum.reverse(events), {queue, 0}}
end

defp dispatch_events(queue, demand, events) do
with d when d > 0 <- demand,
{{:value, {from, event}}, queue} <- :queue.out(queue) do
GenStage.reply(from, :ok)
dispatch_events(queue, demand - 1, [event | events])
else
_ -> {:noreply, Enum.reverse(events), {queue, demand}}
case :queue.out(queue) do
{{:value, {from, event}}, queue} ->
GenStage.reply(from, :ok)
dispatch_events(queue, demand - 1, [event | events])

{:empty, queue} ->
{:noreply, Enum.reverse(events), {queue, demand}}
end
end
end
Expand Down

0 comments on commit 15ba320

Please sign in to comment.