
Commit

The MultihostMulticoreComputeService now checks, when a StandardJob is submitted to it, that enough resources are available to run the job.
henricasanova committed Oct 11, 2017
1 parent 7d8bee1 commit df319b9
Showing 4 changed files with 110 additions and 36 deletions.
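
In essence, the new admission check rejects a standard job at submission time when no single host in the service has enough cores for the job's most demanding task, i.e. the maximum over the tasks' minimum core requirements. A minimal standalone sketch of that test follows; the names are illustrative and this is not the actual WRENCH code, which appears in the diff below.

#include <algorithm>
#include <map>
#include <string>
#include <vector>

// Returns true if at least one host can accommodate the most demanding task.
bool jobCanEverRun(const std::vector<unsigned long> &task_min_cores,
                   const std::map<std::string, unsigned long> &cores_per_host) {
  unsigned long max_min_num_cores = 0;
  for (unsigned long m : task_min_cores) {
    max_min_num_cores = std::max(max_min_num_cores, m);  // most demanding task
  }
  for (const auto &host : cores_per_host) {
    if (host.second >= max_min_num_cores) {
      return true;                                        // some host is big enough
    }
  }
  return false;                                           // submission gets rejected
}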
1 change: 0 additions & 1 deletion src/wrench/services/compute/batch/BatchService.cpp
@@ -1124,7 +1124,6 @@ namespace wrench {
*/
void BatchService::processStandardJobCompletion(StandardJobExecutor *executor, StandardJob *job) {
// Remove the executor from the executor list
WRENCH_INFO("====> %ld", this->standard_job_executors.size());
if (this->standard_job_executors.find(executor) == this->standard_job_executors.end()) {
throw std::runtime_error("BatchService::processStandardJobCompletion(): Received a standard job completion, but the executor is not in the executor list");
}
MultihostMulticoreComputeService.cpp
@@ -35,7 +35,7 @@ namespace wrench {
* @throw std::runtime_error
*/
void MultihostMulticoreComputeService::submitStandardJob(StandardJob *job,
std::map<std::string, std::string> &service_specific_args) {

if (this->state == Service::DOWN) {
throw WorkflowExecutionException(new ServiceIsDown(this));
@@ -225,17 +225,17 @@ namespace wrench {
* @param plist: a property list
*/
MultihostMulticoreComputeService::MultihostMulticoreComputeService(std::string hostname,
bool supports_standard_jobs,
bool supports_pilot_jobs,
std::set<std::pair<std::string, unsigned long>> compute_resources,
StorageService *default_storage_service,
std::map<std::string, std::string> plist) :
MultihostMulticoreComputeService::MultihostMulticoreComputeService(hostname,
supports_standard_jobs,
supports_pilot_jobs,
compute_resources,
plist, -1, nullptr, "",
default_storage_service) {

}

@@ -354,7 +354,6 @@ namespace wrench {
*/
bool MultihostMulticoreComputeService::dispatchNextPendingJob() {

WRENCH_INFO("IN DISPATCH");
if (this->pending_jobs.size() == 0) {
return false;
}
@@ -419,6 +418,7 @@
std::set<std::pair<std::string, unsigned long>> MultihostMulticoreComputeService::computeResourceAllocationAggressive(StandardJob *job) {


WRENCH_INFO("COMPUTING RESOURCE ALLOCATION: %ld", this->core_availabilities.size());
// Make a copy of core_availabilities
std::map<std::string, unsigned long> tentative_availabilities;
for (auto r : this->core_availabilities) {
@@ -441,14 +441,18 @@
unsigned long picked_picked_num_cores = 0;

for (auto t : tasks) {
WRENCH_INFO("LOOKING AT TASK %s", t->getId().c_str());
std::string picked_host;
unsigned long picked_num_cores = 0;

WRENCH_INFO("---> %ld", tentative_availabilities.size());
for (auto r : tentative_availabilities) {
WRENCH_INFO(" LOOKING AT HOST %s", r.first.c_str());
std::string hostname = r.first;
unsigned long num_available_cores = r.second;

if (num_available_cores < t->getMinNumCores()) {
WRENCH_INFO(" NO DICE");
continue;
}

@@ -459,10 +463,13 @@
}

if (picked_num_cores == 0) {
WRENCH_INFO("NOPE");
continue;
}

if (picked_num_cores > picked_picked_num_cores) {
WRENCH_INFO("PICKED TASK %s on HOST %s with %ld cores",
t->getId().c_str(), picked_host.c_str(), picked_num_cores);
picked_task = t;
picked_picked_num_cores = picked_num_cores;
picked_picked_host = picked_host;
Expand All @@ -479,13 +486,16 @@ namespace wrench {
}
}

WRENCH_INFO("HERE");

// Come up with allocation based on tentative availabilities!
std::set<std::pair<std::string, unsigned long>> allocation;
for (auto r : tentative_availabilities) {
std::string hostname = r.first;
unsigned long num_cores = r.second;

if (num_cores < this->core_availabilities[hostname]) {
WRENCH_INFO("ALLOCATION %s/%ld", hostname.c_str(), this->core_availabilities[hostname] - num_cores);
allocation.insert(std::make_pair(hostname, this->core_availabilities[hostname] - num_cores));
}
}
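
For readers skimming the hunks above: the aggressive allocation strategy works on a tentative copy of the per-host core availabilities, repeatedly gives the best-served remaining task the largest number of cores it can obtain on any single host, and finally reports, per host, how many cores were charged against it. A standalone sketch of that greedy loop follows; the pieces elided by the diff (task core bounds, the outer loop) are assumptions, not the actual WRENCH code.

#include <algorithm>
#include <cstddef>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the relevant bits of a WorkflowTask.
struct TaskReq {
  std::string id;
  unsigned long min_cores;
  unsigned long max_cores;
};

std::set<std::pair<std::string, unsigned long>> aggressiveAllocation(
        const std::map<std::string, unsigned long> &core_availabilities,
        std::vector<TaskReq> tasks) {
  // Work on a tentative copy so the real availabilities are left untouched.
  std::map<std::string, unsigned long> tentative = core_availabilities;

  while (!tasks.empty()) {
    // Pick the (task, host) pair that yields the most usable cores.
    long best_task = -1;
    std::string best_host;
    unsigned long best_cores = 0;
    for (std::size_t i = 0; i < tasks.size(); i++) {
      for (const auto &r : tentative) {
        if (r.second < tasks[i].min_cores) continue;  // host cannot run this task
        unsigned long usable = std::min(r.second, tasks[i].max_cores);
        if (usable > best_cores) {
          best_task = static_cast<long>(i);
          best_host = r.first;
          best_cores = usable;
        }
      }
    }
    if (best_task < 0) break;            // no remaining task fits anywhere
    tentative[best_host] -= best_cores;  // charge the tentative availability
    tasks.erase(tasks.begin() + best_task);
  }

  // The allocation is whatever ended up charged against each host.
  std::set<std::pair<std::string, unsigned long>> allocation;
  for (const auto &r : tentative) {
    unsigned long charged = core_availabilities.at(r.first) - r.second;
    if (charged > 0) {
      allocation.insert(std::make_pair(r.first, charged));
    }
  }
  return allocation;
}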
@@ -502,17 +512,13 @@
*/
bool MultihostMulticoreComputeService::dispatchStandardJob(StandardJob *job) {

WRENCH_INFO("IN DISPATCH STANDARD");

// Compute the required minimum number of cores
unsigned long minimum_required_num_cores = 1;

for (auto t : (job)->getTasks()) {
minimum_required_num_cores = MAX(minimum_required_num_cores, t->getMinNumCores());
}

WRENCH_INFO("MIN NUM CORES = %ld", minimum_required_num_cores);

// Find the list of hosts with the required number of cores
std::set<std::string> possible_hosts;
for (auto it = this->core_availabilities.begin(); it != this->core_availabilities.end(); it++) {
@@ -607,13 +613,13 @@

ComputeService *cs =
new MultihostMulticoreComputeService(this->hostname,
true, false,
compute_resources,
this->property_list,
job->getDuration(),
job,
"_pilot",
this->default_storage_service);

cs->setSimulation(this->simulation);
job->setComputeService(cs);
@@ -1003,7 +1009,6 @@ namespace wrench {
void MultihostMulticoreComputeService::processStandardJobCompletion(StandardJobExecutor *executor, StandardJob *job) {

// Remove the executor from the executor list
WRENCH_INFO("====> %ld", this->standard_job_executors.size());
if (this->standard_job_executors.find(executor) == this->standard_job_executors.end()) {
throw std::runtime_error(
"MultihostMulticoreComputeService::processStandardJobCompletion(): Received a standard job completion, but the executor is not in the executor list");
@@ -1038,8 +1043,8 @@ namespace wrench {
* @param cause: the cause of the failure
*/
void MultihostMulticoreComputeService::processStandardJobFailure(StandardJobExecutor *executor,
StandardJob *job,
std::shared_ptr<FailureCause> cause) {

// Remove the executor from the executor list
if (this->standard_job_executors.find(executor) == this->standard_job_executors.end()) {
@@ -1387,8 +1392,10 @@ namespace wrench {
* @throw std::runtime_error
*/
void MultihostMulticoreComputeService::processSubmitStandardJob(std::string &answer_mailbox, StandardJob *job,
std::map<std::string, std::string> &service_specific_arguments) {
WRENCH_INFO("Asked to run a standard job with %ld tasks", job->getNumTasks());

// Do we support standard jobs?
if (not this->supportsStandardJobs()) {
try {
S4U_Mailbox::dputMessage(
Expand All @@ -1403,6 +1410,34 @@ namespace wrench {
return;
}

// Can we run this job assuming the whole set of resources is available?
unsigned long max_min_num_cores = 0;
for (auto t : job->getTasks()) {
max_min_num_cores = MAX(max_min_num_cores, t->getMinNumCores());
}
bool enough_resources = false;
for (auto r : this->compute_resources) {
unsigned long num_cores = r.second;
if (num_cores >= max_min_num_cores) {
enough_resources = true;
}
}

if (!enough_resources) {
try {
S4U_Mailbox::dputMessage(
answer_mailbox,
new ComputeServiceSubmitStandardJobAnswerMessage(
job, this, false, std::shared_ptr<FailureCause>(new NotEnoughComputeResources(job, this)),
this->getPropertyValueAsDouble(
MultihostMulticoreComputeServiceProperty::NOT_ENOUGH_CORES_MESSAGE_PAYLOAD)));
} catch (std::shared_ptr<NetworkError> &cause) {
return;
}
return;
}

// Since we can run, add the job to the list of pending jobs
this->pending_jobs.push_front((WorkflowJob *) job);

try {
52 changes: 45 additions & 7 deletions test/simulation/MultihostMulticoreComputeServiceSchedulingTest.cpp
@@ -41,6 +41,8 @@ class MultihostMulticoreComputeServiceTestScheduling : public ::testing::Test {
" <AS id=\"AS0\" routing=\"Full\"> "
" <host id=\"Host1\" speed=\"1f\" core=\"4\"/> "
" <host id=\"Host2\" speed=\"1f\" core=\"4\"/> "
" <link id=\"1\" bandwidth=\"5000GBps\" latency=\"0us\"/>"
" <route src=\"Host1\" dst=\"Host2\"> <link_ctn id=\"1\"/> </route>"
" </AS> "
"</platform>";
FILE *platform_file = fopen(platform_file_path.c_str(), "w");
Expand Down Expand Up @@ -90,7 +92,45 @@ class XNoopTestWMS : public wrench::WMS {
tasks.push_back(t1);
tasks.push_back(t2);
tasks.push_back(t3);
job_manager->createStandardJob(tasks, {}, {}, {}, {});
wrench::StandardJob *job = job_manager->createStandardJob(tasks, {}, {}, {}, {});

job_manager->submitJob(job, this->test->compute_service);

// Wait for a workflow execution event
std::unique_ptr<wrench::WorkflowExecutionEvent> event;
try {
event = workflow->waitForNextExecutionEvent();
} catch (wrench::WorkflowExecutionException &e) {
throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString());
}
switch (event->type) {
case wrench::WorkflowExecutionEvent::STANDARD_JOB_COMPLETION: {
// success, do nothing for now
break;
}
default: {
throw std::runtime_error("Unexpected workflow execution event: " + std::to_string((int) (event->type)));
}
}

// Check completion states and times
if ((t1->getState() != wrench::WorkflowTask::COMPLETED) ||
(t2->getState() != wrench::WorkflowTask::COMPLETED) ||
(t3->getState() != wrench::WorkflowTask::COMPLETED)) {
throw std::runtime_error("Unexpected task states");
}

double task1_end_date = t1->getEndDate();
double task2_end_date = t2->getEndDate();
double task3_end_date = t3->getEndDate();

WRENCH_INFO("t1:%lf t2:%lf t3:%lf", task1_end_date, task2_end_date, task3_end_date);

// double delta = fabs(task1_end_date - task2_end_date);
// if (delta > 0.1) {
// throw std::runtime_error("Task completion times should be about 0.0 seconds apart but they are " +
// std::to_string(delta) + " apart.");
// }


// Terminate
@@ -117,21 +157,19 @@ void MultihostMulticoreComputeServiceTestScheduling::do_Noop_test() {
// Setting up the platform
EXPECT_NO_THROW(simulation->instantiatePlatform(platform_file_path));

// Get a hostname
std::string hostname = simulation->getHostnameList()[0];

// Create a WMS
EXPECT_NO_THROW(wrench::WMS *wms = simulation->setWMS(
std::unique_ptr<wrench::WMS>(new XNoopTestWMS(this, workflow,
std::unique_ptr<wrench::Scheduler>(
new NoopScheduler()),
hostname))));
"Host1"))));

// Create a Compute Service
EXPECT_NO_THROW(compute_service = simulation->add(
std::unique_ptr<wrench::MultihostMulticoreComputeService>(
new wrench::MultihostMulticoreComputeService(hostname, true, true,
{std::pair<std::string, unsigned long>(hostname, 0)},
new wrench::MultihostMulticoreComputeService("Host1", true, true,
{std::make_pair("Host1", 0),
std::make_pair("Host2", 0)},
nullptr,
{}))));

@@ -100,6 +100,8 @@ class MultihostMulticoreComputeServiceTestStandardJobs : public ::testing::Test
" <AS id=\"AS0\" routing=\"Full\"> "
" <host id=\"DualCoreHost\" speed=\"1f\" core=\"2\"/> "
" <host id=\"QuadCoreHost\" speed=\"1f\" core=\"4\"/> "
" <link id=\"1\" bandwidth=\"5000GBps\" latency=\"0us\"/>"
" <route src=\"DualCoreHost\" dst=\"QuadCoreHost\"> <link_ctn id=\"1\"/> </route>"
" </AS> "
"</platform>";
FILE *platform_file = fopen(platform_file_path.c_str(), "w");