From 4f44830ba7c59b53494bf3df7f5ae7efba7f118a Mon Sep 17 00:00:00 2001 From: Rafael Ferreira da Silva Date: Sat, 10 Nov 2018 08:20:29 -0800 Subject: [PATCH] fixes for properly running XML and JSON workflow trace files --- src/DAGMan.cpp | 3 +-- src/DAGManMonitor.cpp | 41 +++++++++++++++++------------------------ src/DAGManMonitor.h | 4 +++- src/DAGManScheduler.cpp | 1 - src/PegasusRun.cpp | 14 ++++++++++++++ 5 files changed, 35 insertions(+), 28 deletions(-) diff --git a/src/DAGMan.cpp b/src/DAGMan.cpp index a8f2b2c..c7438a9 100644 --- a/src/DAGMan.cpp +++ b/src/DAGMan.cpp @@ -36,8 +36,6 @@ namespace wrench { // DAGMan performs BFS search by default this->running_tasks_level = std::make_pair(0, 0); - - this->dagman_monitor = std::make_shared(this->hostname); } /** @@ -70,6 +68,7 @@ namespace wrench { WRENCH_INFO("DAGMan is about to execute a workflow with %lu tasks", this->getWorkflow()->getNumberOfTasks()); // starting monitor + this->dagman_monitor = std::make_shared(this->hostname, this->getWorkflow()); this->dagman_monitor->simulation = this->simulation; this->dagman_monitor->start(dagman_monitor, true); diff --git a/src/DAGManMonitor.cpp b/src/DAGManMonitor.cpp index 9a79d9d..fd9b2e2 100644 --- a/src/DAGManMonitor.cpp +++ b/src/DAGManMonitor.cpp @@ -19,7 +19,8 @@ namespace wrench { * * @param hostname: the name of the host on which the service will run */ - DAGManMonitor::DAGManMonitor(std::string &hostname) : Service(hostname, "dagman_monitor", "dagman_monitor") {} + DAGManMonitor::DAGManMonitor(std::string &hostname, Workflow *workflow) : + Service(hostname, "dagman_monitor", "dagman_monitor"), workflow(workflow) {} /** * @brief Destructor @@ -81,33 +82,25 @@ namespace wrench { * @throw std::runtime_error */ bool DAGManMonitor::processNextMessage() { - // Wait for a message - std::unique_ptr message; - + // Wait for a workflow execution event + std::unique_ptr event; try { - message = S4U_Mailbox::getMessage(this->mailbox_name); - } catch (std::shared_ptr &cause) { - return true; - } + event = this->workflow->waitForNextExecutionEvent(); - if (message == nullptr) { - WRENCH_INFO("Got a NULL message... Likely this means we're all done. Aborting"); - return false; + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); } - - WRENCH_INFO("Got a [%s] message", message->getName().c_str()); - - if (auto *msg = dynamic_cast(message.get())) { - this->completed_jobs.clear(); - return false; - - } else if (auto *msg = dynamic_cast(message.get())) { - processStandardJobCompletion(msg->job); - return true; - - } else { - throw std::runtime_error("Unexpected [" + message->getName() + "] message"); + switch (event->type) { + case wrench::WorkflowExecutionEvent::STANDARD_JOB_COMPLETION: { + StandardJob *job = (dynamic_cast(event.get()))->standard_job; + this->processStandardJobCompletion(job); + break; + } + default: { + throw std::runtime_error("Unexpected workflow execution event: " + std::to_string((int) (event->type))); + } } + return true; } /** diff --git a/src/DAGManMonitor.h b/src/DAGManMonitor.h index addf01d..41b08d6 100644 --- a/src/DAGManMonitor.h +++ b/src/DAGManMonitor.h @@ -20,7 +20,7 @@ namespace wrench { */ class DAGManMonitor : public Service { public: - DAGManMonitor(std::string &hostname); + DAGManMonitor(std::string &hostname, Workflow *workflow); ~DAGManMonitor() override; @@ -36,6 +36,8 @@ namespace wrench { void processStandardJobCompletion(StandardJob *job); std::set completed_jobs; + + Workflow *workflow; }; } } diff --git a/src/DAGManScheduler.cpp b/src/DAGManScheduler.cpp index aa02fb4..20f6a85 100644 --- a/src/DAGManScheduler.cpp +++ b/src/DAGManScheduler.cpp @@ -133,7 +133,6 @@ namespace wrench { } WRENCH_INFO("Scheduling task: %s", task->getID().c_str()); - job->pushCallbackMailbox(this->monitor_callback_mailbox); this->getJobManager()->submitJob(job, htcondor_service); // create job scheduled event this->simulation->getOutput().addTimestamp( diff --git a/src/PegasusRun.cpp b/src/PegasusRun.cpp index d6a492c..a22c594 100644 --- a/src/PegasusRun.cpp +++ b/src/PegasusRun.cpp @@ -86,12 +86,16 @@ int main(int argc, char **argv) { file_registry_service)); dagman->addWorkflow(&workflow); + // stage input data WRENCH_INFO("Staging workflow input files to external Storage Service..."); std::map input_files = workflow.getInputFiles(); std::map storage_services = config.getStorageServicesMap(); + int num_transfer_tasks = 0; + for (auto task : workflow.getTasks()) { if (task->getTaskType() == wrench::WorkflowTask::TaskType::TRANSFER) { + num_transfer_tasks++; for (auto file_transfer : task->getFileTransfers()) { if (not file_transfer.first->isOutput()) { if (file_transfer.second.first == "local") { @@ -104,6 +108,16 @@ int main(int argc, char **argv) { } } + if (num_transfer_tasks == 0) { + // handle the XML import case where there are no transfer tasks + for (auto file : input_files) { + for (auto storage_service : storage_services) { + simulation.stageFiles(input_files, storage_service.second); + } + simulation.stageFiles(input_files, htcondor_service->getLocalStorageService()); + } + } + // simulation execution WRENCH_INFO("Launching the Simulation..."); try {