Skip to content

Commit

Permalink
Merge branch 'trunk' into github-mesos-master
Browse files Browse the repository at this point in the history
  • Loading branch information
Andy Konwinski committed Nov 23, 2011
2 parents c1f9087 + 2b8c16c commit 059aabb
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 72 deletions.
54 changes: 20 additions & 34 deletions Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ DEPLOYDIR = @top_builddir@/deploy
OS_NAME = @OS_NAME@
PYTHON = @PYTHON@

MESOS_HOME = @prefix@/mesos
MESOS_INSTALL_HOME = $(DESTDIR)@prefix@/mesos

WITH_INCLUDED_ZOOKEEPER = @WITH_INCLUDED_ZOOKEEPER@

Expand Down Expand Up @@ -66,42 +66,28 @@ endif

# TODO(benh): Install generated C++ header files.
install:
if test ! -d $(MESOS_HOME)/bin; \
then mkdir -p $(MESOS_HOME)/bin; \
fi
if test ! -d $(MESOS_HOME)/lib; \
then mkdir -p $(MESOS_HOME)/lib; \
fi
if test ! -d $(MESOS_HOME)/lib/java; \
then mkdir -p $(MESOS_HOME)/lib/java; \
fi
if test ! -d $(MESOS_HOME)/conf; \
then mkdir -p $(MESOS_HOME)/conf; \
fi
if test ! -d $(MESOS_HOME)/deploy; \
then mkdir -p $(MESOS_HOME)/deploy; \
fi
install -m 755 $(BINDIR)/mesos-master $(MESOS_HOME)/bin
install -m 755 $(BINDIR)/mesos-slave $(MESOS_HOME)/bin
install -m 755 $(BINDIR)/mesos-local $(MESOS_HOME)/bin
install -m 755 $(BINDIR)/mesos-launcher $(MESOS_HOME)/bin
install -m 755 $(BINDIR)/mesos-getconf $(MESOS_HOME)/bin
install -m 755 $(BINDIR)/killtree.sh $(MESOS_HOME)/bin
rsync -avz $(BINDIR)/webui $(MESOS_HOME)/bin
rsync -avz $(DEPLOYDIR) $(MESOS_HOME)
install -m 755 $(LIBDIR)/libmesos_exec.a $(MESOS_HOME)/lib
install -m 755 $(LIBDIR)/libmesos_sched.a $(MESOS_HOME)/lib
install -d $(MESOS_INSTALL_HOME)/bin $(MESOS_INSTALL_HOME)/lib/java
install -d $(MESOS_INSTALL_HOME)/conf $(MESOS_INSTALL_HOME)/deploy
install -m 755 $(BINDIR)/mesos-master $(MESOS_INSTALL_HOME)/bin
install -m 755 $(BINDIR)/mesos-slave $(MESOS_INSTALL_HOME)/bin
install -m 755 $(BINDIR)/mesos-local $(MESOS_INSTALL_HOME)/bin
install -m 755 $(BINDIR)/mesos-launcher $(MESOS_INSTALL_HOME)/bin
install -m 755 $(BINDIR)/mesos-getconf $(MESOS_INSTALL_HOME)/bin
install -m 755 $(BINDIR)/killtree.sh $(MESOS_INSTALL_HOME)/bin
cd $(BINDIR)/webui && find . | cpio -pdmu $(MESOS_INSTALL_HOME)/bin/webui
install -m 755 $(LIBDIR)/libmesos_exec.a $(MESOS_INSTALL_HOME)/lib
install -m 755 $(LIBDIR)/libmesos_sched.a $(MESOS_INSTALL_HOME)/lib
ifeq ($(OS_NAME),darwin)
install -m 755 $(LIBDIR)/libmesos.dylib $(MESOS_HOME)/lib
install -m 755 $(LIBDIR)/java/libmesos.dylib $(MESOS_HOME)/lib/java
install -m 755 $(LIBDIR)/libmesos.dylib $(MESOS_INSTALL_HOME)/lib
install -m 755 $(LIBDIR)/java/libmesos.dylib $(MESOS_INSTALL_HOME)/lib/java
else
install -m 755 $(LIBDIR)/libmesos.so $(MESOS_HOME)/lib
install -m 755 $(LIBDIR)/java/libmesos.so $(MESOS_HOME)/lib/java
install -m 755 $(LIBDIR)/libmesos.so $(MESOS_INSTALL_HOME)/lib
install -m 755 $(LIBDIR)/java/libmesos.so $(MESOS_INSTALL_HOME)/lib/java
endif
install -m 755 $(LIBDIR)/java/mesos.jar $(MESOS_HOME)/lib/java
mkdir -p $(MESOS_HOME)/lib/python
PYTHONPATH=$(SETUPTOOLS):$(MESOS_HOME)/lib/python $(PYTHON) -m easy_install --install-dir $(MESOS_HOME)/lib/python $(BUILDSRC)/python/dist/*.egg
PYTHONPATH=$(SETUPTOOLS):$(MESOS_HOME)/lib/python $(PYTHON) -m easy_install --install-dir $(MESOS_HOME)/lib/python @top_builddir@/$(PROTOBUF)/python/dist/*.egg
install -m 755 $(LIBDIR)/java/mesos.jar $(MESOS_INSTALL_HOME)/lib/java
install -d $(MESOS_INSTALL_HOME)/lib/python
PYTHONPATH=$(SETUPTOOLS):$(MESOS_INSTALL_HOME)/lib/python $(PYTHON) -m easy_install --install-dir $(MESOS_INSTALL_HOME)/lib/python -a $(BUILDSRC)/python/dist/*.egg
PYTHONPATH=$(SETUPTOOLS):$(MESOS_INSTALL_HOME)/lib/python $(PYTHON) -m easy_install --install-dir $(MESOS_INSTALL_HOME)/lib/python -a @top_builddir@/$(PROTOBUF)/python/dist/*.egg

uninstall:
$(error unimplemented)
Expand Down
9 changes: 9 additions & 0 deletions src/configurator/configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <boost/lexical_cast.hpp>

#include "common/foreach.hpp"
#include "common/option.hpp"
#include "common/strings.hpp"


Expand Down Expand Up @@ -96,6 +97,14 @@ class Configuration
return params[key];
}

Option<std::string> get(const std::string& key) const
{
if (!contains(key)) {
return Option<std::string>::none();
}
return get(key, "");
}

const std::string& get(const std::string& key,
const std::string& defaultValue) const
{
Expand Down
6 changes: 3 additions & 3 deletions src/examples/python/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def statusUpdate(self, driver, update):
self.tasksFinished += 1
if self.tasksFinished == TOTAL_TASKS:
print "All tasks done, exiting"
driver.stop()
driver.stop(False)

if __name__ == "__main__":
print "Connecting to %s" % sys.argv[1]
Expand All @@ -82,5 +82,5 @@ def statusUpdate(self, driver, update):
execInfo.executor_id.value = "default"
execInfo.uri = execPath

mesos.MesosSchedulerDriver(MyScheduler(), "Python test framework",
execInfo, sys.argv[1]).run()
sys.exit(mesos.MesosSchedulerDriver(MyScheduler(), "Python test framework",
execInfo, sys.argv[1]).run())
8 changes: 8 additions & 0 deletions src/master/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ const int MAX_SLAVE_TIMEOUTS = 5;
// Time to wait for a framework to failover (TODO(benh): Make configurable)).
const double FRAMEWORK_FAILOVER_TIMEOUT = 60 * 60 * 24;

// Maximum number of completed frameworks to store in the cache.
// TODO(thomasm): make configurable
const int MAX_COMPLETED_FRAMEWORKS = 100;

// Maximum number of completed tasks per framework to store in the cache.
// TODO(thomasm): make configurable
const int MAX_COMPLETED_TASKS_PER_FRAMEWORK = 500;

} // namespace mesos {
} // namespace internal {
} // namespace master {
Expand Down
27 changes: 25 additions & 2 deletions src/master/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ JSON::Object model(const Framework& framework)
object.values["name"] = framework.info.name();
object.values["user"] = framework.info.user();
object.values["executor_uri"] = framework.info.executor().uri();
object.values["connect_time"] = framework.registeredTime;
object.values["registered_time"] = framework.registeredTime;
object.values["unregistered_time"] = framework.unregisteredTime;
object.values["reregistered_time"] = framework.reregisteredTime;
object.values["active"] = framework.active;
object.values["resources"] = model(framework.resources);

// Model all of the tasks associated with a framework.
Expand All @@ -90,6 +93,15 @@ JSON::Object model(const Framework& framework)
object.values["tasks"] = array;
}

{
JSON::Array array;
foreach (const Task& task, framework.completedTasks) {
array.values.push_back(model(task));
}

object.values["completed_tasks"] = array;
}

// Model all of the offers associated with a framework.
{
JSON::Array array;
Expand All @@ -111,7 +123,7 @@ JSON::Object model(const Slave& slave)
object.values["id"] = slave.id.value();
object.values["hostname"] = slave.info.hostname();
object.values["web_ui_url"] = slave.info.public_hostname();
object.values["connect_time"] = slave.registeredTime;
object.values["registered_time"] = slave.registeredTime;
object.values["resources"] = model(slave.info.resources());
return object;
}
Expand Down Expand Up @@ -241,6 +253,17 @@ Promise<HttpResponse> state(
object.values["frameworks"] = array;
}

// Model all of the completed frameworks.
{
JSON::Array array;

foreach (const Framework& framework, master.completedFrameworks) {
array.values.push_back(model(framework));
}

object.values["completed_frameworks"] = array;
}

std::ostringstream out;

JSON::render(out, object);
Expand Down
8 changes: 8 additions & 0 deletions src/master/master.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,14 @@ void Master::removeFramework(Framework* framework)

// TODO(benh): unlink(framework->pid);

framework->unregisteredTime = elapsedTime();

completedFrameworks.push_back(*framework);

if (completedFrameworks.size() > MAX_COMPLETED_FRAMEWORKS) {
completedFrameworks.pop_front();
}

// Delete it.
frameworks.erase(framework->id);
allocator->frameworkRemoved(framework);
Expand Down
13 changes: 13 additions & 0 deletions src/master/master.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ class Master : public ProtobufProcess<Master>
hashmap<SlaveID, Slave*> slaves;
hashmap<OfferID, Offer*> offers;

std::list<Framework> completedFrameworks;

int64_t nextFrameworkId; // Used to give each framework a unique ID.
int64_t nextOfferId; // Used to give each slot offer a unique ID.
int64_t nextSlaveId; // Used to give each slave a unique ID.
Expand Down Expand Up @@ -391,6 +393,13 @@ struct Framework
void removeTask(Task* task)
{
CHECK(tasks.contains(task->task_id()));

completedTasks.push_back(*task);

if (completedTasks.size() > MAX_COMPLETED_TASKS_PER_FRAMEWORK) {
completedTasks.pop_front();
}

tasks.erase(task->task_id());
resources -= task->resources();
}
Expand Down Expand Up @@ -463,8 +472,12 @@ struct Framework
bool active; // Turns false when framework is being removed.
double registeredTime;
double reregisteredTime;
double unregisteredTime;

hashmap<TaskID, Task*> tasks;

std::list<Task> completedTasks;

hashset<Offer*> offers; // Active offers for framework.

Resources resources; // Total resources (tasks + offers + executors).
Expand Down
44 changes: 28 additions & 16 deletions src/slave/slave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <process/timer.hpp>

#include "common/build.hpp"
#include "common/option.hpp"
#include "common/type_utils.hpp"
#include "common/utils.hpp"

Expand Down Expand Up @@ -441,17 +442,19 @@ void Slave::runTask(const FrameworkInfo& frameworkInfo,

stats.tasks[TASK_STARTING]++;

// Update the resources.
// TODO(Charles Reiss): The isolation module is not guaranteed to update
// the resources before the executor acts on its RunTaskMessage.
dispatch(isolationModule,
&IsolationModule::resourcesChanged,
framework->id, executor->id, executor->resources);

RunTaskMessage message;
message.mutable_framework()->MergeFrom(framework->info);
message.mutable_framework_id()->MergeFrom(framework->id);
message.set_pid(framework->pid);
message.mutable_task()->MergeFrom(task);
send(executor->pid, message);

// Now update the resources.
dispatch(isolationModule,
&IsolationModule::resourcesChanged,
framework->id, executor->id, executor->resources);
}
} else {
// Launch an executor for this task.
Expand Down Expand Up @@ -747,7 +750,17 @@ void Slave::registerExecutor(const FrameworkID& frameworkId,
// Save the pid for the executor.
executor->pid = from();

// Now that the executor is up, set its resource limits.
// First account for the tasks we're about to start.
foreachvalue (const TaskDescription& task, executor->queuedTasks) {
// Add the task to the executor.
executor->addTask(task);
}

// Now that the executor is up, set its resource limits including the
// currently queued tasks.
// TODO(Charles Reiss): We don't actually have a guarantee that this will
// be delivered or (where necessary) acted on before the executor gets its
// RunTaskMessages.
dispatch(isolationModule,
&IsolationModule::resourcesChanged,
framework->id, executor->id, executor->resources);
Expand All @@ -765,11 +778,7 @@ void Slave::registerExecutor(const FrameworkID& frameworkId,
LOG(INFO) << "Flushing queued tasks for framework " << framework->id;

foreachvalue (const TaskDescription& task, executor->queuedTasks) {
// Add the task to the executor.
executor->addTask(task);

stats.tasks[TASK_STARTING]++;

RunTaskMessage message;
message.mutable_framework_id()->MergeFrom(framework->id);
message.mutable_framework()->MergeFrom(framework->info);
Expand Down Expand Up @@ -1378,14 +1387,17 @@ string Slave::createUniqueWorkDirectory(const FrameworkID& frameworkId,
LOG(INFO) << "Generating a unique work directory for executor '"
<< executorId << "' of framework " << frameworkId;

string workDir = ".";
if (conf.contains("work_dir")) {
workDir = conf.get("work_dir", workDir);
} else if (conf.contains("home")) {
workDir = conf.get("home", workDir);
string workDir = "work"; // No relevant conf options set.
Option<string> option = conf.get("work_dir");
if (!option.isSome()) {
option = conf.get("home");
if (option.isSome()) {
workDir = option.get() + "/work";
}
} else {
workDir = option.get();
}

workDir = workDir + "/work";

std::ostringstream out(std::ios_base::app | std::ios_base::out);
out << workDir << "/slaves/" << id
Expand Down
2 changes: 1 addition & 1 deletion src/tests/Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ JAVA_HEADERS = @JAVA_HEADERS@

ifeq ($(OS_NAME),darwin)
LDFLAGS += -framework JavaVM
else ($(OS_ARCH),x86_64)
else ifeq ($(OS_ARCH),x86_64)
LDFLAGS += -L$(JAVA_HOME)/jre/lib/amd64/server -ljvm
else
LDFLAGS += -L$(JAVA_HOME)/jre/lib/i386/server -ljvm
Expand Down
9 changes: 7 additions & 2 deletions src/tests/master_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ TEST(MasterTest, TaskRunning)
vector<Offer> offers;
TaskStatus status;

trigger resourceOffersCall, statusUpdateCall;
trigger resourceOffersCall, statusUpdateCall, resourcesChangedCall;

EXPECT_CALL(sched, registered(&driver, _))
.Times(1);
Expand Down Expand Up @@ -129,12 +129,17 @@ TEST(MasterTest, TaskRunning)
vector<TaskDescription> tasks;
tasks.push_back(task);

EXPECT_CALL(isolationModule, resourcesChanged(_, _,
Resources(offers[0].resources())))
.WillOnce(Trigger(&resourcesChangedCall));

driver.launchTasks(offers[0].id(), tasks);

WAIT_UNTIL(statusUpdateCall);

EXPECT_EQ(TASK_RUNNING, status.state());

WAIT_UNTIL(resourcesChangedCall);

driver.stop();
driver.join();

Expand Down
Loading

0 comments on commit 059aabb

Please sign in to comment.