Skip to content

Commit

Permalink
Optimize Shard.list and Shard.get_by_key
Browse files Browse the repository at this point in the history
Previous list and get_by_key had to go through GenServer to acquire
values ets table and replicas information. In case GenServer was
processing an update (e.g. heartbeat, track, untrack) then list and
get_by_key functions were blocked until it was completed. We saw this
behaviour in our cluster where simple list/get_by_key calls were
sometimes taking over few hundred milliseconds.

Storing down replicas information in an ets table allows us to avoid
going through genserver and allows us to process list/get_by_key
immediately.

I removed dirty_list function which was not public / exposed and which
was trying to resolve the same issue. dirty_list was called dirty
because it didn't check for down_replicas. This solution checks
down_replicas and doesn't change the api interface.

This should also resolve #124
  • Loading branch information
indrekj committed Jul 15, 2019
1 parent 1bc0b25 commit e435ace
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 36 deletions.
13 changes: 7 additions & 6 deletions lib/phoenix/tracker/shard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,24 @@ defmodule Phoenix.Tracker.Shard do
end

@spec list(pid | atom, topic) :: [presence]
def list(server_pid, topic) do
def list(server_pid, topic) when is_pid(server_pid) do
server_pid
|> GenServer.call({:list, topic})
|> State.get_by_topic(topic)
end

@doc false
def dirty_list(shard_name, topic) do
State.tracked_values(shard_name, topic, [])
def list(shard_name, topic) when is_atom(shard_name) do
State.get_by_topic(shard_name, topic)
end

@spec get_by_key(pid | atom, topic, term) :: [presence]
def get_by_key(server_pid, topic, key) do
def get_by_key(server_pid, topic, key) when is_pid(server_pid) do
server_pid
|> GenServer.call({:list, topic})
|> State.get_by_key(topic, key)
end
def get_by_key(shard_name, topic, key) when is_atom(shard_name) do
State.get_by_key(shard_name, topic, key)
end

@spec graceful_permdown(pid) :: :ok
def graceful_permdown(server_pid) do
Expand Down
52 changes: 32 additions & 20 deletions lib/phoenix/tracker/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ defmodule Phoenix.Tracker.State do
@type pid_lookup :: {pid, topic, key}

@type t :: %State{
replica: name,
context: context,
clouds: clouds,
values: values,
pids: ets_id,
mode: :unset | :delta | :normal,
delta: :unset | delta,
replicas: %{name => :up | :down},
range: {context, context}
replica: name,
context: context,
clouds: clouds,
values: values,
pids: ets_id,
mode: :unset | :delta | :normal,
delta: :unset | delta,
down_replicas: ets_id,
range: {context, context}
}

defstruct replica: nil,
Expand All @@ -39,7 +39,7 @@ defmodule Phoenix.Tracker.State do
pids: nil,
mode: :unset,
delta: :unset,
replicas: %{},
down_replicas: nil,
range: {%{}, %{}}

@compile {:inline, tag: 1, clock: 1, put_tag: 2, delete_tag: 2, remove_delta_tag: 2}
Expand All @@ -61,7 +61,7 @@ defmodule Phoenix.Tracker.State do
mode: :normal,
values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]),
pids: :ets.new(:pids, [:duplicate_bag]),
replicas: %{replica => :up}})
down_replicas: :ets.new(down_replicas_table(shard_name), [:named_table, :protected, :bag])})
end

@doc """
Expand Down Expand Up @@ -120,6 +120,9 @@ defmodule Phoenix.Tracker.State do
def get_by_topic(%State{values: values} = state, topic) do
tracked_values(values, topic, down_replicas(state))
end
def get_by_topic(shard_name, topic) do
tracked_values(shard_name, topic, down_replicas(shard_name))
end

@doc """
Returns a list of elements for the topic who belong to an online replica.
Expand All @@ -131,6 +134,12 @@ defmodule Phoenix.Tracker.State do
[_|_] = metas -> metas
end
end
def get_by_key(shard_name, topic, key) do
case tracked_key(shard_name, topic, key, down_replicas(shard_name)) do
[] -> []
[_|_] = metas -> metas
end
end

