-
Notifications
You must be signed in to change notification settings - Fork 119
/
local.ex
235 lines (188 loc) · 6.42 KB
/
local.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
defmodule Phoenix.PubSub.Local do
@moduledoc """
PubSub implementation for handling local-node process groups.
This module is used by Phoenix pubsub adapters to handle
their local node subscriptions and it is usually not accessed
directly. See `Phoenix.PubSub.PG2` for an example integration.
"""
use GenServer
@doc """
Starts the local pubsub server.

  * `server_name` - The name to register the server under; it is also
    handed to `init/1` as the first element of the init argument
  * `gc_name` - The name handed to `init/1` as the second element of
    the init argument

"""
def start_link(server_name, gc_name) do
  init_arg = {server_name, gc_name}
  GenServer.start_link(__MODULE__, init_arg, name: server_name)
end
@doc """
Subscribes the pid to the topic.

  * `pubsub_server` - The registered server name
  * `pool_size` - The size of the pool
  * `pid` - The subscriber pid
  * `topic` - The string topic, for example "users:123"
  * `opts` - The optional list of options. `:link` links the
    subscriber to the local server process, and the value of
    `:fastlane` is stored next to the pid in the topics table

## Examples

    iex> subscribe(MyApp.PubSub, 1, self(), "foo")
    :ok

"""
def subscribe(pubsub_server, pool_size, pid, topic, opts \\ []) when is_atom(pubsub_server) do
  local_server = local_for_pid(pubsub_server, pid, pool_size)

  # The server links/monitors the pid and replies with the two table names;
  # the actual inserts happen here, in the calling process.
  {:ok, {topics, pids}} = GenServer.call(local_server, {:subscribe, pid, topic, opts})

  topic_entry = {topic, {pid, opts[:fastlane]}}
  true = :ets.insert(topics, topic_entry)
  true = :ets.insert(pids, {pid, topic})

  :ok
end
@doc """
Unsubscribes the pid from the topic.

  * `pubsub_server` - The registered server name
  * `pool_size` - The size of the pool
  * `pid` - The subscriber pid
  * `topic` - The string topic, for example "users:123"

## Examples

    iex> unsubscribe(MyApp.PubSub, 1, self(), "foo")
    :ok

"""
def unsubscribe(pubsub_server, pool_size, pid, topic) when is_atom(pubsub_server) do
  shard = pid_to_shard(pid, pool_size)
  {local_server, gc_server} = pools_for_shard(shard, pubsub_server)

  # The removal itself is delegated to the GC module for this shard.
  :ok = Phoenix.PubSub.GC.unsubscribe(pid, topic, local_server, gc_server)
end
@doc """
Sends a message to all subscribers of a topic.

  * `fastlane` - Either `nil`, in which case `msg` is sent directly to
    every subscriber pid, or a module whose `fastlane/3` is invoked with
    the shard's subscribers, `from` and `msg`
  * `pubsub_server` - The registered server name
  * `pool_size` - The size of the pool
  * `from` - When `fastlane` is `nil`, a subscriber whose pid equals
    `from` is skipped; pass a non-pid such as `:none` to skip nobody
  * `topic` - The string topic, for example "users:123"
  * `msg` - The message to deliver

With a `pool_size` of `1` the broadcast runs in the calling process.
Otherwise one task per shard is started and the caller waits for all
of them before returning.

## Examples

    iex> broadcast(nil, MyApp.PubSub, 1, self(), "foo", :hello)
    :ok
    iex> broadcast(nil, MyApp.PubSub, 1, :none, "bar", :hello)
    :ok

"""
def broadcast(fastlane, pubsub_server, 1 = _pool_size, from, topic, msg) when is_atom(pubsub_server) do
  do_broadcast(fastlane, pubsub_server, _shard = 0, from, topic, msg)
  :ok
end
def broadcast(fastlane, pubsub_server, pool_size, from, topic, msg) when is_atom(pubsub_server) do
  # `self()` must be called with parentheses: a bare `self` is treated as
  # a variable reference by current Elixir compilers.
  parent = self()

  tasks =
    for shard <- 0..(pool_size - 1) do
      Task.async(fn ->
        do_broadcast(fastlane, pubsub_server, shard, from, topic, msg)
        # Task.async links the task to the caller; the link is dropped
        # once this shard has been delivered.
        Process.unlink(parent)
      end)
    end

  # The task results are not used, only their completion matters.
  Enum.each(tasks, &Task.await(&1, :infinity))
  :ok
end
# Delivers `msg` for a single shard.
#
# Without a fastlane module the message is sent straight to every
# subscriber pid of the topic, except a pid equal to `from`.
defp do_broadcast(nil, pubsub_server, shard, from, topic, msg) do
  shard_subscribers = subscribers_with_fastlanes(pubsub_server, topic, shard)

  Enum.each(shard_subscribers, fn {pid, _fastlane} ->
    if pid == from do
      :noop
    else
      send(pid, msg)
    end
  end)
end
# With a fastlane module, delivery is handed over entirely to its
# `fastlane/3` callback.
defp do_broadcast(fastlane, pubsub_server, shard, from, topic, msg) do
  shard_subscribers = subscribers_with_fastlanes(pubsub_server, topic, shard)
  fastlane.fastlane(shard_subscribers, from, msg) # TODO: Test this contract
