Skip to content

Commit

Permalink
Add per-namespace worker manager (#2799)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed May 12, 2022
1 parent 87c3beb commit ab6e338
Show file tree
Hide file tree
Showing 8 changed files with 421 additions and 24 deletions.
4 changes: 4 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,10 @@ func CertThumbprint(thumbprint string) ZapTag {
return NewStringTag("cert-thumbprint", thumbprint)
}

func WorkerComponent(v interface{}) ZapTag {
return NewStringTag("worker-component", fmt.Sprintf("%T", v))
}

// history engine shard

// ShardID returns tag for ShardID
Expand Down
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ var (
ComponentBatcher = component("batcher")
ComponentWorker = component("worker")
ComponentWorkerManager = component("worker-manager")
ComponentPerNSWorkerManager = component("perns-worker-manager")
ComponentServiceResolver = component("service-resolver")
ComponentMetadataInitializer = component("metadata-initializer")
ComponentAddSearchAttributes = component("add-search-attributes")
Expand Down
91 changes: 75 additions & 16 deletions common/namespace/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ type (
// it is guaranteed that PrepareCallbackFn and CallbackFn pair will be both called or non will be called
CallbackFn func(oldNamespaces []*Namespace, newNamespaces []*Namespace)

// StateChangeCallbackFn can be registered to be called on any namespace state change,
// plus once for all namespaces after registration. There is no guarantee about when
// these are called.
StateChangeCallbackFn func(*Namespace)

// Registry provides access to Namespace objects by name or by ID.
Registry interface {
common.Daemon
Expand All @@ -132,6 +137,11 @@ type (
GetCacheSize() (sizeOfCacheByName int64, sizeOfCacheByID int64)
// Refresh forces an immediate refresh of the namespace cache and blocks until it's complete.
Refresh()
// Registers callback for namespace state changes. This is regrettably
// different from the above RegisterNamespaceChangeCallback because we
// need different semantics (and that one is going away).
RegisterStateChangeCallback(key any, cb StateChangeCallbackFn)
UnregisterStateChangeCallback(key any)
}

registry struct {
Expand All @@ -146,9 +156,9 @@ type (
lastRefreshTime atomic.Value
refreshInterval dynamicconfig.DurationPropertyFn

// cacheLock protects cachNameToID and cacheByID. If the exclusive side
// is to be held at the same time as the callbackLock (below), this lock
// MUST be acquired *first*.
// cacheLock protects cachNameToID, cacheByID and stateChangeCallbacks.
// If the exclusive side is to be held at the same time as the
// callbackLock (below), this lock MUST be acquired *first*.
cacheLock sync.RWMutex
cacheNameToID cache.Cache
cacheByID cache.Cache
Expand All @@ -159,6 +169,9 @@ type (
callbackLock sync.Mutex
prepareCallbacks map[any]PrepareCallbackFn
callbacks map[any]CallbackFn

// State-change callbacks. Protected by cacheLock
stateChangeCallbacks map[any]StateChangeCallbackFn
}
)

Expand All @@ -183,6 +196,7 @@ func NewRegistry(
prepareCallbacks: make(map[any]PrepareCallbackFn),
callbacks: make(map[any]CallbackFn),
refreshInterval: refreshInterval,
stateChangeCallbacks: make(map[any]StateChangeCallbackFn),
}
reg.lastRefreshTime.Store(time.Time{})
return reg
Expand Down Expand Up @@ -222,9 +236,13 @@ func (r *registry) Stop() {
}

func (r *registry) getAllNamespace() map[ID]*Namespace {
result := make(map[ID]*Namespace)
r.cacheLock.RLock()
defer r.cacheLock.RUnlock()
return r.getAllNamespaceLocked()
}

func (r *registry) getAllNamespaceLocked() map[ID]*Namespace {
result := make(map[ID]*Namespace)

ite := r.cacheByID.Iterator()
defer ite.Close()
Expand Down Expand Up @@ -287,6 +305,24 @@ func (r *registry) UnregisterNamespaceChangeCallback(
delete(r.callbacks, listenerID)
}

func (r *registry) RegisterStateChangeCallback(key any, cb StateChangeCallbackFn) {
r.cacheLock.Lock()
r.stateChangeCallbacks[key] = cb
allNamespaces := r.getAllNamespaceLocked()
r.cacheLock.Unlock()

// call once for each namespace already in the registry
for _, ns := range allNamespaces {
cb(ns)
}
}

func (r *registry) UnregisterStateChangeCallback(key any) {
r.cacheLock.Lock()
defer r.cacheLock.Unlock()
delete(r.stateChangeCallbacks, key)
}

// GetNamespace retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (r *registry) GetNamespace(name Name) (*Namespace, error) {
Expand Down Expand Up @@ -428,6 +464,7 @@ func (r *registry) refreshNamespaces(ctx context.Context) error {

var oldEntries []*Namespace
var newEntries []*Namespace
var stateChanged []*Namespace
UpdateLoop:
for _, namespace := range namespacesDb {
if namespace.notificationVersion >= namespaceNotificationVersion {
Expand All @@ -438,43 +475,55 @@ UpdateLoop:
// will be loaded into cache in the next refresh
break UpdateLoop
}
oldNS := r.updateIDToNamespaceCache(newCacheByID, namespace.ID(), namespace)
r.updateNameToIDCache(newCacheNameToID, namespace.Name(), namespace.ID())
oldNS, oldNSAnyVersion := r.updateIDToNamespaceCache(newCacheByID, namespace.ID(), namespace)
newCacheNameToID.Put(namespace.Name(), namespace.ID())

if oldNS != nil {
oldEntries = append(oldEntries, oldNS)
newEntries = append(newEntries, namespace)
}

if oldNSAnyVersion == nil || oldNSAnyVersion.State() != namespace.State() {
stateChanged = append(stateChanged, namespace)
}
}

var stateChangeCallbacks []StateChangeCallbackFn

// NOTE: READ REF BEFORE MODIFICATION
// ref: historyEngine.go registerNamespaceFailoverCallback function
r.publishCacheUpdate(func() (Namespaces, Namespaces) {
r.cacheLock.Lock()
defer r.cacheLock.Unlock()
r.cacheByID = newCacheByID
r.cacheNameToID = newCacheNameToID
stateChangeCallbacks = mapAnyValues(r.stateChangeCallbacks)
return oldEntries, newEntries
})
return nil
}

func (r *registry) updateNameToIDCache(c cache.Cache, name Name, id ID) {
c.Put(name, id)
// call state change callbacks
for _, cb := range stateChangeCallbacks {
for _, ns := range stateChanged {
cb(ns)
}
}

return nil
}

func (r *registry) updateIDToNamespaceCache(
cacheByID cache.Cache,
id ID,
newNS *Namespace,
) *Namespace {
) (*Namespace, *Namespace) {
oldCacheRec := cacheByID.Put(id, newNS)
if oldNS, ok := oldCacheRec.(*Namespace); ok &&
newNS.notificationVersion > oldNS.notificationVersion &&
r.globalNamespacesEnabled {
return oldNS
if oldNS, ok := oldCacheRec.(*Namespace); ok {
if newNS.notificationVersion > oldNS.notificationVersion && r.globalNamespacesEnabled {
return oldNS, oldNS
}
return nil, oldNS
}
return nil
return nil, nil
}

// getNamespace retrieves the information from the cache if it exists
Expand Down Expand Up @@ -544,3 +593,13 @@ func byName(name Name) *persistence.GetNamespaceRequest {
func byID(id ID) *persistence.GetNamespaceRequest {
return &persistence.GetNamespaceRequest{ID: id.String()}
}

// This is https://pkg.go.dev/golang.org/x/exp/maps#Values except that it works
// for map[any]T (see https://github.com/golang/go/issues/51257 and many more)
func mapAnyValues[T any](m map[any]T) []T {
r := make([]T, 0, len(m))
for _, v := range m {
r = append(r, v)
}
return r
}
24 changes: 24 additions & 0 deletions common/namespace/registry_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions service/worker/common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ type (
WorkerComponent interface {
// Register registers Workflow and Activity types provided by this worker component
Register(sdkworker.Worker)
// DedicatedWorkerOptions returns a DedicatedWorkerOptions for this worker component. Return nil to use
// default worker instance.
// DedicatedWorkerOptions returns a DedicatedWorkerOptions for this worker component.
// For regular (system namespace) components: return nil to use default worker instance.
// For per-namespace components: must not return nil.
DedicatedWorkerOptions() *DedicatedWorkerOptions
}

DedicatedWorkerOptions struct {
// For regular (system namespace) components: TaskQueue is optional
// For per-namespace components: TaskQueue is required
TaskQueue string
Options sdkworker.Options
// How many worker nodes should run a worker per namespace
NumWorkers int
// Other worker options
Options sdkworker.Options
}
)
1 change: 1 addition & 0 deletions service/worker/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var Module = fx.Options(
fx.Provide(PersistenceMaxQpsProvider),
fx.Provide(NewService),
fx.Provide(NewWorkerManager),
fx.Provide(NewPerNamespaceWorkerManager),
fx.Invoke(ServiceLifetimeHooks),
)

Expand Down

0 comments on commit ab6e338

Please sign in to comment.