From 86eba9e024f9b34a96b591e430b4017533ae858c Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Wed, 24 Apr 2024 17:01:36 -0400
Subject: [PATCH] loqrecovery: parallelize node fanout

Informs #122639.

This commit uses the MaxConcurrency parameters added in the previous
commits to parallelize the fanout of RPCs to nodes in the cluster
during the data collection phase of loss of quorum (LoQ) recovery.

Release note: None
---
 pkg/kv/kvserver/loqrecovery/BUILD.bazel      |   2 +
 pkg/kv/kvserver/loqrecovery/server.go        | 370 +++++++++++--------
 pkg/kv/kvserver/loqrecovery/testing_knobs.go |   1 +
 pkg/kv/kvserver/loqrecovery/utils.go         |  88 +++++
 4 files changed, 308 insertions(+), 153 deletions(-)

diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel
index 7f30694650fc..1155f12fb05a 100644
--- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel
+++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel
@@ -43,12 +43,14 @@ go_library(
         "//pkg/util/protoutil",
         "//pkg/util/retry",
         "//pkg/util/strutil",
+        "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "//pkg/util/uuid",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_pebble//vfs",
         "@io_etcd_go_raft_v3//raftpb",
         "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_x_sync//errgroup",
     ],
 )

diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go
index bdfdce802e03..4469c276b12e 100644
--- a/pkg/kv/kvserver/loqrecovery/server.go
+++ b/pkg/kv/kvserver/loqrecovery/server.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -36,6 +37,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
+	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
 )
 
@@ -56,9 +58,11 @@ func IsRetryableError(err error) bool {
 	return errors.Is(err, errMarkRetry)
 }
 
-type visitNodeAdminFn func(ctx context.Context, retryOpts retry.Options,
+type visitNodeAdminFn func(nodeID roachpb.NodeID, client serverpb.AdminClient) error
+
+type visitNodesAdminFn func(ctx context.Context, retryOpts retry.Options, maxConcurrency int32,
 	nodeFilter func(nodeID roachpb.NodeID) bool,
-	visitor func(nodeID roachpb.NodeID, client serverpb.AdminClient) error,
+	visitor visitNodeAdminFn,
 ) error
 
 type visitNodeStatusFn func(ctx context.Context, nodeID roachpb.NodeID, retryOpts retry.Options,
@@ -70,7 +74,7 @@ type Server struct {
 	clusterIDContainer *base.ClusterIDContainer
 	settings           *cluster.Settings
 	stores             *kvserver.Stores
-	visitAdminNodes    visitNodeAdminFn
+	visitAdminNodes    visitNodesAdminFn
 	visitStatusNode    visitNodeStatusFn
 	planStore          PlanStore
 	decommissionFn     func(context.Context, roachpb.NodeID) error
@@ -106,7 +110,7 @@ func NewServer(
 		clusterIDContainer: rpcCtx.StorageClusterID,
 		settings:           settings,
 		stores:             stores,
-		visitAdminNodes:    makeVisitAvailableNodes(g, loc, rpcCtx),
+		visitAdminNodes:    makeVisitAvailableNodesInParallel(g, loc, rpcCtx),
 		visitStatusNode:    makeVisitNode(g, loc, rpcCtx),
 		planStore:          planStore,
 		decommissionFn:     decommission,
@@ -133,7 +137,7 @@ func (s Server) ServeLocalReplicas(
 
 func (s Server) ServeClusterReplicas(
 	ctx context.Context,
-	_ *serverpb.RecoveryCollectReplicaInfoRequest,
+	req *serverpb.RecoveryCollectReplicaInfoRequest,
 	outStream serverpb.Admin_RecoveryCollectReplicaInfoServer,
 	kvDB *kv.DB,
 ) (err error) {
@@ -145,13 +149,11 @@ func (s Server) ServeClusterReplicas(
 		return errors.Newf("loss of quorum recovery service requires cluster upgraded to 23.1")
 	}
 
-	var (
-		descriptors, nodes, replicas int
-	)
+	var descriptors, nodes, replicas atomic.Int64
 	defer func() {
 		if err == nil {
-			log.Infof(ctx, "streamed info: range descriptors %d, nodes %d, replica infos %d", descriptors,
-				nodes, replicas)
+			log.Infof(ctx, "streamed info: range descriptors %d, nodes %d, replica infos %d",
+				descriptors.Load(), nodes.Load(), replicas.Load())
 		}
 	}()
 
@@ -189,7 +191,7 @@ func (s Server) ServeClusterReplicas(
 		}); err != nil {
 			return err
 		}
-		descriptors++
+		descriptors.Add(1)
 	}
 	return nil
 })
 
@@ -206,56 +208,68 @@ func (s Server) ServeClusterReplicas(
 	}
 
 	// Stream local replica info from all nodes wrapping them in response stream.
-	return s.visitAdminNodes(ctx,
+	syncOutStream := makeThreadSafeStream[*serverpb.RecoveryCollectReplicaInfoResponse](outStream)
+	return s.visitAdminNodes(
+		ctx,
 		fanOutConnectionRetryOptions,
+		req.MaxConcurrency,
 		allNodes,
-		func(nodeID roachpb.NodeID, client serverpb.AdminClient) error {
-			log.Infof(ctx, "trying to get info from node n%d", nodeID)
-			nodeReplicas := 0
-			inStream, err := client.RecoveryCollectLocalReplicaInfo(ctx,
-				&serverpb.RecoveryCollectLocalReplicaInfoRequest{})
-			if err != nil {
-				return errors.Mark(errors.Wrapf(err,
-					"failed retrieving replicas from node n%d during fan-out", nodeID), errMarkRetry)
+		serveClusterReplicasParallelFn(ctx, syncOutStream, s.forwardReplicaFilter, &replicas, &nodes))
+}
+
+func serveClusterReplicasParallelFn(
+	ctx context.Context,
+	outStream *threadSafeStream[*serverpb.RecoveryCollectReplicaInfoResponse],
+	forwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error,
+	replicas, nodes *atomic.Int64,
+) visitNodeAdminFn {
+	return func(nodeID roachpb.NodeID, client serverpb.AdminClient) error {
+		log.Infof(ctx, "trying to get info from node n%d", nodeID)
+		var nodeReplicas int64
+		inStream, err := client.RecoveryCollectLocalReplicaInfo(ctx,
+			&serverpb.RecoveryCollectLocalReplicaInfoRequest{})
+		if err != nil {
+			return errors.Mark(errors.Wrapf(err,
+				"failed retrieving replicas from node n%d during fan-out", nodeID), errMarkRetry)
+		}
+		for {
+			r, err := inStream.Recv()
+			if err == io.EOF {
+				break
 			}
-			for {
-				r, err := inStream.Recv()
-				if err == io.EOF {
-					break
-				}
-				if s.forwardReplicaFilter != nil {
-					err = s.forwardReplicaFilter(r)
-				}
-				if err != nil {
-					// Some replicas were already sent back, need to notify client of stream
-					// restart.
-					if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{
-						Info: &serverpb.RecoveryCollectReplicaInfoResponse_NodeStreamRestarted{
-							NodeStreamRestarted: &serverpb.RecoveryCollectReplicaRestartNodeStream{
-								NodeID: nodeID,
-							},
-						},
-					}); err != nil {
-						return err
-					}
-					return errors.Mark(errors.Wrapf(err,
-						"failed retrieving replicas from node n%d during fan-out",
-						nodeID), errMarkRetry)
-				}
+			if forwardReplicaFilter != nil {
+				err = forwardReplicaFilter(r)
+			}
+			if err != nil {
+				// Some replicas were already sent back, need to notify client of stream
+				// restart.
 				if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{
-					Info: &serverpb.RecoveryCollectReplicaInfoResponse_ReplicaInfo{
-						ReplicaInfo: r.ReplicaInfo,
+					Info: &serverpb.RecoveryCollectReplicaInfoResponse_NodeStreamRestarted{
+						NodeStreamRestarted: &serverpb.RecoveryCollectReplicaRestartNodeStream{
+							NodeID: nodeID,
+						},
 					},
 				}); err != nil {
 					return err
 				}
-				nodeReplicas++
+				return errors.Mark(errors.Wrapf(err,
+					"failed retrieving replicas from node n%d during fan-out",
+					nodeID), errMarkRetry)
+			}
+			if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{
+				Info: &serverpb.RecoveryCollectReplicaInfoResponse_ReplicaInfo{
+					ReplicaInfo: r.ReplicaInfo,
+				},
+			}); err != nil {
+				return err
 			}
+			nodeReplicas++
+		}
 
-			replicas += nodeReplicas
-			nodes++
-			return nil
-		})
+		replicas.Add(nodeReplicas)
+		nodes.Add(1)
+		return nil
+	}
 }
 
 func (s Server) StagePlan(
@@ -297,84 +311,57 @@ func (s Server) StagePlan(
 	if req.AllNodes {
 		// Scan cluster for conflicting recovery plans and for stray nodes that are
 		// planned for forced decommission, but rejoined cluster.
-		foundNodes := make(map[roachpb.NodeID]bool)
+		foundNodes := makeThreadSafeMap[roachpb.NodeID, bool]()
 		err := s.visitAdminNodes(
 			ctx,
 			fanOutConnectionRetryOptions,
+			req.MaxConcurrency,
 			allNodes,
-			func(nodeID roachpb.NodeID, client serverpb.AdminClient) error {
-				res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{})
-				if err != nil {
-					return errors.Mark(err, errMarkRetry)
-				}
-				// If operation fails here, we don't want to find all remaining
-				// violating nodes because cli must ensure that cluster is safe for
-				// staging.
-				if !req.ForcePlan && res.Status.PendingPlanID != nil && !res.Status.PendingPlanID.Equal(plan.PlanID) {
-					return errors.Newf("plan %s is already staged on node n%d", res.Status.PendingPlanID, nodeID)
-				}
-				foundNodes[nodeID] = true
-				return nil
-			})
+			stagePlanRecoveryNodeStatusParallelFn(ctx, req, plan, foundNodes))
 		if err != nil {
 			return nil, err
 		}
 
 		// Check that no nodes that must be decommissioned are present.
 		for _, dID := range plan.DecommissionedNodeIDs {
-			if foundNodes[dID] {
+			if foundNodes.Get(dID) {
 				return nil, errors.Newf("node n%d was planned for decommission, but is present in cluster", dID)
 			}
 		}
 
 		// Check out that all nodes that should save plan are present.
 		for _, u := range plan.Updates {
-			if !foundNodes[u.NodeID()] {
+			if !foundNodes.Get(u.NodeID()) {
 				return nil, errors.Newf("node n%d has planned changed but is unreachable in the cluster", u.NodeID())
 			}
 		}
 
 		for _, n := range plan.StaleLeaseholderNodeIDs {
-			if !foundNodes[n] {
+			if !foundNodes.Get(n) {
 				return nil, errors.Newf("node n%d has planned restart but is unreachable in the cluster", n)
 			}
 		}
 
 		// Distribute plan - this should not use fan out to available, but use
 		// list from previous step.
-		var nodeErrors []string
+		var nodeErrors threadSafeSlice[string]
 		err = s.visitAdminNodes(
 			ctx,
 			fanOutConnectionRetryOptions,
-			onlyListed(foundNodes),
-			func(nodeID roachpb.NodeID, client serverpb.AdminClient) error {
-				delete(foundNodes, nodeID)
-				res, err := client.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{
-					Plan:                      req.Plan,
-					AllNodes:                  false,
-					ForcePlan:                 req.ForcePlan,
-					ForceLocalInternalVersion: req.ForceLocalInternalVersion,
-				})
-				if err != nil {
-					nodeErrors = append(nodeErrors,
-						errors.Wrapf(err, "failed staging the plan on node n%d", nodeID).Error())
-					return nil
-				}
-				nodeErrors = append(nodeErrors, res.Errors...)
-				return nil
-			})
+			req.MaxConcurrency,
+			onlyListed(foundNodes.Clone()),
+			stagePlanRecoveryStagePlanParallelFn(ctx, req, foundNodes, &nodeErrors))
 		if err != nil {
-			nodeErrors = append(nodeErrors,
-				errors.Wrapf(err, "failed to perform fan-out to cluster nodes from n%d",
-					localNodeID).Error())
+			nodeErrors.Append(
+				errors.Wrapf(err, "failed to perform fan-out to cluster nodes from n%d", localNodeID).Error())
 		}
 
-		if len(foundNodes) > 0 {
+		if foundNodes.Len() > 0 {
 			// We didn't talk to some of originally found nodes. Need to report
 			// disappeared nodes as we don't know what is happening with the cluster.
-			for n := range foundNodes {
-				nodeErrors = append(nodeErrors, fmt.Sprintf("node n%d disappeared while performing plan staging operation", n))
+			for n := range foundNodes.Clone() {
+				nodeErrors.Append(fmt.Sprintf("node n%d disappeared while performing plan staging operation", n))
 			}
 		}
 
-		return &serverpb.RecoveryStagePlanResponse{Errors: nodeErrors}, nil
+		return &serverpb.RecoveryStagePlanResponse{Errors: nodeErrors.Clone()}, nil
 	}
 
 	log.Infof(ctx, "attempting to stage loss of quorum recovery plan")
@@ -432,6 +419,53 @@ func (s Server) StagePlan(
 	return &serverpb.RecoveryStagePlanResponse{}, nil
 }
 
+func stagePlanRecoveryNodeStatusParallelFn(
+	ctx context.Context,
+	req *serverpb.RecoveryStagePlanRequest,
+	plan loqrecoverypb.ReplicaUpdatePlan,
+	foundNodes *threadSafeMap[roachpb.NodeID, bool],
+) visitNodeAdminFn {
+	return func(nodeID roachpb.NodeID, client serverpb.AdminClient) error {
+		res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{})
+		if err != nil {
+			return errors.Mark(err, errMarkRetry)
+		}
+		// If operation fails here, we don't want to find all remaining
+		// violating nodes because cli must ensure that cluster is safe for
+		// staging.
+		if !req.ForcePlan && res.Status.PendingPlanID != nil && !res.Status.PendingPlanID.Equal(plan.PlanID) {
+			return errors.Newf("plan %s is already staged on node n%d", res.Status.PendingPlanID, nodeID)
+		}
+		foundNodes.Set(nodeID, true)
+		return nil
+	}
+}
+
+func stagePlanRecoveryStagePlanParallelFn(
+	ctx context.Context,
+	req *serverpb.RecoveryStagePlanRequest,
+	foundNodes *threadSafeMap[roachpb.NodeID, bool],
+	nodeErrors *threadSafeSlice[string],
+) visitNodeAdminFn {
+	return func(nodeID roachpb.NodeID, client serverpb.AdminClient) error {
+		foundNodes.Delete(nodeID)
+		res, err := client.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{
+			Plan:                      req.Plan,
+			AllNodes:                  false,
+			ForcePlan:                 req.ForcePlan,
+			ForceLocalInternalVersion: req.ForceLocalInternalVersion,
+			MaxConcurrency:            req.MaxConcurrency,
+		})
+		if err != nil {
+			nodeErrors.Append(
+				errors.Wrapf(err, "failed staging the plan on node n%d", nodeID).Error())
+			return nil
+		}
+		nodeErrors.Append(res.Errors...)
+		return nil
+	}
+}
+
 func (s Server) NodeStatus(
 	ctx context.Context, _ *serverpb.RecoveryNodeStatusRequest,
 ) (*serverpb.RecoveryNodeStatusResponse, error) {
@@ -476,22 +510,13 @@ func (s Server) Verify(
 		return nil, errors.Newf("loss of quorum recovery service requires cluster upgraded to 23.1")
 	}
 
-	var nss []loqrecoverypb.NodeRecoveryStatus
-	err := s.visitAdminNodes(ctx, fanOutConnectionRetryOptions,
+	var nss threadSafeSlice[loqrecoverypb.NodeRecoveryStatus]
+	err := s.visitAdminNodes(
+		ctx,
+		fanOutConnectionRetryOptions,
+		req.MaxConcurrency,
 		notListed(req.DecommissionedNodeIDs),
-		func(nodeID roachpb.NodeID, client serverpb.AdminClient) error {
-			return timeutil.RunWithTimeout(ctx, fmt.Sprintf("retrieve status of n%d", nodeID),
-				retrieveNodeStatusTimeout,
-				func(ctx context.Context) error {
-					res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{})
-					if err != nil {
-						return errors.Mark(errors.Wrapf(err, "failed to retrieve status of n%d", nodeID),
-							errMarkRetry)
-					}
-					nss = append(nss, res.Status)
-					return nil
-				})
-		})
+		verifyRecoveryNodeStatusParallelFn(ctx, &nss))
 	if err != nil {
 		return nil, err
 	}
@@ -591,12 +616,30 @@ func (s Server) Verify(
 	}
 
 	return &serverpb.RecoveryVerifyResponse{
-		Statuses:                   nss,
+		Statuses:                   nss.Clone(),
 		DecommissionedNodeStatuses: decomStatus,
 		UnavailableRanges:          rangeHealth,
 	}, nil
 }
 
+func verifyRecoveryNodeStatusParallelFn(
+	ctx context.Context, nss *threadSafeSlice[loqrecoverypb.NodeRecoveryStatus],
+) visitNodeAdminFn {
+	return func(nodeID roachpb.NodeID, client serverpb.AdminClient) error {
+		return timeutil.RunWithTimeout(ctx, fmt.Sprintf("retrieve status of n%d", nodeID),
+			retrieveNodeStatusTimeout,
+			func(ctx context.Context) error {
+				res, err := client.RecoveryNodeStatus(ctx, &serverpb.RecoveryNodeStatusRequest{})
+				if err != nil {
+					return errors.Mark(errors.Wrapf(err, "failed to retrieve status of n%d", nodeID),
+						errMarkRetry)
+				}
+				nss.Append(res.Status)
+				return nil
+			})
+	}
+}
+
 func checkRangeHealth(
 	ctx context.Context,
 	d roachpb.RangeDescriptor,
@@ -637,7 +680,8 @@ func checkRangeHealth(
 	return loqrecoverypb.RangeHealth_LOSS_OF_QUORUM
 }
 
-// makeVisitAvailableNodes creates a function to visit available remote nodes.
+// makeVisitAvailableNodesInParallel creates a function to visit available
+// remote nodes, in parallel.
 //
 // Returned function would dial all cluster nodes from gossip and executes
 // visitor function with admin client after connection is established. Function
 //
 // For latter, errors marked with errMarkRetry marker are retried. It is up
 // to the visitor to mark appropriate errors are retryable.
+//
+// Nodes may be visited in parallel, so the visitor function must be safe for
+// concurrent use.
-func makeVisitAvailableNodes(
+func makeVisitAvailableNodesInParallel(
 	g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context,
-) visitNodeAdminFn {
-	return func(ctx context.Context, retryOpts retry.Options,
+) visitNodesAdminFn {
+	return func(
+		ctx context.Context,
+		retryOpts retry.Options,
+		maxConcurrency int32,
 		nodeFilter func(nodeID roachpb.NodeID) bool,
-		visitor func(nodeID roachpb.NodeID, client serverpb.AdminClient) error,
+		visitor visitNodeAdminFn,
 	) error {
-		visitWithRetry := func(node roachpb.NodeDescriptor) error {
-			var err error
-			for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
-				log.Infof(ctx, "visiting node n%d, attempt %d", node.NodeID, r.CurrentAttempt())
-				addr := node.AddressForLocality(loc)
-				var conn *grpc.ClientConn
-				// Note that we use ConnectNoBreaker here to avoid any race with probe
-				// running on current node and target node restarting. Errors from circuit
-				// breaker probes could confuse us and present node as unavailable.
-				conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, rpc.DefaultClass).ConnectNoBreaker(ctx)
-				// Nodes would contain dead nodes that we don't need to visit. We can skip
-				// them and let caller handle incomplete info.
-				if err != nil {
-					if grpcutil.IsConnectionUnavailable(err) {
-						log.Infof(ctx, "rejecting node n%d because of suspected un-retryable error: %s",
-							node.NodeID, err)
-						return nil
-					}
-					// This was an initial heartbeat type error, we must retry as node seems
-					// live.
-					continue
-				}
-				client := serverpb.NewAdminClient(conn)
-				err = visitor(node.NodeID, client)
-				if err == nil {
-					return nil
-				}
-				log.Infof(ctx, "failed calling a visitor for node n%d: %s", node.NodeID, err)
-				if !IsRetryableError(err) {
-					return err
-				}
-			}
-			return err
-		}
-
 		var nodes []roachpb.NodeDescriptor
 		if err := g.IterateInfos(gossip.KeyNodeDescPrefix, func(key string, i gossip.Info) error {
 			b, err := i.Value.GetBytes()
@@ -715,13 +730,62 @@ func makeVisitAvailableNodesInParallel(
 			return err
 		}
 
+		var g errgroup.Group
+		if maxConcurrency == 0 {
+			// "A value of 0 disables concurrency."
+			maxConcurrency = 1
+		}
+		g.SetLimit(int(maxConcurrency))
 		for _, node := range nodes {
-			if err := visitWithRetry(node); err != nil {
-				return err
+			node := node // copy for closure
+			g.Go(func() error {
+				return visitNodeWithRetry(ctx, loc, rpcCtx, retryOpts, visitor, node)
+			})
+		}
+		return g.Wait()
+	}
+}
+
+func visitNodeWithRetry(
+	ctx context.Context,
+	loc roachpb.Locality,
+	rpcCtx *rpc.Context,
+	retryOpts retry.Options,
+	visitor visitNodeAdminFn,
+	node roachpb.NodeDescriptor,
+) error {
+	var err error
+	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+		log.Infof(ctx, "visiting node n%d, attempt %d", node.NodeID, r.CurrentAttempt())
+		addr := node.AddressForLocality(loc)
+		var conn *grpc.ClientConn
+		// Note that we use ConnectNoBreaker here to avoid any race with probe
+		// running on current node and target node restarting. Errors from circuit
+		// breaker probes could confuse us and present node as unavailable.
+		conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, rpc.DefaultClass).ConnectNoBreaker(ctx)
+		// Nodes would contain dead nodes that we don't need to visit. We can skip
+		// them and let caller handle incomplete info.
+		if err != nil {
+			if grpcutil.IsConnectionUnavailable(err) {
+				log.Infof(ctx, "rejecting node n%d because of suspected un-retryable error: %s",
+					node.NodeID, err)
+				return nil
+			}
+			// This was an initial heartbeat type error, we must retry as node seems
+			// live.
+			continue
+		}
+		client := serverpb.NewAdminClient(conn)
+		err = visitor(node.NodeID, client)
+		if err == nil {
+			return nil
+		}
+		log.Infof(ctx, "failed calling a visitor for node n%d: %s", node.NodeID, err)
+		if !IsRetryableError(err) {
+			return err
 		}
-	}
-	return nil
 	}
+	return err
 }
 
 // makeVisitNode creates a function to visit a remote node.
diff --git a/pkg/kv/kvserver/loqrecovery/testing_knobs.go b/pkg/kv/kvserver/loqrecovery/testing_knobs.go
index 34bca894132d..ad84fa2907d8 100644
--- a/pkg/kv/kvserver/loqrecovery/testing_knobs.go
+++ b/pkg/kv/kvserver/loqrecovery/testing_knobs.go
@@ -23,6 +23,7 @@ type TestingKnobs struct {
 	MetadataScanTimeout time.Duration
 
 	// Replica filter for forwarded replica info when collecting fan-out data.
+	// It can be called concurrently, so must be safe for concurrent use.
 	ForwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error
 }
 
diff --git a/pkg/kv/kvserver/loqrecovery/utils.go b/pkg/kv/kvserver/loqrecovery/utils.go
index e13f28f4d742..213397f459e2 100644
--- a/pkg/kv/kvserver/loqrecovery/utils.go
+++ b/pkg/kv/kvserver/loqrecovery/utils.go
@@ -17,6 +17,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/util/strutil"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 )
 
 type storeIDSet map[roachpb.StoreID]struct{}
@@ -315,3 +316,90 @@ func (e *RecoveryError) ErrorDetail() string {
 	}
 	return strings.Join(descriptions, "\n")
 }
+
+// grpcStream is a specialization of a grpc.ServerStream for a specific message
+// type. It is implemented by code generated gRPC server streams.
+//
+// As mentioned in the grpc.ServerStream documentation, it is not safe to call
+// Send on the same stream in different goroutines.
+type grpcStream[T any] interface {
+	Send(T) error
+}
+
+// threadSafeStream wraps a grpcStream and provides basic thread safety.
+type threadSafeStream[T any] struct {
+	mu syncutil.Mutex
+	s  grpcStream[T]
+}
+
+func makeThreadSafeStream[T any](s grpcStream[T]) *threadSafeStream[T] {
+	return &threadSafeStream[T]{s: s}
+}
+
+func (s *threadSafeStream[T]) Send(t T) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.s.Send(t)
+}
+
+// threadSafeMap wraps a map and provides basic thread safety.
+type threadSafeMap[K comparable, V any] struct {
+	mu syncutil.Mutex
+	m  map[K]V
+}
+
+func makeThreadSafeMap[K comparable, V any]() *threadSafeMap[K, V] {
+	return &threadSafeMap[K, V]{m: make(map[K]V)}
+}
+
+func (m *threadSafeMap[K, V]) Get(k K) V {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return m.m[k]
+}
+
+func (m *threadSafeMap[K, V]) Set(k K, v V) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.m[k] = v
+}
+
+func (m *threadSafeMap[K, V]) Delete(k K) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	delete(m.m, k)
+}
+
+func (m *threadSafeMap[K, V]) Len() int {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return len(m.m)
+}
+
+func (m *threadSafeMap[K, V]) Clone() map[K]V {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	clone := make(map[K]V, len(m.m))
+	for k, v := range m.m {
+		clone[k] = v
+	}
+	return clone
+}
+
+// threadSafeSlice wraps a slice and provides basic thread safety.
+type threadSafeSlice[T any] struct {
+	mu syncutil.Mutex
+	s  []T
+}
+
+func (s *threadSafeSlice[T]) Append(t ...T) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.s = append(s.s, t...)
+}
+
+func (s *threadSafeSlice[T]) Clone() []T {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return append([]T(nil), s.s...)
+}
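
The fan-out change above boils down to one pattern: cap an errgroup with SetLimit and launch one goroutine per node. The following standalone Go sketch isolates that pattern under stated assumptions -- visitAll, visitFn, and the integer node IDs are illustrative stand-ins invented for this example, not CockroachDB APIs; only the errgroup usage mirrors the patch.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// visitFn is the per-node callback. Like visitNodeAdminFn in the patch, it
// must be safe for concurrent use once visits run in parallel.
type visitFn func(ctx context.Context, nodeID int) error

// visitAll fans visitor out across nodes with at most maxConcurrency visits
// in flight at once.
func visitAll(ctx context.Context, nodes []int, maxConcurrency int32, visitor visitFn) error {
	var g errgroup.Group
	if maxConcurrency == 0 {
		// Matches the patch: "A value of 0 disables concurrency", i.e. visit
		// serially rather than with a zero limit.
		maxConcurrency = 1
	}
	g.SetLimit(int(maxConcurrency))
	for _, node := range nodes {
		node := node // copy for closure (pre-Go 1.22 loop semantics)
		g.Go(func() error {
			return visitor(ctx, node)
		})
	}
	// Wait blocks until all visits finish and returns the first error.
	return g.Wait()
}

func main() {
	nodes := []int{1, 2, 3, 4, 5}
	err := visitAll(context.Background(), nodes, 2, func(ctx context.Context, nodeID int) error {
		fmt.Printf("visiting node n%d\n", nodeID)
		return nil
	})
	if err != nil {
		fmt.Println("fan-out failed:", err)
	}
}

The 0 -> 1 mapping is load-bearing: errgroup's SetLimit(0) permits no active goroutines at all, so a subsequent Go call would block forever rather than run the visits serially.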
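Most of the new utils.go code exists because the visitors now share state across goroutines, and the shared gRPC response stream is the most delicate piece: grpc.ServerStream is documented as unsafe for concurrent sends on the same stream. Below is a sketch of the same mutex-behind-an-interface wrapper, assuming a toy printStream in place of the generated Admin_RecoveryCollectReplicaInfoServer and the standard library's sync.Mutex in place of CockroachDB's syncutil.Mutex.

package main

import (
	"fmt"
	"sync"
)

// grpcStream abstracts a generated server stream down to its Send method.
type grpcStream[T any] interface {
	Send(T) error
}

// threadSafeStream serializes Send calls so several goroutines can share one
// underlying stream.
type threadSafeStream[T any] struct {
	mu sync.Mutex
	s  grpcStream[T]
}

func makeThreadSafeStream[T any](s grpcStream[T]) *threadSafeStream[T] {
	return &threadSafeStream[T]{s: s}
}

func (s *threadSafeStream[T]) Send(t T) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.s.Send(t)
}

// printStream is a stand-in for a generated gRPC stream in this demo.
type printStream struct{}

func (printStream) Send(msg string) error {
	fmt.Println("sent:", msg)
	return nil
}

func main() {
	out := makeThreadSafeStream[string](printStream{})
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Concurrent sends are now serialized by the wrapper's mutex.
			_ = out.Send(fmt.Sprintf("replica info from visitor %d", i))
		}()
	}
	wg.Wait()
}

The same shape covers threadSafeMap and threadSafeSlice in the patch: a mutex plus the narrowest method set the callers need, with Clone handing back a copy so readers never iterate while holding the lock.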
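The counters in ServeClusterReplicas get the lighter-weight treatment: plain ints become atomic.Int64, each visitor adds its per-node tally once, and the deferred log line reads the totals with Load. In miniature (the goroutine count and tallies here are invented for the demo):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var replicas, nodes atomic.Int64
	var wg sync.WaitGroup
	for n := 0; n < 3; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			var nodeReplicas int64 = 10 // pretend this node streamed 10 replicas
			replicas.Add(nodeReplicas)  // one atomic add per node, as in the patch
			nodes.Add(1)
		}()
	}
	wg.Wait()
	fmt.Printf("streamed info: nodes %d, replica infos %d\n", nodes.Load(), replicas.Load())
}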