Skip to content

Commit

Permalink
On the way to fewer memory leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Jul 11, 2017
1 parent d863670 commit e3d4191
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 36 deletions.
Expand Up @@ -573,7 +573,7 @@ namespace wrench {
(this->working_threads.size() < this->max_num_worker_threads)) {

// Get the first work out of the pending work queue
WorkUnit *work_to_do = this->ready_works.front();
std::shared_ptr<WorkUnit> work_to_do = this->ready_works.front();
this->ready_works.pop_front();
this->running_works.insert(work_to_do);

Expand Down Expand Up @@ -891,7 +891,7 @@ namespace wrench {
*/
void MulticoreComputeService::terminateRunningStandardJob(StandardJob *job) {

std::vector<WorkUnit *> works_to_terminate;
std::vector<std::shared_ptr<WorkUnit>> works_to_terminate;

// Find all works to terminate
for (auto w : this->running_works) {
Expand All @@ -907,6 +907,7 @@ namespace wrench {
for (auto wt : this->working_threads) {
if (wt->work == w) {
worker_thread_to_terminate = wt;
this->working_threads.erase(wt);
break;
}
}
Expand All @@ -924,6 +925,9 @@ namespace wrench {
tw->kill();
}

// Free al memory (not necessary since worker_threads_to_terminates will be out of scope?)
worker_threads_to_terminate.clear();

// Set all non-COMPLETED tasks back to the READY state and wipe out output files
// TODO: At some point we'll have to think hard about the task life cycle and make it better/coherent
for (auto failed_task: job->getTasks()) {
Expand Down Expand Up @@ -1027,7 +1031,7 @@ namespace wrench {
*
* @throw std::runtime_error
*/
void MulticoreComputeService::processWorkCompletion(WorkUnitExecutor *worker_thread, WorkUnit *work) {
void MulticoreComputeService::processWorkCompletion(WorkUnitExecutor *worker_thread, std::shared_ptr<WorkUnit> work) {

// Remove the work thread from the working list
for (auto wt : this->working_threads) {
Expand Down Expand Up @@ -1062,7 +1066,22 @@ namespace wrench {
// Send the callback to the originator if the job has completed (i.e., if this
// work unit has no children)
if (work->children.size() == 0) {

// Erase all completed works for the job (there has to be an easier way to do this)
std::vector<std::shared_ptr<WorkUnit>> to_erase;
for (auto wu : this->completed_works) {
if (wu->job == work->job) {
to_erase.push_back(wu);
}
}
for (auto wu : to_erase) {
this->completed_works.erase(wu);
}
to_erase.clear();

// Erase the job
this->running_jobs.erase(work->job);

try {
S4U_Mailbox::dputMessage(work->job->popCallbackMailbox(),
new ComputeServiceStandardJobDoneMessage(work->job, this,
Expand Down Expand Up @@ -1098,7 +1117,7 @@ namespace wrench {
* @param cause: the cause of the failure
*/
void MulticoreComputeService::processWorkFailure(WorkUnitExecutor *worker_thread,
WorkUnit *work,
std::shared_ptr<WorkUnit> work,
std::shared_ptr<WorkflowExecutionFailureCause> cause) {

StandardJob *job = work->job;
Expand All @@ -1121,7 +1140,7 @@ namespace wrench {
this->running_works.erase(work);

// Remove all other works for the job in the "not ready" state
std::vector<WorkUnit *> to_erase;
std::vector<std::shared_ptr<WorkUnit>> to_erase;

for (auto w : this->non_ready_works) {
if (w->job == job) {
Expand All @@ -1134,7 +1153,7 @@ namespace wrench {
}

// Remove all other works for the job in the "ready" state
for (std::deque<WorkUnit *>::iterator it = this->ready_works.begin(); it != this->ready_works.end(); it++) {
for (auto it = this->ready_works.begin(); it != this->ready_works.end(); it++) {
if ((*it)->job == job) {
this->ready_works.erase(it);
}
Expand Down Expand Up @@ -1246,29 +1265,29 @@ namespace wrench {
*/
void MulticoreComputeService::createWorkForNewlyDispatchedJob(StandardJob *job) {

WorkUnit *pre_file_copies_work_unit = nullptr;
std::vector<WorkUnit *> task_work_units;
WorkUnit *post_file_copies_work_unit = nullptr;
WorkUnit *cleanup_workunit = nullptr;
std::shared_ptr<WorkUnit> pre_file_copies_work_unit = nullptr;
std::vector<std::shared_ptr<WorkUnit>> task_work_units;
std::shared_ptr<WorkUnit> post_file_copies_work_unit = nullptr;
std::shared_ptr<WorkUnit> cleanup_workunit = nullptr;

// Create the clean work unit, if any
if (job->cleanup_file_deletions.size() > 0) {
cleanup_workunit = new WorkUnit(job, {}, {}, {}, {}, job->cleanup_file_deletions);
cleanup_workunit = std::shared_ptr<WorkUnit>(new WorkUnit(job, {}, {}, {}, {}, job->cleanup_file_deletions));
}

// Create the pre_file_copies work unit, if any
if (job->pre_file_copies.size() > 0) {
pre_file_copies_work_unit = new WorkUnit(job, job->pre_file_copies, {}, {}, {}, {});
pre_file_copies_work_unit = std::shared_ptr<WorkUnit>(new WorkUnit(job, job->pre_file_copies, {}, {}, {}, {}));
}

// Create the post_file_copies work unit, if any
if (job->post_file_copies.size() > 0) {
post_file_copies_work_unit = new WorkUnit(job, {}, {}, {}, job->post_file_copies, {});
post_file_copies_work_unit = std::shared_ptr<WorkUnit>(new WorkUnit(job, {}, {}, {}, job->post_file_copies, {}));
}

// Create the task work units, if any
for (auto task : job->tasks) {
task_work_units.push_back(new WorkUnit(job, {}, {task}, job->file_locations, {}, {}));
task_work_units.push_back(std::shared_ptr<WorkUnit>(new WorkUnit(job, {}, {task}, job->file_locations, {}, {})));
}

// Add dependencies from pre copies to possible successors
Expand Down Expand Up @@ -1300,15 +1319,16 @@ namespace wrench {
}
}

// Insert work units in the ready or non-ready queues
std::vector<WorkUnit *> all_work_units;
// Create a list of all work units
std::vector<std::shared_ptr<WorkUnit>> all_work_units;
if (pre_file_copies_work_unit) all_work_units.push_back(pre_file_copies_work_unit);
for (auto twu : task_work_units) {
all_work_units.push_back(twu);
}
if (post_file_copies_work_unit) all_work_units.push_back(post_file_copies_work_unit);
if (cleanup_workunit) all_work_units.push_back(cleanup_workunit);

// Insert work units in the ready or non-ready queues
for (auto wu : all_work_units) {
if (wu->num_pending_parents == 0) {
this->ready_works.push_back(wu);
Expand Down
Expand Up @@ -19,6 +19,7 @@

#include "MulticoreComputeServiceProperty.h"
#include "WorkUnitExecutor.h"
#include "WorkUnit.h"


namespace wrench {
Expand Down Expand Up @@ -142,10 +143,11 @@ namespace wrench {
// Set of currently running (standard or pilot) jobs
std::set<WorkflowJob *> running_jobs;

std::set<WorkUnit *> non_ready_works;
std::deque<WorkUnit *> ready_works;
std::set<WorkUnit *> running_works;
std::set<WorkUnit *> completed_works;
// Work units
std::set<std::shared_ptr<WorkUnit>> non_ready_works;
std::deque<std::shared_ptr<WorkUnit>> ready_works;
std::set<std::shared_ptr<WorkUnit>> running_works;
std::set<std::shared_ptr<WorkUnit>> completed_works;

int main();

Expand All @@ -158,9 +160,9 @@ namespace wrench {

void failCurrentStandardJobs(std::shared_ptr<WorkflowExecutionFailureCause> cause);

void processWorkCompletion(WorkUnitExecutor *worker_thread, WorkUnit *work);
void processWorkCompletion(WorkUnitExecutor *worker_thread, std::shared_ptr<WorkUnit> work);

void processWorkFailure(WorkUnitExecutor *worker_thread, WorkUnit *work,
void processWorkFailure(WorkUnitExecutor *worker_thread, std::shared_ptr<WorkUnit> work,
std::shared_ptr<WorkflowExecutionFailureCause> cause);

void processPilotJobCompletion(PilotJob *job);
Expand Down
Expand Up @@ -177,7 +177,7 @@ namespace wrench {
*/
WorkerThreadWorkDoneMessage::WorkerThreadWorkDoneMessage(
WorkUnitExecutor *worker_thread,
WorkUnit *work,
std::shared_ptr<WorkUnit> work,
double payload) :
MulticoreComputeServiceMessage("WORKER_THREAD_WORK_DONE", payload) {
this->worker_thread = worker_thread;
Expand All @@ -195,7 +195,7 @@ namespace wrench {
*/
WorkerThreadWorkFailedMessage::WorkerThreadWorkFailedMessage(
WorkUnitExecutor *worker_thread,
WorkUnit *work,
std::shared_ptr<WorkUnit> work,
std::shared_ptr<WorkflowExecutionFailureCause> cause,
double payload):
MulticoreComputeServiceMessage("WORKER_THREAD_WORK_FAILED", payload) {
Expand Down
Expand Up @@ -155,13 +155,13 @@ namespace wrench {
public:
WorkerThreadWorkDoneMessage(
WorkUnitExecutor *worker_thread,
WorkUnit *work_unit,
std::shared_ptr<WorkUnit> work_unit,
double payload);

/** @brief The worker thread that performed the work */
WorkUnitExecutor *worker_thread;
/** @brief The work that was performed */
WorkUnit *work;
std::shared_ptr<WorkUnit> work;

};

Expand All @@ -172,14 +172,14 @@ namespace wrench {
public:
WorkerThreadWorkFailedMessage(
WorkUnitExecutor *worker_thread,
WorkUnit *work,
std::shared_ptr<WorkUnit> work,
std::shared_ptr<WorkflowExecutionFailureCause> cause,
double payload);

/** @brief The worker thread that failed to perform the work */
WorkUnitExecutor *worker_thread;
/** @brief The work that failed */
WorkUnit *work;
std::shared_ptr<WorkUnit> work;
/** @brief The cause of the failure */
std::shared_ptr<WorkflowExecutionFailureCause> cause;
};
Expand Down
Expand Up @@ -49,7 +49,7 @@ namespace wrench {
*
* @throw std::invalid_argument
*/
void WorkUnit::addDependency(WorkUnit *parent, WorkUnit *child) {
void WorkUnit::addDependency(std::shared_ptr<WorkUnit> parent, std::shared_ptr<WorkUnit> child) {
if ((parent == nullptr) || (child == nullptr)) {
throw std::invalid_argument("WorkUnit::addDependency(): Invalid arguments");
}
Expand Down
Expand Up @@ -15,6 +15,7 @@
#include <set>
#include <map>
#include <vector>
#include <memory>

namespace wrench {

Expand All @@ -41,12 +42,12 @@ namespace wrench {
std::set<std::tuple<WorkflowFile *, StorageService *, StorageService *>> post_file_copies,
std::set<std::tuple<WorkflowFile *, StorageService *>> cleanup_file_deletions);

static void addDependency(WorkUnit *parent, WorkUnit *child);
static void addDependency(std::shared_ptr<WorkUnit> parent, std::shared_ptr<WorkUnit> child);

/** @brief The job that this WorkUnit belongs to */
StandardJob *job;
/** @brief The WorkUnits that depend on this WorkUnit */
std::set<WorkUnit *> children;
std::set<std::shared_ptr<WorkUnit>> children;
/** @brief The number of WorkUnits this WorkUnit depends on */
unsigned long num_pending_parents;

Expand Down
Expand Up @@ -41,7 +41,7 @@ namespace wrench {
WorkUnitExecutor::WorkUnitExecutor(Simulation *simulation,
std::string hostname,
std::string callback_mailbox,
WorkUnit *work,
std::shared_ptr<WorkUnit> work,
StorageService *default_storage_service,
double startup_overhead) :
S4U_DaemonWithMailbox("worker_thread", "worker_thread") {
Expand Down Expand Up @@ -136,7 +136,7 @@ namespace wrench {
* @param work: the work to perform
*/
void
WorkUnitExecutor::performWork(WorkUnit *work) {
WorkUnitExecutor::performWork(std::shared_ptr<WorkUnit> work) {

// Simulate the startup overhead
S4U_Simulation::sleep(this->start_up_overhead);
Expand Down
Expand Up @@ -38,22 +38,22 @@ namespace wrench {

WorkUnitExecutor(Simulation *simulation,
std::string hostname, std::string callback_mailbox,
WorkUnit *work,
std::shared_ptr<WorkUnit> work,
StorageService *default_storage_service,
double startup_overhead = 0.0);


void kill();

/** @brief The WorkUnit this WorkUnitExecutor is supposed to perform */
WorkUnit *work;
std::shared_ptr<WorkUnit> work;

private:
int main();

Simulation *simulation;

void performWork(WorkUnit *work);
void performWork(std::shared_ptr<WorkUnit> work);
std::string callback_mailbox;
std::string hostname;
double start_up_overhead;
Expand Down

0 comments on commit e3d4191

Please sign in to comment.