Skip to content

Commit

Permalink
Merge 87ba3d2 into c75dfd5
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Apr 15, 2022
2 parents c75dfd5 + 87ba3d2 commit f144b67
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 164 deletions.
8 changes: 4 additions & 4 deletions go_test.mod
Expand Up @@ -4,16 +4,16 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.7.2
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

require (
github.com/klauspost/compress v1.13.4 // indirect
github.com/minio/highwayhash v1.0.1 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
Expand Down
20 changes: 9 additions & 11 deletions go_test.sum
Expand Up @@ -5,21 +5,19 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI=
github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703 h1:d8siT+8VQ68hfqPqYZvpMrHIihlMVW3gGy+o2hRDCyg=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703/go.mod h1:5vic7C58BFEVltiZhs7Kq81q2WcEPhJPsmNv1FOrdv0=
github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
8 changes: 3 additions & 5 deletions js.go
Expand Up @@ -1465,12 +1465,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if cfg.ReplayPolicy == replayPolicyNotSet {
cfg.ReplayPolicy = ReplayInstantPolicy
}
// Note: Do not set MaxAckPending if not set by the user. The server may
// have limits that are lower than the sync subscription go channel size,
// so don't use the cap size as the "default" MaxAckPending.

// If we have acks at all and the MaxAckPending is not set go ahead
// and set to the internal max for channel based consumers
if cfg.MaxAckPending == 0 && ch != nil && cfg.AckPolicy != AckNonePolicy {
cfg.MaxAckPending = cap(ch)
}
// Create request here.
ccreq = &createConsumerRequest{
Stream: stream,
Expand Down
73 changes: 72 additions & 1 deletion js_test.go
Expand Up @@ -20,6 +20,7 @@ package nats
import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -843,7 +844,7 @@ func TestJetStreamFlowControlStalled(t *testing.T) {
}

// Publish bunch of messages.
payload := make([]byte, 1024)
payload := make([]byte, 100*1024)
for i := 0; i < 250; i++ {
nc.Publish("a", payload)
}
Expand Down Expand Up @@ -939,3 +940,73 @@ func TestJetStreamExpiredPullRequests(t *testing.T) {
}
}
}

func TestJetStreamSyncSubscribeWithMaxAckPending(t *testing.T) {
opts := natsserver.DefaultTestOptions
opts.Port = -1
opts.JetStream = true
opts.JetStreamLimits.MaxAckPending = 100
s := natsserver.RunServer(&opts)
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

if _, err := js.AddStream(&StreamConfig{Name: "MAX_ACK_PENDING", Subjects: []string{"foo"}}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

// We should be able to create a sync subscription
sub, err := js.SubscribeSync("foo")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
sub.Unsubscribe()

// And pull sub
sub, err = js.PullSubscribe("foo", "bar")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
sub.Unsubscribe()
}

func TestJetStreamClusterPlacement(t *testing.T) {
// There used to be a test here that would not work because it would require
// all servers in the cluster to know about each other tags. So we will simply
// verify that if a stream is configured with placement and tags, the proper
// "stream create" request is sent.
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

sub, err := nc.SubscribeSync(fmt.Sprintf("$JS.API."+apiStreamCreateT, "TEST"))
if err != nil {
t.Fatalf("Error on sub: %v", err)
}
js.AddStream(&StreamConfig{
Name: "TEST",
Placement: &Placement{
Tags: []string{"my_tag"},
},
})
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error getting stream create request: %v", err)
}
var req StreamConfig
if err := json.Unmarshal(msg.Data, &req); err != nil {
t.Fatalf("Unmarshal error: %v", err)
}
if req.Placement == nil {
t.Fatal("Expected placement, did not get it")
}
if n := len(req.Placement.Tags); n != 1 {
t.Fatalf("Expected 1 tag, got %v", n)
}
if v := req.Placement.Tags[0]; v != "my_tag" {
t.Fatalf("Unexpected tag: %q", v)
}
}
4 changes: 2 additions & 2 deletions norace_test.go
Expand Up @@ -603,7 +603,7 @@ Loop:
})
}

func TestJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) {
func TestNoRaceJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

Expand Down Expand Up @@ -673,7 +673,7 @@ func TestJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) {
defer sub.Unsubscribe()

// Set this lower then normal to make sure we do not exceed bytes pending with FC turned on.
sub.SetPendingLimits(totalMsgs, 1024*1024) // This matches server window for flowcontrol.
sub.SetPendingLimits(totalMsgs, 4*1024*1024) // This matches server window for flowcontrol.

info, err := sub.ConsumerInfo()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions scripts/cov.sh
Expand Up @@ -3,10 +3,10 @@

rm -rf ./cov
mkdir cov
go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/nats.out
go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test
go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin
go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto
go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/nats.out -run='Test[^NoRace]'
go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go -run='Test[^NoRace]' ./test
go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin
go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto
gocovmerge ./cov/*.out > acc.out
rm -rf ./cov

Expand Down

0 comments on commit f144b67

Please sign in to comment.