Skip to content

Commit

Permalink
Add namespace argument to PerNSWorkerComponent.Register (#2847)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed May 17, 2022
1 parent b385075 commit 7d42654
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 14 deletions.
33 changes: 26 additions & 7 deletions service/worker/common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,41 @@

package common

import sdkworker "go.temporal.io/sdk/worker"
import (
sdkworker "go.temporal.io/sdk/worker"
"go.temporal.io/server/common/namespace"
)

type (
// WorkerComponent represent a type of work needed for worker role
// WorkerComponent represents a type of work needed for worker role
WorkerComponent interface {
// Register registers Workflow and Activity types provided by this worker component
// Register registers Workflow and Activity types provided by this worker component.
Register(sdkworker.Worker)
// DedicatedWorkerOptions returns a DedicatedWorkerOptions for this worker component.
// For regular (system namespace) components: return nil to use default worker instance.
// For per-namespace components: must not return nil.
// Return nil to use default worker instance.
DedicatedWorkerOptions() *DedicatedWorkerOptions
}

DedicatedWorkerOptions struct {
// For regular (system namespace) components: TaskQueue is optional
// For per-namespace components: TaskQueue is required
// TaskQueue is optional
TaskQueue string
// How many worker nodes should run a worker per namespace
NumWorkers int
// Other worker options
Options sdkworker.Options
}

// PerNSWorkerComponent represents a per-namespace worker needed for worker role
PerNSWorkerComponent interface {
// Register registers Workflow and Activity types provided by this worker component.
// The namespace that this worker is running in is also provided.
Register(sdkworker.Worker, *namespace.Namespace)
// DedicatedWorkerOptions returns a DedicatedWorkerOptions for this worker component.
DedicatedWorkerOptions() *PerNSDedicatedWorkerOptions
}

PerNSDedicatedWorkerOptions struct {
// TaskQueue is required
TaskQueue string
// How many worker nodes should run a worker per namespace
NumWorkers int
Expand Down
14 changes: 7 additions & 7 deletions service/worker/pernamespaceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type (
Logger log.Logger
SdkClientFactory sdk.ClientFactory
NamespaceRegistry namespace.Registry
Components []workercommon.WorkerComponent `group:"perNamespaceWorkerComponent"`
Components []workercommon.PerNSWorkerComponent `group:"perNamespaceWorkerComponent"`
}

perNamespaceWorkerManager struct {
Expand All @@ -66,7 +66,7 @@ type (
namespaceRegistry namespace.Registry
self *membership.HostInfo
serviceResolver membership.ServiceResolver
components []workercommon.WorkerComponent
components []workercommon.PerNSWorkerComponent

membershipChangedCh chan *membership.ChangedEvent

Expand All @@ -79,7 +79,7 @@ type (
wm *perNamespaceWorkerManager

lock sync.Mutex
workers map[workercommon.WorkerComponent]*worker
workers map[workercommon.PerNSWorkerComponent]*worker
}

worker struct {
Expand Down Expand Up @@ -179,7 +179,7 @@ func (wm *perNamespaceWorkerManager) getWorkerSet(ns *namespace.Namespace) *work
ws := &workerSet{
ns: ns,
wm: wm,
workers: make(map[workercommon.WorkerComponent]*worker),
workers: make(map[workercommon.PerNSWorkerComponent]*worker),
}

wm.workerSets[ns.ID()] = ws
Expand Down Expand Up @@ -207,7 +207,7 @@ func (ws *workerSet) refresh() {
}

func (ws *workerSet) refreshComponent(
cmp workercommon.WorkerComponent,
cmp workercommon.PerNSWorkerComponent,
nsExists bool,
) {
op := func() error {
Expand Down Expand Up @@ -268,15 +268,15 @@ func (ws *workerSet) refreshComponent(
backoff.Retry(op, policy, nil)
}

func (ws *workerSet) startWorker(wc workercommon.WorkerComponent) (*worker, error) {
func (ws *workerSet) startWorker(wc workercommon.PerNSWorkerComponent) (*worker, error) {
client, err := ws.wm.sdkClientFactory.NewClient(ws.ns.Name().String(), ws.wm.logger)
if err != nil {
return nil, err
}

options := wc.DedicatedWorkerOptions()
sdkworker := sdkworker.New(client, options.TaskQueue, options.Options)
wc.Register(sdkworker)
wc.Register(sdkworker, ws.ns)
// TODO: use Run() and handle post-startup errors by recreating worker
// (after sdk supports returning post-startup errors from Run)
err = sdkworker.Start()
Expand Down

0 comments on commit 7d42654

Please sign in to comment.