Skip to content

Commit

Permalink
Merge e4d135b into 12f7398
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Sep 12, 2019
2 parents 12f7398 + e4d135b commit fcc014f
Show file tree
Hide file tree
Showing 16 changed files with 50 additions and 52 deletions.
17 changes: 8 additions & 9 deletions .travis.yml
Expand Up @@ -19,19 +19,18 @@ install:
services:
- mysql
before_script:
- EXCLUDE_VENDOR=$(go list ./... | grep -v "/vendor/")
- EXCLUDE_VENDOR_AND_PROTO_DIR=$(go list ./... | grep -v "/vendor/" | grep -v "/spb")
# Remove the "spb" directory that contains protobuf
- GO_LIST=$(go list ./... | grep -v "/spb")
- go install
- $(exit $(go fmt $EXCLUDE_VENDOR | wc -l))
- go vet $EXCLUDE_VENDOR
- $(exit $(misspell -locale US $EXCLUDE_VENDOR_AND_PROTO_DIR | wc -l))
- $(exit $(misspell -locale US README.md | wc -l))
- staticcheck $EXCLUDE_VENDOR_AND_PROTO_DIR
- $(exit $(go fmt $GO_LIST | wc -l))
- go vet $GO_LIST
- find . -type f -name "*.go" | grep -v "/vendor/" | grep -v "/spb/" | xargs misspell -error -locale US
- staticcheck $GO_LIST
script:
- set -e
- mysql -u root -e "CREATE USER 'nss'@'localhost' IDENTIFIED BY 'password'; GRANT ALL PRIVILEGES ON *.* TO 'nss'@'localhost'; CREATE DATABASE test_nats_streaming;"
- go test -i $EXCLUDE_VENDOR
- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -failfast $EXCLUDE_VENDOR; fi
- go test -i ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -failfast ./...; fi
- set +e

deploy:
Expand Down
4 changes: 3 additions & 1 deletion README.md
Expand Up @@ -2,7 +2,7 @@

