Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Shard.list and Shard.get_by_key #127

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions lib/phoenix/tracker/shard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,24 @@ defmodule Phoenix.Tracker.Shard do
end

@spec list(pid | atom, topic) :: [presence]
def list(server_pid, topic) do
def list(server_pid, topic) when is_pid(server_pid) do
server_pid
|> GenServer.call({:list, topic})
|> State.get_by_topic(topic)
end

@doc false
def dirty_list(shard_name, topic) do
State.tracked_values(shard_name, topic, [])
def list(shard_name, topic) when is_atom(shard_name) do
State.get_by_topic(shard_name, topic)
end

@spec get_by_key(pid | atom, topic, term) :: [presence]
def get_by_key(server_pid, topic, key) do
def get_by_key(server_pid, topic, key) when is_pid(server_pid) do
server_pid
|> GenServer.call({:list, topic})
|> State.get_by_key(topic, key)
end
def get_by_key(shard_name, topic, key) when is_atom(shard_name) do
State.get_by_key(shard_name, topic, key)
end

@spec graceful_permdown(pid) :: :ok
def graceful_permdown(server_pid) do
Expand Down
61 changes: 38 additions & 23 deletions lib/phoenix/tracker/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ defmodule Phoenix.Tracker.State do
@type pid_lookup :: {pid, topic, key}

@type t :: %State{
replica: name,
context: context,
clouds: clouds,
values: values,
pids: ets_id,
mode: :unset | :delta | :normal,
delta: :unset | delta,
replicas: %{name => :up | :down},
range: {context, context}
replica: name,
context: context,
clouds: clouds,
values: values,
pids: ets_id,
mode: :unset | :delta | :normal,
delta: :unset | delta,
down_replicas: ets_id,
range: {context, context}
}

defstruct replica: nil,
Expand All @@ -39,7 +39,7 @@ defmodule Phoenix.Tracker.State do
pids: nil,
mode: :unset,
delta: :unset,
replicas: %{},
down_replicas: nil,
range: {%{}, %{}}

@compile {:inline, tag: 1, clock: 1, put_tag: 2, delete_tag: 2, remove_delta_tag: 2}
Expand All @@ -61,7 +61,10 @@ defmodule Phoenix.Tracker.State do
mode: :normal,
values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]),
pids: :ets.new(:pids, [:duplicate_bag]),
replicas: %{replica => :up}})
down_replicas: :ets.new(
down_replicas_table(shard_name),
[:named_table, :protected, :bag, {:read_concurrency, true}]
)})
end

@doc """
Expand Down Expand Up @@ -116,21 +119,30 @@ defmodule Phoenix.Tracker.State do
@doc """
Returns a list of elements for the topic who belong to an online replica.
"""
@spec get_by_topic(t, topic) :: [key_meta]
@spec get_by_topic(t | atom, topic) :: [key_meta]
def get_by_topic(%State{values: values} = state, topic) do
tracked_values(values, topic, down_replicas(state))
end
def get_by_topic(shard_name, topic) do
tracked_values(shard_name, topic, down_replicas(shard_name))
end

@doc """
Returns a list of elements for the topic who belong to an online replica.
"""
@spec get_by_key(t, topic, key) :: [key_meta]
@spec get_by_key(t | atom, topic, key) :: [key_meta]
def get_by_key(%State{values: values} = state, topic, key) do
case tracked_key(values, topic, key, down_replicas(state)) do
[] -> []
[_|_] = metas -> metas
end
end
def get_by_key(shard_name, topic, key) do
case tracked_key(shard_name, topic, key, down_replicas(shard_name)) do
[] -> []
[_|_] = metas -> metas
end
end

@doc """
Performs table lookup for tracked elements in the topic.
Expand Down Expand Up @@ -393,18 +405,18 @@ defmodule Phoenix.Tracker.State do
Marks a replica as up in the set and returns rejoined users.
"""
@spec replica_up(t, name) :: {t, joins :: [values], leaves :: []}
def replica_up(%State{replicas: replicas, context: ctx} = state, replica) do
{%State{state |
context: Map.put_new(ctx, replica, 0),
replicas: Map.put(replicas, replica, :up)}, replica_users(state, replica), []}
def replica_up(%State{down_replicas: down_replicas, context: ctx} = state, replica) do
:ets.delete_object(down_replicas, replica)
{%State{state | context: Map.put_new(ctx, replica, 0)}, replica_users(state, replica), []}
end

@doc """
Marks a replica as down in the set and returns left users.
"""
@spec replica_down(t, name) :: {t, joins:: [], leaves :: [values]}
def replica_down(%State{replicas: replicas} = state, replica) do
{%State{state | replicas: Map.put(replicas, replica, :down)}, [], replica_users(state, replica)}
def replica_down(%State{down_replicas: down_replicas} = state, replica) do
:ets.insert(down_replicas, replica)
{state, [], replica_users(state, replica)}
end

@doc """
Expand Down Expand Up @@ -555,10 +567,9 @@ defmodule Phoenix.Tracker.State do
delta: %State{delta | range: {start_clock, new_end}}}
end

