diff --git a/common/membership/ringpop/monitor.go b/common/membership/ringpop/monitor.go index 61c498e350d..e9f29590cd4 100644 --- a/common/membership/ringpop/monitor.go +++ b/common/membership/ringpop/monitor.go @@ -227,25 +227,6 @@ func (rpo *monitor) WaitUntilInitialized(ctx context.Context) error { return err } -func serviceNameToServiceTypeEnum(name primitives.ServiceName) (persistence.ServiceType, error) { - switch name { - case primitives.AllServices: - return persistence.All, nil - case primitives.FrontendService: - return persistence.Frontend, nil - case primitives.InternalFrontendService: - return persistence.InternalFrontend, nil - case primitives.HistoryService: - return persistence.History, nil - case primitives.MatchingService: - return persistence.Matching, nil - case primitives.WorkerService: - return persistence.Worker, nil - default: - return persistence.All, fmt.Errorf("unable to parse servicename '%s'", name) - } -} - func (rpo *monitor) upsertMyMembership( ctx context.Context, request *persistence.UpsertClusterMembershipRequest, @@ -348,7 +329,6 @@ func (rpo *monitor) fetchCurrentBootstrapHostports() ([]string, error) { PageSize: pageSize, NextPageToken: nextPageToken, }) - if err != nil { return nil, err } @@ -381,7 +361,6 @@ func (rpo *monitor) startHeartbeatUpsertLoop(request *persistence.UpsertClusterM default: } err := rpo.upsertMyMembership(rpo.lifecycleCtx, request) - if err != nil { rpo.logger.Error("Membership upsert failed.", tag.Error(err)) } @@ -474,3 +453,29 @@ func replaceServicePort(address string, servicePort int) (string, error) { } return net.JoinHostPort(host, convert.IntToString(servicePort)), nil } + +var ( + serviceNameToServiceTypeEnumMap = map[primitives.ServiceName]persistence.ServiceType{} +) + +// RegisterServiceNameToServiceTypeEnum must be called from a static init(). +func RegisterServiceNameToServiceTypeEnum(serviceName primitives.ServiceName, serviceType persistence.ServiceType) { + serviceNameToServiceTypeEnumMap[serviceName] = serviceType +} + +func init() { + RegisterServiceNameToServiceTypeEnum(primitives.AllServices, persistence.All) + RegisterServiceNameToServiceTypeEnum(primitives.FrontendService, persistence.Frontend) + RegisterServiceNameToServiceTypeEnum(primitives.InternalFrontendService, persistence.InternalFrontend) + RegisterServiceNameToServiceTypeEnum(primitives.HistoryService, persistence.History) + RegisterServiceNameToServiceTypeEnum(primitives.MatchingService, persistence.Matching) + RegisterServiceNameToServiceTypeEnum(primitives.WorkerService, persistence.Worker) +} + +func serviceNameToServiceTypeEnum(name primitives.ServiceName) (persistence.ServiceType, error) { + if serviceType, ok := serviceNameToServiceTypeEnumMap[name]; ok { + return serviceType, nil + } + + return persistence.All, fmt.Errorf("unable to parse servicename '%s'", name) +} diff --git a/service/worker/fx.go b/service/worker/fx.go index eaa1769913d..4001cfc4cad 100644 --- a/service/worker/fx.go +++ b/service/worker/fx.go @@ -24,6 +24,7 @@ import ( "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/resolver" "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service" "go.temporal.io/server/service/worker/batcher" @@ -90,7 +91,7 @@ var Module = fx.Options( fx.Provide(ServerProvider), fx.Provide(NewService), fx.Provide(fx.Annotate(NewWorkerManager, fx.ParamTags(workercommon.WorkerComponentTag))), - fx.Provide(NewPerNamespaceWorkerManager), + fx.Provide(PerNamespaceWorkerManagerProvider), fx.Invoke(ServiceLifetimeHooks), ) @@ -180,6 +181,30 @@ func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) { lc.Append(fx.StartStopHook(svc.Start, svc.Stop)) } +type perNamespaceWorkerManagerInitParams struct { + fx.In + Logger log.Logger + SdkClientFactory sdk.ClientFactory + NamespaceRegistry namespace.Registry + HostName resource.HostName + Config *Config + ClusterMetadata cluster.Metadata + Components []workercommon.PerNSWorkerComponent `group:"perNamespaceWorkerComponent"` +} + +func PerNamespaceWorkerManagerProvider(params perNamespaceWorkerManagerInitParams) *PerNamespaceWorkerManager { + return NewPerNamespaceWorkerManager( + params.Logger, + params.SdkClientFactory, + params.NamespaceRegistry, + params.HostName, + params.Config, + params.ClusterMetadata, + params.Components, + primitives.PerNSWorkerTaskQueue, + ) +} + func ServerProvider(rpcFactory common.RPCFactory, logger log.Logger) *grpc.Server { opts, err := rpcFactory.GetInternodeGRPCServerOptions() if err != nil { diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go index bdca012e745..2f6cf20e65b 100644 --- a/service/worker/pernamespaceworker.go +++ b/service/worker/pernamespaceworker.go @@ -24,19 +24,15 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/membership" "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/util" workercommon "go.temporal.io/server/service/worker/common" - "go.uber.org/fx" expmaps "golang.org/x/exp/maps" ) const ( - perNamespaceWorkerManagerListenerKey = "perNamespaceWorkerManager" - // Always refresh workers after this time even if there were no membership or namespace // state changes. This is to pick up dynamic config changes in component enabled status, // and heal anything that may have gotten into a bad state. @@ -44,18 +40,7 @@ const ( ) type ( - perNamespaceWorkerManagerInitParams struct { - fx.In - Logger log.Logger - SdkClientFactory sdk.ClientFactory - NamespaceRegistry namespace.Registry - HostName resource.HostName - Config *Config - ClusterMetadata cluster.Metadata - Components []workercommon.PerNSWorkerComponent `group:"perNamespaceWorkerComponent"` - } - - perNamespaceWorkerManager struct { + PerNamespaceWorkerManager struct { status int32 // from init params or Start @@ -65,6 +50,7 @@ type ( self membership.HostInfo hostName resource.HostName config *Config + taskQueueName string serviceResolver membership.ServiceResolver components []workercommon.PerNSWorkerComponent initialRetry time.Duration @@ -78,7 +64,7 @@ type ( } perNamespaceWorker struct { - wm *perNamespaceWorkerManager + wm *PerNamespaceWorkerManager logger log.Logger cancel func() @@ -114,27 +100,37 @@ var ( errInvalidConfiguration = errors.New("invalid dynamic configuration") ) -func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *perNamespaceWorkerManager { - return &perNamespaceWorkerManager{ - logger: log.With(params.Logger, tag.ComponentPerNSWorkerManager), - sdkClientFactory: params.SdkClientFactory, - namespaceRegistry: params.NamespaceRegistry, - hostName: params.HostName, - config: params.Config, - components: params.Components, +func NewPerNamespaceWorkerManager( + logger log.Logger, + sdkClientFactory sdk.ClientFactory, + namespaceRegistry namespace.Registry, + hostName resource.HostName, + config *Config, + clusterMetadata cluster.Metadata, + components []workercommon.PerNSWorkerComponent, + taskQueueName string, +) *PerNamespaceWorkerManager { + return &PerNamespaceWorkerManager{ + logger: log.With(logger, tag.ComponentPerNSWorkerManager), + sdkClientFactory: sdkClientFactory, + namespaceRegistry: namespaceRegistry, + hostName: hostName, + taskQueueName: taskQueueName, + config: config, + components: components, initialRetry: 1 * time.Second, - thisClusterName: params.ClusterMetadata.GetCurrentClusterName(), - startLimiter: quotas.NewDefaultOutgoingRateLimiter(quotas.RateFn(params.Config.PerNamespaceWorkerStartRate)), + thisClusterName: clusterMetadata.GetCurrentClusterName(), + startLimiter: quotas.NewDefaultOutgoingRateLimiter(quotas.RateFn(config.PerNamespaceWorkerStartRate)), membershipChangedCh: make(chan *membership.ChangedEvent), workers: make(map[namespace.ID]*perNamespaceWorker), } } -func (wm *perNamespaceWorkerManager) Running() bool { +func (wm *PerNamespaceWorkerManager) Running() bool { return atomic.LoadInt32(&wm.status) == common.DaemonStatusStarted } -func (wm *perNamespaceWorkerManager) Start( +func (wm *PerNamespaceWorkerManager) Start( self membership.HostInfo, serviceResolver membership.ServiceResolver, ) { @@ -154,7 +150,7 @@ func (wm *perNamespaceWorkerManager) Start( // this will call namespaceCallback with current namespaces wm.namespaceRegistry.RegisterStateChangeCallback(wm, wm.namespaceCallback) - err := wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh) + err := wm.serviceResolver.AddListener(fmt.Sprintf("%p", wm), wm.membershipChangedCh) if err != nil { wm.logger.Fatal("Unable to register membership listener", tag.Error(err)) } @@ -164,7 +160,7 @@ func (wm *perNamespaceWorkerManager) Start( wm.logger.Info("", tag.LifeCycleStarted) } -func (wm *perNamespaceWorkerManager) Stop() { +func (wm *PerNamespaceWorkerManager) Stop() { if !atomic.CompareAndSwapInt32( &wm.status, common.DaemonStatusStarted, @@ -176,7 +172,7 @@ func (wm *perNamespaceWorkerManager) Stop() { wm.logger.Info("", tag.LifeCycleStopping) wm.namespaceRegistry.UnregisterStateChangeCallback(wm) - err := wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey) + err := wm.serviceResolver.RemoveListener(fmt.Sprintf("%p", wm)) if err != nil { wm.logger.Error("Unable to unregister membership listener", tag.Error(err)) } @@ -195,11 +191,11 @@ func (wm *perNamespaceWorkerManager) Stop() { wm.logger.Info("", tag.LifeCycleStopped) } -func (wm *perNamespaceWorkerManager) namespaceCallback(ns *namespace.Namespace, nsDeleted bool) { +func (wm *PerNamespaceWorkerManager) namespaceCallback(ns *namespace.Namespace, nsDeleted bool) { go wm.getWorkerByNamespace(ns).update(ns, nsDeleted, nil, nil) } -func (wm *perNamespaceWorkerManager) refreshAll() { +func (wm *PerNamespaceWorkerManager) refreshAll() { wm.lock.Lock() defer wm.lock.Unlock() for _, worker := range wm.workers { @@ -207,13 +203,13 @@ func (wm *perNamespaceWorkerManager) refreshAll() { } } -func (wm *perNamespaceWorkerManager) membershipChangedListener() { +func (wm *PerNamespaceWorkerManager) membershipChangedListener() { for range wm.membershipChangedCh { wm.refreshAll() } } -func (wm *perNamespaceWorkerManager) periodicRefresh() { +func (wm *PerNamespaceWorkerManager) periodicRefresh() { for range time.NewTicker(refreshInterval).C { if atomic.LoadInt32(&wm.status) != common.DaemonStatusStarted { return @@ -222,7 +218,7 @@ func (wm *perNamespaceWorkerManager) periodicRefresh() { } } -func (wm *perNamespaceWorkerManager) getWorkerByNamespace(ns *namespace.Namespace) *perNamespaceWorker { +func (wm *PerNamespaceWorkerManager) getWorkerByNamespace(ns *namespace.Namespace) *perNamespaceWorker { wm.lock.Lock() defer wm.lock.Unlock() @@ -246,7 +242,7 @@ func (wm *perNamespaceWorkerManager) getWorkerByNamespace(ns *namespace.Namespac return worker } -func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) { +func (wm *PerNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) { wm.lock.Lock() defer wm.lock.Unlock() prev := wm.workers[ns.ID()] @@ -490,7 +486,7 @@ func (w *perNamespaceWorker) startWorker( sdkoptions.OnFatalError = w.onFatalError // this should not block because the client already has server capabilities - worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions) + worker := w.wm.sdkClientFactory.NewWorker(client, w.wm.taskQueueName, sdkoptions) details := workercommon.RegistrationDetails{ TotalWorkers: allocation.total, Multiplicity: allocation.local, diff --git a/service/worker/pernamespaceworker_test.go b/service/worker/pernamespaceworker_test.go index 622488028b0..b36c388e21c 100644 --- a/service/worker/pernamespaceworker_test.go +++ b/service/worker/pernamespaceworker_test.go @@ -39,7 +39,7 @@ type perNsWorkerManagerSuite struct { cmp1 *workercommon.MockPerNSWorkerComponent cmp2 *workercommon.MockPerNSWorkerComponent - manager *perNamespaceWorkerManager + manager *PerNamespaceWorkerManager } func TestPerNsWorkerManager(t *testing.T) { @@ -57,7 +57,7 @@ func (s *perNsWorkerManagerSuite) SetupTest() { s.cmp1 = workercommon.NewMockPerNSWorkerComponent(s.controller) s.cmp2 = workercommon.NewMockPerNSWorkerComponent(s.controller) - s.manager = NewPerNamespaceWorkerManager(perNamespaceWorkerManagerInitParams{ + s.manager = PerNamespaceWorkerManagerProvider(perNamespaceWorkerManagerInitParams{ Logger: s.logger, SdkClientFactory: s.cfactory, NamespaceRegistry: s.registry, @@ -558,7 +558,7 @@ func TestPerNsWorkerManagerSubscription(t *testing.T) { t.Cleanup(dc.Stop) config := NewConfig(dc, nil) - manager := NewPerNamespaceWorkerManager(perNamespaceWorkerManagerInitParams{ + manager := PerNamespaceWorkerManagerProvider(perNamespaceWorkerManagerInitParams{ Logger: logger, SdkClientFactory: cfactory, NamespaceRegistry: registry, diff --git a/service/worker/service.go b/service/worker/service.go index 869a6e1a808..36979894946 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -62,7 +62,7 @@ type ( config *Config workerManager *workerManager - perNamespaceWorkerManager *perNamespaceWorkerManager + perNamespaceWorkerManager *PerNamespaceWorkerManager scanner *scanner.Scanner matchingClient matchingservice.MatchingServiceClient namespaceReplicationTaskExecutor nsreplication.TaskExecutor @@ -121,7 +121,7 @@ func NewService( taskManager persistence.TaskManager, historyClient resource.HistoryClient, workerManager *workerManager, - perNamespaceWorkerManager *perNamespaceWorkerManager, + perNamespaceWorkerManager *PerNamespaceWorkerManager, visibilityManager manager.VisibilityManager, matchingClient resource.MatchingClient, namespaceReplicationTaskExecutor nsreplication.TaskExecutor,