forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 1
/
delay.go
275 lines (241 loc) · 8.06 KB
/
delay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
// Copyright 2011 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
/*
Package delay provides a way to execute code outside the scope of a
user request by using the taskqueue API.
To declare a function that may be executed later, call Func
in a top-level assignment context, passing it an arbitrary string key
and a function whose first argument is of type appengine.Context.
var laterFunc = delay.Func("key", myFunc)
It is also possible to use a function literal.
var laterFunc = delay.Func("key", func(c appengine.Context, x string) {
// ...
})
To call a function, invoke its Call method.
laterFunc.Call(c, "something")
A function may be called any number of times. If the function has any
return arguments, and the last one is of type error, the function may
return a non-nil error to signal that the function should be retried.
The arguments to functions may be of any type that is encodable by the gob
package. If an argument is of interface type, it is the client's responsibility
to register with the gob package whatever concrete type may be passed for that
argument; see https://pkg.go.dev/encoding/gob#Register for details.
Any errors during initialization or execution of a function will be
logged to the application logs. Error logs that occur during initialization will
be associated with the request that invoked the Call method.
The state of a function invocation that has not yet successfully
executed is preserved by combining the file name in which it is declared
with the string key that was passed to the Func function. Updating an app
with pending function invocations is safe as long as the relevant
functions have the (filename, key) combination preserved.
The delay package uses the Task Queue API to create tasks that call the
reserved application path "/_ah/queue/go/delay".
This path must not be marked as "login: required" in app.yaml;
it must be marked as "login: admin" or have no access restriction.
*/
package delay
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"net/http"
"reflect"
"runtime"
"google.golang.org/appengine"
"google.golang.org/appengine/taskqueue"
)
// Function represents a function that may have a delayed invocation.
// Values are created by Func, invoked later through Call, or turned into a
// task through Task.
type Function struct {
// fv is the reflected value handed to Func.
fv reflect.Value // Kind() == reflect.Func
// key is the registry key: the file name of Func's caller, a colon, and
// the string key passed to Func.
key string
// err, when non-nil, makes Task refuse to build a task for this Function.
err error // any error during initialization
}
const (
// The HTTP path for invocations.
// It is registered with http.HandleFunc in init and used as the Path of
// every task built by Task.
path = "/_ah/queue/go/delay"
// Use the default queue.
// This is the queue name Call passes when adding a task.
queue = ""
)
var (
// registry of all delayed functions
// Keyed by Function.key; filled in by Func and consulted by runFunc.
funcs = make(map[string]*Function)
// precomputed types
// contextType is the appengine.Context interface type and errorType is the
// error interface type; both are compared against function signatures.
contextType = reflect.TypeOf((*appengine.Context)(nil)).Elem()
errorType = reflect.TypeOf((*error)(nil)).Elem()
// errors
// errFirstArg is recorded by Func when the function's first parameter is
// missing or is not appengine.Context.
errFirstArg = errors.New("first argument must be appengine.Context")
)
// Func declares a new Function. The second argument must be a function with a
// first argument of type appengine.Context.
//
// This function must be called at program initialization time. That means it
// must be called in a global variable declaration or from an init function.
// This restriction is necessary because the instance that delays a function
// call may not be the one that executes it. Only the code executed at program
// initialization time is guaranteed to have been run by an instance before it
// receives a request.
//
// Func never returns nil. If i is nil, is not a function, or does not take an
// appengine.Context as its first argument, the problem is recorded on the
// returned Function and surfaces when Task or Call is used; such a Function
// is not added to the registry.
func Func(key string, i interface{}) *Function {
	f := &Function{fv: reflect.ValueOf(i)}

	// Derive unique, somewhat stable key for this func.
	_, file, _, _ := runtime.Caller(1)
	f.key = file + ":" + key

	// reflect.ValueOf(nil) yields the zero Value, whose Type method panics.
	// Check the kind on the Value itself so that a nil argument is reported
	// through f.err like any other non-function.
	if f.fv.Kind() != reflect.Func {
		f.err = errors.New("not a function")
		return f
	}
	t := f.fv.Type()
	if t.NumIn() == 0 || t.In(0) != contextType {
		f.err = errFirstArg
		return f
	}

	// Collect every type an argument may have when it is marshaled. For a
	// variadic function the final parameter has slice type, but Task encodes
	// each variadic argument individually, so the element type is needed too.
	argTypes := make([]reflect.Type, 0, t.NumIn()+1)
	for n := 0; n < t.NumIn(); n++ {
		argTypes = append(argTypes, t.In(n))
	}
	if t.IsVariadic() {
		argTypes = append(argTypes, t.In(t.NumIn()-1).Elem())
	}

	// Register the function's arguments with the gob package.
	// This is required because they are marshaled inside a []interface{}.
	// gob.Register only expects to be called during initialization;
	// that's fine because this function expects the same.
	for _, at := range argTypes {
		// Only concrete types may be registered. If the argument has
		// interface type, the client is responsible for registering the
		// concrete types it will hold.
		if at.Kind() == reflect.Interface {
			continue
		}
		gob.Register(reflect.Zero(at).Interface())
	}

	funcs[f.key] = f
	return f
}
// invocation is the value that Task gob-encodes into a task payload and that
// runFunc decodes from the request body.
type invocation struct {
// Key is the registry key of the Function to run.
Key string
// Args holds the call arguments, not including the appengine.Context.
Args []interface{}
}
// Call schedules a delayed invocation of f with the given arguments.
//	f.Call(c, ...)
// is equivalent to
//	t, _ := f.Task(...)
//	taskqueue.Add(c, t, "")
// Nothing is returned to the caller: a failure to build the task or to add it
// to the queue is reported through c.Errorf.
func (f *Function) Call(c appengine.Context, args ...interface{}) {
	task, taskErr := f.Task(args...)
	if taskErr != nil {
		c.Errorf("%v", taskErr)
		return
	}

	_, addErr := taskqueueAdder(c, task, queue)
	if addErr != nil {
		c.Errorf("delay: taskqueue.Add failed: %v", addErr)
	}
}
// Task creates a Task that will invoke the function.
// Its parameters may be tweaked before adding it to a queue.
// Users should not modify the Path or Payload fields of the returned Task.
//
// The args are the function's arguments without the leading
// appengine.Context. They are checked against the function's signature, and
// an error is returned if the Function is invalid, if the number of arguments
// does not fit, if an argument is not assignable to its parameter, or if gob
// encoding fails. The caller's argument slice is never modified.
func (f *Function) Task(args ...interface{}) (*taskqueue.Task, error) {
	if f.err != nil {
		return nil, fmt.Errorf("delay: func is invalid: %v", f.err)
	}

	nArgs := len(args) + 1 // +1 for the appengine.Context
	ft := f.fv.Type()
	minArgs := ft.NumIn()
	if ft.IsVariadic() {
		minArgs--
	}
	if nArgs < minArgs {
		return nil, fmt.Errorf("delay: too few arguments to func: %d < %d", nArgs, minArgs)
	}
	if !ft.IsVariadic() && nArgs > minArgs {
		return nil, fmt.Errorf("delay: too many arguments to func: %d > %d", nArgs, minArgs)
	}

	// Typed nil values are rewritten below. When the caller spreads an
	// existing slice (f.Task(s...)), args aliases that slice, so work on a
	// private copy to avoid changing the caller's data.
	args = append([]interface{}(nil), args...)

	// Check arg types.
	for i := 1; i < nArgs; i++ {
		at := reflect.TypeOf(args[i-1])
		var dt reflect.Type
		if i < minArgs {
			// not a variadic arg
			dt = ft.In(i)
		} else {
			// a variadic arg
			dt = ft.In(minArgs).Elem()
		}
		// nil arguments won't have a type, so they need special handling.
		if at == nil {
			// nil interface
			switch dt.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
				continue // may be nil
			}
			return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not nilable", i, dt)
		}
		switch at.Kind() {
		case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
			av := reflect.ValueOf(args[i-1])
			if av.IsNil() {
				// nil value in interface; not supported by gob, so we replace it
				// with a nil interface value
				args[i-1] = nil
			}
		}
		if !at.AssignableTo(dt) {
			return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not assignable to %v", i, at, dt)
		}
	}

	inv := invocation{
		Key:  f.key,
		Args: args,
	}

	buf := new(bytes.Buffer)
	if err := gob.NewEncoder(buf).Encode(inv); err != nil {
		return nil, fmt.Errorf("delay: gob encoding failed: %v", err)
	}

	return &taskqueue.Task{
		Path:    path,
		Payload: buf.Bytes(),
	}, nil
}
// taskqueueAdder is the function Call uses to add a task to a queue. It is a
// variable rather than a direct call so that it can be swapped out.
var taskqueueAdder = taskqueue.Add // for testing
func init() {
http.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
runFunc(appengine.NewContext(req), w, req)
})
}
// runFunc executes the delayed invocation carried in the body of req.
//
// The body is gob-decoded into an invocation and the target function is
// looked up in the registry by key. If decoding fails or no function is
// registered under that key, the problem is logged and the task is dropped:
// the handler returns without writing an error status. If the function's last
// result has type error and is non-nil, the error is logged and a 500 status
// is written so that the task is retried.
func runFunc(c appengine.Context, w http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()

	var inv invocation
	if decodeErr := gob.NewDecoder(req.Body).Decode(&inv); decodeErr != nil {
		c.Errorf("delay: failed decoding task payload: %v", decodeErr)
		c.Warningf("delay: dropping task")
		return
	}

	fn := funcs[inv.Key]
	if fn == nil {
		c.Errorf("delay: no func with key %q found", inv.Key)
		c.Warningf("delay: dropping task")
		return
	}

	fnType := fn.fv.Type()

	// The context is always the first argument; the decoded ones follow.
	callArgs := make([]reflect.Value, 0, len(inv.Args)+1)
	callArgs = append(callArgs, reflect.ValueOf(c))
	for idx, arg := range inv.Args {
		if arg != nil {
			callArgs = append(callArgs, reflect.ValueOf(arg))
			continue
		}

		// Task was passed a nil argument, so the zero value of the
		// corresponding parameter type has to be built here.
		pos := idx + 1 // position in the parameter list, after the context
		var paramType reflect.Type
		if fnType.IsVariadic() && pos >= fnType.NumIn()-1 {
			// Falls in the variadic tail: use the element type of the final
			// slice parameter.
			paramType = fnType.In(fnType.NumIn() - 1).Elem()
		} else {
			paramType = fnType.In(pos)
		}
		callArgs = append(callArgs, reflect.Zero(paramType))
	}

	results := fn.fv.Call(callArgs)

	// Only a trailing result of type error can request a retry.
	last := fnType.NumOut() - 1
	if last < 0 || fnType.Out(last) != errorType {
		return
	}
	if errVal := results[last]; !errVal.IsNil() {
		c.Errorf("delay: func failed (will retry): %v", errVal.Interface())
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
}