@spec down_replicas(t) :: [name]
defp down_replicas(%State{replicas: replicas}) do
for {replica, :down} <- replicas, do: replica
end
@spec down_replicas(t | atom) :: [name]
defp down_replicas(%State{down_replicas: down_replicas}), do: :ets.tab2list(down_replicas)
defp down_replicas(shard_name), do: :ets.tab2list(down_replicas_table(shard_name))

@spec replica_users(t, name) :: [value]
defp replica_users(%State{values: values}, replica) do
Expand All @@ -575,4 +586,8 @@ defmodule Phoenix.Tracker.State do
defp foldl({objects, cont}, acc, func) do
foldl(:ets.select(cont), Enum.reduce(objects, acc, func), func)
end

defp down_replicas_table(shard_name) do
:"#{shard_name}.down_replicas"
end
end
15 changes: 5 additions & 10 deletions test/phoenix/tracker/shard_replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
# node1 fulfills tranfer request and sends transfer_ack to primary
assert_transfer_ack ref, from: @node1
assert_heartbeat to: @node1, from: @primary

# small delay to ensure transfer_ack has been processed before calling list
:timer.sleep(10)
assert [{"node1", _}] = list(shard, topic)
end

Expand Down Expand Up @@ -87,6 +90,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
assert_heartbeat from: @node1
assert_heartbeat from: @node2

# small delay to ensure transfer_ack has been processed before calling list
:timer.sleep(10)
assert [{"node1", _}, {"node1.2", _}, {"node2", _}] = list(shard, topic)
end

Expand Down Expand Up @@ -237,21 +242,15 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
assert_join ^topic, "node1", %{name: "s1"}
assert %{@node1 => %Replica{status: :up}} = replicas(shard)
assert [{"local1", _}, {"node1", _}] = list(shard, topic)
assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic)

# nodedown
Process.unlink(node_pid)
Process.exit(node1_server, :kill)
assert_leave ^topic, "node1", %{name: "s1"}
assert %{@node1 => %Replica{status: :down}} = replicas(shard)
assert [{"local1", _}] = list(shard, topic)
assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic)

:timer.sleep(@permdown + 2*@heartbeat)
assert [{"local1", _}] = dirty_list(shard, topic)
end


test "untrack with no tracked topic is a noop",
%{shard: shard, topic: topic} do
assert Shard.untrack(shard, self(), topic, "foo") == :ok
Expand Down Expand Up @@ -382,8 +381,4 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
defp list(shard, topic) do
Enum.sort(Shard.list(shard, topic))
end

defp dirty_list(shard, topic) do
Enum.sort(Shard.dirty_list(shard, topic))
end
end