/
task.go
157 lines (137 loc) · 3.42 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package delayed
import (
"bytes"
"reflect"
"runtime"
"github.com/keakon/golog/log"
"github.com/shamaton/msgpack/v2"
)
// Task is the interface implemented by both GoTask and PyTask.
type Task interface {
// Serialize returns the serialized bytes of the task, or an error if
// encoding fails.
Serialize() ([]byte, error)
// getFuncPath returns the function path stored in the task.
// It is unexported, so types outside this package cannot implement Task
// directly.
getFuncPath() string
}
// RawGoTask stores the fields of a GoTask that need to be serialized.
// GoTask.Serialize encodes it with msgpack.MarshalAsArray and
// DeserializeGoTask decodes it with msgpack.UnmarshalAsArray.
// NOTE(review): array encoding presumably makes the field order part of the
// wire format - confirm before reordering or inserting fields.
type RawGoTask struct {
FuncPath string // function path given to NewGoTask or resolved by NewGoTaskOfFunc
Payload []byte // serialized arg; left empty when the task has no arg
}
// GoTask stores a RawGoTask together with the original argument and the
// cached serialized data of the task.
type GoTask struct {
raw RawGoTask // kept unexported; this is the value Serialize encodes with MessagePack
arg interface{} // original argument; set by the constructors, left nil by DeserializeGoTask
data []byte // serialized data; set by Serialize or DeserializeGoTask and reused by later Serialize calls
}
// NewGoTask builds a GoTask that refers to its function by path.
//
// When exactly one argument is passed, that value itself becomes the task's
// arg. In every other case the whole variadic slice is stored; note that a
// call with no arguments stores a nil []interface{}, which is still a non-nil
// interface value.
func NewGoTask(funcPath string, arg ...interface{}) *GoTask {
	task := &GoTask{raw: RawGoTask{FuncPath: funcPath}}
	switch len(arg) {
	case 1:
		// Unwrap the single argument so it is not serialized as a 1-element list.
		task.arg = arg[0]
	default:
		task.arg = arg
	}
	return task
}
// NewGoTaskOfFunc builds a GoTask from a function value instead of a path.
// The function path is looked up through reflect and runtime, which makes it
// about 100x slower than NewGoTask.
//
// It returns nil when f is not a function or when no name can be resolved
// for it. Arguments are stored the same way NewGoTask stores them: a single
// argument is kept as is, anything else is kept as the variadic slice.
func NewGoTaskOfFunc(f interface{}, arg ...interface{}) *GoTask {
	v := reflect.ValueOf(f)
	// The kind check must come first: Pointer panics on non-pointer-like kinds.
	if v.Kind() != reflect.Func {
		return nil
	}

	name := runtime.FuncForPC(v.Pointer()).Name()
	if name == "" {
		return nil
	}

	task := &GoTask{raw: RawGoTask{FuncPath: name}}
	switch len(arg) {
	case 1:
		task.arg = arg[0]
	default:
		task.arg = arg
	}
	return task
}
// Equal reports whether t and task describe the same call: the same function
// path and the same argument.
//
// The argument is compared through a representation that both tasks have:
//   - if both carry a serialized payload and the payloads are identical, the
//     tasks are equal;
//   - otherwise, if both still hold the original argument, the arguments are
//     compared with reflect.DeepEqual;
//   - otherwise the tasks are equal only when neither carries any argument
//     or payload at all.
//
// It therefore returns false when one task holds only an unserialized
// argument and the other holds only a payload (for example a freshly created
// task versus a deserialized one), even if both would serialize identically.
// Two nil tasks are equal; a nil task never equals a non-nil one.
func (t *GoTask) Equal(task *GoTask) bool {
	if t == nil || task == nil {
		return t == task
	}
	if t.raw.FuncPath != task.raw.FuncPath {
		return false
	}

	// An empty payload means "not serialized yet" or "no argument", so it must
	// not be treated as evidence of equality: bytes.Equal(nil, nil) is true
	// for any two unserialized tasks regardless of their arguments.
	tHasPayload := len(t.raw.Payload) != 0
	otherHasPayload := len(task.raw.Payload) != 0
	if tHasPayload && otherHasPayload && bytes.Equal(t.raw.Payload, task.raw.Payload) {
		return true
	}

	// Likewise a nil arg means "unknown" on a deserialized task, so DeepEqual
	// is only meaningful when both sides still hold their argument.
	if t.arg != nil && task.arg != nil {
		return reflect.DeepEqual(t.arg, task.arg)
	}

	// No shared representation left: equal only if both are argument-free.
	return !tHasPayload && !otherHasPayload && t.arg == nil && task.arg == nil
}
// Serialize returns the MessagePack encoding of the task.
//
// The result is cached in t.data: once it is non-empty, later calls return it
// without encoding again. Otherwise a non-nil arg is first encoded into
// raw.Payload, and then raw is encoded as a whole. Any encoding failure is
// logged and returned with nil data.
func (t *GoTask) Serialize() (data []byte, err error) {
	if len(t.data) > 0 {
		return t.data, nil
	}

	if t.arg != nil {
		if t.raw.Payload, err = msgpack.MarshalAsArray(t.arg); err != nil {
			log.Errorf("Failed to serialize task.arg: %v", err)
			return nil, err
		}
	}

	if t.data, err = msgpack.MarshalAsArray(&t.raw); err != nil {
		log.Errorf("Failed to serialize task.data: %v", err)
		return nil, err
	}
	return t.data, nil
}
// DeserializeGoTask rebuilds a GoTask from its serialized data.
//
// The given slice is kept (not copied) as the task's cached serialized form,
// and its contents are decoded into the task's raw fields. The original
// argument is not restored; only raw.Payload is populated. On a decoding
// failure the error is logged and returned with a nil task.
func DeserializeGoTask(data []byte) (task *GoTask, err error) {
	decoded := &GoTask{data: data}
	if err = msgpack.UnmarshalAsArray(data, &decoded.raw); err != nil {
		log.Errorf("Failed to deserialize task: %v", err)
		return nil, err
	}
	return decoded, nil
}
// getFuncPath returns the function path stored in the task's raw fields.
func (t *GoTask) getFuncPath() string {
	path := t.raw.FuncPath
	return path
}
// RawPyTask stores the fields of a PyTask that need to be serialized.
// PyTask.Serialize encodes it with msgpack.MarshalAsArray.
// NOTE(review): array encoding presumably makes the field order part of the
// wire format - confirm before reordering or inserting fields.
type RawPyTask struct {
FuncPath string // function path given to NewPyTask
Args interface{} // must be slice, array or nil
KwArgs interface{} // must be map, struct or nil
}
// PyTask stores a RawPyTask together with the cached serialized data of the
// task.
type PyTask struct {
raw RawPyTask // kept unexported; this is the value Serialize encodes with MessagePack
data []byte // serialized data; set by the first successful Serialize call and reused afterwards
}
// NewPyTask builds a PyTask for the given function path.
// args and kwArgs are stored unchanged and are not validated here.
func NewPyTask(funcPath string, args, kwArgs interface{}) *PyTask {
	task := new(PyTask)
	task.raw.FuncPath = funcPath
	task.raw.Args = args
	task.raw.KwArgs = kwArgs
	return task
}
// Serialize returns the MessagePack encoding of the task.
//
// The encoding is produced on the first call and cached in t.data; later
// calls return the cached bytes. An encoding failure is logged and returned
// with nil data.
func (t *PyTask) Serialize() (data []byte, err error) {
	if t.data != nil {
		return t.data, nil
	}

	if t.data, err = msgpack.MarshalAsArray(&t.raw); err != nil {
		log.Errorf("Failed to serialize task.data: %v", err)
		return nil, err
	}
	return t.data, nil
}
// getFuncPath returns the function path stored in the task's raw fields.
func (t *PyTask) getFuncPath() string {
	path := t.raw.FuncPath
	return path
}