Skip to content

Commit

Permalink
Merge 30e91f5 into abca9ca
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Sep 28, 2021
2 parents abca9ca + 30e91f5 commit fe7b7c5
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 6 deletions.
120 changes: 120 additions & 0 deletions server/clustering_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"
"math/rand"
"net"
"net/http"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -8382,3 +8383,122 @@ func TestClusteringSnapshotQSubLastSent(t *testing.T) {
// OK
}
}

// TestClusteringMonitorQueueLastSentAfterHBTimeout verifies that when a queue
// member's client is closed by the server after missed heartbeats, every
// server in the cluster still reports the correct last_sent value for the
// remaining queue member through the /channelsz monitoring endpoint.
// See https://github.com/nats-io/nats-streaming-server/issues/1189
func TestClusteringMonitorQueueLastSentAfterHBTimeout(t *testing.T) {
	resetPreviousHTTPConnections()
	cleanupDatastore(t)
	defer cleanupDatastore(t)
	cleanupRaftLog(t)
	defer cleanupRaftLog(t)

	// For this test, use a central NATS server.
	ns := natsdTest.RunDefaultServer()
	defer ns.Shutdown()

	// All three nodes use aggressive heartbeat settings (100ms interval and
	// timeout, 5 allowed failures) so that the dead client is detected
	// quickly, and each exposes its own monitoring port.
	startNode := func(name string, bootstrap bool, httpPort int) *StanServer {
		sOpts := getTestDefaultOptsForClustering(name, bootstrap)
		sOpts.ClientHBInterval = 100 * time.Millisecond
		sOpts.ClientHBTimeout = 100 * time.Millisecond
		sOpts.ClientHBFailCount = 5
		nOpts := defaultMonitorOptions
		nOpts.HTTPPort = httpPort
		return runServerWithOpts(t, sOpts, &nOpts)
	}

	// Configure first server (bootstrap node, default monitoring port).
	s1 := startNode("a", true, monitorPort)
	defer s1.Shutdown()

	// Configure second server.
	s2 := startNode("b", false, monitorPort+1)
	defer s2.Shutdown()

	// Configure third server.
	s3 := startNode("c", false, monitorPort+2)
	defer s3.Shutdown()

	getLeader(t, 10*time.Second, s1, s2, s3)

	// Use an explicit NATS connection so we can later close it out from
	// under the streaming connection to simulate heartbeat loss.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		t.Fatalf("Unexpected error on connect: %v", err)
	}
	defer nc.Close()
	sc, err := stan.Connect(clusterName, "instance1", stan.NatsConn(nc))
	if err != nil {
		t.Fatalf("Expected to connect correctly, got err %v", err)
	}
	defer sc.Close()

	// First queue member: signal once all 3 messages have been received.
	ch := make(chan bool, 1)
	count := 0
	if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
		count++
		if count == 3 {
			ch <- true
		}
	}, stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	for i := 0; i < 3; i++ {
		if err := sc.Publish("foo", []byte("msg")); err != nil {
			t.Fatalf("Error on publish: %v", err)
		}
	}

	if err := Wait(ch); err != nil {
		t.Fatalf("Did not get all messages: %v", err)
	}

	// checkLastSent polls each server's monitoring endpoint until the sole
	// reported queue subscription shows last_sent == 3 (or times out).
	checkLastSent := func() {
		t.Helper()
		waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
			for _, port := range []int{monitorPort, monitorPort + 1, monitorPort + 2} {
				resp, body := getBodyEx(t, http.DefaultClient, "http", ChannelsPath+"?channel=foo&subs=1", port, http.StatusOK, expectedJSON)
				// getBodyEx already read the full body; close it right away
				// rather than deferring inside the loop, which would stack
				// up closes until the closure returns.
				resp.Body.Close()

				cz := Channelz{}
				if err := json.Unmarshal(body, &cz); err != nil {
					return fmt.Errorf("Got an error unmarshalling the body: %v", err)
				}
				// Guard the index so a transiently empty list makes waitFor
				// retry instead of panicking.
				if len(cz.Subscriptions) == 0 {
					return fmt.Errorf("No subscription reported on port %d", port)
				}
				sub := cz.Subscriptions[0]
				if sub.LastSent != 3 {
					return fmt.Errorf("Unexpected last_sent: %v", sub.LastSent)
				}
			}
			return nil
		})
	}

	// Check that all see last_sent == 3
	checkLastSent()

	// Start a new connection and create the same queue durable
	sc2 := NewDefaultConnection(t)
	defer sc2.Close()
	if _, err := sc2.QueueSubscribe("foo", "bar", func(m *stan.Msg) {},
		stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Now close the underlying NATS connection to simulate loss of HB
	// and for the server to close the connection.
	nc.Close()

	// Wait for the server to close the old client
	waitForNumClients(t, s1, 1)
	waitForNumClients(t, s2, 1)
	waitForNumClients(t, s3, 1)

	// Now make sure that all servers reflect that the sole queue member's
	// last_sent is set to 3.
	checkLastSent()
}
10 changes: 5 additions & 5 deletions server/monitor_test.go
Expand Up @@ -67,8 +67,8 @@ func runMonitorServer(t *testing.T, sOpts *Options) *StanServer {
return runServerWithOpts(t, sOpts, &nOpts)
}

