-
Notifications
You must be signed in to change notification settings - Fork 1
/
socket.go
291 lines (274 loc) · 9.7 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
// Package socket handles communication with a player using a websocket connection
package socket
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/jacobpatterson1549/selene-bananas/game/message"
"github.com/jacobpatterson1549/selene-bananas/game/player"
"github.com/jacobpatterson1549/selene-bananas/server/log"
)
type (
// Socket reads messages from and writes messages to a single player's browser over a Conn.
Socket struct {
// log receives the socket's debug and error output.
log log.Logger
// Conn is the underlying connection that messages are read from and written to.
Conn Conn
// PlayerName is the player using the socket. It is stamped onto every message read from the connection.
PlayerName player.Name
// Addr is the string form of the connection's remote address. It is stamped onto every message read from the connection.
Addr message.Addr
// Config is embedded so that its timing and debug settings can be read directly from the Socket.
Config
}
// Config contains commonly shared Socket properties
Config struct {
// Debug is a flag that causes the socket to log the types of non-ping/pong messages that are read/written.
Debug bool
// ReadWait is how far into the future the read deadline is pushed when reading starts and each time a pong is received.
ReadWait time.Duration
// WriteWait is the amount of time that the socket can take to write a message.
// The write deadline is refreshed with this period before every write.
WriteWait time.Duration
// PingPeriod is how often ping messages should be sent. Validation requires it to be greater than WriteWait.
PingPeriod time.Duration
// HTTPPingPeriod is how frequently to ask the client to send an HTTP request, as Heroku servers shut down if 30 minutes pass between HTTP requests.
HTTPPingPeriod time.Duration
// TimeFunc is a function which should supply the current time in seconds since the unix epoch.
// Used to compute the read and write deadlines.
TimeFunc func() int64
}
// Conn is the connection that backs the socket.
Conn interface {
// ReadMessage reads the next message from the connection into m.
ReadMessage(m *message.Message) error
// WriteMessage writes the message to the connection.
WriteMessage(m message.Message) error
// SetReadDeadline sets how long a read can take before it returns an error.
SetReadDeadline(t time.Time) error
// SetWriteDeadline sets how long a write can take before it returns an error.
SetWriteDeadline(t time.Time) error
// SetPongHandler registers the handler that is called when the server receives a pong response to a previous ping.
SetPongHandler(h func(appData string) error)
// Close closes the connection.
Close() error
// WritePing writes a ping message on the connection.
WritePing() error
// WriteClose writes a close message on the connection. The connection is NOT closed.
WriteClose(reason string) error
// IsNormalClose determines if the error message is an error that implies a normal close or is unexpected.
IsNormalClose(err error) bool
// RemoteAddr gets the remote network address of the connection.
RemoteAddr() net.Addr
}
)
// Sentinel errors describing why a socket is being closed.
var (
	// errSocketClosed marks an expected closure: the peer closed normally or the "in" channel was closed.
	errSocketClosed = fmt.Errorf("socket closed")
	// errServerShuttingDown is the close reason used when the run context is cancelled.
	errServerShuttingDown = fmt.Errorf("server shutting down")
)
// NewSocket builds a Socket for the named player on top of conn using this configuration.
// The socket's address is taken from the connection's remote address.
// An error is returned if the configuration or any of the arguments fail validation.
func (cfg Config) NewSocket(log log.Logger, pn player.Name, conn Conn) (*Socket, error) {
	addr, err := cfg.validate(log, pn, conn)
	if err != nil {
		return nil, fmt.Errorf("creating socket: validation: %w", err)
	}
	return &Socket{
		log:        log,
		Conn:       conn,
		PlayerName: pn,
		Addr:       message.Addr(addr.String()),
		Config:     cfg,
	}, nil
}
// validate checks the arguments and the configuration, reporting the first problem found.
// On success it returns the remote address of the connection.
func (cfg Config) validate(log log.Logger, pn player.Name, conn Conn) (net.Addr, error) {
	if len(pn) == 0 {
		return nil, fmt.Errorf("player name required")
	}
	if conn == nil {
		return nil, fmt.Errorf("websocket connection required")
	}
	// The address can only be requested once the connection is known to be present.
	addr := conn.RemoteAddr()
	if addr == nil {
		return nil, fmt.Errorf("remote address of connection required")
	}
	if log == nil {
		return nil, fmt.Errorf("log required")
	}
	if cfg.TimeFunc == nil {
		return nil, fmt.Errorf("time func required")
	}
	if cfg.ReadWait <= 0 {
		return nil, fmt.Errorf("positive read wait period required")
	}
	if cfg.WriteWait <= 0 {
		return nil, fmt.Errorf("positive write wait period required")
	}
	if cfg.PingPeriod <= 0 {
		return nil, fmt.Errorf("positive ping period required")
	}
	if cfg.HTTPPingPeriod <= 0 {
		return nil, fmt.Errorf("positive http ping period required")
	}
	if cfg.PingPeriod <= cfg.WriteWait {
		return nil, fmt.Errorf("ping period should be greater than write wait")
	}
	return addr, nil
}
// Run starts the socket's reading and writing goroutines and returns immediately.
// The reading goroutine reads messages from the connection and sends them on the shared "out" channel.
// The writing goroutine writes messages received from the "in" channel, plus periodic pings, to the connection.
// Both goroutines are added to wg and mark themselves done when they exit.
// The writing goroutine exits when ctx is cancelled or the "in" channel is closed.
// If the connection has an error, the socket sends a socketClose message on the out channel,
// but still consumes and ignores messages from the in channel until it is closed; this prevents a channel blockage.
func (s *Socket) Run(ctx context.Context, wg *sync.WaitGroup, in <-chan message.Message, out chan<- message.Message) {
	pingTicker := time.NewTicker(s.PingPeriod)
	httpPingTicker := time.NewTicker(s.HTTPPingPeriod)
	wg.Add(2)
	go s.readMessagesSync(ctx, wg, out)
	go func() {
		// The tickers are created here, so release them once their only consumer has exited.
		defer pingTicker.Stop()
		defer httpPingTicker.Stop()
		s.writeMessagesSync(ctx, wg, in, pingTicker, httpPingTicker)
	}()
}
// readMessagesSync reads messages from the connection and sends them on the out channel until reading fails or ctx is done.
// The read deadline is refreshed once up front and again every time a pong is received.
// When the function exits it sends a socket close message on out (unless ctx is done), closes the connection, and marks wg done.
// A message that was read after ctx was cancelled is dropped rather than sent.
func (s *Socket) readMessagesSync(ctx context.Context, wg *sync.WaitGroup, out chan<- message.Message) {
	defer wg.Done()
	defer s.closeConn() // will cause writeMessages() to fail, but not stop until the in channel is closed.
	defer s.sendClose(ctx, out)
	pongHandler := func(appData string) error {
		if err := s.refreshDeadline(s.Conn.SetReadDeadline, s.ReadWait); err != nil {
			err = fmt.Errorf("setting read deadline: %w", err)
			s.writeClose(err)
			return err
		}
		return nil
	}
	// Set the initial read deadline using the same logic that later pongs use.
	if err := pongHandler(""); err != nil {
		return
	}
	s.Conn.SetPongHandler(pongHandler)
	for { // BLOCKING
		m, err := s.readMessage() // BLOCKING
		// The read may have been unblocked by cancellation, so check the context before looking at the result.
		select {
		case <-ctx.Done():
			return
		default:
		}
		if err != nil {
			var err2 error
			if err != errSocketClosed {
				// Name only the player: formatting the whole Socket would put the logger, connection
				// and configuration into the close reason that is sent to the client.
				err2 = fmt.Errorf("reading socket messages stopped for player %v: %v", s.PlayerName, err)
			}
			s.writeClose(err2)
			return
		}
		message.Send(*m, out, s.Debug, s.log)
	}
}
// writeMessagesSync forwards messages from the in channel to the connection and writes a ping each time one of the tickers fires.
// It returns when ctx is cancelled or when the in channel is closed.
// After the first write error the connection is closed and every later write is skipped,
// but the loop keeps receiving from the in channel so that senders are never blocked.
// The connection is closed and wg is marked done when the function exits.
func (s *Socket) writeMessagesSync(ctx context.Context, wg *sync.WaitGroup, in <-chan message.Message, pingTicker, httpPingTicker *time.Ticker) {
	defer wg.Done()
	defer s.closeConn() // will cause readMessages() to fail
	// failed is set after the first error; from then on nothing more is written to the connection.
	failed := false
	// guardedWrite refreshes the write deadline and then runs do, unless a previous write already failed.
	guardedWrite := func(do func() error) error {
		if failed {
			return fmt.Errorf("skipping write for socket (%v) because an error has already occurred", s)
		}
		if err := s.refreshDeadline(s.Conn.SetWriteDeadline, s.WriteWait); err != nil {
			return fmt.Errorf("setting write deadline: %v", err)
		}
		return do()
	}
	// sendMessage writes a single message through guardedWrite.
	sendMessage := func(m message.Message) error {
		return guardedWrite(func() error { return s.writeMessage(m) })
	}
	for { // BLOCKING
		var (
			err      error
			inClosed bool
		)
		select {
		case <-ctx.Done():
			s.writeClose(errServerShuttingDown)
			return
		case m, ok := <-in:
			if ok {
				err = sendMessage(m)
			} else {
				err, inClosed = errSocketClosed, true
			}
		case <-pingTicker.C:
			err = guardedWrite(s.Conn.WritePing)
		case <-httpPingTicker.C:
			err = sendMessage(message.Message{
				Type: message.SocketHTTPPing,
			})
		}
		if err == nil {
			continue
		}
		s.writeClose(err)
		s.closeConn() // will cause readMessages() to fail
		if inClosed {
			return
		}
		failed = true
	}
}
// readMessage blocks until the next message arrives on the connection and returns it.
// A normal closure is reported as errSocketClosed; any other read failure is reported as an unexpected closure.
// Messages without a game are rejected.
func (s *Socket) readMessage() (*message.Message, error) {
	m := new(message.Message)
	err := s.Conn.ReadMessage(m) // BLOCKING
	switch {
	case err == nil:
		// the message was read successfully
	case s.Conn.IsNormalClose(err):
		return nil, errSocketClosed
	default:
		return nil, fmt.Errorf("unexpected socket closure: %v", err)
	}
	if s.Debug {
		s.log.Printf("socket reading message with type %v", m.Type)
	}
	if m.Game == nil {
		return nil, fmt.Errorf("received message not relating to game")
	}
	// The out channel is shared, so tag the message with the player name and address
	// to let subscribers know who to send responses to.
	m.PlayerName, m.Addr = s.PlayerName, s.Addr
	return m, nil
}
// writeMessage sends m over the connection, logging its type first when debugging is enabled.
// An error is returned if the write fails.
// An error is also returned after a player-remove message is written successfully.
func (s *Socket) writeMessage(m message.Message) error {
	if s.Debug {
		s.log.Printf("socket writing message with type %v", m.Type)
	}
	err := s.Conn.WriteMessage(m)
	switch {
	case err != nil:
		return fmt.Errorf("writing socket message: %v", err)
	case m.Type == message.PlayerRemove:
		return fmt.Errorf("player deleted")
	}
	return nil
}
// writeClose writes a close message on the connection.
// When reasonErr is neither nil nor errSocketClosed, its text is logged and used as the close reason;
// otherwise the close message carries an empty reason and nothing is logged.
// Any error from writing the close message is ignored.
func (s *Socket) writeClose(reasonErr error) {
	reason := ""
	switch reasonErr {
	case nil, errSocketClosed:
		// an expected closure has no reason worth reporting
	default:
		reason = reasonErr.Error()
		s.log.Printf("closing socket for %v at %v: %v", s.PlayerName, s.Addr, reason)
	}
	s.Conn.WriteClose(reason)
}
// refreshDeadline moves a connection deadline to period after the current time.
// The current time is read from TimeFunc, interpreted as whole seconds since the unix epoch.
// refreshDeadlineFunc is the setter that receives the new deadline, such as Conn.SetReadDeadline or Conn.SetWriteDeadline.
// If the setter fails, the error is logged and returned wrapped.
func (s *Socket) refreshDeadline(refreshDeadlineFunc func(t time.Time) error, period time.Duration) error {
	now := s.TimeFunc()
	nowTime := time.Unix(now, 0)
	deadline := nowTime.Add(period)
	if err := refreshDeadlineFunc(deadline); err != nil {
		err = fmt.Errorf("error refreshing ping/pong deadline: %w", err)
		// Log through an explicit verb: the error text is data, not a format string,
		// so any '%' it contains must not be interpreted.
		s.log.Printf("%v", err)
		return err
	}
	return nil
}
// sendClose sends a socket close message, tagged with the player name and address, on the out channel,
// hopefully causing the receiver to close the in channel.
// Nothing is sent if ctx is already done.
func (s *Socket) sendClose(ctx context.Context, out chan<- message.Message) {
	select {
	case <-ctx.Done():
		// the context is done, so skip the notification
	default:
		closeMessage := message.Message{
			Type:       message.SocketClose,
			PlayerName: s.PlayerName,
			Addr:       s.Addr,
		}
		message.Send(closeMessage, out, s.Debug, s.log)
	}
}
// closeConn closes the connection of the socket, discarding any error from the close.
// It is called from both the reading and the writing goroutines, possibly more than once.
// NOTE(review): this relies on Conn.Close being safe for concurrent and repeated use - confirm against the Conn implementation.
func (s *Socket) closeConn() {
	_ = s.Conn.Close()
}