Skip to content

Commit

Permalink
loqrecovery: permit out of order ReplicaInfos in CollectRemoteReplica…
Browse files Browse the repository at this point in the history
…Info

Informs cockroachdb#122639.

This commit updates `CollectRemoteReplicaInfo` to permit out-of-order
ReplicaInfo responses, instead of assuming that the responses are in
order by node ID. This is a precursor to being able to perform a
parallel fanout in RecoveryCollectReplicaInfo.

The migration story for this is simple — this is only needed if the
MaxParallelism passed to RecoveryCollectReplicaInfo is greater than 1.
Old versions of the cockroachdb binary that can not tolerate out of
order responses will always pass a MaxParallelism of 0, so the
assumption that they are making about in-order responses will still
hold.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 24, 2024
1 parent fac74db commit e04d53d
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions pkg/kv/kvserver/loqrecovery/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"io"
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -46,45 +47,45 @@ func CollectRemoteReplicaInfo(
}
stores := make(map[roachpb.StoreID]struct{})
nodes := make(map[roachpb.NodeID]struct{})
replInfoMap := make(map[roachpb.NodeID][]loqrecoverypb.ReplicaInfo)
var descriptors []roachpb.RangeDescriptor
var clusterReplInfo []loqrecoverypb.NodeReplicaInfo
var nodeReplicas []loqrecoverypb.ReplicaInfo
var currentNode roachpb.NodeID
var metadata loqrecoverypb.ClusterMetadata
for {
info, err := cc.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
if len(nodeReplicas) > 0 {
clusterReplInfo = append(clusterReplInfo, loqrecoverypb.NodeReplicaInfo{Replicas: nodeReplicas})
}
break
}
return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
}
if r := info.GetReplicaInfo(); r != nil {
if currentNode != r.NodeID {
currentNode = r.NodeID
if len(nodeReplicas) > 0 {
clusterReplInfo = append(clusterReplInfo, loqrecoverypb.NodeReplicaInfo{Replicas: nodeReplicas})
nodeReplicas = nil
}
}
nodeReplicas = append(nodeReplicas, *r)
stores[r.StoreID] = struct{}{}
nodes[r.NodeID] = struct{}{}
replInfoMap[r.NodeID] = append(replInfoMap[r.NodeID], *r)
} else if d := info.GetRangeDescriptor(); d != nil {
descriptors = append(descriptors, *d)
} else if s := info.GetNodeStreamRestarted(); s != nil {
// If server had to restart a fan-out work because of error and retried,
// then we discard partial data for the node.
if s.NodeID == currentNode {
nodeReplicas = nil
}
delete(replInfoMap, s.NodeID)
} else if m := info.GetMetadata(); m != nil {
metadata = *m
} else {
return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.AssertionFailedf(
"unknown info type: %T", info.GetInfo())
}
}
// Collapse the ReplicaInfos map into a slice, sorted by node ID.
replInfos := make([]loqrecoverypb.NodeReplicaInfo, 0, len(replInfoMap))
for _, replInfo := range replInfoMap {
if len(replInfo) == 0 {
continue
}
replInfos = append(replInfos, loqrecoverypb.NodeReplicaInfo{Replicas: replInfo})
}
sort.Slice(replInfos, func(i, j int) bool {
return replInfos[i].Replicas[0].NodeID < replInfos[j].Replicas[0].NodeID
})
// We don't want to process data outside of safe version range for this CLI
// binary. RPC allows us to communicate with a cluster that is newer than
// the binary, but it will not version gate the data to binary version so we
Expand All @@ -96,7 +97,7 @@ func CollectRemoteReplicaInfo(
return loqrecoverypb.ClusterReplicaInfo{
ClusterID: metadata.ClusterID,
Descriptors: descriptors,
LocalInfo: clusterReplInfo,
LocalInfo: replInfos,
Version: metadata.Version,
}, CollectionStats{
Nodes: len(nodes),
Expand Down

0 comments on commit e04d53d

Please sign in to comment.