forked from uber-go/cadence-client
/
error.go
305 lines (263 loc) · 11.4 KB
/
error.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package internal
import (
"errors"
"fmt"
"strings"
"go.uber.org/cadence/.gen/go/shared"
)
/*
Below are the possible cases that activity could fail:
1) *CustomError: (this should be the most common one)
If activity implementation returns *CustomError by using NewCustomError() API, workflow code would receive *CustomError.
The err would contain a Reason and Details. The reason is what activity specified to NewCustomError(), which workflow
code could check to determine what kind of error it was and take actions based on the reason. The details are encoded
[]byte from which workflow code can extract strongly typed data. Workflow code needs to know the types of the encoded
details before extracting them.
2) *GenericError:
If activity implementation returns errors other than from NewCustomError() API, workflow code would receive *GenericError.
Use err.Error() to get the string representation of the actual error.
3) *CanceledError:
If activity was canceled, workflow code will receive an instance of *CanceledError. When an activity cancels itself by
returning NewCanceledError(), it can supply optional details which can be extracted by workflow code.
4) *TimeoutError:
If activity was timed out (several timeout types), workflow code will receive instance of *TimeoutError. The err contains
details about what type of timeout it was.
5) *PanicError:
If activity code panics while executing, the cadence activity worker will report it as an activity failure to the cadence server.
The cadence client library will present that failure as *PanicError to workflow code. The err contains a string
representation of the panic message and the call stack at the time the panic happened.
Workflow code could handle errors based on different types of error. Below is sample code of how error handling looks like.
_, err := workflow.ExecuteActivity(ctx, MyActivity, ...).Get(nil)
if err != nil {
switch err := err.(type) {
case *workflow.CustomError:
// handle activity errors (created via NewCustomError() API)
switch err.Reason() {
case CustomErrReasonA: // assume CustomErrReasonA is constant defined by activity implementation
var detailMsg string // assuming activity return error by NewCustomError(CustomErrReasonA, "string details")
err.Details(&detailMsg) // extract strong typed details (corresponding to CustomErrReasonA)
// handle CustomErrReasonA
case CustomErrReasonB:
// handle CustomErrReasonB
default:
// newer version of activity could return new errors that workflow was not aware of.
}
case *workflow.GenericError:
// handle generic error (errors created other than using NewCustomError() API)
case *workflow.CanceledError:
// handle cancellation
case *workflow.TimeoutError:
// handle timeout, could check timeout type by err.TimeoutType()
case *workflow.PanicError:
// handle panic
}
}
Errors from child workflow should be handled in a similar way, except that there should be no *PanicError from child workflow.
When a panic happens in workflow implementation code, the cadence client library catches that panic and causes the decision task to time out.
That decision task will be retried at a later time (with exponential backoff retry intervals).
*/
type (
// CustomError is returned from workflow and activity implementations with a reason and optional details.
// Instances are created with NewCustomError.
CustomError struct {
// reason is the caller-supplied reason; it is also the text returned by Error().
reason string
// details holds the encoded optional details, read through HasDetails()/Details().
details EncodedValues
}
// GenericError is returned from workflow/activity when the implementations return errors other than
// those created by the NewCustomError() API.
GenericError struct {
// err is the string form of the underlying error, returned by Error().
err string
}
// TimeoutError is returned when an activity or child workflow timed out.
TimeoutError struct {
// timeoutType identifies which kind of timeout occurred.
timeoutType shared.TimeoutType
// details holds encoded optional details; NewHeartbeatTimeoutError populates it, NewTimeoutError leaves it unset.
details EncodedValues
}
// CanceledError is returned when an operation was canceled.
CanceledError struct {
// details holds the encoded optional details supplied to NewCanceledError.
details EncodedValues
}
// TerminatedError is returned when a workflow was terminated. It carries no data.
TerminatedError struct {
}
// PanicError contains information about a panicked workflow/activity.
PanicError struct {
// value is the panic value formatted with %v (see newPanicError).
value string
// stackTrace is the stack trace string supplied when the error was created.
stackTrace string
}
// ContinueAsNewError contains information about how to continue the workflow as new.
// Instances are created with NewContinueAsNewError.
ContinueAsNewError struct {
// wfn is the workflow function to run in the new execution.
wfn interface{}
// args are the arguments for the new execution.
args []interface{}
// options are the workflow options taken from the context passed to NewContinueAsNewError.
options *workflowOptions
}
)
// Reason strings that carry the reserved "cadenceInternal:" prefix. NewCustomError panics when a
// caller-supplied reason starts with this prefix, so these values cannot collide with user reasons.
// NOTE(review): presumably these tag the library's own error kinds (panic, generic, canceled,
// timeout) when they are serialized; the usage is outside this file - confirm.
const (
errReasonPanic = "cadenceInternal:Panic"
errReasonGeneric = "cadenceInternal:Generic"
errReasonCanceled = "cadenceInternal:Canceled"
errReasonTimeout = "cadenceInternal:Timeout"
)
// ErrNoData is returned when trying to extract strongly typed data while there is no data available.
var ErrNoData = errors.New("no data available")
// ErrActivityResultPending is returned from an activity's implementation to indicate that the activity is not
// completed when the activity method returns. The activity needs to be completed separately by
// Client.CompleteActivity(). For example, if an activity requires human interaction (like approving an expense
// report), the activity could return activity.ErrResultPending, which indicates that the activity is not done yet.
// Then, when the awaited human action has happened, it needs to trigger something that reports the activity
// completed event to the cadence server via the Client.CompleteActivity() API.
var ErrActivityResultPending = errors.New("not error: do not autocomplete, using Client.CompleteActivity() to complete")
// NewCustomError creates a new *CustomError carrying the given reason and optional details.
//
// The details are encoded through the host environment before being stored on the error.
// It panics when reason starts with the reserved "cadenceInternal:" prefix, or when the
// details cannot be encoded.
func NewCustomError(reason string, details ...interface{}) *CustomError {
	// Reasons with this prefix are reserved for the library's own use.
	const reservedPrefix = "cadenceInternal:"
	if strings.HasPrefix(reason, reservedPrefix) {
		panic("'cadenceInternal:' is reserved prefix, please use different reason")
	}

	encoded, encodeErr := getHostEnvironment().encodeArgs(details)
	if encodeErr != nil {
		panic(encodeErr)
	}

	customErr := new(CustomError)
	customErr.reason = reason
	customErr.details = encoded
	return customErr
}
// NewTimeoutError creates a *TimeoutError of the given timeout type, without details.
// Use NewHeartbeatTimeoutError to create a heartbeat TimeoutError that carries details.
func NewTimeoutError(timeoutType shared.TimeoutType) *TimeoutError {
	timeoutErr := new(TimeoutError)
	timeoutErr.timeoutType = timeoutType
	return timeoutErr
}
// NewHeartbeatTimeoutError creates a *TimeoutError whose type is shared.TimeoutTypeHeartbeat
// and which carries the given optional details in encoded form.
// It panics when the details cannot be encoded.
func NewHeartbeatTimeoutError(details ...interface{}) *TimeoutError {
	encoded, encodeErr := getHostEnvironment().encodeArgs(details)
	if encodeErr != nil {
		panic(encodeErr)
	}

	return &TimeoutError{
		timeoutType: shared.TimeoutTypeHeartbeat,
		details:     encoded,
	}
}
// NewCanceledError creates a *CanceledError carrying the given optional details in encoded form.
// It panics when the details cannot be encoded.
func NewCanceledError(details ...interface{}) *CanceledError {
	encoded, encodeErr := getHostEnvironment().encodeArgs(details)
	if encodeErr != nil {
		panic(encodeErr)
	}

	canceledErr := new(CanceledError)
	canceledErr.details = encoded
	return canceledErr
}
// NewContinueAsNewError creates a *ContinueAsNewError.
//
// If the workflow main function returns this error, the current execution is ended and a new
// execution with the same workflow ID is started automatically with the options provided to
// this function.
//
//	ctx  - use the context to override any options for the new workflow, such as execution
//	       timeout, decision task timeout or task list. Options that are not overridden use
//	       the defaults of the currently running workflow, for example:
//	           ctx := WithExecutionStartToCloseTimeout(ctx, 30 * time.Minute)
//	           ctx = WithWorkflowTaskStartToCloseTimeout(ctx, time.Minute)
//	           ctx = WithWorkflowTaskList(ctx, "example-group")
//	wfn  - workflow function; for the new execution it can differ from the one currently running.
//	args - arguments for the new workflow.
//
// It panics when the workflow function or its arguments fail validation, when the context
// carries no workflow options, or when the task list or either timeout is missing or invalid.
func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *ContinueAsNewError {
	// Validate the workflow function and its arguments before looking at the options.
	wfType, encodedInput, validateErr := getValidatedWorkflowFunction(wfn, args)
	if validateErr != nil {
		panic(validateErr)
	}

	opts := getWorkflowEnvOptions(ctx)

	// Cases are evaluated top to bottom, so the nil check guards every later dereference.
	switch {
	case opts == nil:
		panic("context is missing required options for continue as new")
	case opts.taskListName == nil || *opts.taskListName == "":
		panic("invalid task list provided")
	case opts.executionStartToCloseTimeoutSeconds == nil || *opts.executionStartToCloseTimeoutSeconds <= 0:
		panic("invalid executionStartToCloseTimeoutSeconds provided")
	case opts.taskStartToCloseTimeoutSeconds == nil || *opts.taskStartToCloseTimeoutSeconds <= 0:
		panic("invalid taskStartToCloseTimeoutSeconds provided")
	}

	// NOTE(review): this writes into the options value returned by getWorkflowEnvOptions;
	// whether that value is shared with ctx is not visible from here - confirm.
	opts.workflowType = wfType
	opts.input = encodedInput

	return &ContinueAsNewError{
		wfn:     wfn,
		args:    args,
		options: opts,
	}
}
// Error implements the error interface; the message is the reason of this custom error.
func (e *CustomError) Error() string {
	message := e.reason
	return message
}
// Reason returns the reason this custom error was created with.
func (e *CustomError) Reason() string {
	reason := e.reason
	return reason
}
// HasDetails reports whether this error carries strongly typed detail data,
// as answered by the stored details' HasValues().
func (e *CustomError) HasDetails() bool {
	hasValues := e.details.HasValues()
	return hasValues
}
// Details extracts the strongly typed detail data of this custom error into d.
// The extraction is delegated to the stored details' Get(); the original documentation
// states that ErrNoData is returned when there are no details.
func (e *CustomError) Details(d ...interface{}) error {
	extractErr := e.details.Get(d...)
	return extractErr
}
// Error implements the error interface; it returns the stored error string.
func (e *GenericError) Error() string {
	message := e.err
	return message
}
// Error implements the error interface; the message names the timeout type of this error.
func (e *TimeoutError) Error() string {
	kind := e.timeoutType
	return fmt.Sprintf("TimeoutType: %v", kind)
}
// TimeoutType returns the timeout type of this error.
func (e *TimeoutError) TimeoutType() shared.TimeoutType {
	kind := e.timeoutType
	return kind
}
// HasDetails reports whether this error carries strongly typed detail data,
// as answered by the stored details' HasValues().
func (e *TimeoutError) HasDetails() bool {
	hasValues := e.details.HasValues()
	return hasValues
}
// Details extracts the strongly typed detail data of this error into d.
// The extraction is delegated to the stored details' Get(); the original documentation
// states that ErrNoData is returned when there are no details.
func (e *TimeoutError) Details(d ...interface{}) error {
	extractErr := e.details.Get(d...)
	return extractErr
}
// Error implements the error interface; the message is always "CanceledError".
func (e *CanceledError) Error() string {
	const message = "CanceledError"
	return message
}
// HasDetails reports whether this error carries strongly typed detail data,
// as answered by the stored details' HasValues().
func (e *CanceledError) HasDetails() bool {
	hasValues := e.details.HasValues()
	return hasValues
}
// Details extracts the strongly typed detail data of this error into d.
// The extraction is delegated to the stored details' Get().
func (e *CanceledError) Details(d ...interface{}) error {
	extractErr := e.details.Get(d...)
	return extractErr
}
// newPanicError builds a *PanicError from a recovered panic value and a stack trace.
// The value is stored as its %v string representation.
func newPanicError(value interface{}, stackTrace string) *PanicError {
	panicErr := new(PanicError)
	panicErr.value = fmt.Sprintf("%v", value)
	panicErr.stackTrace = stackTrace
	return panicErr
}
// Error implements the error interface; it returns the string form of the panic value.
func (e *PanicError) Error() string {
	message := e.value
	return message
}
// StackTrace returns the stack trace recorded for the panic.
func (e *PanicError) StackTrace() string {
	trace := e.stackTrace
	return trace
}
// Error implements the error interface; the message is always "ContinueAsNew".
func (e *ContinueAsNewError) Error() string {
	const message = "ContinueAsNew"
	return message
}
// newTerminatedError creates a *TerminatedError instance.
func newTerminatedError() *TerminatedError {
	return new(TerminatedError)
}
// Error implements the error interface; the message is always "Terminated".
func (e *TerminatedError) Error() string {
	const message = "Terminated"
	return message
}