Skip to content

Commit

Permalink
storage: remove optimization to use in-memory RaftCommand when sidelo…
Browse files Browse the repository at this point in the history
…ading SSTs

Fixes cockroachdb#36861.

This optimization relied on the fact that `RaftCommands` in `Replica.mu.proposals`
were immutable over the lifetime of a Raft proposal. This invariant was violated
by cockroachdb#35261, which allowed a lease index error to trigger an immediate reproposal.
This reproposal mutated the corresponding `RaftCommand` in `Replica.mu.proposals`.
Combined with aliasing between multiple Raft proposals due to reproposals due to
ticks, this resulted in cases where a leaseholder's Raft logs could diverge from
its followers and cause Raft groups to become inconsistent.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 18, 2019
1 parent b47269e commit d366383
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 105 deletions.
3 changes: 1 addition & 2 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,7 @@ type Replica struct {
//
// The *ProposalData in the map are "owned" by it. Elements from the
// map must only be referenced while Replica.mu is held, except if the
// element is removed from the map first. The notable exception is the
// contained RaftCommand, which we treat as immutable.
// element is removed from the map first.
proposals map[storagebase.CmdIDKey]*ProposalData
internalRaftGroup *raft.RawNode
// The ID of the replica within the Raft group. May be 0 if the replica has
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2052,6 +2052,8 @@ func (r *Replica) processRaftCommand(
// a new one. This is important for pipelined writes, since they
// don't have a client watching to retry, so a failure to
// eventually apply the proposal would be a user-visible error.
// TODO(nvanbenschoten): This reproposal is not tracked by the
// quota pool. We should fix that.
if proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) {
return false
}
Expand Down
56 changes: 12 additions & 44 deletions pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/raftentry"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -70,36 +69,19 @@ type SideloadStorage interface {
func (r *Replica) maybeSideloadEntriesRaftMuLocked(
ctx context.Context, entriesToAppend []raftpb.Entry,
) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) {
// TODO(tschottdorf): allocating this closure could be expensive. If so make
// it a method on Replica.
maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagepb.RaftCommand, bool) {
r.mu.Lock()
defer r.mu.Unlock()
cmd, ok := r.mu.proposals[cmdID]
if ok {
return *cmd.command, true
}
return storagepb.RaftCommand{}, false
}
return maybeSideloadEntriesImpl(ctx, entriesToAppend, r.raftMu.sideloaded, maybeRaftCommand)
return maybeSideloadEntriesImpl(ctx, entriesToAppend, r.raftMu.sideloaded)
}

