Skip to content

Commit

Permalink
Added a new example (#157)
Browse files Browse the repository at this point in the history
API changes/additions  for getting  workflow lists of tasks and files
Bug fix in JobManager
  • Loading branch information
henricasanova committed Apr 20, 2020
1 parent 727b19d commit b20e51e
Show file tree
Hide file tree
Showing 23 changed files with 3,972 additions and 3,420 deletions.
1 change: 1 addition & 0 deletions conf/cmake/Examples.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
set(EXAMPLES_CMAKEFILES_TXT
examples/simple-example/CMakeLists.txt
examples/basic-examples/bare-metal-chain/CMakeLists.txt
examples/basic-examples/bare-metal-chain-scratch/CMakeLists.txt
)

foreach (cmakefile ${EXAMPLES_CMAKEFILES_TXT})
Expand Down
13 changes: 8 additions & 5 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ used in these examples.

- ```bare-metal-chain```: A simulation of the execution of a
chain workflow by a Workflow Management System on a compute service, with all workflow data being read/written
from/to a single storage service.
from/to a single storage service. The compute service runs on a 10-core host, and each task
is executed as a single job that uses 10 cores

- ```bare-metal-chain-scratch```: A simulation of the execution of a
chain workflow by a Workflow Management System on a compute service that has
scratch space to hold intermediate workflow files.
- ```bare-metal-chain-scratch```: Similar, but the compute service now has
scratch space to hold intermediate workflow files. Since files created in the scratch
space during a job's execution are erased after that job's completion. As a result,
the workflow is executed as a single multi-task job.

- ```bare-metal-bag-of-tasks```: A simulation of the execution of a
bag-of-task workflow by a Workflow Management System on a compute service, with all workflow data being read/written
from/to a single storage service.
from/to a single storage service. Up to two workflow tasks are executed concurrently on the compute service, in which case
one task is executed on 6 cores and the other on 4 cores.

Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/**
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*/

/**
** This simulator simulates the execution of a chain workflow, that is, of a workflow
** in which each task has at most a single parent and at most a single child:
**
** File #0 -> Task #0 -> File #1 -> Task #1 -> File #2 -> .... -> Task #n-1 -> File #n
**
** The compute platform comprises two hosts, WMSHost and ComputeHost. On WMSHost runs a simple storage
** service and a WMS (defined in class OneTaskAtATimeWMS). On ComputeHost runs a bare metal
** compute service, that has access to the 10 cores of that host. Once the simulation is done,
** the completion time of each workflow task is printed.
**
** Example invocation of the simulator for a 10-task workflow, with only WMS logging:
** ./bare-meta-chain-simulator 10 ./two_hosts.xml --wrench-no-logs --log=workflow_as_a_single_job_wms.threshold=info
**
** Example invocation of the simulator for a 5-task workflow with full logging:
** ./bare-meta-chain-simulator 5 ./two_hosts.xml
**/


#include <iostream>
#include <wrench.h>

#include "WorkflowAsAsingleJobWMS.h" // WMS implementation

/**
* @brief The Simulator's main function
*
* @param argc: argument count
* @param argv: argument array
* @return 0 on success, non-zero otherwise
*/
int main(int argc, char **argv) {

/*
* Declare a WRENCH simulation object
*/
wrench::Simulation simulation;

/* Initialize the simulation, which may entail extracting WRENCH-specific and
* Simgrid-specific command-line arguments that can modify general simulation behavior.
* Two special command-line arguments are --help-wrench and --help-simgrid, which print
* details about available command-line arguments. */
simulation.init(&argc, argv);

/* Parsing of the command-line arguments for this WRENCH simulation */
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <number of tasks> <xml platform file> [optional logging arguments]" << std::endl;
exit(1);
}

/* Reading and parsing the platform description file, written in XML following the SimGrid-defined DTD,
* to instantiate the simulated platform */
std::cerr << "Instantiating simulated platform..." << std::endl;
simulation.instantiatePlatform(argv[2]);


/* Parse the first command-line argument (number of tasks) */
int num_tasks = 0;
try {
num_tasks = std::atoi(argv[1]);
} catch (std::invalid_argument &e) {
std::cerr << "Invalid number of tasks\n";
exit(1);
}

/* Declare a workflow */
wrench::Workflow workflow;

/* Add workflow tasks */
for (int i=0; i < num_tasks; i++) {
/* Create a task: 10GFlop, 1 to 10 cores, 0.90 parallel efficiency, 10MB memory footprint */
auto task = workflow.addTask("task_" + std::to_string(i), 10000000000.0, 1, 10, 0.90, 10000000);
}

/* Add workflow files */
for (int i=0; i < num_tasks+1; i++) {
/* Create a 10MB file */
workflow.addFile("file_" + std::to_string(i), 10000000);
}

/* Set input/output files for each task */
for (int i=0; i < num_tasks; i++) {
auto task = workflow.getTaskByID("task_" + std::to_string(i));
task->addInputFile(workflow.getFileByID("file_" + std::to_string(i)));
task->addOutputFile(workflow.getFileByID("file_" + std::to_string(i + 1)));
}

/* Instantiate a storage service, and add it to the simulation.
* A wrench::StorageService is an abstraction of a service on
* which files can be written and read. This particular storage service, which is an instance
* of wrench::SimpleStorageService, is started on WMSHost in the
* platform , which has an attached disk mounted at "/". The SimpleStorageService
* is a basic storage service implementation provided by WRENCH.
* Throughout the simulation execution, input/output files of workflow tasks will be located
* in this storage service, and accessed remotely by the compute service. Note that the
* storage service is configured to use a buffer size of 500M when transferring data over
* the network (i.e., to pipeline disk reads/writes and network revs/sends). */
std::cerr << "Instantiating a SimpleStorageService on WMSHost..." << std::endl;
auto storage_service = simulation.add(new wrench::SimpleStorageService(
"WMSHost", {"/"}, {{wrench::SimpleStorageServiceProperty::BUFFER_SIZE, "500000000"}}, {}));

/* Instantiate a bare-metal compute service, and add it to the simulation.
* A wrench::BareMetalComputeService is an abstraction of a compute service that corresponds to a
* to a software infrastructure that can execute tasks on hardware resources.
* This particular service is started on ComputeHost and has scratch storage space (mount point argument = "/scratch").
* This means that the WMS can opt to leave files in scratch. However, files in scratch are removed after
* a job completes */
std::cerr << "Instantiating a BareMetalComputeService on WMSHost..." << std::endl;
auto baremetal_service = simulation.add(new wrench::BareMetalComputeService(
"ComputeHost", {"ComputeHost"}, "/scratch/", {}, {}));

/* Instantiate a WMS, to be stated on WMSHost, which is responsible
* for executing the workflow. See comments in OneTaskAtATimeWMS.cpp
* for more details */
auto wms = simulation.add(
new wrench::WorkflowAsAsingleJobWMS({baremetal_service}, {storage_service}, "WMSHost"));

/* Associate the workflow to the WMS */
wms->addWorkflow(&workflow);

/* Instantiate a file registry service to be started on WMSHost. This service is
* essentially a replica catalog that stores <file , storage service> pairs so that
* any service, in particular a WMS, can discover where workflow files are stored. */
std::cerr << "Instantiating a FileRegistryService on WMSHost ..." << std::endl;
auto file_registry_service = new wrench::FileRegistryService("WMSHost");
simulation.add(file_registry_service);

/* It is necessary to store, or "stage", input files that only input. The getInputFiles()
* method of the Workflow class returns the set of all workflow files that are not generated
* by workflow tasks, and thus are only input files. These files are then staged on the storage service. */
std::cerr << "Staging task input files..." << std::endl;
for (auto const &f : workflow.getInputFiles()) {
simulation.stageFile(f, storage_service);
}

/* Launch the simulation. This call only returns when the simulation is complete. */
std::cerr << "Launching the Simulation..." << std::endl;
try {
simulation.launch();
} catch (std::runtime_error &e) {
std::cerr << "Exception: " << e.what() << std::endl;
return 1;
}
std::cerr << "Simulation done!" << std::endl;

/* Simulation results can be examined via simulation.output, which provides access to traces
* of events. In the code below, we print the retrieve the trace of all task completion events, print how
* many such events there are, and print some information for the first such event. */
auto trace = simulation.getOutput().getTrace<wrench::SimulationTimestampTaskCompletion>();
for (auto const &item : trace) {
std::cerr << "Task " << item->getContent()->getTask()->getID() << " completed at time " << item->getDate() << std::endl;
}

return 0;
}
13 changes: 13 additions & 0 deletions examples/basic-examples/bare-metal-chain-scratch/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