end
@doc """
Returns the subscriber pids for the given topic on one shard.

  * `pubsub_server` - The registered server name
  * `topic` - The string topic, for example "users:123"
  * `shard` - The shard, for example `1`

## Examples

    iex> subscribers(:pubsub_server, "foo", 1)
    [#PID<0.48.0>, #PID<0.49.0>]

"""
def subscribers(pubsub_server, topic, shard) when is_atom(pubsub_server) do
  entries = subscribers_with_fastlanes(pubsub_server, topic, shard)
  # Drop the fastlane half of every `{pid, fastlane}` entry.
  Enum.map(entries, fn {subscriber, _fastlane} -> subscriber end)
end
@doc """
Returns the subscribers for the given topic as `{pid, fastlane}` tuples.

Returns an empty list when the lookup raises `:badarg`, for example
when the topic has no entries in the shard's table.

See `subscribers/3` for more information.
"""
def subscribers_with_fastlanes(pubsub_server, topic, shard) when is_atom(pubsub_server) do
  # Resolving the shard's table stays inside the protected section too,
  # so a `:badarg` raised there also yields an empty list.
  table = local_for_shard(shard, pubsub_server)
  :ets.lookup_element(table, topic, 2)
catch
  :error, :badarg -> []
end
@doc false
# This is an expensive and private operation. DO NOT USE IT IN PROD.
# Returns the distinct keys stored in the shard's local table.
def list(pubsub_server, shard) when is_atom(pubsub_server) do
  table = local_for_shard(shard, pubsub_server)

  # Match spec: for every `{key, _value}` object return only the key.
  key_only = [{{:"$1", :_}, [], [:"$1"]}]

  keys = :ets.select(table, key_only)
  Enum.uniq(keys)
end
@doc false
# This is an expensive and private operation. DO NOT USE IT IN PROD.
# Asks the GC server of the pid's shard for that pid's subscription.
def subscription(pubsub_server, pool_size, pid) when is_atom(pubsub_server) do
  shard = pid_to_shard(pid, pool_size)
  {_local, gc_server} = pools_for_shard(shard, pubsub_server)
  GenServer.call(gc_server, {:subscription, pid})
end
@doc false
# Builds the module-style atom `<pubsub_server>.Local<shard>`.
def local_name(pubsub_server, shard) do
  name = to_string(pubsub_server) <> ".Local" <> to_string(shard)
  Module.concat([name])
end
@doc false
# Builds the module-style atom `<pubsub_server>.GC<shard>`.
def gc_name(pubsub_server, shard) do
  name = to_string(pubsub_server) <> ".GC" <> to_string(shard)
  Module.concat([name])
end
# Creates the two public named ETS tables (one named `local`, one named
# `gc`), starts trapping exits, and keeps the table names in the state.
# Note that `gc` is stored both as the pids table and as `gc_server`.
def init({local, gc}) do
  table_opts = [
    :duplicate_bag,
    :named_table,
    :public,
    read_concurrency: true,
    write_concurrency: true
  ]

  # With `:named_table`, `:ets.new/2` returns the name it was given.
  ^local = :ets.new(local, table_opts)
  ^gc = :ets.new(gc, table_opts)

  Process.flag(:trap_exit, true)

  state = %{topics: local, pids: gc, gc_server: gc}
  {:ok, state}
end
# Handles a subscribe request: optionally links to the subscriber,
# always monitors it, and replies with the topics and pids table names.
def handle_call({:subscribe, pid, _topic, opts}, _from, state) do
  if opts[:link] do
    Process.link(pid)
  end

  _monitor_ref = Process.monitor(pid)

  tables = {state.topics, state.pids}
  {:reply, {:ok, tables}, state}
end
# A monitored subscriber went down: report it to the GC server.
def handle_info({:DOWN, _monitor_ref, _kind, down_pid, _reason}, state) do
  gc_server = state.gc_server
  Phoenix.PubSub.GC.down(gc_server, down_pid)
  {:noreply, state}
end
# Every other message is ignored.
def handle_info(_other, state), do: {:noreply, state}
# Resolves the local server responsible for `pid`: the pid is mapped to
# a shard first, then the shard to its local server.
defp local_for_pid(pubsub_server, pid, pool_size) do
  shard = pid_to_shard(pid, pool_size)
  local_for_shard(shard, pubsub_server)
end
# Returns the local server half of the shard's `{local, gc}` pair.
defp local_for_shard(shard, pubsub_server) do
  servers = pools_for_shard(shard, pubsub_server)
  elem(servers, 0)
end
# Looks up the 2-tuple stored under `shard` in the ETS table named after
# the pubsub server. Exactly one matching entry is expected; anything
# else fails the match.
defp pools_for_shard(shard, pubsub_server) do
  entries = :ets.lookup(pubsub_server, shard)
  [{^shard, {_local_server, _gc_server} = servers}] = entries
  servers
end
# Maps a pid to a shard index: the pid's numeric id modulo the number
# of shards.
defp pid_to_shard(pid, shard_size) do
  rem(pid_id(pid), shard_size)
end
# Extracts the 32-bit ID field from the external term encoding of `pid`.
#
# An encoded pid ends with ID (4 bytes), Serial (4 bytes) and Creation.
# Creation is 1 byte wide in PID_EXT (tag 103) but 4 bytes wide in
# NEW_PID_EXT (tag 88). Assuming the 1-byte layout for a NEW_PID_EXT
# binary would read a window straddling ID and Serial, so the number of
# bytes following the ID is chosen from the tag.
defp pid_id(pid) do
  binary = :erlang.term_to_binary(pid)

  # Bytes that follow the ID field: Serial plus Creation.
  trailer =
    case binary do
      <<131, 88, _::binary>> -> 8
      _ -> 5
    end

  prefix = byte_size(binary) - trailer - 4
  <<_::binary-size(prefix), id::size(32), _::binary-size(trailer)>> = binary
  id
end
end