/
config.go
115 lines (99 loc) · 4.32 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package srvtemporal
import (
"bytes"
"context"
"errors"
"fmt"
"time"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/worker"
)
type WorkflowPanicPolicy worker.WorkflowPanicPolicy
var (
workflowPanicPolicyBlock = []byte("block")
workflowPanicPolicyFail = []byte("fail")
ErrUnknownWorkflowPanicPolicy = errors.New("unknown workflow panic policy")
)
func (w *WorkflowPanicPolicy) UnmarshalText(text []byte) error {
switch {
case bytes.Equal(text, workflowPanicPolicyBlock):
*w = WorkflowPanicPolicy(worker.BlockWorkflow)
case bytes.Equal(text, workflowPanicPolicyFail):
*w = WorkflowPanicPolicy(worker.FailWorkflow)
default:
return fmt.Errorf("%w: %s", ErrUnknownWorkflowPanicPolicy, text)
}
return nil
}
type PlatformConfig struct {
MaxConcurrentActivityExecutionSize int `config:"max_concurrent_activity_execution_size"`
WorkerActivitiesPerSecond float64 `config:"worker_activities_per_second"`
MaxConcurrentLocalActivityExecutionSize int `config:"max_concurrent_local_activity_execution_size"`
WorkerLocalActivitiesPerSecond float64 `config:"worker_local_activities_per_second"`
TaskQueueActivitiesPerSecond float64 `config:"task_queue_activities_per_second"`
MaxConcurrentActivityTaskPollers int `config:"max_concurrent_activity_task_pollers"`
MaxConcurrentWorkflowTaskExecutionSize int `config:"max_concurrent_workflow_task_execution_size"`
MaxConcurrentWorkflowTaskPollers int `config:"max_concurrent_workflow_task_pollers"`
EnableLoggingInReplay bool `config:"enable_logging_in_replay"`
StickyScheduleToStartTimeout time.Duration `config:"sticky_schedule_to_start_timeout"`
WorkflowPanicPolicy WorkflowPanicPolicy `config:"workflow_panic_policy"`
WorkerStopTimeout time.Duration `config:"worker_stop_timeout"`
EnableSessionWorker bool `config:"enable_session_worker"`
MaxConcurrentSessionExecutionSize int `config:"max_concurrent_session_execution_size"`
DisableWorkflowWorker bool `config:"disable_workflow_worker"`
LocalActivityWorkerOnly bool `config:"local_activity_worker_only"`
Identity string `config:"identity"`
DeadlockDetectionTimeout time.Duration `config:"deadlock_detection_timeout"`
MaxHeartbeatThrottleInterval time.Duration `config:"max_heartbeat_throttle_interval"`
DefaultHeartbeatThrottleInterval time.Duration `config:"default_heartbeat_throttle_interval"`
DisableEagerActivities bool `config:"disable_eager_activities"`
MaxConcurrentEagerActivityExecutionSize int `config:"max_concurrent_eager_activity_execution_size"`
DisableRegistrationAliasing bool `config:"disable_registration_aliasing"`
BuildID string `config:"build_id"`
UseBuildIDForVersioning bool `config:"use_build_id_for_versioning"`
}
type config struct {
name string
taskQueue string
backgroundAcitivityContext context.Context
interceptors []interceptor.WorkerInterceptor
onFatalError func(error)
client client.Client
}
func defaultOpts() config {
return config{
name: "Temporal Worker Server",
}
}
type Option func(*config)
func WithName(name string) Option {
return func(c *config) {
c.name = name
}
}
func WithTaskQueue(taskQueue string) Option {
return func(c *config) {
c.taskQueue = taskQueue
}
}
func WithBackgroundActivityContext(ctx context.Context) Option {
return func(c *config) {
c.backgroundAcitivityContext = ctx
}
}
func WithInterceptors(interceptors ...interceptor.WorkerInterceptor) Option {
return func(c *config) {
c.interceptors = interceptors
}
}
func WithOnFatalError(onFatalError func(error)) Option {
return func(c *config) {
c.onFatalError = onFatalError
}
}
func WithClient(client client.Client) Option {
return func(c *config) {
c.client = client
}
}