/
presence.ex
711 lines (561 loc) · 22.2 KB
/
presence.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
defmodule Phoenix.Presence do
@moduledoc """
Provides Presence tracking to processes and channels.
This behaviour provides presence features such as fetching
presences for a given topic, as well as handling diffs of
join and leave events as they occur in real-time. Using this
module defines a supervisor and a module that implements the
`Phoenix.Tracker` behaviour that uses `Phoenix.PubSub` to
broadcast presence updates.
In case you want to use only a subset of the functionality
provided by `Phoenix.Presence`, such as tracking processes
but without broadcasting updates, we recommend that you look
at the `Phoenix.Tracker` functionality from the `phoenix_pubsub`
project.
## Example Usage
Start by defining a presence module within your application
which uses `Phoenix.Presence` and provide the `:otp_app` which
holds your configuration, as well as the `:pubsub_server`.
defmodule MyAppWeb.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
end
The `:pubsub_server` must point to an existing pubsub server
running in your application, which is included by default as
`MyApp.PubSub` for new applications.
Next, add the new supervisor to your supervision tree in
`lib/my_app/application.ex`. It must be after the PubSub child
and before the endpoint:
children = [
...
{Phoenix.PubSub, name: MyApp.PubSub},
MyAppWeb.Presence,
MyAppWeb.Endpoint
]
Once added, presences can be tracked in your channel after joining:
defmodule MyAppWeb.MyChannel do
use MyAppWeb, :channel
alias MyAppWeb.Presence
def join("some:topic", _params, socket) do
send(self(), :after_join)
{:ok, assign(socket, :user_id, ...)}
end
def handle_info(:after_join, socket) do
{:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second))
})
push(socket, "presence_state", Presence.list(socket))
{:noreply, socket}
end
end
In the example above, `Presence.track` is used to register this channel's process as a
presence for the socket's user ID, with a map of metadata.
Next, the current presence information for
the socket's topic is pushed to the client as a `"presence_state"` event.
Finally, a diff of presence join and leave events will be sent to the
client as they happen in real-time with the "presence_diff" event.
The diff structure will be a map of `:joins` and `:leaves` of the form:
%{
joins: %{"123" => %{metas: [%{status: "away", phx_ref: ...}]}},
leaves: %{"456" => %{metas: [%{status: "online", phx_ref: ...}]}}
}
See `c:list/1` for more information on the presence data structure.
## Fetching Presence Information
Presence metadata should be minimized and used to store small,
ephemeral state, such as a user's "online" or "away" status.
More detailed information, such as user details that need to be fetched
from the database, can be achieved by overriding the `c:fetch/2` function.
The `c:fetch/2` callback is triggered when using `c:list/1` and on
every update, and it serves as a mechanism to fetch presence information
a single time, before broadcasting the information to all channel subscribers.
This prevents N query problems and gives you a single place to group
isolated data fetching to extend presence metadata.
The function must return a map of data matching the outlined Presence
data structure, including the `:metas` key, but can extend the map of
information to include any additional information. For example:
def fetch(_topic, presences) do
users = presences |> Map.keys() |> Accounts.get_users_map()
for {key, %{metas: metas}} <- presences, into: %{} do
{key, %{metas: metas, user: users[String.to_integer(key)]}}
end
end
Where `Accounts.get_users_map/1` could be implemented like:
def get_users_map(ids) do
query =
from u in User,
where: u.id in ^ids,
select: {u.id, u}
query |> Repo.all() |> Enum.into(%{})
end
The `fetch/2` function above fetches all users from the database who
have registered presences for the given topic. The presences
information is then extended with a `:user` key of the user's
information, while maintaining the required `:metas` field from the
original presence data.
## Using Elixir as a Presence Client
Presence is great for external clients, such as JavaScript applications, but
it can also be used from an Elixir client process to keep track of presence
changes as they happen on the server. This can be accomplished by implementing
the optional [`init/1`](`c:init/1`) and [`handle_metas/4`](`c:handle_metas/4`)
callbacks on your presence module. For example, the following callback
receives presence metadata changes, and broadcasts to other Elixir processes
about users joining and leaving:
defmodule MyApp.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
def init(_opts) do
{:ok, %{}} # user-land state
end
def handle_metas(topic, %{joins: joins, leaves: leaves}, presences, state) do
# fetch existing presence information for the joined users and broadcast the
# event to all subscribers
for {user_id, presence} <- joins do
user_data = %{user: presence.user, metas: Map.fetch!(presences, user_id)}
msg = {MyApp.PresenceClient, {:join, user_data}}
Phoenix.PubSub.local_broadcast(MyApp.PubSub, topic, msg)
end
# fetch existing presence information for the left users and broadcast the
# event to all subscribers
for {user_id, presence} <- leaves do
metas =
case Map.fetch(presences, user_id) do
{:ok, presence_metas} -> presence_metas
:error -> []
end
user_data = %{user: presence.user, metas: metas}
msg = {MyApp.PresenceClient, {:leave, user_data}}
Phoenix.PubSub.local_broadcast(MyApp.PubSub, topic, msg)
end
{:ok, state}
end
end
The `handle_metas/4` callback receives the topic, presence diff, current presences
for the topic with their metadata, and any user-land state accumulated from init and
subsequent `handle_metas/4` calls. In our example implementation, we walk the `:joins` and
`:leaves` in the diff, and populate a complete presence from our known presence information.
Then we broadcast to the local node subscribers about user joins and leaves.
## Testing with Presence
Every time the `fetch` callback is invoked, it is done from a separate
process. Given those processes run asynchronously, it is often necessary
to guarantee they have been shut down at the end of every test. This can
be done by using ExUnit's `on_exit` hook plus the `fetchers_pids/0` function:
on_exit(fn ->
for pid <- MyAppWeb.Presence.fetchers_pids() do
ref = Process.monitor(pid)
assert_receive {:DOWN, ^ref, _, _, _}, 1000
end
end)
"""
@type presences :: %{String.t() => %{metas: [map()]}}
@type presence :: %{key: String.t(), meta: map()}
@type topic :: String.t()
@doc """
Track a channel's process as a presence.
Tracked presences are grouped by `key`, cast as a string. For example, to
group each user's channels together, use user IDs as keys. Each presence can
be associated with a map of metadata to store small, ephemeral state, such as
a user's online status. To store detailed information, see `c:fetch/2`.
## Example
alias MyApp.Presence
def handle_info(:after_join, socket) do
{:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second))
})
{:noreply, socket}
end
"""
@callback track(socket :: Phoenix.Socket.t(), key :: String.t(), meta :: map()) ::
{:ok, ref :: binary()}
| {:error, reason :: term()}
@doc """
Track an arbitrary process as a presence.
Same as `track/3`, except it tracks any process by `topic` and `key`.
"""
@callback track(pid, topic, key :: String.t(), meta :: map()) ::
{:ok, ref :: binary()}
| {:error, reason :: term()}
@doc """
Stop tracking a channel's process.
"""
@callback untrack(socket :: Phoenix.Socket.t(), key :: String.t()) :: :ok
@doc """
Stop tracking a process.
"""
@callback untrack(pid, topic, key :: String.t()) :: :ok
@doc """
Update a channel presence's metadata.
Replace a presence's metadata by passing a new map or a function that takes
the current map and returns a new one.
"""
@callback update(
socket :: Phoenix.Socket.t(),
key :: String.t(),
meta :: map() | (map() -> map())
) ::
{:ok, ref :: binary()}
| {:error, reason :: term()}
@doc """
Update a process presence's metadata.
Same as `update/3`, but with an arbitrary process.
"""
@callback update(pid, topic, key :: String.t(), meta :: map() | (map() -> map())) ::
{:ok, ref :: binary()}
| {:error, reason :: term()}
@doc """
Returns presences for a socket/topic.
## Presence data structure
The presence information is returned as a map with presences grouped
by key, cast as a string, and accumulated metadata, with the following form:
%{key => %{metas: [%{phx_ref: ..., ...}, ...]}}
For example, imagine a user with id `123` online from two
different devices, as well as a user with id `456` online from
just one device. The following presence information might be returned:
%{"123" => %{metas: [%{status: "away", phx_ref: ...},
%{status: "online", phx_ref: ...}]},
"456" => %{metas: [%{status: "online", phx_ref: ...}]}}
The keys of the map will usually point to a resource ID. The value
will contain a map with a `:metas` key containing a list of metadata
for each resource. Additionally, every metadata entry will contain a
`:phx_ref` key which can be used to uniquely identify metadata for a
given key. In the event that the metadata was previously updated,
a `:phx_ref_prev` key will be present containing the previous
`:phx_ref` value.
"""
@callback list(Phoenix.Socket.t() | topic) :: presences
@doc """
Returns the map of presence metadata for a socket/topic-key pair.
## Examples
Uses the same data format as each presence in `c:list/1`, but only
returns metadata for the presences under a topic and key pair. For example,
a user with key `"user1"`, connected to the same chat room `"room:1"` from two
devices, could return:
iex> MyPresence.get_by_key("room:1", "user1")
%{name: "User 1", metas: [%{device: "Desktop"}, %{device: "Mobile"}]}
Like `c:list/1`, the presence metadata is passed to the `fetch`
callback of your presence module to fetch any additional information.
"""
@callback get_by_key(Phoenix.Socket.t() | topic, key :: String.t()) :: [presence]
@doc """
Extend presence information with additional data.
When `c:list/1` is used to list all presences of the given `topic`, this
callback is triggered once to modify the result before it is broadcasted to
all channel subscribers. This avoids N query problems and provides a single
place to extend presence metadata. You must return a map of data matching the
original result, including the `:metas` key, but can extend the map to include
any additional information.
The default implementation simply passes `presences` through unchanged.
## Example
def fetch(_topic, presences) do
query =
from u in User,
where: u.id in ^Map.keys(presences),
select: {u.id, u}
users = query |> Repo.all() |> Enum.into(%{})
for {key, %{metas: metas}} <- presences, into: %{} do
{key, %{metas: metas, user: users[key]}}
end
end
"""
@callback fetch(topic, presences) :: presences
@doc """
Initializes the presence client state.
Invoked when your presence module starts, allows dynamically
providing initial state for handling presence metadata.
"""
@callback init(state :: term) :: {:ok, new_state :: term}
@doc """
Receives presence metadata changes.
"""
@callback handle_metas(topic :: String.t(), diff :: map(), presences :: map(), state :: term) ::
{:ok, term}
@optional_callbacks init: 1, handle_metas: 4
defmacro __using__(opts) do
  # Everything in the quoted block is injected into the module that calls
  # `use Phoenix.Presence`; `opts` is bound once via `bind_quoted`.
  quote location: :keep, bind_quoted: [opts: opts] do
    @behaviour Phoenix.Presence
    @opts opts
    @task_supervisor Module.concat(__MODULE__, "TaskSupervisor")

    # Fail at compile time when the required :otp_app option is missing.
    _ = opts[:otp_app] || raise "use Phoenix.Presence expects :otp_app to be given"

    # User defined: the default fetch/2 returns the presences untouched and
    # may be overridden by the using module.
    def fetch(_topic, presences), do: presences
    defoverridable fetch: 2

    # Private: supervisor child spec. Options given at start time are merged
    # over the options given to `use`.
    def child_spec(opts) do
      merged_opts = Keyword.merge(@opts, opts)

      %{
        id: __MODULE__,
        start: {Phoenix.Presence, :start_link, [__MODULE__, @task_supervisor, merged_opts]},
        type: :supervisor
      }
    end

    # API: each socket-based clause unpacks the channel pid and topic from the
    # socket and calls the pid/topic variant of the same function.
    def track(%Phoenix.Socket{channel_pid: pid, topic: topic}, key, meta),
      do: track(pid, topic, key, meta)

    def track(pid, topic, key, meta),
      do: Phoenix.Tracker.track(__MODULE__, pid, topic, key, meta)

    def untrack(%Phoenix.Socket{channel_pid: pid, topic: topic}, key),
      do: untrack(pid, topic, key)

    def untrack(pid, topic, key),
      do: Phoenix.Tracker.untrack(__MODULE__, pid, topic, key)

    def update(%Phoenix.Socket{channel_pid: pid, topic: topic}, key, meta),
      do: update(pid, topic, key, meta)

    def update(pid, topic, key, meta),
      do: Phoenix.Tracker.update(__MODULE__, pid, topic, key, meta)

    def list(%Phoenix.Socket{topic: topic}) do
      list(topic)
    end

    def list(topic) do
      Phoenix.Presence.list(__MODULE__, topic)
    end

    def get_by_key(%Phoenix.Socket{topic: topic}, key) do
      get_by_key(topic, key)
    end

    def get_by_key(topic, key) do
      Phoenix.Presence.get_by_key(__MODULE__, topic, key)
    end

    # Lists the children of this module's task supervisor.
    def fetchers_pids() do
      Task.Supervisor.children(@task_supervisor)
    end
  end
