Skip to content

Commit

Permalink
fixes #192 [BUG] Panic on reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Mar 24, 2020
1 parent 2aa83c8 commit eb5e83a
Showing 1 changed file with 33 additions and 37 deletions.
70 changes: 33 additions & 37 deletions protocol/req/req.go
Expand Up @@ -53,8 +53,6 @@ type context struct {
sendMsg *protocol.Message // messaging waiting for send
lastPipe *pipe // last pipe used for transmit
reqID uint32 // request ID
sendID uint32 // sent id (cleared after first send)
recvID uint32 // recv id (set after first send)
recvWait bool // true if a thread is blocked in RecvMsg
bestEffort bool // if true, don't block waiting in send
queued bool // true if we need to send a message
Expand All @@ -78,24 +76,25 @@ func (s *socket) send() {
s.sendq = s.sendq[1:]
c.queued = false

if c.sendID != 0 {
c.reqMsg = c.sendMsg
var m *protocol.Message
if m = c.sendMsg; m != nil {
c.reqMsg = m
c.sendMsg = nil
c.recvID = c.sendID
s.ctxByID[c.recvID] = c
c.sendID = 0
s.ctxByID[c.reqID] = c
c.cond.Broadcast()
} else {
m = c.reqMsg
}
m := c.reqMsg
m.Clone()
p := s.readyq[0]
s.readyq = s.readyq[1:]

// Schedule a retransmit for the future.
c.lastPipe = p
if c.resendTime > 0 {
id := c.reqID
c.resender = time.AfterFunc(c.resendTime, func() {
c.resendMessage()
c.resendMessage(id)
})
}
go p.sendCtx(c, m)
Expand All @@ -106,7 +105,7 @@ func (p *pipe) sendCtx(c *context, m *protocol.Message) {
s := p.s

// Send this message. If an error occurs, we examine the
// error. If it is ErrClosed, we don't schedule ourself.
// error. If it is ErrClosed, we don't schedule our self.
if err := p.p.SendMsg(m); err != nil {
m.Free()
if err == protocol.ErrClosed {
Expand Down Expand Up @@ -163,14 +162,16 @@ func (p *pipe) Close() {
_ = p.p.Close()
}

func (c *context) resendMessage() {
func (c *context) resendMessage(id uint32) {
s := c.s
s.Lock()
defer s.Unlock()
if !c.queued {
c.queued = true
s.sendq = append(s.sendq, c)
s.send()
if c.reqID == id {
if !c.queued {
c.queued = true
s.sendq = append(s.sendq, c)
s.send()
}
}
}

Expand All @@ -180,8 +181,7 @@ func (c *context) unscheduleSend() {
c.queued = false
for i, c2 := range s.sendq {
if c2 == c {
s.sendq = append(s.sendq[:i],
s.sendq[i+1:]...)
s.sendq = append(s.sendq[:i], s.sendq[i+1:]...)
return
}
}
Expand Down Expand Up @@ -215,8 +215,6 @@ func (c *context) cancel() {
c.recvTimer.Stop()
c.recvTimer = nil
}
c.sendID = 0
c.recvID = 0
c.cond.Broadcast()
}

Expand All @@ -238,31 +236,28 @@ func (c *context) SendMsg(m *protocol.Message) error {
}

c.cancel() // this cancels any pending send or recv calls
c.unscheduleSend()

c.reqID = id
s.ctxByID[id] = c
c.unscheduleSend()
c.queued = true
c.sendMsg = m

s.sendq = append(s.sendq, c)

if c.bestEffort {
// for best effort case, we just immediately go the
// reqMsg, and schedule it as a send. No waiting.
// This means that if the message cannot be delivered
// immediately, it will still get a chance later.
c.reqMsg = m
c.recvID = id
s.send()
return nil
}

expired := false
c.sendID = id
c.sendMsg = m
if c.sendExpire > 0 {
c.sendTimer = time.AfterFunc(c.sendExpire, func() {
s.Lock()
if c.sendID == id {
if c.sendMsg == m {
expired = true
c.cancel() // also does a wake up
}
Expand All @@ -276,17 +271,17 @@ func (c *context) SendMsg(m *protocol.Message) error {
// It is responsible for providing the blocking semantic and
// ultimately back-pressure. Note that we will "continue" if
// the send is canceled by a subsequent send.
for c.sendID == id {
for c.sendMsg == m && !expired && !c.closed {
c.cond.Wait()
}
if c.sendMsg == m {
c.unscheduleSend()
c.sendMsg = nil
if expired {
return protocol.ErrSendTimeout
}
c.reqID = 0
if c.closed {
return protocol.ErrClosed
}
return protocol.ErrSendTimeout
}
return nil
}
Expand All @@ -298,30 +293,30 @@ func (c *context) RecvMsg() (*protocol.Message, error) {
if s.closed || c.closed {
return nil, protocol.ErrClosed
}
if c.recvWait || c.recvID == 0 {
if c.recvWait || c.reqID == 0 {
return nil, protocol.ErrProtoState
}
c.recvWait = true
id := c.recvID
id := c.reqID
expired := false

if c.recvExpire > 0 {
c.recvTimer = time.AfterFunc(c.recvExpire, func() {
s.Lock()
if c.recvID == id {
if c.reqID == id {
expired = true
c.cancel()
}
s.Unlock()
})
}

for id == c.recvID && c.repMsg == nil {
for id == c.reqID && c.repMsg == nil {
c.cond.Wait()
}

m := c.repMsg
c.recvID = 0
c.reqID = 0
c.repMsg = nil
c.recvWait = false
c.cond.Broadcast()
Expand Down Expand Up @@ -502,11 +497,12 @@ func (s *socket) RemovePipe(pp protocol.Pipe) {
}
}
for c := range s.ctxs {
if c.lastPipe == p {
if c.lastPipe == p && c.reqMsg != nil {
// We are closing this pipe, so we need to
// immediately reschedule it.
c.lastPipe = nil
go c.resendMessage()
c.unscheduleSend()
go c.resendMessage(c.reqID)
}
}
s.Unlock()
Expand Down

0 comments on commit eb5e83a

Please sign in to comment.