Skip to content

Commit

Permalink
Merge 484ef4f into fdeedce
Browse files Browse the repository at this point in the history
  • Loading branch information
variadico committed Feb 25, 2021
2 parents fdeedce + 484ef4f commit afaf14f
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 18 deletions.
92 changes: 77 additions & 15 deletions js.go
Expand Up @@ -332,13 +332,21 @@ func ExpectLastMsgId(id string) PubOpt {
// MaxWait sets the maximum amount of time we will wait for a response.
type MaxWait time.Duration

func (ttl MaxWait) configurePublish(opts *pubOpts) error {
func (ttl MaxWait) configureJSContext(js *js) error {
js.wait = time.Duration(ttl)
return nil
}

// AckWait sets the maximum amount of time we will wait for an ack.
type AckWait time.Duration

func (ttl AckWait) configurePublish(opts *pubOpts) error {
opts.ttl = time.Duration(ttl)
return nil
}

func (ttl MaxWait) configureJSContext(js *js) error {
js.wait = time.Duration(ttl)
func (ttl AckWait) configureSubscribe(opts *subOpts) error {
opts.cfg.AckWait = time.Duration(ttl)
return nil
}

Expand Down Expand Up @@ -487,6 +495,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
return nil, ErrPullModeNotAllowed
}

badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
if isPullMode && badPullAck {
return nil, fmt.Errorf("invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
}

var (
err error
shouldCreate bool
Expand Down Expand Up @@ -686,18 +699,6 @@ type subOpts struct {
cfg *ConsumerConfig
}

// Durable defines the consumer name for JetStream durable subscribers.
func Durable(name string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if strings.Contains(name, ".") {
return ErrInvalidDurableName
}

opts.cfg.Durable = name
return nil
})
}

// Pull defines the batch size of messages that will be received
// when using pull based JetStream consumers.
func Pull(batchSize int) SubOpt {
Expand Down Expand Up @@ -730,6 +731,18 @@ func ManualAck() SubOpt {
})
}

// Durable defines the consumer name for JetStream durable subscribers.
func Durable(name string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if strings.Contains(name, ".") {
return ErrInvalidDurableName
}

opts.cfg.Durable = name
return nil
})
}

// DeliverAll will configure a Consumer to receive all the
// messages from a Stream.
func DeliverAll() SubOpt {
Expand Down Expand Up @@ -798,6 +811,55 @@ func AckExplicit() SubOpt {
})
}

func MaxDeliver(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxDeliver = n
return nil
})
}

func PlaybackInstant() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayInstant
return nil
})
}

func PlaybackOriginal() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayOriginal
return nil
})
}

func RateLimit(n uint64) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.RateLimit = n
return nil
})
}

func SampleFrequency(s string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.SampleFrequency = s
return nil
})
}

func PullMaxWaiting(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxWaiting = n
return nil
})
}

func MaxAckPending(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxAckPending = n
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
66 changes: 63 additions & 3 deletions test/js_test.go
Expand Up @@ -211,7 +211,7 @@ func TestJetStreamPublish(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

_, err = js.Publish("foo", msg, nats.MaxWait(time.Second), nats.Context(ctx))
_, err = js.Publish("foo", msg, nats.AckWait(time.Second), nats.Context(ctx))
if err != nats.ErrContextAndTimeout {
t.Fatalf("Expected %q, got %q", nats.ErrContextAndTimeout, err)
}
Expand All @@ -220,7 +220,7 @@ func TestJetStreamPublish(t *testing.T) {
sub, _ := nc.SubscribeSync("baz")
defer sub.Unsubscribe()

_, err = js.Publish("baz", msg, nats.MaxWait(time.Nanosecond))
_, err = js.Publish("baz", msg, nats.AckWait(time.Nanosecond))
if err != nats.ErrTimeout {
t.Fatalf("Expected %q, got %q", nats.ErrTimeout, err)
}
Expand Down Expand Up @@ -564,6 +564,66 @@ func TestJetStreamSubscribe(t *testing.T) {
}
}

func TestAckPendingGoesDown(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar", "baz", "foo.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

const totalMsgs = 3
for i := 0; i < totalMsgs; i++ {
if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil {
t.Fatal(err)
}
}

sub, err := js.SubscribeSync("foo", nats.Durable("dname"), nats.Pull(totalMsgs), nats.AckExplicit())
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()

for i := totalMsgs; i > 0; i-- {
info, err := sub.ConsumerInfo()
if err != nil {
t.Fatal(err)
}
if got, want := info.NumAckPending, i; got != want {
t.Fatalf("unexpected num ack pending: got=%d, want=%d", got, want)
}

m, err := sub.NextMsg(100 * time.Millisecond)
if err != nil {
t.Fatal("NextMsg:", err)
}

if err := m.Ack(); err != nil {
t.Fatal("Ack:", err)
}
}
}

func TestJetStream_Drain(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
Expand Down Expand Up @@ -1776,7 +1836,7 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}

err = msg.AckSync(nats.MaxWait(2 * time.Second))
err = msg.AckSync(nats.AckWait(2 * time.Second))
if err != nats.ErrInvalidJSAck {
t.Errorf("Unexpected error: %v", err)
}
Expand Down

0 comments on commit afaf14f

Please sign in to comment.