end
defmodule Tracker do
  @moduledoc false
  use Phoenix.Tracker

  # Starts the underlying Phoenix.Tracker. Raises when no :pubsub_server
  # option was provided.
  def start_link({module, task_supervisor, opts}) do
    pubsub_server = opts[:pubsub_server]

    if !pubsub_server do
      raise "use Phoenix.Presence expects :pubsub_server to be given"
    end

    tracker_arg = {module, task_supervisor, pubsub_server}
    Phoenix.Tracker.start_link(__MODULE__, tracker_arg, opts)
  end

  # The tracker callbacks below hand everything over to Phoenix.Presence.
  def init(state) do
    Phoenix.Presence.init(state)
  end

  def handle_diff(diff, state) do
    Phoenix.Presence.handle_diff(diff, state)
  end

  def handle_info(msg, state) do
    Phoenix.Presence.handle_info(msg, state)
  end
end
@doc false
# Starts a :rest_for_one supervisor with two children: the task supervisor
# and then the tracker. Application config stored under `module` for the
# :otp_app is merged over the given opts, and :name is set to `module`.
def start_link(module, task_supervisor, opts) do
  app_config = Application.get_env(opts[:otp_app], module, [])
  tracker_opts = Keyword.put(Keyword.merge(opts, app_config), :name, module)

  Supervisor.start_link(
    [
      {Task.Supervisor, name: task_supervisor},
      {Tracker, {module, task_supervisor, tracker_opts}}
    ],
    strategy: :rest_for_one,
    name: Module.concat(module, "Supervisor")
  )
