Skip to content

Commit

Permalink
More tests for the Standard Job Executor
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Aug 28, 2017
1 parent a6a7824 commit cf3baec
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 32 deletions.
Expand Up @@ -207,12 +207,12 @@ namespace wrench {
std::string core_allocation_algorithm =
this->getPropertyValueAsString(StandardJobExecutorProperty::CORE_ALLOCATION_ALGORITHM);

minimum_num_cores = (unsigned long) (wu->tasks[0]->getMinNumCores());
minimum_num_cores = wu->tasks[0]->getMinNumCores();

if (core_allocation_algorithm == "maximum") {
desired_num_cores = (unsigned long) wu->tasks[0]->getMaxNumCores();
desired_num_cores = wu->tasks[0]->getMaxNumCores();
} else if (core_allocation_algorithm == "minimum") {
desired_num_cores = (unsigned long) wu->tasks[0]->getMinNumCores();
desired_num_cores = wu->tasks[0]->getMinNumCores();
} else {
throw std::runtime_error("Unknown StandardJobExecutorProperty::CORE_ALLOCATION_ALGORITHM property '"
+ core_allocation_algorithm + "'");
Expand All @@ -228,6 +228,7 @@ namespace wrench {
// Find a host on which to run the workunit


WRENCH_INFO("Looking for a host to run a working unit that needs at least %ld cores and would like %ld cores", minimum_num_cores, desired_num_cores);
std::string host_selection_algorithm =
this->getPropertyValueAsString(StandardJobExecutorProperty::HOST_SELECTION_ALGORITHM);

Expand Down Expand Up @@ -451,6 +452,9 @@ namespace wrench {
}
}

// Update core availabilities
this->core_availabilities[workunit_executor->getHostname()] += workunit_executor->getNumCores();

// Remove the work from the running work queue
if (this->running_workunits.find(workunit) == this->running_workunits.end()) {
throw std::runtime_error(
Expand Down Expand Up @@ -493,9 +497,8 @@ namespace wrench {
return;
}

}


}



Expand Down
Expand Up @@ -187,7 +187,7 @@ namespace wrench {
}

// Run the task's computation (which can be multicore)
WRENCH_INFO("Executing task %s (%lf flops)", task->getId().c_str(), task->getFlops());
WRENCH_INFO("Executing task %s (%lf flops) on %ld cores", task->getId().c_str(), task->getFlops(), this->num_cores);
task->setRunning();
task->setStartDate(S4U_Simulation::getClock());

Expand Down Expand Up @@ -266,7 +266,7 @@ namespace wrench {
}

// Wait for all actors to complete
for (int i=0; i < compute_threads.size(); i++) {
for (unsigned long i=0; i < compute_threads.size(); i++) {
compute_threads[i]->join();
}

Expand Down
2 changes: 0 additions & 2 deletions src/wrench/services/storage_services/StorageService.cpp
Expand Up @@ -519,8 +519,6 @@ namespace wrench {
void StorageService::copyFile(WorkflowFile *file, StorageService *src) {


WRENCH_INFO("IN COPY_FILE");

if ((file == nullptr) || (src == nullptr)) {
throw std::invalid_argument("StorageService::copyFile(): Invalid arguments");
}
Expand Down
2 changes: 1 addition & 1 deletion src/wrench/simgrid_S4U_util/S4U_Mailbox.cpp
Expand Up @@ -56,7 +56,7 @@ namespace wrench {
throw std::shared_ptr<NetworkError>(new NetworkError(NetworkError::RECEIVING, mailbox_name));
}

WRENCH_INFO("GOT a '%s' message from %s", msg->getName().c_str(), mailbox_name.c_str());
WRENCH_DEBUG("GOT a '%s' message from %s", msg->getName().c_str(), mailbox_name.c_str());
return std::unique_ptr<SimulationMessage>(msg);
}

Expand Down
217 changes: 195 additions & 22 deletions test/simulation/StandardJobExecutorTest.cpp
Expand Up @@ -31,6 +31,9 @@ class StandardJobExecutorTest : public ::testing::Test {
void do_OneMultiCoreTaskTest_test();
void do_TwoMultiCoreTasksTest_test();

/**
 * @brief Tell whether a value exceeds a reference value by only a small margin.
 *
 * @param base: the reference value
 * @param variable: the value to compare against the reference
 * @return true if variable is strictly greater than base and strictly
 *         less than base + EPSILON, false otherwise
 */
static bool isJustABitGreater(double base, double variable) {
    // Anything not strictly above the reference is rejected right away
    if (!(variable > base)) {
        return false;
    }
    // Otherwise, accept only if the excess stays below the tolerance
    const double upper_bound = base + EPSILON;
    return variable < upper_bound;
}

protected:
StandardJobExecutorTest() {
Expand Down Expand Up @@ -61,7 +64,7 @@ class StandardJobExecutorTest : public ::testing::Test {
};

/**********************************************************************/
/** ONE SINGLE-CORE TASK SIMULATION TEST **/
/** ONE SINGLE-CORE TASK SIMULATION TEST ON ONE HOST **/
/**********************************************************************/

class OneSingleCoreTaskTestWMS : public wrench::WMS {
Expand Down Expand Up @@ -143,18 +146,21 @@ class OneSingleCoreTaskTestWMS : public wrench::WMS {
double expected_duration = task->getFlops() + 2 * thread_startup_overhead;

// Does the task completion time make sense?
if ((task->getEndDate() < expected_duration) ||
(task->getEndDate() > expected_duration + thread_startup_overhead + EPSILON)) {
if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, task->getEndDate())) {
throw std::runtime_error("Unexpected task completion time (should be around " +
std::to_string(expected_duration) + " but is " +
std::to_string(task->getEndDate()) + ")");
}

// Does the task-stored time information look good?
if ((task->getStartDate() > thread_startup_overhead + EPSILON) || (task->getEndDate() > wrench::S4U_Simulation::getClock() - thread_startup_overhead)) {
// The task's recorded start date must be just a bit greater than the thread
// startup overhead; this check is about the start date only, so the error
// message names the start date only.
if (!StandardJobExecutorTest::isJustABitGreater(thread_startup_overhead, task->getStartDate())) {
    throw std::runtime_error(
            "Case 1: Unexpected task start date: " + std::to_string(task->getStartDate()));
}

if (task->getEndDate() > wrench::S4U_Simulation::getClock() - thread_startup_overhead) {
throw std::runtime_error(
"Case 1: Unexpected task start and/or end date (start = " + std::to_string(task->getStartDate()) +
"; end = " + std::to_string(task->getEndDate()));
"Case 1: Unexpected task end date: " + std::to_string(task->getEndDate()));
}

// Has the output file been created?
Expand Down Expand Up @@ -323,7 +329,7 @@ class OneMultiCoreTaskTestWMS : public wrench::WMS {

double expected_duration = task->getFlops() / 6;
// Does the task completion time make sense?
if ((observed_duration < expected_duration) || (observed_duration > expected_duration + EPSILON)) {
if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) {
throw std::runtime_error(
"Case 1: Unexpected task duration (should be around " + std::to_string(expected_duration) + " but is " +
std::to_string(observed_duration) + ")");
Expand Down Expand Up @@ -383,7 +389,7 @@ class OneMultiCoreTaskTestWMS : public wrench::WMS {
double expected_duration = task->getFlops() / (10 * task->getParallelEfficiency());

// Does the task completion time make sense?
if ((observed_duration < expected_duration) || (observed_duration > expected_duration + EPSILON)) {
if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) {
throw std::runtime_error(
"Case 2: Unexpected task duration (should be around " + std::to_string(expected_duration) + " but is " +
std::to_string(observed_duration) + ")");
Expand Down Expand Up @@ -443,7 +449,7 @@ class OneMultiCoreTaskTestWMS : public wrench::WMS {
double expected_duration = 10 * thread_startup_overhead + task->getFlops() / (10 * task->getParallelEfficiency());

// Does the task completion time make sense?
if ((observed_duration < expected_duration) || (observed_duration > expected_duration + EPSILON)) {
if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) {
throw std::runtime_error(
"Case 3: Unexpected job duration (should be around " + std::to_string(expected_duration) + " but is " +
std::to_string(observed_duration) + ")");
Expand Down Expand Up @@ -528,7 +534,7 @@ void StandardJobExecutorTest::do_OneMultiCoreTaskTest_test() {


/**********************************************************************/
/** TWO MULTI-CORE TASKS SIMULATION TEST **/
/** TWO MULTI-CORE TASKS SIMULATION TEST ON ONE HOST **/
/**********************************************************************/

class TwoMultiCoreTasksTestWMS : public wrench::WMS {
Expand Down Expand Up @@ -558,18 +564,175 @@ class TwoMultiCoreTasksTestWMS : public wrench::WMS {
// std::unique_ptr<wrench::DataMovementManager>(new wrench::DataMovementManager(this->workflow));


/** Case 1: Create two tasks that will run in sequence **/
// /** Case 1: Create two tasks that will run in sequence with the default scheduling options **/
// {
// wrench::WorkflowTask *task1 = this->workflow->addTask("task1", 3600, 2, 6, 1.0);
// wrench::WorkflowTask *task2 = this->workflow->addTask("task2", 300, 6, 6, 1.0);
// task1->addInputFile(workflow->getFileById("input_file"));
// task1->addOutputFile(workflow->getFileById("output_file"));
// task2->addInputFile(workflow->getFileById("input_file"));
// task2->addOutputFile(workflow->getFileById("output_file"));
//
// // Create a StandardJob with both tasks
// wrench::StandardJob *job = job_manager->createStandardJob(
// {task1, task2},
// {
// {workflow->getFileById("input_file"), this->test->storage_service1},
// {workflow->getFileById("output_file"), this->test->storage_service1}
// },
// {std::tuple<wrench::WorkflowFile *, wrench::StorageService *, wrench::StorageService *>(workflow->getFileById("input_file"), this->test->storage_service1, this->test->storage_service2)},
// {},
// {std::tuple<wrench::WorkflowFile *, wrench::StorageService *>(workflow->getFileById("input_file"), this->test->storage_service2)}
// );
//
// std::string my_mailbox = "test_callback_mailbox";
//
// double before = wrench::S4U_Simulation::getClock();
//
// // Create a StandardJobExecutor that will run stuff on one host and all 10 cores
// wrench::StandardJobExecutor *executor = new wrench::StandardJobExecutor(
// test->simulation,
// my_mailbox,
// test->simulation->getHostnameList()[0],
// job,
// {std::pair<std::string, unsigned long>{test->simulation->getHostnameList()[0], 10}},
// nullptr,
// {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}
// );
//
// // Wait for a message on my mailbox
// std::unique_ptr<wrench::SimulationMessage> message;
// try {
// message = wrench::S4U_Mailbox::getMessage(my_mailbox);
// } catch (std::shared_ptr<wrench::NetworkError> cause) {
// throw std::runtime_error("Network error while getting reply from StandardJobExecutor!" + cause->toString());
// }
//
// // Did we get the expected message?
// wrench::StandardJobExecutorDoneMessage *msg = dynamic_cast<wrench::StandardJobExecutorDoneMessage *>(message.get());
// if (!msg) {
// throw std::runtime_error("Unexpected '" + message->getName() + "' message");
// }
//
// double after = wrench::S4U_Simulation::getClock();
//
// double observed_duration = after - before;
//
// double expected_duration = task1->getFlops() / 6 + task2->getFlops() / 6;
//
// // Does the task completion time make sense?
// if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) {
// throw std::runtime_error(
// "Case 1: Unexpected job duration (should be around " +
// std::to_string(expected_duration) + " but is " +
// std::to_string(observed_duration) + ")");
// }
//
// // Do individual task completion times make sense?
// if (!StandardJobExecutorTest::isJustABitGreater(before + task1->getFlops() / 6, task1->getEndDate())) {
// throw std::runtime_error("Case 1: Unexpected task1 end date: " + std::to_string(task1->getEndDate()));
// }
//
// if (!StandardJobExecutorTest::isJustABitGreater(task1->getFlops() / 6 + task2->getFlops() / 6, task2->getEndDate())) {
// throw std::runtime_error("Case 1: Unexpected task2 end date: " + std::to_string(task2->getEndDate()) + " AFTER=" + std::to_string(after));
// }
//
// workflow->removeTask(task1);
// workflow->removeTask(task2);
// }
//
// /** Case 2: Create two tasks that will run in parallel with the default scheduling options **/
// {
// wrench::WorkflowTask *task1 = this->workflow->addTask("task1", 3600, 6, 6, 1.0);
// wrench::WorkflowTask *task2 = this->workflow->addTask("task2", 300, 2, 6, 1.0);
// task1->addInputFile(workflow->getFileById("input_file"));
// task1->addOutputFile(workflow->getFileById("output_file"));
// task2->addInputFile(workflow->getFileById("input_file"));
// task2->addOutputFile(workflow->getFileById("output_file"));
//
// // Create a StandardJob with both tasks
// wrench::StandardJob *job = job_manager->createStandardJob(
// {task1, task2},
// {
// {workflow->getFileById("input_file"), this->test->storage_service1},
// {workflow->getFileById("output_file"), this->test->storage_service1}
// },
// {std::tuple<wrench::WorkflowFile *, wrench::StorageService *, wrench::StorageService *>(workflow->getFileById("input_file"), this->test->storage_service1, this->test->storage_service2)},
// {},
// {std::tuple<wrench::WorkflowFile *, wrench::StorageService *>(workflow->getFileById("input_file"), this->test->storage_service2)}
// );
//
// std::string my_mailbox = "test_callback_mailbox";
//
// double before = wrench::S4U_Simulation::getClock();
//
// // Create a StandardJobExecutor that will run stuff on one host and all 10 cores
// wrench::StandardJobExecutor *executor = new wrench::StandardJobExecutor(
// test->simulation,
// my_mailbox,
// test->simulation->getHostnameList()[0],
// job,
// {std::pair<std::string, unsigned long>{test->simulation->getHostnameList()[0], 10}},
// nullptr,
// {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}
// );
//
// // Wait for a message on my mailbox
// std::unique_ptr<wrench::SimulationMessage> message;
// try {
// message = wrench::S4U_Mailbox::getMessage(my_mailbox);
// } catch (std::shared_ptr<wrench::NetworkError> cause) {
// throw std::runtime_error("Network error while getting reply from StandardJobExecutor!" + cause->toString());
// }
//
// // Did we get the expected message?
// wrench::StandardJobExecutorDoneMessage *msg = dynamic_cast<wrench::StandardJobExecutorDoneMessage *>(message.get());
// if (!msg) {
// throw std::runtime_error("Unexpected '" + message->getName() + "' message");
// }
//
// double after = wrench::S4U_Simulation::getClock();
//
// double observed_duration = after - before;
//
// double expected_duration = MAX(task1->getFlops() / 6, task2->getFlops() / 4);
//
// // Does the overall completion time make sense?
// if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) {
// throw std::runtime_error(
// "Case 2: Unexpected job duration (should be around " +
// std::to_string(expected_duration) + " but is " +
// std::to_string(observed_duration) + ")");
// }
//
// // Do individual task completion times make sense?
// if (!StandardJobExecutorTest::isJustABitGreater(before + task1->getFlops()/6, task1->getEndDate())) {
// throw std::runtime_error("Case 2: Unexpected task1 end date: " + std::to_string(task1->getEndDate()));
// }
//
// if (!StandardJobExecutorTest::isJustABitGreater(before + task2->getFlops()/4, task2->getEndDate())) {
// throw std::runtime_error("Case 2: Unexpected task2 end date: " + std::to_string(task2->getEndDate()));
// }
//
// workflow->removeTask(task1);
// workflow->removeTask(task2);
// }

/** Case 3: Create three tasks that will run in parallel and then sequential with the default scheduling options **/
{
wrench::WorkflowTask *task1 = this->workflow->addTask("task1", 3600, 2, 6, 1.0);
wrench::WorkflowTask *task2 = this->workflow->addTask("task2", 300, 6, 6, 1.0);
wrench::WorkflowTask *task1 = this->workflow->addTask("task1", 3600, 6, 6, 1.0);
wrench::WorkflowTask *task2 = this->workflow->addTask("task2", 400, 2, 6, 1.0);
wrench::WorkflowTask *task3 = this->workflow->addTask("task3", 300, 10, 10, 0.6);
task1->addInputFile(workflow->getFileById("input_file"));
task1->addOutputFile(workflow->getFileById("output_file"));
task2->addInputFile(workflow->getFileById("input_file"));
task2->addOutputFile(workflow->getFileById("output_file"));
task3->addInputFile(workflow->getFileById("input_file"));
task3->addOutputFile(workflow->getFileById("output_file"));

// Create a StandardJob with both tasks
// Create a StandardJob with all three tasks
wrench::StandardJob *job = job_manager->createStandardJob(
{task1, task2},
{task1, task2, task3},
{
{workflow->getFileById("input_file"), this->test->storage_service1},
{workflow->getFileById("output_file"), this->test->storage_service1}
Expand Down Expand Up @@ -612,22 +775,32 @@ class TwoMultiCoreTasksTestWMS : public wrench::WMS {

double observed_duration = after - before;

double expected_duration = task1->getFlops() / 6 + task2->getFlops() / 6;
// Does the task completion time make sense?
if ((observed_duration < expected_duration) || (observed_duration > expected_duration + EPSILON)) {
double expected_duration = MAX(task1->getFlops() / 6, task2->getFlops() /4) + task3->getFlops() / (task3->getParallelEfficiency() * 10);

// Does the job completion time make sense?
if (!StandardJobExecutorTest::isJustABitGreater(expected_duration, observed_duration)) {
throw std::runtime_error(
"Case 1: Unexpected job duration (should be around " +
"Case 3: Unexpected job duration (should be around " +
std::to_string(expected_duration) + " but is " +
std::to_string(observed_duration) + ")");
}

// Do the individual task completion times make sense?
// Each recorded end date must be just a bit greater than its expected value.

// task1: expected to end at before + flops / 6
if (!StandardJobExecutorTest::isJustABitGreater(before + task1->getFlops()/6.0, task1->getEndDate())) {
    throw std::runtime_error("Case 3: Unexpected task1 end date: " + std::to_string(task1->getEndDate()));
}
// task2: expected to end at before + flops / 4 (the error message must name task2, not task1)
if (!StandardJobExecutorTest::isJustABitGreater(before + task2->getFlops()/4.0, task2->getEndDate())) {
    throw std::runtime_error("Case 3: Unexpected task2 end date: " + std::to_string(task2->getEndDate()));
}
// task3: expected to end at task1's end date + flops / (parallel efficiency * 10)
if (!StandardJobExecutorTest::isJustABitGreater(task1->getEndDate() + task3->getFlops()/(task3->getParallelEfficiency() * 10.0), task3->getEndDate())) {
    throw std::runtime_error("Case 3: Unexpected task3 end date: " + std::to_string(task3->getEndDate()));
}

workflow->removeTask(task1);
workflow->removeTask(task2);
workflow->removeTask(task3);
}




// Terminate everything
this->simulation->shutdownAllComputeServices();
this->simulation->shutdownAllStorageServices();
Expand Down

0 comments on commit cf3baec

Please sign in to comment.