-
Notifications
You must be signed in to change notification settings - Fork 87
/
MesosScheduler.java
449 lines (387 loc) · 15.7 KB
/
MesosScheduler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
package org.apache.hadoop.mapred;
import com.codahale.metrics.Meter;
import org.apache.commons.httpclient.HttpHost;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.mesos.MesosSchedulerDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.*;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import org.apache.mesos.hadoop.Metrics;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * A Hadoop {@link TaskScheduler} that delegates actual slot scheduling to a
 * wrapped TaskScheduler (e.g. JobQueueTaskScheduler) while acting as a Mesos
 * {@link Scheduler}: it registers a framework with the Mesos master, accepts
 * resource offers (via a pluggable {@link ResourcePolicy}), launches
 * TaskTrackers as Mesos tasks, and tears them down when idle or flaky.
 *
 * Thread-safety: most Scheduler callbacks are synchronized on this instance;
 * methods that must touch JobTracker state use explicit synchronization
 * instead to avoid the lock cycle described in MESOS-429 (see comment above
 * the Mesos Scheduler methods section below).
 */
public class MesosScheduler extends TaskScheduler implements Scheduler {
  public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
  // This is the memory overhead for a jvm process. This needs to be added
  // to a jvm process's resource requirement, in addition to its heap size.
  public static final double JVM_MEM_OVERHEAD_PERCENT_DEFAULT = 0.25; // 25%.
  // NOTE: It appears that there's no real resource requirements for a
  // map / reduce slot. We therefore define a default slot as:
  // 1 cores.
  // 1024 MB memory.
  // 1 GB of disk space.
  public static final double SLOT_CPUS_DEFAULT = 1; // 1 cores.
  public static final int SLOT_DISK_DEFAULT = 1024; // 1 GB.
  public static final int SLOT_JVM_HEAP_DEFAULT = 1024; // 1024MB.
  public static final double TASKTRACKER_CPUS_DEFAULT = 1.0; // 1 core.
  public static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB.
  public static final int TASKTRACKER_DISK_DEFAULT = 1024; // 1 GB.
  // The default behavior in Hadoop is to use 4 slots per TaskTracker:
  public static final int MAP_SLOTS_DEFAULT = 2;
  public static final int REDUCE_SLOTS_DEFAULT = 2;
  // The amount of time to wait for task trackers to launch before
  // giving up.
  public static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes
  public static final long PERIODIC_MS = 300000; // 5 minutes
  public static final long DEFAULT_IDLE_CHECK_INTERVAL = 5; // 5 seconds
  // Destroy task trackers after being idle for N idle checks
  public static final long DEFAULT_IDLE_REVOCATION_CHECKS = 5;
  private SchedulerDriver driver;
  // The wrapped Hadoop scheduler that performs the real slot assignment.
  protected TaskScheduler taskScheduler;
  protected JobTracker jobTracker;
  protected Configuration conf;
  // Optional on-disk state file (mapred.mesos.state.file); null when unset.
  protected File stateFile;
  // Count of the launched trackers for TaskID generation.
  protected long launchedTrackers = 0;
  // Use a fixed slot allocation policy?
  protected boolean policyIsFixed = false;
  protected ResourcePolicy policy;
  protected boolean enableMetrics = false;
  public Metrics metrics;
  // Maintains a mapping from {tracker host:port -> MesosTracker}.
  // Used for tracking the slots of each TaskTracker and the corresponding
  // Mesos TaskID.
  protected Map<HttpHost, MesosTracker> mesosTrackers =
      new ConcurrentHashMap<HttpHost, MesosTracker>();
  protected final ScheduledExecutorService timerScheduler =
      Executors.newScheduledThreadPool(1);
  // Listens to JobTracker job lifecycle events: starts/stops metrics timers,
  // kills trackers the job has blacklisted, and reaps idle trackers when a
  // job completes.
  protected JobInProgressListener jobListener = new JobInProgressListener() {
    @Override
    public void jobAdded(JobInProgress job) throws IOException {
      LOG.info("Added job " + job.getJobID());
      if (metrics != null) {
        metrics.jobTimerContexts.put(job.getJobID(), metrics.jobTimer.time());
      }
    }

    @Override
    public void jobRemoved(JobInProgress job) {
      LOG.info("Removed job " + job.getJobID());
    }

    @Override
    public void jobUpdated(JobChangeEvent event) {
      synchronized (MesosScheduler.this) {
        JobInProgress job = event.getJobInProgress();
        if (metrics != null) {
          Meter meter = metrics.jobStateMeter.get(job.getStatus().getRunState());
          if (meter != null) {
            meter.mark();
          }
        }
        // If we have flaky tasktrackers, kill them.
        final List<String> flakyTrackers = job.getBlackListedTrackers();
        // Remove the task from the map. This is O(n^2), but there's no better
        // way to do it, AFAIK. The flakyTrackers list should usually be
        // small, so this is probably not bad.
        for (String hostname : flakyTrackers) {
          for (MesosTracker mesosTracker : mesosTrackers.values()) {
            if (mesosTracker.host.getHostName().startsWith(hostname)) {
              LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host "
                  + mesosTracker.host + " because it has been marked as flaky");
              if (metrics != null) {
                metrics.flakyTrackerKilledMeter.mark();
              }
              killTracker(mesosTracker);
            }
          }
        }
        // If the job is complete, kill all the corresponding idle TaskTrackers.
        if (!job.isComplete()) {
          return;
        }
        if (metrics != null) {
          // Remove-then-stop guards against an NPE: no timer context exists
          // for jobs added before metrics were enabled, or whose jobAdded
          // event this listener never observed.
          com.codahale.metrics.Timer.Context context =
              metrics.jobTimerContexts.remove(job.getJobID());
          if (context != null) {
            context.stop();
          }
        }
        LOG.info("Completed job : " + job.getJobID());
        // Remove the task from the map. Iterate over a snapshot of the key
        // set because killTracker() mutates mesosTrackers.
        final Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
        for (HttpHost tracker : trackers) {
          MesosTracker mesosTracker = mesosTrackers.get(tracker);
          mesosTracker.jobs.remove(job.getJobID());
          // If the TaskTracker doesn't have any running job tasks assigned,
          // kill it.
          if (mesosTracker.jobs.isEmpty() && mesosTracker.active) {
            LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host "
                + mesosTracker.host + " because it is no longer needed");
            killTracker(mesosTracker);
          }
        }
      }
    }
  };

  // TaskScheduler methods.

  /**
   * Instantiates the wrapped TaskScheduler, registers the job listener,
   * starts the Mesos scheduler driver, and configures the resource policy
   * and metrics. Fatal configuration/startup failures exit the JVM so the
   * operator notices immediately (a JobTracker without a working scheduler
   * is useless).
   */
  @Override
  public synchronized void start() throws IOException {
    conf = getConf();
    String taskTrackerClass = conf.get("mapred.mesos.taskScheduler",
        "org.apache.hadoop.mapred.JobQueueTaskScheduler");
    try {
      taskScheduler =
          (TaskScheduler) Class.forName(taskTrackerClass).newInstance();
      taskScheduler.setConf(conf);
      taskScheduler.setTaskTrackerManager(taskTrackerManager);
    } catch (ClassNotFoundException e) {
      LOG.fatal("Failed to initialize the TaskScheduler", e);
      System.exit(1);
    } catch (InstantiationException e) {
      LOG.fatal("Failed to initialize the TaskScheduler", e);
      System.exit(1);
    } catch (IllegalAccessException e) {
      LOG.fatal("Failed to initialize the TaskScheduler", e);
      System.exit(1);
    }
    // Add the job listener to get job related updates.
    taskTrackerManager.addJobInProgressListener(jobListener);
    LOG.info("Starting MesosScheduler");
    jobTracker = (JobTracker) super.taskTrackerManager;
    String master = conf.get("mapred.mesos.master", "local");
    try {
      FrameworkInfo frameworkInfo = FrameworkInfo
          .newBuilder()
          .setUser("") // Let Mesos fill in the user.
          .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false))
          .setRole(conf.get("mapred.mesos.role", "*"))
          .setName("Hadoop: (RPC port: " + jobTracker.port + ","
              + " WebUI port: " + jobTracker.infoPort + ")").build();
      driver = new MesosSchedulerDriver(this, frameworkInfo, master);
      driver.start();
    } catch (Exception e) {
      // If the MesosScheduler can't be loaded, the JobTracker won't be useful
      // at all, so crash it now so that the user notices.
      LOG.fatal("Failed to start MesosScheduler", e);
      System.exit(1);
    }
    String file = conf.get("mapred.mesos.state.file", "");
    if (!file.equals("")) {
      this.stateFile = new File(file);
    }
    policyIsFixed = conf.getBoolean("mapred.mesos.scheduler.policy.fixed",
        policyIsFixed);
    if (policyIsFixed) {
      policy = new ResourcePolicyFixed(this);
    } else {
      policy = new ResourcePolicyVariable(this);
    }
    enableMetrics = conf.getBoolean("mapred.mesos.metrics.enabled",
        enableMetrics);
    if (enableMetrics) {
      metrics = new Metrics(conf);
    }
    taskScheduler.start();
  }

  /** Stops the Mesos driver (best-effort) and the wrapped scheduler. */
  @Override
  public synchronized void terminate() throws IOException {
    try {
      LOG.info("Stopping MesosScheduler");
      driver.stop();
    } catch (Exception e) {
      LOG.error("Failed to stop Mesos scheduler", e);
    }
    taskScheduler.terminate();
  }

  @Override
  public void checkJobSubmission(JobInProgress job) throws IOException {
    taskScheduler.checkJobSubmission(job);
  }

  /**
   * Delegates task assignment to the wrapped scheduler, but only for
   * trackers this framework launched and has not stopped. Records which
   * jobs run on which tracker so idle trackers can be reaped later.
   *
   * @return assigned tasks, or null for unknown/stopped trackers (or when
   *         the underlying scheduler returns null, as the Fair Scheduler does)
   */
  @Override
  public List<Task> assignTasks(TaskTracker taskTracker)
      throws IOException {
    HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(),
        taskTracker.getStatus().getHttpPort());
    if (!mesosTrackers.containsKey(tracker)) {
      LOG.info("Unknown/exited TaskTracker: " + tracker + ". ");
      return null;
    }
    MesosTracker mesosTracker = mesosTrackers.get(tracker);
    // Make sure we're not asked to assign tasks to any task trackers that have
    // been stopped. This could happen while the task tracker has not been
    // removed from the cluster e.g still in the heartbeat timeout period.
    synchronized (this) {
      if (mesosTracker.stopped) {
        LOG.info("Asked to assign tasks to stopped tracker " + tracker + ".");
        return null;
      }
    }
    // Let the underlying task scheduler do the actual task scheduling.
    List<Task> tasks = taskScheduler.assignTasks(taskTracker);
    // The Hadoop Fair Scheduler is known to return null.
    if (tasks == null) {
      return null;
    }
    // Keep track of which TaskTracker contains which tasks.
    for (Task task : tasks) {
      mesosTracker.jobs.add(task.getJobID());
    }
    return tasks;
  }

  @Override
  public synchronized Collection<JobInProgress> getJobs(String queueName) {
    return taskScheduler.getJobs(queueName);
  }

  @Override
  public synchronized void refresh() throws IOException {
    taskScheduler.refresh();
  }

  // Mesos Scheduler methods.
  // These are synchronized, where possible. Some of these methods need to access the
  // JobTracker, which can lead to a deadlock:
  // See: https://issues.apache.org/jira/browse/MESOS-429
  // The workaround employed here is to unsynchronize those methods needing access to
  // the JobTracker state and use explicit synchronization instead as appropriate.
  // TODO(bmahler): Provide a cleaner solution to this issue. One solution is to
  // split up the Scheduler and TaskScheduler implementations in order to break the
  // locking cycle. This would require a synchronized class to store the shared
  // state across our Scheduler and TaskScheduler implementations, and provide
  // atomic operations as needed.

  @Override
  public synchronized void registered(SchedulerDriver schedulerDriver,
                                      FrameworkID frameworkID, MasterInfo masterInfo) {
    LOG.info("Registered as " + frameworkID.getValue()
        + " with master " + masterInfo);
  }

  @Override
  public synchronized void reregistered(SchedulerDriver schedulerDriver,
                                        MasterInfo masterInfo) {
    LOG.info("Re-registered with master " + masterInfo);
  }

  /**
   * Kills the Mesos task backing the given tracker, marks the tracker
   * stopped, and drops it from the tracker map (only if the map still holds
   * this exact instance, to avoid racing with a replacement tracker).
   * Note: only the driver call is synchronized, per the MESOS-429 workaround.
   */
  public void killTracker(MesosTracker tracker) {
    if (metrics != null) {
      metrics.killMeter.mark();
    }
    synchronized (this) {
      driver.killTask(tracker.taskId);
    }
    tracker.stop();
    if (mesosTrackers.get(tracker.host) == tracker) {
      mesosTrackers.remove(tracker.host);
    }
  }

  /** Schedules a one-shot task on the shared single-threaded timer. */
  public synchronized void scheduleTimer(Runnable command,
                                         long delay,
                                         TimeUnit unit) {
    timerScheduler.schedule(command, delay, unit);
  }

  // For some reason, pendingMaps() and pendingReduces() doesn't return the
  // values we expect. We observed negative values, which may be related to
  // https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the
  // algorithm that is used to calculate the pending tasks within the Hadoop
  // JobTracker sources (see 'printTaskSummary' in
  // src/org/apache/hadoop/mapred/jobdetails_jsp.java).
  public int getPendingTasks(TaskInProgress[] tasks) {
    int totalTasks = tasks.length;
    int runningTasks = 0;
    int finishedTasks = 0;
    int killedTasks = 0;
    for (int i = 0; i < totalTasks; ++i) {
      TaskInProgress task = tasks[i];
      if (task == null) {
        continue;
      }
      if (task.isComplete()) {
        finishedTasks += 1;
      } else if (task.isRunning()) {
        runningTasks += 1;
      } else if (task.wasKilled()) {
        killedTasks += 1;
      }
    }
    int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks;
    return pendingTasks;
  }

  // This method uses explicit synchronization in order to avoid deadlocks when
  // accessing the JobTracker.
  @Override
  public void resourceOffers(SchedulerDriver schedulerDriver,
                             List<Offer> offers) {
    policy.resourceOffers(schedulerDriver, offers);
  }

  @Override
  public synchronized void offerRescinded(SchedulerDriver schedulerDriver,
                                          OfferID offerID) {
    LOG.warn("Rescinded offer: " + offerID.getValue());
  }

  /**
   * Handles Mesos task state transitions: on any terminal state the
   * corresponding TaskTracker is stopped and removed from the tracker map;
   * non-terminal states are ignored. Also marks the per-state metrics meter.
   */
  @Override
  public synchronized void statusUpdate(SchedulerDriver schedulerDriver,
                                        Protos.TaskStatus taskStatus) {
    LOG.info("Status update of " + taskStatus.getTaskId().getValue()
        + " to " + taskStatus.getState().name()
        + " with message " + taskStatus.getMessage());
    // Remove the TaskTracker if the corresponding Mesos task has reached a
    // terminal state.
    switch (taskStatus.getState()) {
      case TASK_FINISHED:
      case TASK_FAILED:
      case TASK_KILLED:
      case TASK_LOST:
        // Make a copy to iterate over keys and delete values.
        Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
        // Remove the task from the map.
        for (HttpHost tracker : trackers) {
          if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) {
            LOG.info("Removing terminated TaskTracker: " + tracker);
            mesosTrackers.get(tracker).stop();
            mesosTrackers.remove(tracker);
          }
        }
        break;
      case TASK_STAGING:
      case TASK_STARTING:
      case TASK_RUNNING:
        break;
      default:
        LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name());
        break;
    }
    if (metrics != null) {
      Meter meter = metrics.taskStateMeter.get(taskStatus.getState());
      if (meter != null) {
        meter.mark();
      }
    }
  }

  @Override
  public synchronized void frameworkMessage(SchedulerDriver schedulerDriver,
                                            ExecutorID executorID, SlaveID slaveID, byte[] bytes) {
    LOG.info("Framework Message of " + bytes.length + " bytes"
        + " from executor " + executorID.getValue()
        + " on slave " + slaveID.getValue());
  }

  @Override
  public synchronized void disconnected(SchedulerDriver schedulerDriver) {
    LOG.warn("Disconnected from Mesos master.");
  }

  @Override
  public synchronized void slaveLost(SchedulerDriver schedulerDriver,
                                     SlaveID slaveID) {
    LOG.warn("Slave lost: " + slaveID.getValue());
  }

  @Override
  public synchronized void executorLost(SchedulerDriver schedulerDriver,
                                        ExecutorID executorID, SlaveID slaveID, int status) {
    LOG.warn("Executor " + executorID.getValue()
        + " lost with status " + status + " on slave " + slaveID);
  }

  @Override
  public synchronized void error(SchedulerDriver schedulerDriver, String s) {
    LOG.error("Error from scheduler driver: " + s);
  }
}