Skip to content

Commit

Permalink
Add graceful exit contract to transports
Browse files Browse the repository at this point in the history
ref: phoenixframework/phoenix_pubsub#68

Previously a channel could gracefully terminate
using the stop semantics of a regular genserver;
however when restarting an application for deploys
the shutdown of the transport and channel processes
would be indistinguisable from an intentional channel
shutdown and would cause clients to incorrectly not
reconnect after server restart. This commit adds a
{:graceful_exit, channel_pid, %Phoenix.Socket.Message{}}
contract to distinguish an intentional channel exit from
what should be regarded as an error condition on the client.
  • Loading branch information
chrismccord committed Feb 21, 2017
1 parent 663d5f7 commit e1b504b
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 31 deletions.
14 changes: 13 additions & 1 deletion lib/phoenix/channel/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,16 @@ defmodule Phoenix.Channel.Server do

defp handle_result({:stop, reason, reply, socket}, callback) do
handle_reply(socket, reply, callback)
{:stop, reason, socket}
handle_result({:stop, reason, socket}, callback)
end

defp handle_result({:stop, reason, socket}, _callback) do
case reason do
:normal -> notify_transport_of_graceful_exit(socket)
:shutdown -> notify_transport_of_graceful_exit(socket)
{:shutdown, _} -> notify_transport_of_graceful_exit(socket)
_ -> :noop
end
{:stop, reason, socket}
end

