Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions common/membership/ringpop/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,6 @@ func (rpo *monitor) WaitUntilInitialized(ctx context.Context) error {
return err
}

// serviceNameToServiceTypeEnum translates a service name into the matching
// persistence service type. A name that matches none of the known services
// yields persistence.All together with a non-nil error.
func serviceNameToServiceTypeEnum(name primitives.ServiceName) (persistence.ServiceType, error) {
	var serviceType persistence.ServiceType

	switch name {
	case primitives.AllServices:
		serviceType = persistence.All
	case primitives.FrontendService:
		serviceType = persistence.Frontend
	case primitives.InternalFrontendService:
		serviceType = persistence.InternalFrontend
	case primitives.HistoryService:
		serviceType = persistence.History
	case primitives.MatchingService:
		serviceType = persistence.Matching
	case primitives.WorkerService:
		serviceType = persistence.Worker
	default:
		// Unknown name: report it and fall back to the catch-all type.
		return persistence.All, fmt.Errorf("unable to parse servicename '%s'", name)
	}

	return serviceType, nil
}

func (rpo *monitor) upsertMyMembership(
ctx context.Context,
request *persistence.UpsertClusterMembershipRequest,
Expand Down Expand Up @@ -348,7 +329,6 @@ func (rpo *monitor) fetchCurrentBootstrapHostports() ([]string, error) {
PageSize: pageSize,
NextPageToken: nextPageToken,
})

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -381,7 +361,6 @@ func (rpo *monitor) startHeartbeatUpsertLoop(request *persistence.UpsertClusterM
default:
}
err := rpo.upsertMyMembership(rpo.lifecycleCtx, request)

if err != nil {
rpo.logger.Error("Membership upsert failed.", tag.Error(err))
}
Expand Down Expand Up @@ -474,3 +453,29 @@ func replaceServicePort(address string, servicePort int) (string, error) {
}
return net.JoinHostPort(host, convert.IntToString(servicePort)), nil
}

// serviceNameToServiceTypeEnumMap holds every registered service name and the
// persistence service type it maps to.
var serviceNameToServiceTypeEnumMap = make(map[primitives.ServiceName]persistence.ServiceType)

// RegisterServiceNameToServiceTypeEnum records serviceType as the persistence
// service type for serviceName, replacing any earlier entry for that name.
// The map is written without any locking, so this must be called from a
// static init().
func RegisterServiceNameToServiceTypeEnum(serviceName primitives.ServiceName, serviceType persistence.ServiceType) {
	serviceNameToServiceTypeEnumMap[serviceName] = serviceType
}

func init() {
RegisterServiceNameToServiceTypeEnum(primitives.AllServices, persistence.All)
RegisterServiceNameToServiceTypeEnum(primitives.FrontendService, persistence.Frontend)
RegisterServiceNameToServiceTypeEnum(primitives.InternalFrontendService, persistence.InternalFrontend)
RegisterServiceNameToServiceTypeEnum(primitives.HistoryService, persistence.History)
RegisterServiceNameToServiceTypeEnum(primitives.MatchingService, persistence.Matching)
RegisterServiceNameToServiceTypeEnum(primitives.WorkerService, persistence.Worker)
}
Comment on lines +457 to +473
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var (
serviceNameToServiceTypeEnumMap = map[primitives.ServiceName]persistence.ServiceType{}
)
func RegisterServiceNameToServiceTypeEnum(serviceName primitives.ServiceName, serviceType persistence.ServiceType) {
serviceNameToServiceTypeEnumMap[serviceName] = serviceType
}
func init() {
RegisterServiceNameToServiceTypeEnum(primitives.AllServices, persistence.All)
RegisterServiceNameToServiceTypeEnum(primitives.FrontendService, persistence.Frontend)
RegisterServiceNameToServiceTypeEnum(primitives.InternalFrontendService, persistence.InternalFrontend)
RegisterServiceNameToServiceTypeEnum(primitives.HistoryService, persistence.History)
RegisterServiceNameToServiceTypeEnum(primitives.MatchingService, persistence.Matching)
RegisterServiceNameToServiceTypeEnum(primitives.WorkerService, persistence.Worker)
}
var serviceNameToServiceTypeEnumMap = map[primitives.ServiceName]persistence.ServiceType{
primitives.AllServices: persistence.All,
primitives.FrontendService: persistence.Frontend,
primitives.InternalFrontendService: persistence.InternalFrontend,
primitives.HistoryService: persistence.History,
primitives.MatchingService: persistence.Matching,
primitives.WorkerService: persistence.Worker,
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There still needs to be a RegisterServiceNameToServiceTypeEnum function; that's the whole point of this change.


func serviceNameToServiceTypeEnum(name primitives.ServiceName) (persistence.ServiceType, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a redundant helper but 🤷

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to keep the changes contained; it can be removed down the line.

if serviceType, ok := serviceNameToServiceTypeEnumMap[name]; ok {
return serviceType, nil
}

return persistence.All, fmt.Errorf("unable to parse servicename '%s'", name)
}
27 changes: 26 additions & 1 deletion service/worker/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/resolver"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service"
"go.temporal.io/server/service/worker/batcher"
Expand Down Expand Up @@ -90,7 +91,7 @@ var Module = fx.Options(
fx.Provide(ServerProvider),
fx.Provide(NewService),
fx.Provide(fx.Annotate(NewWorkerManager, fx.ParamTags(workercommon.WorkerComponentTag))),
fx.Provide(NewPerNamespaceWorkerManager),
fx.Provide(PerNamespaceWorkerManagerProvider),
fx.Invoke(ServiceLifetimeHooks),
)

Expand Down Expand Up @@ -180,6 +181,30 @@ func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
}

