Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MaxPubAcksInFlight option #88

Merged
merged 1 commit into from
Jul 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ func PubAckWait(t time.Duration) Option {
}
}

// MaxPubAcksInFlight is an Option to set the maximum number of published
// messages without outstanding ACKs from the server.
func MaxPubAcksInFlight(max int) Option {
return func(o *Options) error {
o.MaxPubAcksInflight = max
return nil
}
}

// NatsConn is an Option to set the underlying NATS connection to be used
// by a NATS Streaming Conn object.
func NatsConn(nc *nats.Conn) Option {
Expand Down
40 changes: 40 additions & 0 deletions stan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1888,3 +1888,43 @@ func TestNatsConn(t *testing.T) {
t.Fatal("Unexpected wrapped conn")
}
}

func TestMaxPubAcksInFlight(t *testing.T) {
s := RunServer(clusterName)
defer s.Shutdown()

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Unexpected error on connect: %v", err)
}
defer nc.Close()

sc, err := Connect(clusterName, clientName,
MaxPubAcksInFlight(1),
PubAckWait(time.Second),
NatsConn(nc))
if err != nil {
t.Fatalf("Expected to connect correctly, got err %v", err)
}
// Don't defer the close of connection since the server is stopped,
// the close would delay the test.

// Cause the ACK to not come by shutdown the server now
s.Shutdown()

msg := []byte("hello")

// Send more than one message, if MaxPubAcksInFlight() works, one
// of the publish call should block for up to PubAckWait.
start := time.Now()
for i := 0; i < 2; i++ {
if _, err := sc.PublishAsync("foo", msg, nil); err != nil {
t.Fatalf("Unexpected error on publish: %v", err)
}
}
end := time.Now()
// So if the loop ended before the PubAckWait timeout, then it's a failure.
if end.Sub(start) < time.Second {
t.Fatal("Should have blocked after 1 message sent")
}
}