end
@doc false
# Builds the initial tracker state. The client state stays nil unless the
# presence module exports handle_metas/4; in that case init/1 must also be
# exported and must return {:ok, state}.
def init({module, task_supervisor, pubsub_server}) do
  client_state =
    cond do
      not function_exported?(module, :handle_metas, 4) ->
        nil

      not function_exported?(module, :init, 1) ->
        raise ArgumentError, """
        missing #{inspect(module)}.init/1 callback for client state
        When you implement the handle_metas/4 callback, you must also
        implement init/1. For example, add the following to
        #{inspect(module)}:
        def init(_opts), do: {:ok, %{}}
        """

      true ->
        case module.init(%{}) do
          {:ok, initial_client_state} ->
            initial_client_state

          other ->
            raise ArgumentError, """
            expected #{inspect(module)}.init/1 to return {:ok, state}, got: #{inspect(other)}
            """
        end
    end

  {:ok,
   %{
     module: module,
     task_supervisor: task_supervisor,
     pubsub_server: pubsub_server,
     topics: %{},
     tasks: :queue.new(),
     current_task: nil,
     client_state: client_state
   }}
end
@doc false
# Hands the diff to async_merge/2 and returns the resulting state.
def handle_diff(diff, state) do
  merged_state = async_merge(state, diff)
  {:ok, merged_state}
