Skip to content

Commit

Permalink
[FIXED] Reduce use of new timer for publish calls
Browse files Browse the repository at this point in the history
Use a long-lived go routine that deals with publish timeouts.
After the low level publish, we used to create a new timer for
the timeout of the publish call. This is replaced by adding the
"ack" to the list of publish ack waiting to be timed-out.
If an ack is processed from the server, the ack is removed from
the map and the list.

There was also an issue that on connection close we would "fail"
pending publish acks, but invoke the callback from the connection
close, which would have caused a deadlock if user invoke anything
in the ack handler that requires the connection lock.
Solve that by letting the publish ack timeout routine do that job.
We could argue that on connection close we may not even want to
do anything with those pending acks.

Also fixed a bunch of tests.

Resolves #294

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Dec 19, 2019
1 parent 3b79c52 commit dafb6e8
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 88 deletions.
237 changes: 182 additions & 55 deletions stan.go
Expand Up @@ -292,13 +292,17 @@ type conn struct {
pubAckMap map[string]*ack
pubAckChan chan (struct{})
pubAckCloseChan chan (struct{})
pubAckTimeoutCh chan (struct{})
pubAckHead *ack
pubAckTail *ack
opts Options
nc *nats.Conn
ncOwned bool // NATS Streaming created the connection, so needs to close it.
pubNUID *nuid.NUID // NUID generator for published messages.
connLostCB ConnectionLostHandler
closed bool
ping pingInfo
wg sync.WaitGroup
}

