/
worker.go
130 lines (112 loc) · 5.26 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cadence
import (
"context"
"github.com/uber-go/tally"
m "go.uber.org/cadence/.gen/go/cadence"
s "go.uber.org/cadence/.gen/go/shared"
"go.uber.org/zap"
)
type (
// Worker represents objects that can be started and stopped.
Worker interface {
// Start starts the worker in a non-blocking fashion.
Start() error
// Run is a blocking start and cleans up resources when killed.
// It returns an error only if it fails to start the worker.
Run() error
// Stop cleans up any resources opened by the worker.
Stop()
}
// WorkerOptions configures a worker instance, for example:
// (1) the logger or any specific metrics;
// (2) whether to heartbeat for activities automatically.
// Use NewWorkerOptions function to create an instance.
// NOTE(review): NewWorkerOptions is not declared in this file, while NewWorker
// accepts a WorkerOptions value directly — confirm the constructor still exists.
WorkerOptions struct {
// Optional: Sets the maximum number of concurrent activity executions this host can have.
// The zero value of this uses the default value.
// default: defaultMaxConcurrentActivityExecutionSize(1k)
MaxConcurrentActivityExecutionSize int
// Optional: Sets the rate limit on the number of activities that can be executed per second. Notice that the
// number is represented as a float, so that you can set it to less than 1 if needed. For example, setting the
// number to 0.1 means you want your activity to be executed once every 10 seconds. This can be used to protect
// downstream services from flooding.
// The zero value of this uses the default value.
// default: defaultMaxActivityExecutionRate(100k)
// Warning: an activity's StartToCloseTimeout starts ticking even if a task is blocked due to rate limiting.
MaxActivityExecutionPerSecond float64
// Optional: Whether the framework should heartbeat automatically on behalf
// of the activities.
// default: false, i.e. no automatic heartbeat.
AutoHeartBeat bool
// Optional: Sets an identity that can be used to track this host for debugging.
// default: default identity that includes hostname, groupName and process ID.
Identity string
// Optional: Metrics scope to report metrics to.
// default: no metrics.
MetricsScope tally.Scope
// Optional: Logger the framework can use to log.
// default: default logger provided.
Logger *zap.Logger
// Optional: Enable logging in replay.
// In the decider you can use Cadence.GetLogger(ctx) to access a logger that is replay aware.
// Setting this option makes workflow decider code log during
// replay mode as well, which produces verbose and often repeated logs.
// default: false
EnableLoggingInReplay bool
// Optional: Disable running workflow workers.
// default: false
DisableWorkflowWorker bool
// Optional: Disable running activity workers.
// default: false
DisableActivityWorker bool
// Optional: Sets the context for activities. The context can be used to pass any configuration to an activity,
// such as a common logger for all activities.
BackgroundActivityContext context.Context
}
// ActivityContextValueKey is the type to use for keys of values attached
// to the BackgroundActivityContext.
ActivityContextValueKey int
)
// NewWorker builds a Worker that manages workflow and activity executions.
// Construction is delegated to newAggregatedWorker with the arguments
// forwarded unchanged.
//
//   - service: thrift connection to the cadence server.
//   - domain: the name of the cadence domain.
//   - taskList: the task list name used to identify your client worker; it also
//     identifies the group of workflow and activity implementations that are
//     hosted by a single worker process.
//   - options: worker specific options such as logger, metrics and identity.
func NewWorker(
	service m.TChanWorkflowService,
	domain string,
	taskList string,
	options WorkerOptions,
) Worker {
	worker := newAggregatedWorker(service, domain, taskList, options)
	return worker
}
// GetWorkflowStackTrace produces a stack trace of all goroutines of a workflow
// from the supplied history h.
// The workflow function that generated the history must have been registered
// through RegisterWorkflow.
// To obtain a stack trace from a workflowID and runID instead, use
// Client.GetWorkflowStackTrace.
func GetWorkflowStackTrace(h *s.History) (string, error) {
	// Placeholder for the two leading identifier arguments (presumably the
	// workflow ID and run ID, which are not available to this function).
	const placeholder = "unknown"

	// The complete history is already in hand, so it is served as a single
	// page: the incoming page token is ignored and no next-page token or
	// error is ever returned.
	singlePage := func(_ []byte) (*s.History, []byte, error) {
		return h, nil, nil
	}

	return getWorkflowStackTraceImpl(placeholder, placeholder, singlePage)
}