end
@doc false
# Handles the reply of the current task: broadcasts every computed diff as a
# "presence_diff" event, runs handle_metas/4 when the presence module exports
# it, and then moves on to the next queued task.
def handle_info({task_ref, {:phoenix, ref, computed_diffs}}, state) do
  # The reply must come from the task recorded as current; anything else
  # fails this match.
  %{current_task: {^ref, %Task{ref: ^task_ref} = finished_task}} = state
  Task.shutdown(finished_task)

  broadcast_diff = fn {topic, presence_diff} ->
    Phoenix.Channel.Server.local_broadcast(
      state.pubsub_server,
      topic,
      "presence_diff",
      presence_diff
    )
  end

  Enum.each(computed_diffs, broadcast_diff)

  state_after_metas =
    case function_exported?(state.module, :handle_metas, 4) do
      true -> do_handle_metas(state, computed_diffs)
      false -> state
    end

  {:noreply, next_task(state_after_metas)}
end
@doc false
# Lists the tracked entries for `topic`, groups them by key and passes the
# result through the presence module's fetch/2.
def list(module, topic) do
  tracked = Phoenix.Tracker.list(module, topic)
  module.fetch(topic, group(tracked))
end
@doc false
# Returns [] when nothing is tracked under `key`; otherwise returns the entry
# that the presence module's fetch/2 produces for the stringified key.
def get_by_key(module, topic, key) do
  string_key = to_string(key)
  tracked = Phoenix.Tracker.get_by_key(module, topic, key)

  case tracked do
    [] ->
      []

    [_ | _] = entries ->
      presence = %{metas: Enum.map(entries, fn {_pid, meta} -> meta end)}
      %{^string_key => fetched} = module.fetch(topic, %{string_key => presence})
      fetched
  end
