From 1e8c86d3f5703a11f3dd5f7fa0ed26a6c1bbef62 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 20 Apr 2020 13:42:22 -1000 Subject: [PATCH] (#157) Added a pilot job example --- conf/cmake/Examples.cmake | 1 + .../BareMetalBagOfTasks.cpp | 4 +- .../TwoTasksAtATimeWMS.cpp | 28 ++- .../BareMetalChainScratch.cpp | 4 +- .../WorkflowAsAsingleJobWMS.cpp | 10 +- .../WorkflowAsAsingleJobWMS.h | 6 +- .../bare-metal-chain/BareMetalChain.cpp | 4 +- .../bare-metal-chain/OneTaskAtATimeWMS.cpp | 4 +- .../bare-metal-complex-job/ComplexJob.cpp | 4 +- .../bare-metal-complex-job/ComplexJobWMS.cpp | 7 +- .../batch-bag-of-tasks/BatchBagOfTasks.cpp | 4 +- .../TwoTasksAtATimeBatchWMS.cpp | 4 +- .../batch-pilot-job/BatchPilotJob.cpp | 147 +++++++++++++++ .../batch-pilot-job/CMakeLists.txt | 13 ++ .../batch-pilot-job/PilotJobWMS.cpp | 177 ++++++++++++++++++ .../batch-pilot-job/PilotJobWMS.h | 47 +++++ .../cloud-bag-of-tasks/CloudBagOfTasks.cpp | 6 +- .../TwoTasksAtATimeCloudWMS.cpp | 18 +- 18 files changed, 456 insertions(+), 32 deletions(-) create mode 100644 examples/basic-examples/batch-pilot-job/BatchPilotJob.cpp create mode 100644 examples/basic-examples/batch-pilot-job/CMakeLists.txt create mode 100644 examples/basic-examples/batch-pilot-job/PilotJobWMS.cpp create mode 100644 examples/basic-examples/batch-pilot-job/PilotJobWMS.h diff --git a/conf/cmake/Examples.cmake b/conf/cmake/Examples.cmake index bdb3121ae2..b364afe180 100644 --- a/conf/cmake/Examples.cmake +++ b/conf/cmake/Examples.cmake @@ -7,6 +7,7 @@ set(EXAMPLES_CMAKEFILES_TXT examples/basic-examples/bare-metal-complex-job/CMakeLists.txt examples/basic-examples/cloud-bag-of-tasks/CMakeLists.txt examples/basic-examples/batch-bag-of-tasks/CMakeLists.txt + examples/basic-examples/batch-pilot-job/CMakeLists.txt ) foreach (cmakefile ${EXAMPLES_CMAKEFILES_TXT}) diff --git a/examples/basic-examples/bare-metal-bag-of-tasks/BareMetalBagOfTasks.cpp b/examples/basic-examples/bare-metal-bag-of-tasks/BareMetalBagOfTasks.cpp index 
cd2f930623..67394cae93 100644 --- a/examples/basic-examples/bare-metal-bag-of-tasks/BareMetalBagOfTasks.cpp +++ b/examples/basic-examples/bare-metal-bag-of-tasks/BareMetalBagOfTasks.cpp @@ -22,7 +22,7 @@ ** the completion time of each workflow task is printed. ** ** Example invocation of the simulator for a 10-task workflow, with only WMS logging: - ** ./bare-metal-bag-of-tasks-simulator 10 ./two_hosts.xml --wrench-no-logs --log=two_tasks_at_a_time_wms.threshold=info + ** ./bare-metal-bag-of-tasks-simulator 10 ./two_hosts.xml --wrench-no-logs --log=custom_wms.threshold=info ** ** Example invocation of the simulator for a 6-task workflow with full logging: ** ./bare-metal-bag-of-tasks-simulator 6 ./two_hosts.xml @@ -56,7 +56,7 @@ int main(int argc, char **argv) { /* Parsing of the command-line arguments for this WRENCH simulation */ if (argc != 3) { - std::cerr << "Usage: " << argv[0] << " [optional logging arguments]" << std::endl; + std::cerr << "Usage: " << argv[0] << " [--wrench-no-logs --log=custom_wms.threshold=info]" << std::endl; exit(1); } diff --git a/examples/basic-examples/bare-metal-bag-of-tasks/TwoTasksAtATimeWMS.cpp b/examples/basic-examples/bare-metal-bag-of-tasks/TwoTasksAtATimeWMS.cpp index 25e275b85e..1180858d9a 100644 --- a/examples/basic-examples/bare-metal-bag-of-tasks/TwoTasksAtATimeWMS.cpp +++ b/examples/basic-examples/bare-metal-bag-of-tasks/TwoTasksAtATimeWMS.cpp @@ -22,7 +22,7 @@ #include "TwoTasksAtATimeWMS.h" -XBT_LOG_NEW_DEFAULT_CATEGORY(two_tasks_at_a_time_wms, "Log category for TwoTasksAtATimeWMS"); +XBT_LOG_NEW_DEFAULT_CATEGORY(custom_wms, "Log category for TwoTasksAtATimeWMS"); namespace wrench { @@ -87,6 +87,14 @@ namespace wrench { auto expensive_ready_task = ready_tasks.at(ready_tasks.size() - 1); /* Create a standard job for the tasks */ + if (expensive_ready_task) { + WRENCH_INFO("Creating a job for tasks %s and %s", + cheap_ready_task->getID().c_str(), + expensive_ready_task->getID().c_str()); + } else { + 
WRENCH_INFO("Creating a job for task %s", + cheap_ready_task->getID().c_str()); + } /* First, we need to create a map of file locations, stating for each file * where is should be read/written */ @@ -106,12 +114,20 @@ namespace wrench { service_specific_args[cheap_ready_task->getID()] = "4"; service_specific_args[expensive_ready_task->getID()] = "6"; + WRENCH_INFO("Submitting the job, asking for %s cores for task %s, and " + "%s cores for task %s", + service_specific_args[cheap_ready_task->getID()].c_str(), + cheap_ready_task->getID().c_str(), + service_specific_args[expensive_ready_task->getID()].c_str(), + expensive_ready_task->getID().c_str()); + /* Submit the job to the compute service */ job_manager->submitJob(standard_job, compute_service, service_specific_args); /* Wait for a workflow execution event and process it. In this case we know that * the event will be a StandardJobCompletionEvent, which is processed by the method * processEventStandardJobCompletion() that this class overrides. 
*/ + WRENCH_INFO("Waiting for next event"); this->waitForAndProcessNextEvent(); } @@ -127,9 +143,11 @@ namespace wrench { void TwoTasksAtATimeWMS::processEventStandardJobCompletion(std::shared_ptr event) { /* Retrieve the job that this event is for */ auto job = event->standard_job; - /* Retrieve the job's first (and in our case only) task */ - auto task = job->getTasks().at(0); - WRENCH_INFO("Notified that a standard job has completed task %s", task->getID().c_str()); + /* Retrieve the job's tasks */ + for (auto const &task : job->getTasks()) { + WRENCH_INFO("Notified that a standard job has completed task %s", + task->getID().c_str()); + } } /** @@ -146,7 +164,7 @@ namespace wrench { WRENCH_INFO("Notified that a standard job has failed for task %s with error %s", task->getID().c_str(), event->failure_cause->toString().c_str()); - throw std::runtime_error("ABORTING DUE TO JOB FAILURE"); + throw std::runtime_error("This should not happen in this example"); } diff --git a/examples/basic-examples/bare-metal-chain-scratch/BareMetalChainScratch.cpp b/examples/basic-examples/bare-metal-chain-scratch/BareMetalChainScratch.cpp index 4ecd9ea857..bcd255f2a7 100644 --- a/examples/basic-examples/bare-metal-chain-scratch/BareMetalChainScratch.cpp +++ b/examples/basic-examples/bare-metal-chain-scratch/BareMetalChainScratch.cpp @@ -19,7 +19,7 @@ ** the completion time of each workflow task is printed. 
** ** Example invocation of the simulator for a 10-task workflow, with only WMS logging: - ** ./bare-metal-chain-simulator 10 ./two_hosts.xml --wrench-no-logs --log=workflow_as_a_single_job_wms.threshold=info + ** ./bare-metal-chain-simulator 10 ./two_hosts.xml --wrench-no-logs --log=custom_wms.threshold=info ** ** Example invocation of the simulator for a 5-task workflow with full logging: ** ./bare-metal-chain-simulator 5 ./two_hosts.xml @@ -53,7 +53,7 @@ int main(int argc, char **argv) { /* Parsing of the command-line arguments for this WRENCH simulation */ if (argc != 3) { - std::cerr << "Usage: " << argv[0] << " [optional logging arguments]" << std::endl; + std::cerr << "Usage: " << argv[0] << " [--wrench-no-logs --log=custom_wms.threshold=info]" << std::endl; exit(1); } diff --git a/examples/basic-examples/bare-metal-chain-scratch/WorkflowAsAsingleJobWMS.cpp b/examples/basic-examples/bare-metal-chain-scratch/WorkflowAsAsingleJobWMS.cpp index c297729b49..40bb0b2abc 100644 --- a/examples/basic-examples/bare-metal-chain-scratch/WorkflowAsAsingleJobWMS.cpp +++ b/examples/basic-examples/bare-metal-chain-scratch/WorkflowAsAsingleJobWMS.cpp @@ -21,7 +21,7 @@ #include "WorkflowAsAsingleJobWMS.h" -XBT_LOG_NEW_DEFAULT_CATEGORY(workflow_as_a_single_job_wms, "Log category for WorkflowAsAsingleJobWMS"); +XBT_LOG_NEW_DEFAULT_CATEGORY(custom_wms, "Log category for WorkflowAsAsingleJobWMS"); namespace wrench { @@ -88,14 +88,17 @@ namespace wrench { } /* Second, we create a job */ + WRENCH_INFO("Creating a job for the entire workflow"); auto job = job_manager->createStandardJob(this->getWorkflow()->getTasks(), file_locations); /* Submit the job to the compute service */ + WRENCH_INFO("Submitting the job to the compute sercvice"); job_manager->submitJob(job, compute_service); /* Wait for a workflow execution event and process it. 
In this case we know that * the event will be a StandardJobCompletionEvent, which is processed by the method * processEventStandardJobCompletion() that this class overrides. */ + WRENCH_INFO("Waiting for next event"); this->waitForAndProcessNextEvent(); WRENCH_INFO("Workflow execution complete"); @@ -111,8 +114,9 @@ namespace wrench { /* Retrieve the job that this event is for */ auto job = event->standard_job; /* Retrieve the job's first (and in our case only) task */ - auto task = job->getTasks().at(0); - WRENCH_INFO("Notified that a standard job has completed task %s", task->getID().c_str()); + for (auto const &task : job->getTasks()) { + WRENCH_INFO("Notified that a standard job has completed task %s", task->getID().c_str()); + } } /** diff --git a/examples/basic-examples/bare-metal-chain-scratch/WorkflowAsAsingleJobWMS.h b/examples/basic-examples/bare-metal-chain-scratch/WorkflowAsAsingleJobWMS.h index b0d4c3e74a..acac17cbf5 100644 --- a/examples/basic-examples/bare-metal-chain-scratch/WorkflowAsAsingleJobWMS.h +++ b/examples/basic-examples/bare-metal-chain-scratch/WorkflowAsAsingleJobWMS.h @@ -8,8 +8,8 @@ */ -#ifndef WRENCH_ONE_TASK_AT_A_TIME_SCRATCH_H -#define WRENCH_ONE_TASK_AT_A_TIME_SCRATCH_H +#ifndef WRENCH_EXAMPLE_ONE_TASK_AT_A_TIME_SCRATCH_H +#define WRENCH_EXAMPLE_ONE_TASK_AT_A_TIME_SCRATCH_H #include @@ -42,4 +42,4 @@ namespace wrench { }; } -#endif //WRENCH_ONE_TASK_AT_A_TIME_SCRATCH_H +#endif //WRENCH_EXAMPLE_ONE_TASK_AT_A_TIME_SCRATCH_H diff --git a/examples/basic-examples/bare-metal-chain/BareMetalChain.cpp b/examples/basic-examples/bare-metal-chain/BareMetalChain.cpp index 314bc9210b..ca7044c50e 100644 --- a/examples/basic-examples/bare-metal-chain/BareMetalChain.cpp +++ b/examples/basic-examples/bare-metal-chain/BareMetalChain.cpp @@ -19,7 +19,7 @@ ** the completion time of each workflow task is printed. 
** ** Example invocation of the simulator for a 10-task workflow, with only WMS logging: - ** ./bare-metal-chain-simulator 10 ./two_hosts.xml --wrench-no-logs --log=one_task_at_a_time_wms.threshold=info + ** ./bare-metal-chain-simulator 10 ./two_hosts.xml --wrench-no-logs --log=custom_wms.threshold=info ** ** Example invocation of the simulator for a 5-task workflow with full logging: ** ./bare-metal-chain-simulator 5 ./two_hosts.xml @@ -53,7 +53,7 @@ int main(int argc, char **argv) { /* Parsing of the command-line arguments for this WRENCH simulation */ if (argc != 3) { - std::cerr << "Usage: " << argv[0] << " [optional logging arguments]" << std::endl; + std::cerr << "Usage: " << argv[0] << " [--wrench-no-logs --log=custom_wms.threshold=info]" << std::endl; exit(1); } diff --git a/examples/basic-examples/bare-metal-chain/OneTaskAtATimeWMS.cpp b/examples/basic-examples/bare-metal-chain/OneTaskAtATimeWMS.cpp index 397cac2c29..85c5741c46 100644 --- a/examples/basic-examples/bare-metal-chain/OneTaskAtATimeWMS.cpp +++ b/examples/basic-examples/bare-metal-chain/OneTaskAtATimeWMS.cpp @@ -21,7 +21,7 @@ #include "OneTaskAtATimeWMS.h" -XBT_LOG_NEW_DEFAULT_CATEGORY(one_task_at_a_time_wms, "Log category for OneTaskAtATimeWMS"); +XBT_LOG_NEW_DEFAULT_CATEGORY(custom_wms, "Log category for OneTaskAtATimeWMS"); namespace wrench { @@ -71,6 +71,7 @@ namespace wrench { auto ready_task = this->getWorkflow()->getReadyTasks().at(0); /* Create a standard job for the task */ + WRENCH_INFO("Creating a job for task %s", ready_task->getID().c_str()); /* First, we need to create a map of file locations, stating for each file * where is should be read/written */ @@ -82,6 +83,7 @@ namespace wrench { auto standard_job = job_manager->createStandardJob(ready_task, file_locations); /* Submit the job to the compute service */ + WRENCH_INFO("Submitting the job to the compute service"); job_manager->submitJob(standard_job, compute_service); /* Wait for a workflow execution event and process it. 
In this case we know that diff --git a/examples/basic-examples/bare-metal-complex-job/ComplexJob.cpp b/examples/basic-examples/bare-metal-complex-job/ComplexJob.cpp index aa95c6f169..1a396d6ce6 100644 --- a/examples/basic-examples/bare-metal-complex-job/ComplexJob.cpp +++ b/examples/basic-examples/bare-metal-complex-job/ComplexJob.cpp @@ -19,7 +19,7 @@ ** the completion time of each workflow task is printed. ** ** Example invocation of the simulator with only WMS logging: - ** ./bare-metal-complex-job-simulator ./four_hosts.xml --wrench-no-logs --log=complex_job_wms.threshold=info + ** ./bare-metal-complex-job-simulator ./four_hosts.xml --wrench-no-logs --log=custom_wms.threshold=info ** ** Example invocation of the simulator for a 5-task workflow with full logging: ** ./bare-metal-complex-job-simulator ./four_hosts.xml @@ -53,7 +53,7 @@ int main(int argc, char **argv) { /* Parsing of the command-line arguments for this WRENCH simulation */ if (argc != 2) { - std::cerr << "Usage: " << argv[0] << " [optional logging arguments]" << std::endl; + std::cerr << "Usage: " << argv[0] << " [--wrench-no-logs --log=custom_wms.threshold=info]" << std::endl; exit(1); } diff --git a/examples/basic-examples/bare-metal-complex-job/ComplexJobWMS.cpp b/examples/basic-examples/bare-metal-complex-job/ComplexJobWMS.cpp index 1f5102c9d1..2472379c1c 100644 --- a/examples/basic-examples/bare-metal-complex-job/ComplexJobWMS.cpp +++ b/examples/basic-examples/bare-metal-complex-job/ComplexJobWMS.cpp @@ -21,7 +21,7 @@ #include "ComplexJobWMS.h" -XBT_LOG_NEW_DEFAULT_CATEGORY(complex_job_wms, "Log category for ComplexJobWMS"); +XBT_LOG_NEW_DEFAULT_CATEGORY(custom_wms, "Log category for ComplexJobWMS"); namespace wrench { @@ -103,17 +103,20 @@ namespace wrench { /* Let's create a set of file deletion operations to be performed * AFTER the "post" file copies have been performed */ std::vector >> cleanup_file_deletions; - cleanup_file_deletions.push_back(std::make_tuple(outfile_2, 
FileLocation::LOCATION(storage_service2))); + cleanup_file_deletions.emplace_back(outfile_2, FileLocation::LOCATION(storage_service2)); /* Create the standard job */ + WRENCH_INFO("Creating a complex job with pre file copies, post file copies, and post file deletions to execute task %s", task->getID().c_str()); auto job = job_manager->createStandardJob({task}, file_locations, pre_file_copies, post_file_copies, cleanup_file_deletions); /* Submit the job to the compute service */ + WRENCH_INFO("Submitting job to the compute service"); job_manager->submitJob(job, compute_service); /* Wait for a workflow execution event and process it. In this case we know that * the event will be a StandardJobCompletionEvent, which is processed by the method * processEventStandardJobCompletion() that this class overrides. */ + WRENCH_INFO("Waiting for next event"); this->waitForAndProcessNextEvent(); WRENCH_INFO("Workflow execution complete"); diff --git a/examples/basic-examples/batch-bag-of-tasks/BatchBagOfTasks.cpp b/examples/basic-examples/batch-bag-of-tasks/BatchBagOfTasks.cpp index 040818e728..a9c98e7d8a 100644 --- a/examples/basic-examples/batch-bag-of-tasks/BatchBagOfTasks.cpp +++ b/examples/basic-examples/batch-bag-of-tasks/BatchBagOfTasks.cpp @@ -23,7 +23,7 @@ ** the completion time of each workflow task is printed. 
** ** Example invocation of the simulator for a 10-task workflow, with only WMS logging: - ** ./batch-bag-of-tasks-simulator 10 ./four_hosts.xml --wrench-no-logs --log=two_tasks_at_a_time_batch_wms.threshold=info + ** ./batch-bag-of-tasks-simulator 10 ./four_hosts.xml --wrench-no-logs --log=custom_wms.threshold=info ** ** Example invocation of the simulator for a 6-task workflow with full logging: ** ./batch-bag-of-tasks-simulator 6 ./four_hosts.xml @@ -56,7 +56,7 @@ int main(int argc, char **argv) { /* Parsing of the command-line arguments for this WRENCH simulation */ if (argc != 3) { - std::cerr << "Usage: " << argv[0] << " [optional logging arguments]" << std::endl; + std::cerr << "Usage: " << argv[0] << " [--wrench-no-logs --log=custom_wms.threshold=info]" << std::endl; exit(1); } diff --git a/examples/basic-examples/batch-bag-of-tasks/TwoTasksAtATimeBatchWMS.cpp b/examples/basic-examples/batch-bag-of-tasks/TwoTasksAtATimeBatchWMS.cpp index 44c716670e..65ff8bce2a 100644 --- a/examples/basic-examples/batch-bag-of-tasks/TwoTasksAtATimeBatchWMS.cpp +++ b/examples/basic-examples/batch-bag-of-tasks/TwoTasksAtATimeBatchWMS.cpp @@ -23,7 +23,7 @@ #include "TwoTasksAtATimeBatchWMS.h" -XBT_LOG_NEW_DEFAULT_CATEGORY(two_tasks_at_a_time_batch_wms, "Log category for TwoTasksAtATimeBatchWMS"); +XBT_LOG_NEW_DEFAULT_CATEGORY(custom_wms, "Log category for TwoTasksAtATimeBatchWMS"); namespace wrench { @@ -78,7 +78,7 @@ namespace wrench { } /* Initialize and seed a RNG */ - std::uniform_int_distribution dist(0.0, 1000000000); + std::uniform_int_distribution dist(0, 1000000000); std::mt19937 rng(42); /* While the workflow isn't done, repeat the main loop */ diff --git a/examples/basic-examples/batch-pilot-job/BatchPilotJob.cpp b/examples/basic-examples/batch-pilot-job/BatchPilotJob.cpp new file mode 100644 index 0000000000..c5e5131834 --- /dev/null +++ b/examples/basic-examples/batch-pilot-job/BatchPilotJob.cpp @@ -0,0 +1,147 @@ +/** + * Copyright (c) 2017-2018. The WRENCH Team. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + */ + +/** + ** This simulator simulates the execution of a 2-task workflow: + ** + ** File #0 -> Task #0 -> File #1 -> Task #1 -> File #2 + ** + ** The compute platform comprises four hosts, WMSHost, BatchHeadNode, BatchNode1, and + ** BatchNode2. On WMSHost runs a simple storage + ** service and a WMS (defined in class PilotJobWMS). On BatchHeadNode runs a batch + ** service, that has access to two hosts: BatchNode1 and BatchNode2. Once the simulation is done, + ** whether each workflow task completed or failed is printed. + ** + ** Example invocation of the simulator with only WMS logging: + ** ./wrench-batch-pilot-job-simulator ./four_hosts_scratch.xml --wrench-no-logs --log=custom_wms.threshold=info + ** + ** Example invocation of the simulator with full logging: + ** ./wrench-batch-pilot-job-simulator ./four_hosts_scratch.xml + **/ + + +#include +#include + +#include "PilotJobWMS.h" // WMS implementation + +#define TFLOP 1000000000000.0 +/** + * @brief The Simulator's main function + * + * @param argc: argument count + * @param argv: argument array + * @return 0 on success, non-zero otherwise + */ +int main(int argc, char **argv) { + + /* Declare a WRENCH simulation object */ + wrench::Simulation simulation; + + /* Initialize the simulation, which may entail extracting WRENCH-specific and + * Simgrid-specific command-line arguments that can modify general simulation behavior. + * Two special command-line arguments are --help-wrench and --help-simgrid, which print + * details about available command-line arguments. 
*/ + simulation.init(&argc, argv); + + /* Parsing of the command-line arguments for this WRENCH simulation */ + if (argc != 2) { + std::cerr << "Usage: " << argv[0] << " [--wrench-no-logs --log=custom_wms.threshold=info]" << std::endl; + exit(1); + } + + /* Reading and parsing the platform description file, written in XML following the SimGrid-defined DTD, + * to instantiate the simulated platform */ + std::cerr << "Instantiating simulated platform..." << std::endl; + simulation.instantiatePlatform(argv[1]); + + /* Declare a workflow */ + wrench::Workflow workflow; + + /* Add workflow tasks and files */ + auto task0 = workflow.addTask("task_0", 100 * TFLOP, 1, 10, 0.90, 1000); + auto task1 = workflow.addTask("task_1", 300 * TFLOP, 1, 5, 0.90, 1000); + auto file0 = workflow.addFile("file_0", 10000000); + auto file1 = workflow.addFile("file_1", 20000000); + auto file2 = workflow.addFile("file_2", 15000000); + task0->addInputFile(file0); + task0->addOutputFile(file1); + task1->addInputFile(file1); + task1->addOutputFile(file2); + + /* Instantiate a storage service, and add it to the simulation. + * A wrench::StorageService is an abstraction of a service on + * which files can be written and read. This particular storage service, which is an instance + * of wrench::SimpleStorageService, is started on WMSHost in the + * platform, which has an attached disk mounted at "/". The SimpleStorageService + * is a basic storage service implementation provided by WRENCH. + * Throughout the simulation execution, input/output files of workflow tasks will be located + * in this storage service, and accessed remotely by the compute service. Note that the + * storage service is configured to use a buffer size of 50M when transferring data over + * the network (i.e., to pipeline disk reads/writes and network recvs/sends). */ + std::cerr << "Instantiating a SimpleStorageService on WMSHost..." 
<< std::endl; + auto storage_service = simulation.add(new wrench::SimpleStorageService( + "WMSHost", {"/"}, {{wrench::SimpleStorageServiceProperty::BUFFER_SIZE, "50000000"}}, {})); + + /* Instantiate a batch compute service, and add it to the simulation. + * A wrench::BatchComputeService is an abstraction of a compute service that corresponds + * to a batch-scheduled cluster, to which jobs are submitted with a number of nodes, cores per node, and a duration. + * This particular service is started on BatchHeadNode, uses BatchNode1 and BatchNode2 + * as hardware resources, and has scratch storage space ("/scratch/"). + * In this example, tasks running on this service nonetheless access data only from the remote storage service. */ + std::cerr << "Instantiating a BatchComputeService on BatchHeadNode..." << std::endl; + std::vector batch_nodes = {"BatchNode1", "BatchNode2"}; + auto batch_service = simulation.add(new wrench::BatchComputeService( + "BatchHeadNode", batch_nodes, "/scratch/", {}, {})); + + /* Instantiate a WMS, to be started on WMSHost, which is responsible + * for executing the workflow. See comments in PilotJobWMS.cpp + * for more details */ + auto wms = simulation.add( + new wrench::PilotJobWMS({batch_service}, {storage_service}, "WMSHost")); + + /* Associate the workflow to the WMS */ + wms->addWorkflow(&workflow); + + /* Instantiate a file registry service to be started on WMSHost. This service is + * essentially a replica catalog that stores (file, storage service) pairs so that + * any service, in particular a WMS, can discover where workflow files are stored. */ + std::cerr << "Instantiating a FileRegistryService on WMSHost ..." << std::endl; + auto file_registry_service = new wrench::FileRegistryService("WMSHost"); + simulation.add(file_registry_service); + + /* It is necessary to store, or "stage", files that are only input. The getInputFiles() + * method of the Workflow class returns the set of all workflow files that are not generated + * by workflow tasks, and thus are only input files. 
These files are then staged on the storage service. */ + std::cerr << "Staging task input files..." << std::endl; + for (auto const &f : workflow.getInputFiles()) { + simulation.stageFile(f, storage_service); + } + + /* Launch the simulation. This call only returns when the simulation is complete. */ + std::cerr << "Launching the Simulation..." << std::endl; + try { + simulation.launch(); + } catch (std::runtime_error &e) { + std::cerr << "Exception: " << e.what() << std::endl; + return 1; + } + std::cerr << "Simulation done!" << std::endl; + + /* Print task statistics */ + for (auto const &task : {task0, task1}) { + if (task->getState() == wrench::WorkflowTask::COMPLETED) { + std::cerr << "Task " << task->getID() << " completed\n"; + } else { + std::cerr << "Task " << task->getID() << " failed\n"; + } + } + + return 0; +} diff --git a/examples/basic-examples/batch-pilot-job/CMakeLists.txt b/examples/basic-examples/batch-pilot-job/CMakeLists.txt new file mode 100644 index 0000000000..bc4e3f425d --- /dev/null +++ b/examples/basic-examples/batch-pilot-job/CMakeLists.txt @@ -0,0 +1,13 @@ + +set(SOURCE_FILES + PilotJobWMS.h + PilotJobWMS.cpp + BatchPilotJob.cpp + ) + +add_executable(wrench-batch-pilot-job-simulator ${SOURCE_FILES}) + +target_link_libraries(wrench-batch-pilot-job-simulator wrench ${SimGrid_LIBRARY} ${PUGIXML_LIBRARY} ${LEMON_LIBRARY}) + +install(TARGETS wrench-batch-pilot-job-simulator DESTINATION bin) + diff --git a/examples/basic-examples/batch-pilot-job/PilotJobWMS.cpp b/examples/basic-examples/batch-pilot-job/PilotJobWMS.cpp new file mode 100644 index 0000000000..5687f7183a --- /dev/null +++ b/examples/basic-examples/batch-pilot-job/PilotJobWMS.cpp @@ -0,0 +1,177 @@ +/** + * Copyright (c) 2017-2018. The WRENCH Team. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + */ + +/** + ** A Workflow Management System (WMS) implementation that operates as follows: + ** + ** - Creates a pilot job, but not long enough to accommodate both tasks + ** - Submit the first task to the pilot job as a standard job + ** - Intermediate file is kept in the batch compute service's scratch space! + ** - Submit the second task to the pilot job as a standard job + ** - The pilot job will expire before the second task completes, and the + ** WMS gives up + **/ + +#include + +#include "PilotJobWMS.h" + +XBT_LOG_NEW_DEFAULT_CATEGORY(custom_wms, "Log category for PilotJobWMS"); + +namespace wrench { + + /** + * @brief Constructor, which calls the super constructor + * + * @param compute_services: a set of compute services available to run tasks + * @param storage_services: a set of storage services available to store files + * @param hostname: the name of the host on which to start the WMS + */ + PilotJobWMS::PilotJobWMS(const std::set> &compute_services, + const std::set> &storage_services, + const std::string &hostname) : WMS( + nullptr, nullptr, + compute_services, + storage_services, + {}, nullptr, + hostname, + "two-tasks-at-a-time-batch") {} + + /** + * @brief main method of the PilotJobWMS daemon + * + * @return 0 on completion + * + * @throw std::runtime_error + */ + int PilotJobWMS::main() { + + /* Set the logging output to GREEN */ + TerminalOutput::setThisProcessLoggingColor(TerminalOutput::COLOR_GREEN); + + WRENCH_INFO("WMS starting on host %s", Simulation::getHostName().c_str());WRENCH_INFO( + "About to execute a workflow with %lu tasks", this->getWorkflow()->getNumberOfTasks()); + + /* Create a job manager so that we can create/submit jobs */ + auto job_manager = 
this->createJobManager(); + + /* Get the first available batch compute service and storage service */ + auto batch_service = *(this->getAvailableComputeServices().begin()); + auto storage_service = *(this->getAvailableStorageServices().begin()); + + /* Record the batch node's core flop rate */ + double core_flop_rate = (*(batch_service->getCoreFlopRate().begin())).second; + + /* Get references to tasks and files */ + auto task_0 = this->getWorkflow()->getTaskByID("task_0"); + auto task_1 = this->getWorkflow()->getTaskByID("task_1"); + auto file_0 = this->getWorkflow()->getFileByID("file_0"); + auto file_1 = this->getWorkflow()->getFileByID("file_1"); + auto file_2 = this->getWorkflow()->getFileByID("file_2"); + + /* For each task, estimate its execution time in minutes */ + std::map execution_times_in_minutes; + for (auto const &t : this->getWorkflow()->getTasks()) { + double in_seconds = (t->getFlops() / core_flop_rate) / (10 * t->getParallelEfficiency()); + execution_times_in_minutes[t] = 1 + std::lround(in_seconds / 60.0); + // The +1 above is just so that we don't cut it too tight + WRENCH_INFO("Task %s should run in under %ld minutes", + t->getID().c_str(), execution_times_in_minutes[t]); + } + + /* Create a Pilot job */ + auto pilot_job = job_manager->createPilotJob(); + + std::map service_specific_arguments; + // number of nodes + service_specific_arguments["-N"] = "2"; + // number of cores per node + service_specific_arguments["-c"] = "10"; + // time: not enough to run both tasks + service_specific_arguments["-t"] = + std::to_string((execution_times_in_minutes[task_0] + execution_times_in_minutes[task_1]) / 2); + + WRENCH_INFO("Submitting a pilot job that requests %s %s-core nodes for %s minutes", + service_specific_arguments["-N"].c_str(), + service_specific_arguments["-c"].c_str(), + service_specific_arguments["-t"].c_str()); + + job_manager->submitJob(pilot_job, batch_service, service_specific_arguments); + + /* Waiting for the pilot job to start */ + 
WRENCH_INFO("Waiting and event"); + this->waitForAndProcessNextEvent(); + + /* Submit a job that runs both tasks to the pilot job */ + WRENCH_INFO("Creating a standard job for both tasks"); + auto cs = pilot_job->getComputeService(); + + /* Create a map of file locations, stating for each file + * where it should be read/written */ + std::map> file_locations; + file_locations[file_0] = FileLocation::LOCATION(storage_service); + file_locations[file_1] = FileLocation::LOCATION(storage_service); + file_locations[file_2] = FileLocation::LOCATION(storage_service); + + auto standard_job = job_manager->createStandardJob({task_0, task_1}, file_locations); + + WRENCH_INFO("Submitting the standard job to the pilot job") + job_manager->submitJob(standard_job, cs); + + WRENCH_INFO("Wait for an event"); + this->waitForAndProcessNextEvent(); + WRENCH_INFO("Wait for an event"); + this->waitForAndProcessNextEvent(); + + return 0; + } + + /** + * @brief Process a standard job completion event + * + * @param event: the event + */ + void PilotJobWMS::processEventStandardJobCompletion(std::shared_ptr event) { + WRENCH_INFO("Notified that a standard job has completed"); + throw std::runtime_error("This shouldn't happen in this example"); + } + + /** + * @brief Process a standard job failure event + * + * @param event: the event + */ + void PilotJobWMS::processEventStandardJobFailure(std::shared_ptr event) { + /* Retrieve the job that this event is for */ + auto job = event->standard_job; + /* Log the cause of the failure */ + WRENCH_INFO("Notified that a standard job has failed due to: %s ", + event->failure_cause->toString().c_str()); + } + + /** + * @brief Process a pilot job expiration event + * + * @param event: the event + */ + void PilotJobWMS::processEventPilotJobExpiration(std::shared_ptr) { + WRENCH_INFO("Notified that a pilot job has expired"); + } + + /** + * @brief Process a pilot job start event + * + * @param event: the event + */ + void 
PilotJobWMS::processEventPilotJobStart(std::shared_ptr) { + WRENCH_INFO("Notified that a pilot job has started!"); + } + + +} diff --git a/examples/basic-examples/batch-pilot-job/PilotJobWMS.h b/examples/basic-examples/batch-pilot-job/PilotJobWMS.h new file mode 100644 index 0000000000..30e091f938 --- /dev/null +++ b/examples/basic-examples/batch-pilot-job/PilotJobWMS.h @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2017-2018. The WRENCH Team. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + */ + + +#ifndef WRENCH_EXAMPLE_PILOT_JOB_H +#define WRENCH_EXAMPLE_PILOT_JOB_H + +#include + + +namespace wrench { + + class Simulation; + + /** + * @brief A Workflow Management System (WMS) implementation (inherits from WMS) + */ + class PilotJobWMS : public WMS { + + public: + // Constructor + PilotJobWMS( + const std::set> &compute_services, + const std::set> &storage_services, + const std::string &hostname); + + protected: + + // Overriden method + void processEventStandardJobCompletion(std::shared_ptr) override; + void processEventStandardJobFailure(std::shared_ptr) override; + void processEventPilotJobStart(std::shared_ptr) override; + void processEventPilotJobExpiration(std::shared_ptr) override; + + private: + // main() method of the WMS + int main() override; + + }; +} +#endif //WRENCH_EXAMPLE_PILOT_JOB_H diff --git a/examples/basic-examples/cloud-bag-of-tasks/CloudBagOfTasks.cpp b/examples/basic-examples/cloud-bag-of-tasks/CloudBagOfTasks.cpp index c97696d519..16b36c84ef 100644 --- a/examples/basic-examples/cloud-bag-of-tasks/CloudBagOfTasks.cpp +++ b/examples/basic-examples/cloud-bag-of-tasks/CloudBagOfTasks.cpp @@ -23,7 +23,7 @@ ** the completion time of each workflow task is printed. 
** ** Example invocation of the simulator for a 10-task workflow, with only WMS logging: - ** ./cloud-bag-of-tasks-simulator 10 ./four_hosts.xml --wrench-no-logs --log=two_tasks_at_a_time_cloud_wms.threshold=info + ** ./cloud-bag-of-tasks-simulator 10 ./four_hosts.xml --wrench-no-logs --log=custom_wms.threshold=info ** ** Example invocation of the simulator for a 6-task workflow with full logging: ** ./cloud-bag-of-tasks-simulator 6 ./four_hosts.xml @@ -55,7 +55,7 @@ int main(int argc, char **argv) { /* Parsing of the command-line arguments for this WRENCH simulation */ if (argc != 3) { - std::cerr << "Usage: " << argv[0] << " [optional logging arguments]" << std::endl; + std::cerr << "Usage: " << argv[0] << " [--wrench-no-logs --log=custom_wms.threshold=info]" << std::endl; exit(1); } @@ -80,7 +80,7 @@ int main(int argc, char **argv) { wrench::Workflow workflow; /* Initialize and seed a RNG */ - std::uniform_int_distribution dist(100000000.0,10000000000.0); + std::uniform_int_distribution dist(100000000,10000000000); std::mt19937 rng(42); /* Add workflow tasks and files */ diff --git a/examples/basic-examples/cloud-bag-of-tasks/TwoTasksAtATimeCloudWMS.cpp b/examples/basic-examples/cloud-bag-of-tasks/TwoTasksAtATimeCloudWMS.cpp index 9aa5dd7fe7..4c78cb271a 100644 --- a/examples/basic-examples/cloud-bag-of-tasks/TwoTasksAtATimeCloudWMS.cpp +++ b/examples/basic-examples/cloud-bag-of-tasks/TwoTasksAtATimeCloudWMS.cpp @@ -23,7 +23,7 @@ #include "TwoTasksAtATimeCloudWMS.h" -XBT_LOG_NEW_DEFAULT_CATEGORY(two_tasks_at_a_time_cloud_wms, "Log category for TwoTasksAtATimeCloudWMS"); +XBT_LOG_NEW_DEFAULT_CATEGORY(custom_wms, "Log category for TwoTasksAtATimeCloudWMS"); namespace wrench { @@ -67,10 +67,12 @@ namespace wrench { auto storage_service = *(this->getAvailableStorageServices().begin()); /* Create a VM instance with 5 cores and one with 2 cores (and 500M of RAM) */ + WRENCH_INFO("Creating a 'large' VM with 5 cores and a 'small' VM with 2 cores"); auto large_vm = 
cloud_service->createVM(5, 500000); auto small_vm = cloud_service->createVM(2, 500000); /* Start the VMs */ + WRENCH_INFO("Starting both VMs"); auto large_vm_compute_service = cloud_service->startVM(large_vm); auto small_vm_compute_service = cloud_service->startVM(small_vm); @@ -103,12 +105,16 @@ namespace wrench { file_locations1[cheap_ready_task->getOutputFiles().at(0)] = FileLocation::LOCATION(storage_service); /* Create the job */ + WRENCH_INFO("Creating a job to run task %s (%.2lf)", + cheap_ready_task->getID().c_str(), cheap_ready_task->getFlops()); + auto standard_job1 = job_manager->createStandardJob(cheap_ready_task, file_locations1); /* Submit the job to the small VM */ + WRENCH_INFO("Submit this job to the small VM"); job_manager->submitJob(standard_job1, small_vm_compute_service); - /* Submit the cheap task to the small VM */ + /* Submit the expensive task to the large VM */ /* First, we need to create a map of file locations, stating for each file * where is should be read/written */ std::map> file_locations2; @@ -116,17 +122,23 @@ namespace wrench { file_locations2[expensive_ready_task->getOutputFiles().at(0)] = FileLocation::LOCATION(storage_service); /* Create the job */ + WRENCH_INFO("Creating a job to run task %s (%.2lf)", + expensive_ready_task->getID().c_str(), expensive_ready_task->getFlops()); + auto standard_job2 = job_manager->createStandardJob(expensive_ready_task, file_locations2); - /* Submit the job to the small VM */ + /* Submit the job to the large VM */ + WRENCH_INFO("Submit this job to the large VM"); job_manager->submitJob(standard_job2, large_vm_compute_service); /* Wait for workflow execution event and process it. In this case we know that * the event will be a StandardJobCompletionEvent, which is processed by the method * processEventStandardJobCompletion() that this class overrides. */ + WRENCH_INFO("Wait for next event"); this->waitForAndProcessNextEvent(); /* And again! 
*/ + WRENCH_INFO("Wait for next event again"); this->waitForAndProcessNextEvent(); }