diff --git a/.travis.yml b/.travis.yml index 433ad358..2419e2c4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,19 +19,18 @@ install: services: - mysql before_script: -- EXCLUDE_VENDOR=$(go list ./... | grep -v "/vendor/") -- EXCLUDE_VENDOR_AND_PROTO_DIR=$(go list ./... | grep -v "/vendor/" | grep -v "/spb") +# Remove the "spb" directory that contains protobuf +- GO_LIST=$(go list ./... | grep -v "/spb") - go install -- $(exit $(go fmt $EXCLUDE_VENDOR | wc -l)) -- go vet $EXCLUDE_VENDOR -- $(exit $(misspell -locale US $EXCLUDE_VENDOR_AND_PROTO_DIR | wc -l)) -- $(exit $(misspell -locale US README.md | wc -l)) -- staticcheck $EXCLUDE_VENDOR_AND_PROTO_DIR +- $(exit $(go fmt $GO_LIST | wc -l)) +- go vet $GO_LIST +- find . -type f -name "*.go" | grep -v "/vendor/" | grep -v "/spb/" | xargs misspell -error -locale US +- staticcheck $GO_LIST script: - set -e - mysql -u root -e "CREATE USER 'nss'@'localhost' IDENTIFIED BY 'password'; GRANT ALL PRIVILEGES ON *.* TO 'nss'@'localhost'; CREATE DATABASE test_nats_streaming;" -- go test -i $EXCLUDE_VENDOR -- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -failfast $EXCLUDE_VENDOR; fi +- go test -i ./... +- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -failfast ./...; fi - set +e deploy: diff --git a/README.md b/README.md index 749dab4d..aa1d964d 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ NATS Streaming is an extremely performant, lightweight reliable streaming platform built on [NATS](https://nats.io). -[![License][License-Image]][License-Url] [![ReportCard][ReportCard-Image]][ReportCard-Url] [![Build][Build-Status-Image]][Build-Status-Url] [![Coverage][Coverage-Image]][Coverage-Url] +[![License][License-Image]][License-Url] [![ReportCard][ReportCard-Image]][ReportCard-Url] [![Build][Build-Status-Image]][Build-Status-Url] [![Release][Release-Image]][Release-Url] [![Coverage][Coverage-Image]][Coverage-Url] ## Documentation @@ -44,4 +44,6 @@ under the Apache Version 2.0 license found in the LICENSE file. [Coverage-image]: https://coveralls.io/repos/github/nats-io/nats-streaming-server/badge.svg?branch=master&t=kIxrDE [ReportCard-Url]: http://goreportcard.com/report/nats-io/nats-streaming-server [ReportCard-Image]: http://goreportcard.com/badge/github.com/nats-io/nats-streaming-server +[Release-Url]: https://github.com/nats-io/nats-streaming-server/releases/tag/v0.16.2 +[Release-image]: https://img.shields.io/badge/release-v0.16.2-1eb0fc.svg [github-release]: https://github.com/nats-io/nats-streaming-server/releases/ diff --git a/server/clustering.go b/server/clustering.go index 7fa14518..90dd3700 100644 --- a/server/clustering.go +++ b/server/clustering.go @@ -433,7 +433,7 @@ func (s *StanServer) bootstrapCluster(name string, node *raft.Raft) error { var ( addr = s.getClusteringAddr(name) // Include ourself in the cluster. - servers = []raft.Server{raft.Server{ + servers = []raft.Server{{ ID: raft.ServerID(s.opts.Clustering.NodeID), Address: raft.ServerAddress(addr), }} diff --git a/server/raft_log_test.go b/server/raft_log_test.go index 92842022..8c360dff 100644 --- a/server/raft_log_test.go +++ b/server/raft_log_test.go @@ -285,19 +285,19 @@ func TestRaftLogWithEncryption(t *testing.T) { } expected := []*raft.Log{ - &raft.Log{ + { Type: raft.LogCommand, Index: 1, Term: 1, Data: []byte("msg1"), }, - &raft.Log{ + { Type: raft.LogCommand, Index: 2, Term: 1, Data: []byte("msg2"), }, - &raft.Log{ + { Type: raft.LogCommand, Index: 3, Term: 1, diff --git a/server/raft_transport_test.go b/server/raft_transport_test.go index 9926a84d..737ba307 100644 --- a/server/raft_transport_test.go +++ b/server/raft_transport_test.go @@ -169,7 +169,7 @@ func TestRAFTTransportAppendEntries(t *testing.T) { PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*raft.Log{ - &raft.Log{ + { Index: 101, Term: 4, Type: raft.LogNoop, @@ -251,7 +251,7 @@ func TestRAFTTransportAppendEntriesPipeline(t *testing.T) { PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*raft.Log{ - &raft.Log{ + { Index: 101, Term: 4, Type: raft.LogNoop, @@ -556,7 +556,7 @@ func TestRAFTTransportPooledConn(t *testing.T) { PrevLogEntry: 100, PrevLogTerm: 4, Entries: []*raft.Log{ - &raft.Log{ + { Index: 101, Term: 4, Type: raft.LogNoop, diff --git a/server/server.go b/server/server.go index 264cf2db..1c6fd899 100644 --- a/server/server.go +++ b/server/server.go @@ -46,7 +46,7 @@ import ( // Server defaults. const ( // VERSION is the current version for the NATS Streaming server. - VERSION = "0.16.1" + VERSION = "0.16.2" DefaultClusterID = "test-cluster" DefaultDiscoverPrefix = "_STAN.discover" @@ -3468,7 +3468,6 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo var ( pick *subState sent bool - tracePrinted bool foundWithZero bool nextExpirationTime int64 ) @@ -3497,8 +3496,7 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo // If this message has not yet expired, reset timer for next callback if pm.expire > limit { nextExpirationTime = pm.expire - if !tracePrinted && s.trace { - tracePrinted = true + if s.trace { s.log.Tracef("[Client:%s] Redelivery for subid=%d, skipping seq=%d", clientID, subID, m.Sequence) } break diff --git a/server/server_run_test.go b/server/server_run_test.go index 19f76659..7c841a65 100644 --- a/server/server_run_test.go +++ b/server/server_run_test.go @@ -1146,7 +1146,7 @@ func TestStreamingServerReadyLog(t *testing.T) { // At first, we should not get the lock since server // is standby... checkLog(t, l2, false) - // Now shutdown s and s2 shoudl report it is ready + // Now shutdown s and s2 should report it is ready s.Shutdown() checkLog(t, l2, true) s2.Shutdown() diff --git a/server/server_sub_test.go b/server/server_sub_test.go index fbf5350c..c8df7ad6 100644 --- a/server/server_sub_test.go +++ b/server/server_sub_test.go @@ -843,11 +843,11 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) { suffix bool } subOpts := []optAndText{ - optAndText{stan.StartAt(pb.StartPosition_NewOnly), "new-only, seq=1", true}, - optAndText{stan.StartWithLastReceived(), "last message, seq=1", true}, - optAndText{stan.StartAtSequence(10), "from sequence, asked_seq=10 actual_seq=1", true}, - optAndText{stan.StartAt(pb.StartPosition_First), "from beginning, seq=1", true}, - optAndText{stan.StartAtTimeDelta(time.Hour), "from time time=", false}, + {stan.StartAt(pb.StartPosition_NewOnly), "new-only, seq=1", true}, + {stan.StartWithLastReceived(), "last message, seq=1", true}, + {stan.StartAtSequence(10), "from sequence, asked_seq=10 actual_seq=1", true}, + {stan.StartAt(pb.StartPosition_First), "from beginning, seq=1", true}, + {stan.StartAtTimeDelta(time.Hour), "from time time=", false}, } for _, o := range subOpts { sub, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, o.opt) @@ -891,21 +891,21 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) { } ssubs := []startSub{ // New plain subscription followed by Unsubscribe should remove the subscription - startSub{ + { start: func() (stan.Subscription, error) { return sc.Subscribe("foo", func(_ *stan.Msg) {}) }, startTrace: "Started new subscription", end: func(sub stan.Subscription) error { return sub.Unsubscribe() }, endTrace: "Removed subscription", }, // New plain subscription followed by Close should remove the subscription - startSub{ + { start: func() (stan.Subscription, error) { return sc.Subscribe("foo", func(_ *stan.Msg) {}) }, startTrace: "Started new subscription", end: func(sub stan.Subscription) error { return sub.Close() }, endTrace: "Removed subscription", }, // New durable subscription followed by Close should suspend the subscription - startSub{ + { start: func() (stan.Subscription, error) { return sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur")) }, @@ -914,7 +914,7 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) { endTrace: "Suspended durable subscription", }, // Resuming the durable subscription, followed by Unsubscribe should removed the subscription - startSub{ + { start: func() (stan.Subscription, error) { return sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur")) }, @@ -923,41 +923,41 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) { endTrace: "Removed durable subscription", }, // Non durable queue subscribption - startSub{ + { start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}) }, startTrace: "Started new queue subscription", end: func(sub stan.Subscription) error { return nil }, endTrace: "", }, // Adding a member followed by Unsubscribe should simply remove this member. - startSub{ + { start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}) }, startTrace: "Added member to queue subscription", end: func(sub stan.Subscription) error { return sub.Unsubscribe() }, endTrace: "Removed member from queue subscription", }, // Adding a member followed by Close should simply remove this member. - startSub{ + { start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}) }, startTrace: "Added member to queue subscription", end: func(sub stan.Subscription) error { return sub.Close() }, endTrace: "Removed member from queue subscription", }, // New queue subscription followed by Unsubscribe should remove the queue subscription - startSub{ + { start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue2", func(_ *stan.Msg) {}) }, startTrace: "Started new queue subscription", end: func(sub stan.Subscription) error { return sub.Unsubscribe() }, endTrace: "Removed queue subscription", }, // New queue subscription followed by Close should remove the queue subscription - startSub{ + { start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue2", func(_ *stan.Msg) {}) }, startTrace: "Started new queue subscription", end: func(sub stan.Subscription) error { return sub.Close() }, endTrace: "Removed queue subscription", }, // New durable queue subscription followed by Close should suspend the subscription - startSub{ + { start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur")) }, @@ -966,7 +966,7 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) { endTrace: "Suspended durable queue subscription", }, // Resuming durable queue subscription - startSub{ + { start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur")) }, @@ -974,7 +974,7 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) { end: func(sub stan.Subscription) error { return nil }, endTrace: "", }, // Adding a member followed by Close should remove this member only - startSub{ + { start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur")) }, @@ -983,7 +983,7 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) { endTrace: "Removed member from durable queue subscription", }, // Adding a member followed by Unsubscribe should remove this member only - startSub{ + { start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur")) }, @@ -992,7 +992,7 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) { endTrace: "Removed member from durable queue subscription", }, // New durable subscription followed by Unsubscribe should remove the subscription - startSub{ + { start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue2", func(_ *stan.Msg) {}, stan.DurableName("dur")) }, diff --git a/server/snapshot.go b/server/snapshot.go index ba7f73ca..bf898991 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -588,7 +588,6 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromAp if err := c.store.Msgs.Empty(); err != nil { return err } - stored = false // Set the channel's first sequence, but not past the // `last` that we use in our for-loop. if msg.Sequence > last { diff --git a/server/timeout_reader.go b/server/timeout_reader.go index 9efebf87..768b6517 100644 --- a/server/timeout_reader.go +++ b/server/timeout_reader.go @@ -23,6 +23,7 @@ import ( const bufferSize = 4096 +// ErrTimeout reports a read timeout error var ErrTimeout = errors.New("natslog: read timeout") type timeoutReader struct { diff --git a/stores/common_msg_test.go b/stores/common_msg_test.go index 636ef8b3..106fbfb1 100644 --- a/stores/common_msg_test.go +++ b/stores/common_msg_test.go @@ -448,7 +448,6 @@ func TestCSMaxAge(t *testing.T) { } // Store a message storeMsg(t, cs, "bar", seq, []byte("msg")) - seq++ // Now timer should have been set again if !isSet() { t.Fatal("Timer should have been set") diff --git a/stores/common_test.go b/stores/common_test.go index 47db0eee..be3ea682 100644 --- a/stores/common_test.go +++ b/stores/common_test.go @@ -63,10 +63,10 @@ type testStore struct { var ( testLogger logger.Logger testStores = []*testStore{ - &testStore{TypeMemory, false}, - &testStore{TypeFile, true}, - &testStore{TypeSQL, true}, - &testStore{TypeRaft, false}, + {TypeMemory, false}, + {TypeFile, true}, + {TypeSQL, true}, + {TypeRaft, false}, } testTimestampMu sync.Mutex testLastTimestamp int64 diff --git a/stores/filestore_msg_test.go b/stores/filestore_msg_test.go index 7d65dc99..09d2e5fc 100644 --- a/stores/filestore_msg_test.go +++ b/stores/filestore_msg_test.go @@ -1442,8 +1442,8 @@ type testFSGapsOption struct { func testFSGetOptionsForGapsTests() []testFSGapsOption { defaultOptions := DefaultFileStoreOptions opts := []testFSGapsOption{ - testFSGapsOption{"Default", AllOptions(&defaultOptions)}, - testFSGapsOption{"NoBuffer", BufferSize(0)}, + {"Default", AllOptions(&defaultOptions)}, + {"NoBuffer", BufferSize(0)}, } return opts } diff --git a/stores/filestore_test.go b/stores/filestore_test.go index 5df7e715..774911b7 100644 --- a/stores/filestore_test.go +++ b/stores/filestore_test.go @@ -1391,7 +1391,7 @@ func TestFSReadRecord(t *testing.T) { buf := make([]byte, 5) var retBuf []byte - recType := recNoType + var recType recordType recSize := 0 // Reader returns an error diff --git a/stores/sqlstore.go b/stores/sqlstore.go index 809d748c..1f9339ce 100644 --- a/stores/sqlstore.go +++ b/stores/sqlstore.go @@ -504,7 +504,7 @@ func (s *SQLStore) updateDBLock() { var ( ticker = time.NewTicker(sqlLockUpdateInterval) - hasLock = true + hasLock bool err error failed int ) diff --git a/util/channels.go b/util/channels.go index d0770d9f..b59147ea 100644 --- a/util/channels.go +++ b/util/channels.go @@ -24,7 +24,7 @@ import ( // Number of bytes used to encode a channel name const encodedChannelLen = 2 -// SendsChannelsList sends the list of channels to the given subject, possibly +// SendChannelsList sends the list of channels to the given subject, possibly // splitting the list in several requests if it cannot fit in a single message. func SendChannelsList(channels []string, sendInbox, replyInbox string, nc *nats.Conn, serverID string) error { // Since the NATS message payload is limited, we need to repeat