Skip to content

Commit

Permalink
feat(sdk, hatchery, worker): get queue from server side events (#3376)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and bnjjj committed Oct 8, 2018
1 parent cd8441b commit ec92876
Show file tree
Hide file tree
Showing 16 changed files with 287 additions and 145 deletions.
2 changes: 1 addition & 1 deletion engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (api *API) InitRouter() {
r.Handle("/queue/workflows/{id}/take", r.POST(api.postTakeWorkflowJobHandler, NeedWorker(), EnableTracing()))
r.Handle("/queue/workflows/{id}/book", r.POST(api.postBookWorkflowJobHandler, NeedHatchery(), EnableTracing()), r.DELETE(api.deleteBookWorkflowJobHandler, NeedHatchery(), EnableTracing()))
r.Handle("/queue/workflows/{id}/attempt", r.POST(api.postIncWorkflowJobAttemptHandler, NeedHatchery(), EnableTracing()))
r.Handle("/queue/workflows/{id}/infos", r.GET(api.getWorkflowJobHandler, NeedWorker(), EnableTracing()))
r.Handle("/queue/workflows/{id}/infos", r.GET(api.getWorkflowJobHandler, NeedWorker(), NeedHatchery(), EnableTracing()))
r.Handle("/queue/workflows/{permID}/vulnerability", r.POSTEXECUTE(api.postVulnerabilityReportHandler, NeedWorker(), EnableTracing()))
r.Handle("/queue/workflows/{id}/spawn/infos", r.POST(r.Asynchronous(api.postSpawnInfosWorkflowJobHandler, 1), NeedHatchery(), EnableTracing()))
r.Handle("/queue/workflows/{permID}/result", r.POSTEXECUTE(api.postWorkflowJobResultHandler, NeedWorker(), EnableTracing()))
Expand Down
2 changes: 1 addition & 1 deletion engine/api/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func CheckServiceAuth(ctx context.Context, db *gorp.DbMap, store cache.Store, he

serviceHash := string(id)
if serviceHash == "" {
return ctx, fmt.Errorf("bad service id")
return ctx, fmt.Errorf("missing service Hash")
}

srv, err := GetService(db, store, serviceHash)
Expand Down
14 changes: 14 additions & 0 deletions engine/api/event/publish_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,17 @@ func PublishWorkflowNodeRun(db gorp.SqlExecutor, nr sdk.WorkflowNodeRun, w sdk.W
}
publishRunWorkflow(e, w.ProjectKey, w.Name, appName, pipName, envName, nr.Number, nr.SubNumber, nr.Status, nil)
}

// PublishWorkflowNodeJobRun publish a WorkflowNodeJobRun
func PublishWorkflowNodeJobRun(db gorp.SqlExecutor, pkey, wname string, jr sdk.WorkflowNodeJobRun) {
e := sdk.EventRunWorkflowJob{
ID: jr.ID,
Status: jr.Status,
Start: jr.Start.Unix(),
}

if sdk.StatusIsTerminated(jr.Status) {
e.Done = jr.Done.Unix()
}
publishRunWorkflow(e, pkey, wname, "", "", "", 0, 0, jr.Status, nil)
}
35 changes: 17 additions & 18 deletions engine/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/permission"
"github.com/ovh/cds/engine/api/sessionstore"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
Expand Down Expand Up @@ -137,19 +137,10 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
return sdk.WrapError(fmt.Errorf("streaming unsupported"), "")
}

uuidSK, errS := sessionstore.NewSessionKey()
if errS != nil {
return sdk.WrapError(errS, "eventsBroker.Serve> Cannot generate UUID")
}
uuid := string(uuidSK)
user := getUser(ctx)
if err := loadUserPermissions(b.dbFunc(), b.cache, user); err != nil {
return sdk.WrapError(err, "eventsBroker.Serve Cannot load user permission")
}

uuid := sdk.UUID()
client := eventsBrokerSubscribe{
UUID: uuid,
User: user,
User: getUser(ctx),
Queue: make(chan sdk.Event, 10), // chan buffered, to avoid goroutine Start() wait on push in queue
}

Expand Down Expand Up @@ -237,38 +228,46 @@ func (b *eventsBroker) canSend(client eventsBrokerSubscribe) bool {
}

func (s *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool {
var isSharedInfra bool
for _, g := range s.User.Groups {
if g.ID == group.SharedInfraGroup.ID {
isSharedInfra = true
break
}
}

if strings.HasPrefix(event.EventType, "sdk.EventProject") {
if s.User.Admin || permission.ProjectPermission(event.ProjectKey, s.User) >= permission.PermissionRead {
if s.User.Admin || isSharedInfra || permission.ProjectPermission(event.ProjectKey, s.User) >= permission.PermissionRead {
return true
}
return false
}
if strings.HasPrefix(event.EventType, "sdk.EventWorkflow") || strings.HasPrefix(event.EventType, "sdk.EventRunWorkflow") {
if s.User.Admin || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, s.User) >= permission.PermissionRead {
if s.User.Admin || isSharedInfra || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, s.User) >= permission.PermissionRead {
return true
}
return false
}
if strings.HasPrefix(event.EventType, "sdk.EventApplication") {
if s.User.Admin || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, s.User) >= permission.PermissionRead {
if s.User.Admin || isSharedInfra || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, s.User) >= permission.PermissionRead {
return true
}
return false
}
if strings.HasPrefix(event.EventType, "sdk.EventPipeline") {
if s.User.Admin || permission.PipelinePermission(event.ProjectKey, event.PipelineName, s.User) >= permission.PermissionRead {
if s.User.Admin || isSharedInfra || permission.PipelinePermission(event.ProjectKey, event.PipelineName, s.User) >= permission.PermissionRead {
return true
}
return false
}
if strings.HasPrefix(event.EventType, "sdk.EventEnvironment") {
if s.User.Admin || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, s.User) >= permission.PermissionRead {
if s.User.Admin || isSharedInfra || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, s.User) >= permission.PermissionRead {
return true
}
return false
}
if strings.HasPrefix(event.EventType, "sdk.EventBroadcast") {
if s.User.Admin || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, s.User, permission.PermissionRead) {
if s.User.Admin || isSharedInfra || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, s.User, permission.PermissionRead) {
return true
}
return false
Expand Down
6 changes: 2 additions & 4 deletions engine/api/grpc_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,8 @@ func (h *grpcHandlers) SendResult(c context.Context, res *sdk.Result) (*empty.Em
return new(empty.Empty), sdk.WrapError(err, "SendResult> Cannot post job result")
}

workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, p.Key)
workflow.ResyncNodeRunsWithCommits(c, db, h.store, p, workflowNodeRuns)

go workflow.SendEvent(db, workflowRuns, workflowNodeRuns, p.Key)
workflow.ResyncNodeRunsWithCommits(c, db, h.store, p, report)
go workflow.SendEvent(db, p.Key, report)

return new(empty.Empty), nil
}
9 changes: 4 additions & 5 deletions engine/api/router_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"

