Skip to content

Commit

Permalink
storage: delete all "raft log as part of snapshots" code
Browse files Browse the repository at this point in the history
Raft log entries are no longer sent along with snapshots as of v19.2
(more specifically, cockroachdb#35701), we can/should delete all code around the
sending/handling of log entries in snapshots before v20.1 is cut. This
simplifies a few snapshot related protos and naturally obviates the need
to handle sideloaded proposals within snapshots.

Release note: None
  • Loading branch information
irfansharif committed Jan 31, 2020
1 parent c434a8d commit 814e918
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 864 deletions.
276 changes: 94 additions & 182 deletions pkg/storage/raft.pb.go

Large diffs are not rendered by default.

18 changes: 3 additions & 15 deletions pkg/storage/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -164,29 +164,17 @@ message SnapshotRequest {
// The type of the snapshot.
optional Type type = 9 [(gogoproto.nullable) = false];

// Whether the snapshot uses the unreplicated RaftTruncatedState or not.
// This is generally always true at 2.2 and above outside of the migration
// phase, though theoretically it could take a long time for all ranges
// to update to the new mechanism. This bool is true iff the Raft log at
// the snapshot's applied index is using the new key. In particular, it
// is true if the index itself carries out the migration (in which case
// the data in the snapshot contains neither key).
//
// See VersionUnreplicatedRaftTruncatedState.
optional bool unreplicated_truncated_state = 8 [(gogoproto.nullable) = false];
reserved 8;
}

optional Header header = 1;

// A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages.
optional bytes kv_batch = 2 [(gogoproto.customname) = "KVBatch"];

// These are really raftpb.Entry, but we model them as raw bytes to avoid
// roundtripping through memory. They are separate from the kv_batch to
// allow flexibility in log implementations.
repeated bytes log_entries = 3;

optional bool final = 4 [(gogoproto.nullable) = false];

reserved 3;
}

