/
tasks.go
107 lines (86 loc) · 2.41 KB
/
tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package tasks
const (
schedulePolicyNameFIFO = "fifo"
schedulePolicyNameUserTaskPolling = "user-task-polling"
)
// NewScheduler create a scheduler by policyName.
func NewScheduler(policyName string) Scheduler {
switch policyName {
case "":
fallthrough
case schedulePolicyNameFIFO:
return newScheduler(
newFIFOPolicy(),
)
case schedulePolicyNameUserTaskPolling:
return newScheduler(
newUserTaskPollingPolicy(),
)
default:
panic("invalid schedule task policy")
}
}
// tryIntoMergeTask convert inner task into MergeTask,
// Return nil if inner task is not a MergeTask.
func tryIntoMergeTask(t Task) MergeTask {
if mt, ok := t.(MergeTask); ok {
return mt
}
return nil
}
type Scheduler interface {
// Add a new task into scheduler, follow some constraints.
// 1. It's a non-block operation.
// 2. Error will be returned if scheduler reaches some limit.
// 3. Concurrent safe.
Add(task Task) error
// Start schedule the owned task asynchronously and continuously.
// Shall be called only once
Start()
// Stop make scheduler deny all incoming tasks
// and cleans up all related resources
Stop()
// GetWaitingTaskTotalNQ
GetWaitingTaskTotalNQ() int64
// GetWaitingTaskTotal
GetWaitingTaskTotal() int64
}
// schedulePolicy is the policy of scheduler.
type schedulePolicy interface {
// Push add a new task into scheduler.
// Return the count of new task added (task may be chunked, merged or dropped)
// 0 and an error will be returned if scheduler reaches some limit.
Push(task Task) (int, error)
// Pop get the task next ready to run.
Pop() Task
Len() int
}
// MergeTask is a Task which can be merged with other task
type MergeTask interface {
Task
// MergeWith other task, return true if merge success.
// After success, the task merged should be dropped.
MergeWith(Task) bool
}
// A task is execute unit of scheduler.
type Task interface {
// Return the username which task is belong to.
// Return "" if the task do not contain any user info.
Username() string
// Return whether the task would be running on GPU.
IsGpuIndex() bool
// PreExecute the task, only call once.
PreExecute() error
// Execute the task, only call once.
Execute() error
// Done notify the task finished.
Done(err error)
// Check if the Task is canceled.
// Concurrent safe.
Canceled() error
// Wait for task finish.
// Concurrent safe.
Wait() error
// Return the NQ of task.
NQ() int64
}