-
Notifications
You must be signed in to change notification settings - Fork 149
/
rules.go
195 lines (180 loc) · 7.4 KB
/
rules.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package controller
import (
"context"
"fmt"
"log"
"time"
. "github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/eventrules"
"github.com/mesos/mesos-go/api/v1/lib/extras/store"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
)
// ErrEvent is the error type that LiftErrors produces when it sees an ERROR event;
// its underlying string is the message carried by that event.
type ErrEvent string

// Error implements the error interface by returning the event message verbatim.
func (msg ErrEvent) Error() string { return string(msg) }
// LiftErrors returns a Rule that converts a scheduler ERROR event into an ErrEvent error
// carrying the event's message, then hands control to the rest of the chain so that
// downstream rules/handlers may continue processing. An error that is already present
// when the rule runs is forwarded untouched.
func LiftErrors() Rule {
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		switch {
		case err != nil:
			// a pre-existing error takes precedence; forward it as-is.
		case e.GetType() == scheduler.Event_ERROR:
			// Aborting and re-subscribing is the recommended reaction to an ERROR event;
			// surfacing an error here is what lets the event loop terminate so that the
			// connection gets reset.
			err = ErrEvent(e.GetError().GetMessage())
		}
		return chain(ctx, e, err)
	}
}
// StateError reports an unresolvable state transition problem; a program that
// receives one should most likely exit.
type StateError string