// Holds all field related to the client-to-server pings
Expand All @@ -316,22 +320,26 @@ type pingInfo struct {

// Closure for ack contexts.
type ack struct {
t *time.Timer
ah AckHandler
ch chan error
ah AckHandler
ch chan error
guid string
expire int64
prev *ack
next *ack
}

// Connect will form a connection to the NATS Streaming subsystem.
// Note that clientID can contain only alphanumeric and `-` or `_` characters.
func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
// Process Options
c := conn{
c := &conn{
clientID: clientID,
opts: DefaultOptions,
connID: []byte(nuid.Next()),
pubNUID: nuid.New(),
pubAckMap: make(map[string]*ack),
pubAckCloseChan: make(chan struct{}),
pubAckTimeoutCh: make(chan struct{}, 1),
subMap: make(map[string]*subscription),
}
for _, opt := range options {
Expand Down Expand Up @@ -433,6 +441,10 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
// Capture the connection error cb
c.connLostCB = c.opts.ConnectionLostCB

// Start the routine that will timeout the publish calls.
c.wg.Add(1)
go c.pubAckTimeout()

unsubPingSub := true
// Do this with servers which are at least at protocolOne.
if cr.Protocol >= protocolOne {
Expand Down Expand Up @@ -470,7 +482,7 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
p.sub = nil
}

return &c, nil
return c, nil
}

// Invoked on a failed connect.
Expand Down Expand Up @@ -590,18 +602,9 @@ func (sc *conn) cleanupOnClose(err error) {
}
}

// Fail all pending pubs
for guid, pubAck := range sc.pubAckMap {
delete(sc.pubAckMap, guid)
if pubAck.t != nil {
pubAck.t.Stop()
}
if pubAck.ah != nil {
pubAck.ah(guid, err)
} else if pubAck.ch != nil {
pubAck.ch <- err
}
}
// Let the pubAckTimeout routine fail the pubAcks..
sc.signalPubAckTimeoutCh()

// Prevent publish calls that have passed the connection close check but
// not yet send to pubAckChan to be possibly blocked.
close(sc.pubAckCloseChan)
Expand All @@ -610,12 +613,15 @@ func (sc *conn) cleanupOnClose(err error) {
// Close a connection to the stan system.
func (sc *conn) Close() error {
sc.Lock()
defer sc.Unlock()

if sc.closed {
sc.Unlock()
// We are already closed.
return nil
}
defer func() {
sc.Unlock()
sc.wg.Wait()
}()
// Signals we are closed.
sc.closed = true

Expand Down Expand Up @@ -676,7 +682,9 @@ func (sc *conn) processAck(m *nats.Msg) {
}

// Remove
a := sc.removeAck(pa.Guid)
sc.Lock()
a := sc.removeAck(pa.Guid, true)
sc.Unlock()
if a != nil {
// Capture error if it exists.
if pa.Error != "" {
Expand Down Expand Up @@ -732,7 +740,6 @@ func (sc *conn) publishAsync(subject string, data []byte, ah AckHandler, ch chan
sc.pubAckMap[peGUID] = a
// snapshot
ackSubject := sc.ackSubject
ackTimeout := sc.opts.AckTimeout
sc.Unlock()

// Use the buffered channel to control the number of outstanding acks.
Expand All @@ -755,60 +762,41 @@ func (sc *conn) publishAsync(subject string, data []byte, ah AckHandler, ch chan
// Setup the timer for expiration.
sc.Lock()
if err != nil || sc.closed {
sc.Unlock()
// If we got and error on publish or the connection has been closed,
// we need to return an error only if:
// - we can remove the pubAck from the map
// - we can't, but this is an async pub with no provided AckHandler
removed := sc.removeAck(peGUID) != nil
removed := sc.removeAck(peGUID, true) != nil
if removed || (ch == nil && ah == nil) {
if err == nil {
err = ErrConnectionClosed
}
return "", err
}
// pubAck was removed from cleanupOnClose() and error will be sent
// else pubAck was removed from cleanupOnClose() and error will be sent
// to appropriate go channel (ah or ch).
return peGUID, nil
} else {
a.guid = peGUID
a.expire = time.Now().Add(sc.opts.AckTimeout).UnixNano()
sc.appendPubAckToList(a)
}
a.t = time.AfterFunc(ackTimeout, func() {
pubAck := sc.removeAck(peGUID)
// processAck could get here before and handle the ack.
// If that's the case, we would get nil here and simply return.
if pubAck == nil {
return
}
if pubAck.ah != nil {
pubAck.ah(peGUID, ErrTimeout)
} else if a.ch != nil {
pubAck.ch <- ErrTimeout
}
})
sc.Unlock()

return peGUID, nil
return peGUID, err
}

// removeAck removes the ack from the pubAckMap and cancels any state, e.g. timers
func (sc *conn) removeAck(guid string) *ack {
var t *time.Timer
sc.Lock()
// Removes the ack from the pubAckMap and possibly from the list.
// Lock held on entry.
func (sc *conn) removeAck(guid string, removeFromList bool) *ack {
a := sc.pubAckMap[guid]
if a != nil {
t = a.t
delete(sc.pubAckMap, guid)
if removeFromList {
sc.removePubAckFromList(a)
}
}
pac := sc.pubAckChan
sc.Unlock()

// Cancel timer if needed.
if t != nil {
t.Stop()
}

// Remove from channel to unblock PublishAsync
if a != nil && len(pac) > 0 {
<-pac
if a != nil && len(sc.pubAckChan) > 0 {
<-sc.pubAckChan
}
return a
}
Expand Down Expand Up @@ -858,3 +846,142 @@ func (sc *conn) processMsg(raw *nats.Msg) {
sc.nc.Publish(ackSubject, b)
}
}

// Append the pub ack to the list and signal the timeout routine if this was the first.
// Lock held on entry.
func (sc *conn) appendPubAckToList(a *ack) {
if sc.pubAckTail != nil {
a.prev = sc.pubAckTail
a.prev.next = a
sc.pubAckTail = a
} else {
sc.pubAckHead, sc.pubAckTail = a, a
sc.signalPubAckTimeoutCh()
}
}

// Signals the pubAckTimeout channel.
func (sc *conn) signalPubAckTimeoutCh() {
select {
case sc.pubAckTimeoutCh <- struct{}{}:
default:
}
}

// Remove the pub ack from the list and signal the timeout routine if it was
// the head of the list.
// Lock held on entry.
func (sc *conn) removePubAckFromList(a *ack) {
if a.prev != nil {
a.prev.next = a.next
}
if a.next != nil {
a.next.prev = a.prev
}
if a == sc.pubAckTail {
sc.pubAckTail = a.prev
}
if a == sc.pubAckHead {
sc.pubAckHead = a.next
sc.signalPubAckTimeoutCh()
}
}

// Long-lived go routine that deals with publish ack timeouts.
func (sc *conn) pubAckTimeout() {
defer sc.wg.Done()

var (
list *ack
closed bool
dur time.Duration
t = time.NewTimer(time.Hour)
errToReport = ErrTimeout
)
for {
sc.Lock()
list = sc.pubAckHead
if sc.closed {
closed = true
errToReport = ErrConnectionClosed
} else {
now := time.Now().UnixNano()
if list != nil {
dur = time.Duration(list.expire - now)
if dur < 0 {
dur = 0
}
} else {
// Any big value would do...
dur = time.Hour
}
}
sc.Unlock()

if !closed {
if dur > 0 {
t.Reset(dur)
// If the head of the list is removed in processAck, we should
// be notified through pubAckTimeoutCh and will get back to
// compute the new duration.
select {
case <-sc.pubAckTimeoutCh:
continue
case <-t.C:
// Nothing to do, go back to top of loop to refresh list..
if list == nil {
continue
}
}
}
// We have expired pub acks at this point..
sc.Lock()
var a *ack
now := time.Now().UnixNano()
for a = list; a != nil; a = a.next {
if a.expire-now > int64(time.Millisecond) {
// This element expires in more than 1ms from now,
// so stop and end the list prior to this element.
if a != sc.pubAckHead {
a.prev.next = nil
a.prev = nil
sc.pubAckHead = a
} else {
fmt.Printf("@@IK: HERE!!!!!!!!!!!!!!!!!!!!!!!\n")
}
break
}
}
// If all elements are expired, reset the connection's list.
if a == nil {
sc.pubAckHead, sc.pubAckTail = nil, nil
}
sc.Unlock()
}

var next *ack
for a := list; a != nil; {
// Remove the ack from the map.
sc.Lock()
removed := sc.removeAck(a.guid, false) != nil
next = a.next
sc.Unlock()
// If processAck has already processed the ack, we would not
// have been able to remove from the map, so move to the next.
if !removed {
a = next
continue
}
if a.ah != nil {
a.ah(a.guid, errToReport)
} else if a.ch != nil {
a.ch <- errToReport
}
a = next
}

if closed {
return
}
}
}

0 comments on commit dafb6e8

Please sign in to comment.