Skip to content

Commit

Permalink
Merge pull request #971 from nats-io/release_1_15_0
Browse files Browse the repository at this point in the history
Release v1.15.0
  • Loading branch information
kozlovic committed May 4, 2022
2 parents 96c1445 + eae1479 commit f52eb7b
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 104 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -29,7 +29,7 @@ When using or transitioning to Go modules support:
```bash
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.14.0
go get github.com/nats-io/nats.go/@v1.15.0

# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2
Expand Down
4 changes: 2 additions & 2 deletions go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.7.5-0.20220416010054-1b0a5d9e9b36
github.com/nats-io/nats-server/v2 v2.8.2
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand All @@ -14,7 +14,7 @@ require (
github.com/klauspost/compress v1.14.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
)
8 changes: 4 additions & 4 deletions go_test.sum
Expand Up @@ -15,16 +15,16 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220416010054-1b0a5d9e9b36 h1:jOdTqOcfwlsbTMz45x+bsVEPs7P8jPjMOrlu1o4de3k=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220416010054-1b0a5d9e9b36/go.mod h1:5vic7C58BFEVltiZhs7Kq81q2WcEPhJPsmNv1FOrdv0=
github.com/nats-io/nats-server/v2 v2.8.2 h1:5m1VytMEbZx0YINvKY+X2gXdLNwP43uLXnFRwz8j8KE=
github.com/nats-io/nats-server/v2 v2.8.2/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4=
github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce h1:Roh6XWxHFKrPgC/EQhVubSAGQ6Ozk6IdxHSzt1mR0EI=
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
2 changes: 1 addition & 1 deletion nats.go
Expand Up @@ -48,7 +48,7 @@ import (

// Default Constants
const (
Version = "1.14.0"
Version = "1.15.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
Expand Down
209 changes: 113 additions & 96 deletions test/js_test.go
Expand Up @@ -4262,97 +4262,103 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
}
})

t.Run("create sourced stream from origin", func(t *testing.T) {
sources := make([]*nats.StreamSource, 0)
sources = append(sources, &nats.StreamSource{Name: "origin"})
sources = append(sources, &nats.StreamSource{Name: "m1"})
streamName := "s2"
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Sources: sources,
Storage: nats.FileStorage,
Replicas: 1,
})
if err != nil {
t.Fatalf("Unexpected error creating stream: %v", err)
}
// Commenting out this test until we figure out what was the intent.
// Since v2.8.0, this test would fail with a "detected cycle" error,
// I guess because "m1" already sources "origin", so creating a
// stream with both as a source is bad.
/*
t.Run("create sourced stream from origin", func(t *testing.T) {
sources := make([]*nats.StreamSource, 0)
sources = append(sources, &nats.StreamSource{Name: "origin"})
sources = append(sources, &nats.StreamSource{Name: "m1"})
streamName := "s2"
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Sources: sources,
Storage: nats.FileStorage,
Replicas: 1,
})
if err != nil {
t.Fatalf("Unexpected error creating stream: %v", err)
}
msgs := make([]*nats.RawStreamMsg, 0)
msgs := make([]*nats.RawStreamMsg, 0)
// Stored message sequences start at 1
startSequence := 1
expectedTotal := totalMsgs * 2
// Stored message sequences start at 1
startSequence := 1
expectedTotal := totalMsgs * 2
GetNextMsg:
for i := startSequence; i < expectedTotal+1; i++ {
var (
err error
seq = uint64(i)
msg *nats.RawStreamMsg
timeout = time.Now().Add(5 * time.Second)
)
GetNextMsg:
for i := startSequence; i < expectedTotal+1; i++ {
var (
err error
seq = uint64(i)
msg *nats.RawStreamMsg
timeout = time.Now().Add(5 * time.Second)
)
Retry:
for time.Now().Before(timeout) {
msg, err = js.GetMsg(streamName, seq)
Retry:
for time.Now().Before(timeout) {
msg, err = js.GetMsg(streamName, seq)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue Retry
}
msgs = append(msgs, msg)
continue GetNextMsg
}
if err != nil {
time.Sleep(100 * time.Millisecond)
continue Retry
t.Fatalf("Unexpected error fetching seq=%v: %v", seq, err)
}
msgs = append(msgs, msg)
continue GetNextMsg
}
if err != nil {
t.Fatalf("Unexpected error fetching seq=%v: %v", seq, err)
}
}
got := len(msgs)
if got < expectedTotal {
t.Errorf("Expected %v, got: %v", expectedTotal, got)
}

si, err := js.StreamInfo(streamName)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
got = int(si.State.Msgs)
if got != expectedTotal {
t.Errorf("Expected %v, got: %v", expectedTotal, got)
}

got = len(si.Sources)
expected := 2
if got != expected {
t.Errorf("Expected %v, got: %v", expected, got)
}
got := len(msgs)
if got < expectedTotal {
t.Errorf("Expected %v, got: %v", expectedTotal, got)
}
t.Run("consume from sourced stream", func(t *testing.T) {
sub, err := js.SubscribeSync("origin", nats.BindStream(streamName))
si, err := js.StreamInfo(streamName)
if err != nil {
t.Fatal(err)
t.Fatalf("Unexpected error: %v", err)
}
got = int(si.State.Msgs)
if got != expectedTotal {
t.Errorf("Expected %v, got: %v", expectedTotal, got)
}
mmsgs := make([]*nats.Msg, 0)
for i := 0; i < totalMsgs; i++ {
msg, err := sub.NextMsg(2 * time.Second)
got = len(si.Sources)
expected := 2
if got != expected {
t.Errorf("Expected %v, got: %v", expected, got)
}
t.Run("consume from sourced stream", func(t *testing.T) {
sub, err := js.SubscribeSync("origin", nats.BindStream(streamName))
if err != nil {
t.Error(err)
t.Fatal(err)
}
meta, err := msg.Metadata()
if err != nil {
t.Error(err)
mmsgs := make([]*nats.Msg, 0)
for i := 0; i < totalMsgs; i++ {
msg, err := sub.NextMsg(2 * time.Second)
if err != nil {
t.Error(err)
}
meta, err := msg.Metadata()
if err != nil {
t.Error(err)
}
if meta.Stream != streamName {
t.Errorf("Expected m1, got: %v", meta.Stream)
}
mmsgs = append(mmsgs, msg)
}
if meta.Stream != streamName {
t.Errorf("Expected m1, got: %v", meta.Stream)
if len(mmsgs) != totalMsgs {
t.Errorf("Expected to consume %v msgs, got: %v", totalMsgs, len(mmsgs))
}
mmsgs = append(mmsgs, msg)
}
if len(mmsgs) != totalMsgs {
t.Errorf("Expected to consume %v msgs, got: %v", totalMsgs, len(mmsgs))
}
})
})
})
*/
}

func TestJetStream_ClusterMultipleSubscribe(t *testing.T) {
Expand Down Expand Up @@ -5278,44 +5284,55 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) {
}
})

t.Run("max waiting timeout", func(t *testing.T) {
t.Run("max waiting exceeded", func(t *testing.T) {
defer js.PurgeStream(subject)

expected := 10
sendMsgs(t, expected)

sub, err := js.PullSubscribe(subject, "max-waiting")
_, err := js.AddConsumer(subject, &nats.ConsumerConfig{
Durable: "max-waiting",
MaxWaiting: 2,
AckPolicy: nats.AckExplicitPolicy,
})
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()

// Poll more than the default max of waiting/inflight pull requests,
// so that We will get only 408 timeout errors.
errCh := make(chan error, 1024)
defer close(errCh)
var wg sync.WaitGroup
for i := 0; i < 1024; i++ {
wg.Add(1)

wg.Add(2)
for i := 0; i < 2; i++ {
go func() {
_, err := sub.Fetch(1, nats.MaxWait(500*time.Millisecond))
defer wg.Done()

sub, err := js.PullSubscribe(subject, "max-waiting")
if err != nil {
errCh <- err
return
}
sub.Fetch(1, nats.MaxWait(time.Second))
}()
}
wg.Wait()

select {
case <-time.After(1 * time.Second):
t.Fatal("Expected RequestTimeout (408) error due to many inflight pulls")
case err := <-errCh:
if err != nil && (err.Error() != `nats: Request Timeout` && err != nats.ErrTimeout) {
t.Errorf("Expected request timeout fetching next message, got: %+v", err)
// Give time to those 2 above to fill the MaxWaiting
checkFor(t, time.Second, 15*time.Millisecond, func() error {
ci, err := js.ConsumerInfo(subject, "max-waiting")
if err != nil {
return err
}
if n := ci.NumWaiting; n != 2 {
return fmt.Errorf("NumWaiting should be 2, was %v", n)
}
return nil
})

// Now this request should get a 409. Currently, we do not re-fetch
// on that error, so would be visible in the error returned by Fetch()
sub, err := js.PullSubscribe(subject, "max-waiting")
if err != nil {
t.Fatal(err)
}
_, err = sub.Fetch(1, nats.MaxWait(time.Second))
if err == nil || !strings.Contains(err.Error(), "MaxWaiting") {
t.Fatalf("Unexpected error: %v", err)
}
wg.Wait()
})

t.Run("no wait", func(t *testing.T) {
Expand Down

0 comments on commit f52eb7b

Please sign in to comment.