message SnapshotResponse {
Expand Down
78 changes: 18 additions & 60 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1836,9 +1836,9 @@ func execChangeReplicasTxn(
// returns true, this is communicated back to the sender, which then proceeds to
// call `kvBatchSnapshotStrategy.Send`. This uses the iterator captured earlier
// to send the data in chunks, each chunk a streaming grpc message. The sender
// then sends a final message with an indicaton that it's done and blocks again,
// waiting for a second and final response from the recipient which indicates if
// the snapshot was a success.
// then sends a final message with an indication that it's done and blocks
// again, waiting for a second and final response from the recipient which
// indicates if the snapshot was a success.
//
// `receiveSnapshot` takes the key-value pairs sent and incrementally creates
// three SSTs from them for direct ingestion: one for the replicated range-ID
Expand All @@ -1857,13 +1857,13 @@ func execChangeReplicasTxn(
// snapshot* message is manually handed to the replica's Raft node (by calling
// `stepRaftGroup` + `handleRaftReadyRaftMuLocked`). During the application
// process, several other SSTs may be created for direct ingestion. An SST for
// the unreplicated range-ID local keys is created for the Raft entries, hard
// state, and truncated state. An SST is created for deleting each subsumed
// replica's range-ID local keys and at most two SSTs are created for deleting
// the user keys and range local keys of all subsumed replicas. All in all, a
// maximum of 6 + SR SSTs will be created for direct ingestion where SR is the
// number of subsumed replicas. In the case where there are no subsumed
// replicas, 4 SSTs will be created.
// the unreplicated range-ID local keys is created for the hard state and
// truncated state. An SST is created for deleting each subsumed replica's
// range-ID local keys and at most two SSTs are created for deleting the user
// keys and range local keys of all subsumed replicas. All in all, a maximum of
// 6 + SR SSTs will be created for direct ingestion where SR is the number of
// subsumed replicas. In the case where there are no subsumed replicas, 4 SSTs
// will be created.
//
// [1]: There is a third kind of snapshot, called "preemptive", which is how we
// avoided the above fragility before learner replicas were introduced in the
Expand Down Expand Up @@ -1922,42 +1922,16 @@ func (r *Replica) sendSnapshot(
return &benignError{errors.New("raft status not initialized")}
}

usesReplicatedTruncatedState, err := engine.MVCCGetProto(
ctx, snap.EngineSnap, keys.RaftTruncatedStateLegacyKey(r.RangeID), hlc.Timestamp{}, nil, engine.MVCCGetOptions{},
)
if err != nil {
return errors.Wrap(err, "loading legacy truncated state")
}

canAvoidSendingLog := !usesReplicatedTruncatedState &&
snap.State.TruncatedState.Index < snap.State.RaftAppliedIndex

if canAvoidSendingLog {
// If we're not using a legacy (replicated) truncated state, we avoid
// sending the (past) Raft log in the snapshot in the first place and
// send only those entries that are actually useful to the follower.
// This is done by changing the truncated state, which we're allowed
// to do since it is not a replicated key (and thus not subject to
// matching across replicas). The actual sending happens here:
_ = (*kvBatchSnapshotStrategy)(nil).Send
// and results in no log entries being sent at all. Note that
// Metadata.Index is really the applied index of the replica.
snap.State.TruncatedState = &roachpb.RaftTruncatedState{
Index: snap.RaftSnap.Metadata.Index,
Term: snap.RaftSnap.Metadata.Term,
}
// We update the ReplicaState in the snapshot to match the snapshot metadata
// provided to use by etcd/raft.
metadata := snap.RaftSnap.Metadata
snap.State.TruncatedState = &roachpb.RaftTruncatedState{
Index: metadata.Index,
Term: metadata.Term,
}

req := SnapshotRequest_Header{
State: snap.State,
// Tell the recipient whether it needs to synthesize the new
// unreplicated TruncatedState. It could tell by itself by peeking into
// the data, but it uses a write only batch for performance which
// doesn't support that; this is easier. Notably, this is true if the
// snap index itself is the one at which the migration happens.
//
// See VersionUnreplicatedRaftTruncatedState.
UnreplicatedTruncatedState: !usesReplicatedTruncatedState,
RaftMessageRequest: RaftMessageRequest{
RangeID: r.RangeID,
FromReplica: sender,
Expand All @@ -1980,25 +1954,9 @@ func (r *Replica) sendSnapshot(
sent := func() {
r.store.metrics.RangeSnapshotsGenerated.Inc(1)
}
if err := r.store.cfg.Transport.SendSnapshot(
ctx,
&r.store.cfg.RaftConfig,
r.store.allocator.storePool,
req,
snap,
r.store.Engine().NewBatch,
sent,
if err := r.store.cfg.Transport.SendSnapshot(ctx, &r.store.cfg.RaftConfig,
r.store.allocator.storePool, req, snap, r.store.Engine().NewBatch, sent,
); err != nil {
if errors.Cause(err) == errMalformedSnapshot {
tag := fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short())
if dir, err := r.store.checkpoint(ctx, tag); err != nil {
log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err)
} else {
log.Warningf(ctx, "created checkpoint %s", dir)
}

log.Fatal(ctx, "malformed snapshot generated")
}
return &snapshotError{err}
}
return nil
Expand Down
138 changes: 17 additions & 121 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -415,20 +414,12 @@ func (r *Replica) GetSnapshot(

log.Eventf(ctx, "new engine snapshot for replica %s", r)

// Delegate to a static function to make sure that we do not depend
// on any indirect calls to r.store.Engine() (or other in-memory
// state of the Replica). Everything must come from the snapshot.
withSideloaded := func(fn func(SideloadStorage) error) error {
r.raftMu.Lock()
defer r.raftMu.Unlock()
return fn(r.raftMu.sideloaded)
}
// NB: We have Replica.mu read-locked, but we need it write-locked in order
// to use Replica.mu.stateLoader. This call is not performance sensitive, so
// create a new state loader.
snapData, err := snapshot(
ctx, snapUUID, stateloader.Make(rangeID), snapType,
snap, rangeID, r.store.raftEntryCache, withSideloaded, startKey,
snap, rangeID, r.store.raftEntryCache, startKey,
)
if err != nil {
log.Errorf(ctx, "error generating snapshot: %+v", err)
Expand All @@ -445,17 +436,10 @@ type OutgoingSnapshot struct {
SnapUUID uuid.UUID
// The Raft snapshot message to send. Contains SnapUUID as its data.
RaftSnap raftpb.Snapshot
// The RocksDB snapshot that will be streamed from.
EngineSnap engine.Reader
// The complete range iterator for the snapshot to stream.
Iter *rditer.ReplicaDataIterator
// The replica state within the snapshot.
State storagepb.ReplicaState
// Allows access the the original Replica's sideloaded storage. Note that
// this isn't a snapshot of the sideloaded storage congruent with EngineSnap
// or RaftSnap -- a log truncation could have removed files from the
// sideloaded storage in the meantime.
WithSideloaded func(func(SideloadStorage) error) error
State storagepb.ReplicaState
RaftEntryCache *raftentry.Cache
snapType SnapshotRequest_Type
onClose func()
Expand All @@ -468,7 +452,6 @@ func (s *OutgoingSnapshot) String() string {
// Close releases the resources associated with the snapshot.
func (s *OutgoingSnapshot) Close() {
s.Iter.Close()
s.EngineSnap.Close()
if s.onClose != nil {
s.onClose()
}
Expand All @@ -479,20 +462,9 @@ type IncomingSnapshot struct {
SnapUUID uuid.UUID
// The storage interface for the underlying SSTs.
SSTStorageScratch *SSTSnapshotStorageScratch
// The Raft log entries for this snapshot.
LogEntries [][]byte
// The replica state at the time the snapshot was generated (never nil).
State *storagepb.ReplicaState
//
// When true, this snapshot contains an unreplicated TruncatedState. When
// false, the TruncatedState is replicated (see the reference below) and the
// recipient must avoid also writing the unreplicated TruncatedState. The
// migration to an unreplicated TruncatedState will be carried out during
// the next log truncation (assuming cluster version is bumped at that
// point).
// See the comment on VersionUnreplicatedRaftTruncatedState for details.
UsesUnreplicatedTruncatedState bool
snapType SnapshotRequest_Type
State *storagepb.ReplicaState
snapType SnapshotRequest_Type
}

func (s *IncomingSnapshot) String() string {
Expand All @@ -509,7 +481,6 @@ func snapshot(
snap engine.Reader,
rangeID roachpb.RangeID,
eCache *raftentry.Cache,
withSideloaded func(func(SideloadStorage) error) error,
startKey roachpb.RKey,
) (OutgoingSnapshot, error) {
var desc roachpb.RangeDescriptor
Expand Down Expand Up @@ -549,8 +520,6 @@ func snapshot(

return OutgoingSnapshot{
RaftEntryCache: eCache,
WithSideloaded: withSideloaded,
EngineSnap: snap,
Iter: iter,
State: state,
SnapUUID: snapUUID,
Expand Down Expand Up @@ -738,10 +707,6 @@ func (r *Replica) applySnapshot(
log.Fatalf(ctx, "unexpected range ID %d", s.Desc.RangeID)
}

r.mu.RLock()
replicaID := r.mu.replicaID
r.mu.RUnlock()

snapType := inSnap.snapType
defer func() {
if err == nil {
Expand Down Expand Up @@ -809,6 +774,10 @@ func (r *Replica) applySnapshot(
inSnap.SnapUUID.Short(), snap.Metadata.Index)
}(timeutil.Now())

// We clear out all all existing raft log entries for the recipient replica
// and start afresh with the hard state, truncated state and snapshot
// key-space state prior to the truncated state index.

unreplicatedSSTFile := &engine.MemFile{}
unreplicatedSST := engine.MakeIngestionSSTWriter(unreplicatedSSTFile)
defer unreplicatedSST.Close()
Expand All @@ -826,45 +795,13 @@ func (r *Replica) applySnapshot(
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}

// Update Raft entries.
var lastTerm uint64
var raftLogSize int64
if len(inSnap.LogEntries) > 0 {
logEntries := make([]raftpb.Entry, len(inSnap.LogEntries))
for i, bytes := range inSnap.LogEntries {
if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil {
return err
}
}
// If this replica doesn't know its ReplicaID yet, we're applying a
// preemptive snapshot. In this case, we're going to have to write the
// sideloaded proposals into the Raft log. Otherwise, sideload.
if replicaID != 0 {
var err error
var sideloadedEntriesSize int64
logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
if err != nil {
return err
}
raftLogSize += sideloadedEntriesSize
}
var err error
_, lastTerm, raftLogSize, err = r.append(ctx, &unreplicatedSST, 0, invalidLastTerm, raftLogSize, logEntries)
if err != nil {
return err
}
} else {
lastTerm = invalidLastTerm
}
var lastTerm uint64 = invalidLastTerm
r.store.raftEntryCache.Drop(r.RangeID)

// Update TruncatedState if it is unreplicated.
if inSnap.UsesUnreplicatedTruncatedState {
if err := r.raftMu.stateLoader.SetRaftTruncatedState(
ctx, &unreplicatedSST, s.TruncatedState,
); err != nil {
return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer")
}
if err := r.raftMu.stateLoader.SetRaftTruncatedState(
ctx, &unreplicatedSST, s.TruncatedState,
); err != nil {
return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer")
}

if err := unreplicatedSST.Finish(); err != nil {
Expand All @@ -877,32 +814,15 @@ func (r *Replica) applySnapshot(
return err
}
}

if s.TruncatedState.Index != snap.Metadata.Index {
log.Fatalf(ctx, "snapshot TruncatedState index %d doesn't match its metadata index %d",
s.TruncatedState.Index, snap.Metadata.Index)
}
if s.RaftAppliedIndex != snap.Metadata.Index {
log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
s.RaftAppliedIndex, snap.Metadata.Index)
}

if expLen := s.RaftAppliedIndex - s.TruncatedState.Index; expLen != uint64(len(inSnap.LogEntries)) {
entriesRange, err := extractRangeFromEntries(inSnap.LogEntries)
if err != nil {
return err
}

tag := fmt.Sprintf("r%d_%s", r.RangeID, inSnap.SnapUUID.String())
dir, err := r.store.checkpoint(ctx, tag)
if err != nil {
log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err)
} else {
log.Warningf(ctx, "created checkpoint %s", dir)
}

log.Fatalf(ctx, "missing log entries in snapshot (%s): got %d entries, expected %d "+
"(TruncatedState.Index=%d, HardState=%s, LogEntries=%s)",
inSnap.String(), len(inSnap.LogEntries), expLen, s.TruncatedState.Index,
hs.String(), entriesRange)
}

// If we're subsuming a replica below, we don't have its last NextReplicaID,
// nor can we obtain it. That's OK: we can just be conservative and use the
// maximum possible replica ID. preDestroyRaftMuLocked will write a replica
Expand Down Expand Up @@ -962,7 +882,6 @@ func (r *Replica) applySnapshot(
// raftpb.SnapshotMetadata.
r.mu.lastIndex = s.RaftAppliedIndex
r.mu.lastTerm = lastTerm
r.mu.raftLogSize = raftLogSize
// Update the store stats for the data in the snapshot.
r.store.metrics.subtractMVCCStats(*r.mu.state.Stats)
r.store.metrics.addMVCCStats(*s.Stats)
Expand Down Expand Up @@ -1137,29 +1056,6 @@ func (r *Replica) clearSubsumedReplicaInMemoryData(
return nil
}

// extractRangeFromEntries returns a string representation of the range of
// marshaled list of raft log entries in the form of [first-index, last-index].
// If the list is empty, "[n/a, n/a]" is returned instead.
func extractRangeFromEntries(logEntries [][]byte) (string, error) {
var firstIndex, lastIndex string
if len(logEntries) == 0 {
firstIndex = "n/a"
lastIndex = "n/a"
} else {
firstAndLastLogEntries := make([]raftpb.Entry, 2)
if err := protoutil.Unmarshal(logEntries[0], &firstAndLastLogEntries[0]); err != nil {
return "", err
}
if err := protoutil.Unmarshal(logEntries[len(logEntries)-1], &firstAndLastLogEntries[1]); err != nil {
return "", err
}

firstIndex = string(firstAndLastLogEntries[0].Index)
lastIndex = string(firstAndLastLogEntries[1].Index)
}
return fmt.Sprintf("[%s, %s]", firstIndex, lastIndex), nil
}

type raftCommandEncodingVersion byte

// Raft commands are encoded with a 1-byte version (currently 0 or 1), an 8-byte
Expand Down
Loading

0 comments on commit 814e918

Please sign in to comment.