diff --git a/context.go b/context.go index 9c456454d..f1630ca2b 100644 --- a/context.go +++ b/context.go @@ -129,19 +129,33 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { return nil, err } + // snapshot mch := s.mch s.mu.Unlock() var ok bool var msg *Msg + // If something is available right away, let's optimize that case. select { case msg, ok = <-mch: if !ok { return nil, ErrConnectionClosed } - err := s.processNextMsgDelivered(msg) - if err != nil { + if err := s.processNextMsgDelivered(msg); err != nil { + return nil, err + } else { + return msg, nil + } + default: + } + + select { + case msg, ok = <-mch: + if !ok { + return nil, ErrConnectionClosed + } + if err := s.processNextMsgDelivered(msg); err != nil { return nil, err } case <-ctx.Done(): diff --git a/nats.go b/nats.go index 06bb0730c..576293b1d 100644 --- a/nats.go +++ b/nats.go @@ -806,6 +806,15 @@ func Nkey(pubKey string, sigCB SignatureHandler) Option { } } +// SyncQueueLen will set the maximum queue len for the internal +// channel used for SubscribeSync(). +func SyncQueueLen(max int) Option { + return func(o *Options) error { + o.SubChanLen = max + return nil + } +} + // Dialer is an Option to set the dialer which will be used when // attempting to establish a connection. // DEPRECATED: Should use CustomDialer instead. @@ -1018,6 +1027,7 @@ func (nc *Conn) pickServer() error { if len(nc.srvPool) <= 0 { return ErrNoServers } + for _, s := range nc.srvPool { if s != nil { nc.current = s @@ -3031,6 +3041,23 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { var ok bool var msg *Msg + // If something is available right away, let's optimize that case. + select { + case msg, ok = <-mch: + if !ok { + return nil, ErrConnectionClosed + } + if err := s.processNextMsgDelivered(msg); err != nil { + return nil, err + } else { + return msg, nil + } + default: + } + + // If we are here a message was not immediately available, so let's loop + // with a timeout.
+ t := globalTimerPool.Get(timeout) defer globalTimerPool.Put(t) @@ -3039,8 +3066,7 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { if !ok { return nil, ErrConnectionClosed } - err := s.processNextMsgDelivered(msg) - if err != nil { + if err := s.processNextMsgDelivered(msg); err != nil { return nil, err } case <-t.C: diff --git a/nats_test.go b/nats_test.go index c84783dfe..ecaff56f0 100644 --- a/nats_test.go +++ b/nats_test.go @@ -1614,3 +1614,55 @@ func TestConnectedAddr(t *testing.T) { t.Fatalf("Expected empty result for closed connection, got %q", addr) } } + +func BenchmarkNextMsgNoTimeout(b *testing.B) { + s := RunServerOnPort(TEST_PORT) + defer s.Shutdown() + + ncp, err := Connect(fmt.Sprintf("localhost:%d", TEST_PORT)) + if err != nil { + b.Fatalf("Error connecting: %v", err) + } + ncs, err := Connect(fmt.Sprintf("localhost:%d", TEST_PORT), SyncQueueLen(b.N)) + if err != nil { + b.Fatalf("Error connecting: %v", err) + } + + // Test processing speed so no long subject or payloads. + subj := "a" + + sub, err := ncs.SubscribeSync(subj) + if err != nil { + b.Fatalf("Error subscribing: %v", err) + } + ncs.Flush() + + // Set it up so we can internally queue all the messages. + sub.SetPendingLimits(b.N, b.N*1000) + + for i := 0; i < b.N; i++ { + ncp.Publish(subj, nil) + } + ncp.Flush() + + // Wait for them to all be queued up, testing NextMsg not server here. + // Only wait at most one second. + wait := time.Now().Add(time.Second) + for time.Now().Before(wait) { + nm, _, err := sub.Pending() + if err != nil { + b.Fatalf("Error on Pending() - %v", err) + } + if nm >= b.N { + break + } + time.Sleep(10 * time.Millisecond) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := sub.NextMsg(10 * time.Millisecond); err != nil { + b.Fatalf("Error getting message[%d]: %v", i, err) + } + } +}