/
ws.go
136 lines (119 loc) · 3.18 KB
/
ws.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package dashboard
import (
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/labstack/echo"
analysisserver "github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/goshimmer/plugins/dashboard"
)
var (
	// webSocketWriteTimeout bounds the websocket handshake and is the
	// deadline applied to writes on an established connection.
	webSocketWriteTimeout = 3 * time.Second

	// registry of connected websocket clients, keyed by client id.
	// wsClientsMu guards both the map and the id counter.
	wsClientsMu    sync.Mutex
	wsClients      = map[uint64]*wsclient{}
	nextWsClientID uint64

	// gorilla websocket layer
	upgrader = websocket.Upgrader{
		EnableCompression: true,
		HandshakeTimeout:  webSocketWriteTimeout,
		// every origin is accepted: the check unconditionally returns true.
		CheckOrigin: func(*http.Request) bool { return true },
	}
)
// wsclient is a websocket client with a channel for downstream blocks.
type wsclient struct {
// channel is the downstream block channel: broadcasts enqueue blocks here
// and the connection handler drains it, writing each block to the websocket.
channel chan interface{}
// exit is a channel which is closed when the websocket client is
// disconnected (removed from the client registry).
exit chan struct{}
}
// registerWSClient creates a new websocket client, stores it in the client
// registry under the next free id and returns that id together with the client.
func registerWSClient() (uint64, *wsclient) {
	// the client is built before taking the lock; it is not shared until it
	// is placed into the registry below.
	client := &wsclient{
		channel: make(chan interface{}, 500),
		exit:    make(chan struct{}),
	}

	wsClientsMu.Lock()
	defer wsClientsMu.Unlock()

	id := nextWsClientID
	nextWsClientID++
	wsClients[id] = client
	return id, client
}
// removeWsClient removes the websocket client with the given id from the
// registry and closes its exit and downstream block channels.
//
// Calling it with an id that is not (or no longer) registered is a no-op, so
// a client's channels are closed at most once.
func removeWsClient(clientID uint64) {
	wsClientsMu.Lock()
	defer wsClientsMu.Unlock()

	wsClient, ok := wsClients[clientID]
	if !ok {
		// unknown or already removed: the map would hand back a nil client
		// and closing its channels would panic.
		return
	}
	delete(wsClients, clientID)
	close(wsClient.exit)
	close(wsClient.channel)
}
// broadcastWsBlock sends the given block to all connected websocket clients.
//
// By default the send is non-blocking: a client whose channel buffer is full
// does not receive the block. If the optional dontDrop flag is passed as
// true, the send instead blocks until the client has room or its exit channel
// is closed. Only the first dontDrop value is consulted, and an explicit
// false selects the default (droppable) behaviour.
//
// NOTE(review): the registry mutex is held for the whole broadcast, and
// removeWsClient acquires the same mutex before closing a client's exit
// channel. While a dontDrop send is blocked here the exit case therefore
// cannot fire for a client that is being removed concurrently; confirm
// whether the blocking send should happen outside the lock.
func broadcastWsBlock(blk interface{}, dontDrop ...bool) {
	mustDeliver := len(dontDrop) > 0 && dontDrop[0]

	wsClientsMu.Lock()
	defer wsClientsMu.Unlock()

	for _, wsClient := range wsClients {
		if mustDeliver {
			select {
			case wsClient.channel <- blk:
			case <-wsClient.exit:
				// get unblocked if the websocket connection just got closed
			}
			continue
		}
		select {
		case wsClient.channel <- blk:
		default:
			// potentially drop if slow consumer
		}
	}
}
// websocketRoute handles a new websocket connection: it upgrades the HTTP
// request, registers the client, sends the mana dashboard address, replays
// past autopeering events and then forwards every downstream block queued for
// the client until a write fails or the client's channel is closed.
//
// It returns the upgrade error or the error of the initial address message;
// errors while forwarding blocks end the connection silently and yield nil.
// Panics raised while handling the connection are recovered and logged.
func websocketRoute(c echo.Context) error {
	defer func() {
		if r := recover(); r != nil {
			// %v: the recovered value is an arbitrary interface, not
			// necessarily a string or an error.
			log.Errorf("recovered from websocket handle func: %v", r)
		}
	}()

	// upgrade to websocket connection
	ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		return err
	}
	defer ws.Close()
	ws.EnableWriteCompression(true)

	// register the client and make sure it is removed again on return
	clientID, wsClient := registerWSClient()
	defer removeWsClient(clientID)

	// send mana dashboard address info
	if err := sendJSON(ws, &wsblk{
		Type: dashboard.MsgManaDashboardAddress,
		Data: Parameters.ManaDashboardAddress,
	}); err != nil {
		return err
	}

	// replay autopeering events from the past upon connecting a new client
	analysisserver.ReplayAutopeeringEvents(createAutopeeringEventHandlers(ws))

	// range stops once the channel is closed; a bare receive would keep
	// yielding nil blocks from a closed channel and write them out forever.
	for blk := range wsClient.channel {
		if err := sendJSON(ws, blk); err != nil {
			// silent
			break
		}
	}
	return nil
}
// sendJSON writes blk to the websocket connection as a JSON message.
//
// The write deadline is set to now + webSocketWriteTimeout before the write,
// so the timeout bounds this write itself. Setting it afterwards would leave
// the first write unbounded and make every later write run against a
// deadline stamped when the previous write finished.
//
// It returns the error from setting the deadline or from the write.
func sendJSON(ws *websocket.Conn, blk interface{}) error {
	if err := ws.SetWriteDeadline(time.Now().Add(webSocketWriteTimeout)); err != nil {
		return err
	}
	return ws.WriteJSON(blk)
}