-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
socket.ex
720 lines (563 loc) · 23.2 KB
/
socket.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
defmodule Phoenix.Socket do
@moduledoc ~S"""
A socket implementation that multiplexes messages over channels.
`Phoenix.Socket` is used as a module for establishing and maintaining
the socket state via the `Phoenix.Socket` struct.
Once connected to a socket, incoming and outgoing events are routed to
channels. The incoming client data is routed to channels via transports.
It is the responsibility of the socket to tie transports and channels
together.
By default, Phoenix supports both websockets and longpoll when invoking
`Phoenix.Endpoint.socket/3` in your endpoint:
socket "/socket", MyAppWeb.Socket, websocket: true, longpoll: false
The command above means incoming socket connections can be made via
a WebSocket connection. Events are routed by topic to channels:
channel "room:lobby", MyAppWeb.LobbyChannel
See `Phoenix.Channel` for more information on channels.
## Socket Behaviour
Socket handlers are mounted in Endpoints and must define two callbacks:
* `connect/3` - receives the socket params, connection info if any, and
authenticates the connection. Must return a `Phoenix.Socket` struct,
often with custom assigns
* `id/1` - receives the socket returned by `connect/3` and returns the
id of this connection as a string. The `id` is used to identify socket
connections, often to a particular user, allowing us to force disconnections.
For sockets requiring no authentication, `nil` can be returned
## Examples
defmodule MyAppWeb.UserSocket do
use Phoenix.Socket
channel "room:*", MyAppWeb.RoomChannel
def connect(params, socket, _connect_info) do
{:ok, assign(socket, :user_id, params["user_id"])}
end
def id(socket), do: "users_socket:#{socket.assigns.user_id}"
end
# Disconnect all user's socket connections and their multiplexed channels
MyAppWeb.Endpoint.broadcast("users_socket:" <> user.id, "disconnect", %{})
## Socket fields
* `:id` - The string id of the socket
* `:assigns` - The map of socket assigns, default: `%{}`
* `:channel` - The current channel module
* `:channel_pid` - The channel pid
* `:endpoint` - The endpoint module where this socket originated, for example: `MyAppWeb.Endpoint`
* `:handler` - The socket module where this socket originated, for example: `MyAppWeb.UserSocket`
* `:joined` - If the socket has effectively joined the channel
* `:join_ref` - The ref sent by the client when joining
* `:ref` - The latest ref sent by the client
* `:pubsub_server` - The registered name of the socket's pubsub server
* `:topic` - The string topic, for example `"room:123"`
* `:transport` - An identifier for the transport, used for logging
* `:transport_pid` - The pid of the socket's transport process
* `:serializer` - The serializer for socket messages
## Using options
On `use Phoenix.Socket`, the following options are accepted:
* `:log` - the default level to log socket actions. Defaults
to `:info`. May be set to `false` to disable it
* `:partitions` - each channel is spawned under a supervisor.
This option controls how many supervisors will be spawned
to handle channels. Defaults to the number of cores.
## Garbage collection
It's possible to force garbage collection in the transport process after
processing large messages. For example, to trigger such from your channels,
run:
send(socket.transport_pid, :garbage_collect)
## Client-server communication
The encoding of server data and the decoding of client data is done
according to a serializer, defined in `Phoenix.Socket.Serializer`.
By default, JSON encoding is used to broker messages to and from
clients with `Phoenix.Socket.V2.JSONSerializer`.
The serializer `decode!` function must return a `Phoenix.Socket.Message`
which is forwarded to channels except:
* `"heartbeat"` events in the "phoenix" topic - should just emit an OK reply
* `"phx_join"` on any topic - should join the topic
* `"phx_leave"` on any topic - should leave the topic
Each message also has a `ref` field which is used to track responses.
The server may send messages or replies back. For messages, the
ref uniquely identifies the message. For replies, the ref matches
the original message. Both data-types also include a join_ref that
uniquely identifies the currently joined channel.
The `Phoenix.Socket` implementation may also send special messages
and replies:
* `"phx_error"` - in case of errors, such as a channel process
crashing, or when attempting to join an already joined channel
* `"phx_close"` - the channel was gracefully closed
Phoenix ships with a JavaScript implementation of both websocket
and long polling that interacts with Phoenix.Socket and can be
used as reference for those interested in implementing custom clients.
## Custom sockets and transports
See the `Phoenix.Socket.Transport` documentation for more information on
writing your own socket that does not leverage channels or for writing
your own transports that interacts with other sockets.
## Custom channels
You can list any module as a channel as long as it implements
a `start_link/1` function that receives a tuple with three elements:
{auth_payload, from, socket}
A custom channel implementation MUST invoke
`GenServer.reply(from, {:ok | :error, reply_payload})` during its
initialization with a custom `reply_payload` that will be sent as
a reply to the client. Failing to do so will block the socket forever.
A custom channel receives `Phoenix.Socket.Message` structs as regular
messages from the transport. Replies to those messages and custom
messages can be sent to the socket at any moment by building an
appropriate `Phoenix.Socket.Reply` and `Phoenix.Socket.Message`
structs, encoding them with the serializer and dispatching the
serialized result to the transport.
For example, to handle "phx_leave" messages, which is recommended
to be handled by all channel implementations, one may do:
def handle_info(
%Message{topic: topic, event: "phx_leave"} = message,
%{topic: topic, serializer: serializer, transport_pid: transport_pid} = socket
) do
send transport_pid, serializer.encode!(build_leave_reply(message))
{:stop, {:shutdown, :left}, socket}
end
We also recommend all channels to monitor the `transport_pid`
on `init` and exit if the transport exits. We also advise to rewrite
`:normal` exit reasons (usually due to the socket being closed)
to the `{:shutdown, :closed}` to guarantee links are broken on
the channel exit (as a `:normal` exit does not break links):
def handle_info({:DOWN, _, _, transport_pid, reason}, %{transport_pid: transport_pid} = socket) do
reason = if reason == :normal, do: {:shutdown, :closed}, else: reason
{:stop, reason, socket}
end
Any process exit is treated as an error by the socket layer unless
a `{:socket_close, pid, reason}` message is sent to the socket before
shutdown.
Custom channel implementations cannot be tested with `Phoenix.ChannelTest`
and are currently considered experimental. The underlying API may be
changed at any moment.
**Note:** in future Phoenix versions we will require custom channels
to provide a custom `child_spec/1` function instead of `start_link/1`.
Since the default behaviour of `child_spec/1` is to invoke `start_link/1`,
this behaviour should be backwards compatible in almost all cases.
"""
require Logger
require Phoenix.Endpoint
alias Phoenix.Socket
alias Phoenix.Socket.{Broadcast, Message, Reply}
@doc """
Receives the socket params and authenticates the connection.
## Socket params and assigns
Socket params are passed from the client and can
be used to verify and authenticate a user. After
verification, you can put default assigns into
the socket that will be set for all channels, ie
{:ok, assign(socket, :user_id, verified_user_id)}
To deny connection, return `:error`.
See `Phoenix.Token` documentation for examples in
performing token verification on connect.
"""
@callback connect(params :: map, Socket.t) :: {:ok, Socket.t} | :error
@callback connect(params :: map, Socket.t, connect_info :: map) :: {:ok, Socket.t} | :error
@doc ~S"""
Identifies the socket connection.
Socket IDs are topics that allow you to identify all sockets for a given user:
def id(socket), do: "users_socket:#{socket.assigns.user_id}"
Would allow you to broadcast a `"disconnect"` event and terminate
all active sockets and channels for a given user:
MyAppWeb.Endpoint.broadcast("users_socket:" <> user.id, "disconnect", %{})
Returning `nil` makes this socket anonymous.
"""
@callback id(Socket.t) :: String.t | nil
@optional_callbacks connect: 2, connect: 3
defmodule InvalidMessageError do
@moduledoc """
Raised when the socket message is invalid.
"""
defexception [:message]
end
defstruct assigns: %{},
channel: nil,
channel_pid: nil,
endpoint: nil,
handler: nil,
id: nil,
joined: false,
join_ref: nil,
private: %{},
pubsub_server: nil,
ref: nil,
serializer: nil,
topic: nil,
transport: nil,
transport_pid: nil
@type t :: %Socket{
assigns: map,
channel: atom,
channel_pid: pid,
endpoint: atom,
handler: atom,
id: String.t | nil,
joined: boolean,
ref: term,
private: %{},
pubsub_server: atom,
serializer: atom,
topic: String.t,
transport: atom,
transport_pid: pid,
}
defmacro __using__(opts) do
quote do
## User API
import Phoenix.Socket
@behaviour Phoenix.Socket
@before_compile Phoenix.Socket
Module.register_attribute(__MODULE__, :phoenix_channels, accumulate: true)
@phoenix_socket_options unquote(opts)
## Callbacks
@behaviour Phoenix.Socket.Transport
@doc false
def child_spec(opts) do
Phoenix.Socket.__child_spec__(__MODULE__, opts, @phoenix_socket_options)
end
@doc false
def connect(map), do: Phoenix.Socket.__connect__(__MODULE__, map, @phoenix_socket_options)
@doc false
def init(state), do: Phoenix.Socket.__init__(state)
@doc false
def handle_in(message, state), do: Phoenix.Socket.__in__(message, state)
@doc false
def handle_info(message, state), do: Phoenix.Socket.__info__(message, state)
@doc false
def terminate(reason, state), do: Phoenix.Socket.__terminate__(reason, state)
end
end
## USER API
@doc """
Adds key value pairs to socket assigns.
A single key value pair may be passed, a keyword list or map
of assigns may be provided to be merged into existing socket
assigns.
## Examples
iex> assign(socket, :name, "Elixir")
iex> assign(socket, name: "Elixir", logo: "💧")
"""
def assign(%Socket{} = socket, key, value) do
assign(socket, [{key, value}])
end
def assign(%Socket{} = socket, attrs)
when is_map(attrs) or is_list(attrs) do
%{socket | assigns: Map.merge(socket.assigns, Map.new(attrs))}
end
@doc """
Defines a channel matching the given topic and transports.
* `topic_pattern` - The string pattern, for example `"room:*"`, `"users:*"`,
or `"system"`
* `module` - The channel module handler, for example `MyAppWeb.RoomChannel`
* `opts` - The optional list of options, see below
## Options
* `:assigns` - the map of socket assigns to merge into the socket on join
## Examples
channel "topic1:*", MyChannel
## Topic Patterns
The `channel` macro accepts topic patterns in two flavors. A splat (the `*`
character) argument can be provided as the last character to indicate a
`"topic:subtopic"` match. If a plain string is provided, only that topic will
match the channel handler. Most use-cases will use the `"topic:*"` pattern to
allow more versatile topic scoping.
See `Phoenix.Channel` for more information
"""
defmacro channel(topic_pattern, module, opts \\ []) do
# Tear the alias to simply store the root in the AST.
# This will make Elixir unable to track the dependency between
# endpoint <-> socket and avoid recompiling the endpoint
# (alongside the whole project) whenever the socket changes.
module = tear_alias(module)
quote do
@phoenix_channels {unquote(topic_pattern), unquote(module), unquote(opts)}
end
end
defp tear_alias({:__aliases__, meta, [h|t]}) do
alias = {:__aliases__, meta, [h]}
quote do
Module.concat([unquote(alias)|unquote(t)])
end
end
defp tear_alias(other), do: other
@doc false
@deprecated "transport/3 in Phoenix.Socket is deprecated and has no effect"
defmacro transport(_name, _module, _config \\ []) do
:ok
end
defmacro __before_compile__(env) do
channels = Module.get_attribute(env.module, :phoenix_channels)
channel_defs =
for {topic_pattern, module, opts} <- channels do
topic_pattern
|> to_topic_match()
|> defchannel(module, opts)
end
quote do
unquote(channel_defs)
def __channel__(_topic), do: nil
end
end
defp to_topic_match(topic_pattern) do
case String.split(topic_pattern, "*") do
[prefix, ""] -> quote do: <<unquote(prefix) <> _rest>>
[bare_topic] -> bare_topic
_ -> raise ArgumentError, "channels using splat patterns must end with *"
end
end
defp defchannel(topic_match, channel_module, opts) do
quote do
def __channel__(unquote(topic_match)), do: unquote({channel_module, Macro.escape(opts)})
end
end
## CALLBACKS IMPLEMENTATION
def __child_spec__(handler, opts, socket_options) do
endpoint = Keyword.fetch!(opts, :endpoint)
opts = Keyword.merge(socket_options, opts)
partitions = Keyword.get(opts, :partitions, System.schedulers_online())
args = {endpoint, handler, partitions}
Supervisor.child_spec({Phoenix.Socket.PoolSupervisor, args}, id: handler)
end
def __connect__(user_socket, map, socket_options) do
%{
endpoint: endpoint,
options: options,
transport: transport,
params: params,
connect_info: connect_info
} = map
vsn = params["vsn"] || "1.0.0"
options = Keyword.merge(socket_options, options)
start = System.monotonic_time()
case negotiate_serializer(Keyword.fetch!(options, :serializer), vsn) do
{:ok, serializer} ->
result = user_connect(user_socket, endpoint, transport, serializer, params, connect_info)
metadata = %{
endpoint: endpoint,
transport: transport,
params: params,
connect_info: connect_info,
vsn: vsn,
user_socket: user_socket,
log: Keyword.get(options, :log, :info),
result: result(result),
serializer: serializer
}
duration = System.monotonic_time() - start
:telemetry.execute([:phoenix, :socket_connected], %{duration: duration}, metadata)
result
:error ->
:error
end
end
defp result({:ok, _}), do: :ok
defp result(:error), do: :error
def __init__({state, %{id: id, endpoint: endpoint} = socket}) do
_ = id && endpoint.subscribe(id, link: true)
{:ok, {state, %{socket | transport_pid: self()}}}
end
def __in__({payload, opts}, {state, socket}) do
%{topic: topic} = message = socket.serializer.decode!(payload, opts)
handle_in(Map.get(state.channels, topic), message, state, socket)
end
def __info__({:DOWN, ref, _, pid, reason}, {state, socket}) do
case state.channels_inverse do
%{^pid => {topic, join_ref}} ->
state = delete_channel(state, pid, topic, ref)
{:push, encode_on_exit(socket, topic, join_ref, reason), {state, socket}}
%{} ->
{:ok, {state, socket}}
end
end
def __info__(%Broadcast{event: "disconnect"}, state) do
{:stop, {:shutdown, :disconnected}, state}
end
def __info__({:socket_push, opcode, payload}, state) do
{:push, {opcode, payload}, state}
end
def __info__({:socket_close, pid, _reason}, {state, socket}) do
case state.channels_inverse do
%{^pid => {topic, join_ref}} ->
{^pid, monitor_ref} = Map.fetch!(state.channels, topic)
state = delete_channel(state, pid, topic, monitor_ref)
{:push, encode_close(socket, topic, join_ref), {state, socket}}
%{} ->
{:ok, {state, socket}}
end
end
def __info__(:garbage_collect, state) do
:erlang.garbage_collect(self())
{:ok, state}
end
def __info__(_, state) do
{:ok, state}
end
def __terminate__(_reason, _state_socket) do
:ok
end
defp negotiate_serializer(serializers, vsn) when is_list(serializers) do
case Version.parse(vsn) do
{:ok, vsn} ->
serializers
|> Enum.find(:error, fn {_serializer, vsn_req} -> Version.match?(vsn, vsn_req) end)
|> case do
{serializer, _vsn_req} ->
{:ok, serializer}
:error ->
Logger.error "The client's requested transport version \"#{vsn}\" " <>
"does not match server's version requirements of #{inspect serializers}"
:error
end
:error ->
Logger.error "Client sent invalid transport version \"#{vsn}\""
:error
end
end
defp user_connect(handler, endpoint, transport, serializer, params, connect_info) do
if pubsub_server = endpoint.config(:pubsub_server) do
# The information in the Phoenix.Socket goes to userland and channels.
socket = %Socket{
handler: handler,
endpoint: endpoint,
pubsub_server: pubsub_server,
serializer: serializer,
transport: transport
}
user_connect(handler, params, socket, connect_info)
else
Logger.error """
The :pubsub_server was not configured for endpoint #{inspect(endpoint)}.
Make sure to start a PubSub proccess in your application supervision tree:
{Phoenix.PubSub, [name: YOURAPP.PubSub, adapter: Phoenix.PubSub.PG2]}
And then list it your endpoint config:
pubsub_server: YOURAPP.PubSub
"""
:error
end
end
defp user_connect(handler, params, socket, connect_info) do
# The information in the state is kept only inside the socket process.
state = %{
channels: %{},
channels_inverse: %{}
}
connect_result =
if function_exported?(handler, :connect, 3) do
handler.connect(params, socket, connect_info)
else
handler.connect(params, socket)
end
case connect_result do
{:ok, %Socket{} = socket} ->
case handler.id(socket) do
nil ->
{:ok, {state, socket}}
id when is_binary(id) ->
{:ok, {state, %{socket | id: id}}}
invalid ->
Logger.error "#{inspect handler}.id/1 returned invalid identifier " <>
"#{inspect invalid}. Expected nil or a string."
:error
end
:error ->
:error
invalid ->
connect_arity = if function_exported?(handler, :connect, 3), do: "connect/3", else: "connect/2"
Logger.error "#{inspect handler}. #{connect_arity} returned invalid value #{inspect invalid}. " <>
"Expected {:ok, socket} or :error"
:error
end
end
defp handle_in(_, %{ref: ref, topic: "phoenix", event: "heartbeat"}, state, socket) do
reply = %Reply{
ref: ref,
topic: "phoenix",
status: :ok,
payload: %{}
}
{:reply, :ok, encode_reply(socket, reply), {state, socket}}
end
defp handle_in(nil, %{event: "phx_join", topic: topic, ref: ref, join_ref: join_ref} = message, state, socket) do
case socket.handler.__channel__(topic) do
{channel, opts} ->
case Phoenix.Channel.Server.join(socket, channel, message, opts) do
{:ok, reply, pid} ->
reply = %Reply{join_ref: join_ref, ref: ref, topic: topic, status: :ok, payload: reply}
state = put_channel(state, pid, topic, join_ref)
{:reply, :ok, encode_reply(socket, reply), {state, socket}}
{:error, reply} ->
reply = %Reply{join_ref: join_ref, ref: ref, topic: topic, status: :error, payload: reply}
{:reply, :error, encode_reply(socket, reply), {state, socket}}
end
_ ->
{:reply, :error, encode_ignore(socket, message), {state, socket}}
end
end
defp handle_in({pid, ref}, %{event: "phx_join", topic: topic} = message, state, socket) do
Logger.debug fn ->
"Duplicate channel join for topic \"#{topic}\" in #{inspect(socket.handler)}. " <>
"Closing existing channel for new join."
end
:ok = shutdown_duplicate_channel(pid)
state = delete_channel(state, pid, topic, ref)
handle_in(nil, message, state, socket)
end
defp handle_in({pid, _ref}, message, state, socket) do
send(pid, message)
{:ok, {state, socket}}
end
defp handle_in(nil, %{event: "phx_leave", ref: ref, topic: topic, join_ref: join_ref}, state, socket) do
reply = %Reply{
ref: ref,
join_ref: join_ref,
topic: topic,
status: :ok,
payload: %{}
}
{:reply, :ok, encode_reply(socket, reply), {state, socket}}
end
defp handle_in(nil, message, state, socket) do
{:reply, :error, encode_ignore(socket, message), {state, socket}}
end
defp put_channel(state, pid, topic, join_ref) do
%{channels: channels, channels_inverse: channels_inverse} = state
monitor_ref = Process.monitor(pid)
%{
state |
channels: Map.put(channels, topic, {pid, monitor_ref}),
channels_inverse: Map.put(channels_inverse, pid, {topic, join_ref})
}
end
defp delete_channel(state, pid, topic, monitor_ref) do
%{channels: channels, channels_inverse: channels_inverse} = state
Process.demonitor(monitor_ref, [:flush])
%{
state |
channels: Map.delete(channels, topic),
channels_inverse: Map.delete(channels_inverse, pid)
}
end
defp encode_on_exit(socket, topic, ref, _reason) do
message = %Message{join_ref: ref, ref: ref, topic: topic, event: "phx_error", payload: %{}}
encode_reply(socket, message)
end
defp encode_ignore(%{handler: handler} = socket, %{ref: ref, topic: topic}) do
Logger.warn fn -> "Ignoring unmatched topic \"#{topic}\" in #{inspect(handler)}" end
reply = %Reply{ref: ref, topic: topic, status: :error, payload: %{reason: "unmatched topic"}}
encode_reply(socket, reply)
end
defp encode_reply(%{serializer: serializer}, message) do
{:socket_push, opcode, payload} = serializer.encode!(message)
{opcode, payload}
end
defp encode_close(socket, topic, join_ref) do
message = %Message{join_ref: join_ref, ref: join_ref, topic: topic, event: "phx_close", payload: %{}}
encode_reply(socket, message)
end
defp shutdown_duplicate_channel(pid) do
ref = Process.monitor(pid)
Process.exit(pid, {:shutdown, :duplicate_join})
receive do
{:DOWN, ^ref, _, _, _} -> :ok
after
5_000 ->
Process.exit(pid, :kill)
receive do: ({:DOWN, ^ref, _, _, _} -> :ok)
end
end
end