-
Notifications
You must be signed in to change notification settings - Fork 38
/
cancel.go
101 lines (94 loc) · 2.69 KB
/
cancel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package handlers
import (
"context"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/runabol/tork"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/middleware/job"
"github.com/runabol/tork/mq"
)
// cancelHandler bundles the dependencies used to process a job
// cancellation request.
type cancelHandler struct {
	// ds is the datastore used to read and update jobs, tasks and nodes.
	ds datastore.Datastore
	// broker is used to publish cancellation messages for jobs and tasks.
	broker mq.Broker
}
// NewCancelHandler returns a job.HandlerFunc that cancels a job using the
// given datastore and broker. The returned function is the handle method
// bound to a freshly allocated cancelHandler.
func NewCancelHandler(ds datastore.Datastore, b mq.Broker) job.HandlerFunc {
	return (&cancelHandler{broker: b, ds: ds}).handle
}
// handle processes a cancellation request for the job j.
//
// It performs three steps, in order:
//  1. updates the stored job, setting its state to cancelled only when the
//     stored state is running or scheduled (other states are left as-is);
//  2. when j has a parent task, loads the job owning that task and publishes
//     it with a cancelled state so the parent job gets cancelled as well;
//  3. cancels all active tasks of j via cancelActiveTasks.
//
// Steps 2 and 3 run regardless of whether step 1 changed the stored state.
//
// An error is returned when the job update, the parent task/job lookup or
// the cancellation of active tasks fails. A failure to publish the parent
// job is only logged and does not abort the remaining work.
func (h *cancelHandler) handle(ctx context.Context, _ job.EventType, j *tork.Job) error {
	// mark the job as cancelled
	if err := h.ds.UpdateJob(ctx, j.ID, func(u *tork.Job) error {
		if u.State != tork.JobStateRunning && u.State != tork.JobStateScheduled {
			// job is not running -- nothing to cancel
			return nil
		}
		u.State = tork.JobStateCancelled
		return nil
	}); err != nil {
		return err
	}
	// if there's a parent task notify the parent job to cancel as well
	if j.ParentID != "" {
		pt, err := h.ds.GetTaskByID(ctx, j.ParentID)
		if err != nil {
			// pt must not be dereferenced on the error path, so report the
			// ID that was looked up instead.
			return errors.Wrapf(err, "error fetching parent task: %s", j.ParentID)
		}
		pj, err := h.ds.GetJobByID(ctx, pt.JobID)
		if err != nil {
			// same as above: pj is not usable here, report the requested ID.
			return errors.Wrapf(err, "error fetching parent job: %s", pt.JobID)
		}
		pj.State = tork.JobStateCancelled
		if err := h.broker.PublishJob(ctx, pj); err != nil {
			// best effort: keep going and still cancel this job's own tasks
			log.Error().Err(err).Msgf("error cancelling parent job: %s", pj.ID)
		}
	}
	// cancel all running tasks
	return cancelActiveTasks(ctx, h.ds, h.broker, j.ID)
}
// cancelActiveTasks cancels every active task of the job identified by jobID.
//
// For each task returned by ds.GetActiveTasks it:
//   - stores the cancelled state for the task in the datastore;
//   - if the task has a sub-job, loads that job and publishes it with a
//     cancelled state so the sub-job gets cancelled too;
//   - otherwise, if the task is assigned to a node, publishes the cancelled
//     task to that node's queue.
//
// Processing stops at the first failure and the error is returned wrapped
// with the ID of the entity that was being handled; tasks after the failing
// one are not touched.
func cancelActiveTasks(ctx context.Context, ds datastore.Datastore, b mq.Broker, jobID string) error {
	// get a list of active tasks for the job
	tasks, err := ds.GetActiveTasks(ctx, jobID)
	if err != nil {
		return errors.Wrapf(err, "error getting active tasks for job: %s", jobID)
	}
	for _, t := range tasks {
		// update the in-memory task as well: it is the value published to
		// the node's queue further down.
		t.State = tork.TaskStateCancelled
		// mark tasks as cancelled
		if err := ds.UpdateTask(ctx, t.ID, func(u *tork.Task) error {
			u.State = tork.TaskStateCancelled
			return nil
		}); err != nil {
			return errors.Wrapf(err, "error cancelling task: %s", t.ID)
		}
		switch {
		case t.SubJob != nil:
			// this task is a sub-job: notify the sub-job to cancel
			sj, err := ds.GetJobByID(ctx, t.SubJob.ID)
			if err != nil {
				return errors.Wrapf(err, "error fetching sub-job: %s", t.SubJob.ID)
			}
			sj.State = tork.JobStateCancelled
			if err := b.PublishJob(ctx, sj); err != nil {
				return errors.Wrapf(err, "error publishing cancelllation for sub-job %s", sj.ID)
			}
		case t.NodeID != "":
			// notify the node currently running the task
			// to cancel it
			node, err := ds.GetNodeByID(ctx, t.NodeID)
			if err != nil {
				return errors.Wrapf(err, "error fetching node: %s", t.NodeID)
			}
			if err := b.PublishTask(ctx, node.Queue, t); err != nil {
				return errors.Wrapf(err, "error publishing cancellation for task: %s", t.ID)
			}
		}
	}
	return nil
}