Skip to content

Commit

Permalink
Keep internal queues for each partition separately
Browse files Browse the repository at this point in the history
When dequeuing, only dequeue from partitions that are waiting
for fewer than max_demand acks
  • Loading branch information
slashmili committed Jun 1, 2022
1 parent 1eea41e commit 4a4a69d
Showing 1 changed file with 81 additions and 28 deletions.
109 changes: 81 additions & 28 deletions lib/broadway_kafka/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ defmodule BroadwayKafka.Producer do
draining_after_revoke_flag: draining_after_revoke_flag,
demand: 0,
shutting_down?: false,
buffer: :queue.new(),
buffers: %{},
buffer_index: 0,
max_demand: max_demand
}

Expand Down Expand Up @@ -323,13 +324,17 @@ defmodule BroadwayKafka.Producer do
not is_draining_after_revoke?(state.draining_after_revoke_flag) and
offset != nil do
messages = fetch_messages_from_kafka(state, key, offset)

new_buffers = enqueue_many(state.buffers, key, messages)

to_send = min(demand, max_demand)
{new_acks, not_sent, messages, pending} = split_demand(messages, acks, key, to_send)
new_buffer = enqueue_many(state.buffer, key, pending)
new_buffers = enqueue_many(state.buffers, key, pending)

new_demand = demand - to_send + not_sent
new_state = %{state | acks: new_acks, demand: new_demand, buffer: new_buffer}
{:noreply, messages, new_state}
new_state = %{state | acks: new_acks, demand: new_demand, buffers: new_buffers}
new_state = %{state | buffers: new_buffers}
{:noreply, [], new_state}
else
{:noreply, [], state}
end
Expand Down Expand Up @@ -524,20 +529,42 @@ defmodule BroadwayKafka.Producer do
end

defp maybe_schedule_poll(state, interval) do
%{buffer: buffer, demand: demand, acks: acks, receive_timer: receive_timer} = state
%{
buffers: buffers,
demand: demand,
acks: acks,
receive_timer: receive_timer,
max_demand: max_demand
} = state

new_buffer_index =
if state.buffer_index >= Enum.count(buffers) do
0
else
state.buffer_index + 1
end

case dequeue_many(buffers, state.buffer_index, acks, demand, max_demand, []) do
{acks, demand, [], buffers} when demand > 0 ->
receive_timer = receive_timer || schedule_poll(state, interval)

case dequeue_many(buffer, acks, demand, []) do
{acks, 0, events, buffer} ->
{:noreply, events, %{state | demand: 0, buffer: buffer, acks: acks}}
state = %{state | receive_timer: receive_timer}

{acks, demand, events, buffer} ->
{:noreply, [], state}

{acks, 0, events, buffers} ->
{:noreply, events,
%{state | demand: 0, buffers: buffers, acks: acks, buffer_index: new_buffer_index}}

{acks, demand, events, buffers} ->
receive_timer = receive_timer || schedule_poll(state, interval)

state = %{
state
| demand: demand,
buffer: buffer,
buffers: buffers,
receive_timer: receive_timer,
buffer_index: new_buffer_index,
acks: acks
}

Expand All @@ -547,7 +574,11 @@ defmodule BroadwayKafka.Producer do

defp schedule_poll(state, interval) do
for key <- Acknowledger.keys(state.acks) do
Process.send_after(self(), {:poll, key}, interval)
queue = Map.get_lazy(state.buffers, key, fn -> :queue.new() end)

if :queue.is_empty(queue) do
Process.send_after(self(), {:poll, key}, interval)
end
end

Process.send_after(self(), :maybe_schedule_poll, interval)
Expand Down Expand Up @@ -659,28 +690,50 @@ defmodule BroadwayKafka.Producer do
reverse_split_demand(tail, demand - 1, [head | reversed], [head | acc])
end

defp enqueue_many(queue, _key, []), do: queue
defp enqueue_many(queue, key, list), do: :queue.in({key, list}, queue)
defp enqueue_many(queues, _key, []), do: queues

defp dequeue_many(queue, acks, demand, acc) when demand > 0 do
case :queue.out(queue) do
{{:value, {key, list}}, queue} ->
{rest, demand, reversed, acc} = reverse_split_demand(list, demand, [], acc)
acks = update_last_offset(acks, key, reversed)
defp enqueue_many(queues, key, list) do
queue = Map.get_lazy(queues, key, fn -> :queue.new() end)
queue = Enum.reduce(list, queue, fn item, q -> :queue.in(item, q) end)
Map.put(queues, key, queue)
end

case {demand, rest} do
{0, []} ->
{acks, demand, Enum.reverse(acc), queue}
defp dequeue_many(queues, queue_index, acks, demand, max_demand, acc) when demand > 0 do
demand = min(demand, max_demand)

{0, _} ->
{acks, demand, Enum.reverse(acc), :queue.in_r({key, rest}, queue)}
{p1, p2} = Enum.split(acks, queue_index)

{_, []} ->
dequeue_many(queue, acks, demand, acc)
end
queue_to_consume =
(p2 ++ p1)
|> Enum.filter(fn {key, value} ->
queue = Map.get_lazy(queues, key, fn -> :queue.new() end)
not :queue.is_empty(queue) and length(elem(value, 0)) < max_demand
end)
|> List.first()

if queue_to_consume do
{key, _} = queue_to_consume
queue = Map.get_lazy(queues, key, fn -> :queue.new() end)
{demand, acc, queue} = dequeue_many_from(queue, demand, acc)
acks = update_last_offset(acks, key, acc)
{acks, demand, Enum.reverse(acc), Map.put(queues, key, queue)}
else
{acks, demand, Enum.reverse(acc), queues}
end
end

defp dequeue_many_from(queue, 0, acc) do
{0, acc, queue}
end

defp dequeue_many_from(queue, demand, acc) do
case :queue.out(queue) do
{{:value, message}, queue} ->
acc = [message | acc]
dequeue_many_from(queue, demand - 1, acc)

{:empty, queue} ->
{acks, demand, Enum.reverse(acc), queue}
{demand, acc, queue}
end
end

Expand All @@ -695,7 +748,7 @@ defmodule BroadwayKafka.Producer do
end

defp reset_buffer(state) do
put_in(state.buffer, :queue.new())
put_in(state.buffers, %{})
end

defp schedule_reconnect(timeout) do
Expand Down

0 comments on commit 4a4a69d

Please sign in to comment.