// perNamespaceWorkerManagerInitParams bundles the dependencies that
// PerNamespaceWorkerManagerProvider forwards to NewPerNamespaceWorkerManager.
// It embeds fx.In so that fx populates the fields when the provider is invoked.
type perNamespaceWorkerManagerInitParams struct {
fx.In
Logger log.Logger
SdkClientFactory sdk.ClientFactory
NamespaceRegistry namespace.Registry
HostName resource.HostName
Config *Config
ClusterMetadata cluster.Metadata
// Components is tagged with the "perNamespaceWorkerComponent" fx value group.
Components []workercommon.PerNSWorkerComponent `group:"perNamespaceWorkerComponent"`
}

func PerNamespaceWorkerManagerProvider(params perNamespaceWorkerManagerInitParams) *PerNamespaceWorkerManager {
return NewPerNamespaceWorkerManager(
params.Logger,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just pass params as a whole, so there are fewer places to change when adding dependencies? I guess this separates the fx stuff more cleanly, so either way works.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the goal was to separate the FX wiring a tiny bit more, so that it isn't hard-wired which fields come from FX and which come from somewhere else.

params.SdkClientFactory,
params.NamespaceRegistry,
params.HostName,
params.Config,
params.ClusterMetadata,
params.Components,
primitives.PerNSWorkerTaskQueue,
)
}

