From eb5e83a604e8eab5a45d679741a6e6e1498274e8 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 18 Mar 2020 21:52:34 -0700 Subject: [PATCH] fixes #192 [BUG] Panic on reconnect --- protocol/req/req.go | 70 +++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/protocol/req/req.go b/protocol/req/req.go index 27bbad7..02f9de5 100644 --- a/protocol/req/req.go +++ b/protocol/req/req.go @@ -53,8 +53,6 @@ type context struct { sendMsg *protocol.Message // messaging waiting for send lastPipe *pipe // last pipe used for transmit reqID uint32 // request ID - sendID uint32 // sent id (cleared after first send) - recvID uint32 // recv id (set after first send) recvWait bool // true if a thread is blocked in RecvMsg bestEffort bool // if true, don't block waiting in send queued bool // true if we need to send a message @@ -78,15 +76,15 @@ func (s *socket) send() { s.sendq = s.sendq[1:] c.queued = false - if c.sendID != 0 { - c.reqMsg = c.sendMsg + var m *protocol.Message + if m = c.sendMsg; m != nil { + c.reqMsg = m c.sendMsg = nil - c.recvID = c.sendID - s.ctxByID[c.recvID] = c - c.sendID = 0 + s.ctxByID[c.reqID] = c c.cond.Broadcast() + } else { + m = c.reqMsg } - m := c.reqMsg m.Clone() p := s.readyq[0] s.readyq = s.readyq[1:] @@ -94,8 +92,9 @@ func (s *socket) send() { // Schedule a retransmit for the future. c.lastPipe = p if c.resendTime > 0 { + id := c.reqID c.resender = time.AfterFunc(c.resendTime, func() { - c.resendMessage() + c.resendMessage(id) }) } go p.sendCtx(c, m) @@ -106,7 +105,7 @@ func (p *pipe) sendCtx(c *context, m *protocol.Message) { s := p.s // Send this message. If an error occurs, we examine the - // error. If it is ErrClosed, we don't schedule ourself. + // error. If it is ErrClosed, we don't schedule our self. if err := p.p.SendMsg(m); err != nil { m.Free() if err == protocol.ErrClosed { @@ -163,14 +162,16 @@ func (p *pipe) Close() { _ = p.p.Close() } -func (c *context) resendMessage() { +func (c *context) resendMessage(id uint32) { s := c.s s.Lock() defer s.Unlock() - if !c.queued { - c.queued = true - s.sendq = append(s.sendq, c) - s.send() + if c.reqID == id { + if !c.queued { + c.queued = true + s.sendq = append(s.sendq, c) + s.send() + } } } @@ -180,8 +181,7 @@ func (c *context) unscheduleSend() { c.queued = false for i, c2 := range s.sendq { if c2 == c { - s.sendq = append(s.sendq[:i], - s.sendq[i+1:]...) + s.sendq = append(s.sendq[:i], s.sendq[i+1:]...) return } } @@ -215,8 +215,6 @@ func (c *context) cancel() { c.recvTimer.Stop() c.recvTimer = nil } - c.sendID = 0 - c.recvID = 0 c.cond.Broadcast() } @@ -238,11 +236,12 @@ func (c *context) SendMsg(m *protocol.Message) error { } c.cancel() // this cancels any pending send or recv calls + c.unscheduleSend() c.reqID = id - s.ctxByID[id] = c - c.unscheduleSend() c.queued = true + c.sendMsg = m + s.sendq = append(s.sendq, c) if c.bestEffort { @@ -250,19 +249,15 @@ func (c *context) SendMsg(m *protocol.Message) error { // reqMsg, and schedule it as a send. No waiting. // This means that if the message cannot be delivered // immediately, it will still get a chance later. - c.reqMsg = m - c.recvID = id s.send() return nil } expired := false - c.sendID = id - c.sendMsg = m if c.sendExpire > 0 { c.sendTimer = time.AfterFunc(c.sendExpire, func() { s.Lock() - if c.sendID == id { + if c.sendMsg == m { expired = true c.cancel() // also does a wake up } @@ -276,17 +271,17 @@ func (c *context) SendMsg(m *protocol.Message) error { // It is responsible for providing the blocking semantic and // ultimately back-pressure. Note that we will "continue" if // the send is canceled by a subsequent send. - for c.sendID == id { + for c.sendMsg == m && !expired && !c.closed { c.cond.Wait() } if c.sendMsg == m { + c.unscheduleSend() c.sendMsg = nil - if expired { - return protocol.ErrSendTimeout - } + c.reqID = 0 if c.closed { return protocol.ErrClosed } + return protocol.ErrSendTimeout } return nil } @@ -298,17 +293,17 @@ func (c *context) RecvMsg() (*protocol.Message, error) { if s.closed || c.closed { return nil, protocol.ErrClosed } - if c.recvWait || c.recvID == 0 { + if c.recvWait || c.reqID == 0 { return nil, protocol.ErrProtoState } c.recvWait = true - id := c.recvID + id := c.reqID expired := false if c.recvExpire > 0 { c.recvTimer = time.AfterFunc(c.recvExpire, func() { s.Lock() - if c.recvID == id { + if c.reqID == id { expired = true c.cancel() } @@ -316,12 +311,12 @@ func (c *context) RecvMsg() (*protocol.Message, error) { }) } - for id == c.recvID && c.repMsg == nil { + for id == c.reqID && c.repMsg == nil { c.cond.Wait() } m := c.repMsg - c.recvID = 0 + c.reqID = 0 c.repMsg = nil c.recvWait = false c.cond.Broadcast() @@ -502,11 +497,12 @@ func (s *socket) RemovePipe(pp protocol.Pipe) { } } for c := range s.ctxs { - if c.lastPipe == p { + if c.lastPipe == p && c.reqMsg != nil { // We are closing this pipe, so we need to // immediately reschedule it. c.lastPipe = nil - go c.resendMessage() + c.unscheduleSend() + go c.resendMessage(c.reqID) } } s.Unlock()