Skip to content

Commit

Permalink
fixes for properly running XML and JSON workflow trace files
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelfsilva committed Nov 10, 2018
1 parent 810fb5c commit 4f44830
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 28 deletions.
3 changes: 1 addition & 2 deletions src/DAGMan.cpp
Expand Up @@ -36,8 +36,6 @@ namespace wrench {

// DAGMan performs BFS search by default
this->running_tasks_level = std::make_pair(0, 0);

this->dagman_monitor = std::make_shared<DAGManMonitor>(this->hostname);
}

/**
Expand Down Expand Up @@ -70,6 +68,7 @@ namespace wrench {
WRENCH_INFO("DAGMan is about to execute a workflow with %lu tasks", this->getWorkflow()->getNumberOfTasks());

// starting monitor
this->dagman_monitor = std::make_shared<DAGManMonitor>(this->hostname, this->getWorkflow());
this->dagman_monitor->simulation = this->simulation;
this->dagman_monitor->start(dagman_monitor, true);

Expand Down
41 changes: 17 additions & 24 deletions src/DAGManMonitor.cpp
Expand Up @@ -19,7 +19,8 @@ namespace wrench {
*
* @param hostname: the name of the host on which the service will run
*/
DAGManMonitor::DAGManMonitor(std::string &hostname) : Service(hostname, "dagman_monitor", "dagman_monitor") {}
DAGManMonitor::DAGManMonitor(std::string &hostname, Workflow *workflow) :
Service(hostname, "dagman_monitor", "dagman_monitor"), workflow(workflow) {}

/**
* @brief Destructor
Expand Down Expand Up @@ -81,33 +82,25 @@ namespace wrench {
* @throw std::runtime_error
*/
bool DAGManMonitor::processNextMessage() {
// Wait for a message
std::unique_ptr<SimulationMessage> message;

// Wait for a workflow execution event
std::unique_ptr<wrench::WorkflowExecutionEvent> event;
try {
message = S4U_Mailbox::getMessage(this->mailbox_name);
} catch (std::shared_ptr<NetworkError> &cause) {
return true;
}
event = this->workflow->waitForNextExecutionEvent();

if (message == nullptr) {
WRENCH_INFO("Got a NULL message... Likely this means we're all done. Aborting");
return false;
} catch (wrench::WorkflowExecutionException &e) {
throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString());
}

WRENCH_INFO("Got a [%s] message", message->getName().c_str());

if (auto *msg = dynamic_cast<ServiceStopDaemonMessage *>(message.get())) {
this->completed_jobs.clear();
return false;

} else if (auto *msg = dynamic_cast<ComputeServiceStandardJobDoneMessage *>(message.get())) {
processStandardJobCompletion(msg->job);
return true;

} else {
throw std::runtime_error("Unexpected [" + message->getName() + "] message");
switch (event->type) {
case wrench::WorkflowExecutionEvent::STANDARD_JOB_COMPLETION: {
StandardJob *job = (dynamic_cast<StandardJobCompletedEvent *>(event.get()))->standard_job;
this->processStandardJobCompletion(job);
break;
}
default: {
throw std::runtime_error("Unexpected workflow execution event: " + std::to_string((int) (event->type)));
}
}
return true;
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/DAGManMonitor.h
Expand Up @@ -20,7 +20,7 @@ namespace wrench {
*/
class DAGManMonitor : public Service {
public:
DAGManMonitor(std::string &hostname);
DAGManMonitor(std::string &hostname, Workflow *workflow);

~DAGManMonitor() override;

Expand All @@ -36,6 +36,8 @@ namespace wrench {
void processStandardJobCompletion(StandardJob *job);

std::set<WorkflowJob *> completed_jobs;

Workflow *workflow;
};
}
}
Expand Down
1 change: 0 additions & 1 deletion src/DAGManScheduler.cpp
Expand Up @@ -133,7 +133,6 @@ namespace wrench {

}
WRENCH_INFO("Scheduling task: %s", task->getID().c_str());
job->pushCallbackMailbox(this->monitor_callback_mailbox);
this->getJobManager()->submitJob(job, htcondor_service);
// create job scheduled event
this->simulation->getOutput().addTimestamp<SimulationTimestampJobScheduled>(
Expand Down
14 changes: 14 additions & 0 deletions src/PegasusRun.cpp
Expand Up @@ -86,12 +86,16 @@ int main(int argc, char **argv) {
file_registry_service));
dagman->addWorkflow(&workflow);

// stage input data
WRENCH_INFO("Staging workflow input files to external Storage Service...");
std::map<std::string, wrench::WorkflowFile *> input_files = workflow.getInputFiles();
std::map<std::string, wrench::StorageService *> storage_services = config.getStorageServicesMap();

int num_transfer_tasks = 0;

for (auto task : workflow.getTasks()) {
if (task->getTaskType() == wrench::WorkflowTask::TaskType::TRANSFER) {
num_transfer_tasks++;
for (auto file_transfer : task->getFileTransfers()) {
if (not file_transfer.first->isOutput()) {
if (file_transfer.second.first == "local") {
Expand All @@ -104,6 +108,16 @@ int main(int argc, char **argv) {
}
}

if (num_transfer_tasks == 0) {
// handle the XML import case where there are no transfer tasks
for (auto file : input_files) {
for (auto storage_service : storage_services) {
simulation.stageFiles(input_files, storage_service.second);
}
simulation.stageFiles(input_files, htcondor_service->getLocalStorageService());
}
}

// simulation execution
WRENCH_INFO("Launching the Simulation...");
try {
Expand Down

0 comments on commit 4f44830

Please sign in to comment.