/
socket_driver.ex
119 lines (98 loc) · 3.63 KB
/
socket_driver.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
defmodule SocketDriver do
use GenServer
require Logger
alias Phoenix.ChannelTest.NoopSerializer
def start_link(endpoint, socket_handler, opts \\ [], gen_server_opts \\ []) do
GenServer.start_link(__MODULE__, {endpoint, socket_handler, opts}, gen_server_opts)
end
def join(driver, topic, payload \\ %{}),
do: push(driver, topic, "phx_join", payload)
def push(driver, topic, event, payload) do
push(driver,
%Phoenix.Socket.Message{
event: event,
topic: topic,
payload: payload,
ref: make_ref
}
)
end
defp push(driver, message) do
:ok = GenServer.cast(driver, {:push, message})
message.ref
end
def init({endpoint, socket_handler, driver_opts}) do
Process.flag(:trap_exit, true)
# Using connect to create the socket
{:ok, socket} = Phoenix.Socket.Transport.connect(
endpoint,
socket_handler,
:socket_driver,
__MODULE__,
NoopSerializer,
%{"vsn" => driver_opts[:vsn] || "1.0.0"}
)
# A socket driver needs to manage some state
{:ok, %{
socket: socket, # the socket struct
channels: HashDict.new, # topic -> channel pid
channels_inverse: HashDict.new, # channel pid -> topic
receiver: driver_opts[:receiver]}
}
end
def handle_cast({:push, message}, state) do
{:noreply,
message
|> NoopSerializer.decode!([])
|> Phoenix.Socket.Transport.dispatch(state.channels, state.socket)
|> handle_socket_response(state)
}
end
# received through PubSub when a broadcast message is fastlaned
# The message format is governed by the serializer, in this case
# Phoenix.ChannelTest.NoopSerializer
def handle_info(%Phoenix.Socket.Message{} = encoded_message, state),
do: {:noreply, send_out(state, encoded_message)}
# received from the channel process if the callback function replies
def handle_info(%Phoenix.Socket.Reply{} = message, state),
do: {:noreply, encode_and_send_out(state, message)}
# channel process has terminated -> remove it from internal HashDicts
def handle_info({:EXIT, pid, reason}, state) do
case HashDict.fetch(state.channels_inverse, pid) do
:error -> {:noreply, state}
{:ok, topic} ->
{:noreply,
state
|> delete_channel_process(topic, pid)
|> encode_and_send_out(Phoenix.Socket.Transport.on_exit_message(topic, reason))
}
end
end
def handle_info(_message, state), do: {:noreply, state}
# Handling results of Phoenix.Socket.Transport.dispatch
defp handle_socket_response(:noreply, state), do: state
defp handle_socket_response({:reply, reply_message}, state),
do: encode_and_send_out(state, reply_message)
defp handle_socket_response({:joined, pid, reply_message}, state) do
state
|> store_channel_process(reply_message.topic, pid)
|> encode_and_send_out(reply_message)
end
defp handle_socket_response({:error, _reason, reply_message}, state),
do: encode_and_send_out(state, reply_message)
defp store_channel_process(state, topic, pid) do
state
|> update_in([:channels], &HashDict.put(&1, topic, pid))
|> update_in([:channels_inverse], &HashDict.put(&1, pid, topic))
end
defp delete_channel_process(state, topic, pid) do
state
|> update_in([:channels], &HashDict.delete(&1, topic))
|> update_in([:channels_inverse], &HashDict.delete(&1, pid))
end
defp encode_and_send_out(state, message), do: send_out(state, NoopSerializer.encode!(message))
defp send_out(state, encoded_message) do
if state.receiver, do: send(state.receiver, {:message, encoded_message})
state
end
end