forked from RichardKnop/machinery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.go
127 lines (105 loc) · 2.93 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package tasks
import (
"errors"
"fmt"
"reflect"
"runtime/debug"
"context"
"github.com/RichardKnop/machinery/v1/log"
)
// ErrTaskPanicked is the error Call reports when invoking a task panics with
// a value that is neither an error nor a string.
var ErrTaskPanicked = errors.New("Invoking task caused a panic")
// Task wraps a signature and methods used to reflect task arguments and
// return values after invoking the task
type Task struct {
// TaskFunc is the reflected task function; Call invokes it.
TaskFunc reflect.Value
// UseContext is set by New when the task function's first parameter is a
// context type; Call then prepends a context to the argument list.
UseContext bool
// Args holds the reflected call arguments, populated by ReflectArgs.
Args []reflect.Value
}
// New uses reflection to turn a task function and its arguments into a Task
// that is ready for invocation with Call.
//
// taskFunc is wrapped in a reflect.Value. When its first parameter is a
// context type (as decided by IsContextType), UseContext is set so that Call
// prepends a context to the argument list. args are converted with
// ReflectArgs.
//
// An error is returned when taskFunc is nil or not a function, or when any of
// the arguments cannot be reflected.
func New(taskFunc interface{}, args []Arg) (*Task, error) {
	taskFuncType := reflect.TypeOf(taskFunc)

	// reflect.TypeOf(nil) is a nil Type and Type.NumIn panics for any kind
	// other than Func, so reject both up front instead of panicking.
	if taskFuncType == nil || taskFuncType.Kind() != reflect.Func {
		return nil, fmt.Errorf("Task must be a function, got %T", taskFunc)
	}

	task := &Task{
		TaskFunc:   reflect.ValueOf(taskFunc),
		UseContext: taskFuncType.NumIn() > 0 && IsContextType(taskFuncType.In(0)),
	}

	if err := task.ReflectArgs(args); err != nil {
		return nil, fmt.Errorf("Reflect task args error: %s", err)
	}

	return task, nil
}
// Call attempts to call the task with the supplied arguments.
//
// When UseContext is set, a background context is prepended to the argument
// list before the invocation. Every return value except the last one is
// converted into a TaskResult.
//
// `err` is set in the return value in the following cases:
//  1. The reflected function invocation panics (e.g. due to a mismatched
//     argument list). The panic is recovered, the stack trace is logged and
//     no results are returned.
//  2. The task func returns no values (ErrTaskReturnsNoValue).
//  3. The last returned value is non-nil but does not implement error
//     (ErrLastReturnValueMustBeError).
//  4. The task func itself returns a non-nil error.
func (t *Task) Call() (taskResults []*TaskResult, err error) {
	defer func() {
		// Recover from panic and set err.
		if e := recover(); e != nil {
			switch e := e.(type) {
			default:
				err = ErrTaskPanicked
			case error:
				err = e
			case string:
				err = errors.New(e)
			}

			// Never hand back partially converted results next to an error.
			taskResults = nil

			// Print stack trace
			log.ERROR.Printf("%s", debug.Stack())
		}
	}()

	args := t.Args
	if t.UseContext {
		ctxValue := reflect.ValueOf(context.Background())
		args = append([]reflect.Value{ctxValue}, args...)
	}

	// Invoke the task
	results := t.TaskFunc.Call(args)

	// Task must return at least a value
	if len(results) == 0 {
		return nil, ErrTaskReturnsNoValue
	}

	// Last returned value
	lastResult := results[len(results)-1]

	// reflect.Value.IsNil panics for kinds that can never be nil (int, struct,
	// string, ...), so only consult it for nilable kinds. Any other kind is
	// by definition a non-nil value.
	lastIsNil := false
	switch lastResult.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
		reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
		lastIsNil = lastResult.IsNil()
	}

	// If the last returned value is not nil, it has to be of error type, if that
	// is not the case, return error message, otherwise propagate the task error
	// to the caller
	if !lastIsNil {
		errorInterface := reflect.TypeOf((*error)(nil)).Elem()
		if !lastResult.Type().Implements(errorInterface) {
			return nil, ErrLastReturnValueMustBeError
		}
		return nil, lastResult.Interface().(error)
	}

	// Convert reflect values to task results
	taskResults = make([]*TaskResult, len(results)-1)
	for i, result := range results[:len(results)-1] {
		value := result.Interface()

		// A nil interface-typed return value has no dynamic type and
		// reflect.TypeOf(nil) is a nil Type, so fall back to the declared
		// return type in that case.
		resultType := result.Type()
		if value != nil {
			resultType = reflect.TypeOf(value)
		}

		taskResults[i] = &TaskResult{
			Type:  resultType.String(),
			Value: value,
		}
	}

	return taskResults, nil
}
// ReflectArgs converts []TaskArg to []reflect.Value
func (t *Task) ReflectArgs(args []Arg) error {
argValues := make([]reflect.Value, len(args))
for i, arg := range args {
argValue, err := ReflectValue(arg.Type, arg.Value)
if err != nil {
return err
}
argValues[i] = argValue
}
t.Args = argValues
return nil
}