/
conn.go
146 lines (125 loc) · 2.63 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package server
import (
"fmt"
"github.com/mstone/focus/msg"
"reflect"
"strings"
"sync"
log "gopkg.in/inconshreveable/log15.v2"
"github.com/gorilla/websocket"
)
// conn represents an open WebSocket connection together with the state shared
// by its read and write loops.
//
// NOTE(review): docs is written by writeLoop and read by onVppWrite (called
// from readLoop). Run starts those two loops on separate goroutines and no
// lock is visible in this type — confirm whether access needs synchronizing.
type conn struct {
// msgs carries the messages writeLoop forwards to the client; onVppOpen also
// hands it to a document as the channel to reply on.
msgs chan interface{}
// l is the connection's logger.
l log.Logger
// no is the connection number printed by String.
no int
// numSend is incremented by writeLoop once per message taken from msgs.
numSend int
// numRecv is incremented by readLoop once per command it handles.
numRecv int
// wg lets Run wait until readLoop and writeLoop have both returned.
wg sync.WaitGroup
// ws is the underlying WebSocket connection.
ws *websocket.Conn
// docs maps an fd to the channel of the document opened under that fd.
docs map[int]chan interface{}
// srvr is the channel on which onVppOpen sends allocdoc requests.
srvr chan interface{}
}
// String renders the connection number in decimal. A nil receiver is reported
// as "nil" so a missing connection can still be logged safely.
func (c *conn) String() string {
	if c != nil {
		return fmt.Sprint(c.no)
	}
	return "nil"
}
// Run starts the read and write loops on their own goroutines and blocks until
// both have returned, then logs that the connection is finished.
//
// NOTE(review): writeLoop only returns once c.msgs is closed, and nothing in
// this file closes it — confirm that the owner of c.msgs does, otherwise Run
// keeps waiting after the read side has exited.
func (c *conn) Run() {
	loops := []func(){c.readLoop, c.writeLoop}

	c.wg.Add(len(loops))
	for _, loop := range loops {
		go loop()
	}
	c.wg.Wait()

	c.l.Info("conn done; disconnecting client")
}
// Close currently does nothing and always returns nil.
//
// NOTE(review): it does not close c.ws or c.msgs; confirm that whoever owns
// those resources releases them.
func (c *conn) Close() error {
return nil
}
// onVppOpen handles an OPEN command. It sends an allocdoc request carrying
// m.Name on c.srvr, blocks until the reply arrives, and then sends an open
// request to the returned document channel, passing c.msgs as the channel the
// document should use to talk to this connection.
//
// It panics if the reply carries an error.
// NOTE(review): the panic happens on the readLoop goroutine and nothing here
// recovers it — confirm that failing this hard is intended.
func (c *conn) onVppOpen(m msg.Msg) {
	replies := make(chan allocdocresp)
	c.srvr <- allocdoc{reply: replies, name: m.Name}

	resp := <-replies
	if err := resp.err; err != nil {
		c.l.Error("conn unable to allocdoc", "err", err)
		panic("conn unable to allocdoc")
	}

	c.l.Info("conn finished allocdoc, sending open", "cmd", m)
	resp.doc <- open{dbgConn: c, conn: c.msgs, name: m.Name}
}
// onVppWrite handles a WRITE command by looking up the document channel
// registered under m.Fd and sending it a write request with the command's fd,
// revision and ops.
//
// It panics when m.Fd is not present in c.docs.
// NOTE(review): m comes straight from the client, so an unknown fd takes this
// panic path — confirm that is acceptable for the process as a whole.
func (c *conn) onVppWrite(m msg.Msg) {
	target, known := c.docs[m.Fd]
	if !known {
		c.l.Error("conn got WRITE with bad fd, exiting")
		panic("conn got WRITE with bad fd")
	}

	c.l.Info("conn enqueuing write for doc", "cmd", m, "doc", target)

	req := write{
		dbgConn: c,
		fd:      m.Fd,
		rev:     m.Rev,
		ops:     m.Ops,
	}
	target <- req
}
// readLoop decodes JSON commands from the WebSocket and dispatches each one to
// its handler, counting handled commands in c.numRecv. It returns when a read
// fails (after calling Close) or when a command with an unknown Cmd arrives,
// and signals c.wg on the way out.
func (c *conn) readLoop() {
	defer c.wg.Done()
	for {
		var m msg.Msg
		if err := c.ws.ReadJSON(&m); err != nil {
			c.l.Error("conn read error; closing conn", "err", err)
			// Close's result used to be discarded; log it so a failed
			// shutdown is visible instead of silently lost.
			if cerr := c.Close(); cerr != nil {
				c.l.Error("conn close error", "err", cerr)
			}
			return
		}

		switch m.Cmd {
		case msg.C_OPEN:
			c.l.Info("conn got OPEN, sending allocdoc", "cmd", m)
			c.onVppOpen(m)
			c.l.Info("conn finished OPEN", "cmd", m)
		case msg.C_WRITE:
			c.l.Info("conn got WRITE", "cmd", m)
			c.onVppWrite(m)
			c.l.Info("conn finished WRITE", "cmd", m)
		default:
			// Unknown commands end the read side of the connection.
			c.l.Error("conn got unknown cmd; exiting", "cmd", m)
			return
		}
		c.numRecv++
	}
}
// writeLoop takes messages from c.msgs until that channel is closed and
// forwards each recognised one to the client as a JSON-encoded msg.Msg:
//
//   - openresp  is sent as C_OPEN_RESP, after recording v.doc in c.docs under v.fd
//   - writeresp is sent as C_WRITE_RESP
//   - write     is sent as C_WRITE
//
// Messages of any other type are logged but not sent. c.numSend is incremented
// for every message taken from the channel, whether or not it was sent
// successfully. Write failures are logged and the loop keeps draining c.msgs.
// It signals c.wg when it returns.
func (c *conn) writeLoop() {
	defer c.wg.Done()
	for m := range c.msgs {
		// reflect.TypeOf returns nil for a nil interface value, so guard
		// before asking for the type's name.
		typeName := "<nil>"
		if t := reflect.TypeOf(m); t != nil {
			typeName = t.Name()
		}

		// Not every value that can travel on a chan interface{} is a
		// fmt.Stringer; fall back to default formatting rather than panic.
		var desc string
		if s, ok := m.(fmt.Stringer); ok {
			desc = s.String()
		} else {
			desc = fmt.Sprint(m)
		}
		c.l.Info("server writing "+strings.ToUpper(typeName), "cmd", desc)

		var err error
		switch v := m.(type) {
		case openresp:
			// Remember the document channel so later WRITEs for this fd
			// can be routed to it.
			c.docs[v.fd] = v.doc
			err = c.ws.WriteJSON(msg.Msg{
				Cmd:  msg.C_OPEN_RESP,
				Name: v.name,
				Fd:   v.fd,
			})
		case writeresp:
			err = c.ws.WriteJSON(msg.Msg{
				Cmd: msg.C_WRITE_RESP,
				Fd:  v.fd,
				Rev: v.rev,
			})
		case write:
			err = c.ws.WriteJSON(msg.Msg{
				Cmd: msg.C_WRITE,
				Fd:  v.fd,
				Rev: v.rev,
				Ops: v.ops,
			})
		}
		if err != nil {
			// Keep draining: stopping here could leave senders blocked on
			// c.msgs.
			c.l.Error("conn write error", "err", err, "cmd", desc)
		}
		c.numSend++
	}
}