forked from go-pivo/pivo
/
conn.go
184 lines (158 loc) · 4.43 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// Copyright 2015 The Pivo Authors. All rights reserved.
// Use of this source code is governed by a Simplified BSD
// license that can be found in the LICENSE file.
// Package ws implements the Pivo/Websocket connector.
package ws
import (
"fmt"
"io"
"net"
"net/http"
"time"
"github.com/gorilla/websocket"
"gopkg.in/pivo.v1"
)
// Default settings applied by NewConn.
const (
	// DefaultPingTimeout is the default ping timeout, in seconds.
	DefaultPingTimeout = 60

	// DefaultPortBufferSize is the default capacity of the port channel.
	DefaultPortBufferSize = 64

	// DefaultReadBufferSize is the default read buffer size.
	DefaultReadBufferSize = 1024

	// DefaultWriteBufferSize is the default write buffer size.
	DefaultWriteBufferSize = 1024

	// DefaultWriteTimeout is the default write timeout, in seconds.
	DefaultWriteTimeout = 10
)
// Conn is the Websocket connector: it holds the underlying socket along
// with the settings that govern how it is read from and written to.
type Conn struct {
	// port is the outbound message channel created by Sender.
	port chan []byte
	// timeout is not referenced by the code visible in this file.
	timeout time.Duration
	// ws is the underlying socket; it is set by Dial or Upgrade.
	ws *websocket.Conn

	// PingTimeout is the ping timeout expressed as a number of seconds:
	// the methods of Conn multiply it by time.Second before using it.
	PingTimeout time.Duration

	// WriteTimeout is the write timeout expressed as a number of seconds:
	// the methods of Conn multiply it by time.Second before using it.
	WriteTimeout time.Duration

	// PortBufferSize is the capacity of the channel returned by Sender.
	PortBufferSize int

	// The remaining settings are only used by upgraders.
	// See http://godoc.org/github.com/gorilla/websocket#Upgrader

	// CheckOrigin is handed to the upgrader unchanged.
	CheckOrigin func(*http.Request) bool
	// ReadBufferSize and WriteBufferSize are handed to the upgrader unchanged.
	ReadBufferSize, WriteBufferSize int
}
// NewConn returns a connector whose timeouts and buffer sizes are set to
// the package defaults. The connector has no socket yet; Dial or Upgrade
// provides one.
func NewConn() *Conn {
	c := new(Conn)
	c.PingTimeout = DefaultPingTimeout
	c.WriteTimeout = DefaultWriteTimeout
	c.PortBufferSize = DefaultPortBufferSize
	c.ReadBufferSize = DefaultReadBufferSize
	c.WriteBufferSize = DefaultWriteBufferSize
	return c
}
// ping writes a ping message with an empty payload to the socket.
func (c *Conn) ping() error {
	empty := []byte{}
	return c.write(websocket.PingMessage, empty)
}
// send writes buf to the socket as a single text message.
func (c *Conn) send(buf []byte) (err error) {
	err = c.write(websocket.TextMessage, buf)
	return err
}
// write sends one message of type t carrying buf. Before writing it sets
// the socket's write deadline to WriteTimeout seconds from now.
func (c *Conn) write(t int, buf []byte) error {
	deadline := time.Now().Add(c.WriteTimeout * time.Second)
	c.ws.SetWriteDeadline(deadline)
	return c.ws.WriteMessage(t, buf)
}
// Close sends a close control message to the remote end.
//
// The message always carries the normal-closure status code. When err is
// non-nil its text is sent as the close reason; a nil err sends an empty
// reason instead of the literal "<nil>". A reason too long to fit in a
// control frame is shortened on a rune boundary so the frame is still
// accepted for sending.
//
// The write is allowed WriteTimeout seconds to complete and its error, if
// any, is returned. Close only writes the message; it does not close the
// underlying socket.
func (c *Conn) Close(err error) error {
	// A control frame payload is limited to 125 bytes, 2 of which hold the
	// status code, leaving at most 123 bytes for the reason.
	const maxReason = 123

	var reason string
	if err != nil {
		reason = fmt.Sprint(err)
	}
	if len(reason) > maxReason {
		// Ranging over a string yields the byte offset at which each rune
		// starts, so the last offset within the limit is a safe cut point.
		cut := 0
		for i := range reason {
			if i > maxReason {
				break
			}
			cut = i
		}
		reason = reason[:cut]
	}

	msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, reason)
	deadline := time.Now().Add(c.WriteTimeout * time.Second)
	return c.ws.WriteControl(websocket.CloseMessage, msg, deadline)
}
// Dial opens a Websocket connection to url, passing h as the request
// header, using a dialer with default settings.
//
// On success the new socket is stored in c and c itself is returned. On
// failure the returned connector is nil. The HTTP response obtained from
// the dialer is returned in both cases.
func (c *Conn) Dial(url string, h http.Header) (*Conn, *http.Response, error) {
	var d websocket.Dialer

	sock, resp, err := d.Dial(url, h)
	if err != nil {
		return nil, resp, err
	}

	c.ws = sock
	return c, resp, nil
}
// Receiver runs the read loop for the socket. Binary messages are handed
// to rc.OnReadBinary and text messages to rc.OnReadText.
//
// The loop ends, and rc.OnClose is called, when reading fails or when one
// of the read callbacks returns an error. OnClose receives nil if the read
// error was io.EOF and the offending error otherwise. Receiver returns
// whatever OnClose returns, and closes the socket on the way out.
//
// The read deadline is set to PingTimeout seconds from now when the loop
// starts and is pushed back by the same amount each time a pong arrives.
func (c *Conn) Receiver(rc pivo.OnReadCloser) error {
	defer c.ws.Close()

	window := c.PingTimeout * time.Second
	extend := func() {
		c.ws.SetReadDeadline(time.Now().Add(window))
	}

	extend()
	c.ws.SetPongHandler(func(string) error {
		extend()
		return nil
	})

	for {
		kind, payload, err := c.ws.ReadMessage()
		if err == io.EOF {
			return rc.OnClose(nil)
		}
		if err != nil {
			return rc.OnClose(err)
		}

		// Message types other than binary and text are ignored.
		switch kind {
		case websocket.BinaryMessage:
			if err := rc.OnReadBinary(payload); err != nil {
				return rc.OnClose(err)
			}
		case websocket.TextMessage:
			if err := rc.OnReadText(payload); err != nil {
				return rc.OnClose(err)
			}
		}
	}
}
// RemoteAddr returns the network address of the remote end, as reported
// by the underlying socket.
func (c *Conn) RemoteAddr() net.Addr {
	addr := c.ws.RemoteAddr()
	return addr
}
// Sender creates the port channel, with capacity PortBufferSize, and
// starts a goroutine that writes every message received on it to the
// socket as a text message. The goroutine also sends a ping at intervals
// of nine tenths of the ping timeout.
//
// The goroutine runs until one of the following happens:
//
//  1. The returned channel is closed.
//  2. Writing a message to the socket fails.
//  3. Writing a ping to the socket fails.
//
// In the last two cases the socket is closed before the goroutine exits.
//
// PingTimeout must yield a positive ping interval: time.NewTicker panics
// when given a non-positive duration.
func (c *Conn) Sender() chan []byte {
	c.port = make(chan []byte, c.PortBufferSize)

	ticker := time.NewTicker(c.PingTimeout * time.Second * 9 / 10)

	go func() {
		defer ticker.Stop()

		for {
			var err error

			select {
			case msg, open := <-c.port:
				if !open {
					return
				}
				err = c.send(msg)
			case <-ticker.C:
				err = c.ping()
			}

			if err != nil {
				c.ws.Close()
				return
			}
		}
	}()

	return c.port
}
// Upgrade attempts to turn the HTTP request r into a Websocket session,
// passing h to the upgrader as the response header. The upgrader is
// configured with the connector's CheckOrigin, ReadBufferSize and
// WriteBufferSize settings.
//
// On success the new socket is stored in c and nil is returned; otherwise
// the upgrader's error is returned and c is left untouched.
func (c *Conn) Upgrade(w http.ResponseWriter, r *http.Request, h http.Header) error {
	var up websocket.Upgrader
	up.CheckOrigin = c.CheckOrigin
	up.ReadBufferSize = c.ReadBufferSize
	up.WriteBufferSize = c.WriteBufferSize

	sock, err := up.Upgrade(w, r, h)
	if err != nil {
		return err
	}

	c.ws = sock
	return nil
}