Skip to content

Commit

Permalink
More instrument-ification. (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelquigley committed Jun 4, 2020
1 parent 7cbd0e8 commit 93ac4b8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 9 deletions.
28 changes: 22 additions & 6 deletions protocol/westworld2/dialerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func (self *dialerConn) rxer() {
for {
wm, _, err := readWireMessage(self.conn, self.pool, self.ins)
if err != nil {
logrus.Errorf("read error (%v)", err)
if self.ins != nil {
self.ins.readError(self.peer, err)
}
continue
}

Expand All @@ -102,22 +104,30 @@ func (self *dialerConn) rxer() {
wm.buffer.unref()

} else {
logrus.Errorf("invalid mt [%s]", mtString(wm.mt))
if self.ins != nil {
self.ins.unexpectedMessageType(self.peer, wm.mt)
}
wm.buffer.unref()
}
}
}

func (self *dialerConn) hello() error {
/*
* Send Hello
*/
helloSeq := self.seq.Next()
hello := newHello(helloSeq, self.pool)
defer hello.buffer.unref()

if err := writeWireMessage(hello, self.conn, self.peer, self.ins); err != nil {
return errors.Wrap(err, "write hello")
}
logrus.Infof("{hello} -> [%s]", self.peer)
/* */

/*
* Expect Ack'd Hello Response
*/
if err := self.conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
return errors.Wrap(err, "set read deadline")
}
Expand All @@ -137,19 +147,25 @@ func (self *dialerConn) hello() error {
if err := self.conn.SetReadDeadline(time.Time{}); err != nil {
return errors.Wrap(err, "clear read deadline")
}
logrus.Infof("{helloack} <- [%s]", self.peer)
/* */

// The next sequence should be the next highest sequence
self.rxPortal.accepted = helloAck.seq

/*
* Send Final Ack
*/
ack := newAck(helloAck.seq, self.pool)
defer ack.buffer.unref()

if err := writeWireMessage(ack, self.conn, self.peer, self.ins); err != nil {
return errors.Wrap(err, "write ack")
}
logrus.Infof("{ack} -> [%s]", self.peer)
/* */

logrus.Infof("connection established, peer [%s]", self.peer)
if self.ins != nil {
self.ins.connected(self.peer)
}

go self.rxer()

Expand Down
5 changes: 3 additions & 2 deletions protocol/westworld2/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (
)

type instrument interface {
accepted(peer *net.UDPAddr)
connected(peer *net.UDPAddr)
wireMessageTx(wm *wireMessage)
wireMessageRx(wm *wireMessage)
unknownPeer(peer *net.UDPAddr)
readError(peer *net.UDPAddr, err error)
connectError(peer *net.UDPAddr, err error)
unexpectedMessageType(peer *net.UDPAddr, mt messageType)
}

type loggerInstrument struct{}

func (self *loggerInstrument) accepted(peer *net.UDPAddr) {
logrus.Infof("accepted, peer [%s]", peer)
logrus.Infof("connected, peer [%s]", peer)
}

func (self *loggerInstrument) wireMessageRx(wm *wireMessage) {
Expand Down
2 changes: 1 addition & 1 deletion protocol/westworld2/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (self *listener) hello(hello *wireMessage, peer *net.UDPAddr) {
self.acceptQueue <- conn

if self.ins != nil {
self.ins.accepted(peer)
self.ins.connected(peer)
}
}

Expand Down

0 comments on commit 93ac4b8

Please sign in to comment.