"github.com/go-gorp/gorp"

"github.com/gorilla/mux"

"github.com/ovh/cds/engine/api/auth"
Expand Down Expand Up @@ -132,6 +131,10 @@ func (api *API) authMiddleware(ctx context.Context, w http.ResponseWriter, req *
return ctx, sdk.WrapError(sdk.ErrUnauthorized, "Router> Unable to find connected user")
}

if rc.Options["allowServices"] == "true" && getService(ctx) != nil {
return ctx, nil
}

if rc.Options["needHatchery"] == "true" && getHatchery(ctx) != nil {
return ctx, nil
}
Expand All @@ -144,10 +147,6 @@ func (api *API) authMiddleware(ctx context.Context, w http.ResponseWriter, req *
return ctx, nil
}

if rc.Options["allowServices"] == "true" && getService(ctx) != nil {
return ctx, nil
}

if getUser(ctx).Admin {
return ctx, nil
}
Expand Down
5 changes: 5 additions & 0 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type ProcessorReport struct {
errors []error
}

// WorkflowRuns returns the list of concerned workflow runs
func (r *ProcessorReport) WorkflowRuns() []sdk.WorkflowRun {
return r.workflows
}

// Add something to the report
func (r *ProcessorReport) Add(i ...interface{}) {
r.mutex.Lock()
Expand Down
3 changes: 2 additions & 1 deletion engine/api/workflow/resync_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func ResyncWorkflowRunStatus(db gorp.SqlExecutor, wr *sdk.WorkflowRun) (*Process
}

// ResyncNodeRunsWithCommits load commits build in this node run and save it into node run
func ResyncNodeRunsWithCommits(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, nodeRuns []sdk.WorkflowNodeRun) {
func ResyncNodeRunsWithCommits(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, report *ProcessorReport) {
nodeRuns := report.nodes
for _, nodeRun := range nodeRuns {
if len(nodeRun.Commits) > 0 {
continue
Expand Down
28 changes: 19 additions & 9 deletions engine/api/workflow/workflow_run_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@ import (
"github.com/ovh/cds/sdk/log"
)

// GetWorkflowRunEventData read channel to get elements to push
// TODO: refactor this useless function
func GetWorkflowRunEventData(report *ProcessorReport, projectKey string) ([]sdk.WorkflowRun, []sdk.WorkflowNodeRun) {
return report.workflows, report.nodes
}

// SendEvent Send event on workflow run
func SendEvent(db gorp.SqlExecutor, wrs []sdk.WorkflowRun, wnrs []sdk.WorkflowNodeRun, key string) {
for _, wr := range wrs {
func SendEvent(db gorp.SqlExecutor, key string, report *ProcessorReport) {
for _, wr := range report.workflows {
event.PublishWorkflowRun(wr, key)
}
for _, wnr := range wnrs {
for _, wnr := range report.nodes {
wr, errWR := LoadRunByID(db, wnr.WorkflowRunID, LoadRunOptions{
WithLightTests: true,
})
Expand Down Expand Up @@ -56,6 +50,22 @@ func SendEvent(db gorp.SqlExecutor, wrs []sdk.WorkflowRun, wnrs []sdk.WorkflowNo

event.PublishWorkflowNodeRun(db, wnr, wr.Workflow, &previousNodeRun)
}

for _, jobrun := range report.jobs {
noderun, err := LoadNodeRunByID(db, jobrun.WorkflowNodeRunID, LoadRunOptions{})
if err != nil {
log.Warning("SendEvent.workflow> Cannot load workflow node run %d: %s", jobrun.WorkflowNodeRunID, err)
continue
}
wr, errWR := LoadRunByID(db, noderun.WorkflowRunID, LoadRunOptions{
WithLightTests: true,
})
if errWR != nil {
log.Warning("SendEvent.workflow> Cannot load workflow run %d: %s", noderun.WorkflowRunID, errWR)
continue
}
event.PublishWorkflowNodeJobRun(db, key, wr.Workflow.Name, jobrun)
}
}

// ResyncCommitStatus resync commit status for a workflow run
Expand Down
14 changes: 5 additions & 9 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,8 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
return sdk.WrapError(errT, "postTakeWorkflowJobHandler> Cannot takeJob nodeJobRunID:%d", id)
}

workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, p.Key)
workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, p, workflowNodeRuns)

go workflow.SendEvent(api.mustDB(), workflowRuns, workflowNodeRuns, p.Key)
workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, p, report)
go workflow.SendEvent(api.mustDB(), p.Key, report)

return service.WriteJSON(w, pbji, http.StatusOK)
}
Expand Down Expand Up @@ -425,9 +423,7 @@ func (api *API) postWorkflowJobResultHandler() service.Handler {
return sdk.WrapError(err, "postWorkflowJobResultHandler> unable to post job result")
}

observability.Record(ctx, api.Stats.WorkflowRunStarted, 1)
workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, proj.Key)

workflowRuns := report.WorkflowRuns()
if len(workflowRuns) > 0 {
observability.Current(ctx,
observability.Tag(observability.TagWorkflow, workflowRuns[0].Workflow.Name))
Expand All @@ -440,10 +436,10 @@ func (api *API) postWorkflowJobResultHandler() service.Handler {
db := api.mustDB()

_, next = observability.Span(ctx, "workflow.ResyncNodeRunsWithCommits")
workflow.ResyncNodeRunsWithCommits(ctx, db, api.Cache, proj, workflowNodeRuns)
workflow.ResyncNodeRunsWithCommits(ctx, db, api.Cache, proj, report)
next()

go workflow.SendEvent(db, workflowRuns, workflowNodeRuns, proj.Key)
go workflow.SendEvent(db, proj.Key, report)

return nil
}
Expand Down
16 changes: 7 additions & 9 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ func (api *API) stopWorkflowRunHandler() service.Handler {
if err != nil {
return sdk.WrapError(err, "stopWorkflowRun> Unable to stop workflow")
}
workflowRuns := report.WorkflowRuns()

workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, proj.Key)
go workflow.SendEvent(api.mustDB(), workflowRuns, workflowNodeRuns, proj.Key)
go workflow.SendEvent(api.mustDB(), proj.Key, report)

if len(workflowRuns) > 0 {
observability.Current(ctx,
Expand Down Expand Up @@ -536,8 +536,7 @@ func (api *API) stopWorkflowNodeRunHandler() service.Handler {
return sdk.WrapError(err, "stopWorkflowNodeRunHandler> Unable to stop workflow run")
}

workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, p.Key)
go workflow.SendEvent(api.mustDB(), workflowRuns, workflowNodeRuns, p.Key)
go workflow.SendEvent(api.mustDB(), p.Key, report)

return service.WriteJSON(w, nodeRun, http.StatusOK)
}
Expand Down Expand Up @@ -718,9 +717,8 @@ func (api *API) postWorkflowRunHandler() service.Handler {
if errS != nil {
return sdk.WrapError(errS, "postWorkflowRunHandler> Unable to start workflow %s/%s", key, name)
}
workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, p.Key)
workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, p, workflowNodeRuns)
go workflow.SendEvent(api.mustDB(), workflowRuns, workflowNodeRuns, p.Key)
workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, p, report)
go workflow.SendEvent(api.mustDB(), p.Key, report)

// Purge workflow run
sdk.GoRoutine(
Expand All @@ -732,8 +730,8 @@ func (api *API) postWorkflowRunHandler() service.Handler {
})

var wr *sdk.WorkflowRun
if len(workflowRuns) > 0 {
wr = &workflowRuns[0]
if len(report.WorkflowRuns()) > 0 {
wr = &report.WorkflowRuns()[0]
wr.Translate(r.Header.Get("Accept-Language"))
}
return service.WriteJSON(w, wr, http.StatusAccepted)
Expand Down
2 changes: 1 addition & 1 deletion engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ func (h *HatcherySwarm) listAwolWorkers(dockerClient *dockerClient) ([]types.Con
found = true
// If worker is disabled, kill it
if n.Status == sdk.StatusDisabled {
log.Debug("lhatchery> swarm> listAwolWorkers> Worker %s is disabled. Kill it with fire!", c.Names[0])
log.Debug("hatchery> swarm> listAwolWorkers> Worker %s is disabled. Kill it with fire!", c.Names[0])
oldContainers = append(oldContainers, c)
break
}
Expand Down
Loading

0 comments on commit ec92876

Please sign in to comment.