end
@doc false
# Groups `{key, meta}` pairs into `%{string_key => %{metas: [meta, ...]}}`,
# keeping the metas of each key in their original order.
def group(presences) do
  presences
  |> Enum.group_by(fn {key, _meta} -> to_string(key) end, fn {_key, meta} -> meta end)
  |> Map.new(fn {key, metas} -> {key, %{metas: metas}} end)
end
# Sends `{ref, :continue}` to the task's process.
defp send_continue(%Task{} = task, ref) do
  send(task.pid, {ref, :continue})
end
# Pops the next queued task, if any, tells it to continue and records it as
# the current task. With an empty queue the current task is cleared.
defp next_task(state) do
  popped = :queue.out(state.tasks)

  case popped do
    {:empty, _queue} ->
      %{state | current_task: nil, tasks: :queue.new()}

    {{:value, {ref, %Task{} = queued_task}}, rest} ->
      send_continue(queued_task, ref)
      %{state | current_task: {ref, queued_task}, tasks: rest}
  end
end
# Folds every computed diff into the state: merges it into the known topics,
# then calls the presence module's handle_metas/4 with the topic's current
# presences and keeps the client state it returns.
defp do_handle_metas(state, computed_diffs) do
  Enum.reduce(computed_diffs, state, fn {topic, presence_diff}, current ->
    topics = merge_diff(current.topics, topic, presence_diff)
    # merge_diff/3 drops a topic with no presences left, so default to %{}.
    presences = Map.get(topics, topic, %{})

    result = current.module.handle_metas(topic, presence_diff, presences, current.client_state)

    case result do
      {:ok, new_client_state} ->
        %{current | topics: topics, client_state: new_client_state}

      other ->
        raise ArgumentError, """
        expected #{inspect(current.module)}.handle_metas/4 to return {:ok, new_state}.
        got: #{inspect(other)}
        """
    end
  end)
