diff --git a/cli/cdsctl/monitoring.go b/cli/cdsctl/monitoring.go index b26a197965..50b8bc8b0b 100644 --- a/cli/cdsctl/monitoring.go +++ b/cli/cdsctl/monitoring.go @@ -131,8 +131,10 @@ func (ui *Termui) execLoadData() error { } ui.elapsedStatus = time.Since(start) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() start = time.Now() - ui.workers, err = client.WorkerList() + ui.workers, err = client.WorkerList(ctx) if err != nil { return err } diff --git a/cli/cdsctl/worker.go b/cli/cdsctl/worker.go index 2966f7acc2..6cdcdf2eeb 100644 --- a/cli/cdsctl/worker.go +++ b/cli/cdsctl/worker.go @@ -1,8 +1,10 @@ package main import ( + "context" "fmt" "strings" + "time" "github.com/spf13/cobra" @@ -29,7 +31,9 @@ var workerListCmd = cli.Command{ } func workerListRun(v cli.Values) (cli.ListResult, error) { - workers, err := client.WorkerList() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workers, err := client.WorkerList(ctx) if err != nil { return nil, err } @@ -52,7 +56,9 @@ $ cdsctl worker disable $(cdsctl worker list)`, func workerDisableRun(v cli.Values) error { names := v.GetStringSlice("name") - workers, err := client.WorkerList() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workers, err := client.WorkerList(ctx) if err != nil { return err } @@ -63,7 +69,7 @@ func workerDisableRun(v cli.Values) error { if w.ID == n || strings.ToLower(w.Name) == strings.ToLower(n) { found = true fmt.Printf("Disabling worker %s [status %s]... ", cli.Magenta(w.Name), w.Status) - if err := client.WorkerDisable(w.ID); err != nil { + if err := client.WorkerDisable(context.Background(), w.ID); err != nil { fmt.Printf("Error disabling worker %s : %s\n", w.ID, err) } else { fmt.Printf("Done\n") diff --git a/contrib/misc/hello-sdk/main.go b/contrib/misc/hello-sdk/main.go index a61cb122e2..bbcc659124 100644 --- a/contrib/misc/hello-sdk/main.go +++ b/contrib/misc/hello-sdk/main.go @@ -1,9 +1,11 @@ package main import ( + "context" "flag" "fmt" "os" + "time" "github.com/ovh/cds/sdk/cdsclient" ) @@ -27,7 +29,9 @@ func main() { // go on https://godoc.org/github.com/ovh/cds/sdk/cdsclient to // see all available funcs - workers, err := client.WorkerList() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workers, err := client.WorkerList(ctx) if err != nil { fmt.Fprintf(os.Stderr, "Error while getting workers:%s", err) os.Exit(1) diff --git a/engine/api/observability/tracing.go b/engine/api/observability/tracing.go index fb59e0389b..280d6d298b 100644 --- a/engine/api/observability/tracing.go +++ b/engine/api/observability/tracing.go @@ -40,7 +40,7 @@ func Start(ctx context.Context, serviceName string, w http.ResponseWriter, req * } var span *trace.Span - rootSpanContext, hasSpanContext := DefaultFormat.SpanContextFromRequest(req) + rootSpanContext, hasSpanContext := tracingutils.DefaultFormat.SpanContextFromRequest(req) var pkey string var ok bool diff --git a/engine/api/observability/types.go b/engine/api/observability/types.go index 2424b416ec..c8050e33f4 100644 --- a/engine/api/observability/types.go +++ b/engine/api/observability/types.go @@ -1,9 +1,6 @@ package observability import ( - "go.opencensus.io/plugin/ochttp/propagation/b3" - "go.opencensus.io/trace/propagation" - "github.com/ovh/cds/sdk" ) @@ -17,8 +14,6 @@ const ( StatusCodeAttribute = "http.status_code" ) -var DefaultFormat propagation.HTTPFormat = &b3.HTTPFormat{} - // Configuration is the global tracing configuration type Configuration struct { Enable bool `json:"enable"` diff --git a/engine/api/services/http.go b/engine/api/services/http.go index e51e8995a3..35a78fb4a5 100644 --- a/engine/api/services/http.go +++ b/engine/api/services/http.go @@ -12,7 +12,6 @@ import ( "net/url" "time" - "github.com/ovh/cds/engine/api/observability" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" "github.com/ovh/cds/sdk/tracingutils" @@ -167,7 +166,7 @@ func doRequest(ctx context.Context, httpURL string, hash string, method, path st spanCtx, ok := tracingutils.ContextToSpanContext(ctx) log.Debug("setup tracing = %v (%v) on request to %s", ok, spanCtx, req.URL.String()) if ok { - observability.DefaultFormat.SpanContextToRequest(spanCtx, req) + tracingutils.DefaultFormat.SpanContextToRequest(spanCtx, req) } req.Header.Set("Connection", "close") diff --git a/engine/hatchery/kubernetes/services.go b/engine/hatchery/kubernetes/services.go index e1c57350d0..4e03af30fb 100644 --- a/engine/hatchery/kubernetes/services.go +++ b/engine/hatchery/kubernetes/services.go @@ -1,8 +1,10 @@ package kubernetes import ( + "context" "fmt" "strconv" + "time" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -63,9 +65,12 @@ func (h *HatcheryKubernetes) getServicesLogs() error { if len(servicesLogs) > 0 { // Do call api - if err := h.Client.QueueServiceLogs(servicesLogs); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + if err := h.Client.QueueServiceLogs(ctx, servicesLogs); err != nil { + cancel() return fmt.Errorf("Hatchery> Swarm> Cannot send service logs : %v", err) } + cancel() } return nil diff --git a/engine/hatchery/local/local.go b/engine/hatchery/local/local.go index de37af3dba..a602f5ef22 100644 --- a/engine/hatchery/local/local.go +++ b/engine/hatchery/local/local.go @@ -390,7 +390,9 @@ func (h *HatcheryLocal) killAwolWorkers() error { h.Lock() defer h.Unlock() - apiWorkers, err := h.CDSClient().WorkerList() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + apiWorkers, err := h.CDSClient().WorkerList(ctx) if err != nil { return err } diff --git a/engine/hatchery/marathon/marathon.go b/engine/hatchery/marathon/marathon.go index 785e554b3c..f954618b64 100644 --- a/engine/hatchery/marathon/marathon.go +++ b/engine/hatchery/marathon/marathon.go @@ -508,7 +508,9 @@ func (h *HatcheryMarathon) startKillAwolWorkerRoutine() { } func (h *HatcheryMarathon) killDisabledWorkers() error { - workers, err := h.CDSClient().WorkerList() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workers, err := h.CDSClient().WorkerList(ctx) if err != nil { return err } @@ -539,7 +541,9 @@ func (h *HatcheryMarathon) killDisabledWorkers() error { } func (h *HatcheryMarathon) killAwolWorkers() error { - workers, err := h.CDSClient().WorkerList() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workers, err := h.CDSClient().WorkerList(ctx) if err != nil { return err } diff --git a/engine/hatchery/openstack/openstack.go b/engine/hatchery/openstack/openstack.go index ef18c745e6..64bf20f3ac 100644 --- a/engine/hatchery/openstack/openstack.go +++ b/engine/hatchery/openstack/openstack.go @@ -223,7 +223,9 @@ func (h *HatcheryOpenstack) updateServerList() { } func (h *HatcheryOpenstack) killAwolServers() { - workers, err := h.CDSClient().WorkerList() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workers, err := h.CDSClient().WorkerList(ctx) now := time.Now().Unix() if err != nil { log.Warning("killAwolServers> Cannot fetch worker list: %s", err) @@ -383,7 +385,9 @@ func (h *HatcheryOpenstack) killErrorServers() { } func (h *HatcheryOpenstack) killDisabledWorkers() { - workers, err := h.CDSClient().WorkerList() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workers, err := h.CDSClient().WorkerList(ctx) if err != nil { log.Warning("killDisabledWorkers> Cannot fetch worker list: %s", err) return diff --git a/engine/hatchery/swarm/swarm.go b/engine/hatchery/swarm/swarm.go index 96e657a5bd..9ed1f11c63 100644 --- a/engine/hatchery/swarm/swarm.go +++ b/engine/hatchery/swarm/swarm.go @@ -632,7 +632,9 @@ func (h *HatcherySwarm) routines(ctx context.Context) { } func (h *HatcherySwarm) listAwolWorkers(dockerClient *dockerClient) ([]types.Container, error) { - apiworkers, err := h.CDSClient().WorkerList() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + apiworkers, err := h.CDSClient().WorkerList(ctx) if err != nil { return nil, sdk.WrapError(err, "hatchery> swarm> listAwolWorkers> Cannot get workers on %s", dockerClient.name) } diff --git a/engine/hatchery/swarm/swarm_util_logs.go b/engine/hatchery/swarm/swarm_util_logs.go index 4f7a01572f..392099acd8 100644 --- a/engine/hatchery/swarm/swarm_util_logs.go +++ b/engine/hatchery/swarm/swarm_util_logs.go @@ -77,9 +77,11 @@ func (h *HatcherySwarm) getServicesLogs() error { if len(servicesLogs) > 0 { // Do call api - if err := h.Client.QueueServiceLogs(servicesLogs); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if err := h.Client.QueueServiceLogs(ctx, servicesLogs); err != nil { log.Error("Hatchery> Swarm> Cannot send service logs : %v", err) } + cancel() } } } diff --git a/engine/hatchery/vsphere/vsphere.go b/engine/hatchery/vsphere/vsphere.go index ecbb4114b1..918bbcc90e 100644 --- a/engine/hatchery/vsphere/vsphere.go +++ b/engine/hatchery/vsphere/vsphere.go @@ -237,7 +237,9 @@ func (h *HatcheryVSphere) updateServerList() { // killDisabledWorkers kill workers which are disabled func (h *HatcheryVSphere) killDisabledWorkers() { - workers, err := h.CDSClient().WorkerList() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workers, err := h.CDSClient().WorkerList(ctx) if err != nil { log.Warning("killDisabledWorkers> Cannot fetch worker list: %s", err) return diff --git a/engine/worker/builtin_artifact_upload.go b/engine/worker/builtin_artifact_upload.go index e994f7d984..b41b722099 100644 --- a/engine/worker/builtin_artifact_upload.go +++ b/engine/worker/builtin_artifact_upload.go @@ -145,7 +145,7 @@ func runArtifactUpload(w *currentWorker) BuiltInAction { go func(path string) { log.Debug("Uploading %s", path) defer wg.Done() - throughTempURL, duration, err := w.client.QueueArtifactUpload(buildID, tag.Value, path) + throughTempURL, duration, err := w.client.QueueArtifactUpload(ctx, buildID, tag.Value, path) if err != nil { chanError <- sdk.WrapError(err, "Error while uploading artifact %s", path) wgErrors.Add(1) diff --git a/engine/worker/cmd_run.go b/engine/worker/cmd_run.go index f66839bf9e..9521d61675 100644 --- a/engine/worker/cmd_run.go +++ b/engine/worker/cmd_run.go @@ -116,7 +116,7 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { //Before start the loop, take the bookJobID if !w.disableOldWorkflows && w.bookedPBJobID != 0 { - w.processBookedPBJob(pbjobs) + w.processBookedPBJob(ctx, pbjobs) } //Definition of the function which must be called to stop the worker @@ -149,7 +149,7 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { var exceptJobID int64 if w.bookedWJobID != 0 { - if errP := w.processBookedWJob(wjobs); errP != nil { + if errP := w.processBookedWJob(ctx, wjobs); errP != nil { // Unbook job if errR := w.client.QueueJobRelease(true, w.bookedWJobID); errR != nil { log.Error("runCmd> QueueJobRelease> Cannot release job") @@ -165,8 +165,9 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { } exceptJobID = w.bookedWJobID } - if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil { - log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err) + + if err := w.client.WorkerSetStatus(ctx, sdk.StatusWaiting); err != nil { + log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusWaiting): %s", err) } go func(ctx context.Context, exceptID *int64) { @@ -197,13 +198,14 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { case <-ctx.Done(): return case <-refreshTick.C: - if err := w.client.WorkerRefresh(); err != nil { + if err := w.client.WorkerRefresh(ctx); err != nil { log.Error("Heartbeat failed: %v", err) nbErrors++ if nbErrors == 5 { errs <- err } } + cancel() nbErrors = 0 case <-registerTick.C: if err := w.doRegister(); err != nil { @@ -235,8 +237,8 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { continue } - if err := w.client.WorkerSetStatus(sdk.StatusChecking); err != nil { - log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusChecking): %s", err) + if err := w.client.WorkerSetStatus(ctx, sdk.StatusChecking); err != nil { + log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusChecking): %s", err) } requirementsOK, _ := checkRequirements(w, &j.Job.Action, j.ExecGroups, j.ID) @@ -253,8 +255,8 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { continue } } else if ttl > 0 { - if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil { - log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err) + if err := w.client.WorkerSetStatus(ctx, sdk.StatusWaiting); err != nil { + log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusWaiting): %s", err) } log.Debug("Unable to run pipeline build job %d, requirements not OK, let's continue %s", j.ID, t) continue @@ -273,8 +275,8 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { if continueTakeJob { //Continue - if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil { - log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err) + if err := w.client.WorkerSetStatus(ctx, sdk.StatusWaiting); err != nil { + log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusWaiting): %s", err) } continue } @@ -320,8 +322,8 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { } } else if ttl > 0 { // If requirements are KO and the ttl > 0, keep alive - if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil { - log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err) + if err := w.client.WorkerSetStatus(ctx, sdk.StatusWaiting); err != nil { + log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusWaiting): %s", err) } w.bookedWJobID = 0 log.Debug("Unable to run this job %d%s, requirements not ok. let's continue", j.ID, t) @@ -342,8 +344,8 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { if continueTakeJob { //Continue - if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil { - log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err) + if err := w.client.WorkerSetStatus(ctx, sdk.StatusWaiting); err != nil { + log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusWaiting): %s", err) } w.bookedWJobID = 0 continue @@ -359,7 +361,7 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { } } -func (w *currentWorker) processBookedPBJob(pbjobs chan<- sdk.PipelineBuildJob) { +func (w *currentWorker) processBookedPBJob(ctx context.Context, pbjobs chan<- sdk.PipelineBuildJob) { log.Debug("Try to take the pipeline build job %d", w.bookedPBJobID) b, _, err := sdk.Request("GET", fmt.Sprintf("/queue/%d/infos", w.bookedPBJobID), nil) if err != nil { @@ -373,8 +375,8 @@ func (w *currentWorker) processBookedPBJob(pbjobs chan<- sdk.PipelineBuildJob) { return } - if err := w.client.WorkerSetStatus(sdk.StatusChecking); err != nil { - log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusChecking): %s", err) + if err := w.client.WorkerSetStatus(ctx, sdk.StatusChecking); err != nil { + log.Error("WorkerSetStatus> error on WorkerSetStatus(ctx, sdk.StatusChecking): %s", err) } requirementsOK, errRequirements := checkRequirements(w, &j.Job.Action, j.ExecGroups, w.bookedPBJobID) if !requirementsOK { @@ -396,7 +398,7 @@ func (w *currentWorker) processBookedPBJob(pbjobs chan<- sdk.PipelineBuildJob) { pbjobs <- *j } -func (w *currentWorker) processBookedWJob(wjobs chan<- sdk.WorkflowNodeJobRun) error { +func (w *currentWorker) processBookedWJob(ctx context.Context, wjobs chan<- sdk.WorkflowNodeJobRun) error { log.Debug("Try to take the workflow node job %d", w.bookedWJobID) wjob, err := w.client.QueueJobInfo(w.bookedWJobID) if err != nil { @@ -413,7 +415,7 @@ func (w *currentWorker) processBookedWJob(wjobs chan<- sdk.WorkflowNodeJobRun) e RemoteTime: time.Now(), Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.status.Name, details}}, }} - if err := w.client.QueueJobSendSpawnInfo(true, wjob.ID, infos); err != nil { + if err := w.client.QueueJobSendSpawnInfo(ctx, true, wjob.ID, infos); err != nil { return sdk.WrapError(err, "processBookedWJob> Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID) } return fmt.Errorf("processBookedWJob> the worker have no all requirements") @@ -427,7 +429,7 @@ func (w *currentWorker) processBookedWJob(wjobs chan<- sdk.WorkflowNodeJobRun) e RemoteTime: time.Now(), Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.status.Name, details}}, }} - if err := w.client.QueueJobSendSpawnInfo(true, wjob.ID, infos); err != nil { + if err := w.client.QueueJobSendSpawnInfo(ctx, true, wjob.ID, infos); err != nil { return sdk.WrapError(err, "processBookedWJob> Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID) } return fmt.Errorf("processBookedWJob> the worker have no all plugins") diff --git a/engine/worker/cmd_tag.go b/engine/worker/cmd_tag.go index 680e561286..6f48787e3c 100644 --- a/engine/worker/cmd_tag.go +++ b/engine/worker/cmd_tag.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "io/ioutil" "net/http" @@ -102,7 +103,9 @@ func (wk *currentWorker) tagHandler(w http.ResponseWriter, r *http.Request) { }) } - if err := wk.client.QueueJobTag(wk.currentJob.wJob.ID, tags); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := wk.client.QueueJobTag(ctx, wk.currentJob.wJob.ID, tags); err != nil { writeError(w, r, err) return } diff --git a/engine/worker/register.go b/engine/worker/register.go index 863aab1e91..1e7adb7860 100644 --- a/engine/worker/register.go +++ b/engine/worker/register.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "github.com/ovh/cds/sdk" @@ -24,8 +25,7 @@ func (w *currentWorker) register(form sdk.WorkerRegistrationForm) error { form.Version = sdk.VERSION form.OS = sdk.GOOS form.Arch = sdk.GOARCH - - worker, uptodate, err := w.client.WorkerRegister(form) + worker, uptodate, err := w.client.WorkerRegister(context.Background(), form) if err != nil { sdk.Exit("register> Got HTTP %d, exiting\n", err) return err diff --git a/engine/worker/run.go b/engine/worker/run.go index d355da10c0..6920c6d76b 100644 --- a/engine/worker/run.go +++ b/engine/worker/run.go @@ -300,11 +300,14 @@ func (w *currentWorker) updateStepStatus(ctx context.Context, buildID int64, ste for try := 1; try <= 10; try++ { log.Info("updateStepStatus> Sending step status %s buildID:%d stepOrder:%d", status, buildID, stepOrder) - code, lasterr := w.client.(cdsclient.Raw).PostJSON(path, step, nil) + ctxt, cancel := context.WithTimeout(ctx, 10*time.Second) + code, lasterr := w.client.(cdsclient.Raw).PostJSON(ctxt, path, step, nil) if lasterr == nil && code < 300 { log.Info("updateStepStatus> Sending step status %s buildID:%d stepOrder:%d OK", status, buildID, stepOrder) + cancel() return nil } + cancel() log.Warning("updateStepStatus> Cannot send step %d result: HTTP %d err: %s - try: %d - new try in 15s", stepOrder, code, lasterr, try) time.Sleep(15 * time.Second) } diff --git a/engine/worker/take_workflow_node_run_job.go b/engine/worker/take_workflow_node_run_job.go index e43fab99f4..a0020dc449 100644 --- a/engine/worker/take_workflow_node_run_job.go +++ b/engine/worker/take_workflow_node_run_job.go @@ -19,7 +19,9 @@ import ( // If Take is not possible (as Job already booked for example) // it will return true (-> can work on another job), false, otherwise func (w *currentWorker) takeWorkflowJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (bool, error) { - info, err := w.client.QueueTakeJob(job, w.bookedWJobID == job.ID) + ctxQueueTakeJob, cancelQueueTakeJob := context.WithTimeout(ctx, 5*time.Second) + defer cancelQueueTakeJob() + info, err := w.client.QueueTakeJob(ctxQueueTakeJob, job, w.bookedWJobID == job.ID) if err != nil { if w.bookedWJobID == job.ID { return false, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can't work on another job.") @@ -43,7 +45,7 @@ func (w *currentWorker) takeWorkflowJob(ctx context.Context, job sdk.WorkflowNod start := time.Now() - //This goroutine try to get the pipeline build job every 5 seconds, if it fails, it cancel the build. + //This goroutine try to get the job every 5 seconds, if it fails, it cancel the build. ctx, cancel := context.WithCancel(ctx) tick := time.NewTicker(5 * time.Second) go func(cancel context.CancelFunc, jobID int64, tick *time.Ticker) { @@ -57,7 +59,9 @@ func (w *currentWorker) takeWorkflowJob(ctx context.Context, job sdk.WorkflowNod return } j := &sdk.WorkflowNodeJobRun{} - code, err := w.client.(cdsclient.Raw).GetJSON(fmt.Sprintf("/queue/workflows/%d/infos", jobID), j) + ctxGetJSON, cancelGetJSON := context.WithTimeout(ctx, 5*time.Second) + defer cancelGetJSON() + code, err := w.client.(cdsclient.Raw).GetJSON(ctxGetJSON, fmt.Sprintf("/queue/workflows/%d/infos", jobID), j) if err != nil { if code == http.StatusNotFound { log.Info("takeWorkflowJob> Unable to load workflow job - Not Found (Request) %d: %v", jobID, err) @@ -114,11 +118,14 @@ func (w *currentWorker) takeWorkflowJob(ctx context.Context, job sdk.WorkflowNod var lasterr error for try := 1; try <= 10; try++ { log.Info("takeWorkflowJob> Sending build result...") - lasterr = w.client.QueueSendResult(job.ID, res) + ctxSendResult, cancelSendResult := context.WithTimeout(ctx, 5*time.Second) + lasterr = w.client.QueueSendResult(ctxSendResult, job.ID, res) if lasterr == nil { log.Info("takeWorkflowJob> Send build result OK") + cancelSendResult() return false, nil } + cancelSendResult() log.Warning("takeWorkflowJob> Cannot send build result: HTTP %v - try: %d - new try in 15s", lasterr, try) time.Sleep(15 * time.Second) } diff --git a/sdk/cdsclient/client_action.go b/sdk/cdsclient/client_action.go index 253e789da8..433241f2dc 100644 --- a/sdk/cdsclient/client_action.go +++ b/sdk/cdsclient/client_action.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "io" "net/http" @@ -11,20 +12,20 @@ import ( func (c *client) Requirements() ([]sdk.Requirement, error) { var req []sdk.Requirement - if _, err := c.GetJSON("/action/requirement", &req); err != nil { + if _, err := c.GetJSON(context.Background(), "/action/requirement", &req); err != nil { return nil, err } return req, nil } func (c *client) ActionDelete(actionName string) error { - _, err := c.DeleteJSON("/action/"+actionName, nil) + _, err := c.DeleteJSON(context.Background(), "/action/"+actionName, nil) return err } func (c *client) ActionGet(actionName string, mods ...RequestModifier) (*sdk.Action, error) { action := &sdk.Action{} - if _, err := c.GetJSON("/action/"+actionName, action, mods...); err != nil { + if _, err := c.GetJSON(context.Background(), "/action/"+actionName, action, mods...); err != nil { return nil, err } return action, nil @@ -32,7 +33,7 @@ func (c *client) ActionGet(actionName string, mods ...RequestModifier) (*sdk.Act func (c *client) ActionList() ([]sdk.Action, error) { actions := []sdk.Action{} - if _, err := c.GetJSON("/action", &actions); err != nil { + if _, err := c.GetJSON(context.Background(), "/action", &actions); err != nil { return nil, err } return actions, nil @@ -58,7 +59,7 @@ func (c *client) ActionImport(content io.Reader, format string) error { return exportentities.ErrUnsupportedFormat } - _, _, code, err := c.Request("POST", url, content, mods...) + _, _, code, err := c.Request(context.Background(), "POST", url, content, mods...) if err != nil { return err } @@ -72,7 +73,7 @@ func (c *client) ActionImport(content io.Reader, format string) error { func (c *client) ActionExport(name string, format string) ([]byte, error) { path := fmt.Sprintf("/action/%s/export?format=%s", name, format) - body, _, _, err := c.Request("GET", path, nil) + body, _, _, err := c.Request(context.Background(), "GET", path, nil) if err != nil { return nil, err } diff --git a/sdk/cdsclient/client_admin.go b/sdk/cdsclient/client_admin.go index 177c485c5f..67a9b03bc3 100644 --- a/sdk/cdsclient/client_admin.go +++ b/sdk/cdsclient/client_admin.go @@ -2,6 +2,7 @@ package cdsclient import ( "bytes" + "context" "net/url" "github.com/ovh/cds/sdk" @@ -9,7 +10,7 @@ import ( func (c *client) Services() ([]sdk.Service, error) { srvs := []sdk.Service{} - if _, err := c.GetJSON("/admin/services", &srvs); err != nil { + if _, err := c.GetJSON(context.Background(), "/admin/services", &srvs); err != nil { return nil, err } return srvs, nil @@ -17,7 +18,7 @@ func (c *client) Services() ([]sdk.Service, error) { func (c *client) ServicesByName(name string) (*sdk.Service, error) { srv := sdk.Service{} - if _, err := c.GetJSON("/admin/service/"+name, &srv); err != nil { + if _, err := c.GetJSON(context.Background(), "/admin/service/"+name, &srv); err != nil { return nil, err } return &srv, nil @@ -25,30 +26,30 @@ func (c *client) ServicesByName(name string) (*sdk.Service, error) { func (c *client) ServicesByType(stype string) ([]sdk.Service, error) { srvs := []sdk.Service{} - if _, err := c.GetJSON("/admin/services?type="+stype, &srvs); err != nil { + if _, err := c.GetJSON(context.Background(), "/admin/services?type="+stype, &srvs); err != nil { return nil, err } return srvs, nil } func (c *client) ServiceCallGET(stype string, query string) ([]byte, error) { - btes, _, _, err := c.Request("GET", "/admin/services/call?type="+stype+"&query="+url.QueryEscape(query), nil) + btes, _, _, err := c.Request(context.Background(), "GET", "/admin/services/call?type="+stype+"&query="+url.QueryEscape(query), nil) return btes, err } func (c *client) ServiceCallPOST(stype string, query string, body []byte) ([]byte, error) { rBody := bytes.NewReader(body) - btes, _, _, err := c.Request("POST", "/admin/services/call?type="+stype+"&query="+url.QueryEscape(query), rBody) + btes, _, _, err := c.Request(context.Background(), "POST", "/admin/services/call?type="+stype+"&query="+url.QueryEscape(query), rBody) return btes, err } func (c *client) ServiceCallPUT(stype string, query string, body []byte) ([]byte, error) { rBody := bytes.NewReader(body) - btes, _, _, err := c.Request("PUT", "/admin/services/call?type="+stype+"&query="+url.QueryEscape(query), rBody) + btes, _, _, err := c.Request(context.Background(), "PUT", "/admin/services/call?type="+stype+"&query="+url.QueryEscape(query), rBody) return btes, err } func (c *client) ServiceCallDELETE(stype string, query string) error { - _, _, _, err := c.Request("DELETE", "/admin/services/call?type="+stype+"&query="+url.QueryEscape(query), nil) + _, _, _, err := c.Request(context.Background(), "DELETE", "/admin/services/call?type="+stype+"&query="+url.QueryEscape(query), nil) return err } diff --git a/sdk/cdsclient/client_application.go b/sdk/cdsclient/client_application.go index 6ded95b083..387e152fc9 100644 --- a/sdk/cdsclient/client_application.go +++ b/sdk/cdsclient/client_application.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "encoding/json" "fmt" "io" @@ -10,18 +11,18 @@ import ( ) func (c *client) ApplicationCreate(key string, app *sdk.Application) error { - _, err := c.PostJSON("/project/"+key+"/applications", app, nil) + _, err := c.PostJSON(context.Background(), "/project/"+key+"/applications", app, nil) return err } func (c *client) ApplicationDelete(key string, appName string) error { - _, err := c.DeleteJSON("/project/"+key+"/application/"+appName, nil) + _, err := c.DeleteJSON(context.Background(), "/project/"+key+"/application/"+appName, nil) return err } func (c *client) ApplicationGet(key string, appName string, mods ...RequestModifier) (*sdk.Application, error) { app := &sdk.Application{} - if _, err := c.GetJSON("/project/"+key+"/application/"+appName, app, mods...); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key+"/application/"+appName, app, mods...); err != nil { return nil, err } return app, nil @@ -29,7 +30,7 @@ func (c *client) ApplicationGet(key string, appName string, mods ...RequestModif func (c *client) ApplicationList(key string) ([]sdk.Application, error) { apps := []sdk.Application{} - if _, err := c.GetJSON("/project/"+key+"/applications", &apps); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key+"/applications", &apps); err != nil { return nil, err } return apps, nil @@ -43,7 +44,7 @@ func (c *client) ApplicationGroupsImport(projectKey, appName string, content io. uri += "&forceUpdate=true" } - btes, _, _, errReq := c.Request("POST", uri, content) + btes, _, _, errReq := c.Request(context.Background(), "POST", uri, content) if errReq != nil { return app, errReq } @@ -58,6 +59,6 @@ func (c *client) ApplicationGroupsImport(projectKey, appName string, content io. //ApplicationAttachToReposistoriesManager attachs the application to the repo identified by its fullname in the reposManager func (c *client) ApplicationAttachToReposistoriesManager(projectKey, appName, reposManager, repoFullname string) error { uri := fmt.Sprintf("/project/%s/repositories_manager/%s/application/%s/attach?fullname=%s", projectKey, reposManager, appName, url.QueryEscape(repoFullname)) - _, _, _, err := c.Request("POST", uri, nil) + _, _, _, err := c.Request(context.Background(), "POST", uri, nil) return err } diff --git a/sdk/cdsclient/client_application_key.go b/sdk/cdsclient/client_application_key.go index d7cc370cc4..2adad74c8e 100644 --- a/sdk/cdsclient/client_application_key.go +++ b/sdk/cdsclient/client_application_key.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "net/url" "github.com/ovh/cds/sdk" @@ -8,18 +9,18 @@ import ( func (c *client) ApplicationKeysList(key string, appName string) ([]sdk.ApplicationKey, error) { k := []sdk.ApplicationKey{} - if _, err := c.GetJSON("/project/"+key+"/application/"+appName+"/keys", &k); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key+"/application/"+appName+"/keys", &k); err != nil { return nil, err } return k, nil } func (c *client) ApplicationKeyCreate(projectKey string, appName string, keyApplication *sdk.ApplicationKey) error { - _, err := c.PostJSON("/project/"+projectKey+"/application/"+appName+"/keys", keyApplication, keyApplication) + _, err := c.PostJSON(context.Background(), "/project/"+projectKey+"/application/"+appName+"/keys", keyApplication, keyApplication) return err } func (c *client) ApplicationKeysDelete(projectKey string, appName string, keyName string) error { - _, _, _, err := c.Request("DELETE", "/project/"+projectKey+"/application/"+appName+"/keys/"+url.QueryEscape(keyName), nil) + _, _, _, err := c.Request(context.Background(), "DELETE", "/project/"+projectKey+"/application/"+appName+"/keys/"+url.QueryEscape(keyName), nil) return err } diff --git a/sdk/cdsclient/client_application_variable.go b/sdk/cdsclient/client_application_variable.go index 1957956727..d960eafdd1 100644 --- a/sdk/cdsclient/client_application_variable.go +++ b/sdk/cdsclient/client_application_variable.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "net/url" "github.com/ovh/cds/sdk" @@ -8,30 +9,30 @@ import ( func (c *client) ApplicationVariablesList(key string, appName string) ([]sdk.Variable, error) { k := []sdk.Variable{} - if _, err := c.GetJSON("/project/"+key+"/application/"+appName+"/variable", &k); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key+"/application/"+appName+"/variable", &k); err != nil { return nil, err } return k, nil } func (c *client) ApplicationVariableCreate(projectKey string, appName string, variable *sdk.Variable) error { - _, err := c.PostJSON("/project/"+projectKey+"/application/"+appName+"/variable/"+url.QueryEscape(variable.Name), variable, variable) + _, err := c.PostJSON(context.Background(), "/project/"+projectKey+"/application/"+appName+"/variable/"+url.QueryEscape(variable.Name), variable, variable) return err } func (c *client) ApplicationVariableDelete(projectKey string, appName string, varName string) error { - _, _, _, err := c.Request("DELETE", "/project/"+projectKey+"/application/"+appName+"/variable/"+url.QueryEscape(varName), nil) + _, _, _, err := c.Request(context.Background(), "DELETE", "/project/"+projectKey+"/application/"+appName+"/variable/"+url.QueryEscape(varName), nil) return err } func (c *client) ApplicationVariableUpdate(projectKey string, appName string, variable *sdk.Variable) error { - _, err := c.PutJSON("/project/"+projectKey+"/application/"+appName+"/variable/"+url.QueryEscape(variable.Name), variable, variable, nil) + _, err := c.PutJSON(context.Background(), "/project/"+projectKey+"/application/"+appName+"/variable/"+url.QueryEscape(variable.Name), variable, variable, nil) return err } func (c *client) ApplicationVariableGet(projectKey string, appName string, varName string) (*sdk.Variable, error) { variable := &sdk.Variable{} - if _, err := c.GetJSON("/project/"+projectKey+"/application/"+appName+"/variable/"+url.QueryEscape(varName), variable, nil); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+projectKey+"/application/"+appName+"/variable/"+url.QueryEscape(varName), variable, nil); err != nil { return nil, err } return variable, nil diff --git a/sdk/cdsclient/client_broadcast.go b/sdk/cdsclient/client_broadcast.go index 984ca0c06b..283136b389 100644 --- a/sdk/cdsclient/client_broadcast.go +++ b/sdk/cdsclient/client_broadcast.go @@ -1,18 +1,19 @@ package cdsclient import ( + "context" "fmt" "github.com/ovh/cds/sdk" ) func (c *client) BroadcastDelete(id string) error { - _, err := c.DeleteJSON("/broadcast/"+id, nil) + _, err := c.DeleteJSON(context.Background(), "/broadcast/"+id, nil) return err } func (c *client) BroadcastCreate(broadcast *sdk.Broadcast) error { - code, err := c.PostJSON("/broadcast", broadcast, nil) + code, err := c.PostJSON(context.Background(), "/broadcast", broadcast, nil) if code != 201 { if err == nil { return fmt.Errorf("HTTP Code %d", code) @@ -23,7 +24,7 @@ func (c *client) BroadcastCreate(broadcast *sdk.Broadcast) error { func (c *client) BroadcastGet(id string) (*sdk.Broadcast, error) { bc := &sdk.Broadcast{} - if _, err := c.GetJSON("/broadcast/"+id, bc); err != nil { + if _, err := c.GetJSON(context.Background(), "/broadcast/"+id, bc); err != nil { return nil, err } return bc, nil @@ -31,7 +32,7 @@ func (c *client) BroadcastGet(id string) (*sdk.Broadcast, error) { func (c *client) Broadcasts() ([]sdk.Broadcast, error) { bcs := []sdk.Broadcast{} - if _, err := c.GetJSON("/broadcast", &bcs); err != nil { + if _, err := c.GetJSON(context.Background(), "/broadcast", &bcs); err != nil { return nil, err } return bcs, nil diff --git a/sdk/cdsclient/client_config.go b/sdk/cdsclient/client_config.go index fc5bb10caa..fcd2d4e221 100644 --- a/sdk/cdsclient/client_config.go +++ b/sdk/cdsclient/client_config.go @@ -1,8 +1,10 @@ package cdsclient +import "context" + func (c *client) ConfigUser() (map[string]string, error) { var res map[string]string - if _, err := c.GetJSON("/config/user", &res); err != nil { + if _, err := c.GetJSON(context.Background(), "/config/user", &res); err != nil { return nil, err } diff --git a/sdk/cdsclient/client_download.go b/sdk/cdsclient/client_download.go index bf6498d1b1..c37c34f45d 100644 --- a/sdk/cdsclient/client_download.go +++ b/sdk/cdsclient/client_download.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "encoding/json" "fmt" "net/http" @@ -12,7 +13,7 @@ import ( func (c *client) Download() ([]sdk.Download, error) { var res []sdk.Download - if _, err := c.GetJSON("/download", &res); err != nil { + if _, err := c.GetJSON(context.Background(), "/download", &res); err != nil { return nil, err } return res, nil diff --git a/sdk/cdsclient/client_environment.go b/sdk/cdsclient/client_environment.go index 5925468467..536cf554b5 100644 --- a/sdk/cdsclient/client_environment.go +++ b/sdk/cdsclient/client_environment.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "encoding/json" "fmt" "io" @@ -10,14 +11,14 @@ import ( ) func (c *client) EnvironmentCreate(key string, env *sdk.Environment) error { - if _, err := c.PostJSON("/project/"+key+"/environment", env, nil); err != nil { + if _, err := c.PostJSON(context.Background(), "/project/"+key+"/environment", env, nil); err != nil { return err } return nil } func (c *client) EnvironmentDelete(key string, envName string) error { - if _, err := c.DeleteJSON("/project/"+key+"/environment/"+url.QueryEscape(envName), nil, nil); err != nil { + if _, err := c.DeleteJSON(context.Background(), "/project/"+key+"/environment/"+url.QueryEscape(envName), nil, nil); err != nil { return err } return nil @@ -25,7 +26,7 @@ func (c *client) EnvironmentDelete(key string, envName string) error { func (c *client) EnvironmentGet(key string, envName string, mods ...RequestModifier) (*sdk.Environment, error) { env := &sdk.Environment{} - if _, err := c.GetJSON("/project/"+key+"/environment/"+url.QueryEscape(envName), env); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key+"/environment/"+url.QueryEscape(envName), env); err != nil { return nil, err } return env, nil @@ -33,7 +34,7 @@ func (c *client) EnvironmentGet(key string, envName string, mods ...RequestModif func (c *client) EnvironmentList(key string) ([]sdk.Environment, error) { envs := []sdk.Environment{} - if _, err := c.GetJSON("/project/"+key+"/environment", &envs); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key+"/environment", &envs); err != nil { return nil, err } return envs, nil @@ -47,7 +48,7 @@ func (c *client) EnvironmentGroupsImport(projectKey, envName string, content io. url += "&forceUpdate=true" } - btes, _, _, errReq := c.Request("POST", url, content) + btes, _, _, errReq := c.Request(context.Background(), "POST", url, content) if errReq != nil { return env, errReq } diff --git a/sdk/cdsclient/client_environment_key.go b/sdk/cdsclient/client_environment_key.go index d6e8e5e7cb..8256c8429d 100644 --- a/sdk/cdsclient/client_environment_key.go +++ b/sdk/cdsclient/client_environment_key.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "net/url" "github.com/ovh/cds/sdk" @@ -8,18 +9,18 @@ import ( func (c *client) EnvironmentKeysList(key string, envName string) ([]sdk.EnvironmentKey, error) { k := []sdk.EnvironmentKey{} - if _, err := c.GetJSON("/project/"+key+"/environment/"+url.QueryEscape(envName)+"/keys", &k); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key+"/environment/"+url.QueryEscape(envName)+"/keys", &k); err != nil { return nil, err } return k, nil } func (c *client) EnvironmentKeyCreate(projectKey string, envName string, keyEnvironment *sdk.EnvironmentKey) error { - _, err := c.PostJSON("/project/"+projectKey+"/environment/"+url.QueryEscape(envName)+"/keys", keyEnvironment, keyEnvironment) + _, err := c.PostJSON(context.Background(), "/project/"+projectKey+"/environment/"+url.QueryEscape(envName)+"/keys", keyEnvironment, keyEnvironment) return err } func (c *client) EnvironmentKeysDelete(projectKey string, envName string, keyName string) error { - _, _, _, err := c.Request("DELETE", "/project/"+projectKey+"/environment/"+url.QueryEscape(envName)+"/keys/"+url.QueryEscape(keyName), nil) + _, _, _, err := c.Request(context.Background(), "DELETE", "/project/"+projectKey+"/environment/"+url.QueryEscape(envName)+"/keys/"+url.QueryEscape(keyName), nil) return err } diff --git a/sdk/cdsclient/client_environment_variable.go b/sdk/cdsclient/client_environment_variable.go index 89042c52da..19021de710 100644 --- a/sdk/cdsclient/client_environment_variable.go +++ b/sdk/cdsclient/client_environment_variable.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "net/url" "github.com/ovh/cds/sdk" @@ -8,30 +9,30 @@ import ( func (c *client) EnvironmentVariablesList(key string, envName string) ([]sdk.Variable, error) { k := []sdk.Variable{} - if _, err := c.GetJSON("/project/"+key+"/environment/"+url.QueryEscape(envName)+"/variable", &k); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key+"/environment/"+url.QueryEscape(envName)+"/variable", &k); err != nil { return nil, err } return k, nil } func (c *client) EnvironmentVariableCreate(projectKey string, envName string, variable *sdk.Variable) error { - _, err := c.PostJSON("/project/"+projectKey+"/environment/"+url.QueryEscape(envName)+"/variable/"+url.QueryEscape(variable.Name), variable, variable) + _, err := c.PostJSON(context.Background(), "/project/"+projectKey+"/environment/"+url.QueryEscape(envName)+"/variable/"+url.QueryEscape(variable.Name), variable, variable) return err } func (c *client) EnvironmentVariableDelete(projectKey string, envName string, varName string) error { - _, _, _, err := c.Request("DELETE", "/project/"+projectKey+"/environment/"+url.QueryEscape(envName)+"/variable/"+url.QueryEscape(varName), nil) + _, _, _, err := c.Request(context.Background(), "DELETE", "/project/"+projectKey+"/environment/"+url.QueryEscape(envName)+"/variable/"+url.QueryEscape(varName), nil) return err } func (c *client) EnvironmentVariableUpdate(projectKey string, envName string, variable *sdk.Variable) error { - _, err := c.PutJSON("/project/"+projectKey+"/environment/"+url.QueryEscape(envName)+"/variable/"+url.QueryEscape(variable.Name), variable, variable, nil) + _, err := c.PutJSON(context.Background(), "/project/"+projectKey+"/environment/"+url.QueryEscape(envName)+"/variable/"+url.QueryEscape(variable.Name), variable, variable, nil) return err } func (c *client) EnvironmentVariableGet(projectKey string, envName string, varName string) (*sdk.Variable, error) { variable := &sdk.Variable{} - if _, err := c.GetJSON("/project/"+projectKey+"/environment/"+envName+"/variable/"+url.QueryEscape(varName), variable, nil); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+projectKey+"/environment/"+envName+"/variable/"+url.QueryEscape(varName), variable, nil); err != nil { return nil, err } return variable, nil diff --git a/sdk/cdsclient/client_export.go b/sdk/cdsclient/client_export.go index c7a5bf70ec..f0df09884f 100644 --- a/sdk/cdsclient/client_export.go +++ b/sdk/cdsclient/client_export.go @@ -3,6 +3,7 @@ package cdsclient import ( "archive/tar" "bytes" + "context" "fmt" "io/ioutil" @@ -38,7 +39,7 @@ func (c *client) ApplicationExport(projectKey, name string, exportWithPermission if exportWithPermissions { path += "&withPermissions=true" } - body, _, _, err := c.Request("GET", path, nil) + body, _, _, err := c.Request(context.Background(), "GET", path, nil) if err != nil { return nil, err } @@ -50,7 +51,7 @@ func (c *client) EnvironmentExport(projectKey, name string, exportWithPermission if exportWithPermissions { path += "&withPermissions=true" } - body, _, _, err := c.Request("GET", path, nil) + body, _, _, err := c.Request(context.Background(), "GET", path, nil) if err != nil { return nil, err } @@ -62,7 +63,7 @@ func (c *client) WorkflowExport(projectKey, name string, exportWithPermissions b if exportWithPermissions { path += "&withPermissions=true" } - bodyReader, _, _, err := c.Stream("GET", path, nil, true) + bodyReader, _, _, err := c.Stream(context.Background(), "GET", path, nil, true) if err != nil { return nil, err } @@ -80,7 +81,7 @@ func (c *client) WorkflowPull(projectKey, name string, exportWithPermissions boo if exportWithPermissions { path += "?withPermissions=true" } - body, _, _, err := c.Request("GET", path, nil) + body, _, _, err := c.Request(context.Background(), "GET", path, nil) if err != nil { return nil, err } diff --git a/sdk/cdsclient/client_group.go b/sdk/cdsclient/client_group.go index 6fba05dec7..312b1bfb17 100644 --- a/sdk/cdsclient/client_group.go +++ b/sdk/cdsclient/client_group.go @@ -1,13 +1,14 @@ package cdsclient import ( + "context" "fmt" "github.com/ovh/cds/sdk" ) func (c *client) GroupCreate(group *sdk.Group) error { - code, err := c.PostJSON("/group", group, nil) + code, err := c.PostJSON(context.Background(), "/group", group, nil) if code != 201 { if err == nil { return fmt.Errorf("HTTP Code %d", code) @@ -17,13 +18,13 @@ func (c *client) GroupCreate(group *sdk.Group) error { } func (c *client) GroupDelete(name string) error { - _, err := c.DeleteJSON("/group/"+name, nil, nil) + _, err := c.DeleteJSON(context.Background(), "/group/"+name, nil, nil) return err } func (c *client) GroupGet(name string, mods ...RequestModifier) (*sdk.Group, error) { group := &sdk.Group{} - if _, err := c.GetJSON("/group/"+name, group, mods...); err != nil { + if _, err := c.GetJSON(context.Background(), "/group/"+name, group, mods...); err != nil { return nil, err } return group, nil @@ -31,7 +32,7 @@ func (c *client) GroupGet(name string, mods ...RequestModifier) (*sdk.Group, err func (c *client) GroupList() ([]sdk.Group, error) { groups := []sdk.Group{} - if _, err := c.GetJSON("/group", &groups); err != nil { + if _, err := c.GetJSON(context.Background(), "/group", &groups); err != nil { return nil, err } return groups, nil @@ -39,12 +40,12 @@ func (c *client) GroupList() ([]sdk.Group, error) { func (c *client) GroupRename(oldGroupname, newGroupname string) error { group := &sdk.Group{} - if _, err := c.GetJSON("/group/"+oldGroupname, group); err != nil { + if _, err := c.GetJSON(context.Background(), "/group/"+oldGroupname, group); err != nil { return err } group.Name = newGroupname - code, err := c.PutJSON("/group/"+oldGroupname, group, nil) + code, err := c.PutJSON(context.Background(), "/group/"+oldGroupname, group, nil) if code > 400 { if err == nil { return fmt.Errorf("HTTP Code %d", code) diff --git a/sdk/cdsclient/client_group_token.go b/sdk/cdsclient/client_group_token.go index 567e6cd438..6004b49206 100644 --- a/sdk/cdsclient/client_group_token.go +++ b/sdk/cdsclient/client_group_token.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "net/url" @@ -16,7 +17,7 @@ func (c *client) GroupGenerateToken(groupName, expiration, description string) ( }{Description: description, Expiration: expiration} var token sdk.Token - if _, err := c.PostJSON(path, desc, &token); err != nil { + if _, err := c.PostJSON(context.Background(), path, desc, &token); err != nil { return nil, err } return &token, nil @@ -26,7 +27,7 @@ func (c *client) GroupGenerateToken(groupName, expiration, description string) ( func (c *client) GroupDeleteToken(groupName string, tokenID int64) error { path := fmt.Sprintf("/group/%s/token/%d", url.QueryEscape(groupName), tokenID) - if _, err := c.DeleteJSON(path, nil); err != nil { + if _, err := c.DeleteJSON(context.Background(), path, nil); err != nil { return err } return nil @@ -37,7 +38,7 @@ func (c *client) GroupListToken(groupName string) ([]sdk.Token, error) { path := fmt.Sprintf("/group/%s/token", url.QueryEscape(groupName)) tokens := []sdk.Token{} - _, err := c.GetJSON(path, &tokens) + _, err := c.GetJSON(context.Background(), path, &tokens) return tokens, err } diff --git a/sdk/cdsclient/client_group_user.go b/sdk/cdsclient/client_group_user.go index a608dd60ac..7945ed2a9f 100644 --- a/sdk/cdsclient/client_group_user.go +++ b/sdk/cdsclient/client_group_user.go @@ -1,25 +1,26 @@ package cdsclient import ( + "context" "net/url" ) func (c *client) GroupUserAdd(groupname string, users []string) error { - _, err := c.PostJSON("/group/"+url.QueryEscape(groupname)+"/user", users, nil) + _, err := c.PostJSON(context.Background(), "/group/"+url.QueryEscape(groupname)+"/user", users, nil) return err } func (c *client) GroupUserRemove(groupname, username string) error { - _, _, _, err := c.Request("DELETE", "/group/"+url.QueryEscape(groupname)+"/user/"+url.QueryEscape(username), nil) + _, _, _, err := c.Request(context.Background(), "DELETE", "/group/"+url.QueryEscape(groupname)+"/user/"+url.QueryEscape(username), nil) return err } func (c *client) GroupUserAdminSet(groupname string, username string) error { - _, err := c.PostJSON("/group/"+url.QueryEscape(groupname)+"/user/"+url.QueryEscape(username)+"/admin", nil, nil) + _, err := c.PostJSON(context.Background(), "/group/"+url.QueryEscape(groupname)+"/user/"+url.QueryEscape(username)+"/admin", nil, nil) return err } func (c *client) GroupUserAdminRemove(groupname, username string) error { - _, _, _, err := c.Request("DELETE", "/group/"+url.QueryEscape(groupname)+"/user/"+url.QueryEscape(username)+"/admin", nil) + _, _, _, err := c.Request(context.Background(), "DELETE", "/group/"+url.QueryEscape(groupname)+"/user/"+url.QueryEscape(username)+"/admin", nil) return err } diff --git a/sdk/cdsclient/client_hatchery.go b/sdk/cdsclient/client_hatchery.go index bf269f1344..8c829c2163 100644 --- a/sdk/cdsclient/client_hatchery.go +++ b/sdk/cdsclient/client_hatchery.go @@ -1,14 +1,15 @@ package cdsclient import ( + "context" "fmt" "github.com/ovh/cds/sdk" ) -func (c *client) HatcheryCount(workflowNodeRunID int64) (int64, error) { +func (c *client) HatcheryCount(ctx context.Context, workflowNodeRunID int64) (int64, error) { var hatcheriesCount int64 - code, err := c.GetJSON(fmt.Sprintf("/hatchery/count/%d", workflowNodeRunID), &hatcheriesCount) + code, err := c.GetJSON(ctx, fmt.Sprintf("/hatchery/count/%d", workflowNodeRunID), &hatcheriesCount) if code > 300 && err == nil { return hatcheriesCount, fmt.Errorf("HatcheryCount> HTTP %d", code) } else if err != nil { diff --git a/sdk/cdsclient/client_import.go b/sdk/cdsclient/client_import.go index d747a128c5..8c45b71a3b 100644 --- a/sdk/cdsclient/client_import.go +++ b/sdk/cdsclient/client_import.go @@ -2,6 +2,7 @@ package cdsclient import ( "archive/tar" + "context" "encoding/json" "fmt" "io" @@ -20,7 +21,7 @@ func (c *client) PipelineImport(projectKey string, content io.Reader, format str url += "&forceUpdate=true" } - btes, _, code, errReq := c.Request("POST", url, content) + btes, _, code, errReq := c.Request(context.Background(), "POST", url, content) if errReq != nil { return nil, errReq } @@ -62,7 +63,7 @@ func (c *client) ApplicationImport(projectKey string, content io.Reader, format return nil, exportentities.ErrUnsupportedFormat } - btes, _, code, err := c.Request("POST", url, content, mods...) + btes, _, code, err := c.Request(context.Background(), "POST", url, content, mods...) if err != nil { return nil, err } @@ -111,7 +112,7 @@ func (c *client) EnvironmentImport(projectKey string, content io.Reader, format return nil, exportentities.ErrUnsupportedFormat } - btes, _, code, err := c.Request("POST", url, content, mods...) + btes, _, code, err := c.Request(context.Background(), "POST", url, content, mods...) if err != nil { return nil, err } @@ -153,7 +154,7 @@ func (c *client) WorkflowImport(projectKey string, content io.Reader, format str return nil, exportentities.ErrUnsupportedFormat } - btes, _, code, err := c.Request("POST", url, content, mods...) + btes, _, code, err := c.Request(context.Background(), "POST", url, content, mods...) if err != nil { return nil, err } @@ -178,7 +179,7 @@ func (c *client) WorkflowPush(projectKey string, tarContent io.Reader, mods ...R r.Header.Set("Content-Type", "application/tar") }) - btes, headers, code, err := c.Request("POST", url, tarContent, mods...) + btes, headers, code, err := c.Request(context.Background(), "POST", url, tarContent, mods...) if err != nil { return nil, nil, err } diff --git a/sdk/cdsclient/client_mon.go b/sdk/cdsclient/client_mon.go index c55931e77a..2446d306be 100644 --- a/sdk/cdsclient/client_mon.go +++ b/sdk/cdsclient/client_mon.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "encoding/json" "fmt" @@ -9,7 +10,7 @@ import ( func (c *client) MonStatus() (*sdk.MonitoringStatus, error) { monStatus := sdk.MonitoringStatus{} - if _, err := c.GetJSON("/mon/status", &monStatus); err != nil { + if _, err := c.GetJSON(context.Background(), "/mon/status", &monStatus); err != nil { return nil, err } return &monStatus, nil @@ -17,7 +18,7 @@ func (c *client) MonStatus() (*sdk.MonitoringStatus, error) { func (c *client) MonVersion() (*sdk.Version, error) { monVersion := sdk.Version{} - if _, err := c.GetJSON("/mon/version", &monVersion); err != nil { + if _, err := c.GetJSON(context.Background(), "/mon/version", &monVersion); err != nil { return nil, err } return &monVersion, nil @@ -25,14 +26,14 @@ func (c *client) MonVersion() (*sdk.Version, error) { func (c *client) MonDBMigrate() ([]sdk.MonDBMigrate, error) { monDBMigrate := []sdk.MonDBMigrate{} - if _, err := c.GetJSON("/mon/db/migrate", &monDBMigrate); err != nil { + if _, err := c.GetJSON(context.Background(), "/mon/db/migrate", &monDBMigrate); err != nil { return nil, err } return monDBMigrate, nil } func (c *client) MonErrorsGet(uuid string) (*sdk.Error, error) { - res, _, _, err := c.Request("GET", fmt.Sprintf("/mon/errors/%s", uuid), nil) + res, _, _, err := c.Request(context.Background(), "GET", fmt.Sprintf("/mon/errors/%s", uuid), nil) if err != nil { return nil, err } diff --git a/sdk/cdsclient/client_navbar.go b/sdk/cdsclient/client_navbar.go index 503494d653..81c5fd2806 100644 --- a/sdk/cdsclient/client_navbar.go +++ b/sdk/cdsclient/client_navbar.go @@ -1,12 +1,14 @@ package cdsclient import ( + "context" + "github.com/ovh/cds/sdk" ) func (c *client) Navbar() ([]sdk.NavbarProjectData, error) { navbar := []sdk.NavbarProjectData{} - if _, err := c.GetJSON("/navbar", &navbar); err != nil { + if _, err := c.GetJSON(context.Background(), "/navbar", &navbar); err != nil { return nil, err } return navbar, nil diff --git a/sdk/cdsclient/client_pipeline.go b/sdk/cdsclient/client_pipeline.go index 05ca133a46..dd0bccbaac 100644 --- a/sdk/cdsclient/client_pipeline.go +++ b/sdk/cdsclient/client_pipeline.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "encoding/json" "fmt" "io" @@ -11,25 +12,25 @@ import ( func (c *client) PipelineGet(projectKey, name string) (*sdk.Pipeline, error) { pipeline := sdk.Pipeline{} - if _, err := c.GetJSON("/project/"+projectKey+"/pipeline/"+name, &pipeline); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+projectKey+"/pipeline/"+name, &pipeline); err != nil { return nil, err } return &pipeline, nil } func (c *client) PipelineCreate(projectKey string, pip *sdk.Pipeline) error { - _, err := c.PostJSON("/project/"+projectKey+"/pipeline", pip, nil) + _, err := c.PostJSON(context.Background(), "/project/"+projectKey+"/pipeline", pip, nil) return err } func (c *client) PipelineDelete(projectKey, name string) error { - _, err := c.DeleteJSON("/project/"+projectKey+"/pipeline/"+url.QueryEscape(name), nil, nil) + _, err := c.DeleteJSON(context.Background(), "/project/"+projectKey+"/pipeline/"+url.QueryEscape(name), nil, nil) return err } func (c *client) PipelineList(projectKey string) ([]sdk.Pipeline, error) { pipelines := []sdk.Pipeline{} - if _, err := c.GetJSON("/project/"+projectKey+"/pipeline", &pipelines); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+projectKey+"/pipeline", &pipelines); err != nil { return nil, err } return pipelines, nil @@ -43,7 +44,7 @@ func (c *client) PipelineGroupsImport(projectKey, pipelineName string, content i url += "&forceUpdate=true" } - btes, _, _, errReq := c.Request("POST", url, content) + btes, _, _, errReq := c.Request(context.Background(), "POST", url, content) if errReq != nil { return pip, errReq } diff --git a/sdk/cdsclient/client_platform.go b/sdk/cdsclient/client_platform.go index db47c4c1a1..f85c0668ac 100644 --- a/sdk/cdsclient/client_platform.go +++ b/sdk/cdsclient/client_platform.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "net/url" "github.com/ovh/cds/sdk" @@ -8,7 +9,7 @@ import ( func (c *client) PlatformModelList() ([]sdk.PlatformModel, error) { models := []sdk.PlatformModel{} - if _, err := c.GetJSON("/platform/models", &models); err != nil { + if _, err := c.GetJSON(context.Background(), "/platform/models", &models); err != nil { return nil, err } return models, nil @@ -16,28 +17,28 @@ func (c *client) PlatformModelList() ([]sdk.PlatformModel, error) { func (c *client) PlatformModelGet(name string) (sdk.PlatformModel, error) { var model sdk.PlatformModel - if _, err := c.GetJSON("/platform/models/"+url.QueryEscape(name), &model); err != nil { + if _, err := c.GetJSON(context.Background(), "/platform/models/"+url.QueryEscape(name), &model); err != nil { return model, err } return model, nil } func (c *client) PlatformModelAdd(m *sdk.PlatformModel) error { - if _, err := c.PostJSON("/platform/models", m, m); err != nil { + if _, err := c.PostJSON(context.Background(), "/platform/models", m, m); err != nil { return err } return nil } func (c *client) PlatformModelUpdate(m *sdk.PlatformModel) error { - if _, err := c.PutJSON("/platform/models/"+m.Name, m, m); err != nil { + if _, err := c.PutJSON(context.Background(), "/platform/models/"+m.Name, m, m); err != nil { return err } return nil } func (c *client) PlatformModelDelete(name string) error { - if _, err := c.DeleteJSON("/platform/models/"+name, nil, nil); err != nil { + if _, err := c.DeleteJSON(context.Background(), "/platform/models/"+name, nil, nil); err != nil { return err } return nil diff --git a/sdk/cdsclient/client_plugins.go b/sdk/cdsclient/client_plugins.go index 0e4f81e20b..6881b3fe6d 100644 --- a/sdk/cdsclient/client_plugins.go +++ b/sdk/cdsclient/client_plugins.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "io" @@ -9,7 +10,7 @@ import ( func (c client) PluginsList() ([]sdk.GRPCPlugin, error) { res := []sdk.GRPCPlugin{} - if _, err := c.GetJSON("/admin/plugin", &res); err != nil { + if _, err := c.GetJSON(context.Background(), "/admin/plugin", &res); err != nil { return nil, err } return res, nil @@ -18,44 +19,44 @@ func (c client) PluginsList() ([]sdk.GRPCPlugin, error) { func (c client) PluginsGet(name string) (*sdk.GRPCPlugin, error) { path := "/admin/plugin/" + name res := sdk.GRPCPlugin{} - if _, err := c.GetJSON(path, &res); err != nil { + if _, err := c.GetJSON(context.Background(), path, &res); err != nil { return nil, err } return &res, nil } func (c client) PluginAdd(p *sdk.GRPCPlugin) error { - _, err := c.PostJSON("/admin/plugin", p, p) + _, err := c.PostJSON(context.Background(), "/admin/plugin", p, p) return err } func (c client) PluginUpdate(p *sdk.GRPCPlugin) error { - _, err := c.PutJSON("/admin/plugin/"+p.Name, p, p) + _, err := c.PutJSON(context.Background(), "/admin/plugin/"+p.Name, p, p) return err } func (c client) PluginDelete(name string) error { path := "/admin/plugin/" + name - _, err := c.DeleteJSON(path, nil) + _, err := c.DeleteJSON(context.Background(), path, nil) return err } func (c client) PluginAddBinary(p *sdk.GRPCPlugin, b *sdk.GRPCPluginBinary) error { path := fmt.Sprintf("/admin/plugin/%s/binary", p.Name) - _, err := c.PostJSON(path, b, b) + _, err := c.PostJSON(context.Background(), path, b, b) return err } func (c client) PluginDeleteBinary(name, os, arch string) error { path := fmt.Sprintf("/admin/plugin/%s/binary/%s/%s", name, os, arch) - _, err := c.DeleteJSON(path, nil, nil) + _, err := c.DeleteJSON(context.Background(), path, nil, nil) return err } func (c client) PluginGetBinaryInfos(name, os, arch string) (*sdk.GRPCPluginBinary, error) { path := fmt.Sprintf("/admin/plugin/%s/binary/%s/%s/infos", name, os, arch) var res sdk.GRPCPluginBinary - _, err := c.GetJSON(path, &res) + _, err := c.GetJSON(context.Background(), path, &res) return &res, err } @@ -64,7 +65,7 @@ func (c client) PluginGetBinary(name, os, arch string, w io.Writer) error { var reader io.ReadCloser var err error - reader, _, _, err = c.Stream("GET", path, nil, true) + reader, _, _, err = c.Stream(context.Background(), "GET", path, nil, true) if err != nil { return err } diff --git a/sdk/cdsclient/client_project.go b/sdk/cdsclient/client_project.go index 60b10922f8..be790816a5 100644 --- a/sdk/cdsclient/client_project.go +++ b/sdk/cdsclient/client_project.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "encoding/json" "fmt" "io" @@ -21,20 +22,20 @@ func (c *client) ProjectCreate(p *sdk.Project, groupName string) error { } } - if _, err := c.PostJSON("/project", p, nil); err != nil { + if _, err := c.PostJSON(context.Background(), "/project", p, nil); err != nil { return err } return nil } func (c *client) ProjectDelete(key string) error { - _, err := c.DeleteJSON("/project/"+key, nil, nil) + _, err := c.DeleteJSON(context.Background(), "/project/"+key, nil, nil) return err } func (c *client) ProjectGet(key string, mods ...RequestModifier) (*sdk.Project, error) { p := &sdk.Project{} - if _, err := c.GetJSON("/project/"+key, p, mods...); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key, p, mods...); err != nil { return nil, err } return p, nil @@ -48,7 +49,7 @@ func (c *client) ProjectList(withApplications, withWorkflows bool, filters ...Fi path += fmt.Sprintf("&%s=%s", url.QueryEscape(f.Name), url.QueryEscape(f.Value)) } - if _, err := c.GetJSON(path, &p); err != nil { + if _, err := c.GetJSON(context.Background(), path, &p); err != nil { return nil, err } return p, nil @@ -62,7 +63,7 @@ func (c *client) ProjectGroupsImport(projectKey string, content io.Reader, forma url += "&forceUpdate=true" } - btes, _, _, errReq := c.Request("POST", url, content) + btes, _, _, errReq := c.Request(context.Background(), "POST", url, content) if errReq != nil { return proj, errReq } diff --git a/sdk/cdsclient/client_project_kafka.go b/sdk/cdsclient/client_project_kafka.go index 971d9d25c7..dabc6732b0 100644 --- a/sdk/cdsclient/client_project_kafka.go +++ b/sdk/cdsclient/client_project_kafka.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "github.com/ovh/cds/sdk" @@ -8,7 +9,7 @@ import ( func (c *client) ProjectPlatform(projectKey string, platformName string, clearPassword bool) (sdk.ProjectPlatform, error) { var platform sdk.ProjectPlatform - if _, err := c.GetJSON(fmt.Sprintf("/project/%s/platforms/%s?clearPassword=%t", projectKey, platformName, clearPassword), &platform); err != nil { + if _, err := c.GetJSON(context.Background(), fmt.Sprintf("/project/%s/platforms/%s?clearPassword=%t", projectKey, platformName, clearPassword), &platform); err != nil { return platform, err } return platform, nil diff --git a/sdk/cdsclient/client_project_key.go b/sdk/cdsclient/client_project_key.go index 8e7dd810eb..b9b717d028 100644 --- a/sdk/cdsclient/client_project_key.go +++ b/sdk/cdsclient/client_project_key.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "net/url" "github.com/ovh/cds/sdk" @@ -8,18 +9,18 @@ import ( func (c *client) ProjectKeysList(key string) ([]sdk.ProjectKey, error) { k := []sdk.ProjectKey{} - if _, err := c.GetJSON("/project/"+key+"/keys", &k); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key+"/keys", &k); err != nil { return nil, err } return k, nil } func (c *client) ProjectKeyCreate(projectKey string, keyProject *sdk.ProjectKey) error { - _, err := c.PostJSON("/project/"+projectKey+"/keys", keyProject, keyProject) + _, err := c.PostJSON(context.Background(), "/project/"+projectKey+"/keys", keyProject, keyProject) return err } func (c *client) ProjectKeysDelete(projectKey string, keyName string) error { - _, _, _, err := c.Request("DELETE", "/project/"+projectKey+"/keys/"+url.QueryEscape(keyName), nil) + _, _, _, err := c.Request(context.Background(), "DELETE", "/project/"+projectKey+"/keys/"+url.QueryEscape(keyName), nil) return err } diff --git a/sdk/cdsclient/client_project_variable.go b/sdk/cdsclient/client_project_variable.go index 23d96717a1..87e83addef 100644 --- a/sdk/cdsclient/client_project_variable.go +++ b/sdk/cdsclient/client_project_variable.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "net/url" "github.com/ovh/cds/sdk" @@ -8,30 +9,30 @@ import ( func (c *client) ProjectVariablesList(key string) ([]sdk.Variable, error) { k := []sdk.Variable{} - if _, err := c.GetJSON("/project/"+key+"/variable", &k); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+key+"/variable", &k); err != nil { return nil, err } return k, nil } func (c *client) ProjectVariableCreate(projectKey string, variable *sdk.Variable) error { - _, err := c.PostJSON("/project/"+projectKey+"/variable/"+url.QueryEscape(variable.Name), variable, variable) + _, err := c.PostJSON(context.Background(), "/project/"+projectKey+"/variable/"+url.QueryEscape(variable.Name), variable, variable) return err } func (c *client) ProjectVariableDelete(projectKey string, varName string) error { - _, _, _, err := c.Request("DELETE", "/project/"+projectKey+"/variable/"+url.QueryEscape(varName), nil) + _, _, _, err := c.Request(context.Background(), "DELETE", "/project/"+projectKey+"/variable/"+url.QueryEscape(varName), nil) return err } func (c *client) ProjectVariableUpdate(projectKey string, variable *sdk.Variable) error { - _, err := c.PutJSON("/project/"+projectKey+"/variable/"+url.QueryEscape(variable.Name), variable, variable, nil) + _, err := c.PutJSON(context.Background(), "/project/"+projectKey+"/variable/"+url.QueryEscape(variable.Name), variable, variable, nil) return err } func (c *client) ProjectVariableGet(projectKey string, varName string) (*sdk.Variable, error) { variable := &sdk.Variable{} - if _, err := c.GetJSON("/project/"+projectKey+"/variable/"+url.QueryEscape(varName), variable, nil); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+projectKey+"/variable/"+url.QueryEscape(varName), variable, nil); err != nil { return nil, err } return variable, nil @@ -43,7 +44,7 @@ func (c *client) VariableEncrypt(projectKey string, varName string, content stri Value: content, Type: sdk.SecretVariable, } - if _, err := c.PostJSON("/project/"+projectKey+"/encrypt", variable, variable); err != nil { + if _, err := c.PostJSON(context.Background(), "/project/"+projectKey+"/encrypt", variable, variable); err != nil { return nil, err } return variable, nil diff --git a/sdk/cdsclient/client_provider.go b/sdk/cdsclient/client_provider.go index 5986429e34..bada7a51a2 100644 --- a/sdk/cdsclient/client_provider.go +++ b/sdk/cdsclient/client_provider.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "io" "io/ioutil" @@ -14,7 +15,7 @@ import ( func (c *client) ProjectsList(opts ...RequestModifier) ([]sdk.Project, error) { p := []sdk.Project{} path := fmt.Sprintf("/project") - if _, err := c.GetJSON(path, &p, opts...); err != nil { + if _, err := c.GetJSON(context.Background(), path, &p, opts...); err != nil { return nil, err } return p, nil @@ -22,7 +23,7 @@ func (c *client) ProjectsList(opts ...RequestModifier) ([]sdk.Project, error) { func (c *client) ApplicationsList(projectKey string, opts ...RequestModifier) ([]sdk.Application, error) { apps := []sdk.Application{} - if _, err := c.GetJSON("/project/"+projectKey+"/applications", &apps, opts...); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+projectKey+"/applications", &apps, opts...); err != nil { return nil, err } return apps, nil @@ -30,7 +31,7 @@ func (c *client) ApplicationsList(projectKey string, opts ...RequestModifier) ([ func (c *client) ApplicationDeploymentStrategyUpdate(projectKey, applicationName, platformName string, config sdk.PlatformConfig) error { path := fmt.Sprintf("/project/%s/application/%s/deployment/config/%s", projectKey, applicationName, platformName) - if _, err := c.PostJSON(path, config, nil); err != nil { + if _, err := c.PostJSON(context.Background(), path, config, nil); err != nil { return err } return nil @@ -38,7 +39,7 @@ func (c *client) ApplicationDeploymentStrategyUpdate(projectKey, applicationName func (c *client) ApplicationMetadataUpdate(projectKey, applicationName, key, value string) error { path := fmt.Sprintf("/project/%s/application/%s/metadata/%s", projectKey, applicationName, url.PathEscape(key)) - if _, _, _, err := c.Request("POST", path, strings.NewReader(value)); err != nil { + if _, _, _, err := c.Request(context.Background(), "POST", path, strings.NewReader(value)); err != nil { return err } return nil @@ -46,7 +47,7 @@ func (c *client) ApplicationMetadataUpdate(projectKey, applicationName, key, val func (c *client) WorkflowsList(projectKey string) ([]sdk.Workflow, error) { ws := []sdk.Workflow{} - if _, err := c.GetJSON("/project/"+projectKey+"/workflows", &ws); err != nil { + if _, err := c.GetJSON(context.Background(), "/project/"+projectKey+"/workflows", &ws); err != nil { return nil, err } return ws, nil @@ -55,7 +56,7 @@ func (c *client) WorkflowsList(projectKey string) ([]sdk.Workflow, error) { func (c *client) WorkflowLoad(projectKey, workflowName string) (*sdk.Workflow, error) { url := fmt.Sprintf("/project/%s/workflows/%s?withDeepPipelines=true", projectKey, workflowName) w := &sdk.Workflow{} - if _, err := c.GetJSON(url, &w); err != nil { + if _, err := c.GetJSON(context.Background(), url, &w); err != nil { return nil, err } return w, nil @@ -64,7 +65,7 @@ func (c *client) WorkflowLoad(projectKey, workflowName string) (*sdk.Workflow, e func (c *client) ProjectPlatformGet(projectKey string, platformName string, clearPassword bool) (sdk.ProjectPlatform, error) { path := fmt.Sprintf("/project/%s/platforms/%s?clearPassword=%v", projectKey, platformName, clearPassword) var pf sdk.ProjectPlatform - if _, err := c.GetJSON(path, &pf); err != nil { + if _, err := c.GetJSON(context.Background(), path, &pf); err != nil { return pf, err } return pf, nil @@ -73,7 +74,7 @@ func (c *client) ProjectPlatformGet(projectKey string, platformName string, clea func (c *client) ProjectPlatformList(projectKey string) ([]sdk.ProjectPlatform, error) { path := fmt.Sprintf("/project/%s/platforms", projectKey) var pfs []sdk.ProjectPlatform - if _, err := c.GetJSON(path, &pfs); err != nil { + if _, err := c.GetJSON(context.Background(), path, &pfs); err != nil { return pfs, err } return pfs, nil @@ -82,7 +83,7 @@ func (c *client) ProjectPlatformList(projectKey string) ([]sdk.ProjectPlatform, func (c *client) ProjectPlatformDelete(projectKey string, platformName string) error { path := fmt.Sprintf("/project/%s/platforms/%s", projectKey, platformName) var pf sdk.ProjectPlatform - if _, err := c.DeleteJSON(path, &pf); err != nil { + if _, err := c.DeleteJSON(context.Background(), path, &pf); err != nil { return err } return nil @@ -109,14 +110,14 @@ func (c *client) ProjectPlatformImport(projectKey string, content io.Reader, for oldPF, _ := c.ProjectPlatformGet(projectKey, pf.Name, false) if oldPF.Name == "" { path := fmt.Sprintf("/project/%s/platforms", projectKey) - if _, err := c.PostJSON(path, &pf, &pf); err != nil { + if _, err := c.PostJSON(context.Background(), path, &pf, &pf); err != nil { return pf, err } return pf, nil } path := fmt.Sprintf("/project/%s/platforms/%s", projectKey, pf.Name) - if _, err := c.PutJSON(path, &pf, &pf); err != nil { + if _, err := c.PutJSON(context.Background(), path, &pf, &pf); err != nil { return pf, err } return pf, nil diff --git a/sdk/cdsclient/client_queue.go b/sdk/cdsclient/client_queue.go index 7103445591..4ca897d24f 100644 --- a/sdk/cdsclient/client_queue.go +++ b/sdk/cdsclient/client_queue.go @@ -75,9 +75,11 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ if jobs != nil { queue := sdk.WorkflowQueue{} - if _, err := c.GetJSON("/queue/workflows", &queue); err != nil { + ctxt, cancel := context.WithTimeout(ctx, 10*time.Second) + if _, err := c.GetJSON(ctxt, "/queue/workflows", &queue); err != nil { errs <- sdk.WrapError(err, "Unable to load old jobs") } + cancel() if c.config.Verbose { fmt.Println("Old Jobs Queue size: ", len(queue)) @@ -105,11 +107,14 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ q.Add("modelType", modelType) } url.RawQuery = q.Encode() - _, header, _, errReq := c.RequestJSON(http.MethodGet, url.String(), nil, &queue, SetHeader(RequestedIfModifiedSinceHeader, t0.Format(time.RFC1123))) + ctxt, cancel := context.WithTimeout(ctx, 10*time.Second) + _, header, _, errReq := c.RequestJSON(ctxt, http.MethodGet, url.String(), nil, &queue, SetHeader(RequestedIfModifiedSinceHeader, t0.Format(time.RFC1123))) if errReq != nil { + cancel() errs <- sdk.WrapError(errReq, "Unable to load jobs") continue } + cancel() apiTimeHeader := header.Get(ResponseAPITimeHeader) @@ -152,12 +157,14 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ if pbjobs != nil { queue := []sdk.PipelineBuildJob{} - if _, err := c.GetJSON("/queue?status=all", &queue); err != nil { + ctxt, cancel := context.WithTimeout(ctx, 10*time.Second) + if _, err := c.GetJSON(ctxt, "/queue?status=all", &queue); err != nil { errs <- sdk.WrapError(err, "Unable to load pipeline build jobs") } for _, j := range queue { pbjobs <- j } + cancel() } } } @@ -175,7 +182,7 @@ func (c *client) QueueWorkflowNodeJobRun(status ...sdk.Status) ([]sdk.WorkflowNo url.RawQuery = q.Encode() } - if _, err := c.GetJSON(url.String(), &wJobs); err != nil { + if _, err := c.GetJSON(context.Background(), url.String(), &wJobs); err != nil { return nil, err } return wJobs, nil @@ -209,13 +216,13 @@ func (c *client) QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time func (c *client) QueuePipelineBuildJob() ([]sdk.PipelineBuildJob, error) { pbJobs := []sdk.PipelineBuildJob{} - if _, err := c.GetJSON("/queue?status=all", &pbJobs); err != nil { + if _, err := c.GetJSON(context.Background(), "/queue?status=all", &pbJobs); err != nil { return nil, err } return pbJobs, nil } -func (c *client) QueueTakeJob(job sdk.WorkflowNodeJobRun, isBooked bool) (*sdk.WorkflowNodeJobRunData, error) { +func (c *client) QueueTakeJob(ctx context.Context, job sdk.WorkflowNodeJobRun, isBooked bool) (*sdk.WorkflowNodeJobRunData, error) { in := sdk.WorkerTakeForm{ Time: time.Now(), Version: sdk.VERSION, @@ -229,7 +236,7 @@ func (c *client) QueueTakeJob(job sdk.WorkflowNodeJobRun, isBooked bool) (*sdk.W path := fmt.Sprintf("/queue/workflows/%d/take", job.ID) var info sdk.WorkflowNodeJobRunData - if _, err := c.PostJSON(path, &in, &info); err != nil { + if _, err := c.PostJSON(ctx, path, &in, &info); err != nil { return nil, err } return &info, nil @@ -240,40 +247,40 @@ func (c *client) QueueJobInfo(id int64) (*sdk.WorkflowNodeJobRun, error) { path := fmt.Sprintf("/queue/workflows/%d/infos", id) var job sdk.WorkflowNodeJobRun - if _, err := c.GetJSON(path, &job); err != nil { + if _, err := c.GetJSON(context.Background(), path, &job); err != nil { return nil, err } return &job, nil } // QueueJobSendSpawnInfo sends a spawn info on a job -func (c *client) QueueJobSendSpawnInfo(isWorkflowJob bool, id int64, in []sdk.SpawnInfo) error { +func (c *client) QueueJobSendSpawnInfo(ctx context.Context, isWorkflowJob bool, id int64, in []sdk.SpawnInfo) error { path := fmt.Sprintf("/queue/workflows/%d/spawn/infos", id) if !isWorkflowJob { // DEPRECATED code -> it's for pipelineBuildJob path = fmt.Sprintf("/queue/%d/spawn/infos", id) } - _, err := c.PostJSON(path, &in, nil) + _, err := c.PostJSON(ctx, path, &in, nil) return err } // QueueJobIncAttempts add hatcheryID that cannot run this job and return the spawn attempts list -func (c *client) QueueJobIncAttempts(jobID int64) ([]int64, error) { +func (c *client) QueueJobIncAttempts(ctx context.Context, jobID int64) ([]int64, error) { var spawnAttempts []int64 path := fmt.Sprintf("/queue/workflows/%d/attempt", jobID) - _, err := c.PostJSON(path, nil, &spawnAttempts) + _, err := c.PostJSON(ctx, path, nil, &spawnAttempts) return spawnAttempts, err } // QueueJobBook books a job for a Hatchery -func (c *client) QueueJobBook(isWorkflowJob bool, id int64) error { +func (c *client) QueueJobBook(ctx context.Context, isWorkflowJob bool, id int64) error { path := fmt.Sprintf("/queue/workflows/%d/book", id) if !isWorkflowJob { // DEPRECATED code -> it's for pipelineBuildJob path = fmt.Sprintf("/queue/%d/book", id) } - _, err := c.PostJSON(path, nil, nil) + _, err := c.PostJSON(ctx, path, nil, nil) return err } @@ -285,39 +292,42 @@ func (c *client) QueueJobRelease(isWorkflowJob bool, id int64) error { return fmt.Errorf("Not implemented") } - _, err := c.DeleteJSON(path, nil) + _, err := c.DeleteJSON(context.Background(), path, nil) return err } -func (c *client) QueueSendResult(id int64, res sdk.Result) error { +func (c *client) QueueSendResult(ctx context.Context, id int64, res sdk.Result) error { path := fmt.Sprintf("/queue/workflows/%d/result", id) - _, err := c.PostJSON(path, res, nil) + _, err := c.PostJSON(ctx, path, res, nil) return err } -func (c *client) QueueArtifactUpload(id int64, tag, filePath string) (bool, time.Duration, error) { +func (c *client) QueueArtifactUpload(ctx context.Context, id int64, tag, filePath string) (bool, time.Duration, error) { t0 := time.Now() store := new(sdk.ArtifactsStore) - _, _ = c.GetJSON("/artifact/store", store) + _, _ = c.GetJSON(ctx, "/artifact/store", store) if store.TemporaryURLSupported { - err := c.queueIndirectArtifactUpload(id, tag, filePath) + err := c.queueIndirectArtifactUpload(ctx, id, tag, filePath) return true, time.Since(t0), err } err := c.queueDirectArtifactUpload(id, tag, filePath) return false, time.Since(t0), err } -func (c *client) queueIndirectArtifactTempURL(id int64, art *sdk.WorkflowNodeRunArtifact) error { +func (c *client) queueIndirectArtifactTempURL(ctx context.Context, id int64, art *sdk.WorkflowNodeRunArtifact) error { var retryURL = 10 var globalURLErr error uri := fmt.Sprintf("/queue/workflows/%d/artifact/%s/url", id, art.Ref) for i := 0; i < retryURL; i++ { var code int - code, globalURLErr = c.PostJSON(uri, art, art) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + code, globalURLErr = c.PostJSON(ctx, uri, art, art) if code < 300 { + cancel() break } + cancel() time.Sleep(500 * time.Millisecond) } @@ -363,7 +373,7 @@ func (c *client) queueIndirectArtifactTempURLPost(url string, content []byte) er return globalErr } -func (c *client) queueIndirectArtifactUpload(id int64, tag, filePath string) error { +func (c *client) queueIndirectArtifactUpload(ctx context.Context, id int64, tag, filePath string) error { f, errop := os.Open(filePath) if errop != nil { return errop @@ -400,7 +410,7 @@ func (c *client) queueIndirectArtifactUpload(id int64, tag, filePath string) err Created: time.Now(), } - if err := c.queueIndirectArtifactTempURL(id, &art); err != nil { + if err := c.queueIndirectArtifactTempURL(ctx, id, &art); err != nil { return err } @@ -417,7 +427,7 @@ func (c *client) queueIndirectArtifactUpload(id int64, tag, filePath string) err if err := c.queueIndirectArtifactTempURLPost(art.TempURL, fileContent); err != nil { // If we got a 401 error from the objectstore, ask for a fresh temporary url and repost the artifact if strings.Contains(err.Error(), "401 Unauthorized: Temp URL invalid") { - if err := c.queueIndirectArtifactTempURL(id, &art); err != nil { + if err := c.queueIndirectArtifactTempURL(ctx, id, &art); err != nil { return err } @@ -433,10 +443,13 @@ func (c *client) queueIndirectArtifactUpload(id int64, tag, filePath string) err retry := 50 for i := 0; i < retry; i++ { uri := fmt.Sprintf("/queue/workflows/%d/artifact/%s/url/callback", id, art.Ref) - _, callbackErr = c.PostJSON(uri, &art, nil) + ctxt, cancel := context.WithTimeout(ctx, 5*time.Second) + _, callbackErr = c.PostJSON(ctxt, uri, &art, nil) if callbackErr == nil { + cancel() return nil } + cancel() } return callbackErr @@ -508,14 +521,14 @@ func (c *client) queueDirectArtifactUpload(id int64, tag, filePath string) error return fmt.Errorf("x%d: %v", c.config.Retry, err) } -func (c *client) QueueJobTag(jobID int64, tags []sdk.WorkflowRunTag) error { +func (c *client) QueueJobTag(ctx context.Context, jobID int64, tags []sdk.WorkflowRunTag) error { path := fmt.Sprintf("/queue/workflows/%d/tag", jobID) - _, err := c.PostJSON(path, tags, nil) + _, err := c.PostJSON(ctx, path, tags, nil) return err } -func (c *client) QueueServiceLogs(logs []sdk.ServiceLog) error { - status, err := c.PostJSON("/queue/workflows/log/service", logs, nil) +func (c *client) QueueServiceLogs(ctx context.Context, logs []sdk.ServiceLog) error { + status, err := c.PostJSON(ctx, "/queue/workflows/log/service", logs, nil) if status >= 400 { return fmt.Errorf("Error: HTTP code %d", status) } diff --git a/sdk/cdsclient/client_repositorires_manager.go b/sdk/cdsclient/client_repositorires_manager.go index 06fa642ba6..7b5638efe9 100644 --- a/sdk/cdsclient/client_repositorires_manager.go +++ b/sdk/cdsclient/client_repositorires_manager.go @@ -1,11 +1,15 @@ package cdsclient -import "github.com/ovh/cds/sdk" +import ( + "context" + + "github.com/ovh/cds/sdk" +) func (c *client) RepositoriesList(projectKey string, repoManager string) ([]sdk.VCSRepo, error) { repos := []sdk.VCSRepo{} path := "/project/" + projectKey + "/repositories_manager/" + repoManager + "/repos" - if _, err := c.GetJSON(path, &repos); err != nil { + if _, err := c.GetJSON(context.Background(), path, &repos); err != nil { return nil, err } return repos, nil diff --git a/sdk/cdsclient/client_services.go b/sdk/cdsclient/client_services.go index 5ee62c1144..816cfd228f 100644 --- a/sdk/cdsclient/client_services.go +++ b/sdk/cdsclient/client_services.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "github.com/ovh/cds/sdk" @@ -12,7 +13,7 @@ func (c *client) GetService() *sdk.Service { } func (c *client) ServiceRegister(s sdk.Service) (string, error) { - code, err := c.PostJSON("/services/register", &s, &s) + code, err := c.PostJSON(context.Background(), "/services/register", &s, &s) if code != 201 && code != 200 { if err == nil { return "", fmt.Errorf("HTTP Code %d", code) diff --git a/sdk/cdsclient/client_user.go b/sdk/cdsclient/client_user.go index 1c89563856..3e41b021ec 100644 --- a/sdk/cdsclient/client_user.go +++ b/sdk/cdsclient/client_user.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "net/http" "net/url" @@ -23,7 +24,7 @@ func (c *client) UserLogin(username, password string) (bool, string, error) { Token string `json:"token,omitempty"` }{} - if _, err := c.PostJSON("/login", r, &response); err != nil { + if _, err := c.PostJSON(context.Background(), "/login", r, &response); err != nil { return false, "", err } @@ -35,7 +36,7 @@ func (c *client) UserLogin(username, password string) (bool, string, error) { func (c *client) UserList() ([]sdk.User, error) { res := []sdk.User{} - if _, err := c.GetJSON("/user", &res); err != nil { + if _, err := c.GetJSON(context.Background(), "/user", &res); err != nil { return nil, err } return res, nil @@ -51,7 +52,7 @@ func (c *client) UserSignup(username, fullname, email, callback string) error { Callback: callback, } - code, err := c.PostJSON("/user/signup", request, nil) + code, err := c.PostJSON(context.Background(), "/user/signup", request, nil) if err != nil { return err } @@ -63,7 +64,7 @@ func (c *client) UserSignup(username, fullname, email, callback string) error { func (c *client) UserGet(username string) (*sdk.User, error) { res := sdk.User{} - if _, err := c.GetJSON("/user/"+url.QueryEscape(username), &res); err != nil { + if _, err := c.GetJSON(context.Background(), "/user/"+url.QueryEscape(username), &res); err != nil { return nil, err } return &res, nil @@ -71,7 +72,7 @@ func (c *client) UserGet(username string) (*sdk.User, error) { func (c *client) UserGetGroups(username string) (map[string][]sdk.Group, error) { res := map[string][]sdk.Group{} - if _, err := c.GetJSON("/user/"+url.QueryEscape(username)+"/groups", &res); err != nil { + if _, err := c.GetJSON(context.Background(), "/user/"+url.QueryEscape(username)+"/groups", &res); err != nil { return nil, err } return res, nil @@ -85,7 +86,7 @@ func (c *client) UserReset(username, email, callback string) error { Callback: callback, } - code, err := c.PostJSON("/user/"+url.QueryEscape(username)+"/reset", req, nil) + code, err := c.PostJSON(context.Background(), "/user/"+url.QueryEscape(username)+"/reset", req, nil) if err != nil { return err } @@ -97,7 +98,7 @@ func (c *client) UserReset(username, email, callback string) error { func (c *client) UserConfirm(username, token string) (bool, string, error) { res := sdk.UserAPIResponse{} - if _, err := c.GetJSON("/user/"+url.QueryEscape(username)+"/confirm/"+url.QueryEscape(token), &res); err != nil { + if _, err := c.GetJSON(context.Background(), "/user/"+url.QueryEscape(username)+"/confirm/"+url.QueryEscape(token), &res); err != nil { return false, "", err } return true, res.Password, nil @@ -106,7 +107,7 @@ func (c *client) UserConfirm(username, token string) (bool, string, error) { // ListAllTokens Get all tokens that an user can access func (c *client) ListAllTokens() ([]sdk.Token, error) { tokens := []sdk.Token{} - if _, err := c.GetJSON("/user/token", &tokens); err != nil { + if _, err := c.GetJSON(context.Background(), "/user/token", &tokens); err != nil { return tokens, err } return tokens, nil @@ -115,7 +116,7 @@ func (c *client) ListAllTokens() ([]sdk.Token, error) { // FindToken Get a specific token with his value to have description func (c *client) FindToken(tokenValue string) (sdk.Token, error) { token := sdk.Token{} - if code, err := c.GetJSON("/user/token/"+url.QueryEscape(tokenValue), &token); err != nil { + if code, err := c.GetJSON(context.Background(), "/user/token/"+url.QueryEscape(tokenValue), &token); err != nil { if code == http.StatusNotFound { return token, sdk.ErrTokenNotFound } @@ -129,20 +130,20 @@ func (c *client) UpdateFavorite(params sdk.FavoriteParams) (interface{}, error) switch params.Type { case "workflow": var wf sdk.Workflow - if _, err := c.PostJSON("/user/favorite", params, &wf); err != nil { + if _, err := c.PostJSON(context.Background(), "/user/favorite", params, &wf); err != nil { return wf, err } return wf, nil case "project": var proj sdk.Project - if _, err := c.PostJSON("/user/favorite", params, &proj); err != nil { + if _, err := c.PostJSON(context.Background(), "/user/favorite", params, &proj); err != nil { return proj, err } return proj, nil } var res interface{} - if _, err := c.PostJSON("/user/favorite", params, &res); err != nil { + if _, err := c.PostJSON(context.Background(), "/user/favorite", params, &res); err != nil { return res, err } return res, nil diff --git a/sdk/cdsclient/client_version.go b/sdk/cdsclient/client_version.go index 13336f944f..9f66b6b0b8 100644 --- a/sdk/cdsclient/client_version.go +++ b/sdk/cdsclient/client_version.go @@ -1,12 +1,14 @@ package cdsclient import ( + "context" + "github.com/ovh/cds/sdk" ) func (c *client) Version() (*sdk.Version, error) { v := &sdk.Version{} - if _, err := c.GetJSON("/mon/version", v); err != nil { + if _, err := c.GetJSON(context.Background(), "/mon/version", v); err != nil { return nil, err } return v, nil diff --git a/sdk/cdsclient/client_worker.go b/sdk/cdsclient/client_worker.go index 4cd43cf16f..6615157c5d 100644 --- a/sdk/cdsclient/client_worker.go +++ b/sdk/cdsclient/client_worker.go @@ -1,39 +1,47 @@ package cdsclient import ( + "context" "fmt" "net/http" + "time" "github.com/ovh/cds/sdk" ) -func (c *client) WorkerList() ([]sdk.Worker, error) { +func (c *client) WorkerList(ctx context.Context) ([]sdk.Worker, error) { p := []sdk.Worker{} - if _, err := c.GetJSON("/worker", &p); err != nil { + if _, err := c.GetJSON(ctx, "/worker", &p); err != nil { return nil, err } return p, nil } -func (c *client) WorkerDisable(id string) error { +func (c *client) WorkerDisable(ctx context.Context, id string) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() url := fmt.Sprintf("/worker/%s/disable", id) - if _, err := c.PostJSON(url, nil, nil); err != nil { + if _, err := c.PostJSON(ctx, url, nil, nil); err != nil { return err } return nil } -func (c *client) WorkerRefresh() error { +func (c *client) WorkerRefresh(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() url := fmt.Sprintf("/worker/refresh") - if _, err := c.PostJSON(url, nil, nil); err != nil { + if _, err := c.PostJSON(ctx, url, nil, nil); err != nil { return err } return nil } -func (c *client) WorkerRegister(r sdk.WorkerRegistrationForm) (*sdk.Worker, bool, error) { +func (c *client) WorkerRegister(ctx context.Context, form sdk.WorkerRegistrationForm) (*sdk.Worker, bool, error) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() var w sdk.Worker - code, err := c.PostJSON("/worker", r, &w) + code, err := c.PostJSON(ctx, "/worker", form, &w) if code == http.StatusUnauthorized { return nil, false, sdk.ErrUnauthorized } @@ -49,18 +57,20 @@ func (c *client) WorkerRegister(r sdk.WorkerRegistrationForm) (*sdk.Worker, bool return &w, w.Uptodate, nil } -func (c *client) WorkerSetStatus(s sdk.Status) error { +func (c *client) WorkerSetStatus(ctx context.Context, status sdk.Status) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() var uri string - switch s { + switch status { case sdk.StatusChecking: uri = fmt.Sprintf("/worker/checking") case sdk.StatusWaiting: uri = fmt.Sprintf("/worker/waiting") default: - return fmt.Errorf("Unsupported status : %s", s.String()) + return fmt.Errorf("Unsupported status: %s", status.String()) } - code, err := c.PostJSON(uri, nil, nil) + code, err := c.PostJSON(ctx, uri, nil, nil) if err != nil { return err } diff --git a/sdk/cdsclient/client_worker_model.go b/sdk/cdsclient/client_worker_model.go index f3e96d2486..9205b3bca7 100644 --- a/sdk/cdsclient/client_worker_model.go +++ b/sdk/cdsclient/client_worker_model.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "net/url" @@ -9,7 +10,7 @@ import ( // WorkerModelBook books a worker model for register, used by hatcheries func (c *client) WorkerModelBook(id int64) error { - code, err := c.PutJSON(fmt.Sprintf("/worker/model/book/%d", id), nil, nil) + code, err := c.PutJSON(context.Background(), fmt.Sprintf("/worker/model/book/%d", id), nil, nil) if code > 300 && err == nil { return fmt.Errorf("WorkerModelBook> HTTP %d", code) } else if err != nil { @@ -52,7 +53,7 @@ func (c *client) workerModels(withDisabled bool, binary, state string) ([]sdk.Mo } var models []sdk.Model - if _, errr := c.GetJSON(uri, &models); errr != nil { + if _, errr := c.GetJSON(context.Background(), uri, &models); errr != nil { return nil, errr } return models, nil @@ -60,7 +61,7 @@ func (c *client) workerModels(withDisabled bool, binary, state string) ([]sdk.Mo func (c *client) WorkerModelSpawnError(id int64, info string) error { data := sdk.SpawnErrorForm{Error: info} - code, err := c.PutJSON(fmt.Sprintf("/worker/model/error/%d", id), &data, nil) + code, err := c.PutJSON(context.Background(), fmt.Sprintf("/worker/model/error/%d", id), &data, nil) if code > 300 && err == nil { return fmt.Errorf("WorkerModelSpawnError> HTTP %d", code) } else if err != nil { @@ -98,7 +99,7 @@ func (c *client) WorkerModelAdd(name, modelType, patternName string, dockerModel } modelCreated := sdk.Model{} - code, err := c.PostJSON(uri, model, &modelCreated) + code, err := c.PostJSON(context.Background(), uri, model, &modelCreated) if err != nil { return modelCreated, err } @@ -137,7 +138,7 @@ func (c *client) WorkerModelUpdate(ID int64, name string, modelType string, dock } modelUpdated := sdk.Model{} - code, err := c.PutJSON(uri, model, &modelUpdated) + code, err := c.PutJSON(context.Background(), uri, model, &modelUpdated) if err != nil { return modelUpdated, err } @@ -151,7 +152,7 @@ func (c *client) WorkerModelUpdate(ID int64, name string, modelType string, dock func (c *client) WorkerModel(name string) (sdk.Model, error) { uri := fmt.Sprintf("/worker/model?name=" + name) var model sdk.Model - _, err := c.GetJSON(uri, &model) + _, err := c.GetJSON(context.Background(), uri, &model) return model, err } @@ -162,6 +163,6 @@ func (c *client) WorkerModelDelete(name string) error { } uri := fmt.Sprintf("/worker/model/%d", wm.ID) - _, errDelete := c.DeleteJSON(uri, nil) + _, errDelete := c.DeleteJSON(context.Background(), uri, nil) return errDelete } diff --git a/sdk/cdsclient/client_workflow.go b/sdk/cdsclient/client_workflow.go index 3c003a4359..89f01d3239 100644 --- a/sdk/cdsclient/client_workflow.go +++ b/sdk/cdsclient/client_workflow.go @@ -2,6 +2,7 @@ package cdsclient import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -16,7 +17,7 @@ import ( func (c *client) WorkflowList(projectKey string) ([]sdk.Workflow, error) { url := fmt.Sprintf("/project/%s/workflows", projectKey) w := []sdk.Workflow{} - if _, err := c.GetJSON(url, &w); err != nil { + if _, err := c.GetJSON(context.Background(), url, &w); err != nil { return nil, err } return w, nil @@ -25,7 +26,7 @@ func (c *client) WorkflowList(projectKey string) ([]sdk.Workflow, error) { func (c *client) WorkflowGet(projectKey, workflowName string) (*sdk.Workflow, error) { url := fmt.Sprintf("/project/%s/workflows/%s", projectKey, workflowName) w := &sdk.Workflow{} - if _, err := c.GetJSON(url, &w); err != nil { + if _, err := c.GetJSON(context.Background(), url, &w); err != nil { return nil, err } return w, nil @@ -34,7 +35,7 @@ func (c *client) WorkflowGet(projectKey, workflowName string) (*sdk.Workflow, er func (c *client) WorkflowRunGet(projectKey string, workflowName string, number int64) (*sdk.WorkflowRun, error) { url := fmt.Sprintf("/project/%s/workflows/%s/runs/%d", projectKey, workflowName, number) run := sdk.WorkflowRun{} - if _, err := c.GetJSON(url, &run); err != nil { + if _, err := c.GetJSON(context.Background(), url, &run); err != nil { return nil, err } return &run, nil @@ -53,7 +54,7 @@ func (c *client) WorkflowRunSearch(projectKey string, offset, limit int64, filte path += fmt.Sprintf("&%s=%s", url.QueryEscape(f.Name), url.QueryEscape(f.Value)) } runs := []sdk.WorkflowRun{} - if _, err := c.GetJSON(path, &runs); err != nil { + if _, err := c.GetJSON(context.Background(), path, &runs); err != nil { return nil, err } return runs, nil @@ -69,21 +70,21 @@ func (c *client) WorkflowRunList(projectKey string, workflowName string, offset, url := fmt.Sprintf("/project/%s/workflows/%s/runs?offset=%d&limit=%d", projectKey, workflowName, offset, limit) runs := []sdk.WorkflowRun{} - if _, err := c.GetJSON(url, &runs); err != nil { + if _, err := c.GetJSON(context.Background(), url, &runs); err != nil { return nil, err } return runs, nil } func (c *client) WorkflowDelete(projectKey string, workflowName string) error { - _, err := c.DeleteJSON(fmt.Sprintf("/project/%s/workflows/%s", projectKey, workflowName), nil) + _, err := c.DeleteJSON(context.Background(), fmt.Sprintf("/project/%s/workflows/%s", projectKey, workflowName), nil) return err } func (c *client) WorkflowRunArtifacts(projectKey string, workflowName string, number int64) ([]sdk.WorkflowNodeRunArtifact, error) { url := fmt.Sprintf("/project/%s/workflows/%s/runs/%d/artifacts", projectKey, workflowName, number) arts := []sdk.WorkflowNodeRunArtifact{} - if _, err := c.GetJSON(url, &arts); err != nil { + if _, err := c.GetJSON(context.Background(), url, &arts); err != nil { return nil, err } return arts, nil @@ -92,7 +93,7 @@ func (c *client) WorkflowRunArtifacts(projectKey string, workflowName string, nu func (c *client) WorkflowNodeRun(projectKey string, workflowName string, number int64, nodeRunID int64) (*sdk.WorkflowNodeRun, error) { url := fmt.Sprintf("/project/%s/workflows/%s/runs/%d/nodes/%d", projectKey, workflowName, number, nodeRunID) run := sdk.WorkflowNodeRun{} - if _, err := c.GetJSON(url, &run); err != nil { + if _, err := c.GetJSON(context.Background(), url, &run); err != nil { return nil, err } return &run, nil @@ -101,7 +102,7 @@ func (c *client) WorkflowNodeRun(projectKey string, workflowName string, number func (c *client) WorkflowRunNumberGet(projectKey string, workflowName string) (*sdk.WorkflowRunNumber, error) { url := fmt.Sprintf("/project/%s/workflows/%s/runs/num", projectKey, workflowName) runNumber := sdk.WorkflowRunNumber{} - if _, err := c.GetJSON(url, &runNumber); err != nil { + if _, err := c.GetJSON(context.Background(), url, &runNumber); err != nil { return nil, err } return &runNumber, nil @@ -110,7 +111,7 @@ func (c *client) WorkflowRunNumberGet(projectKey string, workflowName string) (* func (c *client) WorkflowRunNumberSet(projectKey string, workflowName string, number int64) error { url := fmt.Sprintf("/project/%s/workflows/%s/runs/num", projectKey, workflowName) runNumber := sdk.WorkflowRunNumber{Num: number} - code, err := c.PostJSON(url, runNumber, nil) + code, err := c.PostJSON(context.Background(), url, runNumber, nil) if err != nil { return err } @@ -123,7 +124,7 @@ func (c *client) WorkflowRunNumberSet(projectKey string, workflowName string, nu func (c *client) WorkflowNodeRunJobStep(projectKey string, workflowName string, number int64, nodeRunID, job int64, step int) (*sdk.BuildState, error) { url := fmt.Sprintf("/project/%s/workflows/%s/runs/%d/nodes/%d/job/%d/step/%d", projectKey, workflowName, number, nodeRunID, job, step) buildState := sdk.BuildState{} - if _, err := c.GetJSON(url, &buildState); err != nil { + if _, err := c.GetJSON(context.Background(), url, &buildState); err != nil { return nil, err } return &buildState, nil @@ -138,7 +139,7 @@ func (c *client) WorkflowNodeRunArtifactDownload(projectKey string, workflowName url = a.TempURL } - reader, _, _, err = c.Stream("GET", url, nil, true) + reader, _, _, err = c.Stream(context.Background(), "GET", url, nil, true) if err != nil { return err } @@ -150,7 +151,7 @@ func (c *client) WorkflowNodeRunArtifactDownload(projectKey string, workflowName func (c *client) WorkflowNodeRunRelease(projectKey string, workflowName string, runNumber int64, nodeRunID int64, release sdk.WorkflowNodeRunRelease) error { url := fmt.Sprintf("/project/%s/workflows/%s/runs/%d/nodes/%d/release", projectKey, workflowName, runNumber, nodeRunID) - code, err := c.PostJSON(url, release, nil) + code, err := c.PostJSON(context.Background(), url, release, nil) if err != nil { return err } @@ -168,7 +169,7 @@ func (c *client) WorkflowRunFromHook(projectKey string, workflowName string, hoo url := fmt.Sprintf("/project/%s/workflows/%s/runs", projectKey, workflowName) content := sdk.WorkflowRunPostHandlerOption{Hook: &hook} run := &sdk.WorkflowRun{} - code, err := c.PostJSON(url, &content, run) + code, err := c.PostJSON(context.Background(), url, &content, run) if err != nil { return nil, err } @@ -192,7 +193,7 @@ func (c *client) WorkflowRunFromManual(projectKey string, workflowName string, m content.FromNodeIDs = []int64{fromNodeID} } run := &sdk.WorkflowRun{} - code, err := c.PostJSON(url, &content, run) + code, err := c.PostJSON(context.Background(), url, &content, run) if err != nil { return nil, err } @@ -207,7 +208,7 @@ func (c *client) WorkflowStop(projectKey string, workflowName string, number int url := fmt.Sprintf("/project/%s/workflows/%s/runs/%d/stop", projectKey, workflowName, number) run := &sdk.WorkflowRun{} - code, err := c.PostJSON(url, nil, run) + code, err := c.PostJSON(context.Background(), url, nil, run) if err != nil { return nil, err } @@ -222,7 +223,7 @@ func (c *client) WorkflowNodeStop(projectKey string, workflowName string, number url := fmt.Sprintf("/project/%s/workflows/%s/runs/%d/nodes/%d/stop", projectKey, workflowName, number, fromNodeID) nodeRun := &sdk.WorkflowNodeRun{} - code, err := c.PostJSON(url, nil, nodeRun) + code, err := c.PostJSON(context.Background(), url, nil, nodeRun) if err != nil { return nil, err } @@ -235,7 +236,7 @@ func (c *client) WorkflowNodeStop(projectKey string, workflowName string, number func (c *client) WorkflowCachePush(projectKey, ref string, tarContent io.Reader) error { store := new(sdk.ArtifactsStore) - _, _ = c.GetJSON("/artifact/store", store) + _, _ = c.GetJSON(context.Background(), "/artifact/store", store) if store.TemporaryURLSupported { err := c.workflowCachePushIndirectUpload(projectKey, ref, tarContent) return err @@ -254,7 +255,7 @@ func (c *client) workflowCachePushDirectUpload(projectKey, ref string, tarConten }), } - _, _, code, err := c.Stream("POST", url, tarContent, true, mods...) + _, _, code, err := c.Stream(context.Background(), "POST", url, tarContent, true, mods...) if err != nil { return err } @@ -270,7 +271,7 @@ func (c *client) workflowCachePushIndirectUpload(projectKey, ref string, tarCont url := fmt.Sprintf("/project/%s/cache/%s/url", projectKey, ref) cacheObj := sdk.Cache{} - code, err := c.PostJSON(url, nil, &cacheObj) + code, err := c.PostJSON(context.Background(), url, nil, &cacheObj) if err != nil { return err } @@ -326,13 +327,13 @@ func (c *client) workflowCachePushIndirectUploadPost(url string, tarContent io.R func (c *client) WorkflowCachePull(projectKey, ref string) (io.Reader, error) { downloadURL := fmt.Sprintf("/project/%s/cache/%s", projectKey, ref) store := new(sdk.ArtifactsStore) - _, _ = c.GetJSON("/artifact/store", store) + _, _ = c.GetJSON(context.Background(), "/artifact/store", store) if store.TemporaryURLSupported { url := fmt.Sprintf("/project/%s/cache/%s/url", projectKey, ref) var cacheObj sdk.Cache - code, err := c.GetJSON(url, &cacheObj) + code, err := c.GetJSON(context.Background(), url, &cacheObj) if err != nil { return nil, err } @@ -348,7 +349,7 @@ func (c *client) WorkflowCachePull(projectKey, ref string) (io.Reader, error) { }), } - res, _, code, err := c.Stream("GET", downloadURL, nil, true, mods...) + res, _, code, err := c.Stream(context.Background(), "GET", downloadURL, nil, true, mods...) if err != nil { return nil, err } diff --git a/sdk/cdsclient/client_workflow_as_code.go b/sdk/cdsclient/client_workflow_as_code.go index c250f5a284..07852a9106 100644 --- a/sdk/cdsclient/client_workflow_as_code.go +++ b/sdk/cdsclient/client_workflow_as_code.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "github.com/ovh/cds/sdk" @@ -12,7 +13,7 @@ func (c *client) WorkflowAsCodeStart(projectKey string, repoURL string, repoStra ope.RepositoryStrategy = repoStrategy path := fmt.Sprintf("/import/%s", projectKey) - if _, err := c.PostJSON(path, ope, ope); err != nil { + if _, err := c.PostJSON(context.Background(), path, ope, ope); err != nil { return nil, err } @@ -22,7 +23,7 @@ func (c *client) WorkflowAsCodeStart(projectKey string, repoURL string, repoStra func (c *client) WorkflowAsCodeInfo(projectKey string, operationID string) (*sdk.Operation, error) { ope := new(sdk.Operation) path := fmt.Sprintf("/import/%s/%s", projectKey, operationID) - if _, err := c.GetJSON(path, ope); err != nil { + if _, err := c.GetJSON(context.Background(), path, ope); err != nil { return nil, err } return ope, nil @@ -31,7 +32,7 @@ func (c *client) WorkflowAsCodeInfo(projectKey string, operationID string) (*sdk func (c *client) WorkflowAsCodePerform(projectKey string, operationID string) ([]string, error) { messages := []string{} path := fmt.Sprintf("/import/%s/%s/perform", projectKey, operationID) - if _, err := c.PostJSON(path, nil, &messages); err != nil { + if _, err := c.PostJSON(context.Background(), path, nil, &messages); err != nil { return nil, err } return messages, nil diff --git a/sdk/cdsclient/client_workflow_hooks.go b/sdk/cdsclient/client_workflow_hooks.go index 17ac7ec7f1..3bf9c33c11 100644 --- a/sdk/cdsclient/client_workflow_hooks.go +++ b/sdk/cdsclient/client_workflow_hooks.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "github.com/ovh/cds/sdk" @@ -9,7 +10,7 @@ import ( func (c *client) WorkflowAllHooksList() ([]sdk.WorkflowNodeHook, error) { url := fmt.Sprintf("/workflow/hook") w := []sdk.WorkflowNodeHook{} - if _, err := c.GetJSON(url, &w); err != nil { + if _, err := c.GetJSON(context.Background(), url, &w); err != nil { return nil, err } return w, nil diff --git a/sdk/cdsclient/deprecated.go b/sdk/cdsclient/deprecated.go index c01f02f1a8..3a1a9b3c19 100644 --- a/sdk/cdsclient/deprecated.go +++ b/sdk/cdsclient/deprecated.go @@ -1,6 +1,7 @@ package cdsclient import ( + "context" "fmt" "net/url" @@ -9,7 +10,7 @@ import ( func (c *client) ApplicationPipelinesAttach(projectKey string, appName string, pipelineNames ...string) error { uri := fmt.Sprintf("/project/%s/application/%s/pipeline/attach", projectKey, appName) - code, err := c.PostJSON(uri, pipelineNames, nil) + code, err := c.PostJSON(context.Background(), uri, pipelineNames, nil) if err != nil { return err } @@ -29,7 +30,7 @@ func (c *client) ApplicationDoMigrationWorkflow(projectKey string, appName strin params.Add("withRepositoryWebHook", fmt.Sprintf("%t", withRepositoryWebHook)) uri := fmt.Sprintf("/project/%s/application/%s/workflow/migrate?%s", projectKey, appName, params.Encode()) - code, err := c.PostJSON(uri, nil, nil) + code, err := c.PostJSON(context.Background(), uri, nil, nil) if err != nil { return err } @@ -44,7 +45,7 @@ func (c *client) ApplicationDoMigrationWorkflow(projectKey string, appName strin func (c *client) ApplicationCleanOldWorkflow(projectKey string, appName string) error { uri := fmt.Sprintf("/project/%s/application/%s/workflow/clean", projectKey, appName) - code, err := c.PostJSON(uri, nil, nil) + code, err := c.PostJSON(context.Background(), uri, nil, nil) if err != nil { return err } @@ -63,7 +64,7 @@ func (c *client) ApplicationPipelineTriggerAdd(t *sdk.PipelineTrigger) error { uri = fmt.Sprintf("%s?env=%s", uri, url.QueryEscape(t.SrcEnvironment.Name)) } - code, err := c.PostJSON(uri, t, nil) + code, err := c.PostJSON(context.Background(), uri, t, nil) if err != nil { return err } @@ -83,7 +84,7 @@ func (c *client) ApplicationPipelineTriggersGet(projectKey string, appName strin } var triggers []sdk.PipelineTrigger - code, err := c.GetJSON(uri, &triggers) + code, err := c.GetJSON(context.Background(), uri, &triggers) if err != nil { return nil, err } @@ -103,7 +104,7 @@ func (c *client) AddHookOnRepositoriesManager(projectKey, appName, reposManager, } app := &sdk.Application{} - code, err := c.PostJSON(uri, data, app) + code, err := c.PostJSON(context.Background(), uri, data, app) if err != nil { return err } diff --git a/sdk/cdsclient/http.go b/sdk/cdsclient/http.go index 3e5a04c4ba..43d8ed043c 100644 --- a/sdk/cdsclient/http.go +++ b/sdk/cdsclient/http.go @@ -2,6 +2,7 @@ package cdsclient import ( "bytes" + "context" "encoding/base64" "encoding/json" "fmt" @@ -13,6 +14,7 @@ import ( "strings" "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/tracingutils" ) const ( @@ -60,37 +62,37 @@ func SetHeader(key, value string) RequestModifier { } // PostJSON post the *in* struct as json. If set, it unmarshalls the response to *out* -func (c *client) PostJSON(path string, in interface{}, out interface{}, mods ...RequestModifier) (int, error) { - _, _, code, err := c.RequestJSON(http.MethodPost, path, in, out, mods...) +func (c *client) PostJSON(ctx context.Context, path string, in interface{}, out interface{}, mods ...RequestModifier) (int, error) { + _, _, code, err := c.RequestJSON(ctx, http.MethodPost, path, in, out, mods...) return code, err } // PostJSON ut the *in* struct as json. If set, it unmarshalls the response to *out* -func (c *client) PutJSON(path string, in interface{}, out interface{}, mods ...RequestModifier) (int, error) { - _, _, code, err := c.RequestJSON(http.MethodPut, path, in, out, mods...) +func (c *client) PutJSON(ctx context.Context, path string, in interface{}, out interface{}, mods ...RequestModifier) (int, error) { + _, _, code, err := c.RequestJSON(ctx, http.MethodPut, path, in, out, mods...) return code, err } // GetJSON get the requested path If set, it unmarshalls the response to *out* -func (c *client) GetJSON(path string, out interface{}, mods ...RequestModifier) (int, error) { - _, _, code, err := c.RequestJSON(http.MethodGet, path, nil, out, mods...) +func (c *client) GetJSON(ctx context.Context, path string, out interface{}, mods ...RequestModifier) (int, error) { + _, _, code, err := c.RequestJSON(ctx, http.MethodGet, path, nil, out, mods...) return code, err } // GetJSONWithHeaders get the requested path If set, it unmarshalls the response to *out* and return response headers func (c *client) GetJSONWithHeaders(path string, out interface{}, mods ...RequestModifier) (http.Header, int, error) { - _, header, code, err := c.RequestJSON(http.MethodGet, path, nil, out, mods...) + _, header, code, err := c.RequestJSON(context.Background(), http.MethodGet, path, nil, out, mods...) return header, code, err } // DeleteJSON deletes the requested path If set, it unmarshalls the response to *out* -func (c *client) DeleteJSON(path string, out interface{}, mods ...RequestModifier) (int, error) { - _, _, code, err := c.RequestJSON(http.MethodDelete, path, nil, out, mods...) +func (c *client) DeleteJSON(ctx context.Context, path string, out interface{}, mods ...RequestModifier) (int, error) { + _, _, code, err := c.RequestJSON(ctx, http.MethodDelete, path, nil, out, mods...) return code, err } // RequestJSON does a request with the *in* struct as json. If set, it unmarshalls the response to *out* -func (c *client) RequestJSON(method, path string, in interface{}, out interface{}, mods ...RequestModifier) ([]byte, http.Header, int, error) { +func (c *client) RequestJSON(ctx context.Context, method, path string, in interface{}, out interface{}, mods ...RequestModifier) ([]byte, http.Header, int, error) { var b = []byte{} var err error @@ -106,7 +108,7 @@ func (c *client) RequestJSON(method, path string, in interface{}, out interface{ body = bytes.NewBuffer(b) } - res, header, code, err := c.Request(method, path, body, mods...) + res, header, code, err := c.Request(ctx, method, path, body, mods...) if err != nil { return nil, nil, code, err } @@ -128,8 +130,8 @@ func (c *client) RequestJSON(method, path string, in interface{}, out interface{ } // Request executes an authentificated HTTP request on $path given $method and $args -func (c *client) Request(method string, path string, body io.Reader, mods ...RequestModifier) ([]byte, http.Header, int, error) { - respBody, respHeader, code, err := c.Stream(method, path, body, false, mods...) +func (c *client) Request(ctx context.Context, method string, path string, body io.Reader, mods ...RequestModifier) ([]byte, http.Header, int, error) { + respBody, respHeader, code, err := c.Stream(ctx, method, path, body, false, mods...) if err != nil { return nil, nil, 0, err } @@ -162,7 +164,7 @@ func (c *client) Request(method string, path string, body io.Reader, mods ...Req } // Stream makes an authenticated http request and return io.ReadCloser -func (c *client) Stream(method string, path string, body io.Reader, noTimeout bool, mods ...RequestModifier) (io.ReadCloser, http.Header, int, error) { +func (c *client) Stream(ctx context.Context, method string, path string, body io.Reader, noTimeout bool, mods ...RequestModifier) (io.ReadCloser, http.Header, int, error) { var savederror error var bodyContent []byte @@ -186,6 +188,19 @@ func (c *client) Stream(method string, path string, body io.Reader, noTimeout bo continue } + req = req.WithContext(ctx) + + if c.config.Verbose { + log.Printf("Stream > context> %s\n", tracingutils.DumpContext(ctx)) + } + spanCtx, ok := tracingutils.ContextToSpanContext(ctx) + if c.config.Verbose { + log.Printf("setup tracing = %v (%v) on request to %s\n", ok, spanCtx, req.URL.String()) + } + if ok { + tracingutils.DefaultFormat.SpanContextToRequest(spanCtx, req) + } + for i := range mods { if mods[i] != nil { mods[i](req) diff --git a/sdk/cdsclient/interface.go b/sdk/cdsclient/interface.go index 0532a543d8..262b87b42d 100644 --- a/sdk/cdsclient/interface.go +++ b/sdk/cdsclient/interface.go @@ -141,7 +141,7 @@ type GroupClient interface { // HatcheryClient exposes hatcheries related functions type HatcheryClient interface { - HatcheryCount(wfNodeRunID int64) (int64, error) + HatcheryCount(ctx context.Context, wfNodeRunID int64) (int64, error) } // BroadcastClient expose all function for CDS Broadcasts @@ -198,16 +198,16 @@ type QueueClient interface { QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string, ratioService *int) (sdk.WorkflowNodeJobRunCount, error) QueuePipelineBuildJob() ([]sdk.PipelineBuildJob, error) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJobRun, pbjobs chan<- sdk.PipelineBuildJob, errs chan<- error, delay time.Duration, graceTime int, modelType string, ratioService *int, exceptWfJobID *int64) error - QueueTakeJob(sdk.WorkflowNodeJobRun, bool) (*sdk.WorkflowNodeJobRunData, error) - QueueJobBook(isWorkflowJob bool, id int64) error + QueueTakeJob(ctx context.Context, job sdk.WorkflowNodeJobRun, isBooked bool) (*sdk.WorkflowNodeJobRunData, error) + QueueJobBook(ctx context.Context, isWorkflowJob bool, id int64) error QueueJobRelease(isWorkflowJob bool, id int64) error QueueJobInfo(id int64) (*sdk.WorkflowNodeJobRun, error) - QueueJobSendSpawnInfo(isWorkflowJob bool, id int64, in []sdk.SpawnInfo) error - QueueSendResult(int64, sdk.Result) error - QueueArtifactUpload(id int64, tag, filePath string) (bool, time.Duration, error) - QueueJobTag(jobID int64, tags []sdk.WorkflowRunTag) error - QueueJobIncAttempts(jobID int64) ([]int64, error) - QueueServiceLogs(logs []sdk.ServiceLog) error + QueueJobSendSpawnInfo(ctx context.Context, isWorkflowJob bool, id int64, in []sdk.SpawnInfo) error + QueueSendResult(ctx context.Context, id int64, res sdk.Result) error + QueueArtifactUpload(ctx context.Context, id int64, tag, filePath string) (bool, time.Duration, error) + QueueJobTag(ctx context.Context, jobID int64, tags []sdk.WorkflowRunTag) error + QueueJobIncAttempts(ctx context.Context, jobID int64) ([]int64, error) + QueueServiceLogs(ctx context.Context, logs []sdk.ServiceLog) error } // UserClient exposes users functions @@ -227,9 +227,9 @@ type UserClient interface { // WorkerClient exposes workers functions type WorkerClient interface { WorkerModelBook(id int64) error - WorkerList() ([]sdk.Worker, error) - WorkerRefresh() error - WorkerDisable(id string) error + WorkerList(ctx context.Context) ([]sdk.Worker, error) + WorkerRefresh(ctx context.Context) error + WorkerDisable(ctx context.Context, id string) error WorkerModelAdd(name, modelType, patternName string, dockerModel *sdk.ModelDocker, vmModel *sdk.ModelVirtualMachine, groupID int64) (sdk.Model, error) WorkerModelUpdate(ID int64, name string, modelType string, dockerModel *sdk.ModelDocker, vmModel *sdk.ModelVirtualMachine, groupID int64) (sdk.Model, error) WorkerModel(name string) (sdk.Model, error) @@ -239,8 +239,8 @@ type WorkerClient interface { WorkerModels() ([]sdk.Model, error) WorkerModelsByBinary(binary string) ([]sdk.Model, error) WorkerModelsByState(state string) ([]sdk.Model, error) - WorkerRegister(sdk.WorkerRegistrationForm) (*sdk.Worker, bool, error) - WorkerSetStatus(sdk.Status) error + WorkerRegister(ctx context.Context, form sdk.WorkerRegistrationForm) (*sdk.Worker, bool, error) + WorkerSetStatus(ctx context.Context, status sdk.Status) error } // HookClient exposes functions used for hooks services @@ -332,11 +332,11 @@ type InterfaceDeprecated interface { // Raw is a low-level interface exposing HTTP functions type Raw interface { - PostJSON(path string, in interface{}, out interface{}, mods ...RequestModifier) (int, error) - PutJSON(path string, in interface{}, out interface{}, mods ...RequestModifier) (int, error) - GetJSON(path string, out interface{}, mods ...RequestModifier) (int, error) - DeleteJSON(path string, out interface{}, mods ...RequestModifier) (int, error) - Request(method string, path string, body io.Reader, mods ...RequestModifier) ([]byte, http.Header, int, error) + PostJSON(ctx context.Context, path string, in interface{}, out interface{}, mods ...RequestModifier) (int, error) + PutJSON(ctx context.Context, path string, in interface{}, out interface{}, mods ...RequestModifier) (int, error) + GetJSON(ctx context.Context, path string, out interface{}, mods ...RequestModifier) (int, error) + DeleteJSON(ctx context.Context, path string, out interface{}, mods ...RequestModifier) (int, error) + Request(ctx context.Context, method string, path string, body io.Reader, mods ...RequestModifier) ([]byte, http.Header, int, error) } // GRPCPluginsClient exposes plugins API diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index e417b00e6d..6cbebf7d62 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -136,15 +136,19 @@ func Create(h Interface) error { } } if !found { - if hCount, err := h.CDSClient().HatcheryCount(startWorkerRes.request.workflowNodeRunID); err == nil { + ctxHatcheryCount, cancelHatcheryCount := context.WithTimeout(ctx, 10*time.Second) + if hCount, err := h.CDSClient().HatcheryCount(ctxHatcheryCount, startWorkerRes.request.workflowNodeRunID); err == nil { if int64(len(startWorkerRes.request.spawnAttempts)) < hCount { - if _, errQ := h.CDSClient().QueueJobIncAttempts(startWorkerRes.request.id); errQ != nil { + ctxtJobIncAttemps, cancelJobIncAttemps := context.WithTimeout(ctx, 10*time.Second) + if _, errQ := h.CDSClient().QueueJobIncAttempts(ctxtJobIncAttemps, startWorkerRes.request.id); errQ != nil { log.Warning("Hatchery> Create> cannot inc spawn attempts %v", errQ) } + cancelJobIncAttemps() } } else { log.Warning("Hatchery> Create> cannot get hatchery count: %v", err) } + cancelHatcheryCount() } } } diff --git a/sdk/hatchery/pool.go b/sdk/hatchery/pool.go index ea8adf5d67..4716ad0889 100644 --- a/sdk/hatchery/pool.go +++ b/sdk/hatchery/pool.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "go.opencensus.io/stats" @@ -16,7 +17,9 @@ func WorkerPool(h Interface, status ...sdk.Status) ([]sdk.Worker, error) { ctx := WithTags(context.Background(), h) // First: call API - registeredWorkers, err := h.CDSClient().WorkerList() + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + registeredWorkers, err := h.CDSClient().WorkerList(ctx) if err != nil { return nil, fmt.Errorf("unable to get registered workers: %v", err) } @@ -47,7 +50,7 @@ func WorkerPool(h Interface, status ...sdk.Status) ([]sdk.Worker, error) { } if !found && w.Status != sdk.StatusDisabled { log.Error("Hatchery > WorkerPool> Worker %s (status = %s) inconsistency", w.Name, w.Status.String()) - if err := h.CDSClient().WorkerDisable(w.ID); err != nil { + if err := h.CDSClient().WorkerDisable(ctx, w.ID); err != nil { log.Error("Hatchery > WorkerPool> Unable to disable worker [%s]%s", w.ID, w.Name) } w.Status = sdk.StatusDisabled diff --git a/sdk/hatchery/starter.go b/sdk/hatchery/starter.go index bac8d344df..f3c5f63e7f 100644 --- a/sdk/hatchery/starter.go +++ b/sdk/hatchery/starter.go @@ -143,13 +143,16 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) { } _, next := observability.Span(ctx, "hatchery.QueueJobBook") - if err := h.CDSClient().QueueJobBook(j.isWorkflowJob, j.id); err != nil { + ctxt, cancel := context.WithTimeout(ctx, 10*time.Second) + if err := h.CDSClient().QueueJobBook(ctxt, j.isWorkflowJob, j.id); err != nil { next() // perhaps already booked by another hatchery log.Info("hatchery> spawnWorkerForJob> %d - cannot book job %d %s: %s", j.timestamp, j.id, j.model.Name, err) + cancel() return false, nil } next() + cancel() log.Debug("hatchery> spawnWorkerForJob> %d - send book job %d %s by hatchery %d isWorkflowJob:%t", j.timestamp, j.id, j.model.Name, h.ID(), j.isWorkflowJob) start := time.Now() @@ -170,11 +173,13 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) { RemoteTime: time.Now(), Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoHatcheryErrorSpawn.ID, Args: []interface{}{h.Service().Name, fmt.Sprintf("%d", h.ID()), j.model.Name, sdk.Round(time.Since(start), time.Second).String(), errSpawn.Error()}}, }) - if err := h.CDSClient().QueueJobSendSpawnInfo(j.isWorkflowJob, j.id, infos); err != nil { + ctxt, cancel := context.WithTimeout(ctx, 10*time.Second) + if err := h.CDSClient().QueueJobSendSpawnInfo(ctxt, j.isWorkflowJob, j.id, infos); err != nil { log.Warning("spawnWorkerForJob> %d - cannot client.QueueJobSendSpawnInfo for job (err spawn)%d: %s", j.timestamp, j.id, err) } log.Error("hatchery %s cannot spawn worker %s for job %d: %v", h.Service().Name, j.model.Name, j.id, errSpawn) next() + cancel() return false, nil } @@ -189,11 +194,13 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) { }, }) + ctxtJobSendSpawnInfo, cancelJobSendSpawnInfo := context.WithTimeout(ctx, 10*time.Second) _, next = observability.Span(ctx, "hatchery.QueueJobSendSpawnInfo", observability.Tag("status", "spawnOK")) - if err := h.CDSClient().QueueJobSendSpawnInfo(j.isWorkflowJob, j.id, infos); err != nil { + if err := h.CDSClient().QueueJobSendSpawnInfo(ctxtJobSendSpawnInfo, j.isWorkflowJob, j.id, infos); err != nil { next() log.Warning("spawnWorkerForJob> %d - cannot client.QueueJobSendSpawnInfo for job %d: %s", j.timestamp, j.id, err) } next() + cancelJobSendSpawnInfo() return true, nil // ok for this job } diff --git a/sdk/tracingutils/types.go b/sdk/tracingutils/types.go new file mode 100644 index 0000000000..98b5984fce --- /dev/null +++ b/sdk/tracingutils/types.go @@ -0,0 +1,9 @@ +package tracingutils + +import ( + "go.opencensus.io/plugin/ochttp/propagation/b3" + "go.opencensus.io/trace/propagation" +) + +// DefaultFormat used by observability as: observability.DefaultFormat.SpanContextToRequest +var DefaultFormat propagation.HTTPFormat = &b3.HTTPFormat{}