diff --git a/engine/hatchery/stats.go b/engine/hatchery/stats.go index b5bd6e22f6..1c97eaa6c7 100644 --- a/engine/hatchery/stats.go +++ b/engine/hatchery/stats.go @@ -21,6 +21,9 @@ func (c *Common) initStats(hatcheryName string) error { label := fmt.Sprintf("cds/%s/%s/jobs", c.ServiceName(), hatcheryName) c.stats.Jobs = stats.Int64(label, "number of analyzed jobs", stats.UnitDimensionless) + label = fmt.Sprintf("cds/%s/%s/jobs_sse", c.ServiceName(), hatcheryName) + c.stats.JobsSSE = stats.Int64(label, "number of analyzed jobs from SSE", stats.UnitDimensionless) + label = fmt.Sprintf("cds/%s/%s/spawned_workers", c.ServiceName(), hatcheryName) c.stats.SpawnedWorkers = stats.Int64(label, "number of spawned workers", stats.UnitDimensionless) @@ -54,6 +57,13 @@ func (c *Common) initStats(hatcheryName string) error { Aggregation: view.Count(), TagKeys: tags, }, + &view.View{ + Name: "jobs_sse_count", + Description: c.stats.JobsSSE.Description(), + Measure: c.stats.JobsSSE, + Aggregation: view.Count(), + TagKeys: tags, + }, &view.View{ Name: "spawned_worker_count", Description: c.stats.SpawnedWorkers.Description(), diff --git a/sdk/cdsclient/client_queue.go b/sdk/cdsclient/client_queue.go index 6ffce9a619..73f3ccc606 100644 --- a/sdk/cdsclient/client_queue.go +++ b/sdk/cdsclient/client_queue.go @@ -109,6 +109,7 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ // push the job in the channel if job.Status == sdk.StatusWaiting.String() && job.BookedBy.Name == "" { + job.Header["SSE"] = "true" jobs <- *job } }() diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index 850a4018ee..774df72120 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -261,6 +261,13 @@ func Create(h Interface) error { observability.Tag(observability.TagProjectKey, p), observability.Tag(observability.TagWorkflowNodeJobRun, j.ID), ) + + if _, ok := j.Header["SSE"]; ok { + log.Debug("hatchery> received job from SSE") + observability.Current(currentCtx, + observability.Tag("from", "sse"), + ) + } } endTrace := func(reason string) { if reason != "" { @@ -282,6 +289,10 @@ func Create(h Interface) error { stats.Record(currentCtx, h.Stats().Jobs.M(1)) + if _, ok := j.Header["SSE"]; ok { + stats.Record(currentCtx, h.Stats().JobsSSE.M(1)) + } + //Check if the jobs is concerned by a pending worker creation if _, exist := spawnIDs.Get(strconv.FormatInt(j.ID, 10)); exist { log.Debug("job %d already spawned in previous routine", j.ID) diff --git a/sdk/hatchery/types.go b/sdk/hatchery/types.go index dc9e8047ae..daec1d6474 100644 --- a/sdk/hatchery/types.go +++ b/sdk/hatchery/types.go @@ -100,6 +100,7 @@ type Interface interface { type Stats struct { Jobs *stats.Int64Measure + JobsSSE *stats.Int64Measure SpawnedWorkers *stats.Int64Measure PendingWorkers *stats.Int64Measure RegisteringWorkers *stats.Int64Measure