Skip to content

Commit

Permalink
fix(hatchery): various improvements (#3033)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and sguiheux committed Jul 16, 2018
1 parent d974420 commit 183cedd
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 239 deletions.
3 changes: 0 additions & 3 deletions engine/hatchery/marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,6 @@ func (h *HatcheryMarathon) killDisabledWorkers() error {
}

func (h *HatcheryMarathon) killAwolWorkers() error {
log.Debug("killAwolWorkers>")
workers, err := h.CDSClient().WorkerList()
if err != nil {
return err
Expand All @@ -555,8 +554,6 @@ func (h *HatcheryMarathon) killAwolWorkers() error {
return err
}

log.Debug("killAwolWorkers> check %d apps", len(apps.Apps))

var found bool
// then for each RUNNING marathon application
for _, app := range apps.Apps {
Expand Down
8 changes: 4 additions & 4 deletions engine/hatchery/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (c *Common) CommonServe(ctx context.Context, h hatchery.Interface) error {
log.Info("%s> Starting service %s...", c.Name, sdk.VERSION)
c.StartupTime = time.Now()

if err := hatchery.Create(h); err != nil {
return err
}

//Init the http server
c.initRouter(ctx, h)
server := &http.Server{
Expand All @@ -83,10 +87,6 @@ func (c *Common) CommonServe(ctx context.Context, h hatchery.Interface) error {
}
}()

if err := hatchery.Create(h); err != nil {
return err
}

return ctx.Err()
}

Expand Down
240 changes: 187 additions & 53 deletions sdk/hatchery/hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -49,55 +50,59 @@ func Create(h Interface) error {
return fmt.Errorf("Create> Init error: %v", err)
}

hostname, errh := os.Hostname()
if errh != nil {
return fmt.Errorf("Create> Cannot retrieve hostname: %s", errh)
// Register the current hatchery to be sure it's authentifed on CDS API before doing any call
if err := Register(h); err != nil {
return fmt.Errorf("Create> Register error: %v", err)
}

sdk.GoRoutine("heartbeat", func() {
hearbeat(h, h.Configuration().API.Token, h.Configuration().API.MaxHeartbeatFailures)
})
// Call WorkerModel Enabled first
var errwm error
models, errwm = h.CDSClient().WorkerModelsEnabled()
if errwm != nil {
log.Error("error on h.CDSClient().WorkerModelsEnabled() (init call): %v", errwm)
}

tickerProvision := time.NewTicker(time.Duration(h.Configuration().Provision.Frequency) * time.Second)
tickerRegister := time.NewTicker(time.Duration(h.Configuration().Provision.RegisterFrequency) * time.Second)
tickerGetModels := time.NewTicker(10 * time.Second)

defer func() {
tickerProvision.Stop()
tickerRegister.Stop()
tickerGetModels.Stop()
}()

pbjobs := make(chan sdk.PipelineBuildJob, 1)
wjobs := make(chan sdk.WorkflowNodeJobRun, 10)
errs := make(chan error, 1)

sdk.GoRoutine("queuePolling", func() {
if err := h.CDSClient().QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, h.Configuration().Provision.GraceTimeQueued, nil); err != nil {
log.Error("Queues polling stopped: %v", err)
cancel()
}
})

// Create a cache with a default expiration time of 3 second, and which
// purges expired items every minute
spawnIDs := cache.New(10*time.Second, 60*time.Second)
// This is a local cache to avoid analysing a job twice at the same time
receivedIDs := cache.New(5*time.Second, 60*time.Second)

tickerProvision := time.NewTicker(time.Duration(h.Configuration().Provision.Frequency) * time.Second)
tickerRegister := time.NewTicker(time.Duration(h.Configuration().Provision.RegisterFrequency) * time.Second)
tickerGetModels := time.NewTicker(3 * time.Second)

defer func() {
tickerProvision.Stop()
tickerRegister.Stop()
tickerGetModels.Stop()
}()
sdk.GoRoutine("heartbeat", func() {
hearbeat(h, h.Configuration().API.Token, h.Configuration().API.MaxHeartbeatFailures)
})

// Call WorkerModel Enabled first
var errwm error
models, errwm = h.CDSClient().WorkerModelsEnabled()
if errwm != nil {
log.Error("error on h.CDSClient().WorkerModelsEnabled() (init call): %v", errwm)
}
sdk.GoRoutine("queuePolling", func() {
if err := h.CDSClient().QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, h.Configuration().Provision.GraceTimeQueued, nil); err != nil {
log.Error("Queues polling stopped: %v", err)
cancel()
}
})

// hatchery is now fully Initialized
h.SetInitialized()

// run the starters pool
workersStartChan, workerStartResultChan := startWorkerStarters(h)

hostname, errh := os.Hostname()
if errh != nil {
return fmt.Errorf("Create> Cannot retrieve hostname: %s", errh)
}
// read the result channel in another goroutine to let the main goroutine start new workers
sdk.GoRoutine("checkStarterResult", func() {
for startWorkerRes := range workerStartResultChan {
Expand Down Expand Up @@ -137,7 +142,6 @@ func Create(h Interface) error {
return ctx.Err()

case <-tickerGetModels.C:

var errwm error
models, errwm = h.CDSClient().WorkerModelsEnabled()
if errwm != nil {
Expand All @@ -160,9 +164,9 @@ func Create(h Interface) error {
continue
}

//Check if hatchery if able to provision
if !checkProvisioning(h) {
log.Info("hatchery is not able to provision new worker")
//Check if hatchery if able to start a new worker
if !checkCapacities(h) {
log.Info("hatchery %s is not able to provision new worker", h.Hatchery().Name)
continue
}

Expand All @@ -173,31 +177,54 @@ func Create(h Interface) error {
}

//Ask to start
workersStartChan <- workerStarterRequest{
workerRequest := workerStarterRequest{
id: j.ID,
isWorkflowJob: false,
execGroups: j.ExecGroups,
models: models,
requirements: j.Job.Action.Requirements,
hostname: hostname,
timestamp: time.Now().Unix(),
}

// Check at least one worker model can match
var chosenModel *sdk.Model
for i := range models {
if canRunJob(h, workerRequest, models[i]) {
chosenModel = &models[i]
}
}

if chosenModel == nil {
//do something
continue
}

workerRequest.model = *chosenModel

workersStartChan <- workerRequest

case j := <-wjobs:
t0 := time.Now()
if j.ID == 0 {
continue
}

// Check if the jobs has been received less than 10s ago
if _, exist := receivedIDs.Get(string(j.ID)); exist {
log.Debug("job %d is alrealy being analyzed", j.ID)
continue
}
receivedIDs.SetDefault(string(j.ID), j.ID)

//Check if the jobs is concerned by a pending worker creation
if _, exist := spawnIDs.Get(string(j.ID)); exist {
log.Debug("job %d already spawned in previous routine", j.ID)
continue
}

//Check bookedBy current hatchery
if j.BookedBy.ID != 0 && j.BookedBy.ID != h.ID() {
log.Debug("hatchery> job %d is booked by someone else (%d / %d)", j.ID, j.BookedBy.ID, h.ID())
receivedIDs.Delete(string(j.ID))
continue
}

Expand All @@ -208,41 +235,53 @@ func Create(h Interface) error {
continue
}

//Check spawnsID
if _, exist := spawnIDs.Get(string(j.ID)); exist {
log.Debug("job %d already spawned in previous routine", j.ID)
receivedIDs.Delete(string(j.ID))
continue
}

log.Debug("Analyzing job %d", j.ID)

//Check if hatchery is able to provision
if !checkProvisioning(h) {
receivedIDs.Delete(string(j.ID))
//Check if hatchery if able to start a new worker
if !checkCapacities(h) {
log.Info("hatchery %s is not able to provision new worker", h.Hatchery().Name)
continue
}

//Ask to start
log.Info("Request a worker for job %d", j.ID)

workersStartChan <- workerStarterRequest{
workerRequest := workerStarterRequest{
id: j.ID,
isWorkflowJob: true,
execGroups: j.ExecGroups,
models: models,
requirements: j.Job.Action.Requirements,
hostname: hostname,
timestamp: time.Now().Unix(),
spawnAttempts: j.SpawnAttempts,
workflowNodeRunID: j.WorkflowNodeRunID,
}

// Check at least one worker model can match
var chosenModel *sdk.Model
for i := range models {
if canRunJob(h, workerRequest, models[i]) {
chosenModel = &models[i]
}
}

// No model has been found, let's send a failing result
if chosenModel == nil {
workerStartResultChan <- workerStarterResult{
request: workerRequest,
isRun: false,
temptToSpawn: true,
}
continue
}

//We got a model, let's start a worker
workerRequest.model = *chosenModel

//Ask to start
log.Info("hatchery> Request a worker for job %d (%.3f seconds elapsed)", j.ID, time.Since(t0).Seconds())
workersStartChan <- workerRequest

case err := <-errs:
log.Error("%v", err)

case <-tickerProvision.C:
provisioning(h, h.Configuration().Provision.Disabled, models)
provisioning(h, models)

case <-tickerRegister.C:
if err := workerRegister(h, workersStartChan); err != nil {
Expand Down Expand Up @@ -270,6 +309,101 @@ func CheckRequirement(r sdk.Requirement) (bool, error) {
}
}

func canRunJob(h Interface, j workerStarterRequest, model sdk.Model) bool {
if model.Type != h.ModelType() {
return false
}

// If the model needs registration, don't spawn for now
if h.NeedRegistration(&model) {
log.Debug("canRunJob> model %s needs registration", model.Name)
return false
}

// if current hatchery is in same group than worker model -> do not avoid spawn, even if worker model is in error
if model.NbSpawnErr > 5 && h.Hatchery().GroupID != model.ID {
log.Warning("canRunJob> Too many errors on spawn with model %s, please check this worker model", model.Name)
return false
}

if len(j.execGroups) > 0 {
checkGroup := false
for _, g := range j.execGroups {
if g.ID == model.GroupID {
checkGroup = true
break
}
}
if !checkGroup {
log.Debug("canRunJob> job %d - model %s attached to group %d can't run this job", j.id, model.Name, model.GroupID)
return false
}
}

var containsModelRequirement, containsHostnameRequirement bool
for _, r := range j.requirements {
switch r.Type {
case sdk.ModelRequirement:
containsModelRequirement = true
case sdk.HostnameRequirement:
containsHostnameRequirement = true
}
}
// Common check
for _, r := range j.requirements {
// If requirement is a Model requirement, it's easy. It's either can or can't run
// r.Value could be: theModelName --port=8888:9999, so we take strings.Split(r.Value, " ")[0] to compare
// only modelName
if r.Type == sdk.ModelRequirement && strings.Split(r.Value, " ")[0] != model.Name {
log.Debug("canRunJob> %d - job %d - model requirement r.Value(%s) != model.Name(%s)", j.timestamp, j.id, strings.Split(r.Value, " ")[0], model.Name)
return false
}

// If requirement is an hostname requirement, it's for a specific worker
if r.Type == sdk.HostnameRequirement && r.Value != j.hostname {
log.Debug("canRunJob> %d - job %d - hostname requirement r.Value(%s) != hostname(%s)", j.timestamp, j.id, r.Value, j.hostname)
return false
}

// service and memory requirements are only supported by docker model
if model.Type != sdk.Docker && (r.Type == sdk.ServiceRequirement || r.Type == sdk.MemoryRequirement) {
log.Debug("canRunJob> %d - job %d - job with service requirement or memory requirement: only for model docker. current model:%s", j.timestamp, j.id, model.Type)
return false
}

// Skip network access requirement as we can't check it
if r.Type == sdk.NetworkAccessRequirement || r.Type == sdk.PluginRequirement || r.Type == sdk.ServiceRequirement || r.Type == sdk.MemoryRequirement {
log.Debug("canRunJob> %d - job %d - job with service requirement or memory requirement: only for model docker. current model:%s", j.timestamp, j.id, model.Type)
continue
}

if r.Type == sdk.OSArchRequirement && model.RegisteredOS != "" && model.RegisteredArch != "" && r.Value != (model.RegisteredOS+"/"+model.RegisteredArch) {
log.Debug("canRunJob> %d - job %d - job with OSArch requirement: cannot spawn on this OSArch. current model: %s/%s", j.timestamp, j.id, model.RegisteredOS, model.RegisteredArch)
return false
}

if !containsModelRequirement && !containsHostnameRequirement {
if r.Type == sdk.BinaryRequirement {
found := false
// Check binary requirement against worker model capabilities
for _, c := range model.RegisteredCapabilities {
if r.Value == c.Value || r.Value == c.Name {
found = true
break
}
}

if !found {
log.Debug("canRunJob> %d - job %d - model(%s) does not have binary %s(%s) for this job.", j.timestamp, j.id, model.Name, r.Name, r.Value)
return false
}
}
}
}

return h.CanSpawn(&model, j.id, j.requirements)
}

func logTime(h Interface, name string, then time.Time) {
d := time.Since(then)
if d > time.Duration(h.Configuration().LogOptions.SpawnOptions.ThresholdCritical)*time.Second {
Expand Down
Loading

0 comments on commit 183cedd

Please sign in to comment.