func ServerProvider(rpcFactory common.RPCFactory, logger log.Logger) *grpc.Server {
opts, err := rpcFactory.GetInternodeGRPCServerOptions()
if err != nil {
Expand Down
74 changes: 35 additions & 39 deletions service/worker/pernamespaceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,23 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/util"
workercommon "go.temporal.io/server/service/worker/common"
"go.uber.org/fx"
expmaps "golang.org/x/exp/maps"
)

const (
perNamespaceWorkerManagerListenerKey = "perNamespaceWorkerManager"

// Always refresh workers after this time even if there were no membership or namespace
// state changes. This is to pick up dynamic config changes in component enabled status,
// and heal anything that may have gotten into a bad state.
refreshInterval = 10 * time.Minute
)

type (
perNamespaceWorkerManagerInitParams struct {
fx.In
Logger log.Logger
SdkClientFactory sdk.ClientFactory
NamespaceRegistry namespace.Registry
HostName resource.HostName
Config *Config
ClusterMetadata cluster.Metadata
Components []workercommon.PerNSWorkerComponent `group:"perNamespaceWorkerComponent"`
}

perNamespaceWorkerManager struct {
PerNamespaceWorkerManager struct {
status int32

// from init params or Start
Expand All @@ -65,6 +50,7 @@ type (
self membership.HostInfo
hostName resource.HostName
config *Config
taskQueueName string
serviceResolver membership.ServiceResolver
components []workercommon.PerNSWorkerComponent
initialRetry time.Duration
Expand All @@ -78,7 +64,7 @@ type (
}

perNamespaceWorker struct {
wm *perNamespaceWorkerManager
wm *PerNamespaceWorkerManager
logger log.Logger
cancel func()

Expand Down Expand Up @@ -114,27 +100,37 @@ var (
errInvalidConfiguration = errors.New("invalid dynamic configuration")
)

func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *perNamespaceWorkerManager {
return &perNamespaceWorkerManager{
logger: log.With(params.Logger, tag.ComponentPerNSWorkerManager),
sdkClientFactory: params.SdkClientFactory,
namespaceRegistry: params.NamespaceRegistry,
hostName: params.HostName,
config: params.Config,
components: params.Components,
func NewPerNamespaceWorkerManager(
logger log.Logger,
sdkClientFactory sdk.ClientFactory,
namespaceRegistry namespace.Registry,
hostName resource.HostName,
config *Config,
clusterMetadata cluster.Metadata,
components []workercommon.PerNSWorkerComponent,
taskQueueName string,
) *PerNamespaceWorkerManager {
return &PerNamespaceWorkerManager{
logger: log.With(logger, tag.ComponentPerNSWorkerManager),
sdkClientFactory: sdkClientFactory,
namespaceRegistry: namespaceRegistry,
hostName: hostName,
taskQueueName: taskQueueName,
config: config,
components: components,
initialRetry: 1 * time.Second,
thisClusterName: params.ClusterMetadata.GetCurrentClusterName(),
startLimiter: quotas.NewDefaultOutgoingRateLimiter(quotas.RateFn(params.Config.PerNamespaceWorkerStartRate)),
thisClusterName: clusterMetadata.GetCurrentClusterName(),
startLimiter: quotas.NewDefaultOutgoingRateLimiter(quotas.RateFn(config.PerNamespaceWorkerStartRate)),
membershipChangedCh: make(chan *membership.ChangedEvent),
workers: make(map[namespace.ID]*perNamespaceWorker),
}
}

func (wm *perNamespaceWorkerManager) Running() bool {
func (wm *PerNamespaceWorkerManager) Running() bool {
return atomic.LoadInt32(&wm.status) == common.DaemonStatusStarted
}

func (wm *perNamespaceWorkerManager) Start(
func (wm *PerNamespaceWorkerManager) Start(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should change the key in the AddListener call to something unique in case these end up in the same process. I like to use fmt.Sprintf("%p", wm)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah gotcha, made the change

self membership.HostInfo,
serviceResolver membership.ServiceResolver,
) {
Expand All @@ -154,7 +150,7 @@ func (wm *perNamespaceWorkerManager) Start(
// this will call namespaceCallback with current namespaces
wm.namespaceRegistry.RegisterStateChangeCallback(wm, wm.namespaceCallback)

err := wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh)
err := wm.serviceResolver.AddListener(fmt.Sprintf("%p", wm), wm.membershipChangedCh)
if err != nil {
wm.logger.Fatal("Unable to register membership listener", tag.Error(err))
}
Expand All @@ -164,7 +160,7 @@ func (wm *perNamespaceWorkerManager) Start(
wm.logger.Info("", tag.LifeCycleStarted)
}

func (wm *perNamespaceWorkerManager) Stop() {
func (wm *PerNamespaceWorkerManager) Stop() {
if !atomic.CompareAndSwapInt32(
&wm.status,
common.DaemonStatusStarted,
Expand All @@ -176,7 +172,7 @@ func (wm *perNamespaceWorkerManager) Stop() {
wm.logger.Info("", tag.LifeCycleStopping)

wm.namespaceRegistry.UnregisterStateChangeCallback(wm)
err := wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey)
err := wm.serviceResolver.RemoveListener(fmt.Sprintf("%p", wm))
if err != nil {
wm.logger.Error("Unable to unregister membership listener", tag.Error(err))
}
Expand All @@ -195,25 +191,25 @@ func (wm *perNamespaceWorkerManager) Stop() {
wm.logger.Info("", tag.LifeCycleStopped)
}

func (wm *perNamespaceWorkerManager) namespaceCallback(ns *namespace.Namespace, nsDeleted bool) {
func (wm *PerNamespaceWorkerManager) namespaceCallback(ns *namespace.Namespace, nsDeleted bool) {
go wm.getWorkerByNamespace(ns).update(ns, nsDeleted, nil, nil)
}

func (wm *perNamespaceWorkerManager) refreshAll() {
func (wm *PerNamespaceWorkerManager) refreshAll() {
wm.lock.Lock()
defer wm.lock.Unlock()
for _, worker := range wm.workers {
go worker.update(nil, false, nil, nil)
}
}

func (wm *perNamespaceWorkerManager) membershipChangedListener() {
func (wm *PerNamespaceWorkerManager) membershipChangedListener() {
for range wm.membershipChangedCh {
wm.refreshAll()
}
}

func (wm *perNamespaceWorkerManager) periodicRefresh() {
func (wm *PerNamespaceWorkerManager) periodicRefresh() {
for range time.NewTicker(refreshInterval).C {
if atomic.LoadInt32(&wm.status) != common.DaemonStatusStarted {
return
Expand All @@ -222,7 +218,7 @@ func (wm *perNamespaceWorkerManager) periodicRefresh() {
}
}

func (wm *perNamespaceWorkerManager) getWorkerByNamespace(ns *namespace.Namespace) *perNamespaceWorker {
func (wm *PerNamespaceWorkerManager) getWorkerByNamespace(ns *namespace.Namespace) *perNamespaceWorker {
wm.lock.Lock()
defer wm.lock.Unlock()

Expand All @@ -246,7 +242,7 @@ func (wm *perNamespaceWorkerManager) getWorkerByNamespace(ns *namespace.Namespac
return worker
}

func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) {
func (wm *PerNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) {
wm.lock.Lock()
defer wm.lock.Unlock()
prev := wm.workers[ns.ID()]
Expand Down Expand Up @@ -490,7 +486,7 @@ func (w *perNamespaceWorker) startWorker(
sdkoptions.OnFatalError = w.onFatalError

// this should not block because the client already has server capabilities
worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
worker := w.wm.sdkClientFactory.NewWorker(client, w.wm.taskQueueName, sdkoptions)
details := workercommon.RegistrationDetails{
TotalWorkers: allocation.total,
Multiplicity: allocation.local,
Expand Down
6 changes: 3 additions & 3 deletions service/worker/pernamespaceworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type perNsWorkerManagerSuite struct {
cmp1 *workercommon.MockPerNSWorkerComponent
cmp2 *workercommon.MockPerNSWorkerComponent

manager *perNamespaceWorkerManager
manager *PerNamespaceWorkerManager
}

func TestPerNsWorkerManager(t *testing.T) {
Expand All @@ -57,7 +57,7 @@ func (s *perNsWorkerManagerSuite) SetupTest() {
s.cmp1 = workercommon.NewMockPerNSWorkerComponent(s.controller)
s.cmp2 = workercommon.NewMockPerNSWorkerComponent(s.controller)

s.manager = NewPerNamespaceWorkerManager(perNamespaceWorkerManagerInitParams{
s.manager = PerNamespaceWorkerManagerProvider(perNamespaceWorkerManagerInitParams{
Logger: s.logger,
SdkClientFactory: s.cfactory,
NamespaceRegistry: s.registry,
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestPerNsWorkerManagerSubscription(t *testing.T) {
t.Cleanup(dc.Stop)
config := NewConfig(dc, nil)

manager := NewPerNamespaceWorkerManager(perNamespaceWorkerManagerInitParams{
manager := PerNamespaceWorkerManagerProvider(perNamespaceWorkerManagerInitParams{
Logger: logger,
SdkClientFactory: cfactory,
NamespaceRegistry: registry,
Expand Down
4 changes: 2 additions & 2 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type (
config *Config

workerManager *workerManager
perNamespaceWorkerManager *perNamespaceWorkerManager
perNamespaceWorkerManager *PerNamespaceWorkerManager
scanner *scanner.Scanner
matchingClient matchingservice.MatchingServiceClient
namespaceReplicationTaskExecutor nsreplication.TaskExecutor
Expand Down Expand Up @@ -121,7 +121,7 @@ func NewService(
taskManager persistence.TaskManager,
historyClient resource.HistoryClient,
workerManager *workerManager,
perNamespaceWorkerManager *perNamespaceWorkerManager,
perNamespaceWorkerManager *PerNamespaceWorkerManager,
visibilityManager manager.VisibilityManager,
matchingClient resource.MatchingClient,
namespaceReplicationTaskExecutor nsreplication.TaskExecutor,
Expand Down
Loading