NATS Streaming is an extremely performant, lightweight reliable streaming platform built on [NATS](https://nats.io).

[![License][License-Image]][License-Url] [![ReportCard][ReportCard-Image]][ReportCard-Url] [![Build][Build-Status-Image]][Build-Status-Url] [![Coverage][Coverage-Image]][Coverage-Url]
[![License][License-Image]][License-Url] [![ReportCard][ReportCard-Image]][ReportCard-Url] [![Build][Build-Status-Image]][Build-Status-Url] [![Release][Release-Image]][Release-Url] [![Coverage][Coverage-Image]][Coverage-Url]

## Documentation

Expand Down Expand Up @@ -44,4 +44,6 @@ under the Apache Version 2.0 license found in the LICENSE file.
[Coverage-image]: https://coveralls.io/repos/github/nats-io/nats-streaming-server/badge.svg?branch=master&t=kIxrDE
[ReportCard-Url]: http://goreportcard.com/report/nats-io/nats-streaming-server
[ReportCard-Image]: http://goreportcard.com/badge/github.com/nats-io/nats-streaming-server
[Release-Url]: https://github.com/nats-io/nats-streaming-server/releases/tag/v0.16.2
[Release-image]: https://img.shields.io/badge/release-v0.16.2-1eb0fc.svg
[github-release]: https://github.com/nats-io/nats-streaming-server/releases/
2 changes: 1 addition & 1 deletion server/clustering.go
Expand Up @@ -433,7 +433,7 @@ func (s *StanServer) bootstrapCluster(name string, node *raft.Raft) error {
var (
addr = s.getClusteringAddr(name)
// Include ourself in the cluster.
servers = []raft.Server{raft.Server{
servers = []raft.Server{{
ID: raft.ServerID(s.opts.Clustering.NodeID),
Address: raft.ServerAddress(addr),
}}
Expand Down
6 changes: 3 additions & 3 deletions server/raft_log_test.go
Expand Up @@ -285,19 +285,19 @@ func TestRaftLogWithEncryption(t *testing.T) {
}

expected := []*raft.Log{
&raft.Log{
{
Type: raft.LogCommand,
Index: 1,
Term: 1,
Data: []byte("msg1"),
},
&raft.Log{
{
Type: raft.LogCommand,
Index: 2,
Term: 1,
Data: []byte("msg2"),
},
&raft.Log{
{
Type: raft.LogCommand,
Index: 3,
Term: 1,
Expand Down
6 changes: 3 additions & 3 deletions server/raft_transport_test.go
Expand Up @@ -169,7 +169,7 @@ func TestRAFTTransportAppendEntries(t *testing.T) {
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*raft.Log{
&raft.Log{
{
Index: 101,
Term: 4,
Type: raft.LogNoop,
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestRAFTTransportAppendEntriesPipeline(t *testing.T) {
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*raft.Log{
&raft.Log{
{
Index: 101,
Term: 4,
Type: raft.LogNoop,
Expand Down Expand Up @@ -556,7 +556,7 @@ func TestRAFTTransportPooledConn(t *testing.T) {
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*raft.Log{
&raft.Log{
{
Index: 101,
Term: 4,
Type: raft.LogNoop,
Expand Down
6 changes: 2 additions & 4 deletions server/server.go
Expand Up @@ -46,7 +46,7 @@ import (
// Server defaults.
const (
// VERSION is the current version for the NATS Streaming server.
VERSION = "0.16.1"
VERSION = "0.16.2"

DefaultClusterID = "test-cluster"
DefaultDiscoverPrefix = "_STAN.discover"
Expand Down Expand Up @@ -3468,7 +3468,6 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo
var (
pick *subState
sent bool
tracePrinted bool
foundWithZero bool
nextExpirationTime int64
)
Expand Down Expand Up @@ -3497,8 +3496,7 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo
// If this message has not yet expired, reset timer for next callback
if pm.expire > limit {
nextExpirationTime = pm.expire
if !tracePrinted && s.trace {
tracePrinted = true
if s.trace {
s.log.Tracef("[Client:%s] Redelivery for subid=%d, skipping seq=%d", clientID, subID, m.Sequence)
}
break
Expand Down
2 changes: 1 addition & 1 deletion server/server_run_test.go
Expand Up @@ -1146,7 +1146,7 @@ func TestStreamingServerReadyLog(t *testing.T) {
// At first, we should not get the lock since server
// is standby...
checkLog(t, l2, false)
// Now shutdown s and s2 shoudl report it is ready
// Now shutdown s and s2 should report it is ready
s.Shutdown()
checkLog(t, l2, true)
s2.Shutdown()
Expand Down
38 changes: 19 additions & 19 deletions server/server_sub_test.go
Expand Up @@ -843,11 +843,11 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) {
suffix bool
}
subOpts := []optAndText{
optAndText{stan.StartAt(pb.StartPosition_NewOnly), "new-only, seq=1", true},
optAndText{stan.StartWithLastReceived(), "last message, seq=1", true},
optAndText{stan.StartAtSequence(10), "from sequence, asked_seq=10 actual_seq=1", true},
optAndText{stan.StartAt(pb.StartPosition_First), "from beginning, seq=1", true},
optAndText{stan.StartAtTimeDelta(time.Hour), "from time time=", false},
{stan.StartAt(pb.StartPosition_NewOnly), "new-only, seq=1", true},
{stan.StartWithLastReceived(), "last message, seq=1", true},
{stan.StartAtSequence(10), "from sequence, asked_seq=10 actual_seq=1", true},
{stan.StartAt(pb.StartPosition_First), "from beginning, seq=1", true},
{stan.StartAtTimeDelta(time.Hour), "from time time=", false},
}
for _, o := range subOpts {
sub, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, o.opt)
Expand Down Expand Up @@ -891,21 +891,21 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) {
}
ssubs := []startSub{
// New plain subscription followed by Unsubscribe should remove the subscription
startSub{
{
start: func() (stan.Subscription, error) { return sc.Subscribe("foo", func(_ *stan.Msg) {}) },
startTrace: "Started new subscription",
end: func(sub stan.Subscription) error { return sub.Unsubscribe() },
endTrace: "Removed subscription",
},
// New plain subscription followed by Close should remove the subscription
startSub{
{
start: func() (stan.Subscription, error) { return sc.Subscribe("foo", func(_ *stan.Msg) {}) },
startTrace: "Started new subscription",
end: func(sub stan.Subscription) error { return sub.Close() },
endTrace: "Removed subscription",
},
// New durable subscription followed by Close should suspend the subscription
startSub{
{
start: func() (stan.Subscription, error) {
return sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
Expand All @@ -914,7 +914,7 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) {
endTrace: "Suspended durable subscription",
},
// Resuming the durable subscription, followed by Unsubscribe should removed the subscription
startSub{
{
start: func() (stan.Subscription, error) {
return sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
Expand All @@ -923,41 +923,41 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) {
endTrace: "Removed durable subscription",
},
// Non durable queue subscribption
startSub{
{
start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}) },
startTrace: "Started new queue subscription",
end: func(sub stan.Subscription) error { return nil }, endTrace: "",
},
// Adding a member followed by Unsubscribe should simply remove this member.
startSub{
{
start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}) },
startTrace: "Added member to queue subscription",
end: func(sub stan.Subscription) error { return sub.Unsubscribe() },
endTrace: "Removed member from queue subscription",
},
// Adding a member followed by Close should simply remove this member.
startSub{
{
start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}) },
startTrace: "Added member to queue subscription",
end: func(sub stan.Subscription) error { return sub.Close() },
endTrace: "Removed member from queue subscription",
},
// New queue subscription followed by Unsubscribe should remove the queue subscription
startSub{
{
start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue2", func(_ *stan.Msg) {}) },
startTrace: "Started new queue subscription",
end: func(sub stan.Subscription) error { return sub.Unsubscribe() },
endTrace: "Removed queue subscription",
},
// New queue subscription followed by Close should remove the queue subscription
startSub{
{
start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue2", func(_ *stan.Msg) {}) },
startTrace: "Started new queue subscription",
end: func(sub stan.Subscription) error { return sub.Close() },
endTrace: "Removed queue subscription",
},
// New durable queue subscription followed by Close should suspend the subscription
startSub{
{
start: func() (stan.Subscription, error) {
return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
Expand All @@ -966,15 +966,15 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) {
endTrace: "Suspended durable queue subscription",
},
// Resuming durable queue subscription
startSub{
{
start: func() (stan.Subscription, error) {
return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
startTrace: "Resumed durable queue subscription",
end: func(sub stan.Subscription) error { return nil }, endTrace: "",
},
// Adding a member followed by Close should remove this member only
startSub{
{
start: func() (stan.Subscription, error) {
return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
Expand All @@ -983,7 +983,7 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) {
endTrace: "Removed member from durable queue subscription",
},
// Adding a member followed by Unsubscribe should remove this member only
startSub{
{
start: func() (stan.Subscription, error) {
return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
Expand All @@ -992,7 +992,7 @@ func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) {
endTrace: "Removed member from durable queue subscription",
},
// New durable subscription followed by Unsubscribe should remove the subscription
startSub{
{
start: func() (stan.Subscription, error) {
return sc.QueueSubscribe("foo", "queue2", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
Expand Down
1 change: 0 additions & 1 deletion server/snapshot.go
Expand Up @@ -588,7 +588,6 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromAp
if err := c.store.Msgs.Empty(); err != nil {
return err
}
stored = false
// Set the channel's first sequence, but not past the
// `last` that we use in our for-loop.
if msg.Sequence > last {
Expand Down
1 change: 1 addition & 0 deletions server/timeout_reader.go
Expand Up @@ -23,6 +23,7 @@ import (

const bufferSize = 4096

// ErrTimeout reports a read timeout error
var ErrTimeout = errors.New("natslog: read timeout")

type timeoutReader struct {
Expand Down
1 change: 0 additions & 1 deletion stores/common_msg_test.go
Expand Up @@ -448,7 +448,6 @@ func TestCSMaxAge(t *testing.T) {
}
// Store a message
storeMsg(t, cs, "bar", seq, []byte("msg"))
seq++
// Now timer should have been set again
if !isSet() {
t.Fatal("Timer should have been set")
Expand Down
8 changes: 4 additions & 4 deletions stores/common_test.go
Expand Up @@ -63,10 +63,10 @@ type testStore struct {
var (
testLogger logger.Logger
testStores = []*testStore{
&testStore{TypeMemory, false},
&testStore{TypeFile, true},
&testStore{TypeSQL, true},
&testStore{TypeRaft, false},
{TypeMemory, false},
{TypeFile, true},
{TypeSQL, true},
{TypeRaft, false},
}
testTimestampMu sync.Mutex
testLastTimestamp int64
Expand Down
4 changes: 2 additions & 2 deletions stores/filestore_msg_test.go
Expand Up @@ -1442,8 +1442,8 @@ type testFSGapsOption struct {
func testFSGetOptionsForGapsTests() []testFSGapsOption {
defaultOptions := DefaultFileStoreOptions
opts := []testFSGapsOption{
testFSGapsOption{"Default", AllOptions(&defaultOptions)},
testFSGapsOption{"NoBuffer", BufferSize(0)},
{"Default", AllOptions(&defaultOptions)},
{"NoBuffer", BufferSize(0)},
}
return opts
}
Expand Down
2 changes: 1 addition & 1 deletion stores/filestore_test.go
Expand Up @@ -1391,7 +1391,7 @@ func TestFSReadRecord(t *testing.T) {

buf := make([]byte, 5)
var retBuf []byte
recType := recNoType
var recType recordType
recSize := 0

// Reader returns an error
Expand Down
2 changes: 1 addition & 1 deletion stores/sqlstore.go
Expand Up @@ -504,7 +504,7 @@ func (s *SQLStore) updateDBLock() {

var (
ticker = time.NewTicker(sqlLockUpdateInterval)
hasLock = true
hasLock bool
err error
failed int
)
Expand Down
2 changes: 1 addition & 1 deletion util/channels.go
Expand Up @@ -24,7 +24,7 @@ import (
// Number of bytes used to encode a channel name
const encodedChannelLen = 2

// SendsChannelsList sends the list of channels to the given subject, possibly
// SendChannelsList sends the list of channels to the given subject, possibly
// splitting the list in several requests if it cannot fit in a single message.
func SendChannelsList(channels []string, sendInbox, replyInbox string, nc *nats.Conn, serverID string) error {
// Since the NATS message payload is limited, we need to repeat
Expand Down

0 comments on commit fcc014f

Please sign in to comment.