Skip to content

Commit 061c69b

Browse files
author
José Valim
committed
Only rely on ETS tables inside local
1 parent 9a1286a commit 061c69b

File tree

1 file changed

+14
-69
lines changed

1 file changed

+14
-69
lines changed

lib/phoenix/pubsub/local.ex

Lines changed: 14 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ defmodule Phoenix.PubSub.Local do
7676
local_server
7777
|> subscribers_with_fastlanes(topic)
7878
|> Enum.reduce(%{}, fn
79-
{pid, _fastlanes}, cache when pid == from -> cache
79+
{pid, _fastlanes}, cache when pid == from ->
80+
cache
8081

8182
{pid, nil}, cache ->
8283
send(pid, msg)
@@ -149,95 +150,39 @@ defmodule Phoenix.PubSub.Local do
149150
end
150151

151152
@doc false
152-
# This is a private operation. DO NOT USE IT IN PROD.
153+
# This is an expensive and private operation. DO NOT USE IT IN PROD.
153154
def subscription(local_server, pid) when is_atom(local_server) do
154-
GenServer.call(local_server, {:subscription, pid})
155+
# TODO: Implement it using ETS similar to the list function above
155156
end
156157

157158
def init(name) do
158159
^name = :ets.new(name, [:bag, :named_table, read_concurrency: true])
159160
Process.flag(:trap_exit, true)
160-
{:ok, %{topics: name, pids: HashDict.new}}
161-
end
162-
163-
def handle_call({:subscription, pid}, _from, state) do
164-
case HashDict.fetch(state.pids, pid) do
165-
{:ok, {_ref, topics, fastlanes}} -> {:reply, {:ok, topics, fastlanes}, state}
166-
:error -> {:reply, :error, state}
167-
end
161+
{:ok, name}
168162
end
169163

170164
def handle_call({:subscribe, pid, topic, opts}, _from, state) do
171165
if opts[:link], do: Process.link(pid)
172-
{:reply, :ok, put_subscription(state, pid, topic, opts[:fastlane])}
166+
Process.monitor(pid)
167+
true = :ets.insert(state, {topic, {pid, opts[:fastlane]}})
168+
{:reply, :ok, state}
173169
end
174170

175171
def handle_call({:unsubscribe, pid, topic}, _from, state) do
176-
{:reply, :ok, drop_subscription(state, pid, topic)}
172+
true = :ets.match_delete(state, {topic, {pid, :_}})
173+
{:reply, :ok, state}
177174
end
178175

179-
def handle_info({:DOWN, ref, _type, pid, _info}, state) do
180-
{:noreply, drop_subscriber(state, pid, ref)}
176+
def handle_info({:DOWN, _ref, _type, pid, _info}, state) do
177+
true = :ets.match_delete(state, {:_, {pid, :_}})
178+
{:noreply, state}
181179
end
182180

183-
def handle_info({:EXIT, _linked_pid, _reason}, state) do
181+
def handle_info(_, state) do
184182
{:noreply, state}
185183
end
186184

187185
def terminate(_reason, _state) do
188186
:ok
189187
end
190-
191-
defp put_subscription(state, pid, topic, fastlane) do
192-
subscription = case HashDict.fetch(state.pids, pid) do
193-
{:ok, {ref, topics, fastlanes}} ->
194-
fastlanes = if fastlane, do: HashDict.put(fastlanes, topic, fastlane),
195-
else: fastlanes
196-
{ref, HashSet.put(topics, topic), fastlanes}
197-
:error ->
198-
fastlanes = if fastlane, do: HashDict.put(HashDict.new, topic, fastlane),
199-
else: HashDict.new
200-
{Process.monitor(pid), HashSet.put(HashSet.new, topic), fastlanes}
201-
end
202-
203-
true = :ets.insert(state.topics, {topic, {pid, fastlane}})
204-
%{state | pids: HashDict.put(state.pids, pid, subscription)}
205-
end
206-
207-
defp drop_subscription(state, pid, topic) do
208-
case HashDict.fetch(state.pids, pid) do
209-
{:ok, {ref, subd_topics, fastlanes}} ->
210-
subd_topics = HashSet.delete(subd_topics, topic)
211-
{fastlane, fastlanes} = HashDict.pop(fastlanes, topic)
212-
213-
pids =
214-
if Enum.any?(subd_topics) do
215-
HashDict.put(state.pids, pid, {ref, subd_topics, fastlanes})
216-
else
217-
Process.demonitor(ref, [:flush])
218-
HashDict.delete(state.pids, pid)
219-
end
220-
221-
true = :ets.delete_object(state.topics, {topic, {pid, fastlane}})
222-
%{state | pids: pids}
223-
224-
:error ->
225-
state
226-
end
227-
end
228-
229-
defp drop_subscriber(state, pid, ref) do
230-
case HashDict.get(state.pids, pid) do
231-
{^ref, topics, fastlanes} ->
232-
for topic <- topics do
233-
fastlane = HashDict.get(fastlanes, topic)
234-
true = :ets.delete_object(state.topics, {topic, {pid, fastlane}})
235-
end
236-
Process.demonitor(ref, [:flush])
237-
%{state | pids: HashDict.delete(state.pids, pid)}
238-
239-
_ref_pid_mismatch ->
240-
state
241-
end
242-
end
243188
end

0 commit comments

Comments
 (0)