@doc """
Performs table lookup for tracked elements in the topic.
Expand Down Expand Up @@ -393,18 +402,18 @@ defmodule Phoenix.Tracker.State do
Marks a replica as up in the set and returns rejoined users.
"""
@spec replica_up(t, name) :: {t, joins :: [values], leaves :: []}
def replica_up(%State{replicas: replicas, context: ctx} = state, replica) do
{%State{state |
context: Map.put_new(ctx, replica, 0),
replicas: Map.put(replicas, replica, :up)}, replica_users(state, replica), []}
def replica_up(%State{down_replicas: down_replicas, context: ctx} = state, replica) do
:ets.delete_object(down_replicas, replica)
{%State{state | context: Map.put_new(ctx, replica, 0)}, replica_users(state, replica), []}
end

@doc """
Marks a replica as down in the set and returns left users.
"""
@spec replica_down(t, name) :: {t, joins:: [], leaves :: [values]}
def replica_down(%State{replicas: replicas} = state, replica) do
{%State{state | replicas: Map.put(replicas, replica, :down)}, [], replica_users(state, replica)}
def replica_down(%State{down_replicas: down_replicas} = state, replica) do
:ets.insert(down_replicas, replica)
{state, [], replica_users(state, replica)}
end

@doc """
Expand Down Expand Up @@ -556,9 +565,8 @@ defmodule Phoenix.Tracker.State do
end

@spec down_replicas(t) :: [name]
defp down_replicas(%State{replicas: replicas}) do
for {replica, :down} <- replicas, do: replica
end
defp down_replicas(%State{down_replicas: down_replicas}), do: :ets.tab2list(down_replicas)
defp down_replicas(shard_name), do: :ets.tab2list(down_replicas_table(shard_name))

@spec replica_users(t, name) :: [value]
defp replica_users(%State{values: values}, replica) do
Expand All @@ -575,4 +583,8 @@ defmodule Phoenix.Tracker.State do
defp foldl({objects, cont}, acc, func) do
foldl(:ets.select(cont), Enum.reduce(objects, acc, func), func)
end

defp down_replicas_table(shard_name) do
:"#{shard_name}.down_replicas"
end
end
13 changes: 3 additions & 10 deletions test/phoenix/tracker/shard_replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
# node1 fulfills tranfer request and sends transfer_ack to primary
assert_transfer_ack ref, from: @node1
assert_heartbeat to: @node1, from: @primary

# small delay to ensure transfer_ack has been processed before calling list
:timer.sleep(10)
assert [{"node1", _}] = list(shard, topic)
end

Expand Down Expand Up @@ -237,21 +240,15 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
assert_join ^topic, "node1", %{name: "s1"}
assert %{@node1 => %Replica{status: :up}} = replicas(shard)
assert [{"local1", _}, {"node1", _}] = list(shard, topic)
assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic)

# nodedown
Process.unlink(node_pid)
Process.exit(node1_server, :kill)
assert_leave ^topic, "node1", %{name: "s1"}
assert %{@node1 => %Replica{status: :down}} = replicas(shard)
assert [{"local1", _}] = list(shard, topic)
assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic)

:timer.sleep(@permdown + 2*@heartbeat)
assert [{"local1", _}] = dirty_list(shard, topic)
end


test "untrack with no tracked topic is a noop",
%{shard: shard, topic: topic} do
assert Shard.untrack(shard, self(), topic, "foo") == :ok
Expand Down Expand Up @@ -382,8 +379,4 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
defp list(shard, topic) do
Enum.sort(Shard.list(shard, topic))
end

defp dirty_list(shard, topic) do
Enum.sort(Shard.dirty_list(shard, topic))
end
end

0 comments on commit e435ace

Please sign in to comment.