/
conn.go
147 lines (124 loc) · 3.74 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !nacl
package websocket
import (
"fmt"
"net"
"sync"
"time"
"github.com/gorilla/websocket"
"v.io/v23/flow"
)
// WebsocketConn adapts an already-established gorilla websocket connection so
// that it can be used wherever a flow.Conn is expected.
//
//nolint:revive // API change required.
func WebsocketConn(ws *websocket.Conn) flow.Conn {
	wrapped := new(wrappedConn)
	wrapped.ws = ws
	return wrapped
}
// wrappedConn provides a flow.Conn interface to a websocket.
// The underlying websocket connection needs regular calls to Read to make sure
// websocket control messages (such as pings) are processed by the websocket
// library.
type wrappedConn struct {
// ws is the wrapped websocket connection that every method delegates to.
ws *websocket.Conn
// The gorilla docs aren't explicit about reading and writing from
// different goroutines. They are explicit that only one goroutine can
// do a write at any given time and only one goroutine can do a read
// at any given time. Based on inspection it seems that using a reader
// and writer simultaneously is safe, but this might change with
// future changes. We can't actually share the lock, because this means
// that we can't write while we are waiting for a message, causing some
// deadlocks where a write is needed to unblock a read.
//
// writeLock is held by WriteMsg and Close; readLock is held by ReadMsg.
writeLock sync.Mutex
readLock sync.Mutex
}
// ReadMsg reads the next message from the websocket and returns its payload.
// Only binary messages are accepted; any other message type is reported as an
// error and its payload is discarded. Callers are serialized by readLock.
func (c *wrappedConn) ReadMsg() ([]byte, error) {
	c.readLock.Lock()
	defer c.readLock.Unlock()

	msgType, payload, err := c.ws.ReadMessage()
	switch {
	case err != nil:
		return nil, err
	case msgType != websocket.BinaryMessage:
		return nil, fmt.Errorf("Unexpected message type %d", msgType)
	}
	return payload, nil
}
// ReadMsg2 is equivalent to ReadMsg: the buffer argument is ignored and the
// result is exactly what ReadMsg returns.
func (c *wrappedConn) ReadMsg2([]byte) ([]byte, error) {
	msg, err := c.ReadMsg()
	return msg, err
}
// WriteMsg sends the concatenation of bufs as a single binary websocket
// message. Writers are serialized by writeLock.
//
// It returns the number of payload bytes handed to the websocket on success.
// Calling it with no buffers sends nothing and returns (0, nil). If the
// websocket write fails, it returns 0 together with that error.
func (c *wrappedConn) WriteMsg(bufs ...[]byte) (int, error) {
	c.writeLock.Lock()
	defer c.writeLock.Unlock()
	if len(bufs) == 0 {
		return 0, nil
	}

	// The final size is known up front, so allocate the message once instead
	// of letting append grow (and re-copy) the slice for every buffer.
	total := 0
	for _, buf := range bufs {
		total += len(buf)
	}
	msg := make([]byte, 0, total)
	for _, buf := range bufs {
		msg = append(msg, buf...)
	}

	if err := c.ws.WriteMessage(websocket.BinaryMessage, msg); err != nil {
		return 0, err
	}
	return len(msg), nil
}
// Close notifies the remote end that this side is going away and then closes
// the websocket. It holds writeLock for the duration so the close frame does
// not interleave with a WriteMsg in progress. The returned error is the one
// from closing the websocket; a failure to send the notification is ignored.
func (c *wrappedConn) Close() error {
	c.writeLock.Lock()
	defer c.writeLock.Unlock()

	// Best-effort "EOF" control message so the peer can shut down gracefully.
	// The send is bounded to one second and its error is deliberately dropped:
	// the connection is closed regardless of whether the peer was told.
	goingAway := websocket.FormatCloseMessage(websocket.CloseGoingAway, "EOF")
	deadline := time.Now().Add(time.Second)
	//nolint:errcheck
	c.ws.WriteControl(websocket.CloseMessage, goingAway, deadline)

	err := c.ws.Close()
	return err
}
// LocalAddr reports the local address of the wrapped websocket connection.
func (c *wrappedConn) LocalAddr() net.Addr {
	local := c.ws.LocalAddr()
	return local
}
// RemoteAddr reports the remote address of the wrapped websocket connection.
func (c *wrappedConn) RemoteAddr() net.Addr {
	remote := c.ws.RemoteAddr()
	return remote
}
// hybridConn is used by the 'hybrid' protocol that can accept
// either 'tcp' or 'websocket' connections. In particular, it allows
// for the reader to peek and buffer the first n bytes of a stream
// in order to determine what the connection type is.
type hybridConn struct {
// conn is the underlying connection; all writes, closes and deadline
// changes are forwarded to it.
conn net.Conn
// buffered holds bytes that Read returns before any data it reads from
// conn. Presumably these are the bytes that were peeked to classify the
// connection; the code that fills this field is not in this file.
buffered []byte
}
// Read fills b with stream data, handing out any previously buffered bytes
// before reading from the underlying connection.
//
// While buffered bytes remain, Read copies as many as fit into b and returns
// immediately with a nil error, even if b has room for more. It does not then
// block on the underlying connection to top up b: data that is already
// available is returned without waiting, as the io.Reader contract
// recommends, so a peer that is waiting for a reply cannot stall this call.
// Once the buffer is empty, Read is a plain pass-through to the connection.
//
// It returns the number of bytes placed in b and, for pass-through reads,
// whatever error the underlying connection reported.
func (wc *hybridConn) Read(b []byte) (int, error) {
	if len(wc.buffered) == 0 {
		return wc.conn.Read(b)
	}
	n := copy(b, wc.buffered)
	wc.buffered = wc.buffered[n:]
	if len(wc.buffered) == 0 {
		// Fully drained: drop the reference so the peek buffer can be
		// garbage collected for the remaining lifetime of the connection.
		wc.buffered = nil
	}
	return n, nil
}
// Write forwards b unchanged to the underlying connection and reports that
// connection's byte count and error.
func (wc *hybridConn) Write(b []byte) (n int, err error) {
	n, err = wc.conn.Write(b)
	return n, err
}
// Close closes the underlying connection and returns its error, if any.
func (wc *hybridConn) Close() error {
	err := wc.conn.Close()
	return err
}
// LocalAddr returns the underlying connection's local address string,
// re-labelled with the "wsh" network name.
func (wc *hybridConn) LocalAddr() net.Addr {
	local := wc.conn.LocalAddr().String()
	return &addr{"wsh", local}
}
// RemoteAddr returns the underlying connection's remote address string,
// re-labelled with the "wsh" network name.
func (wc *hybridConn) RemoteAddr() net.Addr {
	remote := wc.conn.RemoteAddr().String()
	return &addr{"wsh", remote}
}
// SetDeadline forwards the combined read/write deadline t to the underlying
// connection.
func (wc *hybridConn) SetDeadline(t time.Time) error {
	err := wc.conn.SetDeadline(t)
	return err
}
// SetReadDeadline forwards the read deadline t to the underlying connection.
func (wc *hybridConn) SetReadDeadline(t time.Time) error {
	err := wc.conn.SetReadDeadline(t)
	return err
}
// SetWriteDeadline forwards the write deadline t to the underlying connection.
func (wc *hybridConn) SetWriteDeadline(t time.Time) error {
	err := wc.conn.SetWriteDeadline(t)
	return err
}