// maybeSideloadEntriesImpl iterates through the provided slice of entries. If
// no sideloadable entries are found, it returns the same slice. Otherwise, it
// returns a new slice in which all applicable entries have been sideloaded to
// the specified SideloadStorage. maybeRaftCommand is called when sideloading is
// necessary and can optionally supply a pre-Unmarshaled RaftCommand (which
// usually is provided by the Replica in-flight proposal map.
// the specified SideloadStorage.
func maybeSideloadEntriesImpl(
ctx context.Context,
entriesToAppend []raftpb.Entry,
sideloaded SideloadStorage,
maybeRaftCommand func(storagebase.CmdIDKey) (storagepb.RaftCommand, bool),
ctx context.Context, entriesToAppend []raftpb.Entry, sideloaded SideloadStorage,
) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) {

cow := false
for i := range entriesToAppend {
var err error
if sniffSideloadedRaftCommand(entriesToAppend[i].Data) {
log.Event(ctx, "sideloading command in append")
if !cow {
Expand All @@ -112,31 +94,16 @@ func maybeSideloadEntriesImpl(

ent := &entriesToAppend[i]
cmdID, data := DecodeRaftCommand(ent.Data) // cheap
strippedCmd, ok := maybeRaftCommand(cmdID)
if ok {
// Happy case: we have this proposal locally (i.e. we proposed
// it). In this case, we can save unmarshalling the fat proposal
// because it's already in-memory.
if strippedCmd.ReplicatedEvalResult.AddSSTable == nil {
log.Fatalf(ctx, "encountered sideloaded non-AddSSTable command: %+v", strippedCmd)
}
log.Eventf(ctx, "command already in memory")
// The raft proposal is immutable. To respect that, shallow-copy
// the (nullable) AddSSTable struct which we intend to modify.
addSSTableCopy := *strippedCmd.ReplicatedEvalResult.AddSSTable
strippedCmd.ReplicatedEvalResult.AddSSTable = &addSSTableCopy
} else {
// Bad luck: we didn't have the proposal in-memory, so we'll
// have to unmarshal it.
log.Event(ctx, "proposal not already in memory; unmarshaling")
if err := protoutil.Unmarshal(data, &strippedCmd); err != nil {
return nil, 0, err
}

// Unmarshal the command into an object that we can mutate.
var strippedCmd storagepb.RaftCommand
if err := protoutil.Unmarshal(data, &strippedCmd); err != nil {
return nil, 0, err
}

if strippedCmd.ReplicatedEvalResult.AddSSTable == nil {
// Still no AddSSTable; someone must've proposed a v2 command
// but not becaused it contains an inlined SSTable. Strange, but
// but not because it contains an inlined SSTable. Strange, but
// let's be future proof.
log.Warning(ctx, "encountered sideloaded Raft command without inlined payload")
continue
Expand All @@ -146,8 +113,9 @@ func maybeSideloadEntriesImpl(
dataToSideload := strippedCmd.ReplicatedEvalResult.AddSSTable.Data
strippedCmd.ReplicatedEvalResult.AddSSTable.Data = nil

// Marshal the command and attach to the Raft entry.
{
data = make([]byte, raftCommandPrefixLen+strippedCmd.Size())
data := make([]byte, raftCommandPrefixLen+strippedCmd.Size())
encodeRaftCommandPrefix(data[:raftCommandPrefixLen], raftVersionSideloaded, cmdID)
_, err := protoutil.MarshalToWithoutFuzzing(&strippedCmd, data[raftCommandPrefixLen:])
if err != nil {
Expand All @@ -157,7 +125,7 @@ func maybeSideloadEntriesImpl(
}

log.Eventf(ctx, "writing payload at index=%d term=%d", ent.Index, ent.Term)
if err = sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil {
if err := sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil {
return nil, 0, err
}
sideloadedEntriesSize += int64(len(dataToSideload))
Expand Down
61 changes: 2 additions & 59 deletions pkg/storage/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,67 +569,9 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
}
}

func TestRaftSSTableSideloadingInflight(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording")
defer cancel()

sideloaded := mustNewInMemSideloadStorage(roachpb.RangeID(5), roachpb.ReplicaID(7), ".")

// We'll set things up so that while sideloading this entry, there
// unmarshaled one is already in memory (so the payload here won't even be
// looked at).
preEnts := []raftpb.Entry{mkEnt(raftVersionSideloaded, 7, 1, &storagepb.ReplicatedEvalResult_AddSSTable{
Data: []byte("not the payload you're looking for"),
CRC32: 0, // not checked
})}

origBytes := []byte("compare me")

// Pretend there's an inflight command that actually has an SSTable in it.
var pendingCmd storagepb.RaftCommand
pendingCmd.ReplicatedEvalResult.AddSSTable = &storagepb.ReplicatedEvalResult_AddSSTable{
Data: origBytes, CRC32: 0, // not checked
}
maybeCmd := func(cmdID storagebase.CmdIDKey) (storagepb.RaftCommand, bool) {
return pendingCmd, true
}

// The entry should be recognized as "to be sideloaded", then maybeCmd is
// invoked and supplies the RaftCommand, whose SSTable is then persisted.
postEnts, size, err := maybeSideloadEntriesImpl(ctx, preEnts, sideloaded, maybeCmd)
if err != nil {
t.Fatal(err)
}

if len(postEnts) != 1 {
t.Fatalf("expected exactly one entry: %+v", postEnts)
}
if size != int64(len(origBytes)) {
t.Fatalf("expected %d sideloadedSize, but found %d", len(origBytes), size)
}

if b, err := sideloaded.Get(ctx, preEnts[0].Index, preEnts[0].Term); err != nil {
t.Fatal(err)
} else if !bytes.Equal(b, origBytes) {
t.Fatalf("expected payload %s, got %s", origBytes, b)
}

re := regexp.MustCompile(`(?ms)copying entries slice of length 1.*command already in memory.*writing payload`)
if trace := tracing.FormatRecordedSpans(collect()); !re.MatchString(trace) {
t.Fatalf("trace did not match %s:\n%s", re, trace)
}
}

func TestRaftSSTableSideloadingSideload(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
noCmd := func(storagebase.CmdIDKey) (cmd storagepb.RaftCommand, ok bool) {
return
}

addSST := storagepb.ReplicatedEvalResult_AddSSTable{
Data: []byte("foo"), CRC32: 0, // not checked
}
Expand Down Expand Up @@ -684,8 +626,9 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
sideloaded := mustNewInMemSideloadStorage(roachpb.RangeID(3), roachpb.ReplicaID(17), ".")
postEnts, size, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded, noCmd)
postEnts, size, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit d366383

Please sign in to comment.