Skip to content

Commit

Permalink
Make sure to wait properly until we believe we are caught up to enabl…
Browse files Browse the repository at this point in the history
…e direct gets.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed May 16, 2023
1 parent 4feb7b9 commit b0340ce
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 19 deletions.
30 changes: 11 additions & 19 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2284,27 +2284,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Here we are checking if we are not the leader but we have been asked to allow
// direct access. We now allow non-leaders to participate in the queue group.
if !isLeader && mset != nil {
mset.mu.Lock()
// Check direct gets first.
if mset.cfg.AllowDirect {
if mset.directSub == nil && mset.isCurrent() {
mset.subscribeToDirect()
} else {
startDirectAccessMonitoring()
}
}
// Now check for mirror directs as well.
if mset.cfg.MirrorDirect {
if mset.mirror != nil && mset.mirror.dsub == nil && mset.isCurrent() {
mset.subscribeToMirrorDirect()
} else {
startDirectAccessMonitoring()
}
}
mset.mu.Unlock()
startDirectAccessMonitoring()
}

case <-datc:
if mset == nil || isRecovering {
return
}
// If we are leader we can stop, we know this is setup now.
if isLeader {
stopDirectMonitoring()
return
}

mset.mu.Lock()
ad, md, current := mset.cfg.AllowDirect, mset.cfg.MirrorDirect, mset.isCurrent()
if !current {
Expand All @@ -2328,7 +2320,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
mset.subscribeToMirrorDirect()
}
mset.mu.Unlock()
// Stop monitoring.
// Stop direct monitoring.
stopDirectMonitoring()

case <-t.C:
Expand Down
76 changes: 76 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4046,3 +4046,79 @@ func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
})
require_NoError(t, err)
}

// https://github.com/nats-io/nats-server/issues/4162
func TestJetStreamClusterStaleDirectGetOnRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "TEST",
Replicas: 3,
})
require_NoError(t, err)

_, err = kv.PutString("foo", "bar")
require_NoError(t, err)

// Close client in case we were connected to server below.
// We will recreate.
nc.Close()

// Shutdown a non-leader.
s := c.randomNonStreamLeader(globalAccountName, "KV_TEST")
s.Shutdown()

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err = js.KeyValue("TEST")
require_NoError(t, err)

_, err = kv.PutString("foo", "baz")
require_NoError(t, err)

errCh := make(chan error, 100)
done := make(chan struct{})

go func() {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err := js.KeyValue("TEST")
if err != nil {
errCh <- err
return
}

for {
select {
case <-done:
return
default:
entry, err := kv.Get("foo")
if err != nil {
errCh <- err
return
}
if v := string(entry.Value()); v != "baz" {
errCh <- fmt.Errorf("Got wrong value: %q", v)
}
}
}
}()

// Restart
c.restartServer(s)
// Wait for a bit to make sure as this server participates in direct gets
// it does not server stale reads.
time.Sleep(2 * time.Second)
close(done)

if len(errCh) > 0 {
t.Fatalf("Expected no errors but got %v", <-errCh)
}
}

0 comments on commit b0340ce

Please sign in to comment.