Skip to content

Commit

Permalink
Merge branch 'master' of github.com:wrench-project/wrench
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed May 22, 2018
2 parents 5669401 + 6c6a81d commit 6eaa891
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 241 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Expand Up @@ -27,7 +27,7 @@ if (NOT ${WRENCH_VERSION_EXTRA} EQUAL "")
endif ()


include_directories(src/wrench/ include/ /usr/include /usr/local/include /opt/local/include /usr/local/include/nlohmann)
include_directories(src/wrench/ include/ /usr/include /usr/local/include /opt/local/include)
#include_directories(src/wrench/ include/ /usr/include /usr/local/include /Users/casanova/Home/PROJECTS/SIMGRID/SimGrid-3.17 /opt/local/include /usr/local/include/nlohmann)


Expand Down
35 changes: 16 additions & 19 deletions examples/simple-example/scheduler/RandomStandardJobScheduler.cpp
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2017. The WRENCH Team.
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -29,16 +29,16 @@ namespace wrench {
*/

void RandomStandardJobScheduler::scheduleTasks(const std::set<ComputeService *> &compute_services,
const std::map<std::string, std::vector<WorkflowTask *>> &tasks) {
const std::vector<WorkflowTask *> &tasks) {

WRENCH_INFO("There are %ld ready tasks to schedule", tasks.size());
for (auto itc : tasks) {
for (auto task : tasks) {
bool successfully_scheduled = false;

// First: attempt to run the task on a running pilot job
WRENCH_INFO("Trying to submit task '%s' to a pilot job...", itc.first.c_str());
WRENCH_INFO("Trying to submit task '%s' to a pilot job...", task->getId().c_str());

double total_flops = Workflow::getSumFlops((*tasks.begin()).second);
double total_flops = Workflow::getSumFlops({task});

std::set<PilotJob *> running_pilot_jobs = this->job_manager->getRunningPilotJobs();
for (auto pj : running_pilot_jobs) {
Expand Down Expand Up @@ -82,8 +82,8 @@ namespace wrench {
}

// We can submit!
WRENCH_INFO("Submitting task %s for execution to a pilot job", itc.first.c_str());
WorkflowJob *job = (WorkflowJob *) job_manager->createStandardJob(itc.second, {});
WRENCH_INFO("Submitting task %s for execution to a pilot job", task->getId().c_str());
WorkflowJob *job = (WorkflowJob *) job_manager->createStandardJob(task, {});
job_manager->submitJob(job, cs);
successfully_scheduled = true;
break;
Expand All @@ -96,7 +96,7 @@ namespace wrench {
}

// Second: attempt to run the task on a compute resource
WRENCH_INFO("Trying to submit task '%s' to a standard compute service...", itc.first.c_str());
WRENCH_INFO("Trying to submit task '%s' to a standard compute service...", task->getId().c_str());

for (auto cs : compute_services) {
WRENCH_INFO("Asking compute service %s if it can run this standard job...", cs->getName().c_str());
Expand All @@ -112,7 +112,7 @@ namespace wrench {
// Check that it can run it right now in terms of idle cores
try {
std::vector<unsigned long> num_idle_cores = cs->getNumIdleCores();
sum_num_idle_cores = (unsigned long)std::accumulate(num_idle_cores.begin(), num_idle_cores.end(), 0);
sum_num_idle_cores = (unsigned long) std::accumulate(num_idle_cores.begin(), num_idle_cores.end(), 0);
} catch (WorkflowExecutionException &e) {
// The service has some problem, forget it
continue;
Expand All @@ -124,18 +124,16 @@ namespace wrench {
}

std::map<WorkflowFile *, StorageService *> file_locations;
for (auto t : itc.second) {
for (auto f : t->getInputFiles()) {
file_locations.insert(std::make_pair(f, default_storage_service));
}
for (auto f : t->getOutputFiles()) {
file_locations.insert(std::make_pair(f, default_storage_service));
}
for (auto f : task->getInputFiles()) {
file_locations.insert(std::make_pair(f, default_storage_service));
}
for (auto f : task->getOutputFiles()) {
file_locations.insert(std::make_pair(f, default_storage_service));
}

// We can submit!
WRENCH_INFO("Submitting task %s for execution as a standard job", itc.first.c_str());
WorkflowJob *job = (WorkflowJob *) job_manager->createStandardJob(itc.second, file_locations);
WRENCH_INFO("Submitting task %s for execution as a standard job", task->getId().c_str());
WorkflowJob *job = (WorkflowJob *) job_manager->createStandardJob(task, file_locations);
job_manager->submitJob(job, cs);
successfully_scheduled = true;
break;
Expand All @@ -150,5 +148,4 @@ namespace wrench {
WRENCH_INFO("Done with scheduling tasks as standard jobs");
}


}
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2017. The WRENCH Team.
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -24,14 +24,15 @@ namespace wrench {

RandomStandardJobScheduler(JobManager *job_manager,
StorageService *default_storage_service) : job_manager(job_manager),
default_storage_service(default_storage_service) {}
default_storage_service(
default_storage_service) {}

/***********************/
/** \cond DEVELOPER */
/***********************/

void scheduleTasks(const std::set<ComputeService *> &compute_services,
const std::map<std::string, std::vector<WorkflowTask *>> &tasks);
const std::vector<WorkflowTask *> &tasks) override;

private:
JobManager *job_manager;
Expand Down
34 changes: 10 additions & 24 deletions include/wrench/services/compute/ComputeService.h
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2017. The WRENCH Team.
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -38,13 +38,13 @@ namespace wrench {
/** \cond INTERNAL **/
/***********************/

friend class StandardJobExecutorTest;
friend class StandardJobExecutorTest;

/***********************/
/** \endcond **/
/***********************/

friend class Simulation;
friend class Simulation;

public:

Expand All @@ -59,22 +59,20 @@ namespace wrench {
static constexpr double ALL_RAM = DBL_MAX;

/** A static StorageService pointer to the SCRATCH space inside the compute service **/
static StorageService* SCRATCH;
static StorageService *SCRATCH;

/***********************/
/** \cond DEVELOPER **/
/***********************/

virtual ~ComputeService(){}
virtual ~ComputeService() {}

void stop() override;

void submitJob(WorkflowJob *job, std::map<std::string, std::string> = {});

void terminateJob(WorkflowJob *job);

void setLocalScratch();

bool supportsStandardJobs();

bool supportsPilotJobs();
Expand All @@ -97,10 +95,6 @@ namespace wrench {

double getFreeRemainingScratchSpace();

void setDefaultStorageService(StorageService *storage_service);

StorageService *getDefaultStorageService();

/***********************/
/** \cond INTERNAL **/
/***********************/
Expand All @@ -110,12 +104,11 @@ namespace wrench {

virtual void submitPilotJob(PilotJob *job, std::map<std::string, std::string> &service_specific_arguments) = 0;


virtual void terminateStandardJob(StandardJob *job) = 0;

virtual void terminatePilotJob(PilotJob *job) = 0;

ComputeService(std::string hostname,
ComputeService(const std::string &hostname,
std::string service_name,
std::string mailbox_name_prefix,
bool supports_standard_jobs,
Expand All @@ -124,29 +117,22 @@ namespace wrench {

protected:

ComputeService(std::string hostname,
ComputeService(const std::string &hostname,
std::string service_name,
std::string mailbox_name_prefix,
bool supports_standard_jobs,
bool supports_pilot_jobs,
StorageService* scratch_space = nullptr);

// virtual void processGetResourceInformation(const std::string &answer_mailbox) = 0;

// virtual void processSubmitStandardJob(const std::string &answer_mailbox, StandardJob *job,
// std::map<std::string, std::string> &service_specific_args) = 0;
//
// virtual void processSubmitPilotJob(const std::string &answer_mailbox, PilotJob *job) = 0;
StorageService *scratch_space = nullptr);

/** @brief Whether the compute service supports pilot jobs */
bool supports_pilot_jobs;
/** @brief Whether the compute service supports standard jobs */
bool supports_standard_jobs;

/** @brief A scratch storage service associated to the compute service */
StorageService* scratch_space_storage_service;
StorageService *scratch_space_storage_service;

StorageService* getScratch();
StorageService *getScratch();

private:

Expand Down
73 changes: 21 additions & 52 deletions src/wrench/services/compute/ComputeService.cpp
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2017. The WRENCH Team.
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -31,7 +31,7 @@ namespace wrench {
Service::stop();
}

StorageService* ComputeService::SCRATCH = (StorageService*)(new int(553453));
StorageService *ComputeService::SCRATCH = (StorageService *) (new int(553453));

/**
* @brief Submit a job to the compute service
Expand Down Expand Up @@ -121,12 +121,11 @@ namespace wrench {
* @param mailbox_name_prefix: the mailbox name prefix
* @param supports_standard_jobs: true if the job executor should support standard jobs
* @param supports_pilot_jobs: true if the job executor should support pilot jobs
* @param default_storage_service: a storage service
* @param scratch_size: the size for the scratch space of the compute service
*/
ComputeService::ComputeService(std::string hostname,
std::string service_name,
std::string mailbox_name_prefix,
ComputeService::ComputeService(const std::string &hostname,
const std::string service_name,
const std::string mailbox_name_prefix,
bool supports_standard_jobs,
bool supports_pilot_jobs,
double scratch_size) :
Expand Down Expand Up @@ -158,20 +157,18 @@ namespace wrench {
* @param default_storage_service: a storage service
* @param scratch_space: scratch space of the compute service
*/
ComputeService::ComputeService(std::string hostname, std::string service_name, std::string mailbox_name_prefix,
ComputeService::ComputeService(const std::string &hostname,
const std::string service_name,
const std::string mailbox_name_prefix,
bool supports_standard_jobs,
bool supports_pilot_jobs,
StorageService *scratch_space): Service(hostname, service_name, mailbox_name_prefix),
supports_pilot_jobs(supports_pilot_jobs),
supports_standard_jobs(supports_standard_jobs) {
StorageService *scratch_space) :
Service(hostname, service_name, mailbox_name_prefix),
supports_pilot_jobs(supports_pilot_jobs),
supports_standard_jobs(supports_standard_jobs) {

this->state = ComputeService::UP;
if (scratch_space != nullptr) {
this->scratch_space_storage_service = scratch_space;
} else {
this->scratch_space_storage_service = nullptr;
}

this->scratch_space_storage_service = scratch_space;
}

/**
Expand Down Expand Up @@ -209,7 +206,7 @@ namespace wrench {
}

if (dict.find("num_hosts") != dict.end()) {
return (unsigned long)(*(dict["num_hosts"].begin()));
return (unsigned long) (*(dict["num_hosts"].begin()));
} else {
return 0;
}
Expand Down Expand Up @@ -238,7 +235,7 @@ namespace wrench {

if (dict.find("num_cores") != dict.end()) {
for (auto x : dict["num_cores"]) {
to_return.push_back((unsigned long)x);
to_return.push_back((unsigned long) x);
}
}

Expand Down Expand Up @@ -267,7 +264,7 @@ namespace wrench {

if (dict.find("num_idle_cores") != dict.end()) {
for (auto x : dict["num_idle_cores"]) {
to_return.push_back((unsigned long)x);
to_return.push_back((unsigned long) x);
}
}

Expand Down Expand Up @@ -349,25 +346,6 @@ namespace wrench {
return dict["ttl"][0];
}




// /**
// * @brief Set the default StorageService for the ComputeService
// * @param storage_service: a storage service
// */
// void ComputeService::setDefaultStorageService(StorageService *storage_service) {
// this->default_storage_service = storage_service;
// }
//
// /**
// * @brief Get the default StorageService for the compute service
// * @return a storage service
// */
// StorageService *ComputeService::getDefaultStorageService() {
// return this->default_storage_service;
// }

/**
* @brief Get information about the compute service as a dictionary of vectors
* @return service information
Expand Down Expand Up @@ -405,41 +383,32 @@ namespace wrench {

} else {
throw std::runtime_error(
"MultihostMulticoreComputeService::getServiceResourceInformation(): unexpected [" + msg->getName() + "] message");
"MultihostMulticoreComputeService::getServiceResourceInformation(): unexpected [" + msg->getName() +
"] message");
}
}


/**
* @brief Get the total size of the scratch space (not the remaining free space on the scratch space)
* @return return size (double)
*/
double ComputeService::getScratchSize() {
if (this->scratch_space_storage_service) {
return this->scratch_space_storage_service->getTotalSpace();
} else {
return 0.0;
}
return this->scratch_space_storage_service ? this->scratch_space_storage_service->getTotalSpace() : 0.0;
}

/**
* @brief Get the free space of the scratch service
* @return return size (double)
*/
double ComputeService::getFreeRemainingScratchSpace() {
if (this->scratch_space_storage_service) {
return this->scratch_space_storage_service->getFreeSpace();
} else {
return 0.0;
}
return this->scratch_space_storage_service ? this->scratch_space_storage_service->getFreeSpace() : 0.0;
}


/**
* @brief Get a shared pointer to the scratch space
* @return returns a pointer to the shared scratch space
*/
StorageService* ComputeService::getScratch() {
StorageService *ComputeService::getScratch() {
return this->scratch_space_storage_service;
}

Expand Down

0 comments on commit 6eaa891

Please sign in to comment.