Skip to content

Commit

Permalink
Merge branch 'FCFSTest'
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Apr 13, 2018
2 parents 797c8da + 40161a5 commit d9c9bc2
Show file tree
Hide file tree
Showing 18 changed files with 356 additions and 111 deletions.
9 changes: 9 additions & 0 deletions include/wrench/services/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ namespace wrench {

double getPropertyValueAsDouble(std::string);

double getNetworkTimeoutValue();

void setNetworkTimeoutValue(double value);

/***********************/
/** \endcond */
/***********************/
Expand Down Expand Up @@ -89,6 +93,11 @@ namespace wrench {
/** @brief The service's name */
std::string name;

/** @brief The time (in seconds) after which a service that doesn't send back a reply message cause
* a NetworkTimeOut exception. (default: 1 second; if <0 never timeout)
*/
double network_timeout = 1.0;

/***********************/
/** \endcond */
/***********************/
Expand Down
3 changes: 3 additions & 0 deletions include/wrench/services/compute/batch/BatchJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ namespace wrench {
void setAllocatedTime(double);
unsigned long getAllocatedCoresPerNode();
double getMemoryRequirement();
double getBeginTimeStamp();
void setBeginTimeStamp(double);
double getEndingTimeStamp();
double getAppearedTimeStamp();
unsigned long getNumNodes();
Expand All @@ -43,6 +45,7 @@ namespace wrench {
WorkflowJob* job;
unsigned long num_nodes;
unsigned long cores_per_node;
double begin_time_stamp;
double ending_time_stamp;
double appeared_time_stamp;
std::set<std::tuple<std::string,unsigned long, double>> resources_allocated;
Expand Down
7 changes: 7 additions & 0 deletions include/wrench/services/compute/batch/BatchService.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,15 @@ namespace wrench {
//returns jobid,started time, running time
// std::vector<std::tuple<unsigned long, double, double>> getJobsInQueue();

/***********************/
/** \cond DEVELOPER **/
/***********************/
std::map<std::string,double> getQueueWaitingTimeEstimate(std::set<std::tuple<std::string,unsigned int,double>>);

/***********************/
/** \endcond **/
/***********************/

~BatchService() override;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,22 @@ namespace wrench {
class NetworkConnectionManager {

public:
NetworkConnectionManager(unsigned long num_connections);
NetworkConnectionManager(unsigned long max_num_data_connections);

void addConnection(std::unique_ptr<NetworkConnection> connection);

std::pair<std::unique_ptr<NetworkConnection>, bool> waitForNetworkConnection();


private:
unsigned long num_connections;
unsigned long max_num_data_connections;

void startQueuedConnections();
void startQueuedDataConnections();

std::deque<std::unique_ptr<NetworkConnection>> queued_data_connections;
std::vector<std::unique_ptr<NetworkConnection>> running_data_connections;
std::unique_ptr<NetworkConnection> running_control_connection = nullptr;

std::deque<std::unique_ptr<NetworkConnection>> queued_connections;
std::vector<std::unique_ptr<NetworkConnection>> running_connections;

};

Expand Down
21 changes: 20 additions & 1 deletion src/wrench/services/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ namespace wrench {
std::unique_ptr<SimulationMessage> message = nullptr;

try {
message = S4U_Mailbox::getMessage(ack_mailbox);
message = S4U_Mailbox::getMessage(ack_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ServiceDaemonStoppedMessage *>(message.get())) {
Expand Down Expand Up @@ -192,4 +194,21 @@ namespace wrench {
throw WorkflowExecutionException(new ServiceIsDown(this));
}
}

/**
* @brief Returns the service's network timeout value
* @return a duration in seconds
*/
double Service::getNetworkTimeoutValue() {
return this->network_timeout;
}

/**
* @brief Sets the service's network timeout value
* @param value: a duration in seconds (<0 means: never timeout)
*
*/
void Service::setNetworkTimeoutValue(double value) {
this->network_timeout = value;
}
};
4 changes: 3 additions & 1 deletion src/wrench/services/compute/ComputeService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,11 @@ namespace wrench {
// Get the reply
std::unique_ptr<SimulationMessage> message = nullptr;
try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceResourceInformationAnswerMessage *>(message.get())) {
Expand Down
13 changes: 12 additions & 1 deletion src/wrench/services/compute/batch/BatchJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,24 @@ namespace wrench {
return this->num_nodes;
}


void BatchJob::setBeginTimeStamp(double time_stamp) {
this->begin_time_stamp = time_stamp;
}


double BatchJob::getBeginTimeStamp() {
return this->begin_time_stamp;
}


double BatchJob::getEndingTimeStamp() {
return this->ending_time_stamp;
}

void BatchJob::setEndingTimeStamp(double time_stamp) {
if (this->ending_time_stamp > 0) {
throw std::invalid_argument(
throw std::runtime_error(
"BatchJob::setEndingTimeStamp(): Cannot set time stamp again for the same job"
);
}
Expand Down
29 changes: 18 additions & 11 deletions src/wrench/services/compute/batch/BatchService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,19 +288,21 @@ namespace wrench {
std::string answer_mailbox = S4U_Mailbox::generateUniqueMailboxName("batch_standard_job_mailbox");
try {
S4U_Mailbox::putMessage(this->mailbox_name,
new BatchServiceJobRequestMessage(answer_mailbox, batch_job,
this->getPropertyValueAsDouble(
BatchServiceProperty::SUBMIT_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD)));
new BatchServiceJobRequestMessage(answer_mailbox, batch_job,
this->getPropertyValueAsDouble(
BatchServiceProperty::SUBMIT_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD)));
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
}

// Get the answer
std::unique_ptr<SimulationMessage> message = nullptr;
try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceSubmitStandardJobAnswerMessage *>(message.get())) {
Expand Down Expand Up @@ -399,9 +401,11 @@ namespace wrench {
// Get the answer
std::unique_ptr<SimulationMessage> message = nullptr;
try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceSubmitPilotJobAnswerMessage *>(message.get())) {
Expand Down Expand Up @@ -852,9 +856,11 @@ namespace wrench {
std::unique_ptr<SimulationMessage> message = nullptr;

try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceTerminatePilotJobAnswerMessage *>(message.get())) {
Expand Down Expand Up @@ -1452,9 +1458,10 @@ namespace wrench {
this->getPropertyValueAsString(
BatchServiceProperty::THREAD_STARTUP_OVERHEAD)}}));
executor->start(executor, true);

this->running_standard_job_executors.insert(executor);
batch_job->setBeginTimeStamp(S4U_Simulation::getClock());
batch_job->setEndingTimeStamp(S4U_Simulation::getClock() + allocated_time);
this->running_standard_job_executors.insert(executor);

// this->running_jobs.insert(std::move(batch_job_ptr));
this->timeslots.push_back(batch_job->getEndingTimeStamp());
//remember the allocated resources for the job
Expand Down Expand Up @@ -1487,9 +1494,6 @@ namespace wrench {

//set the ending timestamp of the batchjob (pilotjob)

double timeout_timestamp = std::min(job->getDuration(), allocated_time);
batch_job->setEndingTimeStamp(S4U_Simulation::getClock() + timeout_timestamp);

// Create and launch a compute service for the pilot job
std::shared_ptr<ComputeService> cs = std::shared_ptr<ComputeService>(
new MultihostMulticoreComputeService(host_to_run_on,
Expand All @@ -1502,6 +1506,9 @@ namespace wrench {

try {
cs->start(cs, true);
batch_job->setBeginTimeStamp(S4U_Simulation::getClock());
double timeout_timestamp = std::min(job->getDuration(), allocated_time);
batch_job->setEndingTimeStamp(S4U_Simulation::getClock() + timeout_timestamp);
} catch (std::runtime_error &e) {
throw;
}
Expand Down
16 changes: 12 additions & 4 deletions src/wrench/services/compute/cloud/CloudService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ namespace wrench {
std::unique_ptr<SimulationMessage> message = nullptr;

try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<CloudServiceGetExecutionHostsAnswerMessage *>(message.get())) {
Expand Down Expand Up @@ -139,9 +141,11 @@ namespace wrench {
std::unique_ptr<SimulationMessage> message = nullptr;

try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<CloudServiceCreateVMAnswerMessage *>(message.get())) {
Expand Down Expand Up @@ -180,9 +184,11 @@ namespace wrench {
// Get the answer
std::unique_ptr<SimulationMessage> message = nullptr;
try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceSubmitStandardJobAnswerMessage *>(message.get())) {
Expand Down Expand Up @@ -226,9 +232,11 @@ namespace wrench {
std::unique_ptr<SimulationMessage> message = nullptr;

try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceSubmitPilotJobAnswerMessage *>(message.get())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ namespace wrench {
// Get the answer
std::unique_ptr<SimulationMessage> message = nullptr;
try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceSubmitStandardJobAnswerMessage *>(message.get())) {
Expand Down Expand Up @@ -116,9 +118,11 @@ namespace wrench {
std::unique_ptr<SimulationMessage> message = nullptr;

try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceSubmitPilotJobAnswerMessage *>(message.get())) {
Expand Down Expand Up @@ -1197,9 +1201,11 @@ namespace wrench {
// Get the answer
std::unique_ptr<SimulationMessage> message = nullptr;
try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceTerminateStandardJobAnswerMessage *>(message.get())) {
Expand Down Expand Up @@ -1244,9 +1250,11 @@ namespace wrench {
std::unique_ptr<SimulationMessage> message = nullptr;

try {
message = S4U_Mailbox::getMessage(answer_mailbox);
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
} catch (std::shared_ptr<NetworkTimeout> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceTerminatePilotJobAnswerMessage *>(message.get())) {
Expand Down Expand Up @@ -1375,45 +1383,7 @@ namespace wrench {
}
}

// /**
// * @brief Process a get number of cores request
// *
// * @param answer_mailbox: the mailbox_name to which the answer message should be sent
// *
// * @throw std::runtime_error
// */
// void MultihostMulticoreComputeService::processGetNumCores(const std::string &answer_mailbox) {
// ComputeServiceNumCoresAnswerMessage *answer_message = new ComputeServiceNumCoresAnswerMessage(
// this->total_num_cores,
// this->getPropertyValueAsDouble(
// ComputeServiceProperty::NUM_CORES_ANSWER_MESSAGE_PAYLOAD));
// try {
// S4U_Mailbox::dputMessage(answer_mailbox, answer_message);
// } catch (std::shared_ptr<NetworkError> &cause) {
// return;
// }
// }

// /**
// * @brief Process a get number of idle cores request
// *
// * @param answer_mailbox: the mailbox_name to which the answer message should be sent
// */
// void MultihostMulticoreComputeService::processGetNumIdleCores(const std::string &answer_mailbox) {
// unsigned long num_available_cores = 0;
// for (auto r : this->core_and_ram_availabilities) {
// num_available_cores += r.second;
// }
// ComputeServiceNumIdleCoresAnswerMessage *answer_message = new ComputeServiceNumIdleCoresAnswerMessage(
// num_available_cores,
// this->getPropertyValueAsDouble(
// MultihostMulticoreComputeServiceProperty::NUM_IDLE_CORES_ANSWER_MESSAGE_PAYLOAD));
// try {
// S4U_Mailbox::dputMessage(answer_mailbox, answer_message);
// } catch (std::shared_ptr<NetworkError> &cause) {
// return;
// }
// }
// /*

/**
* @brief Process a submit standard job request
Expand Down

0 comments on commit d9c9bc2

Please sign in to comment.