mirrored from https://chromium.googlesource.com/infra/luci/luci-go
/
policy.go
132 lines (120 loc) · 4.6 KB
/
policy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package policy contains the implementation of triggering policy functions.
package policy
import (
"fmt"
"time"
"github.com/golang/protobuf/proto"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/scheduler/appengine/internal"
"go.chromium.org/luci/scheduler/appengine/messages"
"go.chromium.org/luci/scheduler/appengine/task"
)
// Environment is used by the triggering policy for getting transient
// information about the environment and for logging.
//
// An Environment is the first argument of every policy function (see Func).
//
// TODO(vadimsh): This is intentionally mostly empty for now. Will be extended
// on as-needed basis.
type Environment interface {
// DebugLog appends a line to the triage text log.
//
// It takes a format string followed by its operands.
// NOTE(review): presumably fmt.Sprintf-style formatting; confirm against the
// concrete implementation.
DebugLog(format string, args ...interface{})
}
// In contains parameters for a triggering policy function.
//
// It is the second argument of a policy function (see Func) and is passed by
// value.
type In struct {
// Now is the time when the policy function was called.
Now time.Time
// ActiveInvocations is a set of currently running invocations of the job.
//
// NOTE(review): the int64 values are presumably invocation IDs; confirm
// against the caller.
ActiveInvocations []int64
// Triggers is a list of pending triggers sorted by time, more recent last.
Triggers []*internal.Trigger
}
// Out contains the decision of a triggering policy function.
//
// It is the value returned by a policy function (see Func).
type Out struct {
// Requests is a list of requests to start new invocations (if any).
//
// Each request contains parameters that will be passed to a new invocation by
// the engine. The policy is responsible for filling them in.
//
// Triggers specified in each request will be removed from the set of pending
// triggers (they are consumed).
Requests []task.Request
}
// Func is the concrete implementation of a triggering policy.
//
// It looks at the current state of the job and its pending triggers list and
// (optionally) emits a bunch of requests to start new invocations.
//
// The Environment argument is used for logging, the In argument carries the
// current state (time, active invocations, pending triggers), and the returned
// Out carries the requests to start new invocations.
//
// It is a pure function without any side effects (except, perhaps, logging into
// the given environment).
type Func func(Environment, In) Out
// New is a factory that takes TriggeringPolicy proto and returns a concrete
// function that implements this policy.
//
// The returned function will be used only during one triage round and then
// discarded.
//
// The caller is responsible for filling in all default values in the proto if
// necessary. Use UnmarshalDefinition to deserialize TriggeringPolicy filling in
// the defaults.
//
// Returns an error if the TriggeringPolicy message is nil or can't be
// understood (for example, it references an undefined policy kind). Errors
// reported by the per-kind policy constructors are returned as is.
func New(p *messages.TriggeringPolicy) (Func, error) {
	// Reading p.Kind through a nil pointer would panic. Report a nil message as
	// a regular error instead, the same way an unrecognized kind is reported.
	if p == nil {
		return nil, errors.Reason("triggering policy is nil").Err()
	}

	// There will be more kinds here, so this should technically be a switch, even
	// though currently it is not a very interesting switch.
	switch p.Kind {
	case messages.TriggeringPolicy_GREEDY_BATCHING:
		return GreedyBatchingPolicy(int(p.MaxConcurrentInvocations), int(p.MaxBatchSize))
	case messages.TriggeringPolicy_LOGARITHMIC_BATCHING:
		return LogarithmicBatchingPolicy(int(p.MaxConcurrentInvocations), int(p.MaxBatchSize), float64(p.LogBase))
	default:
		// %d prints the raw enum number, which is what identifies an unknown kind.
		return nil, errors.Reason("unrecognized triggering policy kind %d", p.Kind).Err()
	}
}
// defaultPolicy is the policy instantiated by Default. UnmarshalDefinition also
// copies its Kind, MaxConcurrentInvocations and MaxBatchSize into deserialized
// messages that leave the corresponding field at its zero value.
var defaultPolicy = messages.TriggeringPolicy{
Kind: messages.TriggeringPolicy_GREEDY_BATCHING,
MaxConcurrentInvocations: 1,
MaxBatchSize: 1000,
}
// Default returns the triggering policy function built from defaultPolicy.
//
// It is used if jobs do not define a triggering policy or it can't be
// understood.
//
// Panics if the default policy can't be instantiated.
func Default() Func {
	policyFn, err := New(&defaultPolicy)
	if err == nil {
		return policyFn
	}
	// New rejected the built-in policy definition; there is no error return
	// here, so this is surfaced as a panic.
	panic(fmt.Errorf("failed to instantiate default triggering policy - %s", err))
}
// UnmarshalDefinition decodes a serialized TriggeringPolicy and substitutes
// defaults for fields left at their zero value.
//
// An empty (or nil) b is not an error: decoding is skipped and the result
// consists of defaults only. Kind, MaxConcurrentInvocations and MaxBatchSize
// are each replaced with the corresponding defaultPolicy value when zero.
//
// Returns an annotated error if b can't be unmarshaled.
func UnmarshalDefinition(b []byte) (*messages.TriggeringPolicy, error) {
	tp := new(messages.TriggeringPolicy)

	if len(b) != 0 {
		err := proto.Unmarshal(b, tp)
		if err != nil {
			return nil, errors.Annotate(err, "failed to unmarshal TriggeringPolicy").Err()
		}
	}

	// Fill in defaults for everything the serialized message left unset.
	if tp.Kind == 0 {
		tp.Kind = defaultPolicy.Kind
	}
	if tp.MaxConcurrentInvocations == 0 {
		tp.MaxConcurrentInvocations = defaultPolicy.MaxConcurrentInvocations
	}
	if tp.MaxBatchSize == 0 {
		tp.MaxBatchSize = defaultPolicy.MaxBatchSize
	}

	return tp, nil
}