Skip to content

Commit

Permalink
Update ETS atomically on tracker update (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatanklosko committed May 24, 2023
1 parent 16e1327 commit fc5686f
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 15 deletions.
19 changes: 15 additions & 4 deletions lib/phoenix/tracker/shard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,22 @@ defmodule Phoenix.Tracker.Shard do
Phoenix.PubSub.subscribe(pubsub_server, namespaced_topic, link: true)
end

defp put_update(state, pid, topic, key, meta, %{phx_ref: ref} = prev_meta) do
state
|> put_presences(State.leave(state.presences, pid, topic, key))
|> put_presence(pid, topic, key, Map.put(meta, :phx_ref_prev, ref), prev_meta)
defp put_update(state, pid, topic, key, meta, %{phx_ref: prev_ref} = prev_meta) do
ref = random_ref()

meta =
meta
|> Map.put(:phx_ref, ref)
|> Map.put(:phx_ref_prev, prev_ref)

new_state =
state
|> report_diff_join(topic, key, meta, prev_meta)
|> put_presences(State.leave_join(state.presences, pid, topic, key, meta))

{new_state, ref}
end

defp put_presence(state, pid, topic, key, meta, prev_meta \\ nil) do
Process.link(pid)
ref = random_ref()
Expand Down
62 changes: 51 additions & 11 deletions lib/phoenix/tracker/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,27 @@ defmodule Phoenix.Tracker.State do
add(state, pid, topic, key, meta)
end

@doc """
Updates an element via leave and join.
Atomically updates ETS local entry.
"""
@spec leave_join(t, pid, topic, key, meta) :: t
def leave_join(%State{delta: delta} = state, pid, topic, key, meta) do
# Produce remove-like delta
[{{^topic, ^pid, ^key}, _meta, tag}] = :ets.lookup(state.values, {topic, pid, key})
pruned_clouds = delete_tag(state.clouds, tag)
new_delta = remove_delta_tag(state.delta, tag)
state = bump_clock(%State{state | clouds: pruned_clouds, delta: new_delta})

# Update ETS entry and produce add-like delta
state = bump_clock(state)
tag = tag(state)
true = :ets.insert(state.values, {{topic, pid, key}, meta, tag})
new_delta = %State{delta | values: Map.put(delta.values, tag, {pid, topic, key, meta})}
%State{state | delta: new_delta}
end

@doc """
Removes an element from the set.
"""
Expand Down Expand Up @@ -287,10 +308,31 @@ defmodule Phoenix.Tracker.State do
end

defp merge(local, remote, remote_map) do
{pids, joins} = accumulate_joins(local, remote_map)
{clouds, delta, leaves} = observe_removes(local, remote, remote_map)
{added_pids, joins} = accumulate_joins(local, remote_map)
{clouds, delta, leaves, removed_pids} = observe_removes(local, remote, remote_map)

# We diff ETS deletes and inserts, this way if there is an update
# operation (leave + join) we handle it atomically via insert into
# the :ordered_set table

added_value_keys = for {value_key, _meta, _tag} <- joins, do: value_key
removed_value_keys = for {value_key, _meta, _tag} <- leaves, do: value_key
value_keys_to_remove = removed_value_keys -- added_value_keys

pids_to_remove = removed_pids -- added_pids
pids_to_add = added_pids -- removed_pids

for value_key <- value_keys_to_remove do
:ets.delete(local.values, value_key)
end

for pid <- pids_to_remove do
:ets.match_delete(local.pids, pid)
end

true = :ets.insert(local.values, joins)
true = :ets.insert(local.pids, pids)
true = :ets.insert(local.pids, pids_to_add)

known_remote_context = Map.take(remote.context, Map.keys(local.context))
ctx = Clock.upperbound(local.context, known_remote_context)
new_state =
Expand All @@ -313,11 +355,11 @@ defmodule Phoenix.Tracker.State do
end)
end

@spec observe_removes(t, t, map) :: {clouds, delta, leaves :: [value]}
defp observe_removes(%State{pids: pids, values: values, delta: delta} = local, remote, remote_map) do
@spec observe_removes(t, t, map) :: {clouds, delta, leaves :: [value], removed_pids :: [pid_lookup]}
defp observe_removes(%State{values: values, delta: delta} = local, remote, remote_map) do
unioned_clouds = union_clouds(local, remote)
%State{context: remote_context, clouds: remote_clouds} = remote
init = {unioned_clouds, delta, []}
init = {unioned_clouds, delta, [], []}
local_replica = local.replica
# fn {_, _, {replica, _}} = result when replica != local_replica -> result end
ms = [{
Expand All @@ -326,13 +368,11 @@ defmodule Phoenix.Tracker.State do
[:"$_"]
}]

foldl(values, init, ms, fn {{topic, pid, key} = values_key, _, tag} = el, {clouds, delta, leaves} ->
foldl(values, init, ms, fn {{topic, pid, key}, _, tag} = el, {clouds, delta, leaves, removed_pids} ->
if not match?(%{^tag => _}, remote_map) and in?(remote_context, remote_clouds, tag) do
:ets.delete(values, values_key)
:ets.match_delete(pids, {pid, topic, key})
{delete_tag(clouds, tag), remove_delta_tag(delta, tag), [el | leaves]}
{delete_tag(clouds, tag), remove_delta_tag(delta, tag), [el | leaves], [{pid, topic, key} | removed_pids]}
else
{clouds, delta, leaves}
{clouds, delta, leaves, removed_pids}
end
end)
end
Expand Down
11 changes: 11 additions & 0 deletions test/phoenix/tracker/state_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ defmodule Phoenix.Tracker.StateTest do
assert {^a, [], []} = State.merge(a, State.extract(b, a.replica, a.context))

assert (State.online_list(b) |> Enum.sort) == (State.online_list(a) |> Enum.sort)

# update
b = State.leave_join(b, carol, "lobby", :carol, %{updated: true})
pids_before = tab2list(a.pids)

assert {a, [{{_, _, :carol}, %{updated: true}, _}], [{{_, _, :carol}, _, _}]} =
State.merge(a, State.extract(b, a.replica, a.context))

assert {^a, [], []} = State.merge(a, State.extract(b, a.replica, a.context))

assert pids_before == tab2list(a.pids)
end

test "basic netsplit", config do
Expand Down

0 comments on commit fc5686f

Please sign in to comment.