diff --git a/src/wrench/services/compute_services/standard_job_executor/StandardJobExecutor.cpp b/src/wrench/services/compute_services/standard_job_executor/StandardJobExecutor.cpp index ad71f0b405..0590203f01 100644 --- a/src/wrench/services/compute_services/standard_job_executor/StandardJobExecutor.cpp +++ b/src/wrench/services/compute_services/standard_job_executor/StandardJobExecutor.cpp @@ -207,12 +207,12 @@ namespace wrench { std::string core_allocation_algorithm = this->getPropertyValueAsString(StandardJobExecutorProperty::CORE_ALLOCATION_ALGORITHM); - minimum_num_cores = (unsigned long) (wu->tasks[0]->getMinNumCores()); + minimum_num_cores = wu->tasks[0]->getMinNumCores(); if (core_allocation_algorithm == "maximum") { - desired_num_cores = (unsigned long) wu->tasks[0]->getMaxNumCores(); + desired_num_cores = wu->tasks[0]->getMaxNumCores(); } else if (core_allocation_algorithm == "minimum") { - desired_num_cores = (unsigned long) wu->tasks[0]->getMinNumCores(); + desired_num_cores = wu->tasks[0]->getMinNumCores(); } else { throw std::runtime_error("Unknown StandardJobExecutorProperty::CORE_ALLOCATION_ALGORITHM property '" + core_allocation_algorithm + "'"); @@ -228,6 +228,7 @@ namespace wrench { // Find a host on which to run the workunit + WRENCH_INFO("Looking for a host to run a working unit that needs at least %ld cores and would like %ld cores", minimum_num_cores, desired_num_cores); std::string host_selection_algorithm = this->getPropertyValueAsString(StandardJobExecutorProperty::HOST_SELECTION_ALGORITHM); @@ -451,6 +452,9 @@ namespace wrench { } } + // Update core availabilities + this->core_availabilities[workunit_executor->getHostname()] += workunit_executor->getNumCores(); + // Remove the work from the running work queue if (this->running_workunits.find(workunit) == this->running_workunits.end()) { throw std::runtime_error( @@ -493,9 +497,8 @@ namespace wrench { return; } - } - + } diff --git a/src/wrench/services/compute_services/standard_job_executor/WorkunitMulticoreExecutor.cpp b/src/wrench/services/compute_services/standard_job_executor/WorkunitMulticoreExecutor.cpp index 6c227f9b20..4545a29f94 100644 --- a/src/wrench/services/compute_services/standard_job_executor/WorkunitMulticoreExecutor.cpp +++ b/src/wrench/services/compute_services/standard_job_executor/WorkunitMulticoreExecutor.cpp @@ -187,7 +187,7 @@ namespace wrench { } // Run the task's computation (which can be multicore) - WRENCH_INFO("Executing task %s (%lf flops)", task->getId().c_str(), task->getFlops()); + WRENCH_INFO("Executing task %s (%lf flops) on %ld cores", task->getId().c_str(), task->getFlops(), this->num_cores); task->setRunning(); task->setStartDate(S4U_Simulation::getClock()); @@ -266,7 +266,7 @@ namespace wrench { } // Wait for all actors to complete - for (int i=0; i < compute_threads.size(); i++) { + for (unsigned long i=0; i < compute_threads.size(); i++) { compute_threads[i]->join(); } diff --git a/src/wrench/services/storage_services/StorageService.cpp b/src/wrench/services/storage_services/StorageService.cpp index 034845b761..ae60089843 100644 --- a/src/wrench/services/storage_services/StorageService.cpp +++ b/src/wrench/services/storage_services/StorageService.cpp @@ -519,8 +519,6 @@ namespace wrench { void StorageService::copyFile(WorkflowFile *file, StorageService *src) { - WRENCH_INFO("IN COPY_FILE"); - if ((file == nullptr) || (src == nullptr)) { throw std::invalid_argument("StorageService::copyFile(): Invalid arguments"); } diff --git a/src/wrench/simgrid_S4U_util/S4U_Mailbox.cpp b/src/wrench/simgrid_S4U_util/S4U_Mailbox.cpp index 5fd4004fb8..36a65af9bd 100644 --- a/src/wrench/simgrid_S4U_util/S4U_Mailbox.cpp +++ b/src/wrench/simgrid_S4U_util/S4U_Mailbox.cpp @@ -56,7 +56,7 @@ namespace wrench { throw std::shared_ptr(new NetworkError(NetworkError::RECEIVING, mailbox_name)); } - WRENCH_INFO("GOT a '%s' message from %s", msg->getName().c_str(), mailbox_name.c_str()); + WRENCH_DEBUG("GOT a '%s' message from %s", msg->getName().c_str(), mailbox_name.c_str()); return std::unique_ptr(msg); } diff --git a/test/simulation/StandardJobExecutorTest.cpp b/test/simulation/StandardJobExecutorTest.cpp index 0a1e7a932d..98872587ea 100644 --- a/test/simulation/StandardJobExecutorTest.cpp +++ b/test/simulation/StandardJobExecutorTest.cpp @@ -31,6 +31,9 @@ class StandardJobExecutorTest : public ::testing::Test { void do_OneMultiCoreTaskTest_test(); void do_TwoMultiCoreTasksTest_test(); + static bool isJustABitGreater(double base, double variable) { + return ((variable > base) && (variable < base + EPSILON)); + } protected: StandardJobExecutorTest() { @@ -61,7 +64,7 @@ class StandardJobExecutorTest : public ::testing::Test { }; /**********************************************************************/ -/** ONE SINGLE-CORE TASK SIMULATION TEST **/ +/** ONE SINGLE-CORE TASK SIMULATION TEST ON ONE HOST **/ /**********************************************************************/ class OneSingleCoreTaskTestWMS : public wrench::WMS { @@ -143,18 +146,21 @@ class OneSingleCoreTaskTestWMS : public wrench::WMS { double expected_duration = task->getFlops() + 2 * thread_startup_overhead; // Does the task completion time make sense? - if ((task->getEndDate() < expected_duration) || - (task->getEndDate() > expected_duration + thread_startup_overhead + EPSILON)) { + if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, task->getEndDate())) { throw std::runtime_error("Unexpected task completion time (should be around " + std::to_string(expected_duration) + " but is " + std::to_string(task->getEndDate()) + ")"); } // Doe the task-stored time information look good - if ((task->getStartDate() > thread_startup_overhead + EPSILON) || (task->getEndDate() > wrench::S4U_Simulation::getClock() - thread_startup_overhead)) { + if (!StandardJobExecutorTest::isJustABitGreater(thread_startup_overhead, task->getStartDate())) { + throw std::runtime_error( + "Case 1: Unexpected task start end date: " + std::to_string(task->getStartDate())); + } + + if (task->getEndDate() > wrench::S4U_Simulation::getClock() - thread_startup_overhead) { throw std::runtime_error( - "Case 1: Unexpected task start and/or end date (start = " + std::to_string(task->getStartDate()) + - "; end = " + std::to_string(task->getEndDate())); + "Case 1: Unexpected task end date: " + std::to_string(task->getEndDate())); } // Has the output file been created? @@ -323,7 +329,7 @@ class OneMultiCoreTaskTestWMS : public wrench::WMS { double expected_duration = task->getFlops() / 6; // Does the task completion time make sense? - if ((observed_duration < expected_duration) || (observed_duration > expected_duration + EPSILON)) { + if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) { throw std::runtime_error( "Case 1: Unexpected task duration (should be around " + std::to_string(expected_duration) + " but is " + std::to_string(observed_duration) + ")"); @@ -383,7 +389,7 @@ class OneMultiCoreTaskTestWMS : public wrench::WMS { double expected_duration = task->getFlops() / (10 * task->getParallelEfficiency()); // Does the task completion time make sense? - if ((observed_duration < expected_duration) || (observed_duration > expected_duration + EPSILON)) { + if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) { throw std::runtime_error( "Case 2: Unexpected task duration (should be around " + std::to_string(expected_duration) + " but is " + std::to_string(observed_duration) + ")"); @@ -443,7 +449,7 @@ class OneMultiCoreTaskTestWMS : public wrench::WMS { double expected_duration = 10 * thread_startup_overhead + task->getFlops() / (10 * task->getParallelEfficiency()); // Does the task completion time make sense? - if ((observed_duration < expected_duration) || (observed_duration > expected_duration + EPSILON)) { + if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) { throw std::runtime_error( "Case 3: Unexpected job duration (should be around " + std::to_string(expected_duration) + " but is " + std::to_string(observed_duration) + ")"); @@ -528,7 +534,7 @@ void StandardJobExecutorTest::do_OneMultiCoreTaskTest_test() { /**********************************************************************/ -/** TWO MULTI-CORE TASKS SIMULATION TEST **/ +/** TWO MULTI-CORE TASKS SIMULATION TEST ON ONE HOST **/ /**********************************************************************/ class TwoMultiCoreTasksTestWMS : public wrench::WMS { @@ -558,18 +564,175 @@ class TwoMultiCoreTasksTestWMS : public wrench::WMS { // std::unique_ptr(new wrench::DataMovementManager(this->workflow)); - /** Case 1: Create two tasks that will run in sequence **/ +// /** Case 1: Create two tasks that will run in sequence with the default scheduling options **/ +// { +// wrench::WorkflowTask *task1 = this->workflow->addTask("task1", 3600, 2, 6, 1.0); +// wrench::WorkflowTask *task2 = this->workflow->addTask("task2", 300, 6, 6, 1.0); +// task1->addInputFile(workflow->getFileById("input_file")); +// task1->addOutputFile(workflow->getFileById("output_file")); +// task2->addInputFile(workflow->getFileById("input_file")); +// task2->addOutputFile(workflow->getFileById("output_file")); +// +// // Create a StandardJob with both tasks +// wrench::StandardJob *job = job_manager->createStandardJob( +// {task1, task2}, +// { +// {workflow->getFileById("input_file"), this->test->storage_service1}, +// {workflow->getFileById("output_file"), this->test->storage_service1} +// }, +// {std::tuple(workflow->getFileById("input_file"), this->test->storage_service1, this->test->storage_service2)}, +// {}, +// {std::tuple(workflow->getFileById("input_file"), this->test->storage_service2)} +// ); +// +// std::string my_mailbox = "test_callback_mailbox"; +// +// double before = wrench::S4U_Simulation::getClock(); +// +// // Create a StandardJobExecutor that will run stuff on one host and all 10 cores +// wrench::StandardJobExecutor *executor = new wrench::StandardJobExecutor( +// test->simulation, +// my_mailbox, +// test->simulation->getHostnameList()[0], +// job, +// {std::pair{test->simulation->getHostnameList()[0], 10}}, +// nullptr, +// {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}} +// ); +// +// // Wait for a message on my mailbox +// std::unique_ptr message; +// try { +// message = wrench::S4U_Mailbox::getMessage(my_mailbox); +// } catch (std::shared_ptr cause) { +// throw std::runtime_error("Network error while getting reply from StandardJobExecutor!" + cause->toString()); +// } +// +// // Did we get the expected message? +// wrench::StandardJobExecutorDoneMessage *msg = dynamic_cast(message.get()); +// if (!msg) { +// throw std::runtime_error("Unexpected '" + message->getName() + "' message"); +// } +// +// double after = wrench::S4U_Simulation::getClock(); +// +// double observed_duration = after - before; +// +// double expected_duration = task1->getFlops() / 6 + task2->getFlops() / 6; +// +// // Does the task completion time make sense? +// if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) { +// throw std::runtime_error( +// "Case 1: Unexpected job duration (should be around " + +// std::to_string(expected_duration) + " but is " + +// std::to_string(observed_duration) + ")"); +// } +// +// // Do individual task completion times make sense? +// if (!StandardJobExecutorTest::isJustABitGreater(before + task1->getFlops() / 6, task1->getEndDate())) { +// throw std::runtime_error("Case 1: Unexpected task1 end date: " + std::to_string(task1->getEndDate())); +// } +// +// if (!StandardJobExecutorTest::isJustABitGreater(task1->getFlops() / 6 + task2->getFlops() / 6, task2->getEndDate())) { +// throw std::runtime_error("Case 1: Unexpected task2 end date: " + std::to_string(task2->getEndDate()) + " AFTER=" + std::to_string(after)); +// } +// +// workflow->removeTask(task1); +// workflow->removeTask(task2); +// } +// +// /** Case 2: Create two tasks that will run in parallel with the default scheduling options **/ +// { +// wrench::WorkflowTask *task1 = this->workflow->addTask("task1", 3600, 6, 6, 1.0); +// wrench::WorkflowTask *task2 = this->workflow->addTask("task2", 300, 2, 6, 1.0); +// task1->addInputFile(workflow->getFileById("input_file")); +// task1->addOutputFile(workflow->getFileById("output_file")); +// task2->addInputFile(workflow->getFileById("input_file")); +// task2->addOutputFile(workflow->getFileById("output_file")); +// +// // Create a StandardJob with both tasks +// wrench::StandardJob *job = job_manager->createStandardJob( +// {task1, task2}, +// { +// {workflow->getFileById("input_file"), this->test->storage_service1}, +// {workflow->getFileById("output_file"), this->test->storage_service1} +// }, +// {std::tuple(workflow->getFileById("input_file"), this->test->storage_service1, this->test->storage_service2)}, +// {}, +// {std::tuple(workflow->getFileById("input_file"), this->test->storage_service2)} +// ); +// +// std::string my_mailbox = "test_callback_mailbox"; +// +// double before = wrench::S4U_Simulation::getClock(); +// +// // Create a StandardJobExecutor that will run stuff on one host and all 10 cores +// wrench::StandardJobExecutor *executor = new wrench::StandardJobExecutor( +// test->simulation, +// my_mailbox, +// test->simulation->getHostnameList()[0], +// job, +// {std::pair{test->simulation->getHostnameList()[0], 10}}, +// nullptr, +// {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}} +// ); +// +// // Wait for a message on my mailbox +// std::unique_ptr message; +// try { +// message = wrench::S4U_Mailbox::getMessage(my_mailbox); +// } catch (std::shared_ptr cause) { +// throw std::runtime_error("Network error while getting reply from StandardJobExecutor!" + cause->toString()); +// } +// +// // Did we get the expected message? +// wrench::StandardJobExecutorDoneMessage *msg = dynamic_cast(message.get()); +// if (!msg) { +// throw std::runtime_error("Unexpected '" + message->getName() + "' message"); +// } +// +// double after = wrench::S4U_Simulation::getClock(); +// +// double observed_duration = after - before; +// +// double expected_duration = MAX(task1->getFlops() / 6, task2->getFlops() / 4); +// +// // Does the overall completion time make sense? +// if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) { +// throw std::runtime_error( +// "Case 2: Unexpected job duration (should be around " + +// std::to_string(expected_duration) + " but is " + +// std::to_string(observed_duration) + ")"); +// } +// +// // Do individual task completion times make sense? +// if (!StandardJobExecutorTest::isJustABitGreater(before + task1->getFlops()/6, task1->getEndDate())) { +// throw std::runtime_error("Case 2: Unexpected task1 end date: " + std::to_string(task1->getEndDate())); +// } +// +// if (!StandardJobExecutorTest::isJustABitGreater(before + task2->getFlops()/4, task2->getEndDate())) { +// throw std::runtime_error("Case 2: Unexpected task2 end date: " + std::to_string(task2->getEndDate())); +// } +// +// workflow->removeTask(task1); +// workflow->removeTask(task2); +// } + + /** Case 3: Create three tasks that will run in parallel and then sequential with the default scheduling options **/ { - wrench::WorkflowTask *task1 = this->workflow->addTask("task1", 3600, 2, 6, 1.0); - wrench::WorkflowTask *task2 = this->workflow->addTask("task2", 300, 6, 6, 1.0); + wrench::WorkflowTask *task1 = this->workflow->addTask("task1", 3600, 6, 6, 1.0); + wrench::WorkflowTask *task2 = this->workflow->addTask("task2", 400, 2, 6, 1.0); + wrench::WorkflowTask *task3 = this->workflow->addTask("task3", 300, 10, 10, 0.6); task1->addInputFile(workflow->getFileById("input_file")); task1->addOutputFile(workflow->getFileById("output_file")); task2->addInputFile(workflow->getFileById("input_file")); task2->addOutputFile(workflow->getFileById("output_file")); + task3->addInputFile(workflow->getFileById("input_file")); + task3->addOutputFile(workflow->getFileById("output_file")); - // Create a StandardJob with both tasks + // Create a StandardJob with all three tasks wrench::StandardJob *job = job_manager->createStandardJob( - {task1, task2}, + {task1, task2, task3}, { {workflow->getFileById("input_file"), this->test->storage_service1}, {workflow->getFileById("output_file"), this->test->storage_service1} @@ -612,22 +775,32 @@ class TwoMultiCoreTasksTestWMS : public wrench::WMS { double observed_duration = after - before; - double expected_duration = task1->getFlops() / 6 + task2->getFlops() / 6; - // Does the task completion time make sense? - if ((observed_duration < expected_duration) || (observed_duration > expected_duration + EPSILON)) { + double expected_duration = MAX(task1->getFlops() / 6, task2->getFlops() /4) + task3->getFlops() / (task3->getParallelEfficiency() * 10); + + // Does the job completion time make sense? + if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) { throw std::runtime_error( - "Case 1: Unexpected job duration (should be around " + + "Case 3: Unexpected job duration (should be around " + std::to_string(expected_duration) + " but is " + std::to_string(observed_duration) + ")"); } + // Do the individual task completion times make sense + if (!StandardJobExecutorTest::isJustABitGreater(before + task1->getFlops()/6.0, task1->getEndDate())) { + throw std::runtime_error("Case 3: Unexpected task1 end date: " + std::to_string(task1->getEndDate())); + } + if (!StandardJobExecutorTest::isJustABitGreater(before + task2->getFlops()/4.0, task2->getEndDate())) { + throw std::runtime_error("Case 3: Unexpected task1 end date: " + std::to_string(task2->getEndDate())); + } + if (!StandardJobExecutorTest::isJustABitGreater(task1->getEndDate() + task3->getFlops()/(task3->getParallelEfficiency() * 10.0), task3->getEndDate())) { + throw std::runtime_error("Case 3: Unexpected task3 end date: " + std::to_string(task3->getEndDate())); + } + workflow->removeTask(task1); workflow->removeTask(task2); + workflow->removeTask(task3); } - - - // Terminate everything this->simulation->shutdownAllComputeServices(); this->simulation->shutdownAllStorageServices();