-
Notifications
You must be signed in to change notification settings - Fork 23
/
server.go
265 lines (230 loc) · 8.2 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
// Copyright 2017-2019, Square, Inc.
// Package server bootstraps and runs the Job Runner.
package server
import (
"encoding/json"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/orcaman/concurrent-map"
log "github.com/sirupsen/logrus"
"github.com/square/spincycle/config"
"github.com/square/spincycle/job-runner/api"
"github.com/square/spincycle/job-runner/app"
"github.com/square/spincycle/job-runner/chain"
"github.com/square/spincycle/job-runner/runner"
"github.com/square/spincycle/job-runner/status"
"github.com/square/spincycle/jobs"
"github.com/square/spincycle/request-manager"
)
// Server bootstraps (Boot), runs (Run), and shuts down (Stop) the Job Runner.
// Create one with NewServer.
type Server struct {
	appCtx app.Context

	// Dependencies built by Boot.
	api           *api.API
	rmc           rm.Client
	chainRepo     chain.Repo
	traverserRepo cmap.ConcurrentMap

	// Lifecycle state. Stop holds stopMux for its whole duration and sets
	// stopped under it.
	stopMux      sync.Mutex
	stopped      bool
	shutdownChan chan struct{} // closed by Stop; watched by traversers and the finished-jobs updater
	apiStopped   chan struct{} // closed by Stop once the API has been stopped
}
// NewServer returns an unbooted Server for the given application context.
// Boot must be called on it before Run. The zero-value stopMux is ready to use.
func NewServer(appCtx app.Context) *Server {
	srv := &Server{appCtx: appCtx}
	srv.shutdownChan = make(chan struct{})
	srv.apiStopped = make(chan struct{})
	return srv
}
// Run runs the Job Runner API in the foreground. It returns when the API stops
// running (either from an error, or after a call to Stop). If a custom RunAPI
// hook has been provided, it will be called to run the API instead of the default
// api.Run.
//
// If stopOnSignal = true, the server will listen for TERM and INT signals from the
// OS and call Stop to shut itself down when those signals are received. Else, the
// caller must call Stop to shut down the server.
//
// Run panics if Boot has not completed successfully, and returns an error
// without starting anything if Stop has already been called. An error from the
// API is returned wrapped as "error from API: ...".
func (s *Server) Run(stopOnSignal bool) error {
	if s.api == nil {
		panic("Server.Run called before Server.Boot")
	}

	// stopped is written by Stop, which may run on another goroutine (e.g. the
	// signal watcher), so it must be read under stopMux to avoid a data race.
	s.stopMux.Lock()
	stopped := s.stopped
	s.stopMux.Unlock()
	if stopped {
		return fmt.Errorf("server stopped")
	}

	// If stopOnSignal = true, watch for TERM + INT signals from the OS and shut
	// down the Job Runner when we receive them.
	if stopOnSignal {
		go s.waitForShutdown()
	}

	// Every second, send updated finished jobs counts for all running chains.
	// This is best effort, so no error handling or logger here. When a chain
	// completes, its final finished jobs count is sent with FinishRequest.
	go func() {
		finishedJobs := status.FinishedJobs{
			ChainRepo: s.chainRepo,
			RMC:       s.rmc,
		}
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				finishedJobs.Update()
			case <-s.shutdownChan:
				return
			}
		}
	}()

	// Run the API - this will block until the API is stopped (or encounters
	// some fatal error). If the RunAPI hook has been provided, call that instead
	// of the default api.Run.
	var err error
	if s.appCtx.Hooks.RunAPI != nil {
		err = s.appCtx.Hooks.RunAPI()
	} else {
		err = s.api.Run()
	}

	// If the server was stopped (as opposed to some error within the API), wait
	// to make sure it's done shutting down the API before returning. Stop holds
	// stopMux for its entire run, so this read also waits for an in-progress Stop.
	s.stopMux.Lock()
	stopped = s.stopped
	s.stopMux.Unlock()
	if stopped {
		<-s.apiStopped
	}

	if err != nil {
		return fmt.Errorf("error from API: %s", err)
	}
	return nil
}
// Boot sets up the server. It must be called before calling Run. Once Boot has
// succeeded, later calls are no-ops.
//
// Boot loads the config through the LoadConfig hook and applies SPINCYCLE_*
// environment-variable overrides. It stores the result in the app context and
// then builds the Request Manager client, chain repo, traverser factory and
// repo, status manager, and API.
//
// It returns an error in these cases:
//   - exactly one of the RunAPI/StopAPI hooks is set
//   - loading the config fails
//   - creating the RM client fails
//   - getting the server URL fails
func (s *Server) Boot() error {
	// Only run Boot once. s.api is assigned last, so it is non-nil only after a
	// fully successful Boot.
	if s.api != nil {
		return nil
	}

	// Either both or neither RunAPI and StopAPI hooks must be provided - can't
	// have just one.
	// @todo: this needs to happen earlier
	if (s.appCtx.Hooks.RunAPI == nil) != (s.appCtx.Hooks.StopAPI == nil) {
		return fmt.Errorf("Only one of RunAPI and StopAPI hooks provided - either both or neither must be provided.")
	}

	// Load config file
	cfg, err := s.appCtx.Hooks.LoadConfig(s.appCtx)
	if err != nil {
		return fmt.Errorf("error loading config: %s", err)
	}

	// Override with env vars, if set
	cfg.Server.Addr = config.Env("SPINCYCLE_SERVER_ADDR", cfg.Server.Addr)
	cfg.Server.TLS.CertFile = config.Env("SPINCYCLE_SERVER_TLS_CERT_FILE", cfg.Server.TLS.CertFile)
	cfg.Server.TLS.KeyFile = config.Env("SPINCYCLE_SERVER_TLS_KEY_FILE", cfg.Server.TLS.KeyFile)
	cfg.Server.TLS.CAFile = config.Env("SPINCYCLE_SERVER_TLS_CA_FILE", cfg.Server.TLS.CAFile)
	cfg.RMClient.ServerURL = config.Env("SPINCYCLE_RM_CLIENT_URL", cfg.RMClient.ServerURL)
	cfg.RMClient.TLS.CertFile = config.Env("SPINCYCLE_RM_CLIENT_TLS_CERT_FILE", cfg.RMClient.TLS.CertFile)
	cfg.RMClient.TLS.KeyFile = config.Env("SPINCYCLE_RM_CLIENT_TLS_KEY_FILE", cfg.RMClient.TLS.KeyFile)
	cfg.RMClient.TLS.CAFile = config.Env("SPINCYCLE_RM_CLIENT_TLS_CA_FILE", cfg.RMClient.TLS.CAFile)
	s.appCtx.Config = cfg

	// Log the effective config. This is informational only, so a marshal
	// failure is reported rather than failing Boot (previously it was dropped
	// and an empty config line was logged).
	if cfgJSON, jsonErr := json.MarshalIndent(cfg, "", " "); jsonErr != nil {
		log.Warnf("cannot marshal config for logging: %s", jsonErr)
	} else {
		log.Printf("Config: %s", cfgJSON)
	}

	// JR uses Request Manager client to send back job logs, suspend job chains,
	// and tell the RM when a job chain (request) is done
	rmc, err := s.appCtx.Factories.MakeRequestManagerClient(s.appCtx)
	if err != nil {
		return fmt.Errorf("MakeRequestManagerClient: %s", err)
	}
	s.rmc = rmc

	// Chain repo holds running job chains in memory. It's primarily used by
	// chain.Traversers while running chains. It's also used by status.Manager
	// to report status back to RM (then back to user).
	s.chainRepo = chain.NewMemoryRepo()

	// Runner Factory makes a job.Runner to run one job. It's used by chain.Traversers
	// to run jobs.
	rf := runner.NewFactory(jobs.Factory, rmc)

	// Traverser Factory is used by API to make a new chain.Traverser to run a
	// job chain. These are stored in a Traverser Repo (just a map) so API can
	// keep track of what's running.
	trFactory := chain.NewTraverserFactory(s.chainRepo, rf, rmc, s.shutdownChan)
	s.traverserRepo = cmap.New()

	// Status Manager reports what's happening in the JR
	stat := status.NewManager(s.traverserRepo)

	// Base URL is what this JR reports itself as, e.g. https://spin-jr.prod.local:32307
	// The RM saves this so it knows which JR to query to get the status of a
	// given request.
	baseURL, err := s.appCtx.Hooks.ServerURL(s.appCtx)
	if err != nil {
		return fmt.Errorf("error getting base server URL: %s", err)
	}

	// The API instance. Assigned last: s.api != nil marks a completed Boot.
	apiCfg := api.Config{
		AppCtx:           s.appCtx,
		TraverserFactory: trFactory,
		TraverserRepo:    s.traverserRepo,
		StatusManager:    stat,
		ShutdownChan:     s.shutdownChan,
		BaseURL:          baseURL,
	}
	s.api = api.NewAPI(apiCfg)
	return nil
}
// Stop stops the server. It signals running traversers to shut down and then
// stops the API (using either the default api.Stop or the StopAPI hook if
// provided). Once Stop has been called, the server cannot be reused - future calls
// to Run will return an error.
//
// If stopOnSignal was set when calling Run, Stop will automatically be called by
// the server on receiving a TERM or INT signal from the OS. Otherwise, you must
// call Stop when you want to shut down the Job Runner.
//
// Stop waits up to 20 seconds for running traversers to finish before stopping
// the API. It is safe to call more than once; calls after the first return nil.
// Calling Stop before Boot has succeeded marks the server stopped and returns
// nil, because there is no API to stop.
func (s *Server) Stop() error {
	// Only stop once. We lock the whole Stop call, so that, if Stop is called
	// multiple times in quick succession, no calls will return before the server
	// has actually been shut down.
	s.stopMux.Lock()
	defer s.stopMux.Unlock()
	if s.stopped {
		return nil
	}
	s.stopped = true

	log.Infof("Stopping Job Runner server")

	// Running traversers watch shutdownChan - closing this tells them to shut down.
	// The API will also begin refusing to start running new job chains.
	close(s.shutdownChan)

	// Boot never completed, so no API exists and no traversers can have been
	// started. There is nothing left to wait for or stop. (Previously this path
	// panicked on the nil traverserRepo/api.)
	if s.api == nil {
		close(s.apiStopped)
		return nil
	}

	// Wait for all traversers to shut down. Timeout if they aren't done
	// within 20 seconds, and continue to shutting down the API. A single
	// ticker/timer pair is used instead of allocating a new timer per poll.
	timeout := time.NewTimer(20 * time.Second)
	defer timeout.Stop()
	poll := time.NewTicker(10 * time.Millisecond)
	defer poll.Stop()
waitForTraversers:
	for !s.traverserRepo.IsEmpty() {
		select {
		case <-poll.C:
			// Check again if traversers are all done.
		case <-timeout.C:
			log.Warnf("timed out waiting for traversers to stop; %d still running", s.traverserRepo.Count())
			break waitForTraversers
		}
	}

	// Stop the API, using the StopAPI hook if provided and api.Stop otherwise.
	var err error
	if s.appCtx.Hooks.StopAPI != nil {
		err = s.appCtx.Hooks.StopAPI()
	} else {
		err = s.api.Stop()
	}
	close(s.apiStopped) // indicate to Run that the API is done shutting down

	if err != nil {
		return fmt.Errorf("error stopping API: %s", err)
	}
	return nil
}
// API returns the Job Runner API instance built by Boot. Boot assigns it only
// after every other setup step has succeeded, so this is nil until Boot has
// completed without error.
func (s *Server) API() *api.API {
	jrAPI := s.api
	return jrAPI
}
// --------------------------------------------------------------------------

// waitForShutdown catches TERM and INT signals to gracefully shut down the Job
// Runner. It blocks until one of those signals arrives and then calls Stop,
// logging any error Stop returns.
//
// It also returns without calling Stop once shutdownChan is closed, i.e. the
// server was stopped by some other caller. This keeps the goroutine running
// it from leaking. On return it calls signal.Stop, so signals are no longer
// relayed to its channel.
func (s *Server) waitForShutdown() {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	select {
	case <-sigChan:
	case <-s.shutdownChan:
		// Already stopped elsewhere; nothing to do.
		return
	}

	if err := s.Stop(); err != nil {
		log.Errorf("error shutting down server: %s", err)
	}
}