Skip to content

Commit

Permalink
feat(api): add trace on workflow queue (#3306)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault committed Sep 11, 2018
1 parent 7b89844 commit 0476cc2
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 17 deletions.
4 changes: 2 additions & 2 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ func (api *API) InitRouter() {
r.Handle("/build/{id}/step", r.POST(api.updateStepStatusHandler))

//Workflow queue
r.Handle("/queue/workflows", r.GET(api.getWorkflowJobQueueHandler))
r.Handle("/queue/workflows/count", r.GET(api.countWorkflowJobQueueHandler))
r.Handle("/queue/workflows", r.GET(api.getWorkflowJobQueueHandler, EnableTracing()))
r.Handle("/queue/workflows/count", r.GET(api.countWorkflowJobQueueHandler, EnableTracing()))
r.Handle("/queue/workflows/{id}/take", r.POST(api.postTakeWorkflowJobHandler, NeedWorker(), EnableTracing()))
r.Handle("/queue/workflows/{id}/book", r.POST(api.postBookWorkflowJobHandler, NeedHatchery(), EnableTracing()), r.DELETE(api.deleteBookWorkflowJobHandler, NeedHatchery(), EnableTracing()))
r.Handle("/queue/workflows/{id}/attempt", r.POST(api.postIncWorkflowJobAttemptHandler, NeedHatchery(), EnableTracing()))
Expand Down
2 changes: 1 addition & 1 deletion engine/api/mon_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (api *API) getMonDBTimesDBQueueWorkflow(ctx context.Context, r *http.Reques
usr = nil
}

if _, err := workflow.LoadNodeJobRunQueue(api.mustDB(), api.Cache, permissions, groupsID, usr, nil, nil, nil); err != nil {
if _, err := workflow.LoadNodeJobRunQueue(ctx, api.mustDB(), api.Cache, permissions, groupsID, usr, nil, nil, nil); err != nil {
return fmt.Sprintf("getMonDBTimesDBQueueWorkflow> Unable to load queue:: %s", err)
}
return elapsed("getMonDBTimesDBQueueWorkflow", s1)
Expand Down
19 changes: 16 additions & 3 deletions engine/api/workflow/dao_node_run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ func isSharedInfraGroup(groupsID []int64) bool {
}

// CountNodeJobRunQueue count all workflow_node_run_job accessible
func CountNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, groupsID []int64, usr *sdk.User, since *time.Time, until *time.Time, statuses ...string) (sdk.WorkflowNodeJobRunCount, error) {
func CountNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, groupsID []int64, usr *sdk.User, since *time.Time, until *time.Time, statuses ...string) (sdk.WorkflowNodeJobRunCount, error) {
c := sdk.WorkflowNodeJobRunCount{}

queue, err := LoadNodeJobRunQueue(db, store, permission.PermissionRead, groupsID, usr, since, until, nil, statuses...)
queue, err := LoadNodeJobRunQueue(ctx, db, store, permission.PermissionRead, groupsID, usr, since, until, nil, statuses...)
if err != nil {
return c, sdk.WrapError(err, "CountNodeJobRunQueue> unable to load queue")
}
Expand All @@ -44,7 +44,9 @@ func CountNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, groupsID []int
}

// LoadNodeJobRunQueue load all workflow_node_run_job accessible
func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, groupsID []int64, usr *sdk.User, since *time.Time, until *time.Time, limit *int, statuses ...string) ([]sdk.WorkflowNodeJobRun, error) {
func LoadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, rights int, groupsID []int64, usr *sdk.User, since *time.Time, until *time.Time, limit *int, statuses ...string) ([]sdk.WorkflowNodeJobRun, error) {
ctx, end := observability.Span(ctx, "LoadNodeJobRunQueue")
defer end()
if since == nil {
since = new(time.Time)
}
Expand All @@ -68,6 +70,7 @@ func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, gro
args := []interface{}{*since, *until, strings.Join(statuses, ",")}

if usr != nil && !usr.Admin {
observability.Current(ctx, observability.Tag("isAdmin", false))
query = `
SELECT DISTINCT workflow_node_run_job.*
FROM workflow_node_run_job
Expand Down Expand Up @@ -97,6 +100,8 @@ func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, gro
}
}
args = append(args, groupID, group.SharedInfraGroup.ID)
} else {
observability.Current(ctx, observability.Tag("isAdmin", true))
}

if limit != nil && *limit > 0 {
Expand All @@ -106,13 +111,20 @@ func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, gro

isSharedInfraGroup := isSharedInfraGroup(groupsID)
sqlJobs := []JobRun{}
_, next := observability.Span(ctx, "LoadNodeJobRunQueue.select")
if _, err := db.Select(&sqlJobs, query, args...); err != nil {
next()
return nil, sdk.WrapError(err, "workflow.LoadNodeJobRun> Unable to load job runs (Select)")
}
next()

ctx2, next2 := observability.Span(ctx, "LoadNodeJobRunQueue.sqlJobs")

jobs := make([]sdk.WorkflowNodeJobRun, 0, len(sqlJobs))
for i := range sqlJobs {
_, next3 := observability.Span(ctx2, "LoadNodeJobRunQueue.loadHatcheryInfo")
getHatcheryInfo(store, &sqlJobs[i])
next3()
jr, err := sqlJobs[i].WorkflowNodeRunJob()
if err != nil {
log.Error("LoadNodeJobRunQueue> WorkflowNodeRunJob error: %v", err)
Expand All @@ -139,6 +151,7 @@ func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, gro

jobs = append(jobs, jr)
}
next2()

return jobs, nil
}
Expand Down
21 changes: 12 additions & 9 deletions engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestManualRun1(t *testing.T) {
u, _ := assets.InsertAdminUser(db)
key := sdk.RandomString(10)
proj := assets.InsertTestProject(t, db, cache, key, key, u)
ctx := context.Background()

//First pipeline
pip := sdk.Pipeline{
Expand Down Expand Up @@ -123,7 +124,7 @@ func TestManualRun1(t *testing.T) {
test.Equal(t, lastrun.WorkflowNodeRuns[w1.RootID][0], nodeRun)

//TestLoadNodeJobRun
jobs, err := workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
jobs, err := workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
test.NoError(t, err)
test.Equal(t, 2, len(jobs))

Expand All @@ -150,6 +151,7 @@ func TestManualRun2(t *testing.T) {
u, _ := assets.InsertAdminUser(db)
key := sdk.RandomString(10)
proj := assets.InsertTestProject(t, db, cache, key, key, u)
ctx := context.Background()

//First pipeline
pip := sdk.Pipeline{
Expand Down Expand Up @@ -236,7 +238,7 @@ func TestManualRun2(t *testing.T) {
_, _, err = workflow.ManualRunFromNode(context.TODO(), db, cache, proj, w1, 1, &sdk.WorkflowNodeRunManual{User: *u}, w1.RootID)
test.NoError(t, err)

jobs, err := workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
jobs, err := workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
test.NoError(t, err)

assert.Len(t, jobs, 3)
Expand All @@ -247,6 +249,7 @@ func TestManualRun3(t *testing.T) {
u, _ := assets.InsertAdminUser(db)
key := sdk.RandomString(10)
proj := assets.InsertTestProject(t, db, cache, key, key, u)
ctx := context.Background()

test.NoError(t, project.AddKeyPair(db, proj, "key", u))

Expand Down Expand Up @@ -330,17 +333,17 @@ func TestManualRun3(t *testing.T) {
test.NoError(t, err)

// test nil since/until
_, err = workflow.CountNodeJobRunQueue(db, cache, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil)
_, err = workflow.CountNodeJobRunQueue(ctx, db, cache, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil)
test.NoError(t, err)

// queue should be empty with since 0,0 until 0,0
t0 := time.Unix(0, 0)
t1 := time.Unix(0, 0)
countAlreadyInQueueNone, err := workflow.CountNodeJobRunQueue(db, cache, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1)
countAlreadyInQueueNone, err := workflow.CountNodeJobRunQueue(ctx, db, cache, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1)
test.NoError(t, err)
assert.Equal(t, 0, int(countAlreadyInQueueNone.Count))

jobs, err := workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
jobs, err := workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
test.NoError(t, err)

for i := range jobs {
Expand Down Expand Up @@ -436,7 +439,7 @@ func TestManualRun3(t *testing.T) {
tx.Commit()
}

jobs, err = workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
jobs, err = workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
test.NoError(t, err)
assert.Equal(t, 1, len(jobs))

Expand All @@ -450,15 +453,15 @@ func TestManualRun3(t *testing.T) {

t0 := since.Add(-2 * time.Minute)
t1 := since.Add(-1 * time.Minute)
jobsSince, errW := workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1, nil)
jobsSince, errW := workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1, nil)
test.NoError(t, errW)
for _, job := range jobsSince {
if jobs[0].ID == job.ID {
assert.Fail(t, " this job should not be in queue since/until")
}
}

jobsSince, errW = workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &since, nil, nil)
jobsSince, errW = workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &since, nil, nil)
test.NoError(t, errW)
var found bool
for _, job := range jobsSince {
Expand All @@ -472,7 +475,7 @@ func TestManualRun3(t *testing.T) {

t0 = since.Add(10 * time.Second)
t1 = since.Add(15 * time.Second)
jobsSince, errW = workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1, nil)
jobsSince, errW = workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1, nil)
test.NoError(t, errW)
for _, job := range jobsSince {
if jobs[0].ID == job.ID {
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ func (api *API) countWorkflowJobQueueHandler() service.Handler {
usr = nil
}

count, err := workflow.CountNodeJobRunQueue(api.mustDB(), api.Cache, groupsID, usr, &since, &until)
count, err := workflow.CountNodeJobRunQueue(ctx, api.mustDB(), api.Cache, groupsID, usr, &since, &until)
if err != nil {
return sdk.WrapError(err, "countWorkflowJobQueueHandler> Unable to count queue")
}
Expand Down Expand Up @@ -725,7 +725,7 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
usr = nil
}

jobs, err := workflow.LoadNodeJobRunQueue(api.mustDB(), api.Cache, permissions, groupsID, usr, &since, &until, &limit, status...)
jobs, err := workflow.LoadNodeJobRunQueue(ctx, api.mustDB(), api.Cache, permissions, groupsID, usr, &since, &until, &limit, status...)
if err != nil {
return sdk.WrapError(err, "getWorkflowJobQueueHandler> Unable to load queue")
}
Expand Down

0 comments on commit 0476cc2

Please sign in to comment.