Skip to content

Commit

Permalink
fix(hatchery): improvements (#3025)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and sguiheux committed Jul 11, 2018
1 parent bf58db8 commit 6bf8b45
Show file tree
Hide file tree
Showing 27 changed files with 1,040 additions and 651 deletions.
26 changes: 3 additions & 23 deletions engine/api/worker.go
Expand Up @@ -68,37 +68,22 @@ func (api *API) disableWorkerHandler() Handler {
vars := mux.Vars(r)
id := vars["id"]

tx, err := api.mustDB().Begin()
if err != nil {
return sdk.WrapError(err, "disabledWorkerHandler> Cannot start tx")
}
defer tx.Rollback()

wor, err := worker.LoadWorker(tx, id)
if err != nil {
if _, err := worker.LoadWorker(api.mustDB(), id); err != nil {
if err != sql.ErrNoRows {
return sdk.WrapError(err, "disabledWorkerHandler> Cannot load worker %s", id)
}
return sdk.WrapError(sdk.ErrNotFound, "disabledWorkerHandler> Cannot load worker %s", id)
}

if wor.Status == sdk.StatusBuilding || wor.Status == sdk.StatusChecking {
return sdk.WrapError(sdk.ErrForbidden, "Cannot disable a worker with status %s", wor.Status)
}

if err := worker.UpdateWorkerStatus(tx, id, sdk.StatusDisabled); err != nil {
if err := worker.DisableWorker(api.mustDB(), id); err != nil {
if err == worker.ErrNoWorker || err == sql.ErrNoRows {
return sdk.WrapError(sdk.ErrWrongRequest, "disableWorkerHandler> worker %s does not exists", id)
}
return sdk.WrapError(err, "disableWorkerHandler> cannot update worker status")
}

if err := tx.Commit(); err != nil {
return sdk.WrapError(err, "disableWorkerHandler> cannot commit tx")
}

//Remove the worker from the cache
key := cache.Key("worker", wor.ID)
key := cache.Key("worker", id)
api.Cache.Delete(key)

return nil
Expand Down Expand Up @@ -131,11 +116,6 @@ func (api *API) workerCheckingHandler() Handler {
return sdk.WrapError(errW, "workerCheckingHandler> Unable to load worker %s", workerC.ID)
}

if wk.Status != sdk.StatusWaiting {
log.Debug("workerCheckingHandler> Worker %s cannot be Checking. Current status: %s", wk.Name, wk.Status)
return nil
}

if err := worker.SetStatus(api.mustDB(), wk.ID, sdk.StatusChecking); err != nil {
return sdk.WrapError(err, "workerCheckingHandler> cannot update worker %s", workerC.ID)
}
Expand Down
1 change: 0 additions & 1 deletion engine/api/worker_model.go
Expand Up @@ -470,7 +470,6 @@ func (api *API) getWorkerModelsHandler() Handler {
if errbyuser != nil {
return sdk.WrapError(errbyuser, "getWorkerModels> cannot load worker models for user id %d", getUser(ctx).ID)
}
log.Debug("getWorkerModels> for user %d named %s (admin:%t)", getUser(ctx).ID, getUser(ctx).Username, getUser(ctx).Admin)

return WriteJSON(w, models, http.StatusOK)
}
Expand Down
3 changes: 3 additions & 0 deletions engine/api/workflow/execute_node_job_run.go
Expand Up @@ -482,6 +482,9 @@ func BookNodeJobRun(store cache.Store, id int64, hatchery *sdk.Hatchery) (*sdk.H
store.SetWithTTL(k, hatchery, 120)
return nil, nil
}
if h.ID == hatchery.ID {
return nil, nil
}
return &h, sdk.WrapError(sdk.ErrJobAlreadyBooked, "BookNodeJobRun> job %d already booked by %s (%d)", id, h.Name, h.ID)
}

Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow_queue.go
Expand Up @@ -40,7 +40,7 @@ func (api *API) postTakeWorkflowJobHandler() Handler {

p, errP := project.LoadProjectByNodeJobRunID(ctx, api.mustDB(), api.Cache, id, getUser(ctx), project.LoadOptions.WithVariables, project.LoadOptions.WithClearKeys)
if errP != nil {
return sdk.WrapError(errP, "postTakeWorkflowJobHandler> Cannot load project nodeJobRunID:%d", id)
return sdk.WrapError(errP, "postTakeWorkflowJobHandler> Cannot load project by nodeJobRunID:%d", id)
}

//Load worker model
Expand Down Expand Up @@ -134,7 +134,7 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store,

//Change worker status
if err := worker.SetToBuilding(tx, getWorker(ctx).ID, job.ID, sdk.JobTypeWorkflowNode); err != nil {
return nil, sdk.WrapError(err, "takeJob> Cannot update worker status")
return nil, sdk.WrapError(err, "takeJob> Cannot update worker %s status", getWorker(ctx).Name)
}

//Load the node run
Expand Down
33 changes: 26 additions & 7 deletions engine/hatchery/kubernetes/kubernetes.go
Expand Up @@ -97,7 +97,7 @@ func (h *HatcheryKubernetes) ApplyConfiguration(cfg interface{}) error {
func (h *HatcheryKubernetes) Status() sdk.MonitoringStatus {
m := h.CommonMonitoring()
if h.IsInitialized() {
m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", h.WorkersStarted(), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK})
m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(h.WorkersStarted()), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK})
}
return m
}
Expand Down Expand Up @@ -203,6 +203,8 @@ func (h *HatcheryKubernetes) SpawnWorker(spawnArgs hatchery.SpawnArguments) (str
label = "register"
}

log.Debug("hatchery> kubernetes> SpawnWorker> %s", name)

var logJob string
if spawnArgs.JobID > 0 {
if spawnArgs.IsWorkflowJob {
Expand Down Expand Up @@ -332,25 +334,27 @@ func (h *HatcheryKubernetes) SpawnWorker(spawnArgs hatchery.SpawnArguments) (str
},
})

log.Debug("hatchery> kubernetes> SpawnWorker> %s > Pod created", name)

return pod.Name, err
}

// WorkersStarted returns the names of the instances (pods) started but
// not necessarily registered on CDS yet
func (h *HatcheryKubernetes) WorkersStarted() int {
workersLen := 0
func (h *HatcheryKubernetes) WorkersStarted() []string {
list, err := h.k8sClient.CoreV1().Pods(h.Config.KubernetesNamespace).List(metav1.ListOptions{LabelSelector: LABEL_HATCHERY_NAME})
if err != nil {
return workersLen
log.Warning("WorkersStarted> unable to list pods on namespace %s", h.Config.KubernetesNamespace)
return nil
}
workerNames := make([]string, 0, list.Size())
for _, pod := range list.Items {
labels := pod.GetLabels()
if labels[LABEL_HATCHERY_NAME] == h.Configuration().Name {
workersLen++
workerNames = append(workerNames, pod.GetName())
}
}

return workersLen
return workerNames
}

// WorkersStartedByModel returns the number of instances of given model started but
Expand Down Expand Up @@ -406,6 +410,21 @@ func (h *HatcheryKubernetes) killAwolWorkers() error {
}
}
if toDelete {
// If its a worker "register", check registration before deleting it
if strings.Contains(pod.Name, "register-") {
var modelIDS string
for _, e := range pod.Spec.Containers[0].Env {
if e.Name == "CDS_MODEL" {
modelIDS = e.Value
}
}
modelID, err := strconv.ParseInt(modelIDS, 10, 64)
if err != nil {
log.Error("killAndRemove> unable to get model from registering container %s", pod.Name)
} else {
hatchery.CheckWorkerModelRegister(h, modelID)
}
}
if err := h.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
globalErr = err
log.Error("hatchery:kubernetes> killAwolWorkers> Cannot delete pod %s (%s)", pod.Name, err)
Expand Down
54 changes: 28 additions & 26 deletions engine/hatchery/local/local.go
Expand Up @@ -174,24 +174,13 @@ func (h *HatcheryLocal) CanSpawn(model *sdk.Model, jobID int64, requirements []s
}

// killWorker kills a local worker process
func (h *HatcheryLocal) killWorker(worker sdk.Worker) error {
for name, workerCmd := range h.workers {
if worker.Name == name {
log.Info("KillLocalWorker> Killing %s", worker.Name)
return workerCmd.cmd.Process.Kill()
}
}
return fmt.Errorf("Worker %s not found", worker.Name)
func (h *HatcheryLocal) killWorker(name string, workerCmd workerCmd) error {
log.Info("KillLocalWorker> Killing %s", name)
return workerCmd.cmd.Process.Kill()
}

// SpawnWorker starts a new worker process
func (h *HatcheryLocal) SpawnWorker(spawnArgs hatchery.SpawnArguments) (string, error) {
var err error

if len(h.workers) >= h.Config.Provision.MaxWorker {
return "", fmt.Errorf("Max capacity reached (%d)", h.Config.Provision.MaxWorker)
}

wName := fmt.Sprintf("%s-%s", h.hatch.Name, namesgenerator.GetRandomName(0))
if spawnArgs.RegisterOnly {
wName = "register-" + wName
Expand Down Expand Up @@ -271,32 +260,48 @@ func (h *HatcheryLocal) SpawnWorker(spawnArgs hatchery.SpawnArguments) (string,
}
}

if err = cmd.Start(); err != nil {
if err := cmd.Start(); err != nil {
log.Error("hatchery> local> %v", err)
return "", err
}

log.Debug("worker %s has been spawned by %s", wName, h.Name)

h.Lock()
h.workers[wName] = workerCmd{cmd: cmd, created: time.Now()}
h.Unlock()
// Wait in a goroutine so that when process exits, Wait() update cmd.ProcessState
go func() {
cmd.Wait()
if err := cmd.Wait(); err != nil {
log.Error("hatchery> local> %v", err)
}
}()

return wName, nil
}

// WorkersStarted returns the names of the instances started but
// not necessarily registered on CDS yet
func (h *HatcheryLocal) WorkersStarted() int {
return len(h.workers)
func (h *HatcheryLocal) WorkersStarted() []string {
h.Mutex.Lock()
defer h.Mutex.Unlock()
workers := make([]string, len(h.workers))
var i int
for n := range h.workers {
workers[i] = n
i++
}
return workers
}

// WorkersStartedByModel returns the number of instances of the given model started but
// not necessarily registered on CDS yet
func (h *HatcheryLocal) WorkersStartedByModel(model *sdk.Model) int {
h.localWorkerIndexCleanup()
var x int

h.Mutex.Lock()
defer h.Mutex.Unlock()
for name := range h.workers {
if strings.Contains(name, model.Name) {
x++
Expand Down Expand Up @@ -339,7 +344,7 @@ func (h *HatcheryLocal) Init() error {
return fmt.Errorf("Cannot register: %s", err)
}

go h.startKillAwolWorkerRoutine()
sdk.GoRoutine("startKillAwolWorkerRoutine", h.startKillAwolWorkerRoutine)
return nil
}

Expand Down Expand Up @@ -401,21 +406,18 @@ func (h *HatcheryLocal) killAwolWorkers() error {
}
w.Name = name
log.Info("Killing AWOL worker %s", w.Name)
if err := h.killWorker(w); err != nil {
if err := h.killWorker(name, workerCmd); err != nil {
log.Warning("Error killing worker %s :%s", name, err)
}
killedWorkers = append(killedWorkers, name)
continue
}
// Worker is disabled. kill it
if w.Status == sdk.StatusDisabled {
} else if w.Status == sdk.StatusDisabled {
// Worker is disabled. kill it
log.Info("Killing disabled worker %s", w.Name)

if err := h.killWorker(w); err != nil {
if err := h.killWorker(name, workerCmd); err != nil {
log.Warning("Error killing worker %s :%s", name, err)
}
killedWorkers = append(killedWorkers, name)
continue
}
}

Expand Down
33 changes: 24 additions & 9 deletions engine/hatchery/marathon/marathon.go
Expand Up @@ -454,13 +454,17 @@ func (h *HatcheryMarathon) listApplications(idPrefix string) ([]string, error) {

// WorkersStarted returns the names of the instances started but
// not necessarily registered on CDS yet
func (h *HatcheryMarathon) WorkersStarted() int {
func (h *HatcheryMarathon) WorkersStarted() []string {
apps, err := h.listApplications(h.Config.MarathonIDPrefix)
if err != nil {
log.Warning("WorkersStarted> error on list applications err:%s", err)
return 0
return nil
}
res := make([]string, len(apps))
for i, s := range apps {
res[i] = strings.Replace(s, h.Config.MarathonIDPrefix, "", 1)
}
return len(apps)
return res
}

// WorkersStartedByModel returns the number of instances of given model started but
Expand Down Expand Up @@ -564,17 +568,19 @@ func (h *HatcheryMarathon) killAwolWorkers() error {
// then for each RUNNING marathon application
for _, app := range apps.Apps {
log.Debug("killAwolWorkers> check app %s", app.ID)
// Worker is deploying, leave him alone
if app.TasksRunning == 0 {
log.Debug("killAwolWorkers> app %s is deploying, do nothing", app.ID)
continue
}

t, err := time.Parse(time.RFC3339, app.Version)
if err != nil {
log.Warning("killAwolWorkers> app %s - Cannot parse last update: %s", app.ID, err)
break
}

// We let 2 minutes to worker to start and 5 minutes to a worker to register
var maxDeploymentDuration = time.Duration(2) * time.Minute
if strings.Contains(app.ID, "register-") {
maxDeploymentDuration = time.Duration(5) * time.Minute
}

// check that there is a worker matching
found = false
for _, w := range workers {
Expand All @@ -586,8 +592,17 @@ func (h *HatcheryMarathon) killAwolWorkers() error {
}

// then if it's not found, kill it !
if !found && time.Since(t) > 1*time.Minute {
if !found && time.Since(t) > maxDeploymentDuration {
log.Debug("killAwolWorkers> killing awol worker %s", app.ID)
// If its a worker "register", check registration before deleting it
if strings.Contains(app.ID, "register-") && app.Env != nil {
modelID, err := strconv.ParseInt((*app.Env)["CDS_MODEL"], 10, 64)
if err != nil {
log.Error("killAndRemove> unable to get model from registering container %s", app.ID)
} else {
hatchery.CheckWorkerModelRegister(h, modelID)
}
}
if _, err := h.marathonClient.DeleteApplication(app.ID, true); err != nil {
log.Warning("killAwolWorkers> Error while delete app %s err:%s", app.ID, err)
// continue to next app
Expand Down

0 comments on commit 6bf8b45

Please sign in to comment.