diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go index da5462f7bfa7..52ec8da257f0 100644 --- a/pkg/kv/kvserver/loqrecovery/server.go +++ b/pkg/kv/kvserver/loqrecovery/server.go @@ -125,14 +125,28 @@ func (s Server) ServeLocalReplicas( stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer, ) error { v := s.settings.Version.ActiveVersion(ctx) - return s.stores.VisitStores(func(s *kvserver.Store) error { - reader := s.TODOEngine().NewSnapshot() - defer reader.Close() - return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), v, - func(info loqrecoverypb.ReplicaInfo) error { - return stream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info}) - }) - }) + var stores []*kvserver.Store + if err := s.stores.VisitStores(func(s *kvserver.Store) error { + stores = append(stores, s) + return nil + }); err != nil { + return err + } + syncStream := makeThreadSafeStream[*serverpb.RecoveryCollectLocalReplicaInfoResponse](stream) + stream = nil // prevent misuse + var g errgroup.Group + for _, s := range stores { + s := s // copy for closure + g.Go(func() error { + reader := s.TODOEngine().NewSnapshot() + defer reader.Close() + return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(), v, + func(info loqrecoverypb.ReplicaInfo) error { + return syncStream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info}) + }) + }) + } + return g.Wait() } func (s Server) ServeClusterReplicas(