Skip to content

Commit

Permalink
loqrecovery: parallelize store fanout
Browse files Browse the repository at this point in the history
Closes cockroachdb#122639.

This commit parallelizes the fanout of store iteration during the data
collection phase of LoQ. This fanout is not limited, as we expect nodes
to have a sufficiently high cpu-to-store ratio to handle the fanout.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 24, 2024
1 parent b921862 commit fd2a9e5
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions pkg/kv/kvserver/loqrecovery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,28 @@ func (s Server) ServeLocalReplicas(
stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
) error {
v := s.settings.Version.ActiveVersion(ctx)
return s.stores.VisitStores(func(s *kvserver.Store) error {
reader := s.TODOEngine().NewSnapshot()
defer reader.Close()
return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), v,
func(info loqrecoverypb.ReplicaInfo) error {
return stream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info})
})
})
var stores []*kvserver.Store
if err := s.stores.VisitStores(func(s *kvserver.Store) error {
stores = append(stores, s)
return nil
}); err != nil {
return err
}
syncStream := makeThreadSafeStream[*serverpb.RecoveryCollectLocalReplicaInfoResponse](stream)
stream = nil // prevent misuse
var g errgroup.Group
for _, s := range stores {
s := s // copy for closure
g.Go(func() error {
reader := s.TODOEngine().NewSnapshot()
defer reader.Close()
return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), v,
func(info loqrecoverypb.ReplicaInfo) error {
return syncStream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info})
})
})
}
return g.Wait()
}

func (s Server) ServeClusterReplicas(
Expand Down

0 comments on commit fd2a9e5

Please sign in to comment.