forked from RichardKnop/machinery
/
task.go
157 lines (131 loc) · 4.07 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package tasks
import (
"context"
"errors"
"fmt"
"reflect"
"runtime/debug"
opentracing "github.com/opentracing/opentracing-go"
opentracing_ext "github.com/opentracing/opentracing-go/ext"
opentracing_log "github.com/opentracing/opentracing-go/log"
"github.com/RichardKnop/machinery/v1/log"
)
// ErrTaskPanicked ...
var ErrTaskPanicked = errors.New("Invoking task caused a panic")
// Task wraps a signature and methods used to reflect task arguments and
// return values after invoking the task
type Task struct {
TaskFunc reflect.Value
UseContext bool
Context context.Context
Args []reflect.Value
}
// New tries to use reflection to convert the function and arguments
// into a reflect.Value and prepare it for invocation
func New(taskFunc interface{}, args []Arg) (*Task, error) {
task := &Task{
TaskFunc: reflect.ValueOf(taskFunc),
Context: context.Background(),
}
taskFuncType := reflect.TypeOf(taskFunc)
if taskFuncType.NumIn() > 0 {
arg0Type := taskFuncType.In(0)
if IsContextType(arg0Type) {
task.UseContext = true
}
}
if err := task.ReflectArgs(args); err != nil {
return nil, fmt.Errorf("Reflect task args error: %s", err)
}
return task, nil
}
// Call attempts to call the task with the supplied arguments.
//
// `err` is set in the return value in two cases:
// 1. The reflected function invocation panics (e.g. due to a mismatched
// argument list).
// 2. The task func itself returns a non-nil error.
func (t *Task) Call() (taskResults []*TaskResult, err error) {
// retrieve the span from the task's context and finish it as soon as this function returns
if span := opentracing.SpanFromContext(t.Context); span != nil {
defer span.Finish()
}
defer func() {
// Recover from panic and set err.
if e := recover(); e != nil {
switch e := e.(type) {
default:
err = ErrTaskPanicked
case error:
err = e
case string:
err = errors.New(e)
}
// mark the span as failed and dump the error and stack trace to the span
if span := opentracing.SpanFromContext(t.Context); span != nil {
opentracing_ext.Error.Set(span, true)
span.LogFields(
opentracing_log.Error(err),
opentracing_log.Object("stack", string(debug.Stack())),
)
}
// Print stack trace
log.ERROR.Printf("%s", debug.Stack())
}
}()
args := t.Args
if t.UseContext {
ctxValue := reflect.ValueOf(t.Context)
args = append([]reflect.Value{ctxValue}, args...)
}
// Invoke the task
results := t.TaskFunc.Call(args)
// Task must return at least a value
if len(results) == 0 {
return nil, ErrTaskReturnsNoValue
}
// Last returned value
lastResult := results[len(results)-1]
// If the last returned value is not nil, it has to be of error type, if that
// is not the case, return error message, otherwise propagate the task error
// to the caller
if !lastResult.IsNil() {
// If the result implements Retriable interface, return instance of Retriable
retriableErrorInterface := reflect.TypeOf((*Retriable)(nil)).Elem()
if lastResult.Type().Implements(retriableErrorInterface) {
return nil, lastResult.Interface().(ErrRetryTaskLater)
}
// Otherwise, check that the result implements the standard error interface,
// if not, return ErrLastReturnValueMustBeError error
errorInterface := reflect.TypeOf((*error)(nil)).Elem()
if !lastResult.Type().Implements(errorInterface) {
return nil, ErrLastReturnValueMustBeError
}
// Return the standard error
return nil, lastResult.Interface().(error)
}
// Convert reflect values to task results
taskResults = make([]*TaskResult, len(results)-1)
for i := 0; i < len(results)-1; i++ {
val := results[i].Interface()
typeStr := reflect.TypeOf(val).String()
taskResults[i] = &TaskResult{
Type: typeStr,
Value: val,
}
}
return taskResults, err
}
// ReflectArgs converts []TaskArg to []reflect.Value
func (t *Task) ReflectArgs(args []Arg) error {
argValues := make([]reflect.Value, len(args))
for i, arg := range args {
argValue, err := ReflectValue(arg.Type, arg.Value)
if err != nil {
return err
}
argValues[i] = argValue
}
t.Args = argValues
return nil
}