-
Notifications
You must be signed in to change notification settings - Fork 39
/
receiver.go
99 lines (91 loc) · 2.99 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Package receiver implements the messages receiver. It can be used
// both by the download and the upload subtests.
package receiver
import (
"context"
"encoding/json"
"time"
"github.com/gorilla/websocket"
"github.com/m-lab/ndt-server/logging"
"github.com/m-lab/ndt-server/ndt7/model"
"github.com/m-lab/ndt-server/ndt7/ping"
"github.com/m-lab/ndt-server/ndt7/spec"
)
type receiverKind int
const (
downloadReceiver = receiverKind(iota)
uploadReceiver
)
func loop(
ctx context.Context, conn *websocket.Conn, kind receiverKind,
dst chan<- model.Measurement,
) {
logging.Logger.Debug("receiver: start")
defer logging.Logger.Debug("receiver: stop")
defer close(dst)
conn.SetReadLimit(spec.MaxMessageSize)
receiverctx, cancel := context.WithTimeout(ctx, spec.MaxRuntime)
defer cancel()
err := conn.SetReadDeadline(time.Now().Add(spec.MaxRuntime)) // Liveness!
if err != nil {
logging.Logger.WithError(err).Warn("receiver: conn.SetReadDeadline failed")
return
}
conn.SetPongHandler(func(s string) error {
rtt, err := ping.ParseTicks(s)
if err == nil {
rtt /= int64(time.Millisecond)
logging.Logger.Debugf("receiver: ApplicationLevel RTT: %d ms", rtt)
}
return err
})
for receiverctx.Err() == nil { // Liveness!
mtype, mdata, err := conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return
}
logging.Logger.WithError(err).Warn("receiver: conn.ReadMessage failed")
return
}
if mtype != websocket.TextMessage {
switch kind {
case downloadReceiver:
logging.Logger.Warn("receiver: got non-Text message")
return // Unexpected message type
default:
continue // No further processing required
}
}
var measurement model.Measurement
err = json.Unmarshal(mdata, &measurement)
if err != nil {
logging.Logger.WithError(err).Warn("receiver: json.Unmarshal failed")
return
}
dst <- measurement // Liveness: this is blocking
}
}
func start(ctx context.Context, conn *websocket.Conn, kind receiverKind) <-chan model.Measurement {
dst := make(chan model.Measurement)
go loop(ctx, conn, kind, dst)
return dst
}
// StartDownloadReceiver starts the receiver in a background goroutine and
// returns the messages received from the client in the returned channel.
//
// This receiver will not tolerate receiving binary messages. It will
// terminate early if such a message is received.
//
// Liveness guarantee: the goroutine will always terminate after a
// MaxRuntime timeout, provided that the consumer will keep reading
// from the returned channel.
func StartDownloadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement {
return start(ctx, conn, downloadReceiver)
}
// StartUploadReceiver is like StartDownloadReceiver except that it
// tolerates incoming binary messages, which are sent to cause
// network load, and therefore must not be rejected.
func StartUploadReceiver(ctx context.Context, conn *websocket.Conn) <-chan model.Measurement {
return start(ctx, conn, uploadReceiver)
}