-
Notifications
You must be signed in to change notification settings - Fork 910
/
conn.go
151 lines (124 loc) · 3.58 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package master
import (
"bytes"
"encoding/binary"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vmihailenco/msgpack"
"io"
"net"
"sync"
)
// Conn is a simple helper that manages the underlying network connection,
// using a mutex to serialize senders that go through Send.
type Conn struct {
// netConn is the wrapped connection that events are read from and written to.
netConn net.Conn
// sendmu is held by Send for the duration of a write; SendNoLock bypasses it.
sendmu sync.Mutex
// ID identifies this connection; ConnFromNetCon assigns it from getNewID().
ID int64
// MessageHandler is called by Listen for every incoming message it decodes.
// Listen invokes it without a nil check, so it has to be set before Listen runs.
MessageHandler func(*Message)
// ConnClosedHanlder, when non-nil, is called by Listen after the connection
// has been closed.
ConnClosedHanlder func()
}
// ConnFromNetCon wraps a Conn around the given net.Conn and assigns it a
// fresh ID obtained from getNewID. The message and close handlers are left
// unset.
func ConnFromNetCon(conn net.Conn) *Conn {
	wrapped := new(Conn)
	wrapped.netConn = conn
	wrapped.ID = getNewID()
	return wrapped
}
// Listen reads events from the connection in a loop and passes each one to
// c.MessageHandler. It blocks until a read fails; it then closes the
// underlying connection and, if set, calls c.ConnClosedHanlder.
//
// Each frame on the wire is: a 4-byte little-endian event id, a 4-byte
// little-endian body length, followed by that many body bytes (possibly none).
//
// c.MessageHandler is invoked without a nil check, so it must be set before
// Listen is called.
func (c *Conn) Listen() {
	logrus.Info("Master/Slave connection: starting listening for events ", c.ID)

	// err is shared with the deferred cleanup so the terminating error is
	// reported once more when the loop exits.
	var err error
	defer func() {
		if err != nil {
			logrus.WithError(err).Error("An error occured while handling a connection")
		}

		c.netConn.Close()

		if c.ConnClosedHanlder != nil {
			c.ConnClosedHanlder()
		}
	}()

	idBuf := make([]byte, 4)
	lenBuf := make([]byte, 4)

	for {
		// Read the event id. io.ReadFull is required here: a plain Read may
		// return fewer than 4 bytes without an error, which would leave stale
		// bytes in the buffer and desynchronize every following frame.
		if _, err = io.ReadFull(c.netConn, idBuf); err != nil {
			logrus.WithError(err).Error("Failed reading event id")
			return
		}

		// Read the body length, again insisting on all 4 bytes.
		if _, err = io.ReadFull(c.netConn, lenBuf); err != nil {
			logrus.WithError(err).Error("Failed reading event length")
			return
		}

		id := binary.LittleEndian.Uint32(idBuf)
		l := binary.LittleEndian.Uint32(lenBuf)

		// NOTE(review): the length is taken from the wire as-is and is not
		// capped before allocating — confirm the peer is trusted.
		body := make([]byte, int(l))
		if l > 0 {
			// Read the body, if there was one
			if _, err = io.ReadFull(c.netConn, body); err != nil {
				logrus.WithError(err).Error("Failed reading body")
				return
			}
		}

		c.MessageHandler(&Message{EvtID: EventType(id), Body: body})
	}
}
// Send encodes the event with EncodeEvent and writes the resulting frame to
// the connection. The send mutex is held for the duration of the write, so
// Send may be called from multiple goroutines.
// An encoding failure is returned wrapped with the message "EncodeEvent".
func (c *Conn) Send(evtID EventType, data interface{}) error {
	frame, encodeErr := EncodeEvent(evtID, data)
	if encodeErr != nil {
		return errors.WithMessage(encodeErr, "EncodeEvent")
	}

	c.sendmu.Lock()
	defer c.sendmu.Unlock()

	writeErr := c.SendNoLock(frame)
	return writeErr
}
// SendLogErr behaves like Send, but logs a failure instead of returning it
// (useful when launching a send in a new goroutine).
func (c *Conn) SendLogErr(evtID EventType, data interface{}) {
	if err := c.Send(evtID, data); err != nil {
		logrus.WithError(err).Error("[MASTER] Failed sending message to slave")
	}
}
// SendNoLock writes the already-encoded bytes to the connection as-is.
// It takes no lock: the caller is responsible for making sure it is not
// called from multiple goroutines at the same time.
// A write failure is returned wrapped with the message "netConn.Write".
func (c *Conn) SendNoLock(data []byte) error {
	if _, writeErr := c.netConn.Write(data); writeErr != nil {
		return errors.WithMessage(writeErr, "netConn.Write")
	}
	return nil
}
// EncodeEvent encodes the event to the wire format.
//
// The wire format is pretty basic:
//   - the first 4 bytes are a little-endian uint32 holding the event type,
//   - the next 4 bytes are a little-endian uint32 holding the body length,
//   - the next n bytes are the body itself, which may be empty.
//
// A nil data produces an empty body. A []byte is used as the body verbatim;
// any other value is serialized with msgpack.Marshal.
//
// An error is returned if msgpack serialization fails, or if the body is too
// large for its length to fit in the 4-byte length prefix.
func EncodeEvent(evtID EventType, data interface{}) ([]byte, error) {
	var serialized []byte
	if data != nil {
		if byteSlice, ok := data.([]byte); ok {
			serialized = byteSlice
		} else {
			var err error
			serialized, err = msgpack.Marshal(data)
			if err != nil {
				return nil, errors.WithMessage(err, "msgpack.Marshal")
			}
		}
	}

	// The length prefix is a uint32; converting a larger length would silently
	// truncate it and produce a frame the reader cannot parse.
	if uint64(len(serialized)) > 0xFFFFFFFF {
		return nil, errors.New("EncodeEvent: body too large for uint32 length prefix")
	}

	var header [8]byte
	binary.LittleEndian.PutUint32(header[:4], uint32(evtID))
	binary.LittleEndian.PutUint32(header[4:], uint32(len(serialized)))

	var buf bytes.Buffer
	buf.Grow(len(header) + len(serialized))
	buf.Write(header[:])
	buf.Write(serialized)

	return buf.Bytes(), nil
}