diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 5aeb27f4dc4f..7868d266a823 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -58,7 +58,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -8130,253 +8129,6 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { } } -// TestReplicaRefreshMultiple tests an interaction between refreshing -// proposals after a new leader or ticks (which results in multiple -// copies in the log with the same lease index) and refreshing after -// an illegal lease index error (with a new lease index assigned). -// -// The setup here is rather artificial, but it represents something -// that can happen (very rarely) in the real world with multiple raft -// leadership transfers. -func TestReplicaRefreshMultiple(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - if useReproposalsV2 { - skip.IgnoreLintf(t, "TODO(tbg)") - } - - ctx := context.Background() - - const incCmdID = "deadbeef" - var incApplyCount int64 - tsc := TestStoreConfig(nil) - tsc.TestingKnobs.TestingPostApplyFilter = func(filterArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) { - if filterArgs.ForcedError == nil && filterArgs.CmdID == incCmdID { - atomic.AddInt64(&incApplyCount, 1) - } - return 0, nil - } - var tc testContext - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - tc.StartWithStoreConfig(ctx, t, stopper, tsc) - repl := tc.repl - - key := roachpb.Key("a") - - // Run a few commands first: This advances the lease index, which is - // necessary for the tricks we're going to play to induce failures - // (we need to be able to subtract from the current lease index - // without going below 0). - for i := 0; i < 3; i++ { - inc := incrementArgs(key, 1) - if _, pErr := kv.SendWrapped(ctx, tc.Sender(), inc); pErr != nil { - t.Fatal(pErr) - } - } - // Sanity check the resulting value. - get := getArgs(key) - if resp, pErr := kv.SendWrapped(ctx, tc.Sender(), &get); pErr != nil { - t.Fatal(pErr) - } else if x, err := resp.(*kvpb.GetResponse).Value.GetInt(); err != nil { - t.Fatalf("returned non-int: %+v", err) - } else if x != 3 { - t.Fatalf("expected 3, got %d", x) - } - - // Manually propose another increment. This is the one we'll - // manipulate into failing. (the use of increment here is not - // significant. I originally wrote it this way because I thought the - // non-idempotence of increment would make it easier to test, but - // since the reproposals we're concerned with don't result in - // reevaluation it doesn't matter) - inc := incrementArgs(key, 1) - ba := &kvpb.BatchRequest{} - ba.Add(inc) - ba.Timestamp = tc.Clock().Now() - - st := repl.CurrentLeaseStatus(ctx) - proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, allSpansGuard(), &st, uncertainty.Interval{}) - if pErr != nil { - t.Fatal(pErr) - } - // Save this channel; it may get reset to nil before we read from it. - proposalDoneCh := proposal.doneCh - - repl.mu.Lock() - ai := repl.mu.state.LeaseAppliedIndex - if ai <= 1 { - // Lease index zero is special in this test because we subtract - // from it below, so we need enough previous proposals in the - // log to ensure it doesn't go negative. - t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai) - } - assigned := false - repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) kvpb.LeaseAppliedIndex { - if p == proposal && !assigned { - assigned = true - t.Logf("assigned wrong LAI %d", ai-1) - return ai - 1 - } - return 0 - } - repl.mu.Unlock() - - // Propose the command manually with errors induced. The first time it is - // proposed it will be given the incorrect max lease index which ensures - // that it will generate a retry when it fails. Then call refreshProposals - // twice to repropose it and put it in the logs twice more. - proposal.command.ProposerLeaseSequence = repl.mu.state.Lease.Sequence - _, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp) - if pErr := repl.propose(ctx, proposal, tok); pErr != nil { - t.Fatal(pErr) - } - repl.mu.Lock() - if err := tc.repl.mu.proposalBuf.flushLocked(ctx); err != nil { - t.Fatal(err) - } - repl.refreshProposalsLocked(ctx, 0 /* refreshAtDelta */, reasonNewLeader) - repl.refreshProposalsLocked(ctx, 0 /* refreshAtDelta */, reasonNewLeader) - repl.mu.Unlock() - require.Zero(t, tc.repl.mu.proposalBuf.EvaluatingRequestsCount()) - - // Wait for our proposal to apply. The two refreshed proposals above - // will fail due to their illegal lease index. Then they'll generate - // a reproposal (in the bug that we're testing against, they'd - // *each* generate a reproposal). When this reproposal succeeds, the - // doneCh is signaled. - select { - case resp := <-proposalDoneCh: - if resp.Err != nil { - t.Fatal(resp.Err) - } - case <-time.After(5 * time.Second): - t.Fatal("timed out") - } - // In the buggy case, there's a second reproposal that we don't have - // a good way to observe, so just sleep to let it apply if it's in - // the system. - time.Sleep(10 * time.Millisecond) - - // The command applied exactly once. Note that this check would pass - // even in the buggy case, since illegal lease index proposals do - // not generate reevaluations (and increment is handled upstream of - // raft). - if resp, pErr := kv.SendWrapped(ctx, tc.Sender(), &get); pErr != nil { - t.Fatal(pErr) - } else if x, err := resp.(*kvpb.GetResponse).Value.GetInt(); err != nil { - t.Fatalf("returned non-int: %+v", err) - } else if x != 4 { - t.Fatalf("expected 4, got %d", x) - } - - // The real test: our apply filter can tell us whether there was a - // duplicate reproposal. (A reproposed increment isn't harmful, but - // some other commands could be) - if x := atomic.LoadInt64(&incApplyCount); x != 1 { - t.Fatalf("expected 1, got %d", x) - } -} - -// TestReplicaReproposalWithNewLeaseIndexError tests an interaction where a -// proposal is rejected beneath raft due an illegal lease index error and then -// hits an error when being reproposed. The expectation is that this error -// manages to make its way back to the client. -func TestReplicaReproposalWithNewLeaseIndexError(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - if useReproposalsV2 { - skip.IgnoreLint(t, "TODO(tbg)") - } - - ctx := context.Background() - var tc testContext - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - tc.Start(ctx, t, stopper) - - type magicKey struct{} - magicCtx := context.WithValue(ctx, magicKey{}, "foo") - - var curFlushAttempt, curInsertAttempt int32 // updated atomically - tc.repl.mu.Lock() - tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) kvpb.LeaseAppliedIndex { - if v := p.ctx.Value(magicKey{}); v != nil { - flushAttempts := atomic.AddInt32(&curFlushAttempt, 1) - switch flushAttempts { - case 1: - // This is the first time the command is being given a max lease - // applied index. Set the index to that of the recently applied - // write. Two requests can't have the same lease applied index, - // so this will cause it to be rejected beneath raft with an - // illegal lease index error. - wrongLeaseIndex := kvpb.LeaseAppliedIndex(1) - return wrongLeaseIndex - default: - // Unexpected. Asserted against below. - return 0 - } - } - return 0 - } - tc.repl.mu.proposalBuf.testing.insertFilter = func(p *ProposalData) error { - if v := p.ctx.Value(magicKey{}); v != nil { - curAttempt := atomic.AddInt32(&curInsertAttempt, 1) - switch curAttempt { - case 2: - // This is the second time the command is being given a max - // lease applied index, which should be after the command was - // rejected beneath raft. Return an error. We expect this error - // to propagate up through tryReproposeWithNewLeaseIndex and - // make it back to the client. - return errors.New("boom") - default: - // Unexpected. Asserted against below. - return nil - } - } - return nil - } - tc.repl.mu.Unlock() - - // Perform a few writes to advance the lease applied index. - const initCount = 3 - key := roachpb.Key("a") - for i := 0; i < initCount; i++ { - iArg := incrementArgs(key, 1) - if _, pErr := tc.SendWrapped(iArg); pErr != nil { - t.Fatal(pErr) - } - } - - // Perform a write that will first hit an illegal lease index error and - // will then hit the injected error when we attempt to repropose it. - ba := &kvpb.BatchRequest{} - iArg := incrementArgs(key, 10) - ba.Add(iArg) - if _, pErr := tc.Sender().Send(magicCtx, ba); pErr == nil { - t.Fatal("expected a non-nil error") - } else if !testutils.IsPError(pErr, "boom") { - t.Fatalf("unexpected error: %v", pErr) - } - // The command should have been inserted in the buffer exactly twice. - if exp, act := int32(2), atomic.LoadInt32(&curInsertAttempt); exp != act { - t.Fatalf("expected %d proposals, got %d", exp, act) - } - - // The command should not have applied. - gArgs := getArgs(key) - if reply, pErr := tc.SendWrapped(&gArgs); pErr != nil { - t.Fatal(pErr) - } else if v, err := reply.(*kvpb.GetResponse).Value.GetInt(); err != nil { - t.Fatal(err) - } else if v != initCount { - t.Fatalf("expected value of %d, found %d", initCount, v) - } -} - // Test that, if the Raft command resulting from EndTxn request fails to be // processed/apply, then the LocalResult associated with that command is // cleared.