// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package distributor contains all the adaptors for the various supported
// distributor protocols. At a high level, it works like this:
// * Quests specify a distributor configuration by name as part of their
//   identity.
// * When an Execution for that Quest NeedsExecution, DM reads configuration
//   (distributor.proto) from luci-config. This configuration is stored
//   as part of the Execution so that for the duration of a given Execution,
//   DM always interacts with the same distributor in the same way (barring
//   code changes in DM's adapter logic itself).
// * DM uses the selected distributor implementation to start a task and
//   record its Token. Additionally, the distributor SHOULD publish on DM's
//   pubsub topic to update DM's state. When publishing updates, the
//   distributor MUST include the token returned from PrepareTopic (or else
//   the published update will be ignored).
// * When DM receives a message on pubsub, it loads the Execution, loads its
//   cached distributor configuration, and then calls HandleNotification so
//   the adapter can parse the notification body and return the state of the
//   task.
//
// Adding a new distributor requires:
// * Add a new subdir of protos with the configuration proto for the new
//   distributor. Each distributor implementation must have its own unique
//   Config message.
// * Add a matching subdir of this package for the implementation of the
//   distributor.
// * In the implementation, add a Register method that registers the
//   implementation with this package appropriately.
// * In the DM frontend, import your new package implementation and run its
//   Register method.
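//
// As a rough sketch only (the map-based registry, the "mydist.Config" key and
// newMyDistributor below are hypothetical names, not this package's actual
// registration API), an implementation's Register function might associate
// its unique Config message with the Factory that builds the distributor:
//
//	// Register records this implementation's Factory under its Config name.
//	func Register(factories map[string]distributor.Factory) {
//		factories["mydist.Config"] = newMyDistributor
//	}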
package distributor

import (
	"context"
	"net/http"
	"time"

	dm "github.com/TriggerMail/luci-go/dm/api/service/v1"
)

// Token is an opaque token that a distributor should use to
// uniquely identify a single DM execution.
type Token string

// Notification represents a notification from the distributor to DM that
// a particular execution has a status update. Data and Attrs are interpreted
// purely by the distributor implementation.
type Notification struct {
	ID    *dm.Execution_ID
	Data  []byte
	Attrs map[string]string
}

// D is the interface for all distributor implementations.
//
// Retries
//
// Unless otherwise noted, DM will retry methods here if they return an error
// marked as Transient, up to some internal limit. If they return
// a non-Transient error (or nil) DM will make a best effort not to duplicate
// calls, but it can't guarantee that.
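//
// For example, assuming the implementation marks errors with the transient
// tag from luci-go's common/retry/transient package (callBackend is
// a hypothetical helper), a failure that is worth retrying could be returned
// as:
//
//	if err := callBackend(ctx); err != nil {
//		// Tagged as Transient, so DM will retry the call.
//		return transient.Tag.Apply(err)
//	}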
type D interface {
	// Run prepares and runs a new Task from the given parameters.
	//
	// Scheduling the same execution ID multiple times SHOULD return the same
	// Token. It's OK if this doesn't happen, but only one of the scheduled tasks
	// will be able to invoke ActivateExecution; the other one(s) will
	// abort early and/or time out.
	//
	// If this returns a non-Transient error, the Execution will be marked as
	// Rejected with the returned error message as the 'Reason'.
	//
	// The various time durations, if non-zero, will be used verbatim for DM to
	// time out that phase of the task's execution. If the task's execution times
	// out in the 'STOPPING' phase, DM will poll the distributor's GetStatus
	// method up to 3 times with a 30-second gap to attempt to retrieve the final
	// information. If the final status is still unavailable after those 3
	// polls, DM will give up and mark the task as expired.
	//
	// If the distributor doesn't intend to use Pubsub for notifying DM about the
	// final status of the job, set pollTimeout to the amount of time you want DM
	// to wait before polling GetStatus. For example, if after calling
	// FinishAttempt or EnsureGraphData your distributor needs 10 seconds before
	// it can correctly respond to a GetStatus request, you should set
	// pollTimeout to >= 10s. Otherwise pollTimeout should be set fairly high
	// (e.g. 12 hours) as a hedge against a broken pubsub notification pipeline.
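	//
	// For example, a hypothetical distributor that never publishes to pubsub
	// and needs about 10 seconds after FinishAttempt before GetStatus is
	// accurate might end its Run implementation with:
	//
	//	return tok, 10 * time.Second, nil
	//
	// whereas a pubsub-based distributor would instead return a long hedge
	// such as 12 * time.Hour.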
	//
	// If you can choose whether to use pubsub, prefer it, as it allows DM to
	// update the graph state more proactively (and unblock waiting Attempts,
	// etc.)
	Run(qst *dm.Quest_Desc, auth *dm.Execution_Auth, prevResult *dm.JsonResult) (tok Token, pollTimeout time.Duration, err error)

	// Cancel attempts to cancel a running task. If a task is canceled more than
	// once, this should return nil.
	Cancel(*dm.Quest_Desc, Token) error

	// GetStatus retrieves the current state of the task from the distributor.
	//
	// If this returns a non-Transient error more than 30 seconds after the task
	// was Run(), the execution will be marked Missing with the returned error
	// message as the 'Reason'. If it returns a non-Transient error within 30
	// seconds of being run, DM will automatically treat that as Transient.
	GetStatus(*dm.Quest_Desc, Token) (*dm.Result, error)

	// InfoURL calculates a user-presentable information URL for the task
	// identified by Token. This should be a local operation, so it is not the
	// implementation's responsibility to validate the token in this method (e.g.
	// it could point to a non-existent job, etc.)
	InfoURL(Token) string

	// HandleNotification is called whenever DM receives a PubSub message sent to
	// a topic created with Config.PrepareTopic. The Attrs map will omit
	// the 'auth_token' field.
	//
	// Returning (nil, nil) will indicate that DM should ignore this notification.
	//
	// DM will convert pubsub Messages to a delayed GetStatus if a pubsub message
	// is delivered which refers to an Attempt whose status is NeedsExecution,
	// which could happen in the event of a not-fully-settled transaction.
	//
	// DM will ignore any notifications for executions which it doesn't know
	// about.
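	//
	// As a hypothetical sketch, an implementation that only acts on messages
	// carrying a "status" attribute (an assumed convention of that
	// distributor, not something DM defines) could skip everything else:
	//
	//	if _, ok := notification.Attrs["status"]; !ok {
	//		return nil, nil // tells DM to ignore this notification
	//	}
	//	// ...otherwise decode notification.Data into a *dm.Result.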
	HandleNotification(qst *dm.Quest_Desc, notification *Notification) (*dm.Result, error)

	// HandleTaskQueueTask is called if the distributor used Config.EnqueueTask.
	//
	// It may return zero or more Notifications for DM about arbitrary Executions.
	// These notifications will be handled 'later' by the HandleNotification
	// implementation.
	HandleTaskQueueTask(*http.Request) ([]*Notification, error)

	// Validate should return a non-nil error if the given distributor parameters
	// are not appropriate for this Distributor. The parameters string is
	// guaranteed to be a valid JSON object; this should validate that the
	// content of that JSON object is what the distributor expects.
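	//
	// A minimal sketch, assuming a hypothetical myDistributor type whose
	// parameters must contain a non-empty "pool" string (json and errors are
	// the standard encoding/json and errors packages):
	//
	//	func (d *myDistributor) Validate(parameters string) error {
	//		var p struct {
	//			Pool string `json:"pool"`
	//		}
	//		if err := json.Unmarshal([]byte(parameters), &p); err != nil {
	//			return err
	//		}
	//		if p.Pool == "" {
	//			return errors.New(`"pool" must be set`)
	//		}
	//		return nil
	//	}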
	Validate(parameters string) error
}

// Factory is a function that produces a new distributor instance with the
// provided configuration proto.
//
// c is guaranteed to be non-transactional.
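//
// A hypothetical implementation package might provide a Factory like this
// (myDistributor and its cfg field are illustrative names only):
//
//	func newMyDistributor(c context.Context, dist *distributor.Config) (distributor.D, error) {
//		// Keep the cached configuration so every call for this Execution
//		// behaves the same way.
//		return &myDistributor{cfg: dist}, nil
//	}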
type Factory func(c context.Context, dist *Config) (D, error)