From fd2a9e58bcb36e155350bfd3b842c4a9051e0db3 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 24 Apr 2024 17:02:55 -0400 Subject: [PATCH] loqrecovery: parallelize store fanout Closes #122639. This commit parallelizes the fanout of store iteration during the data collection phase of LoQ. This fanout is not limited, as we expect nodes to have a sufficiently high cpu-to-store ratio to handle the fanout. Release note: None --- pkg/kv/kvserver/loqrecovery/server.go | 30 ++++++++++++++++++++------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go index da5462f7bfa7..52ec8da257f0 100644 --- a/pkg/kv/kvserver/loqrecovery/server.go +++ b/pkg/kv/kvserver/loqrecovery/server.go @@ -125,14 +125,28 @@ func (s Server) ServeLocalReplicas( stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer, ) error { v := s.settings.Version.ActiveVersion(ctx) - return s.stores.VisitStores(func(s *kvserver.Store) error { - reader := s.TODOEngine().NewSnapshot() - defer reader.Close() - return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), v, - func(info loqrecoverypb.ReplicaInfo) error { - return stream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info}) - }) - }) + var stores []*kvserver.Store + if err := s.stores.VisitStores(func(s *kvserver.Store) error { + stores = append(stores, s) + return nil + }); err != nil { + return err + } + syncStream := makeThreadSafeStream[*serverpb.RecoveryCollectLocalReplicaInfoResponse](stream) + stream = nil // prevent misuse + var g errgroup.Group + for _, s := range stores { + s := s // copy for closure + g.Go(func() error { + reader := s.TODOEngine().NewSnapshot() + defer reader.Close() + return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), v, + func(info loqrecoverypb.ReplicaInfo) error { + return syncStream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info}) + }) + }) + } + return g.Wait() } func (s Server) ServeClusterReplicas(