Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/wrench-project/wrench
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Oct 3, 2017
2 parents 2d78f1e + d05709c commit 1f944ce
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 145 deletions.
2 changes: 1 addition & 1 deletion examples/simple-wms/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ int main(int argc, char **argv) {

std::string executor_host = hostname_list[(hostname_list.size() > 1) ? 1 : 0];

// wrench::ComputeService *cloud_service = new wrench::CloudService(wms_host, true, true,
// wrench::ComputeService *cloud_service = new wrench::CloudService(wms_host, true, true, storage_service,
// {{wrench::CloudServiceProperty::STOP_DAEMON_MESSAGE_PAYLOAD, "666"}});
std::vector<std::string> execution_hosts = {executor_host};

Expand Down
1 change: 1 addition & 0 deletions examples/simple-wms/scheduler/CloudScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ namespace wrench {
std::string vm_host = "vm" + std::to_string(VM_ID++) + "_" + execution_hosts[0];

cs->createVM(execution_hosts[0], vm_host, job->getNumCores());

} catch (WorkflowExecutionException &e) {
//TODO launch error
}
Expand Down
9 changes: 1 addition & 8 deletions include/wrench.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,11 @@

// Scheduler
#include "wrench/wms/scheduler/Scheduler.h"

// Pilot Job Scheduler
#include "wrench/wms/scheduler/PilotJobScheduler.h"
//#include "wms/scheduler/pilot_job/CriticalPathScheduler.h"

// Static Optimizations
// Scheduling Optimizations
#include "wrench/wms/StaticOptimization.h"
//#include "wms/optimizations/static/SimplePipelineClustering.h"

// Dynamic Optimizations
#include "wrench/wms/DynamicOptimization.h"
//#include "wms/optimizations/dynamic/FailureDynamicClustering.h"

// Simulation Output Analysis
#include "wrench/simulation/SimulationTimestamp.h"
Expand Down
4 changes: 2 additions & 2 deletions include/wrench/services/cloud/CloudService.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace wrench {
private:
std::map<std::string, std::string> default_property_values =
{{CloudServiceProperty::STOP_DAEMON_MESSAGE_PAYLOAD, "1024"},
{CloudServiceProperty::DAEMON_STOPPED_MESSAGE_PAYLOAD, "1024"},
{CloudServiceProperty::NUM_IDLE_CORES_REQUEST_MESSAGE_PAYLOAD, "1024"},
{CloudServiceProperty::NUM_IDLE_CORES_ANSWER_MESSAGE_PAYLOAD, "1024"},
{CloudServiceProperty::NUM_CORES_REQUEST_MESSAGE_PAYLOAD, "1024"},
Expand All @@ -46,6 +47,7 @@ namespace wrench {
CloudService(std::string &hostname,
bool supports_standard_jobs,
bool supports_pilot_jobs,
StorageService *default_storage_service,
std::map<std::string, std::string> plist);

/***********************/
Expand All @@ -57,8 +59,6 @@ namespace wrench {
int num_cores,
std::map<std::string, std::string> plist = {});

void turnAllVMsOff(std::string pm_hostname);

/***********************/
/** \endcond **/
/***********************/
Expand Down
64 changes: 22 additions & 42 deletions src/wrench/services/cloud/CloudService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@
*/

#include "wrench/services/cloud/CloudService.h"
#include "wrench/services/cloud/CloudServiceProperty.h"
#include "wrench/services/compute/MulticoreComputeService.h"
#include "wrench/services/storage/SimpleStorageService.h"
#include "wrench/simgrid_S4U_util/S4U_Simulation.h"
#include "wrench/exceptions/WorkflowExecutionException.h"
#include "wrench/logging/TerminalOutput.h"
#include "wrench/workflow/job/StandardJob.h"
#include "services/ServiceMessage.h"
#include "services/cloud/CloudServiceMessage.h"
#include "simgrid_S4U_util/S4U_Mailbox.h"
Expand All @@ -39,6 +36,7 @@ namespace wrench {
CloudService::CloudService(std::string &hostname,
bool supports_standard_jobs,
bool supports_pilot_jobs,
StorageService *default_storage_service,
std::map<std::string, std::string> plist = {}) :
ComputeService("cloud_service", "cloud_service", supports_standard_jobs, supports_pilot_jobs,
default_storage_service) {
Expand Down Expand Up @@ -107,15 +105,6 @@ namespace wrench {
}
}

/**
*
* @param pm_hostname
*/
void CloudService::turnAllVMsOff(std::string pm_hostname) {


}

/**
* @brief Main method of the daemon
*
Expand All @@ -129,6 +118,7 @@ namespace wrench {

/** Main loop **/
while (this->processNextMessage()) {
// no specific action
}

WRENCH_INFO("Cloud Service on host %s terminated!", S4U_Simulation::getHostName().c_str());
Expand Down Expand Up @@ -189,6 +179,18 @@ namespace wrench {
processSubmitStandardJob(msg->answer_mailbox, msg->job);
return true;

} else if (auto *msg = dynamic_cast<ServiceStopDaemonMessage *>(message.get())) {
this->terminate();
// This is Synchronous
try {
S4U_Mailbox::putMessage(msg->ack_mailbox,
new ServiceDaemonStoppedMessage(this->getPropertyValueAsDouble(
CloudServiceProperty::DAEMON_STOPPED_MESSAGE_PAYLOAD)));
} catch (std::shared_ptr<NetworkError> &cause) {
return false;
}
return false;

} else {
throw std::runtime_error("Unexpected [" + message->getName() + "] message");
}
Expand All @@ -197,7 +199,9 @@ namespace wrench {
/**
* @brief Create a multicore executor VM in a physical machine
*
* @param answer_mailbox: the mailbox to which the answer message should be sent
* @param pm_hostname: the name of the physical machine host
* @param vm_hostname: the name of the VM host
* @param num_cores: the number of cores the service can use (0 means "use as many as there are cores on the host")
* @param supports_standard_jobs: true if the compute service should support standard jobs
* @param supports_pilot_jobs: true if the compute service should support pilot jobs
Expand All @@ -211,7 +215,7 @@ namespace wrench {
int num_cores,
bool supports_standard_jobs,
bool supports_pilot_jobs,
std::map<std::string, std::string> plist) {
std::map<std::string, std::string> plist = {}) {

WRENCH_INFO("Asked to create a VM on %s with %d cores", pm_hostname.c_str(), num_cores);

Expand All @@ -224,15 +228,10 @@ namespace wrench {
this->vm_list[vm_hostname] = new simgrid::s4u::VirtualMachine(vm_hostname.c_str(),
simgrid::s4u::Host::by_name(
pm_hostname), num_cores);

// create a storage service to the VM
StorageService *storage_service = this->simulation->add(std::unique_ptr<SimpleStorageService>(
new SimpleStorageService(vm_hostname, 10000000000000.0)));

// create a multicore executor for the VM
this->cs_list[vm_hostname] = std::unique_ptr<ComputeService>(
new MulticoreComputeService(vm_hostname, supports_standard_jobs, supports_pilot_jobs,
storage_service, plist));
default_storage_service, plist));
this->cs_list[vm_hostname]->setSimulation(this->simulation);

S4U_Mailbox::dputMessage(
Expand Down Expand Up @@ -324,33 +323,16 @@ namespace wrench {
return;
}

FileRegistryService *file_registry_service = this->simulation->getFileRegistryService();

for (auto &cs : cs_list) {
if (cs.second->getNumIdleCores() >= job->getNumCores()) {
// stage in files to VM storage
for (auto task : job->getTasks()) {
for (auto file : task->getInputFiles()) {
std::set<StorageService *> storage_list = file_registry_service->lookupEntry(file);
std::cout << "--------- LOOKING FOR FILE: " << file->getId() << std::endl;
std::cout << "----------- STORAGE LIST SIZE: " << storage_list.size() << std::endl;
if (storage_list.find(cs.second->getDefaultStorageService()) == storage_list.end()) {
std::cout << "------------ CREATING REQUEST FROM: " << (*storage_list.begin())->getName() << std::endl;
job->pre_file_copies.insert(std::tuple<WorkflowFile *, StorageService *, StorageService *>(
file, (*storage_list.begin()), cs.second->getDefaultStorageService()));
}
}
}

cs.second->getDefaultStorageService();

cs.second->submitStandardJob(job);
try {
S4U_Mailbox::dputMessage(
answer_mailbox,
new ComputeServiceSubmitStandardJobAnswerMessage(
job, this, true, nullptr, this->getPropertyValueAsDouble(
ComputeServiceProperty::SUBMIT_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD)));
return;
} catch (std::shared_ptr<NetworkError> &cause) {
return;
}
Expand All @@ -376,11 +358,9 @@ namespace wrench {
void CloudService::terminate() {
this->setStateToDown();

//TODO: call terminate for multicore executors

// destroy VMs
for (auto &vm : this->vm_list) {
vm.second->destroy();
WRENCH_INFO("Stopping VMs Compute Service");
for (auto &cs : this->cs_list) {
cs.second->stop();
}
}
}
Loading

0 comments on commit 1f944ce

Please sign in to comment.