@@ -31,6 +31,7 @@ type Scheduler interface {
3131 GetChecksum (ctx context.Context , uuid string ) (string , error )
3232 RequestJob (ctx context.Context , workerName string ) (* model.TaskEncode , error )
3333 HandleWorkerEvent (ctx context.Context , taskEvent * model.TaskEvent ) error
34+ CancelJob (ctx context.Context , id string ) error
3435}
3536
3637type SchedulerConfig struct {
@@ -97,6 +98,30 @@ func (R *RuntimeScheduler) HandleWorkerEvent(ctx context.Context, jobEvent *mode
9798 return nil
9899}
99100
101+ func (R * RuntimeScheduler ) CancelJob (ctx context.Context , id string ) error {
102+ job , err := R .repo .GetJob (ctx , id )
103+ if err != nil {
104+ return err
105+ }
106+
107+ status := job .Events .GetStatus ()
108+ switch {
109+ case status == model .CompletedNotificationStatus :
110+ return fmt .Errorf ("job already completed" )
111+ case status == model .FailedNotificationStatus :
112+ return fmt .Errorf ("job is failed" )
113+ case status == model .CanceledNotificationStatus :
114+ return fmt .Errorf ("job already canceled" )
115+ case status == model .AssignedNotificationStatus , status == model .StartedNotificationStatus :
116+ newEvent := job .AddEventComplete (model .NotificationEvent , model .JobNotification , model .CanceledNotificationStatus , "Job canceled by user" )
117+ err = R .repo .AddNewTaskEvent (ctx , newEvent )
118+ if err != nil {
119+ return err
120+ }
121+ }
122+ return fmt .Errorf ("job %s is in unknown state" , id )
123+ }
124+
100125func (R * RuntimeScheduler ) processEvent (ctx context.Context , taskEvent * model.TaskEvent ) error {
101126 var err error
102127 switch taskEvent .EventType {
0 commit comments