func getBodyEx(t *testing.T, client *http.Client, scheme, endpoint string, expectedStatus int, expectedContentType string) (*http.Response, []byte) {
url := fmt.Sprintf("%s://%s:%d%s", scheme, monitorHost, monitorPort, endpoint)
func getBodyEx(t *testing.T, client *http.Client, scheme, endpoint string, mp, expectedStatus int, expectedContentType string) (*http.Response, []byte) {
url := fmt.Sprintf("%s://%s:%d%s", scheme, monitorHost, mp, endpoint)
resp, err := client.Get(url)
if err != nil {
stackFatalf(t, "Expected no error: Got %v\n", err)
Expand All @@ -89,7 +89,7 @@ func getBodyEx(t *testing.T, client *http.Client, scheme, endpoint string, expec
}

// getBody issues a GET request for endpoint against the default monitoring
// port and returns the HTTP response along with its fully-read body. It
// expects a 200 status and the given content type; any mismatch fails the
// test via getBodyEx.
func getBody(t *testing.T, endpoint, expectedContentType string) (*http.Response, []byte) {
	return getBodyEx(t, http.DefaultClient, "http", endpoint, monitorPort, http.StatusOK, expectedContentType)
}

func monitorExpectStatusEx(t *testing.T, client *http.Client, scheme, endpoint string, expectedStatus int) {
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestMonitorStartOwnHTTPSServer(t *testing.T) {
transport := &http.Transport{TLSClientConfig: tlsConfig}
httpClient := &http.Client{Transport: transport}

r, _ := getBodyEx(t, httpClient, "https", RootPath, http.StatusOK, expectedText)
r, _ := getBodyEx(t, httpClient, "https", RootPath, monitorPort, http.StatusOK, expectedText)
r.Body.Close()
}

Expand Down Expand Up @@ -413,7 +413,7 @@ func TestMonitorIsFTActiveFTServer(t *testing.T) {
standby := runServerWithOpts(t, opts, sNOpts)
defer standby.Shutdown()

resp, _ := getBodyEx(t, http.DefaultClient, "http", IsFTActivePath, test.expectedStatus, "")
resp, _ := getBodyEx(t, http.DefaultClient, "http", IsFTActivePath, monitorPort, test.expectedStatus, "")
defer resp.Body.Close()
})
}
Expand Down
9 changes: 8 additions & 1 deletion server/server.go
Expand Up @@ -47,7 +47,7 @@ import (
// Server defaults.
const (
// VERSION is the current version for the NATS Streaming server.
VERSION = "0.22.1"
VERSION = "0.22.2-beta.1"

DefaultClusterID = "test-cluster"
DefaultDiscoverPrefix = "_STAN.discover"
Expand Down Expand Up @@ -1241,6 +1241,13 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
qsub.Lock()
qsub.LastSent = qs.lastSent
qsub.store.UpdateSub(&qsub.SubState)
// In cluster mode, let send a "sent" event for this queue sub so that
// followers can have an updated version of the last sent, which otherwise
// may stay at 0 until new messages are delivered in some cases.
// See https://github.com/nats-io/nats-streaming-server/issues/1189
if s.isClustered {
s.collectSentOrAck(qsub, true, qs.lastSent)
}
qsub.Unlock()
}
qs.Unlock()
Expand Down

0 comments on commit fe7b7c5

Please sign in to comment.