// Error implements the error interface by returning the underlying string.
func (s StateError) Error() string {
	return string(s)
}
// TrackSubscription returns a Rule that keeps frameworkIDStore in sync with the framework ID
// announced by SUBSCRIBED events.
//
// For a SUBSCRIBED event (and only when no prior error is present) the rule:
//   - forwards any error from reading the store, except store.ErrNotFound which is tolerated;
//   - forwards a StateError if the event carries an empty framework ID;
//   - forwards a StateError if a non-empty stored ID differs from the received one while
//     failoverTimeout is greater than zero;
//   - otherwise writes the received ID to the store when it differs from the stored one,
//     forwarding any error reported by the write.
//
// All other events, and any pre-existing error, are passed down the chain unchanged.
func TrackSubscription(frameworkIDStore store.Singleton, failoverTimeout time.Duration) Rule {
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		if err != nil {
			return chain(ctx, e, err)
		}
		if e.GetType() != scheduler.Event_SUBSCRIBED {
			return chain(ctx, e, nil)
		}

		// use distinct error names below so the rule's err parameter is never shadowed.
		storedFrameworkID, getErr := frameworkIDStore.Get()
		if getErr != nil && getErr != store.ErrNotFound {
			return chain(ctx, e, getErr)
		}
		frameworkID := e.GetSubscribed().GetFrameworkID().GetValue()

		// order of the checks below is important: tread carefully w/ respect to future changes
		if frameworkID == "" {
			// sanity check, should **never** happen
			return chain(ctx, e, StateError("mesos sent an empty frameworkID?!"))
		}
		if storedFrameworkID != "" && storedFrameworkID != frameworkID && failoverTimeout > 0 {
			return chain(ctx, e, StateError(fmt.Sprintf(
				"frameworkID changed unexpectedly; failover exceeded timeout? (%s).", failoverTimeout)))
		}
		if storedFrameworkID != frameworkID {
			// a failed write must not go unnoticed: otherwise the store keeps a stale (or no)
			// framework ID while the caller believes it was recorded.
			if setErr := frameworkIDStore.Set(frameworkID); setErr != nil {
				return chain(ctx, e, setErr)
			}
		}
		return chain(ctx, e, nil)
	}
}
// AckStatusUpdates returns a Rule that acknowledges task status updates back to mesos using
// the given caller. Only updates tagged with a UUID are acknowledged. When sending the ack
// fails the event is dropped (not propagated down the chain); when it succeeds, the incoming
// err (if any) is forwarded. This is AckStatusUpdatesF with a fixed caller.
func AckStatusUpdates(caller calls.Caller) Rule {
	fixed := func() calls.Caller {
		return caller
	}
	return AckStatusUpdatesF(fixed)
}
// AckStatusUpdatesF is the functional variant of AckStatusUpdates: callerLookup is invoked
// for every acknowledgement, which suits cases where the caller may change over time.
// A failure to send the acknowledgement is reported as a *calls.AckError, combined with any
// incoming error via Error2, and the event is not propagated further down the chain.
func AckStatusUpdatesF(callerLookup func() calls.Caller) Rule {
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		// Acks are attempted aggressively: an incoming error does not prevent the
		// acknowledgement, it is simply carried along.
		if e.GetType() != scheduler.Event_UPDATE {
			return chain(ctx, e, err)
		}

		status := e.GetUpdate().GetStatus()
		statusUUID := status.GetUUID()
		if len(statusUUID) == 0 {
			// per the mesos scheduler spec only updates with a non-empty UUID are ACK'd
			return chain(ctx, e, err)
		}

		ack := calls.Acknowledge(
			status.GetAgentID().GetValue(),
			status.TaskID.Value,
			statusUUID,
		)
		if ackErr := calls.CallNoData(ctx, callerLookup(), ack); ackErr != nil {
			// TODO(jdef): not sure how important this is; if the ack failed because we
			// became disconnected, then we'll just reconnect later and Mesos will ask us
			// to ACK anyway -- why pay special attention to these call failures vs others?
			failure := &calls.AckError{Ack: ack, Cause: ackErr}
			return ctx, e, Error2(err, failure) // drop (do not propagate to chain)
		}
		return chain(ctx, e, err)
	}
}
// DefaultEventLabel is the label LogEvents hands to DefaultEventLogger when it is given a
// nil logging func; DefaultEventLogger then logs it as the first argument, ahead of the event.
const DefaultEventLabel = "event"
// DefaultEventLogger builds an event logging func backed by the standard `log` package.
// With a non-empty eventLabel every event is printed as "<label> <event>"; with an empty
// label only the event itself is printed.
func DefaultEventLogger(eventLabel string) func(*scheduler.Event) {
	if eventLabel != "" {
		return func(e *scheduler.Event) {
			log.Println(eventLabel, e)
		}
	}
	return func(e *scheduler.Event) {
		log.Println(e)
	}
}
// LogEvents returns a Rule that passes every scheduler event to f before continuing down
// the chain with the event and error untouched. A nil f is replaced by
// DefaultEventLogger(DefaultEventLabel).
func LogEvents(f func(*scheduler.Event)) Rule {
	logEvent := f
	if logEvent == nil {
		logEvent = DefaultEventLogger(DefaultEventLabel)
	}
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		logEvent(e)
		return chain(ctx, e, err)
	}
}
// AckOperationUpdates returns a Rule that acknowledges offer operation status updates sent to
// the scheduler by the master, using the given caller. Because the event reported by the
// master does not include the AgentID, the generated ACK cannot include it either.
// This is AckOperationUpdatesF with a fixed caller.
func AckOperationUpdates(caller calls.Caller) Rule {
	fixed := func() calls.Caller {
		return caller
	}
	return AckOperationUpdatesF(fixed)
}
// AckOperationUpdatesF is the functional variant of AckOperationUpdates: callerLookup is
// invoked for every acknowledgement, which suits cases where the caller may change over time.
// A failure to send the acknowledgement is reported as a *calls.AckError, combined with any
// incoming error via Error2, and the event is not propagated further down the chain.
// The rule panics if a status update carries a UUID but an empty operation ID.
func AckOperationUpdatesF(callerLookup func() calls.Caller) Rule {
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		// Acks are attempted aggressively: an incoming error does not prevent the
		// acknowledgement of an offer operation status update, it is simply carried along.
		if e.GetType() != scheduler.Event_UPDATE_OPERATION_STATUS {
			return chain(ctx, e, err)
		}

		status := e.GetUpdateOperationStatus().GetStatus()
		statusUUID := status.GetUUID().GetValue()
		if len(statusUUID) == 0 {
			// per the mesos scheduler spec only updates with a non-empty UUID are ACK'd
			return chain(ctx, e, err)
		}

		// Receiving this update means the framework supplied an operation_id to the master
		// when executing the offer operation, so the operation_id in the status object is
		// expected to be non-empty.
		operationID := status.GetOperationID().GetValue()
		if operationID == "" {
			panic("expected non-empty offer operation ID for offer operation status update")
		}

		// Pick the first non-empty resource provider ID; all converted resources can safely
		// be assumed to belong to the same provider (including a non-specified one).
		var providerID string
		resources := status.GetConvertedResources()
		for i := 0; i < len(resources) && providerID == ""; i++ {
			providerID = resources[i].GetProviderID().GetValue()
		}

		ack := calls.AcknowledgeOperationStatus(
			"",         // agentID: optional
			providerID, // optional
			statusUUID,
			operationID,
		)
		if ackErr := calls.CallNoData(ctx, callerLookup(), ack); ackErr != nil {
			// TODO(jdef): not sure how important this is; if the ack failed because we
			// became disconnected, then we'll just reconnect later and Mesos will ask us
			// to ACK anyway -- why pay special attention to these call failures vs others?
			failure := &calls.AckError{Ack: ack, Cause: ackErr}
			return ctx, e, Error2(err, failure) // drop (do not propagate to chain)
		}
		return chain(ctx, e, err)
	}
}