Skip to content

Commit

Permalink
Merge pull request #48 from wrench-project/directories_in_scratch_space
Browse files Browse the repository at this point in the history
Directories in scratch space
  • Loading branch information
henricasanova committed Jun 14, 2018
2 parents f8499d1 + 05b0646 commit 939d674
Show file tree
Hide file tree
Showing 23 changed files with 793 additions and 176 deletions.
Expand Up @@ -56,6 +56,7 @@ namespace wrench {
std::set<std::tuple<std::string, unsigned long, double>> compute_resources,
StorageService* scratch_space,
bool part_of_pilot_job,
PilotJob* parent_pilot_job,
std::map<std::string, std::string> property_list,
std::map<std::string, std::string> messagepayload_list
);
Expand All @@ -82,6 +83,9 @@ namespace wrench {

bool part_of_pilot_job;

// The parent pilot job, or nullptr if this is not part of a pilot job
PilotJob* parent_pilot_job;

// Files stored in scratch
std::set<WorkflowFile*> files_stored_in_scratch;

Expand Down
Expand Up @@ -26,6 +26,7 @@ namespace wrench {
class WorkerThreadWork;
class Workunit;
class ComputeThread;
class WorkflowJob;

/***********************/
/** \cond INTERNAL */
Expand All @@ -46,6 +47,7 @@ namespace wrench {
std::string callback_mailbox,
Workunit *workunit,
StorageService *scratch_space,
WorkflowJob* job,
double thread_startup_overhead = 0.0);

void kill();
Expand Down Expand Up @@ -75,6 +77,9 @@ namespace wrench {
std::set<WorkflowFile* > files_stored_in_scratch;
std::vector<std::shared_ptr<ComputeThread>> compute_threads;

// A pointer to the job this executor is a part of (currently required for creating the /tmp directory in scratch space)
WorkflowJob* job;

};

/***********************/
Expand Down
39 changes: 32 additions & 7 deletions include/wrench/services/storage/StorageService.h
Expand Up @@ -14,6 +14,7 @@

#include <string>
#include <set>
#include <wrench/workflow/job/StandardJob.h>

#include "wrench/services/Service.h"
#include "wrench/workflow/execution_events/FailureCause.h"
Expand Down Expand Up @@ -47,33 +48,56 @@ namespace wrench {

virtual bool lookupFile(WorkflowFile *file);

virtual bool lookupFile(WorkflowFile *file, std::string dst_dir);

virtual void deleteFile(WorkflowFile *file, FileRegistryService *file_registry_service=nullptr);

virtual void deleteFile(WorkflowFile *file, std::string dst_dir, FileRegistryService *file_registry_service=nullptr);

virtual void copyFile(WorkflowFile *file, StorageService *src, std::string src_dir, std::string dst_dir);

virtual void readFile(WorkflowFile *file, std::string src_dir);

virtual void writeFile(WorkflowFile *file, std::string dst_dir);


/***********************/
/** \cond INTERNAL **/
/***********************/

virtual void deleteFile(WorkflowFile *file, WorkflowJob* job, FileRegistryService *file_registry_service=nullptr);

virtual bool lookupFile(WorkflowFile *file, WorkflowJob*);

virtual void copyFile(WorkflowFile *file, StorageService *src);

virtual void copyFile(WorkflowFile *file, StorageService *src, WorkflowJob* src_job, WorkflowJob* dst_job);

virtual void initiateFileCopy(std::string answer_mailbox,
WorkflowFile *file,
StorageService *src);

virtual void readFile(WorkflowFile *file);

virtual void initiateFileRead(std::string mailbox_that_should_receive_file_content, WorkflowFile *file);
virtual void readFile(WorkflowFile *file, WorkflowJob* job);

virtual void initiateFileRead(std::string mailbox_that_should_receive_file_content, WorkflowFile *file, std::string src_dir);

virtual void writeFile(WorkflowFile *file);

virtual void writeFile(WorkflowFile *file, WorkflowJob* job);

static void readFiles(std::set<WorkflowFile *> files,
std::map<WorkflowFile *, StorageService *> file_locations,
StorageService *default_storage_service,
std::set<WorkflowFile*>& files_in_scratch);
std::set<WorkflowFile*>& files_in_scratch,
WorkflowJob* job = nullptr);

static void writeFiles(std::set<WorkflowFile *> files,
std::map<WorkflowFile *, StorageService *> file_locations,
StorageService *default_storage_service,
std::set<WorkflowFile*>& files_in_scratch);
std::set<WorkflowFile*>& files_in_scratch,
WorkflowJob* job = nullptr);

static void deleteFiles(std::set<WorkflowFile *> files,
std::map<WorkflowFile *, StorageService *> file_locations,
Expand All @@ -92,10 +116,10 @@ namespace wrench {

void stageFile(WorkflowFile *);

void removeFileFromStorage(WorkflowFile *);
void removeFileFromStorage(WorkflowFile *, std::string);

/** @brief The set of files stored on the storage service */
std::set<WorkflowFile *> stored_files;
/** @brief A map from each directory to the set of files stored in that directory inside the storage service */
std::map<std::string, std::set<WorkflowFile *>> stored_files;
/** @brief The storage service's capacity */
double capacity;
/** @brief The storage service's occupied space */
Expand All @@ -114,7 +138,8 @@ namespace wrench {

static void writeOrReadFiles(FileOperation action, std::set<WorkflowFile *> files,
std::map<WorkflowFile *, StorageService *> file_locations,
StorageService *default_storage_service, std::set<WorkflowFile*>& files_in_scratch);
StorageService *default_storage_service, std::set<WorkflowFile*>& files_in_scratch,
WorkflowJob* job);


};
Expand Down
4 changes: 3 additions & 1 deletion include/wrench/services/storage/simple/NetworkConnection.h
Expand Up @@ -41,7 +41,7 @@ namespace wrench {
INCOMING_CONTROL
};

NetworkConnection(int type, WorkflowFile* file, std::string mailbox, std::string ack_mailbox);
NetworkConnection(int type, WorkflowFile* file, std::string file_dir, std::string mailbox, std::string ack_mailbox);
bool start();
bool hasFailed();
std::unique_ptr<SimulationMessage> getMessage();
Expand All @@ -50,6 +50,8 @@ namespace wrench {
int type;
/** @brief: the file (for a DATA connection) */
WorkflowFile *file;
/** @brief: the directory inside the storage service that the file is written to or read from */
std::string file_dir;
/** @brief: the mailbox for this connection */
std::string mailbox;
/** @brief The low-level pending communication */
Expand Down
6 changes: 3 additions & 3 deletions include/wrench/services/storage/simple/SimpleStorageService.h
Expand Up @@ -95,12 +95,12 @@ namespace wrench {

unsigned long getNewUniqueNumber();

bool processFileWriteRequest(WorkflowFile *file, std::string answer_mailbox);
bool processFileWriteRequest(WorkflowFile *file, std::string dst_dir, std::string answer_mailbox);

bool processFileReadRequest(WorkflowFile *file, std::string answer_mailbox,
bool processFileReadRequest(WorkflowFile *file, std::string src_dir, std::string answer_mailbox,
std::string mailbox_to_receive_the_file_content);

bool processFileCopyRequest(WorkflowFile *file, StorageService *src, std::string answer_mailbox);
bool processFileCopyRequest(WorkflowFile *file, StorageService *src, std::string src_dir, std::string dst_dir, std::string answer_mailbox);

unsigned long num_concurrent_connections;

Expand Down
1 change: 1 addition & 0 deletions src/wrench/services/compute/batch/BatchService.cpp
Expand Up @@ -1599,6 +1599,7 @@ namespace wrench {
resources,
this->getScratch(),
false,
nullptr,
{{StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD,
this->getPropertyValueAsString(
BatchServiceProperty::THREAD_STARTUP_OVERHEAD)}},
Expand Down
Expand Up @@ -652,6 +652,7 @@ namespace wrench {
compute_resources,
getScratch(),
part_of_pilot_job,
this->containing_pilot_job,
{{StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, this->getPropertyValueAsString(
MultihostMulticoreComputeServiceProperty::THREAD_STARTUP_OVERHEAD)},
{StandardJobExecutorProperty::CORE_ALLOCATION_ALGORITHM, this->getPropertyValueAsString(
Expand Down Expand Up @@ -1628,7 +1629,7 @@ namespace wrench {

for (auto scratch_cleanup_file : this->files_in_scratch) {
try {
getScratch()->deleteFile(scratch_cleanup_file);
getScratch()->deleteFile(scratch_cleanup_file, this->containing_pilot_job, nullptr);
} catch (WorkflowExecutionException &e) {
throw;
}
Expand Down
Expand Up @@ -63,6 +63,7 @@ namespace wrench {
std::set<std::tuple<std::string, unsigned long, double>> compute_resources,
StorageService* scratch_space,
bool part_of_pilot_job,
PilotJob* parent_pilot_job,
std::map<std::string, std::string> property_list,
std::map<std::string, std::string> messagepayload_list
) :
Expand Down Expand Up @@ -165,6 +166,7 @@ namespace wrench {
this->scratch_space = scratch_space;
this->files_stored_in_scratch = {};
this->part_of_pilot_job = part_of_pilot_job;
this->parent_pilot_job = parent_pilot_job;


// set properties
Expand Down Expand Up @@ -483,6 +485,10 @@ namespace wrench {

// std::cerr << "CREATING A WORKUNIT EXECUTOR\n";

WorkflowJob* workflow_job = job;
if (this->part_of_pilot_job) {
workflow_job = this->parent_pilot_job;
}
std::shared_ptr<WorkunitMulticoreExecutor> workunit_executor = std::shared_ptr<WorkunitMulticoreExecutor>(
new WorkunitMulticoreExecutor(this->simulation,
target_host,
Expand All @@ -491,6 +497,7 @@ namespace wrench {
this->mailbox_name,
wu,
this->scratch_space,
workflow_job,
this->getPropertyValueAsDouble(
StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD)));

Expand Down Expand Up @@ -925,7 +932,7 @@ namespace wrench {
/** Perform scratch cleanup */
for (auto scratch_cleanup_file : files_stored_in_scratch) {
try {
this->scratch_space->deleteFile(scratch_cleanup_file);
this->scratch_space->deleteFile(scratch_cleanup_file, job, nullptr);
} catch (WorkflowExecutionException &e) {
throw;
}
Expand Down
Expand Up @@ -53,6 +53,7 @@ namespace wrench {
std::string callback_mailbox,
Workunit *workunit,
StorageService *scratch_space,
WorkflowJob* job,
double thread_startup_overhead) :
Service(hostname, "workunit_multicore_executor", "workunit_multicore_executor") {

Expand All @@ -71,6 +72,7 @@ namespace wrench {
this->ram_utilization = ram_utilization;
this->scratch_space = scratch_space;
this->files_stored_in_scratch = {};
this->job = job;

}

Expand Down Expand Up @@ -203,7 +205,11 @@ namespace wrench {
dst->getName().c_str());

S4U_Simulation::sleep(this->thread_startup_overhead);
dst->copyFile(file, src);
if (dst == this->scratch_space) {
dst->copyFile(file, src, nullptr, job);
} else {
dst->copyFile(file, src, nullptr, nullptr); // if there is no scratch space, then there is no notion of job's directory, it is always to / directory in such case
}
} catch (WorkflowExecutionException &e) {
throw;
}
Expand All @@ -223,7 +229,7 @@ namespace wrench {
try {
StorageService::readFiles(task->getInputFiles(),
work->file_locations,
this->scratch_space, files_stored_in_scratch);
this->scratch_space, files_stored_in_scratch, job);
} catch (WorkflowExecutionException &e) {
task->setInternalState(WorkflowTask::InternalState::TASK_FAILED);
throw;
Expand All @@ -245,7 +251,7 @@ namespace wrench {
// Write all output files
try {
StorageService::writeFiles(task->getOutputFiles(), work->file_locations, this->scratch_space,
files_stored_in_scratch);
files_stored_in_scratch, job);
} catch (WorkflowExecutionException &e) {
task->setInternalState(WorkflowTask::InternalState::TASK_FAILED);
throw;
Expand Down Expand Up @@ -298,7 +304,11 @@ namespace wrench {

try {
S4U_Simulation::sleep(this->thread_startup_overhead);
dst->copyFile(file, src);
if (src == this->scratch_space) {
dst->copyFile(file, src, job, nullptr);
} else {
dst->copyFile(file, src, nullptr, nullptr);
}

} catch (WorkflowExecutionException &e) {
throw;
Expand All @@ -311,7 +321,11 @@ namespace wrench {
StorageService *storage_service = std::get<1>(cleanup);
try {
S4U_Simulation::sleep(this->thread_startup_overhead);
storage_service->deleteFile(file);
if (storage_service == this->scratch_space) {
storage_service->deleteFile(file, job, nullptr);
} else {
storage_service->deleteFile(file, nullptr, nullptr);
}
} catch (WorkflowExecutionException &e) {
throw;
}
Expand Down

0 comments on commit 939d674

Please sign in to comment.