Skip to content

Commit

Permalink
Code duplication--
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Mar 10, 2019
1 parent 5658b8c commit f8e3693
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 75 deletions.
3 changes: 3 additions & 0 deletions include/wrench/services/compute/batch/BatchService.h
Expand Up @@ -141,6 +141,9 @@ namespace wrench {
//submits a standard job
void submitPilotJob(PilotJob *job, std::map<std::string, std::string> &batch_job_args) override;

// helper function
void terminateWorkflowJob(WorkflowJob *job);

// terminate a standard job
void terminateStandardJob(StandardJob *job) override;

Expand Down
158 changes: 83 additions & 75 deletions src/wrench/services/compute/batch/BatchService.cpp
Expand Up @@ -349,7 +349,7 @@ namespace wrench {
}

throw std::runtime_error(
"BatchService::submitStandardJob(): Received an unexpected [" + message->getName() +
"BatchService::submitWorkflowJob(): Received an unexpected [" + message->getName() +
"] message!");
}

Expand Down Expand Up @@ -398,30 +398,37 @@ namespace wrench {
}
}

/**
* @brief Terminate a standard job submitted to the compute service. Will throw a
* std::runtime_error exception if the job cannot be terminated, including
* if the cause is that the job is neither pending not running (perhaps alread
* terminated)
*
* @param job: the job
*
* @throw std::runtime_error
*/
void BatchService::terminateStandardJob(StandardJob *job) {

/**
* @brief Helper function called by terminateStandardJob() and terminateWorkJob() to process a job submission
* @param job
*/
void BatchService::terminateWorkflowJob(WorkflowJob *job) {

if (this->state == Service::DOWN) {
throw WorkflowExecutionException(std::shared_ptr<FailureCause>(new ServiceIsDown(this)));
}

std::string answer_mailbox = S4U_Mailbox::generateUniqueMailboxName("terminate_standard_job");

// Send a "terminate a pilot job" message to the daemon's mailbox_name
// Send a "terminate a job" message to the daemon's mailbox_name
try {
S4U_Mailbox::putMessage(this->mailbox_name,
new ComputeServiceTerminateStandardJobRequestMessage(answer_mailbox, job,
this->getMessagePayloadValueAsDouble(
BatchServiceMessagePayload::TERMINATE_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD)));
switch (job->getType()) {
case WorkflowJob::Type::STANDARD: {
S4U_Mailbox::putMessage(this->mailbox_name,
new ComputeServiceTerminateStandardJobRequestMessage(answer_mailbox, (StandardJob *)job,
this->getMessagePayloadValueAsDouble(
BatchServiceMessagePayload::TERMINATE_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD)));
break;
}
case WorkflowJob::Type::PILOT: {
S4U_Mailbox::putMessage(this->mailbox_name,
new ComputeServiceTerminatePilotJobRequestMessage(answer_mailbox, (PilotJob *)job,
this->getMessagePayloadValueAsDouble(
BatchServiceMessagePayload::TERMINATE_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD)));
break;
}
}
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
}
Expand All @@ -435,22 +442,72 @@ namespace wrench {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceTerminateStandardJobAnswerMessage *>(message.get())) {
// If no success, throw an exception
if (not msg->success) {
throw WorkflowExecutionException(msg->failure_cause);
switch (job->getType()) {
case WorkflowJob::Type::STANDARD: {
if (auto msg = dynamic_cast<ComputeServiceTerminateStandardJobAnswerMessage *>(message.get())) {
// If no success, throw an exception
if (not msg->success) {
throw WorkflowExecutionException(msg->failure_cause);
}
return;
}
}
case WorkflowJob::Type::PILOT: {
if (auto msg = dynamic_cast<ComputeServiceTerminatePilotJobAnswerMessage *>(message.get())) {
// If no success, throw an exception
if (not msg->success) {
throw WorkflowExecutionException(msg->failure_cause);
}
return;
}
}

} else {
throw std::runtime_error(
"BatchService::terminateStandardJob(): Received an unexpected [" +
message->getName() +
"] message!");
}

throw std::runtime_error("BatchService::terminateWorkflowJob(): Received an unexpected [" +
message->getName() +
"] message!");
}


/**
* @brief Terminate a standard job submitted to the compute service. Will throw a
* std::runtime_error exception if the job cannot be terminated, including
* if the cause is that the job is neither pending not running (perhaps alread
* terminated)
*
* @param job: the job
*
* @throw std::runtime_error
*/
void BatchService::terminateStandardJob(StandardJob *job) {

try {
this->terminateWorkflowJob(job);
} catch (std::exception &e) {
throw;
}
}


/**
* @brief Synchronously terminate a pilot job to the compute service
*
* @param job: a pilot job
*
* @throw WorkflowExecutionException
* @throw std::runtime_error
*/
void BatchService::terminatePilotJob(PilotJob *job) {

try {
this->terminateWorkflowJob(job);
} catch (std::exception &e) {
throw;
}
}


/**
* @brief Main method of the daemon
*
Expand Down Expand Up @@ -948,56 +1005,7 @@ namespace wrench {
this->releaseDaemonLock();
}

/**
* @brief Synchronously terminate a pilot job to the compute service
*
* @param job: a pilot job
*
* @throw WorkflowExecutionException
* @throw std::runtime_error
*/
void BatchService::terminatePilotJob(PilotJob *job) {

if (this->state == Service::DOWN) {
throw WorkflowExecutionException(std::shared_ptr<FailureCause>(new ServiceIsDown(this)));
}

std::string answer_mailbox = S4U_Mailbox::generateUniqueMailboxName("terminate_pilot_job");

// Send a "terminate a pilot job" message to the daemon's mailbox_name
try {
S4U_Mailbox::putMessage(this->mailbox_name,
new ComputeServiceTerminatePilotJobRequestMessage(
answer_mailbox, job,
this->getMessagePayloadValueAsDouble(
BatchServiceMessagePayload::TERMINATE_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD)));
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
}

// Get the answer
std::unique_ptr<SimulationMessage> message = nullptr;

try {
message = S4U_Mailbox::getMessage(answer_mailbox, this->network_timeout);
} catch (std::shared_ptr<NetworkError> &cause) {
throw WorkflowExecutionException(cause);
}

if (auto msg = dynamic_cast<ComputeServiceTerminatePilotJobAnswerMessage *>(message.get())) {
// If no success, throw an exception
if (not msg->success) {
throw WorkflowExecutionException(msg->failure_cause);
}

} else {
throw std::runtime_error(
"BatchService::terminatePilotJob(): Received an unexpected [" +
message->getName() +
"] message!");
}

}

/**
* @brief Terminate the daemon, dealing with pending/running jobs
Expand Down

0 comments on commit f8e3693

Please sign in to comment.