-
Notifications
You must be signed in to change notification settings - Fork 32
/
scheduler.go
151 lines (134 loc) · 4.06 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Package scheduler contains Funnel's builtin compute scheduler and node.
package scheduler
import (
"fmt"
"time"
"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/events"
"github.com/ohsu-comp-bio/funnel/logger"
"github.com/ohsu-comp-bio/funnel/tes"
"golang.org/x/net/context"
)
// TaskQueue describes the interface the scheduler uses to find tasks that need scheduling.
type TaskQueue interface {
// ReadQueue returns tasks that are waiting to be scheduled. The scheduler
// calls it once per scheduling iteration, passing Conf.ScheduleChunk as count.
// NOTE(review): presumably count is an upper bound on the number of tasks
// returned — confirm against the implementations.
ReadQueue(count int) []*tes.Task
}
// Scheduler handles scheduling tasks to nodes and supports many backends.
type Scheduler struct {
// Conf holds the scheduler settings. Within this file, ScheduleRate sets the
// tick interval of Run, ScheduleChunk is passed to Queue.ReadQueue, and the
// whole value is handed to UpdateNodeState.
Conf config.Scheduler
// Log receives the scheduler's debug, info, and error messages.
Log *logger.Logger
// Nodes is the node service used to list, update (PutNode), and delete nodes.
Nodes SchedulerServiceServer
// Queue supplies the tasks that are waiting to be scheduled.
Queue TaskQueue
// Event is the writer that task state changes and system log events are sent to.
Event events.Writer
}
// Run starts the scheduling loop. This blocks.
//
// The scheduler will take a chunk of tasks from the queue,
// request that the configured backend schedule them, and
// act on offers made by the backend.
//
// One Schedule iteration is run on every tick of a ticker whose period is
// Conf.ScheduleRate. Run returns nil once ctx is done, and returns a
// "schedule error: ..." error as soon as a Schedule iteration fails.
func (s *Scheduler) Run(ctx context.Context) error {
	ticker := time.NewTicker(time.Duration(s.Conf.ScheduleRate))
	// Stop the ticker on every exit path so it does not outlive the loop.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := s.Schedule(ctx); err != nil {
				return fmt.Errorf("schedule error: %s", err)
			}
		}
	}
}
// CheckNodes is used by the scheduler to check for dead/gone nodes.
// This is not an RPC endpoint.
//
// It lists all nodes, runs them through UpdateNodeState, and then handles
// each node that function returns:
//   - a node in state GONE has every one of its tasks marked SYSTEM_ERROR
//     (with an accompanying system log event) and is then deleted;
//   - any other node is written back with PutNode.
//
// The first error from ListNodes, DeleteNode, or PutNode aborts the check and
// is returned. Errors from writing events are not inspected.
func (s *Scheduler) CheckNodes() error {
	ctx := context.Background()

	listing, err := s.Nodes.ListNodes(ctx, &ListNodesRequest{})
	if err != nil {
		return err
	}

	for _, n := range UpdateNodeState(listing.Nodes, s.Conf) {
		// Nodes that are still around only need their refreshed state persisted.
		if n.State != NodeState_GONE {
			if _, err := s.Nodes.PutNode(ctx, n); err != nil {
				return err
			}
			continue
		}

		// The node is gone: fail its tasks before removing it.
		for _, taskID := range n.TaskIds {
			s.Event.WriteEvent(ctx, events.NewState(taskID, tes.State_SYSTEM_ERROR))
			s.Event.WriteEvent(ctx, events.NewSystemLog(taskID, 0, 0, "info",
				"Cleaning up Task assigned to dead/gone node", map[string]string{
					"nodeID": n.Id,
				}))
		}
		if _, err := s.Nodes.DeleteNode(ctx, n); err != nil {
			return err
		}
	}
	return nil
}
// Schedule does a scheduling iteration. It checks the health of nodes
// in the database, gets a chunk of tasks from the queue (configurable by config.ScheduleChunk),
// and calls the given scheduler backend. If the backend returns a valid offer, the
// task is assigned to the offered node.
//
// Failures are logged rather than propagated: a CheckNodes error does not stop
// the iteration, a task with no offer is skipped, and a failed node update
// skips only that task. The returned error is always nil.
func (s *Scheduler) Schedule(ctx context.Context) error {
	if err := s.CheckNodes(); err != nil {
		s.Log.Error("Error checking nodes", err)
	}

	for _, task := range s.Queue.ReadQueue(s.Conf.ScheduleChunk) {
		offer := s.GetOffer(task)
		if offer == nil {
			// No node could take this task in this iteration.
			s.Log.Debug("Scheduling failed for task", "taskID", task.Id)
			continue
		}

		s.Log.Info("Assigning task to node",
			"taskID", task.Id,
			"nodeID", offer.Node.Id,
			"node", offer.Node,
		)
		s.Event.WriteEvent(ctx, events.NewSystemLog(task.Id, 0, 0, "info",
			"Assigning task to node", map[string]string{
				"nodeID": offer.Node.Id,
			}))

		// TODO this is important! write a test for this line.
		// when a task is assigned, its state is immediately Initializing
		// even before the node has received it.
		offer.Node.TaskIds = append(offer.Node.TaskIds, task.Id)
		if _, err := s.Nodes.PutNode(ctx, offer.Node); err != nil {
			s.Log.Error("Error in AssignTask",
				"error", err,
				"taskID", task.Id,
				"nodeID", offer.Node.Id,
			)
			s.Event.WriteEvent(ctx, events.NewSystemLog(task.Id, 0, 0, "error",
				"Error in AssignTask", map[string]string{
					"error":  err.Error(),
					"nodeID": offer.Node.Id,
				}))
			continue
		}

		// The node update succeeded, so record the task as initializing.
		if err := s.Event.WriteEvent(ctx, events.NewState(task.Id, tes.State_INITIALIZING)); err != nil {
			s.Log.Error("Error marking task as initializing",
				"error", err,
				"taskID", task.Id,
				"nodeID", offer.Node.Id,
			)
		}
	}
	return nil
}
// GetOffer returns an offer based on available funnel nodes.
//
// The current nodes are fetched with ListNodes and passed, together with the
// task j, to DefaultScheduleAlgorithm (whose third argument is nil). If the
// listing fails, the algorithm is run against an empty node list instead.
func (s *Scheduler) GetOffer(j *tes.Task) *Offer {
	// Start from an empty list so a failed lookup still yields a usable value.
	candidates := []*Node{}
	if listing, err := s.Nodes.ListNodes(context.Background(), &ListNodesRequest{}); err == nil {
		candidates = listing.Nodes
	}
	return DefaultScheduleAlgorithm(j, candidates, nil)
}