forked from remind101/empire
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.go
246 lines (187 loc) · 6.03 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// Package scheduler provides the core interface that Empire uses when
// interacting with a cluster of machines to run tasks.
package scheduler
import (
"fmt"
"io"
"time"
"golang.org/x/net/context"
"github.com/remind101/empire/pkg/image"
"github.com/remind101/pkg/logger"
)
// App describes an application that is handed to a Scheduler (see
// Scheduler.Submit and Scheduler.Restart), together with the processes that
// belong to it.
type App struct {
// The id of the app.
ID string
// An identifier that represents the version of this release.
Release string
// The name of the app.
Name string
// The application environment. The Env function merges this with the
// environment of an individual Process.
Env map[string]string
// The application labels. The Labels function merges these with the
// labels of an individual Process.
Labels map[string]string
// Processes that belong to this app.
Processes []*Process
}
// Process describes a single process belonging to an App: what to run, the
// resources to allocate to it, and how it is exposed or scheduled.
type Process struct {
// The type of process.
Type string
// The Image to run.
Image image.Image
// The Command to run.
Command []string
// Environment variables to set. The Env function merges these over the
// App environment.
Env map[string]string
// Labels to set on the container. The Labels function merges these over
// the App labels.
Labels map[string]string
// The amount of RAM to allocate to this process in bytes.
MemoryLimit uint
// The amount of CPU to allocate to this process, out of 1024. Maps to
// the --cpu-shares flag for docker.
CPUShares uint
// Nproc is the limit on the number of processes, as set by `ulimit -u`.
Nproc uint
// Instances is the desired number of instances of this process to run.
Instances uint
// Exposure is the level of exposure for this process.
// NOTE(review): presumably nil means the process is not exposed — confirm
// against the scheduler implementations.
Exposure *Exposure
// Can be used to setup a CRON schedule to run this task periodically.
Schedule Schedule
}
// Schedule represents a schedule for tasks that run periodically. It is an
// empty interface, so any value satisfies it; CRONSchedule is the
// implementation declared in this file.
type Schedule interface{}
// CRONSchedule is a Schedule implementation that represents a CRON
// expression, stored as a plain string.
type CRONSchedule string
// Exposure controls the exposure settings for a process: whether it is
// reachable externally and which ports are mapped to the container.
type Exposure struct {
// External means that this process will be exposed to internet facing
// traffic, as opposed to being internal. How this is used is
// implementation specific. For ECS, this means that the attached ELB
// will be "internet-facing".
External bool
// The ports to expose and map to the container.
Ports []Port
}
// Port maps a host port to a container port, along with the protocol that is
// served on it.
type Port struct {
// The port that external applications will connect to. It's
// implementation specific as to what this is used for. For example,
// with ECS, this is used as the LoadBalancerPort.
Host int
// The port within the container that the process should bind to.
Container int
// The exposure type. The implementations declared in this package are
// *HTTP, *HTTPS, *TCP and *SSL.
Protocol Protocol
}
// Protocol represents a service that a process exposes, like HTTP/HTTPS/TCP or
// SSL.
type Protocol interface {
// Protocol returns the name of the protocol as a string. The
// implementations in this package return "http", "https", "tcp" and
// "ssl".
Protocol() string
}
// HTTP is the Protocol implementation for a process exposed over plain HTTP.
type HTTP struct{}

// Protocol reports the protocol name, which is always "http".
func (h *HTTP) Protocol() string {
	const name = "http"
	return name
}
// TCP is the Protocol implementation for a process exposed over raw TCP.
type TCP struct{}

// Protocol reports the protocol name, which is always "tcp".
func (t *TCP) Protocol() string {
	const name = "tcp"
	return name
}
// HTTPS is the Protocol implementation for a process exposed over HTTPS.
type HTTPS struct {
	// Cert identifies the certificate to attach to the process.
	Cert string
}

// Protocol reports the protocol name, which is always "https" regardless of
// the configured certificate.
func (h *HTTPS) Protocol() string {
	const name = "https"
	return name
}
// SSL is the Protocol implementation for a process exposed over secure TCP.
type SSL struct {
	// Cert identifies the certificate to attach to the process.
	Cert string
}