set(SOURCE_FILES
WorkflowAsAsingleJobWMS.h
WorkflowAsAsingleJobWMS.cpp
BareMetalChainScratch.cpp
)

add_executable(wrench-bare-metal-chain-scratch-simulator ${SOURCE_FILES})

target_link_libraries(wrench-bare-metal-chain-scratch-simulator wrench ${SimGrid_LIBRARY} ${PUGIXML_LIBRARY} ${LEMON_LIBRARY})

install(TARGETS wrench-bare-metal-chain-scratch-simulator DESTINATION bin)

Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*/

/**
** A Workflow Management System (WMS) implementation that operates as follows:
** - While the workflow is not done, repeat:
** - Pick a ready task if any
** - Submit it to the first available BareMetalComputeService as a job in a way that
** - Uses 5 cores
** - Reads the input file from the StorageService
** - Writes the output file from the StorageService
**/

#include <iostream>

#include "WorkflowAsAsingleJobWMS.h"

XBT_LOG_NEW_DEFAULT_CATEGORY(workflow_as_a_single_job_wms, "Log category for WorkflowAsAsingleJobWMS");

namespace wrench {

/**
* @brief Constructor, which calls the super constructor
*
* @param compute_services: a set of compute services available to run tasks
* @param storage_services: a set of storage services available to store files
* @param hostname: the name of the host on which to start the WMS
*/
WorkflowAsAsingleJobWMS::WorkflowAsAsingleJobWMS(const std::set<std::shared_ptr<ComputeService>> &compute_services,
const std::set<std::shared_ptr<StorageService>> &storage_services,
const std::string &hostname) : WMS(
nullptr, nullptr,
compute_services,
storage_services,
{}, nullptr,
hostname,
"one-task-at-a-time") {}

/**
* @brief main method of the OneTaskAtATimeWMS daemon
*
* @return 0 on completion
*
* @throw std::runtime_error
*/
int WorkflowAsAsingleJobWMS::main() {

/* Set the logging output to GREEN */
TerminalOutput::setThisProcessLoggingColor(TerminalOutput::COLOR_GREEN);

WRENCH_INFO("WMS starting on host %s", Simulation::getHostName().c_str());
WRENCH_INFO("About to execute a workflow with %lu tasks", this->getWorkflow()->getNumberOfTasks());

/* Create a job manager so that we can create/submit jobs */
auto job_manager = this->createJobManager();

/* Get the first available bare-metal compute service and storage servic */
auto compute_service = *(this->getAvailableComputeServices<BareMetalComputeService>().begin());
auto storage_service = *(this->getAvailableStorageServices().begin());

/* We want to execute the workflow tasks so that intermediate files are
* kept in the compute service's scratch place, and only the final output
* file is written back to the storage service at host WMSHost. However,
* files stored in the compute service's scratch space are erased
* after the job that created them has completed. So we ave the run
* the entire workflow as a single multi-task job! */

/* First, we need to create a map of file locations, stating for each file
* where is should be read/written */
std::map<WorkflowFile *, std::shared_ptr<FileLocation>> file_locations;
// Set each file's location to the compute service's scratch space
for (auto const &file : this->getWorkflow()->getFiles()) {
file_locations[file] = FileLocation::SCRATCH;
}
// For the workflow input files, in fact, set the location to the storage service
for (auto const &f : this->getWorkflow()->getInputFiles()) {
file_locations[f] = FileLocation::LOCATION(storage_service);
}
// For the workflow output files, in fact, set the location to the storage service
for (auto const &f : this->getWorkflow()->getOutputFiles()) {
file_locations[f] = FileLocation::LOCATION(storage_service);
}

/* Second, we create a job */
auto job = job_manager->createStandardJob(this->getWorkflow()->getTasks(), file_locations);

/* Submit the job to the compute service */
job_manager->submitJob(job, compute_service);

/* Wait for a workflow execution event and process it. In this case we know that
* the event will be a StandardJobCompletionEvent, which is processed by the method
* processEventStandardJobCompletion() that this class overrides. */
this->waitForAndProcessNextEvent();

WRENCH_INFO("Workflow execution complete");
return 0;
}

/**
* @brief Process a standard job completion event
*
* @param event: the event
*/
void WorkflowAsAsingleJobWMS::processEventStandardJobCompletion(std::shared_ptr<StandardJobCompletedEvent> event) {
/* Retrieve the job that this event is for */
auto job = event->standard_job;
/* Retrieve the job's first (and in our case only) task */
auto task = job->getTasks().at(0);
WRENCH_INFO("Notified that a standard job has completed task %s", task->getID().c_str());
}

/**
* @brief Process a standard job failure event
*
* @param event: the event
*/
void WorkflowAsAsingleJobWMS::processEventStandardJobFailure(std::shared_ptr<StandardJobFailedEvent> event) {
/* Retrieve the job that this event is for */
auto job = event->standard_job;
/* Retrieve the job's first (and in our case only) task */
auto task = job->getTasks().at(0);
/* Print some error message */
WRENCH_INFO("Notified that a standard job has failed for task %s with error %s",
task->getID().c_str(),
event->failure_cause->toString().c_str());
throw std::runtime_error("ABORTING DUE TO JOB FAILURE");
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*/


#ifndef WRENCH_ONE_TASK_AT_A_TIME_SCRATCH_H
#define WRENCH_ONE_TASK_AT_A_TIME_SCRATCH_H

#include <wrench-dev.h>


namespace wrench {

class Simulation;

/**
* @brief A Workflow Management System (WMS) implementation (inherits from WMS)
*/
class WorkflowAsAsingleJobWMS : public WMS {

public:
// Constructor
WorkflowAsAsingleJobWMS(
const std::set<std::shared_ptr<ComputeService>> &compute_services,
const std::set<std::shared_ptr<StorageService>> &storage_services,
const std::string &hostname);

protected:

// Overriden methods
void processEventStandardJobCompletion(std::shared_ptr<StandardJobCompletedEvent>) override;
void processEventStandardJobFailure(std::shared_ptr<StandardJobFailedEvent>) override;

private:
// main() method of the WMS
int main() override;

};
}
#endif //WRENCH_ONE_TASK_AT_A_TIME_SCRATCH_H
Loading

0 comments on commit b20e51e

Please sign in to comment.