Skip to content

Commit

Permalink
feat(sdk): add ctx on WorkerList() (#3405)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault committed Oct 8, 2018
1 parent 29c6e42 commit cd8441b
Show file tree
Hide file tree
Showing 63 changed files with 455 additions and 310 deletions.
4 changes: 3 additions & 1 deletion cli/cdsctl/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ func (ui *Termui) execLoadData() error {
}
ui.elapsedStatus = time.Since(start)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
start = time.Now()
ui.workers, err = client.WorkerList()
ui.workers, err = client.WorkerList(ctx)
if err != nil {
return err
}
Expand Down
12 changes: 9 additions & 3 deletions cli/cdsctl/worker.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package main

import (
"context"
"fmt"
"strings"
"time"

"github.com/spf13/cobra"

Expand All @@ -29,7 +31,9 @@ var workerListCmd = cli.Command{
}

func workerListRun(v cli.Values) (cli.ListResult, error) {
workers, err := client.WorkerList()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workers, err := client.WorkerList(ctx)
if err != nil {
return nil, err
}
Expand All @@ -52,7 +56,9 @@ $ cdsctl worker disable $(cdsctl worker list)`,
func workerDisableRun(v cli.Values) error {
names := v.GetStringSlice("name")

workers, err := client.WorkerList()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workers, err := client.WorkerList(ctx)
if err != nil {
return err
}
Expand All @@ -63,7 +69,7 @@ func workerDisableRun(v cli.Values) error {
if w.ID == n || strings.ToLower(w.Name) == strings.ToLower(n) {
found = true
fmt.Printf("Disabling worker %s [status %s]... ", cli.Magenta(w.Name), w.Status)
if err := client.WorkerDisable(w.ID); err != nil {
if err := client.WorkerDisable(context.Background(), w.ID); err != nil {
fmt.Printf("Error disabling worker %s : %s\n", w.ID, err)
} else {
fmt.Printf("Done\n")
Expand Down
6 changes: 5 additions & 1 deletion contrib/misc/hello-sdk/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"time"

"github.com/ovh/cds/sdk/cdsclient"
)
Expand All @@ -27,7 +29,9 @@ func main() {

// go on https://godoc.org/github.com/ovh/cds/sdk/cdsclient to
// see all available funcs
workers, err := client.WorkerList()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workers, err := client.WorkerList(ctx)
if err != nil {
fmt.Fprintf(os.Stderr, "Error while getting workers:%s", err)
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion engine/api/observability/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Start(ctx context.Context, serviceName string, w http.ResponseWriter, req *
}

var span *trace.Span
rootSpanContext, hasSpanContext := DefaultFormat.SpanContextFromRequest(req)
rootSpanContext, hasSpanContext := tracingutils.DefaultFormat.SpanContextFromRequest(req)

var pkey string
var ok bool
Expand Down
5 changes: 0 additions & 5 deletions engine/api/observability/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package observability

import (
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace/propagation"

"github.com/ovh/cds/sdk"
)

Expand All @@ -17,8 +14,6 @@ const (
StatusCodeAttribute = "http.status_code"
)

var DefaultFormat propagation.HTTPFormat = &b3.HTTPFormat{}

// Configuration is the global tracing configuration
type Configuration struct {
Enable bool `json:"enable"`
Expand Down
3 changes: 1 addition & 2 deletions engine/api/services/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"net/url"
"time"

"github.com/ovh/cds/engine/api/observability"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
"github.com/ovh/cds/sdk/tracingutils"
Expand Down Expand Up @@ -167,7 +166,7 @@ func doRequest(ctx context.Context, httpURL string, hash string, method, path st
spanCtx, ok := tracingutils.ContextToSpanContext(ctx)
log.Debug("setup tracing = %v (%v) on request to %s", ok, spanCtx, req.URL.String())
if ok {
observability.DefaultFormat.SpanContextToRequest(spanCtx, req)
tracingutils.DefaultFormat.SpanContextToRequest(spanCtx, req)
}

req.Header.Set("Connection", "close")
Expand Down
7 changes: 6 additions & 1 deletion engine/hatchery/kubernetes/services.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package kubernetes

import (
"context"
"fmt"
"strconv"
"time"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -63,9 +65,12 @@ func (h *HatcheryKubernetes) getServicesLogs() error {

if len(servicesLogs) > 0 {
// Do call api
if err := h.Client.QueueServiceLogs(servicesLogs); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
if err := h.Client.QueueServiceLogs(ctx, servicesLogs); err != nil {
cancel()
return fmt.Errorf("Hatchery> Swarm> Cannot send service logs : %v", err)
}
cancel()
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion engine/hatchery/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,9 @@ func (h *HatcheryLocal) killAwolWorkers() error {
h.Lock()
defer h.Unlock()

apiWorkers, err := h.CDSClient().WorkerList()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
apiWorkers, err := h.CDSClient().WorkerList(ctx)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions engine/hatchery/marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,9 @@ func (h *HatcheryMarathon) startKillAwolWorkerRoutine() {
}

func (h *HatcheryMarathon) killDisabledWorkers() error {
workers, err := h.CDSClient().WorkerList()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workers, err := h.CDSClient().WorkerList(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -539,7 +541,9 @@ func (h *HatcheryMarathon) killDisabledWorkers() error {
}

func (h *HatcheryMarathon) killAwolWorkers() error {
workers, err := h.CDSClient().WorkerList()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workers, err := h.CDSClient().WorkerList(ctx)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions engine/hatchery/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ func (h *HatcheryOpenstack) updateServerList() {
}

func (h *HatcheryOpenstack) killAwolServers() {
workers, err := h.CDSClient().WorkerList()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workers, err := h.CDSClient().WorkerList(ctx)
now := time.Now().Unix()
if err != nil {
log.Warning("killAwolServers> Cannot fetch worker list: %s", err)
Expand Down Expand Up @@ -383,7 +385,9 @@ func (h *HatcheryOpenstack) killErrorServers() {
}

func (h *HatcheryOpenstack) killDisabledWorkers() {
workers, err := h.CDSClient().WorkerList()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workers, err := h.CDSClient().WorkerList(ctx)
if err != nil {
log.Warning("killDisabledWorkers> Cannot fetch worker list: %s", err)
return
Expand Down
4 changes: 3 additions & 1 deletion engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,9 @@ func (h *HatcherySwarm) routines(ctx context.Context) {
}

func (h *HatcherySwarm) listAwolWorkers(dockerClient *dockerClient) ([]types.Container, error) {
apiworkers, err := h.CDSClient().WorkerList()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
apiworkers, err := h.CDSClient().WorkerList(ctx)
if err != nil {
return nil, sdk.WrapError(err, "hatchery> swarm> listAwolWorkers> Cannot get workers on %s", dockerClient.name)
}
Expand Down
4 changes: 3 additions & 1 deletion engine/hatchery/swarm/swarm_util_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ func (h *HatcherySwarm) getServicesLogs() error {

if len(servicesLogs) > 0 {
// Do call api
if err := h.Client.QueueServiceLogs(servicesLogs); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := h.Client.QueueServiceLogs(ctx, servicesLogs); err != nil {
log.Error("Hatchery> Swarm> Cannot send service logs : %v", err)
}
cancel()
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion engine/hatchery/vsphere/vsphere.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ func (h *HatcheryVSphere) updateServerList() {

// killDisabledWorkers kill workers which are disabled
func (h *HatcheryVSphere) killDisabledWorkers() {
workers, err := h.CDSClient().WorkerList()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workers, err := h.CDSClient().WorkerList(ctx)
if err != nil {
log.Warning("killDisabledWorkers> Cannot fetch worker list: %s", err)
return
Expand Down
2 changes: 1 addition & 1 deletion engine/worker/builtin_artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func runArtifactUpload(w *currentWorker) BuiltInAction {
go func(path string) {
log.Debug("Uploading %s", path)
defer wg.Done()
throughTempURL, duration, err := w.client.QueueArtifactUpload(buildID, tag.Value, path)
throughTempURL, duration, err := w.client.QueueArtifactUpload(ctx, buildID, tag.Value, path)
if err != nil {
chanError <- sdk.WrapError(err, "Error while uploading artifact %s", path)
wgErrors.Add(1)
Expand Down
44 changes: 23 additions & 21 deletions engine/worker/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {

//Before start the loop, take the bookJobID
if !w.disableOldWorkflows && w.bookedPBJobID != 0 {
w.processBookedPBJob(pbjobs)
w.processBookedPBJob(ctx, pbjobs)
}

//Definition of the function which must be called to stop the worker
Expand Down Expand Up @@ -149,7 +149,7 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {

var exceptJobID int64
if w.bookedWJobID != 0 {
if errP := w.processBookedWJob(wjobs); errP != nil {
if errP := w.processBookedWJob(ctx, wjobs); errP != nil {
// Unbook job
if errR := w.client.QueueJobRelease(true, w.bookedWJobID); errR != nil {
log.Error("runCmd> QueueJobRelease> Cannot release job")
Expand All @@ -165,8 +165,9 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
}
exceptJobID = w.bookedWJobID
}
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)

if err := w.client.WorkerSetStatus(ctx, sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusWaiting): %s", err)
}

go func(ctx context.Context, exceptID *int64) {
Expand Down Expand Up @@ -197,13 +198,14 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
case <-ctx.Done():
return
case <-refreshTick.C:
if err := w.client.WorkerRefresh(); err != nil {
if err := w.client.WorkerRefresh(ctx); err != nil {
log.Error("Heartbeat failed: %v", err)
nbErrors++
if nbErrors == 5 {
errs <- err
}
}
cancel()
nbErrors = 0
case <-registerTick.C:
if err := w.doRegister(); err != nil {
Expand Down Expand Up @@ -235,8 +237,8 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
continue
}

if err := w.client.WorkerSetStatus(sdk.StatusChecking); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusChecking): %s", err)
if err := w.client.WorkerSetStatus(ctx, sdk.StatusChecking); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusChecking): %s", err)
}
requirementsOK, _ := checkRequirements(w, &j.Job.Action, j.ExecGroups, j.ID)

Expand All @@ -253,8 +255,8 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
continue
}
} else if ttl > 0 {
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)
if err := w.client.WorkerSetStatus(ctx, sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusWaiting): %s", err)
}
log.Debug("Unable to run pipeline build job %d, requirements not OK, let's continue %s", j.ID, t)
continue
Expand All @@ -273,8 +275,8 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {

if continueTakeJob {
//Continue
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)
if err := w.client.WorkerSetStatus(ctx, sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusWaiting): %s", err)
}
continue
}
Expand Down Expand Up @@ -320,8 +322,8 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
}
} else if ttl > 0 {
// If requirements are KO and the ttl > 0, keep alive
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)
if err := w.client.WorkerSetStatus(ctx, sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusWaiting): %s", err)
}
w.bookedWJobID = 0
log.Debug("Unable to run this job %d%s, requirements not ok. let's continue", j.ID, t)
Expand All @@ -342,8 +344,8 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {

if continueTakeJob {
//Continue
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)
if err := w.client.WorkerSetStatus(ctx, sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusWaiting): %s", err)
}
w.bookedWJobID = 0
continue
Expand All @@ -359,7 +361,7 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
}
}

func (w *currentWorker) processBookedPBJob(pbjobs chan<- sdk.PipelineBuildJob) {
func (w *currentWorker) processBookedPBJob(ctx context.Context, pbjobs chan<- sdk.PipelineBuildJob) {
log.Debug("Try to take the pipeline build job %d", w.bookedPBJobID)
b, _, err := sdk.Request("GET", fmt.Sprintf("/queue/%d/infos", w.bookedPBJobID), nil)
if err != nil {
Expand All @@ -373,8 +375,8 @@ func (w *currentWorker) processBookedPBJob(pbjobs chan<- sdk.PipelineBuildJob) {
return
}

if err := w.client.WorkerSetStatus(sdk.StatusChecking); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusChecking): %s", err)
if err := w.client.WorkerSetStatus(ctx, sdk.StatusChecking); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusChecking): %s", err)
}
requirementsOK, errRequirements := checkRequirements(w, &j.Job.Action, j.ExecGroups, w.bookedPBJobID)
if !requirementsOK {
Expand All @@ -396,7 +398,7 @@ func (w *currentWorker) processBookedPBJob(pbjobs chan<- sdk.PipelineBuildJob) {
pbjobs <- *j
}

func (w *currentWorker) processBookedWJob(wjobs chan<- sdk.WorkflowNodeJobRun) error {
func (w *currentWorker) processBookedWJob(ctx context.Context, wjobs chan<- sdk.WorkflowNodeJobRun) error {
log.Debug("Try to take the workflow node job %d", w.bookedWJobID)
wjob, err := w.client.QueueJobInfo(w.bookedWJobID)
if err != nil {
Expand All @@ -413,7 +415,7 @@ func (w *currentWorker) processBookedWJob(wjobs chan<- sdk.WorkflowNodeJobRun) e
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.status.Name, details}},
}}
if err := w.client.QueueJobSendSpawnInfo(true, wjob.ID, infos); err != nil {
if err := w.client.QueueJobSendSpawnInfo(ctx, true, wjob.ID, infos); err != nil {
return sdk.WrapError(err, "processBookedWJob> Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID)
}
return fmt.Errorf("processBookedWJob> the worker have no all requirements")
Expand All @@ -427,7 +429,7 @@ func (w *currentWorker) processBookedWJob(wjobs chan<- sdk.WorkflowNodeJobRun) e
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.status.Name, details}},
}}
if err := w.client.QueueJobSendSpawnInfo(true, wjob.ID, infos); err != nil {
if err := w.client.QueueJobSendSpawnInfo(ctx, true, wjob.ID, infos); err != nil {
return sdk.WrapError(err, "processBookedWJob> Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID)
}
return fmt.Errorf("processBookedWJob> the worker have no all plugins")
Expand Down

0 comments on commit cd8441b

Please sign in to comment.