// Protocol reports the protocol name, which is always "ssl" regardless of the
// configured certificate.
func (s *SSL) Protocol() string {
	const name = "ssl"
	return name
}
// Host represents the host that an Instance is placed on (see Instance.Host).
type Host struct {
// The host ID.
ID string
}
// Instance represents an Instance of a Process. Instances are what
// Scheduler.Instances returns.
type Instance struct {
// The Process that this is an instance of.
Process *Process
// The instance ID.
ID string
// The instance host
Host Host
// The State that this Instance is in.
State string
// The time that this instance was last updated.
UpdatedAt time.Time
}
// Runner is the interface for running a single process of an App. It is
// embedded in Scheduler.
type Runner interface {
// Run runs a process. It is given the App the process belongs to, along
// with an io.Reader and io.Writer.
// NOTE(review): in/out are presumably the process's stdin and output
// streams — confirm against the implementations.
Run(ctx context.Context, app *App, process *Process, in io.Reader, out io.Writer) error
}
// Scheduler is the interface for submitting Apps to a cluster and managing
// the instances that run them. It embeds Runner, so every Scheduler can also
// run a single process.
type Scheduler interface {
Runner
// Submit submits an app, creating it or updating it as necessary.
// When StatusStream is nil, Submit should return as quickly as possible,
// usually when the new version has been received, and validated. If
// StatusStream is not nil, it's recommended that the method not return until
// the deployment has fully completed.
Submit(context.Context, *App, StatusStream) error
// Remove removes the App.
// NOTE(review): app is presumably the App's ID — confirm.
Remove(ctx context.Context, app string) error
// Instances lists the instances of a Process for an app.
Instances(ctx context.Context, app string) ([]*Instance, error)
// Stop stops an instance. The scheduler will automatically start a new
// instance.
Stop(ctx context.Context, instanceID string) error
// Restart restarts the processes within the App.
// NOTE(review): the StatusStream presumably follows the same convention
// as in Submit — confirm.
Restart(context.Context, *App, StatusStream) error
}
// Env returns the effective environment for a process: the App environment
// overlaid with the Process environment. When both define the same variable,
// the Process value wins. The result is a new map; neither input is modified.
func Env(app *App, process *Process) map[string]string {
	base, overrides := app.Env, process.Env
	return merge(base, overrides)
}
// Labels returns the effective labels for a process: the App labels overlaid
// with the Process labels. When both define the same label, the Process value
// wins. The result is a new map; neither input is modified.
func Labels(app *App, process *Process) map[string]string {
	base, overrides := app.Labels, process.Labels
	return merge(base, overrides)
}
// merge combines the given maps into a single, newly allocated map. Maps are
// applied in argument order, so when a key appears in more than one map the
// value from the right-most map wins. The inputs are never modified, and the
// result is non-nil even when no maps (or only nil maps) are given.
func merge(envs ...map[string]string) map[string]string {
	out := map[string]string{}
	for i := range envs {
		for key, value := range envs[i] {
			out[key] = value
		}
	}
	return out
}
// Status is a single status update published to a StatusStream while a
// scheduler is executing.
type Status struct {
	// A friendly human readable message about the status change.
	Message string
}

// String implements the fmt.Stringer interface by returning the status
// message.
//
// The receiver is a value rather than a pointer so that both Status and
// *Status satisfy fmt.Stringer. StatusStream.Publish passes Status by value,
// and with a pointer receiver a Status value would not be a Stringer and would
// be formatted as a struct instead of as its message.
func (s Status) String() string {
	return s.Message
}
// StatusStream is an interface for publishing status updates while a scheduler
// is executing.
type StatusStream interface {
// Publish publishes an update to the status stream. It returns an error
// if the update could not be published.
Publish(Status) error
}
// StatusStreamFunc is a function type that implements the StatusStream
// interface, allowing an ordinary function to be used as a StatusStream.
type StatusStreamFunc func(Status) error

// Publish implements StatusStream by calling the underlying function with
// status and returning whatever error it returns.
func (f StatusStreamFunc) Publish(status Status) error {
	err := f(status)
	return err
}
// NullStatusStream is a status stream that does nothing: every published
// status is discarded and Publish always returns a nil error.
var NullStatusStream = StatusStreamFunc(func(Status) error { return nil })
// Publish is a convenience helper that publishes msg to stream as a Status.
//
// A nil stream is allowed and makes Publish a no-op. Publishing is best
// effort: if the stream returns an error, it is logged as a warning via the
// logger package using ctx and is not returned to the caller.
func Publish(ctx context.Context, stream StatusStream, msg string) {
	if stream == nil {
		return
	}

	err := stream.Publish(Status{Message: msg})
	if err == nil {
		return
	}

	logger.Warn(ctx, fmt.Sprintf("error publishing to stream: %v", err))
}