Skip to content

Commit

Permalink
Some re-factoring of the examples
Browse files Browse the repository at this point in the history
  - Schedulers now take no arguments to the constructor
  - They just get the compute resources from the scheduleTasks() methods
  • Loading branch information
henricasanova committed Feb 21, 2018
1 parent 5710fb8 commit d326543
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 43 deletions.
2 changes: 1 addition & 1 deletion examples/simple-wms/SimpleWMSBatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ int main(int argc, char **argv) {
std::unique_ptr<wrench::WMS>(
new wrench::SimpleWMS(&workflow,
std::unique_ptr<wrench::Scheduler>(
new wrench::BatchScheduler(batch_service, &simulation)),
new wrench::BatchScheduler()),
compute_services, storage_services, wms_host)));

/* Instantiate a file registry service to be started on some host. This service is
Expand Down
2 changes: 1 addition & 1 deletion examples/simple-wms/SimpleWMSCloud.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ int main(int argc, char **argv) {
std::unique_ptr<wrench::WMS>(
new wrench::SimpleWMS(&workflow,
std::unique_ptr<wrench::Scheduler>(
new wrench::CloudScheduler(cloud_service, &simulation)),
new wrench::CloudScheduler()),
compute_services, storage_services, wms_host)));

/* Instantiate a file registry service to be started on some host. This service is
Expand Down
26 changes: 11 additions & 15 deletions examples/simple-wms/scheduler/BatchScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,8 @@ namespace wrench {
/**
* @brief Constructor
*
* @param batch_service: a pointer to a batch service
* @param execution_hosts: list of physical execution hosts to run the batch job
* @param simulation: a pointer to the simulation object
*
* @throw std::runtime_error
*/
BatchScheduler::BatchScheduler(ComputeService *batch_service, Simulation *simulation) : simulation(simulation) {

if (typeid(batch_service) == typeid(BatchService)) {
throw std::runtime_error("The provided batch service is not a BatchService object.");
}
this->batch_service = batch_service;
BatchScheduler::BatchScheduler() {
}

/**
Expand All @@ -44,10 +34,16 @@ namespace wrench {
std::map<std::string, std::vector<WorkflowTask *>> ready_tasks,
const std::set<ComputeService *> &compute_services) {

if (compute_services.find(batch_service) == compute_services.end()) {
throw std::runtime_error("The default batch service is not listed as a compute service.");
// Check that the right compute_services is passed
if (compute_services.size() != 1) {
throw std::runtime_error("This example Batch Scheduler requires a single compute service");
}

ComputeService *compute_service = *compute_services.begin();
BatchService *batch_service;
if (not(batch_service = dynamic_cast<BatchService *>(compute_service))) {
throw std::runtime_error("This example Batch Scheduler can only handle a batch service");
}
auto *cs = (BatchService *) this->batch_service;

WRENCH_INFO("There are %ld ready tasks to schedule", ready_tasks.size());

Expand All @@ -59,7 +55,7 @@ namespace wrench {
batch_job_args["-N"] = "1";
batch_job_args["-t"] = "2000000"; //time in minutes
batch_job_args["-c"] = "1"; //number of cores per node
job_manager->submitJob(job, cs, batch_job_args);
job_manager->submitJob(job, batch_service, batch_job_args);
}
WRENCH_INFO("Done with scheduling tasks as standard jobs");
}
Expand Down
6 changes: 1 addition & 5 deletions examples/simple-wms/scheduler/BatchScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@

namespace wrench {

class Simulation;

/**
* @brief A batch Scheduler
*/
class BatchScheduler : public Scheduler {

public:
BatchScheduler(ComputeService *batch_service, Simulation *simulation);
BatchScheduler();

/***********************/
/** \cond DEVELOPER */
Expand All @@ -40,8 +38,6 @@ namespace wrench {

private:

ComputeService *batch_service;
Simulation *simulation;
};
}

Expand Down
39 changes: 20 additions & 19 deletions examples/simple-wms/scheduler/CloudScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,9 @@ namespace wrench {
/**
* @brief Constructor
*
* @param cloud_service: a pointer to a cloud service
* @param simulation: a pointer to the simulation object
*
* @throw std::runtime_error
*/
CloudScheduler::CloudScheduler(CloudService *cloud_service, Simulation *simulation) : simulation(simulation) {
CloudScheduler::CloudScheduler() {

if (cloud_service == nullptr) {
throw std::runtime_error("A cloud service should be provided.");
}
this->cloud_service = cloud_service;
}

/**
Expand All @@ -47,13 +39,22 @@ namespace wrench {
std::map<std::string, std::vector<WorkflowTask *>> ready_tasks,
const std::set<ComputeService *> &compute_services) {

// obtain list of execution hosts
if (this->execution_hosts.empty()) {
this->execution_hosts = this->cloud_service->getExecutionHosts();
// Check that the right compute_services is passed
if (compute_services.size() != 1) {
throw std::runtime_error("This example Cloud Scheduler requires a single compute service");
}

if (compute_services.find(cloud_service) == compute_services.end()) {
throw std::runtime_error("The default cloud service is not listed as a compute service.");
ComputeService *compute_service = *compute_services.begin();
CloudService *cloud_service;

if (not(cloud_service = dynamic_cast<CloudService *>(compute_service))) {
throw std::runtime_error("This example Cloud Scheduler can only handle a cloud service");
}


// obtain list of execution hosts, if not already done
if (this->execution_hosts.empty()) {
this->execution_hosts = cloud_service->getExecutionHosts();
}

WRENCH_INFO("There are %ld ready tasks to schedule", ready_tasks.size());
Expand All @@ -62,12 +63,12 @@ namespace wrench {
for (auto itc : ready_tasks) {
//TODO add support to pilot jobs

long sum_num_idle_cores = 0;
unsigned long sum_num_idle_cores = 0;

// Check that it can run it right now in terms of idle cores
try {
std::vector<unsigned long> num_idle_cores = this->cloud_service->getNumIdleCores();
sum_num_idle_cores = std::accumulate(num_idle_cores.begin(), num_idle_cores.end(), 0);
std::vector<unsigned long> num_idle_cores = compute_service->getNumIdleCores();
sum_num_idle_cores = (unsigned long)std::accumulate(num_idle_cores.begin(), num_idle_cores.end(), 0);
} catch (WorkflowExecutionException &e) {
// The service has some problem, forget it
throw std::runtime_error("Unable to get the number of idle cores.");
Expand All @@ -81,7 +82,7 @@ namespace wrench {
std::string vm_host = "vm" + std::to_string(VM_ID++) + "_" + pm_host;

// TODO: provide proper VM RAM requests
if (this->cloud_service->createVM(pm_host, vm_host, ((StandardJob *) (job))->getMinimumRequiredNumCores(), 1000)) {
if (cloud_service->createVM(pm_host, vm_host, ((StandardJob *) (job))->getMinimumRequiredNumCores(), 1000)) {
this->vm_list[pm_host].push_back(vm_host);
}

Expand All @@ -90,7 +91,7 @@ namespace wrench {
return;
}
}
job_manager->submitJob(job, this->cloud_service);
job_manager->submitJob(job, cloud_service);
scheduled++;
}
WRENCH_INFO("Done with scheduling tasks as standard jobs");
Expand Down
3 changes: 1 addition & 2 deletions examples/simple-wms/scheduler/CloudScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace wrench {
class CloudScheduler : public Scheduler {

public:
CloudScheduler(CloudService *cloud_service, Simulation *simulation);
CloudScheduler();

/***********************/
/** \cond DEVELOPER */
Expand All @@ -39,7 +39,6 @@ namespace wrench {
private:
std::string choosePMHostname();

CloudService *cloud_service;
std::vector<std::string> execution_hosts;
std::map<std::string, std::vector<std::string>> vm_list;
Simulation *simulation;
Expand Down

0 comments on commit d326543

Please sign in to comment.