-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
long_poll_server.ex
173 lines (140 loc) · 5.46 KB
/
long_poll_server.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
defmodule Phoenix.Transports.LongPoll.Supervisor do
  @moduledoc false

  use Supervisor

  # Boots the supervisor and registers it locally under this module's name.
  def start_link, do: Supervisor.start_link(__MODULE__, [], name: __MODULE__)

  # Declares a single child template for the long-poll server.
  #
  # The `:simple_one_for_one` strategy means no child is started here; server
  # processes are added dynamically from this template. They are marked
  # `restart: :temporary`, so a server that exits is not restarted.
  def init([]) do
    server_spec = worker(Phoenix.Transports.LongPoll.Server, [], restart: :temporary)

    supervise([server_spec], strategy: :simple_one_for_one)
  end
end
defmodule Phoenix.Transports.LongPoll.Server do
  @moduledoc false

  use GenServer

  alias Phoenix.PubSub
  alias Phoenix.Socket.Transport
  alias Phoenix.Socket.Broadcast
  alias Phoenix.Socket.Message

  @doc """
  Starts the Server.

    * `endpoint`, `handler`, `transport_name`, `transport`, `serializer`,
      `params` - Passed unchanged to `Phoenix.Socket.Transport.connect/6`
      to build the socket. When the connection is refused (`:error`),
      the server is not started and `:ignore` is returned.
    * `window_ms` - The longpoll session timeout, in milliseconds.
      If the client does not flush (poll) the server for longer than
      1.5 times `window_ms`, the server terminates and clients are
      responsible for opening a new session.
    * `priv_topic` - The PubSub topic this server subscribes to on the
      endpoint's PubSub server.
  """
  def start_link(endpoint, handler, transport_name, transport,
                 serializer, params, window_ms, priv_topic) do
    GenServer.start_link(__MODULE__, [endpoint, handler, transport_name, transport,
                                      serializer, params, window_ms, priv_topic])
  end

  ## Callbacks

  # Connects the socket and builds the server state:
  #
  #   * `buffer`           - encoded messages awaiting the next flush, newest first
  #   * `channels`         - topic => channel pid
  #   * `channels_inverse` - channel pid => topic (used when a channel exits)
  #   * `window_ms`        - inactivity window, 1.5x the value given to start_link
  #   * `last_client_poll` - timestamp (ms) of the last flush request
  #   * `client_ref`       - `{client_ref, ref}` of a client waiting for data, or nil
  def init([endpoint, handler, transport_name, transport,
            serializer, params, window_ms, priv_topic]) do
    # Exits of linked processes arrive as `{:EXIT, pid, reason}` messages.
    Process.flag(:trap_exit, true)

    case Transport.connect(endpoint, handler, transport_name, transport, serializer, params) do
      {:ok, socket} ->
        state = %{buffer: [],
                  socket: socket,
                  channels: HashDict.new(),
                  channels_inverse: HashDict.new(),
                  window_ms: trunc(window_ms * 1.5),
                  pubsub_server: socket.endpoint.__pubsub_server__(),
                  priv_topic: priv_topic,
                  last_client_poll: now_ms(),
                  client_ref: nil}

        # The socket id topic is optional; the private topic is mandatory.
        if socket.id, do: PubSub.subscribe(state.pubsub_server, self(), socket.id, link: true)
        :ok = PubSub.subscribe(state.pubsub_server, self(), priv_topic, link: true)

        schedule_inactive_shutdown(state.window_ms)
        {:ok, state}

      :error ->
        :ignore
    end
  end

  def handle_call(:stop, _from, state), do: {:stop, :shutdown, :ok, state}

  # Handle client dispatches.
  #
  # The requester is always acknowledged (`{:dispatch, ref}` on success,
  # `{:error, reason, ref}` on failure) before any reply message is buffered.
  def handle_info({:dispatch, client_ref, msg, ref}, state) do
    case Transport.dispatch(msg, state.channels, state.socket) do
      {:joined, channel_pid, reply_msg} ->
        broadcast_from!(state, client_ref, {:dispatch, ref})

        # Track the new channel in both directions.
        new_state = %{state | channels: HashDict.put(state.channels, msg.topic, channel_pid),
                              channels_inverse: HashDict.put(state.channels_inverse, channel_pid, msg.topic)}
        publish_reply(reply_msg, new_state)

      {:reply, reply_msg} ->
        broadcast_from!(state, client_ref, {:dispatch, ref})
        publish_reply(reply_msg, state)

      :noreply ->
        broadcast_from!(state, client_ref, {:dispatch, ref})
        {:noreply, state}

      {:error, reason, error_reply_msg} ->
        broadcast_from!(state, client_ref, {:error, reason, ref})
        publish_reply(error_reply_msg, state)
    end
  end

  # Detects disconnect broadcasts and shuts down
  def handle_info(%Broadcast{event: "disconnect"}, state) do
    {:stop, {:shutdown, :disconnected}, state}
  end

  # A linked process exited. If it is a tracked channel, forget it and buffer
  # the exit message for the client; any other exit is treated as the PubSub
  # server having terminated and stops this server.
  def handle_info({:EXIT, channel_pid, reason}, state) do
    case HashDict.get(state.channels_inverse, channel_pid) do
      nil ->
        {:stop, {:shutdown, :pubsub_server_terminated}, state}

      topic ->
        new_state = %{state | channels: HashDict.delete(state.channels, topic),
                              channels_inverse: HashDict.delete(state.channels_inverse, channel_pid)}
        publish_reply(Transport.on_exit_message(topic, reason), new_state)
    end
  end

  # Echoes the subscribe request back to the requester as an acknowledgement.
  def handle_info({:subscribe, client_ref, ref}, state) do
    broadcast_from!(state, client_ref, {:subscribe, ref})
    {:noreply, state}
  end

  # A client poll. With an empty buffer the client is remembered so it can be
  # notified when a message arrives; otherwise the buffered messages are sent
  # in arrival order (the buffer is kept newest-first) and the buffer is reset.
  # Either way the poll counts as client activity.
  def handle_info({:flush, client_ref, ref}, state) do
    case state.buffer do
      [] ->
        {:noreply, %{state | client_ref: {client_ref, ref}, last_client_poll: now_ms()}}

      buffer ->
        broadcast_from!(state, client_ref, {:messages, Enum.reverse(buffer), ref})
        {:noreply, %{state | client_ref: nil, last_client_poll: now_ms(), buffer: []}}
    end
  end

  # Periodic inactivity check: stop when the last poll is older than the
  # window, otherwise re-arm the timer.
  def handle_info(:shutdown_if_inactive, state) do
    if now_ms() - state.last_client_poll > state.window_ms do
      {:stop, {:shutdown, :inactive}, state}
    else
      schedule_inactive_shutdown(state.window_ms)
      {:noreply, state}
    end
  end

  # Socket messages sent to this process are buffered for the client.
  def handle_info(%Message{} = msg, state) do
    publish_reply(msg, state)
  end

  def terminate(_reason, _state) do
    :ok
  end

  # Delivers `msg` to the requester: a binary `client_ref` is a PubSub topic,
  # a pid is messaged directly.
  defp broadcast_from!(state, client_ref, msg) when is_binary(client_ref),
    do: PubSub.broadcast_from!(state.pubsub_server, self(), client_ref, msg)
  defp broadcast_from!(_state, client_ref, msg) when is_pid(client_ref),
    do: send(client_ref, msg)

  # Encodes `msg` with the socket's serializer, notifies a waiting client (if
  # any) that data is available, and prepends the encoded message to the buffer.
  defp publish_reply(msg, state) do
    encoded = state.socket.serializer.encode!(msg)

    case state.client_ref do
      {client_ref, ref} ->
        broadcast_from!(state, client_ref, {:now_available, ref})

      nil ->
        :ok
    end

    {:noreply, %{state | buffer: [encoded | state.buffer]}}
  end

  # Converts a `{megaseconds, seconds, microseconds}` timestamp to milliseconds.
  defp time_to_ms({mega, sec, micro}),
    do: div(((((mega * 1000000) + sec) * 1000000) + micro), 1000)

  # Current OS time in milliseconds. NOTE(review): `:os.timestamp/0` is wall
  # clock time, so a system clock adjustment can skew the inactivity check.
  defp now_ms, do: time_to_ms(:os.timestamp())

  defp schedule_inactive_shutdown(window_ms) do
    Process.send_after(self(), :shutdown_if_inactive, window_ms)
  end
end