Merge pull request #107 from nats-io/sub-pef-improvement
Performance improvement for async subscribers
derekcollison committed Nov 11, 2015
2 parents 8b04ee5 + 66d52e8 commit 4ff5c72
Showing 1 changed file with 45 additions and 31 deletions: nats.go
@@ -178,6 +178,7 @@ type Subscription struct {
     bytes uint64
     max uint64
     conn *Conn
+    closed bool
     mcb MsgHandler
     mch chan *Msg
     sc bool
@@ -990,33 +991,40 @@ func (nc *Conn) readLoop() {

 // deliverMsgs waits on the delivery channel shared with readLoop and processMsg.
 // It is used to deliver messages to asynchronous subscribers.
-func (nc *Conn) deliverMsgs(ch chan *Msg) {
+func (nc *Conn) deliverMsgs(s *Subscription) {
+    var closed bool
+    var delivered uint64
+    var max uint64
+
+    s.mu.Lock()
+    mcb := s.mcb
+    ch := s.mch
+    if ch == nil {
+        // We were unsubscribed before we had a chance to start. We are done!
+        s.mu.Unlock()
+        return
+    }
+    s.mu.Unlock()
+
     for {
-        nc.mu.Lock()
-        closed := nc.isClosed()
-        nc.mu.Unlock()
-        if closed {
-            break
-        }

         m, ok := <-ch
         if !ok {
             break
         }
-        s := m.Sub

-        // Capture under locks
+        // Capture under lock
         s.mu.Lock()
-        conn := s.conn
-        mcb := s.mcb
-        max := s.max
+        max = s.max
+        closed = s.closed
+        s.delivered++
+        delivered = s.delivered
         s.mu.Unlock()

-        if conn == nil || mcb == nil {
-            continue
+        if closed {
+            break
         }

-        delivered := atomic.AddUint64(&s.delivered, 1)
         if max <= 0 || delivered <= max {
             mcb(m)
         }
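For context on what deliverMsgs serves: it backs the asynchronous subscription API, where every Subscribe call with a callback gets its own delivery goroutine. A minimal client-side sketch, assuming the repository's import path at the time (github.com/nats-io/nats), a locally running gnatsd, and an illustrative subject name:

package main

import (
    "fmt"
    "log"

    "github.com/nats-io/nats"
)

func main() {
    // Connect to a local gnatsd (nats.DefaultURL is nats://localhost:4222).
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    done := make(chan struct{})

    // The callback is the MsgHandler that the subscription's dedicated
    // deliverMsgs goroutine invokes, one message at a time.
    if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
        fmt.Printf("received %q on %s\n", m.Data, m.Subject)
        close(done)
    }); err != nil {
        log.Fatal(err)
    }

    if err := nc.Publish("updates", []byte("hello")); err != nil {
        log.Fatal(err)
    }
    nc.Flush()
    <-done
}

Because the callback runs on that subscription's goroutine, a slow callback backs up only its own channel rather than the connection's read loop.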
@@ -1034,20 +1042,34 @@ func (nc *Conn) deliverMsgs(ch chan *Msg) {
 // appropriate channel for processing. All subscribers have their
 // their own channel. If the channel is full, the connection is
 // considered a slow subscriber.
-func (nc *Conn) processMsg(msg []byte) {
+func (nc *Conn) processMsg(data []byte) {
     // Lock from here on out.
     nc.mu.Lock()

     // Stats
     nc.InMsgs += 1
-    nc.InBytes += uint64(len(msg))
+    nc.InBytes += uint64(len(data))

     sub := nc.subs[nc.ps.ma.sid]
     if sub == nil {
         nc.mu.Unlock()
         return
     }

+    // Copy them into string
+    subj := string(nc.ps.ma.subject)
+    reply := string(nc.ps.ma.reply)
+
+    // Doing message create outside of the sub's lock to reduce contention.
+    // It's possible that we end-up not using the message, but that's ok.
+
+    // FIXME(dlc): Need to copy, should/can do COW?
+    msgPayload := make([]byte, len(data))
+    copy(msgPayload, data)
+
+    // FIXME(dlc): Should we recycle these containers?
+    m := &Msg{Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
+
     sub.mu.Lock()

     // This is a catch all for more than max messages delivered.
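The reordering above is the heart of the change to processMsg: the subject and reply strings and the Msg container are now built before sub.mu is taken (only nc.mu is held), so the allocation and copy no longer execute inside the per-subscription lock that the deliverMsgs goroutine contends on for every callback. A generic sketch of the pattern, with illustrative names (inbox, item) that are not the library's:

package sketch

import "sync"

// item and inbox are illustrative stand-ins for Msg and Subscription.
type item struct{ payload []byte }

type inbox struct {
    mu    sync.Mutex
    msgs  uint64
    bytes uint64
    ch    chan *item
}

// deliver copies the payload and builds the container before touching
// the inbox lock, so the consumer goroutine reading from ch is never
// stalled behind an allocation done inside the critical section.
func (in *inbox) deliver(data []byte) {
    it := &item{payload: append([]byte(nil), data...)}

    in.mu.Lock()
    in.msgs++
    in.bytes += uint64(len(data))
    ch := in.ch
    in.mu.Unlock()

    if ch != nil {
        ch <- it
    }
}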
@@ -1060,18 +1082,7 @@ func (nc *Conn) processMsg(msg []byte) {

     // Sub internal stats
     sub.msgs += 1
-    sub.bytes += uint64(len(msg))
-
-    // Copy them into string
-    subj := string(nc.ps.ma.subject)
-    reply := string(nc.ps.ma.reply)
-
-    // FIXME(dlc): Need to copy, should/can do COW?
-    newMsg := make([]byte, len(msg))
-    copy(newMsg, msg)
-
-    // FIXME(dlc): Should we recycle these containers?
-    m := &Msg{Data: newMsg, Subject: subj, Reply: reply, Sub: sub}
+    sub.bytes += uint64(len(data))

     if sub.mch != nil {
         if len(sub.mch) >= nc.Opts.SubChanLen {
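The hunk is cut off at the slow-subscriber check: each subscription's channel is bounded (its capacity comes from Opts.SubChanLen), and processMsg compares the channel's length against that limit rather than letting a full buffer block the read loop. A select with a default case is a common way to express the same non-blocking idea; a small sketch with an illustrative helper (trySend is not a library function):

package sketch

// trySend enqueues msg on a bounded channel without ever blocking the
// caller. A false return is the slow-consumer signal: the subscriber's
// buffer is full, so the message is dropped and the condition can be
// reported instead of stalling the reader.
func trySend(ch chan []byte, msg []byte) bool {
    select {
    case ch <- msg:
        return true
    default:
        return false
    }
}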
@@ -1355,7 +1366,7 @@ func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, chanlen int) (*Subs
     // If we have an async callback, start up a sub specific
     // Go routine to deliver the messages.
     if cb != nil {
-        go nc.deliverMsgs(sub.mch)
+        go nc.deliverMsgs(sub)
     }

     sub.sid = atomic.AddInt64(&nc.ssid, 1)
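subscribe now hands deliverMsgs the *Subscription itself instead of just its channel, which is what lets the delivery goroutine read mcb, max, and the new closed flag under the subscription's own mutex rather than taking nc.mu on every message. A stripped-down sketch of that per-subscription worker shape, using illustrative types that are not the library's:

package sketch

import "sync"

type handler func(msg []byte)

type subscription struct {
    mu     sync.Mutex
    closed bool
    cb     handler
    ch     chan []byte
}

// startWorker launches the per-subscription delivery goroutine. All
// per-message state is read under the subscription's own lock; the
// connection-wide lock is never touched on the hot path.
func startWorker(s *subscription) {
    go func() {
        for msg := range s.ch {
            s.mu.Lock()
            cb, closed := s.cb, s.closed
            s.mu.Unlock()
            if closed || cb == nil {
                return
            }
            cb(msg)
        }
    }()
}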
@@ -1444,6 +1455,7 @@ func (nc *Conn) removeSub(s *Subscription) {
     }
     // Mark as invalid
     s.conn = nil
+    s.closed = true
 }

 // IsValid returns a boolean indicating whether the subscription
@@ -1706,12 +1718,14 @@ func (nc *Conn) close(status Status, doCBs bool) {
     // pending NextMsg() calls.
     for _, s := range nc.subs {
         s.mu.Lock()
+
         if s.mch != nil {
             close(s.mch)
             s.mch = nil
         }
         // Mark as invalid, for signalling to deliverMsgs
         s.mcb = nil
+        s.closed = true
         s.mu.Unlock()
     }
     nc.subs = nil
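Teardown is the other half of the same signal: close marks each subscription closed and closes its channel under s.mu, so the deliverMsgs goroutine exits either when the drained channel reports !ok or when it next observes the closed flag. Continuing the illustrative subscription sketch shown after the subscribe hunk above, shutdown might look like:

// stopWorker tears down one subscription: closing the channel ends the
// worker's range loop once any buffered messages drain, and the closed
// flag stops it even if messages are still queued.
func stopWorker(s *subscription) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.ch != nil {
        close(s.ch)
        s.ch = nil
    }
    s.cb = nil
    s.closed = true
}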