end
# Spawns a supervised task that runs the presence module's fetch/2 over the
# joins and leaves of every topic in `diff`. The task then waits for a
# `{ref, :continue}` message before replying. It is told to continue right
# away when no task is current, otherwise it is queued.
defp async_merge(state, diff) do
  %{module: module} = state
  ref = make_ref()

  fetch_diffs = fn ->
    computed_diffs =
      Enum.map(diff, fn {topic, {joins, leaves}} ->
        fetched_joins = module.fetch(topic, Phoenix.Presence.group(joins))
        fetched_leaves = module.fetch(topic, Phoenix.Presence.group(leaves))
        {topic, %{joins: fetched_joins, leaves: fetched_leaves}}
      end)

    # Hold the reply until the tracker says it is this task's turn.
    receive do
      {^ref, :continue} -> {:phoenix, ref, computed_diffs}
    end
  end

  spawned = Task.Supervisor.async(state.task_supervisor, fetch_diffs)

  if state.current_task do
    queued = :queue.in({ref, spawned}, state.tasks)
    %{state | tasks: queued}
  else
    send_continue(spawned, ref)
    %{state | current_task: {ref, spawned}}
  end
end
# Applies one topic's diff to `topics`: joins first, then leaves. A topic
# left with no presences is removed from the map.
defp merge_diff(topics, topic, %{leaves: leaves, joins: joins} = _diff) do
  # make sure the topic has an entry before merging into it
  seeded = if Map.has_key?(topics, topic), do: topics, else: add_new_topic(topics, topic)

  {with_joins, _topic} = Enum.reduce(joins, {seeded, topic}, &handle_join/2)
  {merged, _topic} = Enum.reduce(leaves, {with_joins, topic}, &handle_leave/2)

  case topic_presences_count(merged, topic) do
    0 -> remove_topic(merged, topic)
    _ -> merged
  end
end
# Reducer step for a join: adds the joined metas (or [] when the presence has
# no :metas key) under `joined_key` and threads the topic through.
defp handle_join({joined_key, presence}, {topics, topic}) do
  metas = Map.get(presence, :metas, [])
  updated_topics = add_new_presence_or_metas(topics, topic, joined_key, metas)
  {updated_topics, topic}
end
# Reducer step for a leave: removes the left presence's metas under
# `left_key` and threads the topic through.
defp handle_leave({left_key, presence}, {topics, topic}) do
  updated_topics = remove_presence_or_metas(topics, topic, left_key, presence)
  {updated_topics, topic}
end
# Stores `new_metas` under `key` in the given topic. When the key already has
# metas, only the metas not already present are appended after them.
defp add_new_presence_or_metas(topics, topic, key, new_metas) do
  topic_presences = topics[topic]

  updated_topic =
    Map.update(topic_presences, key, new_metas, fn existing_metas ->
      existing_metas ++ (new_metas -- existing_metas)
    end)

  Map.put(topics, topic, updated_topic)
end
# Removes the metas listed under :metas in `deleted_metas` from `key` in the
# given topic. The key itself is deleted when no metas remain.
defp remove_presence_or_metas(topics, topic, key, deleted_metas) do
  topic_presences = topics[topic]
  current_metas = Map.get(topic_presences, key, [])
  remaining_metas = current_metas -- Map.get(deleted_metas, :metas, [])

  updated_topic =
    if remaining_metas == [] do
      Map.delete(topic_presences, key)
    else
      Map.put(topic_presences, key, remaining_metas)
    end

  Map.put(topics, topic, updated_topic)
end
# Adds an empty presence map for `topic` unless the topic already has one.
defp add_new_topic(topics, topic), do: Map.put_new(topics, topic, %{})
# Drops `topic` and its presences from the topics map.
defp remove_topic(topics, topic), do: Map.delete(topics, topic)
# Counts the presence keys stored under `topic`.
defp topic_presences_count(topics, topic) do
  topic_presences = topics[topic]
  map_size(topic_presences)
end
end