diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index 7f30694650fc..1155f12fb05a 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -43,12 +43,14 @@ go_library( "//pkg/util/protoutil", "//pkg/util/retry", "//pkg/util/strutil", + "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_pebble//vfs", "@io_etcd_go_raft_v3//raftpb", "@org_golang_google_grpc//:go_default_library", + "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go index bdfdce802e03..4469c276b12e 100644 --- a/pkg/kv/kvserver/loqrecovery/server.go +++ b/pkg/kv/kvserver/loqrecovery/server.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "io" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/base" @@ -36,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" ) @@ -56,9 +58,11 @@ func IsRetryableError(err error) bool { return errors.Is(err, errMarkRetry) } -type visitNodeAdminFn func(ctx context.Context, retryOpts retry.Options, +type visitNodeAdminFn func(nodeID roachpb.NodeID, client serverpb.AdminClient) error + +type visitNodesAdminFn func(ctx context.Context, retryOpts retry.Options, maxConcurrency int32, nodeFilter func(nodeID roachpb.NodeID) bool, - visitor func(nodeID roachpb.NodeID, client serverpb.AdminClient) error, + visitor visitNodeAdminFn, ) error type visitNodeStatusFn func(ctx context.Context, nodeID roachpb.NodeID, retryOpts retry.Options, @@ -70,7 +74,7 @@ type Server struct { clusterIDContainer *base.ClusterIDContainer settings *cluster.Settings stores *kvserver.Stores - visitAdminNodes visitNodeAdminFn + visitAdminNodes visitNodesAdminFn visitStatusNode visitNodeStatusFn planStore PlanStore decommissionFn func(context.Context, roachpb.NodeID) error @@ -106,7 +110,7 @@ func NewServer( clusterIDContainer: rpcCtx.StorageClusterID, settings: settings, stores: stores, - visitAdminNodes: makeVisitAvailableNodes(g, loc, rpcCtx), + visitAdminNodes: makeVisitAvailableNodesInParallel(g, loc, rpcCtx), visitStatusNode: makeVisitNode(g, loc, rpcCtx), planStore: planStore, decommissionFn: decommission, @@ -133,7 +137,7 @@ func (s Server) ServeLocalReplicas( func (s Server) ServeClusterReplicas( ctx context.Context, - _ *serverpb.RecoveryCollectReplicaInfoRequest, + req *serverpb.RecoveryCollectReplicaInfoRequest, outStream serverpb.Admin_RecoveryCollectReplicaInfoServer, kvDB *kv.DB, ) (err error) { @@ -145,13 +149,11 @@ func (s Server) ServeClusterReplicas( return errors.Newf("loss of quorum recovery service requires cluster upgraded to 23.1") } - var ( - descriptors, nodes, replicas int - ) + var descriptors, nodes, replicas atomic.Int64 defer func() { if err == nil { - log.Infof(ctx, "streamed info: range descriptors %d, nodes %d, replica infos %d", descriptors, - nodes, replicas) + log.Infof(ctx, "streamed info: range descriptors %d, nodes %d, replica infos %d", + descriptors.Load(), nodes.Load(), replicas.Load()) } }() @@ -189,7 +191,7 @@ func (s Server) ServeClusterReplicas( }); err != nil { return err } - descriptors++ + descriptors.Add(1) } return nil }) @@ -206,56 +208,68 @@ func (s Server) ServeClusterReplicas( } // Stream local replica info from all nodes wrapping them in response stream. 
- return s.visitAdminNodes(ctx, + syncOutStream := makeThreadSafeStream[*serverpb.RecoveryCollectReplicaInfoResponse](outStream) + return s.visitAdminNodes( + ctx, fanOutConnectionRetryOptions, + req.MaxConcurrency, allNodes, - func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { - log.Infof(ctx, "trying to get info from node n%d", nodeID) - nodeReplicas := 0 - inStream, err := client.RecoveryCollectLocalReplicaInfo(ctx, - &serverpb.RecoveryCollectLocalReplicaInfoRequest{}) - if err != nil { - return errors.Mark(errors.Wrapf(err, - "failed retrieving replicas from node n%d during fan-out", nodeID), errMarkRetry) + serveClusterReplicasParallelFn(ctx, syncOutStream, s.forwardReplicaFilter, &replicas, &nodes)) +} + +func serveClusterReplicasParallelFn( + ctx context.Context, + outStream *threadSafeStream[*serverpb.RecoveryCollectReplicaInfoResponse], + forwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error, + replicas, nodes *atomic.Int64, +) visitNodeAdminFn { + return func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { + log.Infof(ctx, "trying to get info from node n%d", nodeID) + var nodeReplicas int64 + inStream, err := client.RecoveryCollectLocalReplicaInfo(ctx, + &serverpb.RecoveryCollectLocalReplicaInfoRequest{}) + if err != nil { + return errors.Mark(errors.Wrapf(err, + "failed retrieving replicas from node n%d during fan-out", nodeID), errMarkRetry) + } + for { + r, err := inStream.Recv() + if err == io.EOF { + break } - for { - r, err := inStream.Recv() - if err == io.EOF { - break - } - if s.forwardReplicaFilter != nil { - err = s.forwardReplicaFilter(r) - } - if err != nil { - // Some replicas were already sent back, need to notify client of stream - // restart. - if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{ - Info: &serverpb.RecoveryCollectReplicaInfoResponse_NodeStreamRestarted{ - NodeStreamRestarted: &serverpb.RecoveryCollectReplicaRestartNodeStream{ - NodeID: nodeID, - }, - }, - }); err != nil { - return err - } - return errors.Mark(errors.Wrapf(err, - "failed retrieving replicas from node n%d during fan-out", - nodeID), errMarkRetry) - } + if forwardReplicaFilter != nil { + err = forwardReplicaFilter(r) + } + if err != nil { + // Some replicas were already sent back, need to notify client of stream + // restart. if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{ - Info: &serverpb.RecoveryCollectReplicaInfoResponse_ReplicaInfo{ - ReplicaInfo: r.ReplicaInfo, + Info: &serverpb.RecoveryCollectReplicaInfoResponse_NodeStreamRestarted{ + NodeStreamRestarted: &serverpb.RecoveryCollectReplicaRestartNodeStream{ + NodeID: nodeID, + }, }, }); err != nil { return err } - nodeReplicas++ + return errors.Mark(errors.Wrapf(err, + "failed retrieving replicas from node n%d during fan-out", + nodeID), errMarkRetry) + } + if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{ + Info: &serverpb.RecoveryCollectReplicaInfoResponse_ReplicaInfo{ + ReplicaInfo: r.ReplicaInfo, + }, + }); err != nil { + return err } + nodeReplicas++ + } - replicas += nodeReplicas - nodes++ - return nil - }) + replicas.Add(nodeReplicas) + nodes.Add(1) + return nil + } } func (s Server) StagePlan( @@ -297,84 +311,57 @@ func (s Server) StagePlan( if req.AllNodes { // Scan cluster for conflicting recovery plans and for stray nodes that are // planned for forced decommission, but rejoined cluster. 
- foundNodes := make(map[roachpb.NodeID]bool) + foundNodes := makeThreadSafeMap[roachpb.NodeID, bool]() err := s.visitAdminNodes( ctx, fanOutConnectionRetryOptions, + req.MaxConcurrency, allNodes, - func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { - res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{}) - if err != nil { - return errors.Mark(err, errMarkRetry) - } - // If operation fails here, we don't want to find all remaining - // violating nodes because cli must ensure that cluster is safe for - // staging. - if !req.ForcePlan && res.Status.PendingPlanID != nil && !res.Status.PendingPlanID.Equal(plan.PlanID) { - return errors.Newf("plan %s is already staged on node n%d", res.Status.PendingPlanID, nodeID) - } - foundNodes[nodeID] = true - return nil - }) + stagePlanRecoveryNodeStatusParallelFn(ctx, req, plan, foundNodes)) if err != nil { return nil, err } // Check that no nodes that must be decommissioned are present. for _, dID := range plan.DecommissionedNodeIDs { - if foundNodes[dID] { + if foundNodes.Get(dID) { return nil, errors.Newf("node n%d was planned for decommission, but is present in cluster", dID) } } // Check out that all nodes that should save plan are present. for _, u := range plan.Updates { - if !foundNodes[u.NodeID()] { + if !foundNodes.Get(u.NodeID()) { return nil, errors.Newf("node n%d has planned changed but is unreachable in the cluster", u.NodeID()) } } for _, n := range plan.StaleLeaseholderNodeIDs { - if !foundNodes[n] { + if !foundNodes.Get(n) { return nil, errors.Newf("node n%d has planned restart but is unreachable in the cluster", n) } } // Distribute plan - this should not use fan out to available, but use // list from previous step. - var nodeErrors []string + var nodeErrors threadSafeSlice[string] err = s.visitAdminNodes( ctx, fanOutConnectionRetryOptions, - onlyListed(foundNodes), - func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { - delete(foundNodes, nodeID) - res, err := client.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ - Plan: req.Plan, - AllNodes: false, - ForcePlan: req.ForcePlan, - ForceLocalInternalVersion: req.ForceLocalInternalVersion, - }) - if err != nil { - nodeErrors = append(nodeErrors, - errors.Wrapf(err, "failed staging the plan on node n%d", nodeID).Error()) - return nil - } - nodeErrors = append(nodeErrors, res.Errors...) - return nil - }) + req.MaxConcurrency, + onlyListed(foundNodes.Clone()), + stagePlanRecoveryStagePlanParallelFn(ctx, req, foundNodes, &nodeErrors)) if err != nil { - nodeErrors = append(nodeErrors, - errors.Wrapf(err, "failed to perform fan-out to cluster nodes from n%d", - localNodeID).Error()) + nodeErrors.Append( + errors.Wrapf(err, "failed to perform fan-out to cluster nodes from n%d", localNodeID).Error()) } - if len(foundNodes) > 0 { + if foundNodes.Len() > 0 { // We didn't talk to some of originally found nodes. Need to report // disappeared nodes as we don't know what is happening with the cluster. 
- for n := range foundNodes { - nodeErrors = append(nodeErrors, fmt.Sprintf("node n%d disappeared while performing plan staging operation", n)) + for n := range foundNodes.Clone() { + nodeErrors.Append(fmt.Sprintf("node n%d disappeared while performing plan staging operation", n)) } } - return &serverpb.RecoveryStagePlanResponse{Errors: nodeErrors}, nil + return &serverpb.RecoveryStagePlanResponse{Errors: nodeErrors.Clone()}, nil } log.Infof(ctx, "attempting to stage loss of quorum recovery plan") @@ -432,6 +419,53 @@ func (s Server) StagePlan( return &serverpb.RecoveryStagePlanResponse{}, nil } +func stagePlanRecoveryNodeStatusParallelFn( + ctx context.Context, + req *serverpb.RecoveryStagePlanRequest, + plan loqrecoverypb.ReplicaUpdatePlan, + foundNodes *threadSafeMap[roachpb.NodeID, bool], +) visitNodeAdminFn { + return func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { + res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{}) + if err != nil { + return errors.Mark(err, errMarkRetry) + } + // If operation fails here, we don't want to find all remaining + // violating nodes because cli must ensure that cluster is safe for + // staging. + if !req.ForcePlan && res.Status.PendingPlanID != nil && !res.Status.PendingPlanID.Equal(plan.PlanID) { + return errors.Newf("plan %s is already staged on node n%d", res.Status.PendingPlanID, nodeID) + } + foundNodes.Set(nodeID, true) + return nil + } +} + +func stagePlanRecoveryStagePlanParallelFn( + ctx context.Context, + req *serverpb.RecoveryStagePlanRequest, + foundNodes *threadSafeMap[roachpb.NodeID, bool], + nodeErrors *threadSafeSlice[string], +) visitNodeAdminFn { + return func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { + foundNodes.Delete(nodeID) + res, err := client.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: req.Plan, + AllNodes: false, + ForcePlan: req.ForcePlan, + ForceLocalInternalVersion: req.ForceLocalInternalVersion, + MaxConcurrency: req.MaxConcurrency, + }) + if err != nil { + nodeErrors.Append( + errors.Wrapf(err, "failed staging the plan on node n%d", nodeID).Error()) + return nil + } + nodeErrors.Append(res.Errors...) 
+ return nil + } +} + func (s Server) NodeStatus( ctx context.Context, _ *serverpb.RecoveryNodeStatusRequest, ) (*serverpb.RecoveryNodeStatusResponse, error) { @@ -476,22 +510,13 @@ func (s Server) Verify( return nil, errors.Newf("loss of quorum recovery service requires cluster upgraded to 23.1") } - var nss []loqrecoverypb.NodeRecoveryStatus - err := s.visitAdminNodes(ctx, fanOutConnectionRetryOptions, + var nss threadSafeSlice[loqrecoverypb.NodeRecoveryStatus] + err := s.visitAdminNodes( + ctx, + fanOutConnectionRetryOptions, + req.MaxConcurrency, notListed(req.DecommissionedNodeIDs), - func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { - return timeutil.RunWithTimeout(ctx, fmt.Sprintf("retrieve status of n%d", nodeID), - retrieveNodeStatusTimeout, - func(ctx context.Context) error { - res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{}) - if err != nil { - return errors.Mark(errors.Wrapf(err, "failed to retrieve status of n%d", nodeID), - errMarkRetry) - } - nss = append(nss, res.Status) - return nil - }) - }) + verifyRecoveryNodeStatusParallelFn(ctx, &nss)) if err != nil { return nil, err } @@ -591,12 +616,30 @@ func (s Server) Verify( } return &serverpb.RecoveryVerifyResponse{ - Statuses: nss, + Statuses: nss.Clone(), DecommissionedNodeStatuses: decomStatus, UnavailableRanges: rangeHealth, }, nil } +func verifyRecoveryNodeStatusParallelFn( + ctx context.Context, nss *threadSafeSlice[loqrecoverypb.NodeRecoveryStatus], +) visitNodeAdminFn { + return func(nodeID roachpb.NodeID, client serverpb.AdminClient) error { + return timeutil.RunWithTimeout(ctx, fmt.Sprintf("retrieve status of n%d", nodeID), + retrieveNodeStatusTimeout, + func(ctx context.Context) error { + res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{}) + if err != nil { + return errors.Mark(errors.Wrapf(err, "failed to retrieve status of n%d", nodeID), + errMarkRetry) + } + nss.Append(res.Status) + return nil + }) + } +} + func checkRangeHealth( ctx context.Context, d roachpb.RangeDescriptor, @@ -637,7 +680,8 @@ func checkRangeHealth( return loqrecoverypb.RangeHealth_LOSS_OF_QUORUM } -// makeVisitAvailableNodes creates a function to visit available remote nodes. +// makeVisitAvailableNodesInParallel creates a function to visit available +// remote nodes, in parallel. // // Returned function would dial all cluster nodes from gossip and executes // visitor function with admin client after connection is established. Function @@ -650,48 +694,19 @@ func checkRangeHealth( // // For latter, errors marked with errMarkRetry marker are retried. It is up // to the visitor to mark appropriate errors are retryable. -func makeVisitAvailableNodes( +// +// Nodes may be visited in parallel, so the visitor function must be safe for +// concurrent use. 
+func makeVisitAvailableNodesInParallel( g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context, -) visitNodeAdminFn { - return func(ctx context.Context, retryOpts retry.Options, +) visitNodesAdminFn { + return func( + ctx context.Context, + retryOpts retry.Options, + maxConcurrency int32, nodeFilter func(nodeID roachpb.NodeID) bool, - visitor func(nodeID roachpb.NodeID, client serverpb.AdminClient) error, + visitor visitNodeAdminFn, ) error { - visitWithRetry := func(node roachpb.NodeDescriptor) error { - var err error - for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { - log.Infof(ctx, "visiting node n%d, attempt %d", node.NodeID, r.CurrentAttempt()) - addr := node.AddressForLocality(loc) - var conn *grpc.ClientConn - // Note that we use ConnectNoBreaker here to avoid any race with probe - // running on current node and target node restarting. Errors from circuit - // breaker probes could confuse us and present node as unavailable. - conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, rpc.DefaultClass).ConnectNoBreaker(ctx) - // Nodes would contain dead nodes that we don't need to visit. We can skip - // them and let caller handle incomplete info. - if err != nil { - if grpcutil.IsConnectionUnavailable(err) { - log.Infof(ctx, "rejecting node n%d because of suspected un-retryable error: %s", - node.NodeID, err) - return nil - } - // This was an initial heartbeat type error, we must retry as node seems - // live. - continue - } - client := serverpb.NewAdminClient(conn) - err = visitor(node.NodeID, client) - if err == nil { - return nil - } - log.Infof(ctx, "failed calling a visitor for node n%d: %s", node.NodeID, err) - if !IsRetryableError(err) { - return err - } - } - return err - } - var nodes []roachpb.NodeDescriptor if err := g.IterateInfos(gossip.KeyNodeDescPrefix, func(key string, i gossip.Info) error { b, err := i.Value.GetBytes() @@ -715,13 +730,62 @@ func makeVisitAvailableNodes( return err } + var g errgroup.Group + if maxConcurrency == 0 { + // "A value of 0 disables concurrency." + maxConcurrency = 1 + } + g.SetLimit(int(maxConcurrency)) for _, node := range nodes { - if err := visitWithRetry(node); err != nil { - return err + node := node // copy for closure + g.Go(func() error { + return visitNodeWithRetry(ctx, loc, rpcCtx, retryOpts, visitor, node) + }) + } + return g.Wait() + } +} + +func visitNodeWithRetry( + ctx context.Context, + loc roachpb.Locality, + rpcCtx *rpc.Context, + retryOpts retry.Options, + visitor visitNodeAdminFn, + node roachpb.NodeDescriptor, +) error { + var err error + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + log.Infof(ctx, "visiting node n%d, attempt %d", node.NodeID, r.CurrentAttempt()) + addr := node.AddressForLocality(loc) + var conn *grpc.ClientConn + // Note that we use ConnectNoBreaker here to avoid any race with probe + // running on current node and target node restarting. Errors from circuit + // breaker probes could confuse us and present node as unavailable. + conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, rpc.DefaultClass).ConnectNoBreaker(ctx) + // Nodes would contain dead nodes that we don't need to visit. We can skip + // them and let caller handle incomplete info. + if err != nil { + if grpcutil.IsConnectionUnavailable(err) { + log.Infof(ctx, "rejecting node n%d because of suspected un-retryable error: %s", + node.NodeID, err) + return nil } + // This was an initial heartbeat type error, we must retry as node seems + // live. 
+ continue + } + client := serverpb.NewAdminClient(conn) + err = visitor(node.NodeID, client) + if err == nil { + return nil + } + log.Infof(ctx, "failed calling a visitor for node n%d: %s", node.NodeID, err) + if !IsRetryableError(err) { + return err } - return nil } + return err } // makeVisitNode creates a function to visit a remote node. diff --git a/pkg/kv/kvserver/loqrecovery/testing_knobs.go b/pkg/kv/kvserver/loqrecovery/testing_knobs.go index 34bca894132d..ad84fa2907d8 100644 --- a/pkg/kv/kvserver/loqrecovery/testing_knobs.go +++ b/pkg/kv/kvserver/loqrecovery/testing_knobs.go @@ -23,6 +23,7 @@ type TestingKnobs struct { MetadataScanTimeout time.Duration // Replica filter for forwarded replica info when collecting fan-out data. + // It can be called concurrently, so must be safe for concurrent use. ForwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error } diff --git a/pkg/kv/kvserver/loqrecovery/utils.go b/pkg/kv/kvserver/loqrecovery/utils.go index e13f28f4d742..213397f459e2 100644 --- a/pkg/kv/kvserver/loqrecovery/utils.go +++ b/pkg/kv/kvserver/loqrecovery/utils.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/strutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) type storeIDSet map[roachpb.StoreID]struct{} @@ -315,3 +316,90 @@ func (e *RecoveryError) ErrorDetail() string { } return strings.Join(descriptions, "\n") } + +// grpcStream is a specialization of a grpc.ServerStream for a specific message +// type. It is implemented by code generated gRPC server streams. +// +// As mentioned in the grpc.ServerStream documentation, it is not safe to call +// Send on the same stream in different goroutines. +type grpcStream[T any] interface { + Send(T) error +} + +// threadSafeStream wraps a grpcStream and provides basic thread safety. +type threadSafeStream[T any] struct { + mu syncutil.Mutex + s grpcStream[T] +} + +func makeThreadSafeStream[T any](s grpcStream[T]) *threadSafeStream[T] { + return &threadSafeStream[T]{s: s} +} + +func (s *threadSafeStream[T]) Send(t T) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.s.Send(t) +} + +// threadSafeMap wraps a map and provides basic thread safety. +type threadSafeMap[K comparable, V any] struct { + mu syncutil.Mutex + m map[K]V +} + +func makeThreadSafeMap[K comparable, V any]() *threadSafeMap[K, V] { + return &threadSafeMap[K, V]{m: make(map[K]V)} +} + +func (m *threadSafeMap[K, V]) Get(k K) V { + m.mu.Lock() + defer m.mu.Unlock() + return m.m[k] +} + +func (m *threadSafeMap[K, V]) Set(k K, v V) { + m.mu.Lock() + defer m.mu.Unlock() + m.m[k] = v +} + +func (m *threadSafeMap[K, V]) Delete(k K) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.m, k) +} + +func (m *threadSafeMap[K, V]) Len() int { + m.mu.Lock() + defer m.mu.Unlock() + return len(m.m) +} + +func (m *threadSafeMap[K, V]) Clone() map[K]V { + m.mu.Lock() + defer m.mu.Unlock() + clone := make(map[K]V, len(m.m)) + for k, v := range m.m { + clone[k] = v + } + return clone +} + +// threadSafeSlice wraps a slice and provides basic thread safety. +type threadSafeSlice[T any] struct { + mu syncutil.Mutex + s []T +} + +func (s *threadSafeSlice[T]) Append(t ...T) { + s.mu.Lock() + defer s.mu.Unlock() + s.s = append(s.s, t...) +} + +func (s *threadSafeSlice[T]) Clone() []T { + s.mu.Lock() + defer s.mu.Unlock() + return append([]T(nil), s.s...) +}
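
Note on the fan-out change above: the parallelism comes from a bounded errgroup, with MaxConcurrency == 0 treated as a serial visit (limit 1), and all state shared by visitors pushed behind small mutex wrappers. Below is a minimal, self-contained sketch of that pattern, not code from the patch; visitAllNodes, visit, and the integer node IDs are hypothetical stand-ins, and it uses plain sync.Mutex where the patch uses util/syncutil and real gRPC admin clients.

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// resultSink mimics threadSafeSlice: appends may happen from many goroutines.
type resultSink[T any] struct {
	mu sync.Mutex
	s  []T
}

func (r *resultSink[T]) Append(v ...T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.s = append(r.s, v...)
}

// visitAllNodes mirrors the core loop of makeVisitAvailableNodesInParallel:
// treat a max concurrency of 0 as serial (limit 1) and let the errgroup bound
// the number of concurrent visitors.
func visitAllNodes(
	ctx context.Context, nodes []int, maxConcurrency int32,
	visit func(ctx context.Context, node int) error,
) error {
	var g errgroup.Group
	if maxConcurrency == 0 {
		maxConcurrency = 1 // a value of 0 disables concurrency
	}
	g.SetLimit(int(maxConcurrency))
	for _, node := range nodes {
		node := node // copy for closure, as in the patch
		g.Go(func() error { return visit(ctx, node) })
	}
	// Wait returns the first non-nil error from any visitor.
	return g.Wait()
}

func main() {
	var results resultSink[string]
	err := visitAllNodes(context.Background(), []int{1, 2, 3, 4}, 2,
		func(ctx context.Context, node int) error {
			results.Append(fmt.Sprintf("visited n%d", node))
			return nil
		})
	fmt.Println(err, results.s)
}

Capping the group at 1 when the requested concurrency is 0 preserves the old serial behaviour as the default, while g.Wait() surfaces the first visitor error much like the previous early-return loop did.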
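
A second sketch, covering why ServeClusterReplicas now wraps the outgoing stream: generated gRPC server streams do not allow concurrent Send calls, so the patch serializes them behind a mutex (threadSafeStream). lockedSender and printSender below are hypothetical names; the real wrapper lives in utils.go and is driven by the parallel node visitors rather than bare goroutines.

package main

import (
	"fmt"
	"sync"
)

// sender mirrors the grpcStream[T] shape in utils.go: anything with Send(T) error.
type sender[T any] interface {
	Send(T) error
}

// lockedSender serializes Send calls, in the spirit of threadSafeStream.
type lockedSender[T any] struct {
	mu sync.Mutex
	s  sender[T]
}

func (l *lockedSender[T]) Send(msg T) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.s.Send(msg)
}

// printSender is a hypothetical stand-in for the generated
// Admin_RecoveryCollectReplicaInfoServer stream.
type printSender struct{}

func (printSender) Send(msg string) error { fmt.Println("sent:", msg); return nil }

func main() {
	out := &lockedSender[string]{s: printSender{}}
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Safe from multiple goroutines: the mutex serializes the underlying Send.
			_ = out.Send(fmt.Sprintf("replica info from n%d", i))
		}()
	}
	wg.Wait()
}

The same mutex-wrapper idea backs threadSafeMap and threadSafeSlice, which replace the plain map and slice that the fan-out callbacks previously mutated from a single goroutine.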