-
Notifications
You must be signed in to change notification settings - Fork 6
/
DirectScheduler.java
172 lines (146 loc) · 6.84 KB
/
DirectScheduler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import backtype.storm.scheduler.*;
import clojure.lang.PersistentArrayMap;
import java.util.*;
/**
* 直接分配调度器,可以分配组件到指定节点中
* Created by zhexuan on 15/7/6.
*/
public class DirectScheduler implements IScheduler{
@Override
public void prepare(Map conf) {
}
@Override
public void schedule(Topologies topologies, Cluster cluster) {
System.out.println("DirectScheduler: begin scheduling");
// Gets the topology which we want to schedule
Collection<TopologyDetails> topologyDetailes;
TopologyDetails topology;
//作业是否要指定分配的标识
String assignedFlag;
Map map;
Iterator<String> iterator = null;
topologyDetailes = topologies.getTopologies();
for(TopologyDetails td: topologyDetailes){
map = td.getConf();
assignedFlag = (String)map.get("assigned_flag");
//如何找到的拓扑逻辑的分配标为1则代表是要分配的,否则走系统的调度
if(assignedFlag != null && assignedFlag.equals("1")){
System.out.println("finding topology named " + td.getName());
topologyAssign(cluster, td, map);
}else {
System.out.println("topology assigned is null");
}
}
//其余的任务由系统自带的调度器执行
new EvenScheduler().schedule(topologies, cluster);
}
/**
* 拓扑逻辑的调度
* @param cluster
* 集群
* @param topology
* 具体要调度的拓扑逻辑
* @param map
* map配置项
*/
private void topologyAssign(Cluster cluster, TopologyDetails topology, Map map){
Set<String> keys;
PersistentArrayMap designMap;
Iterator<String> iterator;
iterator = null;
// make sure the special topology is submitted,
if (topology != null) {
designMap = (PersistentArrayMap)map.get("design_map");
if(designMap != null){
System.out.println("design map size is " + designMap.size());
keys = designMap.keySet();
iterator = keys.iterator();
System.out.println("keys size is " + keys.size());
}
if(designMap == null || designMap.size() == 0){
System.out.println("design map is null");
}
boolean needsScheduling = cluster.needsScheduling(topology);
if (!needsScheduling) {
System.out.println("Our special topology does not need scheduling.");
} else {
System.out.println("Our special topology needs scheduling.");
// find out all the needs-scheduling components of this topology
Map<String, List<ExecutorDetails>> componentToExecutors = cluster.getNeedsSchedulingComponentToExecutors(topology);
System.out.println("needs scheduling(component->executor): " + componentToExecutors);
System.out.println("needs scheduling(executor->components): " + cluster.getNeedsSchedulingExecutorToComponents(topology));
SchedulerAssignment currentAssignment = cluster.getAssignmentById(topology.getId());
if (currentAssignment != null) {
System.out.println("current assignments: " + currentAssignment.getExecutorToSlot());
} else {
System.out.println("current assignments: {}");
}
String componentName;
String nodeName;
if(designMap != null && iterator != null){
while (iterator.hasNext()){
componentName = iterator.next();
nodeName = (String)designMap.get(componentName);
System.out.println("现在进行调度 组件名称->节点名称:" + componentName + "->" + nodeName);
componentAssign(cluster, topology, componentToExecutors, componentName, nodeName);
}
}
}
}
}
/**
* 组件调度
* @param cluster
* 集群的信息
* @param topology
* 待调度的拓扑细节信息
* @param totalExecutors
* 组件的执行器
* @param componentName
* 组件的名称
* @param supervisorName
* 节点的名称
*/
private void componentAssign(Cluster cluster, TopologyDetails topology, Map<String, List<ExecutorDetails>> totalExecutors, String componentName, String supervisorName){
if (!totalExecutors.containsKey(componentName)) {
System.out.println("Our special-spout does not need scheduling.");
} else {
System.out.println("Our special-spout needs scheduling.");
List<ExecutorDetails> executors = totalExecutors.get(componentName);
// find out the our "special-supervisor" from the supervisor metadata
Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
SupervisorDetails specialSupervisor = null;
for (SupervisorDetails supervisor : supervisors) {
Map meta = (Map) supervisor.getSchedulerMeta();
if(meta != null && meta.get("name") != null){
System.out.println("supervisor name:" + meta.get("name"));
if (meta.get("name").equals(supervisorName)) {
System.out.println("Supervisor finding");
specialSupervisor = supervisor;
break;
}
}else {
System.out.println("Supervisor meta null");
}
}
// found the special supervisor
if (specialSupervisor != null) {
System.out.println("Found the special-supervisor");
List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);
// 如果目标节点上已经没有空闲的slot,则进行强制释放
if (availableSlots.isEmpty() && !executors.isEmpty()) {
for (Integer port : cluster.getUsedPorts(specialSupervisor)) {
cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(), port));
}
}
// 重新获取可用的slot
availableSlots = cluster.getAvailableSlots(specialSupervisor);
// 选取节点上第一个slot,进行分配
cluster.assign(availableSlots.get(0), topology.getId(), executors);
System.out.println("We assigned executors:" + executors + " to slot: [" + availableSlots.get(0).getNodeId() + ", " + availableSlots.get(0).getPort() + "]");
} else {
System.out.println("There is no supervisor find!!!");
}
}
}
}