Skip to content

Commit

Permalink
Merge 9b7fc5f into efdce8e
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Feb 21, 2019
2 parents efdce8e + 9b7fc5f commit fed3939
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 4 deletions.
18 changes: 16 additions & 2 deletions context.go
Expand Up @@ -129,19 +129,33 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
return nil, err
}

// snapshot
mch := s.mch
s.mu.Unlock()

var ok bool
var msg *Msg

// If something is available right away, let's optimize that case.
select {
case msg, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
}
err := s.processNextMsgDelivered(msg)
if err != nil {
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
} else {
return msg, nil
}
default:
}

select {
case msg, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case <-ctx.Done():
Expand Down
30 changes: 28 additions & 2 deletions nats.go
Expand Up @@ -806,6 +806,15 @@ func Nkey(pubKey string, sigCB SignatureHandler) Option {
}
}

// SyncQueueLen will set the maximum queue len for the internal
// channel used for SubsribeSync().
func SyncQueueLen(max int) Option {
return func(o *Options) error {
o.SubChanLen = max
return nil
}
}

// Dialer is an Option to set the dialer which will be used when
// attempting to establish a connection.
// DEPRECATED: Should use CustomDialer instead.
Expand Down Expand Up @@ -1018,6 +1027,7 @@ func (nc *Conn) pickServer() error {
if len(nc.srvPool) <= 0 {
return ErrNoServers
}

for _, s := range nc.srvPool {
if s != nil {
nc.current = s
Expand Down Expand Up @@ -3031,6 +3041,23 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
var ok bool
var msg *Msg

// If something is available right away, let's optimize that case.
select {
case msg, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
} else {
return msg, nil
}
default:
}

// If we are here a message was not immediately available, so lets loop
// with a timeout.

t := globalTimerPool.Get(timeout)
defer globalTimerPool.Put(t)

Expand All @@ -3039,8 +3066,7 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
if !ok {
return nil, ErrConnectionClosed
}
err := s.processNextMsgDelivered(msg)
if err != nil {
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case <-t.C:
Expand Down
52 changes: 52 additions & 0 deletions nats_test.go
Expand Up @@ -1614,3 +1614,55 @@ func TestConnectedAddr(t *testing.T) {
t.Fatalf("Expected empty result for closed connection, got %q", addr)
}
}

func BenchmarkNextMsgNoTimeout(b *testing.B) {
s := RunServerOnPort(TEST_PORT)
defer s.Shutdown()

ncp, err := Connect(fmt.Sprintf("localhost:%d", TEST_PORT))
if err != nil {
b.Fatalf("Error connecting: %v", err)
}
ncs, err := Connect(fmt.Sprintf("localhost:%d", TEST_PORT), SyncQueueLen(b.N))
if err != nil {
b.Fatalf("Error connecting: %v", err)
}

// Test processing speed so no long subject or payloads.
subj := "a"

sub, err := ncs.SubscribeSync(subj)
if err != nil {
b.Fatalf("Error subscribing: %v", err)
}
ncs.Flush()

// Set it up so we can internally queue all the messages.
sub.SetPendingLimits(b.N, b.N*1000)

for i := 0; i < b.N; i++ {
ncp.Publish(subj, nil)
}
ncp.Flush()

// Wait for them to all be queued up, testing NextMsg not server here.
// Only wait at most one second.
wait := time.Now().Add(time.Second)
for time.Now().Before(wait) {
nm, _, err := sub.Pending()
if err != nil {
b.Fatalf("Error on Pending() - %v", err)
}
if nm >= b.N {
break
}
time.Sleep(10 * time.Millisecond)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := sub.NextMsg(10 * time.Millisecond); err != nil {
b.Fatalf("Error getting message[%d]: %v", i, err)
}
}
}

0 comments on commit fed3939

Please sign in to comment.