Skip to content

Commit

Permalink
feat(hatchery): add SSE metrics (#3436)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and bnjjj committed Oct 10, 2018
1 parent 054fef0 commit d27c2c7
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 0 deletions.
10 changes: 10 additions & 0 deletions engine/hatchery/stats.go
Expand Up @@ -21,6 +21,9 @@ func (c *Common) initStats(hatcheryName string) error {
label := fmt.Sprintf("cds/%s/%s/jobs", c.ServiceName(), hatcheryName)
c.stats.Jobs = stats.Int64(label, "number of analyzed jobs", stats.UnitDimensionless)

label = fmt.Sprintf("cds/%s/%s/jobs_sse", c.ServiceName(), hatcheryName)
c.stats.JobsSSE = stats.Int64(label, "number of analyzed jobs from SSE", stats.UnitDimensionless)

label = fmt.Sprintf("cds/%s/%s/spawned_workers", c.ServiceName(), hatcheryName)
c.stats.SpawnedWorkers = stats.Int64(label, "number of spawned workers", stats.UnitDimensionless)

Expand Down Expand Up @@ -54,6 +57,13 @@ func (c *Common) initStats(hatcheryName string) error {
Aggregation: view.Count(),
TagKeys: tags,
},
&view.View{
Name: "jobs_sse_count",
Description: c.stats.JobsSSE.Description(),
Measure: c.stats.JobsSSE,
Aggregation: view.Count(),
TagKeys: tags,
},
&view.View{
Name: "spawned_worker_count",
Description: c.stats.SpawnedWorkers.Description(),
Expand Down
1 change: 1 addition & 0 deletions sdk/cdsclient/client_queue.go
Expand Up @@ -109,6 +109,7 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ

// push the job in the channel
if job.Status == sdk.StatusWaiting.String() && job.BookedBy.Name == "" {
job.Header["SSE"] = "true"
jobs <- *job
}
}()
Expand Down
11 changes: 11 additions & 0 deletions sdk/hatchery/hatchery.go
Expand Up @@ -261,6 +261,13 @@ func Create(h Interface) error {
observability.Tag(observability.TagProjectKey, p),
observability.Tag(observability.TagWorkflowNodeJobRun, j.ID),
)

if _, ok := j.Header["SSE"]; ok {
log.Debug("hatchery> received job from SSE")
observability.Current(currentCtx,
observability.Tag("from", "sse"),
)
}
}
endTrace := func(reason string) {
if reason != "" {
Expand All @@ -282,6 +289,10 @@ func Create(h Interface) error {

stats.Record(currentCtx, h.Stats().Jobs.M(1))

if _, ok := j.Header["SSE"]; ok {
stats.Record(currentCtx, h.Stats().JobsSSE.M(1))
}

//Check if the jobs is concerned by a pending worker creation
if _, exist := spawnIDs.Get(strconv.FormatInt(j.ID, 10)); exist {
log.Debug("job %d already spawned in previous routine", j.ID)
Expand Down
1 change: 1 addition & 0 deletions sdk/hatchery/types.go
Expand Up @@ -100,6 +100,7 @@ type Interface interface {

type Stats struct {
Jobs *stats.Int64Measure
JobsSSE *stats.Int64Measure
SpawnedWorkers *stats.Int64Measure
PendingWorkers *stats.Int64Measure
RegisteringWorkers *stats.Int64Measure
Expand Down

0 comments on commit d27c2c7

Please sign in to comment.