Expand Down Expand Up @@ -382,4 +388,10 @@ defmodule Phoenix.Channel.Server do
Use `push/3` to send an out-of-band message down the socket
"""
end

defp notify_transport_of_graceful_exit(socket) do
Phoenix.Socket.Transport.notify_graceful_exit(socket)
Process.unlink(socket.transport_pid)
:ok
end
end
2 changes: 2 additions & 0 deletions lib/phoenix/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ defmodule Phoenix.Socket do
* `handler` - The socket module where this socket originated, for example: `MyApp.UserSocket`
* `joined` - If the socket has effectively joined the channel
* `pubsub_server` - The registered name of the socket's pubsub server
* `join_ref` - The ref sent by the client when joining
* `ref` - The latest ref sent by the client
* `topic` - The string topic, for example `"room:123"`
* `transport` - The socket's transport, for example: `Phoenix.Transports.WebSocket`
Expand Down Expand Up @@ -145,6 +146,7 @@ defmodule Phoenix.Socket do
joined: false,
pubsub_server: nil,
ref: nil,
join_ref: nil,
topic: nil,
transport: nil,
transport_pid: nil,
Expand Down
35 changes: 21 additions & 14 deletions lib/phoenix/socket/transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,23 @@ defmodule Phoenix.Socket.Transport do
serializer that abides to the behaviour described in
`Phoenix.Transports.Serializer`.
## Managing channels
## Managing channel exits
Because channels are spawned from the transport process, transports
must trap exits and correctly handle the `{:EXIT, _, _}` messages
arriving from channels, relaying the proper response to the client.
The following events are sent by the transport when a channel exits:
The `"phx_error"` event is sent by the transport when a channel exits,
and represents the channel terminating against its will. The
`on_exit_message/3` function aids in constructing the `"phx_error"` message.
* `"phx_close"` - The channel has exited gracefully
* `"phx_error"` - The channel has crashed
For graceful exits, the channel will notify the transort it is
gracefully terminating via the following message:
The `on_exit_message/3` function aids in constructing these messages.
{:graceful_exit, channel_pid, %Phoenix.Socket.Message{}}
The `%Phoenix.Socket.Message{}` is the leave message for the transport
to relay to the client.
## Duplicate Join Subscriptions
Expand Down Expand Up @@ -224,16 +229,17 @@ defmodule Phoenix.Socket.Transport do
end

@doc false
def build_channel_socket(%Socket{} = socket, channel, topic) do
def build_channel_socket(%Socket{} = socket, channel, topic, join_ref) do
%Socket{socket |
topic: topic,
channel: channel,
join_ref: join_ref,
private: channel.__socket__(:private)}
end

defp do_dispatch(nil, %{event: "phx_join", topic: topic} = msg, base_socket) do
if channel = base_socket.handler.__channel__(topic, base_socket.transport_name) do
socket = build_channel_socket(base_socket, channel, topic)
socket = build_channel_socket(base_socket, channel, topic, msg.ref)

case Phoenix.Channel.Server.join(socket, msg.payload) do
{:ok, response, pid} ->
Expand Down Expand Up @@ -282,13 +288,14 @@ defmodule Phoenix.Socket.Transport do
IO.write :stderr, "Phoenix.Transport.on_exit_message/2 is deprecated. Use on_exit_message/3 instead."
on_exit_message(topic, nil, reason)
end
def on_exit_message(topic, join_ref, reason) do
case reason do
:normal -> %Message{ref: join_ref, topic: topic, event: "phx_close", payload: %{}}
:shutdown -> %Message{ref: join_ref, topic: topic, event: "phx_close", payload: %{}}
{:shutdown, _} -> %Message{ref: join_ref, topic: topic, event: "phx_close", payload: %{}}
_ -> %Message{ref: join_ref, topic: topic, event: "phx_error", payload: %{}}
end
def on_exit_message(topic, join_ref, _reason) do
%Message{ref: join_ref, topic: topic, event: "phx_error", payload: %{}}
end

@doc false
def notify_graceful_exit(%Socket{topic: topic, join_ref: ref} = socket) do
close_msg = %Message{ref: ref, topic: topic, event: "phx_close", payload: %{}}
send(socket.transport_pid, {:graceful_exit, self(), close_msg})
end

@doc """
Expand Down
4 changes: 3 additions & 1 deletion lib/phoenix/test/channel_test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,9 @@ defmodule Phoenix.ChannelTest do
"""
def join(%Socket{} = socket, channel, topic, payload \\ %{})
when is_atom(channel) and is_binary(topic) and is_map(payload) do
socket = Transport.build_channel_socket(socket, channel, topic)

ref = System.unique_integer([:positive])
socket = Transport.build_channel_socket(socket, channel, topic, ref)

case Server.join(socket, payload) do
{:ok, reply, pid} ->
Expand Down
5 changes: 5 additions & 0 deletions lib/phoenix/transports/long_poll_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ defmodule Phoenix.Transports.LongPoll.Server do
end
end

def handle_info({:graceful_exit, channel_pid, %Phoenix.Socket.Message{} = msg}, state) do
new_state = delete(state, msg.topic, channel_pid)
publish_reply(msg, new_state)
end

def handle_info({:subscribe, client_ref, ref}, state) do
broadcast_from!(state, client_ref, {:subscribe, ref})
{:noreply, state}
Expand Down
7 changes: 6 additions & 1 deletion lib/phoenix/transports/websocket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,15 @@ defmodule Phoenix.Transports.WebSocket do
nil -> {:ok, state}
{topic, join_ref} ->
new_state = delete(state, topic, channel_pid)
encode_reply Transport.on_exit_message(topic, join_ref, reason), new_state
encode_reply(Transport.on_exit_message(topic, join_ref, reason), new_state)
end
end

def ws_info({:graceful_exit, channel_pid, %Phoenix.Socket.Message{} = msg}, state) do
new_state = delete(state, msg.topic, channel_pid)
encode_reply(msg, new_state)
end

@doc false
def ws_info(%Broadcast{event: "disconnect"}, state) do
{:shutdown, state}
Expand Down
2 changes: 1 addition & 1 deletion test/phoenix/integration/long_poll_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ defmodule Phoenix.Integration.LongPollTest do

resp = poll(:get, "/ws", session)

[_phx_reply, _joined, _user_entered, _leave_reply, _you_left_msg, phx_close] = resp.body["messages"]
[_phx_reply, _joined, _user_entered, _leave_reply, phx_close, _you_left_msg] = resp.body["messages"]

assert phx_close ==
%{"event" => "phx_close", "payload" => %{}, "ref" => "123", "topic" => "room:lobby"}
Expand Down
32 changes: 25 additions & 7 deletions test/phoenix/test/channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ defmodule Phoenix.Test.ChannelTest do

@moduletag :capture_log

defp assert_graceful_exit(pid) do
assert_receive {:graceful_exit, ^pid, %Message{event: "phx_close"}}
end

defmodule Endpoint do
use Phoenix.Endpoint, otp_app: :phoenix
end
Expand Down Expand Up @@ -74,6 +78,10 @@ defmodule Phoenix.Test.ChannelTest do
{:reply, :ok, socket}
end

def handle_in("crash", %{}, _socket) do
raise "boom!"
end

def handle_in("async_reply", %{"req" => arg}, socket) do
ref = socket_ref(socket)
Task.start(fn -> reply(ref, {:ok, %{"async_resp" => arg}}) end)
Expand Down Expand Up @@ -245,13 +253,23 @@ defmodule Phoenix.Test.ChannelTest do
assert_reply ref, :ok, %{"async_resp" => "foo"}
end

test "crashed channel propagates exit" do
Process.flag(:trap_exit, true)
{:ok, _, socket} = join(socket(), Channel, "foo:ok")
push socket, "crash", %{}
pid = socket.channel_pid
assert_receive {:terminate, _}
assert_receive {:EXIT, ^pid, _}
refute_receive {:graceful_exit, _, _}
end

test "pushes on stop" do
Process.flag(:trap_exit, true)
{:ok, _, socket} = join(socket(), Channel, "foo:ok")
push socket, "stop", %{"reason" => :normal}
pid = socket.channel_pid
assert_receive {:terminate, :normal}
assert_receive {:EXIT, ^pid, :normal}
assert_graceful_exit(pid)

# Pushing after stop doesn't crash the client/transport
Process.flag(:trap_exit, false)
Expand All @@ -266,14 +284,14 @@ defmodule Phoenix.Test.ChannelTest do
assert_reply ref, :ok
pid = socket.channel_pid
assert_receive {:terminate, :shutdown}
assert_receive {:EXIT, ^pid, :shutdown}
assert_graceful_exit(pid)

{:ok, _, socket} = join(socket(), Channel, "foo:ok")
ref = push socket, "stop_and_reply", %{"req" => "foo"}
assert_reply ref, :ok, %{"resp" => "foo"}
pid = socket.channel_pid
assert_receive {:terminate, :shutdown}
assert_receive {:EXIT, ^pid, :shutdown}
assert_graceful_exit(pid)
end

test "pushes and broadcast messages" do
Expand Down Expand Up @@ -322,7 +340,7 @@ defmodule Phoenix.Test.ChannelTest do
broadcast_from! socket, "stop", %{"foo" => "bar"}
pid = socket.channel_pid
assert_receive {:terminate, :shutdown}
assert_receive {:EXIT, ^pid, :shutdown}
assert_graceful_exit(pid)
end

## handle_info
Expand All @@ -333,7 +351,7 @@ defmodule Phoenix.Test.ChannelTest do
pid = socket.channel_pid
send pid, :stop
assert_receive {:terminate, :shutdown}
assert_receive {:EXIT, ^pid, :shutdown}
assert_graceful_exit(pid)
end

test "handles messages and pushes" do
Expand All @@ -358,7 +376,7 @@ defmodule Phoenix.Test.ChannelTest do

pid = socket.channel_pid
assert_receive {:terminate, {:shutdown, :left}}
assert_receive {:EXIT, ^pid, {:shutdown, :left}}
assert_graceful_exit(pid)

# Leaving again doesn't crash
_ = leave(socket)
Expand All @@ -371,7 +389,7 @@ defmodule Phoenix.Test.ChannelTest do

pid = socket.channel_pid
assert_receive {:terminate, {:shutdown, :closed}}
assert_receive {:EXIT, ^pid, {:shutdown, :closed}}
assert_graceful_exit(pid)

# Closing again doesn't crash
_ = close(socket)
Expand Down
6 changes: 0 additions & 6 deletions test/phoenix/transports/transport_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ defmodule Phoenix.Transports.TransportTest do
## on_exit_message

test "on_exit_message/3" do
assert Transport.on_exit_message("foo", "1", :normal) ==
%Message{ref: "1", event: "phx_close", payload: %{}, topic: "foo"}
assert Transport.on_exit_message("foo", "1", :shutdown) ==
%Message{ref: "1", event: "phx_close", payload: %{}, topic: "foo"}
assert Transport.on_exit_message("foo", "1", {:shutdown, :whatever}) ==
%Message{ref: "1", event: "phx_close", payload: %{}, topic: "foo"}
assert Transport.on_exit_message("foo", "1", :oops) ==
%Message{ref: "1", event: "phx_error", payload: %{}, topic: "foo"}
end
Expand Down

0 comments on commit e1b504b

Please sign in to comment.