diff --git a/include/wrench/services/compute/standard_job_executor/StandardJobExecutor.h b/include/wrench/services/compute/standard_job_executor/StandardJobExecutor.h index c67d3cefd2..edccef5802 100644 --- a/include/wrench/services/compute/standard_job_executor/StandardJobExecutor.h +++ b/include/wrench/services/compute/standard_job_executor/StandardJobExecutor.h @@ -56,6 +56,7 @@ namespace wrench { std::set> compute_resources, StorageService* scratch_space, bool part_of_pilot_job, + PilotJob* parent_pilot_job, std::map property_list, std::map messagepayload_list ); @@ -82,6 +83,9 @@ namespace wrench { bool part_of_pilot_job; + // if this is not a part of pilot job, then this value will be nullptr + PilotJob* parent_pilot_job; + // Files stored in scratch std::set files_stored_in_scratch; diff --git a/include/wrench/services/compute/standard_job_executor/WorkunitMulticoreExecutor.h b/include/wrench/services/compute/standard_job_executor/WorkunitMulticoreExecutor.h index 9663efabf7..dbc7ae3371 100644 --- a/include/wrench/services/compute/standard_job_executor/WorkunitMulticoreExecutor.h +++ b/include/wrench/services/compute/standard_job_executor/WorkunitMulticoreExecutor.h @@ -26,6 +26,7 @@ namespace wrench { class WorkerThreadWork; class Workunit; class ComputeThread; + class WorkflowJob; /***********************/ /** \cond INTERNAL */ @@ -46,6 +47,7 @@ namespace wrench { std::string callback_mailbox, Workunit *workunit, StorageService *scratch_space, + WorkflowJob* job, double thread_startup_overhead = 0.0); void kill(); @@ -75,6 +77,9 @@ namespace wrench { std::set files_stored_in_scratch; std::vector> compute_threads; + // a reference to the job it is a part of (currently required for creating the /tmp directory in scratch space) + WorkflowJob* job; + }; /***********************/ diff --git a/include/wrench/services/storage/StorageService.h b/include/wrench/services/storage/StorageService.h index 6cff01f6d3..40a97d331f 100644 --- a/include/wrench/services/storage/StorageService.h +++ b/include/wrench/services/storage/StorageService.h @@ -14,6 +14,7 @@ #include #include +#include #include "wrench/services/Service.h" #include "wrench/workflow/execution_events/FailureCause.h" @@ -47,33 +48,56 @@ namespace wrench { virtual bool lookupFile(WorkflowFile *file); + virtual bool lookupFile(WorkflowFile *file, std::string dst_dir); + virtual void deleteFile(WorkflowFile *file, FileRegistryService *file_registry_service=nullptr); + virtual void deleteFile(WorkflowFile *file, std::string dst_dir, FileRegistryService *file_registry_service=nullptr); + + virtual void copyFile(WorkflowFile *file, StorageService *src, std::string src_dir, std::string dst_dir); + + virtual void readFile(WorkflowFile *file, std::string src_dir); + + virtual void writeFile(WorkflowFile *file, std::string dst_dir); + + /***********************/ /** \cond INTERNAL **/ /***********************/ + virtual void deleteFile(WorkflowFile *file, WorkflowJob* job, FileRegistryService *file_registry_service=nullptr); + + virtual bool lookupFile(WorkflowFile *file, WorkflowJob*); + virtual void copyFile(WorkflowFile *file, StorageService *src); + virtual void copyFile(WorkflowFile *file, StorageService *src, WorkflowJob* src_job, WorkflowJob* dst_job); + virtual void initiateFileCopy(std::string answer_mailbox, WorkflowFile *file, StorageService *src); virtual void readFile(WorkflowFile *file); - virtual void initiateFileRead(std::string mailbox_that_should_receive_file_content, WorkflowFile *file); + virtual void readFile(WorkflowFile *file, WorkflowJob* job); + + virtual void initiateFileRead(std::string mailbox_that_should_receive_file_content, WorkflowFile *file, std::string src_dir); virtual void writeFile(WorkflowFile *file); + virtual void writeFile(WorkflowFile *file, WorkflowJob* job); + static void readFiles(std::set files, std::map file_locations, StorageService *default_storage_service, - std::set& files_in_scratch); + std::set& files_in_scratch, + WorkflowJob* job = nullptr); static void writeFiles(std::set files, std::map file_locations, StorageService *default_storage_service, - std::set& files_in_scratch); + std::set& files_in_scratch, + WorkflowJob* job = nullptr); static void deleteFiles(std::set files, std::map file_locations, @@ -92,10 +116,10 @@ namespace wrench { void stageFile(WorkflowFile *); - void removeFileFromStorage(WorkflowFile *); + void removeFileFromStorage(WorkflowFile *, std::string); - /** @brief The set of files stored on the storage service */ - std::set stored_files; + /** @brief The map of file directories and the set of files stored on those directories inside the storage service */ + std::map> stored_files; /** @brief The storage service's capacity */ double capacity; /** @brief The storage service's occupied space */ @@ -114,7 +138,8 @@ namespace wrench { static void writeOrReadFiles(FileOperation action, std::set files, std::map file_locations, - StorageService *default_storage_service, std::set& files_in_scratch); + StorageService *default_storage_service, std::set& files_in_scratch, + WorkflowJob* job); }; diff --git a/include/wrench/services/storage/simple/NetworkConnection.h b/include/wrench/services/storage/simple/NetworkConnection.h index 359d20797c..0256dd51fe 100644 --- a/include/wrench/services/storage/simple/NetworkConnection.h +++ b/include/wrench/services/storage/simple/NetworkConnection.h @@ -41,7 +41,7 @@ namespace wrench { INCOMING_CONTROL }; - NetworkConnection(int type, WorkflowFile* file, std::string mailbox, std::string ack_mailbox); + NetworkConnection(int type, WorkflowFile* file, std::string file_dir, std::string mailbox, std::string ack_mailbox); bool start(); bool hasFailed(); std::unique_ptr getMessage(); @@ -50,6 +50,8 @@ namespace wrench { int type; /** @brief: the file (for a DATA connection) */ WorkflowFile *file; + /** @brief: the file directory inside the storage service where the file will be stored to/read from */ + std::string file_dir; /** @brief: the mailbox for this connection */ std::string mailbox; /** @brief The low-level pending communication */ diff --git a/include/wrench/services/storage/simple/SimpleStorageService.h b/include/wrench/services/storage/simple/SimpleStorageService.h index ed7bbcacf0..1b47f6f023 100644 --- a/include/wrench/services/storage/simple/SimpleStorageService.h +++ b/include/wrench/services/storage/simple/SimpleStorageService.h @@ -95,12 +95,12 @@ namespace wrench { unsigned long getNewUniqueNumber(); - bool processFileWriteRequest(WorkflowFile *file, std::string answer_mailbox); + bool processFileWriteRequest(WorkflowFile *file, std::string dst_dir, std::string answer_mailbox); - bool processFileReadRequest(WorkflowFile *file, std::string answer_mailbox, + bool processFileReadRequest(WorkflowFile *file, std::string src_dir, std::string answer_mailbox, std::string mailbox_to_receive_the_file_content); - bool processFileCopyRequest(WorkflowFile *file, StorageService *src, std::string answer_mailbox); + bool processFileCopyRequest(WorkflowFile *file, StorageService *src, std::string src_dir, std::string dst_dir, std::string answer_mailbox); unsigned long num_concurrent_connections; diff --git a/src/wrench/services/compute/batch/BatchService.cpp b/src/wrench/services/compute/batch/BatchService.cpp index e1a208cee6..f98b93136a 100644 --- a/src/wrench/services/compute/batch/BatchService.cpp +++ b/src/wrench/services/compute/batch/BatchService.cpp @@ -1599,6 +1599,7 @@ namespace wrench { resources, this->getScratch(), false, + nullptr, {{StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, this->getPropertyValueAsString( BatchServiceProperty::THREAD_STARTUP_OVERHEAD)}}, diff --git a/src/wrench/services/compute/multihost_multicore/MultihostMulticoreComputeService.cpp b/src/wrench/services/compute/multihost_multicore/MultihostMulticoreComputeService.cpp index dfdd642d2e..f4257cfcb8 100644 --- a/src/wrench/services/compute/multihost_multicore/MultihostMulticoreComputeService.cpp +++ b/src/wrench/services/compute/multihost_multicore/MultihostMulticoreComputeService.cpp @@ -652,6 +652,7 @@ namespace wrench { compute_resources, getScratch(), part_of_pilot_job, + this->containing_pilot_job, {{StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, this->getPropertyValueAsString( MultihostMulticoreComputeServiceProperty::THREAD_STARTUP_OVERHEAD)}, {StandardJobExecutorProperty::CORE_ALLOCATION_ALGORITHM, this->getPropertyValueAsString( @@ -1628,7 +1629,7 @@ namespace wrench { for (auto scratch_cleanup_file : this->files_in_scratch) { try { - getScratch()->deleteFile(scratch_cleanup_file); + getScratch()->deleteFile(scratch_cleanup_file, this->containing_pilot_job, nullptr); } catch (WorkflowExecutionException &e) { throw; } diff --git a/src/wrench/services/compute/standard_job_executor/StandardJobExecutor.cpp b/src/wrench/services/compute/standard_job_executor/StandardJobExecutor.cpp index a53fd7ce7f..2a2931d2ad 100644 --- a/src/wrench/services/compute/standard_job_executor/StandardJobExecutor.cpp +++ b/src/wrench/services/compute/standard_job_executor/StandardJobExecutor.cpp @@ -63,6 +63,7 @@ namespace wrench { std::set> compute_resources, StorageService* scratch_space, bool part_of_pilot_job, + PilotJob* parent_pilot_job, std::map property_list, std::map messagepayload_list ) : @@ -165,6 +166,7 @@ namespace wrench { this->scratch_space = scratch_space; this->files_stored_in_scratch = {}; this->part_of_pilot_job = part_of_pilot_job; + this->parent_pilot_job = parent_pilot_job; // set properties @@ -483,6 +485,10 @@ namespace wrench { // std::cerr << "CREATING A WORKUNIT EXECUTOR\n"; + WorkflowJob* workflow_job = job; + if (this->part_of_pilot_job) { + workflow_job = this->parent_pilot_job; + } std::shared_ptr workunit_executor = std::shared_ptr( new WorkunitMulticoreExecutor(this->simulation, target_host, @@ -491,6 +497,7 @@ namespace wrench { this->mailbox_name, wu, this->scratch_space, + workflow_job, this->getPropertyValueAsDouble( StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD))); @@ -925,7 +932,7 @@ namespace wrench { /** Perform scratch cleanup */ for (auto scratch_cleanup_file : files_stored_in_scratch) { try { - this->scratch_space->deleteFile(scratch_cleanup_file); + this->scratch_space->deleteFile(scratch_cleanup_file, job, nullptr); } catch (WorkflowExecutionException &e) { throw; } diff --git a/src/wrench/services/compute/standard_job_executor/WorkunitMulticoreExecutor.cpp b/src/wrench/services/compute/standard_job_executor/WorkunitMulticoreExecutor.cpp index 31d7527b89..d0db0a6128 100644 --- a/src/wrench/services/compute/standard_job_executor/WorkunitMulticoreExecutor.cpp +++ b/src/wrench/services/compute/standard_job_executor/WorkunitMulticoreExecutor.cpp @@ -53,6 +53,7 @@ namespace wrench { std::string callback_mailbox, Workunit *workunit, StorageService *scratch_space, + WorkflowJob* job, double thread_startup_overhead) : Service(hostname, "workunit_multicore_executor", "workunit_multicore_executor") { @@ -71,6 +72,7 @@ namespace wrench { this->ram_utilization = ram_utilization; this->scratch_space = scratch_space; this->files_stored_in_scratch = {}; + this->job = job; } @@ -203,7 +205,11 @@ namespace wrench { dst->getName().c_str()); S4U_Simulation::sleep(this->thread_startup_overhead); - dst->copyFile(file, src); + if (dst == this->scratch_space) { + dst->copyFile(file, src, nullptr, job); + } else { + dst->copyFile(file, src, nullptr, nullptr); // if there is no scratch space, then there is no notion of job's directory, it is always to / directory in such case + } } catch (WorkflowExecutionException &e) { throw; } @@ -223,7 +229,7 @@ namespace wrench { try { StorageService::readFiles(task->getInputFiles(), work->file_locations, - this->scratch_space, files_stored_in_scratch); + this->scratch_space, files_stored_in_scratch, job); } catch (WorkflowExecutionException &e) { task->setInternalState(WorkflowTask::InternalState::TASK_FAILED); throw; @@ -245,7 +251,7 @@ namespace wrench { // Write all output files try { StorageService::writeFiles(task->getOutputFiles(), work->file_locations, this->scratch_space, - files_stored_in_scratch); + files_stored_in_scratch, job); } catch (WorkflowExecutionException &e) { task->setInternalState(WorkflowTask::InternalState::TASK_FAILED); throw; @@ -298,7 +304,11 @@ namespace wrench { try { S4U_Simulation::sleep(this->thread_startup_overhead); - dst->copyFile(file, src); + if (src == this->scratch_space) { + dst->copyFile(file, src, job, nullptr); + } else { + dst->copyFile(file, src, nullptr, nullptr); + } } catch (WorkflowExecutionException &e) { throw; @@ -311,7 +321,11 @@ namespace wrench { StorageService *storage_service = std::get<1>(cleanup); try { S4U_Simulation::sleep(this->thread_startup_overhead); - storage_service->deleteFile(file); + if (storage_service == this->scratch_space) { + storage_service->deleteFile(file, job, nullptr); + } else { + storage_service->deleteFile(file, nullptr, nullptr); + } } catch (WorkflowExecutionException &e) { throw; } diff --git a/src/wrench/services/storage/StorageService.cpp b/src/wrench/services/storage/StorageService.cpp index af953d9ebb..f33b7cfff1 100644 --- a/src/wrench/services/storage/StorageService.cpp +++ b/src/wrench/services/storage/StorageService.cpp @@ -63,7 +63,11 @@ namespace wrench { file->getSize(), (this->capacity - this->occupied_space)); throw std::runtime_error("StorageService::stageFile(): File exceeds free space capacity on storage service"); } - this->stored_files.insert(file); + if (this->stored_files.find("/") != this->stored_files.end()) { + this->stored_files["/"].insert(file); + } else { + this->stored_files["/"] = {file}; // By default all the staged files will go to the / directory + } this->occupied_space += file->getSize(); WRENCH_INFO("Stored file %s (storage usage: %.10lf%%)", file->getID().c_str(), 100.0 * this->occupied_space / this->capacity); @@ -74,23 +78,30 @@ namespace wrench { * @brief Remove a file from storage (internal method) * * @param file: a file + * @param dst_dir: the directory from where the file will be deleted * * @throw std::runtime_error */ - void StorageService::removeFileFromStorage(WorkflowFile *file) { + void StorageService::removeFileFromStorage(WorkflowFile *file, std::string dst_dir) { if (file == nullptr) { throw std::invalid_argument("StorageService::removeFileFromStorage(): Invalid arguments"); } - if (this->stored_files.find(file) == this->stored_files.end()) { + std::set files = this->stored_files[dst_dir]; + if (files.size() > 0) { + if (files.find(file) == files.end()) { + throw std::runtime_error( + "StorageService::removeFileFromStorage(): Attempting to remove a file that is not on the storage service"); + } + this->stored_files[dst_dir].erase(file); + this->occupied_space -= file->getSize(); + WRENCH_INFO("Deleted file %s (storage usage: %.2lf%%)", file->getID().c_str(), + 100.0 * this->occupied_space / this->capacity); + } else { throw std::runtime_error( "StorageService::removeFileFromStorage(): Attempting to remove a file that is not on the storage service"); } - this->stored_files.erase(file); - this->occupied_space -= file->getSize(); - WRENCH_INFO("Deleted file %s (storage usage: %.2lf%%)", file->getID().c_str(), - 100.0 * this->occupied_space / this->capacity); } /** @@ -145,6 +156,7 @@ namespace wrench { } } + /** * @brief Synchronously asks the storage service whether it holds a file * @@ -166,12 +178,60 @@ namespace wrench { throw WorkflowExecutionException(std::shared_ptr(new ServiceIsDown(this))); } + std::string dst_dir = "/"; + return this->lookupFile(file,dst_dir); + } + + /** + * @brief Synchronously asks the storage service whether it holds a file + * + * @param file: the file + * @param job: the job for whom we are doing the look up, the file is stored in this job's directory + * + * @return true or false + * + * @throw WorkflowExecutionException + * @throw std::runtime_error + * @throw std::invalid_arguments + */ + bool StorageService::lookupFile(WorkflowFile *file, WorkflowJob* job) { + + if (file == nullptr) { + throw std::invalid_argument("StorageService::lookupFile(): Invalid arguments"); + } + + if (this->state == DOWN) { + throw WorkflowExecutionException(std::shared_ptr(new ServiceIsDown(this))); + } + + std::string dst_dir = "/"; + if (job != nullptr) { + dst_dir += job->getName(); + } + return this->lookupFile(file,dst_dir); + } + + /** + * @brief Synchronously asks the storage service whether it holds a file + * + * @param file: the file + * @param dst_dir: the directory from where we are doing the look up + * + * @return true or false + * + * @throw WorkflowExecutionException + * @throw std::runtime_error + * @throw std::invalid_arguments + */ + bool StorageService::lookupFile(WorkflowFile *file, std::string dst_dir) { + // Send a message to the daemon std::string answer_mailbox = S4U_Mailbox::generateUniqueMailboxName("lookup_file"); try { S4U_Mailbox::putMessage(this->mailbox_name, new StorageServiceFileLookupRequestMessage( answer_mailbox, file, + dst_dir, this->getMessagePayloadValueAsDouble(StorageServiceMessagePayload::FILE_LOOKUP_REQUEST_MESSAGE_PAYLOAD))); } catch (std::shared_ptr &cause) { throw WorkflowExecutionException(cause); @@ -211,6 +271,49 @@ namespace wrench { throw WorkflowExecutionException(std::shared_ptr(new ServiceIsDown(this))); } + std::string src_dir = "/"; + this->readFile(file,src_dir); + } + + /** + * @brief Synchronously read a file from the storage service + * + * @param file: the file + * @param job: the job associated to the read of the workflow file + * + * @throw WorkflowExecutionException + * @throw std::runtime_error + * @throw std::invalid_arguments + */ + void StorageService::readFile(WorkflowFile *file, WorkflowJob* job) { + + if (file == nullptr) { + throw std::invalid_argument("StorageService::readFile(): Invalid arguments"); + } + + if (this->state == DOWN) { + throw WorkflowExecutionException(std::shared_ptr(new ServiceIsDown(this))); + } + + std::string src_dir = "/"; + if (job != nullptr) { + src_dir += job->getName(); + } + this->readFile(file,src_dir); + } + + /** + * @brief Synchronously read a file from the storage service + * + * @param file: the file + * @param src_dir: the directory associated to the read of the workflow file + * + * @throw WorkflowExecutionException + * @throw std::runtime_error + * @throw std::invalid_arguments + */ + + void StorageService::readFile(WorkflowFile *file, std::string src_dir) { // Send a message to the daemon std::string answer_mailbox = S4U_Mailbox::generateUniqueMailboxName("read_file"); try { @@ -218,6 +321,7 @@ namespace wrench { new StorageServiceFileReadRequestMessage(answer_mailbox, answer_mailbox, file, + src_dir, this->getMessagePayloadValueAsDouble( StorageServiceMessagePayload::FILE_READ_REQUEST_MESSAGE_PAYLOAD))); } catch (std::shared_ptr &cause) { @@ -261,7 +365,6 @@ namespace wrench { } } - /** * @brief Synchronously write a file to the storage service * @@ -280,12 +383,55 @@ namespace wrench { throw WorkflowExecutionException(std::shared_ptr(new ServiceIsDown(this))); } + std::string dst_dir = "/"; + this->writeFile(file,dst_dir); + } + + + /** + * @brief Synchronously write a file to the storage service + * + * @param file: the file + * @param job: the job associated to the write of the workflow file + * + * @throw WorkflowExecutionException + * @throw std::runtime_error + */ + void StorageService::writeFile(WorkflowFile *file, WorkflowJob* job) { + + if (file == nullptr) { + throw std::invalid_argument("StorageService::writeFile(): Invalid arguments"); + } + + if (this->state == DOWN) { + throw WorkflowExecutionException(std::shared_ptr(new ServiceIsDown(this))); + } + + std::string dst_dir = "/"; + if (job != nullptr) { + dst_dir += job->getName(); + } + this->writeFile(file,dst_dir); + } + + /** + * @brief Synchronously write a file to the storage service + * + * @param file: the file + * @param dst_dir: the directory associated to the write of the workflow file + * + * @throw WorkflowExecutionException + * @throw std::runtime_error + */ + void StorageService::writeFile(WorkflowFile *file, std::string dst_dir) { + // Send a message to the daemon std::string answer_mailbox = S4U_Mailbox::generateUniqueMailboxName("write_file"); try { S4U_Mailbox::putMessage(this->mailbox_name, new StorageServiceFileWriteRequestMessage(answer_mailbox, file, + dst_dir, this->getMessagePayloadValueAsDouble( StorageServiceMessagePayload::FILE_WRITE_REQUEST_MESSAGE_PAYLOAD))); } catch (std::shared_ptr &cause) { @@ -330,6 +476,7 @@ namespace wrench { * @param file_locations: a map of files to storage services * @param default_storage_service: the storage service to use when files don't appear in the file_locations map (which must be a compute service's scratch storage) * @param files_in_scratch: the set of files that have been written to the default storage service (which must be a compute service's scratch storage) + * @param job: the job which is doing the read of the files * * @throw std::runtime_error * @throw WorkflowExecutionException @@ -337,9 +484,10 @@ namespace wrench { void StorageService::readFiles(std::set files, std::map file_locations, StorageService *default_storage_service, - std::set& files_in_scratch) { + std::set& files_in_scratch, + WorkflowJob* job) { try { - StorageService::writeOrReadFiles(READ, std::move(files), std::move(file_locations), default_storage_service, files_in_scratch); + StorageService::writeOrReadFiles(READ, std::move(files), std::move(file_locations), default_storage_service, files_in_scratch, job); } catch (std::runtime_error &e) { throw; } catch (WorkflowExecutionException &e) { @@ -354,6 +502,8 @@ namespace wrench { * @param file_locations: a map of files to storage services * @param default_storage_service: the storage service to use when files don't appear in the file_locations map (which must be a compute service's scratch storage) * @param files_in_scratch: the set of files that have been writted to the default storage service (which must be a compute service's scratch storage) + * @param job: the job which is doing the write of the files + * * * @throw std::runtime_error * @throw WorkflowExecutionException @@ -361,9 +511,10 @@ namespace wrench { void StorageService::writeFiles(std::set files, std::map file_locations, StorageService *default_storage_service, - std::set& files_in_scratch) { + std::set& files_in_scratch, + WorkflowJob* job) { try { - StorageService::writeOrReadFiles(WRITE, std::move(files), std::move(file_locations), default_storage_service, files_in_scratch); + StorageService::writeOrReadFiles(WRITE, std::move(files), std::move(file_locations), default_storage_service, files_in_scratch, job); } catch (std::runtime_error &e) { throw; } catch (WorkflowExecutionException &e) { @@ -379,6 +530,7 @@ namespace wrench { * @param file_locations: a map of files to storage services * @param default_storage_service: the storage service to use when files don't appear in the file_locations map (which must be a compute service's scratch storage) * @param files_in_scratch: the set of files that have been writted to the default storage service (which must be a compute service's scratch storage) + * @param job: the job associated to the write/read of the files * * @throw std::runtime_error * @throw WorkflowExecutionException @@ -387,7 +539,8 @@ namespace wrench { std::set files, std::map file_locations, StorageService *default_storage_service, - std::set& files_in_scratch) { + std::set& files_in_scratch, + WorkflowJob* job) { for (auto const &f : files) { if (f == nullptr) { @@ -413,7 +566,13 @@ namespace wrench { if (action == READ) { try { WRENCH_INFO("Reading file %s from storage service %s", f->getID().c_str(), storage_service->getName().c_str()); - storage_service->readFile(f); + if (storage_service != default_storage_service) { + //if the storage service where I am going to read from is not the default storage service (scratch), then I + // don't want to read from job's temp directory, rather I would like to read from / directory of the storage service + storage_service->readFile(f, nullptr); + } else { + storage_service->readFile(f, job); + } WRENCH_INFO("File %s read", f->getID().c_str()); } catch (std::runtime_error &e) { throw; @@ -421,13 +580,15 @@ namespace wrench { throw; } } else { - // Write the file - if (storage_service == default_storage_service) { - files_in_scratch.insert(f); - } try { WRENCH_INFO("Writing file %s to storage service %s", f->getID().c_str(), storage_service->getName().c_str()); - storage_service->writeFile(f); + // Write the file + if (storage_service == default_storage_service) { + files_in_scratch.insert(f); + storage_service->writeFile(f, job); + } else { + storage_service->writeFile(f, nullptr); + } WRENCH_INFO("Wrote file %s", f->getID().c_str()); } catch (std::runtime_error &e) { throw; @@ -438,6 +599,7 @@ namespace wrench { } } + /** * @brief Synchronously asks the storage service to delete a file copy * @@ -459,14 +621,60 @@ namespace wrench { throw WorkflowExecutionException(std::shared_ptr(new ServiceIsDown(this))); } - bool unregister = (file_registry_service != nullptr); + std::string dst_dir = "/"; + this->deleteFile(file, dst_dir, file_registry_service); + } + /** + * @brief Synchronously asks the storage service to delete a file copy + * + * @param file: the file + * @param job: the job associated to deleting this file + * @param file_registry_service: a file registry service that should be updated once the + * file deletion has (successfully) completed (none if nullptr) + * + * @throw WorkflowExecutionException + * @throw std::runtime_error + * @throw std::invalid_argument + */ + void StorageService::deleteFile(WorkflowFile *file , WorkflowJob* job, FileRegistryService *file_registry_service) { + + if (file == nullptr) { + throw std::invalid_argument("StorageService::deleteFile(): Invalid arguments"); + } + + if (this->state == DOWN) { + throw WorkflowExecutionException(std::shared_ptr(new ServiceIsDown(this))); + } + + std::string dst_dir = "/"; + if (job != nullptr) { + dst_dir += job->getName(); + } + this->deleteFile(file, dst_dir, file_registry_service); + } + + /* @brief Synchronously asks the storage service to delete a file copy + * + * @param file: the file + * @param dst_dir: the directory from where the file will be deleted + * @param file_registry_service: a file registry service that should be updated once the + * file deletion has (successfully) completed (none if nullptr) + * + * @throw WorkflowExecutionException + * @throw std::runtime_error + * @throw std::invalid_argument + */ + void StorageService::deleteFile(WorkflowFile *file, std::string dst_dir, FileRegistryService *file_registry_service) { + + bool unregister = (file_registry_service != nullptr); // Send a message to the daemon std::string answer_mailbox = S4U_Mailbox::generateUniqueMailboxName("delete_file"); try { S4U_Mailbox::putMessage(this->mailbox_name, new StorageServiceFileDeleteRequestMessage( answer_mailbox, file, + dst_dir, this->getMessagePayloadValueAsDouble(StorageServiceMessagePayload::FILE_DELETE_REQUEST_MESSAGE_PAYLOAD))); } catch (std::shared_ptr &cause) { throw WorkflowExecutionException(cause); @@ -531,12 +739,14 @@ namespace wrench { } } + /** * @brief Synchronously ask the storage service to read a file from another storage service * * @param file: the file to copy * @param src: the storage service from which to read the file - * + * @param src_job: the job from whose directory we are copying this file + * @param dst_job: the job to whose directory we are copying this file * @throw WorkflowExecutionException * @throw std::runtime_error * @throw std::invalid_argument @@ -556,6 +766,70 @@ namespace wrench { throw WorkflowExecutionException(std::shared_ptr(new ServiceIsDown(this))); } + std::string src_dir = "/"; + + std::string dst_dir = "/"; + + this->copyFile(file,src,src_dir,dst_dir); + } + + /** + * @brief Synchronously ask the storage service to read a file from another storage service + * + * @param file: the file to copy + * @param src: the storage service from which to read the file + * @param src_job: the job from whose directory we are copying this file + * @param dst_job: the job to whose directory we are copying this file + * @throw WorkflowExecutionException + * @throw std::runtime_error + * @throw std::invalid_argument + */ + void StorageService::copyFile(WorkflowFile *file, StorageService *src, WorkflowJob* src_job, WorkflowJob* dst_job) { + + + if ((file == nullptr) || (src == nullptr)) { + throw std::invalid_argument("StorageService::copyFile(): Invalid arguments"); + } + + if (src == this) { + throw std::invalid_argument("StorageService::copyFile(): Cannot copy a file from oneself"); + } + + if (src_job != nullptr && dst_job != nullptr) { + throw std::invalid_argument("StorageService::copyFile(): Cannot copy files from one job's private directory to another job's private directory"); + } + + if (this->state == DOWN) { + throw WorkflowExecutionException(std::shared_ptr(new ServiceIsDown(this))); + } + + std::string src_dir = "/"; + if (src_job != nullptr) { + src_dir += src_job->getName(); + } + + std::string dst_dir = "/"; + if (dst_job != nullptr) { + dst_dir += dst_job->getName(); + } + + this->copyFile(file,src,src_dir,dst_dir); + } + + /** + * @brief Synchronously ask the storage service to read a file from another storage service + * + * @param file: the file to copy + * @param src: the storage service from which to read the file + * @param src_dir: the directory from where we are copying this file + * @param dst_dir: the directory to where we are copying this file + * @throw WorkflowExecutionException + * @throw std::runtime_error + * @throw std::invalid_argument + */ + void StorageService::copyFile(WorkflowFile *file, StorageService *src, std::string src_dir, + std::string dst_dir) { + // Send a message to the daemon std::string answer_mailbox = S4U_Mailbox::generateUniqueMailboxName("copy_file"); try { @@ -563,6 +837,8 @@ namespace wrench { answer_mailbox, file, src, + src_dir, + dst_dir, nullptr, this->getMessagePayloadValueAsDouble(StorageServiceMessagePayload::FILE_COPY_REQUEST_MESSAGE_PAYLOAD))); } catch (std::shared_ptr &cause) { @@ -615,10 +891,14 @@ namespace wrench { // Send a message to the daemon try { + std::string src_dir = "/"; + std::string dst_dir = "/"; S4U_Mailbox::putMessage(this->mailbox_name, new StorageServiceFileCopyRequestMessage( answer_mailbox, file, src, + src_dir, // I am not sure if it should always be /, but DataMovementManager calls this initiateFileCopy function and, + dst_dir, // so we probably don't need to copy from a job's directory. So, always from / directory to / directory nullptr, this->getMessagePayloadValueAsDouble(StorageServiceMessagePayload::FILE_COPY_REQUEST_MESSAGE_PAYLOAD))); } catch (std::shared_ptr &cause) { @@ -632,12 +912,13 @@ namespace wrench { * * @param mailbox_that_should_receive_file_content: the mailbox to which the file should be sent * @param file: the file + * @param src_dir: the file directory from where the file will be read * * @throw WorkflowExecutionException * @throw std::runtime_error * @throw std::invalid_arguments */ - void StorageService::initiateFileRead(std::string mailbox_that_should_receive_file_content, WorkflowFile *file) { + void StorageService::initiateFileRead(std::string mailbox_that_should_receive_file_content, WorkflowFile *file, std::string src_dir) { WRENCH_INFO("Initiating a file read operation for file %s on storage service %s", file->getID().c_str(), this->getName().c_str()); @@ -658,6 +939,7 @@ namespace wrench { new StorageServiceFileReadRequestMessage(request_answer_mailbox, mailbox_that_should_receive_file_content, file, + src_dir, this->getMessagePayloadValueAsDouble( StorageServiceMessagePayload::FILE_READ_REQUEST_MESSAGE_PAYLOAD))); } catch (std::shared_ptr &cause) { diff --git a/src/wrench/services/storage/StorageServiceMessage.cpp b/src/wrench/services/storage/StorageServiceMessage.cpp index df1248b1e4..3d2223b64d 100644 --- a/src/wrench/services/storage/StorageServiceMessage.cpp +++ b/src/wrench/services/storage/StorageServiceMessage.cpp @@ -59,12 +59,14 @@ namespace wrench { * @brief Constructor * @param answer_mailbox: the mailbox to which to send the answer * @param file: the file + * @param dst_dir: the file directory to look up the file for * @param payload: the message size in bytes * * @throw std::invalid_argument */ StorageServiceFileLookupRequestMessage::StorageServiceFileLookupRequestMessage(std::string answer_mailbox, WorkflowFile *file, + std::string& dst_dir, double payload) : StorageServiceMessage("FILE_LOOKUP_REQUEST", payload) { @@ -73,6 +75,7 @@ namespace wrench { } this->answer_mailbox = answer_mailbox; this->file = file; + this->dst_dir = dst_dir; } /** @@ -100,12 +103,14 @@ namespace wrench { * @brief Constructor * @param answer_mailbox: the mailbox to which to send the answer * @param file: the file + * @param dst_dir: the file directory from where the file will be deleted * @param payload: the message size in bytes * * @throw std::invalid_argument */ StorageServiceFileDeleteRequestMessage::StorageServiceFileDeleteRequestMessage(std::string answer_mailbox, WorkflowFile *file, + std::string& dst_dir, double payload) : StorageServiceMessage("FILE_DELETE_REQUEST", payload) { @@ -114,6 +119,7 @@ namespace wrench { } this->file = file; this->answer_mailbox = answer_mailbox; + this->dst_dir = dst_dir; } /** @@ -149,6 +155,8 @@ namespace wrench { * @param answer_mailbox: the mailbox to which to send the answer * @param file: the file * @param src: the source storage service + * @param src_dir: the file directory from where the file will be copied + * @param dst_dir: the file directory where the file will be stored * @param file_registry_service: the file registry service to update (nullptr if none) * @param payload: the message size in bytes * @@ -157,6 +165,8 @@ namespace wrench { StorageServiceFileCopyRequestMessage::StorageServiceFileCopyRequestMessage(std::string answer_mailbox, WorkflowFile *file, StorageService *src, + std::string& src_dir, + std::string& dst_dir, FileRegistryService *file_registry_service, double payload) : StorageServiceMessage( "FILE_COPY_REQUEST", payload) { @@ -167,6 +177,8 @@ namespace wrench { this->file = file; this->src = src; this->file_registry_service = file_registry_service; + this->src_dir = src_dir; + this->dst_dir = dst_dir; } /** @@ -214,6 +226,7 @@ namespace wrench { */ StorageServiceFileWriteRequestMessage::StorageServiceFileWriteRequestMessage(std::string answer_mailbox, WorkflowFile *file, + std::string& dst_dir, double payload) : StorageServiceMessage("FILE_WRITE_REQUEST", payload) { @@ -223,6 +236,7 @@ namespace wrench { this->payload += file->getSize(); this->answer_mailbox = answer_mailbox; this->file = file; + this->dst_dir = dst_dir; } /** @@ -266,6 +280,7 @@ namespace wrench { StorageServiceFileReadRequestMessage::StorageServiceFileReadRequestMessage(std::string answer_mailbox, std::string mailbox_to_receive_the_file_content, WorkflowFile *file, + std::string& src_dir, double payload) : StorageServiceMessage( "FILE_READ_REQUEST", payload) { @@ -275,6 +290,7 @@ namespace wrench { this->answer_mailbox = answer_mailbox; this->mailbox_to_receive_the_file_content = mailbox_to_receive_the_file_content; this->file = file; + this->src_dir = src_dir; } /** diff --git a/src/wrench/services/storage/StorageServiceMessage.h b/src/wrench/services/storage/StorageServiceMessage.h index ba35b26ab0..2820930ba1 100644 --- a/src/wrench/services/storage/StorageServiceMessage.h +++ b/src/wrench/services/storage/StorageServiceMessage.h @@ -60,12 +60,14 @@ namespace wrench { */ class StorageServiceFileLookupRequestMessage : public StorageServiceMessage { public: - StorageServiceFileLookupRequestMessage(std::string answer_mailbox, WorkflowFile *file, double payload); + StorageServiceFileLookupRequestMessage(std::string answer_mailbox, WorkflowFile *file, std::string& dst_dir, double payload); /** @brief Mailbox to which the answer message should be sent */ std::string answer_mailbox; /** @brief The file to lookup */ WorkflowFile *file; + /** @brief The file directory where to lookup the file for */ + std::string dst_dir; }; /** @@ -88,12 +90,15 @@ namespace wrench { public: StorageServiceFileDeleteRequestMessage(std::string answer_mailbox, WorkflowFile *file, + std::string& dst_dir, double payload); /** @brief Mailbox to which the answer message should be sent */ std::string answer_mailbox; /** @brief The file to delete */ WorkflowFile *file; + /** @brief The file directory from where the file will be deleted */ + std::string dst_dir; }; /** @@ -123,6 +128,7 @@ namespace wrench { class StorageServiceFileCopyRequestMessage : public StorageServiceMessage { public: StorageServiceFileCopyRequestMessage(std::string answer_mailbox, WorkflowFile *file, StorageService *src, + std::string& src_dir, std::string& dst_dir, FileRegistryService *file_registry_service, double payload); @@ -132,6 +138,10 @@ namespace wrench { WorkflowFile *file; /** @brief The storage service from which to copy the file */ StorageService *src; + /** @brief The file directory from where the file will be copied */ + std::string src_dir; + /** @brief The file directory inside the storage service where the file will be stored */ + std::string dst_dir; /** @brief The file registry service to update, or none if nullptr */ FileRegistryService *file_registry_service; }; @@ -166,12 +176,14 @@ namespace wrench { */ class StorageServiceFileWriteRequestMessage : public StorageServiceMessage { public: - StorageServiceFileWriteRequestMessage(std::string answer_mailbox, WorkflowFile *file, double payload); + StorageServiceFileWriteRequestMessage(std::string answer_mailbox, WorkflowFile *file, std::string& dst_dir, double payload); /** @brief Mailbox to which the answer message should be sent */ std::string answer_mailbox; /** @brief The file to write */ WorkflowFile *file; + /** @brief The file directory to write the file to */ + std::string dst_dir; }; /** @@ -205,7 +217,7 @@ namespace wrench { public: StorageServiceFileReadRequestMessage(std::string answer_mailbox, std::string mailbox_to_receive_the_file_content, - WorkflowFile *file, double payload); + WorkflowFile *file, std::string& src_dir, double payload); /** @brief The mailbox to which the answer message should be sent */ std::string answer_mailbox; @@ -213,6 +225,8 @@ namespace wrench { std::string mailbox_to_receive_the_file_content; /** @brief The file to read */ WorkflowFile *file; + /** @brief The source directory from which to read the file */ + std::string src_dir; }; /** diff --git a/src/wrench/services/storage/simple/NetworkConnection.cpp b/src/wrench/services/storage/simple/NetworkConnection.cpp index da8f37f0c4..45e3fc0239 100644 --- a/src/wrench/services/storage/simple/NetworkConnection.cpp +++ b/src/wrench/services/storage/simple/NetworkConnection.cpp @@ -27,14 +27,16 @@ namespace wrench { * - NetworkConnection::OUTGOING_DATA * - NetworkConnection::INCOMING_CONTROL * @param file: the file (for DATA connection only) + * @param file_dir: the file directory inside the storage service where the file will be stored to/read from * @param mailbox: the mailbox: the mailbox for this connection * @param ack_mailbox: the mailbox to which an ack should be sent when the connection completes/fails */ - NetworkConnection::NetworkConnection(int type, WorkflowFile *file, std::string mailbox, std::string ack_mailbox) { + NetworkConnection::NetworkConnection(int type, WorkflowFile *file, std::string file_dir, std::string mailbox, std::string ack_mailbox) { this->type = type; this->file = file; this->mailbox = mailbox; this->ack_mailbox = ack_mailbox; + this->file_dir = file_dir; if (this->mailbox.empty()) { throw std::invalid_argument("NetworkConnection::NetworkConnection(): empty mailbox_name"); diff --git a/src/wrench/services/storage/simple/SimpleStorageService.cpp b/src/wrench/services/storage/simple/SimpleStorageService.cpp index 0cfaa36af6..fe4c96bfc5 100644 --- a/src/wrench/services/storage/simple/SimpleStorageService.cpp +++ b/src/wrench/services/storage/simple/SimpleStorageService.cpp @@ -113,7 +113,7 @@ namespace wrench { // Post a recv on my standard mailbox_name in case there is none pending if (should_add_incoming_control_connection) { this->network_connection_manager->addConnection(std::unique_ptr( - new NetworkConnection(NetworkConnection::INCOMING_CONTROL, nullptr, this->mailbox_name, "") + new NetworkConnection(NetworkConnection::INCOMING_CONTROL, nullptr, "/", this->mailbox_name, "") )); should_add_incoming_control_connection = false; } @@ -197,13 +197,20 @@ namespace wrench { bool success = true; std::shared_ptr failure_cause = nullptr; - if (this->stored_files.find(msg->file) == this->stored_files.end()) { + if (this->stored_files.find(msg->dst_dir) != this->stored_files.end()) { + std::set files = this->stored_files[msg->dst_dir]; + if (files.find(msg->file) == files.end()) { + success = false; + failure_cause = std::shared_ptr(new FileNotFound(msg->file, this)); + } else { + this->removeFileFromStorage(msg->file, msg->dst_dir); + } + } else { success = false; failure_cause = std::shared_ptr(new FileNotFound(msg->file, this)); - } else { - this->removeFileFromStorage(msg->file); } + // Send an asynchronous reply try { S4U_Mailbox::dputMessage(msg->answer_mailbox, @@ -221,7 +228,9 @@ namespace wrench { } else if (auto msg = dynamic_cast(message.get())) { - bool file_found = (this->stored_files.find(msg->file) != this->stored_files.end()); + //TODO:: change it to the src dir to look for, for the time being using / directory + std::set files = this->stored_files[msg->dst_dir]; + bool file_found = (files.find(msg->file) != files.end()); try { S4U_Mailbox::dputMessage(msg->answer_mailbox, new StorageServiceFileLookupAnswerMessage(msg->file, file_found, @@ -235,15 +244,15 @@ namespace wrench { } else if (auto msg = dynamic_cast(message.get())) { - return processFileWriteRequest(msg->file, msg->answer_mailbox); + return processFileWriteRequest(msg->file, msg->dst_dir, msg->answer_mailbox); } else if (auto msg = dynamic_cast(message.get())) { - return processFileReadRequest(msg->file, msg->answer_mailbox, msg->mailbox_to_receive_the_file_content); + return processFileReadRequest(msg->file, msg->src_dir, msg->answer_mailbox, msg->mailbox_to_receive_the_file_content); } else if (auto msg = dynamic_cast(message.get())) { - return processFileCopyRequest(msg->file, msg->src, msg->answer_mailbox); + return processFileCopyRequest(msg->file, msg->src, msg->src_dir, msg->dst_dir, msg->answer_mailbox); } else { throw std::runtime_error( @@ -255,10 +264,11 @@ namespace wrench { * @brief Handle a file write request * * @param file: the file to write + * @param dst_dir: the file directory to write the file to * @param answer_mailbox: the mailbox to which the reply should be sent * @return true if this process should keep running */ - bool SimpleStorageService::processFileWriteRequest(WorkflowFile *file, std::string answer_mailbox) { + bool SimpleStorageService::processFileWriteRequest(WorkflowFile *file, std::string dst_dir, std::string answer_mailbox) { // If the file is already there, send back a failure // if (this->stored_files.find(file) != this->stored_files.end()) { @@ -283,31 +293,28 @@ namespace wrench { // If the file is not already there, do a capacity check/update // (If the file is already there, then there will just be an overwrite. Not that // if the overwrite fails, then the file will disappear) - if (this->stored_files.find(file) != this->stored_files.end()) { - // Check the file size and capacity, and reply "no" if not enough space - if (file->getSize() > (this->capacity - this->occupied_space)) { - try { - S4U_Mailbox::putMessage(answer_mailbox, - new StorageServiceFileWriteAnswerMessage(file, - this, - false, - std::shared_ptr( - new StorageServiceNotEnoughSpace( - file, - this)), - "", - this->getMessagePayloadValueAsDouble( - SimpleStorageServiceMessagePayload::FILE_WRITE_ANSWER_MESSAGE_PAYLOAD))); - } catch (std::shared_ptr &cause) { - return true; - } + // Check the file size and capacity, and reply "no" if not enough space + if (file->getSize() > (this->capacity - this->occupied_space)) { + try { + S4U_Mailbox::putMessage(answer_mailbox, + new StorageServiceFileWriteAnswerMessage(file, + this, + false, + std::shared_ptr( + new StorageServiceNotEnoughSpace( + file, + this)), + "", + this->getMessagePayloadValueAsDouble( + SimpleStorageServiceMessagePayload::FILE_WRITE_ANSWER_MESSAGE_PAYLOAD))); + } catch (std::shared_ptr &cause) { return true; } - - // Update occupied space, in advance (will have to be decreased later in case of failure) - this->occupied_space += file->getSize(); + return true; } + // Update occupied space, in advance (will have to be decreased later in case of failure) + this->occupied_space += file->getSize(); // Generate a mailbox_name name on which to receive the file std::string file_reception_mailbox = S4U_Mailbox::generateUniqueMailboxName("file_reception"); @@ -327,7 +334,7 @@ namespace wrench { } this->network_connection_manager->addConnection(std::unique_ptr( - new NetworkConnection(NetworkConnection::INCOMING_DATA, file, file_reception_mailbox, ""))); + new NetworkConnection(NetworkConnection::INCOMING_DATA, file, dst_dir, file_reception_mailbox, ""))); return true; } @@ -336,20 +343,25 @@ namespace wrench { /** * @brief Handle a file read request * @param file: the file + * @param src_dir: the file directory to read the file from * @param answer_mailbox: the mailbox to which the answer should be sent * @param mailbox_to_receive_the_file_content: the mailbox to which the file will be sent * @return */ - bool SimpleStorageService::processFileReadRequest(WorkflowFile *file, std::string answer_mailbox, + bool SimpleStorageService::processFileReadRequest(WorkflowFile *file, std::string src_dir, std::string answer_mailbox, std::string mailbox_to_receive_the_file_content) { // Figure out whether this succeeds or not bool success = true; std::shared_ptr failure_cause = nullptr; - if (this->stored_files. - find(file) == this->stored_files. - end()) { - WRENCH_INFO("Received a a read request for a file I don't have (%s)", this->getName().c_str()); + if (this->stored_files.find(src_dir) != this->stored_files.end()) { + std::set files = this->stored_files[src_dir]; + if (files.find(file) == files.end()) { + WRENCH_INFO("Received a a read request for a file I don't have (%s)", this->getName().c_str()); + success = false; + failure_cause = std::shared_ptr(new FileNotFound(file, this)); + } + } else { success = false; failure_cause = std::shared_ptr(new FileNotFound(file, this)); } @@ -367,7 +379,7 @@ namespace wrench { // If success, then follow up with sending the file (ASYNCHRONOUSLY!) if (success) { this->network_connection_manager->addConnection(std::unique_ptr( - new NetworkConnection(NetworkConnection::OUTGOING_DATA, file, mailbox_to_receive_the_file_content, "") + new NetworkConnection(NetworkConnection::OUTGOING_DATA, file, src_dir, mailbox_to_receive_the_file_content, "") )); } @@ -378,11 +390,13 @@ namespace wrench { * @brief Handle a file copy request * @param file: the file * @param src: the storage service that holds the file + * @param src_dir: the file directory from where the file will be copied + * @param dst_dir: the fie directory to where the file will be copied * @param answer_mailbox: the mailbox to which the answer should be sent * @return */ bool - SimpleStorageService::processFileCopyRequest(WorkflowFile *file, StorageService *src, std::string answer_mailbox) { + SimpleStorageService::processFileCopyRequest(WorkflowFile *file, StorageService *src, std::string src_dir, std::string dst_dir, std::string answer_mailbox) { // // If the file is already here, send back a failure // if (this->stored_files.find(file) != this->stored_files.end()) { @@ -408,30 +422,25 @@ namespace wrench { // If the file is not already there, do a capacity check/update // (If the file is already there, then there will just be an overwrite. Not that // if the overwrite fails, then the file will disappear) - if (this->stored_files.find(file) == this->stored_files.end()) { - - // Figure out whether this succeeds or not - if (file->getSize() > this->capacity - this->occupied_space) { - WRENCH_INFO("Cannot perform file copy due to lack of space"); - try { - S4U_Mailbox::putMessage(answer_mailbox, - new StorageServiceFileCopyAnswerMessage(file, this, nullptr, false, - false, - std::shared_ptr( - new StorageServiceNotEnoughSpace( - file, - this)), - this->getMessagePayloadValueAsDouble( - SimpleStorageServiceMessagePayload::FILE_COPY_ANSWER_MESSAGE_PAYLOAD))); - } catch (std::shared_ptr &cause) { - return true; - } + // Figure out whether this succeeds or not + if (file->getSize() > this->capacity - this->occupied_space) { + WRENCH_INFO("Cannot perform file copy due to lack of space"); + try { + S4U_Mailbox::putMessage(answer_mailbox, + new StorageServiceFileCopyAnswerMessage(file, this, nullptr, false, + false, + std::shared_ptr( + new StorageServiceNotEnoughSpace( + file, + this)), + this->getMessagePayloadValueAsDouble( + SimpleStorageServiceMessagePayload::FILE_COPY_ANSWER_MESSAGE_PAYLOAD))); + } catch (std::shared_ptr &cause) { return true; } - - // Update occupied space, in advance (will have to be decreased later in case of failure) - this->occupied_space += file->getSize(); + return true; } + this->occupied_space += file->getSize(); WRENCH_INFO("Asynchronously copying file %s from storage service %s", file->getID().c_str(), @@ -442,7 +451,7 @@ namespace wrench { // Initiate an ASYNCHRONOUS file read from the source try { - src->initiateFileRead(file_reception_mailbox, file); + src->initiateFileRead(file_reception_mailbox, file, src_dir); } catch (WorkflowExecutionException &e) { try { S4U_Mailbox::putMessage(answer_mailbox, @@ -458,7 +467,7 @@ namespace wrench { this->network_connection_manager->addConnection(std::unique_ptr( - new NetworkConnection(NetworkConnection::INCOMING_DATA, file, file_reception_mailbox, answer_mailbox) + new NetworkConnection(NetworkConnection::INCOMING_DATA, file, dst_dir, file_reception_mailbox, answer_mailbox) )); return true; @@ -495,7 +504,7 @@ namespace wrench { // Process the failure, meaning, just re-decrease the occupied space this->occupied_space -= connection->file->getSize(); // And if this was an overwrite, now we lost the file!!! - this->stored_files.erase(connection->file); + this->stored_files[connection->file_dir].erase(connection->file); WRENCH_INFO( "Sending back an ack since this was a file copy and some client is waiting for me to say something"); @@ -521,7 +530,7 @@ namespace wrench { } // Add the file to my storage (this will not add a duplicate in case of an overwrite, because it's a set) - this->stored_files.insert(connection->file); + this->stored_files[connection->file_dir].insert(connection->file); // Send back the corresponding ack? if (not connection->ack_mailbox.empty()) { diff --git a/test/simulation/MultihostMulticoreComputeService/MultihostMulticoreComputeServiceOneTaskTest.cpp b/test/simulation/MultihostMulticoreComputeService/MultihostMulticoreComputeServiceOneTaskTest.cpp index 994b760336..9168bf1a6a 100644 --- a/test/simulation/MultihostMulticoreComputeService/MultihostMulticoreComputeServiceOneTaskTest.cpp +++ b/test/simulation/MultihostMulticoreComputeService/MultihostMulticoreComputeServiceOneTaskTest.cpp @@ -295,7 +295,7 @@ void MultihostMulticoreComputeServiceOneTaskTest::do_Noop_test() { ASSERT_THROW(simulation->launch(), std::runtime_error); ASSERT_NO_THROW(compute_service = simulation->add( new wrench::MultihostMulticoreComputeService(hostname, - {std::make_tuple(hostname, wrench::ComputeService::ALL_CORES, wrench::ComputeService::ALL_RAM)}, + {std::make_tuple(hostname, wrench::ComputeService::ALL_CORES, wrench::ComputeService::ALL_RAM)},100.0, {}))); // Create a Storage Service @@ -614,7 +614,7 @@ void MultihostMulticoreComputeServiceOneTaskTest::do_StandardJobConstructor_test // Create a Compute Service compute_service = simulation->add( new wrench::MultihostMulticoreComputeService(hostname1, - {std::make_tuple(hostname1, wrench::ComputeService::ALL_CORES, wrench::ComputeService::ALL_RAM)}, + {std::make_tuple(hostname1, wrench::ComputeService::ALL_CORES, wrench::ComputeService::ALL_RAM)},100.0, {})); // Create a Storage Service @@ -720,7 +720,7 @@ void MultihostMulticoreComputeServiceOneTaskTest::do_HostMemory_test() { // Create a Compute Service compute_service = simulation->add( new wrench::MultihostMulticoreComputeService(hostname1, - {std::make_tuple(hostname1, wrench::ComputeService::ALL_CORES, wrench::ComputeService::ALL_RAM)}, + {std::make_tuple(hostname1, wrench::ComputeService::ALL_CORES, wrench::ComputeService::ALL_RAM)},100.0, {})); // Create a Storage Service @@ -800,7 +800,7 @@ class ExecutionWithLocationMapTestWMS : public wrench::WMS { throw std::runtime_error("Unexpected workflow execution event!"); } - if (!this->test->storage_service1->lookupFile(this->test->output_file)) { + if (!this->test->storage_service1->lookupFile(this->test->output_file, nullptr)) { throw std::runtime_error("Output file not written to storage service"); } @@ -954,7 +954,7 @@ class ExecutionWithDefaultStorageServiceTestWMS : public wrench::WMS { throw std::runtime_error("Unexpected workflow execution event!"); } - if (!this->test->storage_service1->lookupFile(this->test->output_file)) { + if (!this->test->storage_service1->lookupFile(this->test->output_file, job)) { throw std::runtime_error("Output file not written to storage service"); } @@ -1087,16 +1087,16 @@ class ExecutionWithPrePostCopiesAndCleanupTestWMS : public wrench::WMS { } // Test file locations - if (!this->test->storage_service1->lookupFile(this->test->input_file)) { + if (!this->test->storage_service1->lookupFile(this->test->input_file, nullptr)) { throw std::runtime_error("Input file should be on Storage Service #1"); } - if (!this->test->storage_service1->lookupFile(this->test->output_file)) { + if (!this->test->storage_service1->lookupFile(this->test->output_file, nullptr)) { throw std::runtime_error("Output file should be on Storage Service #1"); } - if (this->test->storage_service2->lookupFile(this->test->input_file)) { + if (this->test->storage_service2->lookupFile(this->test->input_file,job)) { throw std::runtime_error("Input file should not be on Storage Service #2"); } - if (this->test->storage_service2->lookupFile(this->test->input_file)) { + if (this->test->storage_service2->lookupFile(this->test->input_file,job)) { throw std::runtime_error("Output file should not be on Storage Service #2"); } diff --git a/test/simulation/MultihostMulticoreComputeService/StandardJobExecutorTest.cpp b/test/simulation/MultihostMulticoreComputeService/StandardJobExecutorTest.cpp index 6dda57fda8..adda1c7c68 100644 --- a/test/simulation/MultihostMulticoreComputeService/StandardJobExecutorTest.cpp +++ b/test/simulation/MultihostMulticoreComputeService/StandardJobExecutorTest.cpp @@ -157,6 +157,7 @@ class StandardJobExecutorConstructorTestWMS : public wrench::WMS { {std::make_tuple("bogus", 2, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -179,6 +180,7 @@ class StandardJobExecutorConstructorTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[0], 2, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -201,6 +203,7 @@ class StandardJobExecutorConstructorTestWMS : public wrench::WMS { {}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -223,6 +226,7 @@ class StandardJobExecutorConstructorTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[0], 0, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -245,6 +249,7 @@ class StandardJobExecutorConstructorTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[0], 100, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -268,6 +273,7 @@ class StandardJobExecutorConstructorTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[0], wrench::ComputeService::ALL_CORES, -1)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -290,6 +296,7 @@ class StandardJobExecutorConstructorTestWMS : public wrench::WMS { {std::make_tuple("Host4", wrench::ComputeService::ALL_CORES, 2048)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -334,6 +341,7 @@ class StandardJobExecutorConstructorTestWMS : public wrench::WMS { {std::make_tuple("Host4", wrench::ComputeService::ALL_CORES, 100.00)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -380,6 +388,7 @@ class StandardJobExecutorConstructorTestWMS : public wrench::WMS { {std::make_tuple("Host4", wrench::ComputeService::ALL_CORES, 100.00)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -420,6 +429,7 @@ class StandardJobExecutorConstructorTestWMS : public wrench::WMS { {std::make_tuple("Host1", wrench::ComputeService::ALL_CORES, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -579,6 +589,7 @@ class OneSingleCoreTaskTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[0], 2, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -625,12 +636,12 @@ class OneSingleCoreTaskTestWMS : public wrench::WMS { } // Has the output file been copied back to storage_service1? - if (!this->test->storage_service1->lookupFile(this->getWorkflow()->getFileByID("output_file"))) { + if (!this->test->storage_service1->lookupFile(this->getWorkflow()->getFileByID("output_file"), nullptr)) { throw std::runtime_error("The output file has not been copied back to the specified storage service"); } // Has the output file been erased from storage_service2? - if (this->test->storage_service2->lookupFile(this->getWorkflow()->getFileByID("output_file"))) { + if (this->test->storage_service2->lookupFile(this->getWorkflow()->getFileByID("output_file"), nullptr)) { throw std::runtime_error("The output file has not been erased from the specified storage service"); } @@ -774,6 +785,7 @@ class OneSingleCoreTaskBogusPreFileCopyTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[0], 2, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -953,6 +965,7 @@ class OneSingleCoreTaskMissingFileTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[1], 2, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -1156,6 +1169,7 @@ class DependentTasksTestWMS : public wrench::WMS { this->test->storage_service1 , // This should be a scratch space of a compute service, but since this //standard job executor is being created direclty (not by any Compute Service), we pass a dummy storage as a scratch space false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} @@ -1314,6 +1328,7 @@ class OneMultiCoreTaskTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[1], 6, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}, {} )); executor->start(executor, true); @@ -1378,6 +1393,7 @@ class OneMultiCoreTaskTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[1], 10, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}, {} )); executor->start(executor, true); @@ -1446,6 +1462,7 @@ class OneMultiCoreTaskTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[1], 10, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, std::to_string( thread_startup_overhead)}}, {} )); @@ -1615,6 +1632,7 @@ class TwoMultiCoreTasksTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[0], 10, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}, {} )); executor->start(executor, true); @@ -1701,6 +1719,7 @@ class TwoMultiCoreTasksTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[0], 10, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}, {} )); executor->start(executor, true); @@ -1788,6 +1807,7 @@ class TwoMultiCoreTasksTestWMS : public wrench::WMS { {std::make_tuple(test->simulation->getHostnameList()[0], 10, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}, {} )); executor->start(executor, true); @@ -1977,6 +1997,7 @@ class MultiHostTestWMS : public wrench::WMS { std::make_tuple(test->simulation->getHostnameList()[1], 10, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}, {} )); executor->start(executor, true); @@ -2064,6 +2085,7 @@ class MultiHostTestWMS : public wrench::WMS { std::make_tuple(test->simulation->getHostnameList()[1], 10, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}, {} )); executor->start(executor, true); @@ -2254,6 +2276,7 @@ class JobTerminationTestDuringAComputationWMS : public wrench::WMS { std::make_tuple("Host4", 10, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}, {} )); executor->start(executor, true); @@ -2406,6 +2429,7 @@ class JobTerminationTestDuringATransferWMS : public wrench::WMS { std::make_tuple("Host4", 10, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}, {} )); executor->start(executor, true); @@ -2573,6 +2597,7 @@ class JobTerminationTestAtRandomTimesWMS : public wrench::WMS { std::make_tuple("Host4", 10, wrench::ComputeService::ALL_RAM)}, nullptr, false, + nullptr, {{wrench::StandardJobExecutorProperty::THREAD_STARTUP_OVERHEAD, "0"}}, {} )); executor->start(executor, true); diff --git a/test/simulation/NetworkProximityTest.cpp b/test/simulation/NetworkProximityTest.cpp index 32dcb3d0f3..c9733f334d 100644 --- a/test/simulation/NetworkProximityTest.cpp +++ b/test/simulation/NetworkProximityTest.cpp @@ -289,7 +289,7 @@ class CompareProxTestWMS : public wrench::WMS { std::pair first_pair_to_compute_proximity; first_pair_to_compute_proximity = std::make_pair(this->simulation->getHostnameList()[0], this->simulation->getHostnameList()[1]); - int count = 0, max_count = 100; + int count = 0, max_count = 1000; double first_pair_proximity = (*(this->getAvailableNetworkProximityServices().begin()))->query( first_pair_to_compute_proximity); @@ -311,7 +311,7 @@ class CompareProxTestWMS : public wrench::WMS { std::pair second_pair_to_compute_proximity; second_pair_to_compute_proximity = std::make_pair(this->simulation->getHostnameList()[2], this->simulation->getHostnameList()[3]); - count = 0, max_count = 100; + count = 0, max_count = 1000; double second_pair_proximity = (*(this->getAvailableNetworkProximityServices().begin()))->query( second_pair_to_compute_proximity); diff --git a/test/simulation/ScratchSpaceTest.cpp b/test/simulation/ScratchSpaceTest.cpp index 4067ff3a5d..3afaf31a1a 100644 --- a/test/simulation/ScratchSpaceTest.cpp +++ b/test/simulation/ScratchSpaceTest.cpp @@ -39,6 +39,8 @@ class ScratchSpaceTest : public ::testing::Test { void do_RaceConditionTest_test(); + void do_DirectoriesTest_test(); + protected: ScratchSpaceTest() { @@ -125,9 +127,12 @@ class SimpleScratchSpaceTestWMS : public wrench::WMS { } switch (event->type) { case wrench::WorkflowExecutionEvent::STANDARD_JOB_COMPLETION: { - if (this->test->compute_service->getFreeScratchSpaceSize() < 10000000000000.0) { + //sleep to make sure that the files are deleted + wrench::S4U_Simulation::sleep(100); + double free_space_size = this->test->compute_service->getFreeScratchSpaceSize(); + if (free_space_size < this->test->compute_service->getTotalScratchSpaceSize()) { throw std::runtime_error( - "ScratchSpaceTest::do_SimpleScratchSpace_test():File was not deleted from scratch"); + "File was not deleted from scratch"); } break; } @@ -165,11 +170,11 @@ void ScratchSpaceTest::do_SimpleScratchSpace_test() { // Create a Storage Service ASSERT_NO_THROW(storage_service1 = simulation->add( - new wrench::SimpleStorageService(hostname, 10000000000000.0))); + new wrench::SimpleStorageService(hostname, 1000000.0))); // Create a Storage Service ASSERT_NO_THROW(storage_service2 = simulation->add( - new wrench::SimpleStorageService(hostname, 10000000000000.0))); + new wrench::SimpleStorageService(hostname, 1000000.0))); // Create a Compute Service @@ -177,7 +182,7 @@ void ScratchSpaceTest::do_SimpleScratchSpace_test() { new wrench::MultihostMulticoreComputeService(hostname, {std::make_tuple(hostname, wrench::ComputeService::ALL_CORES, wrench::ComputeService::ALL_RAM)}, - 10000000000000.0, {}))); + 1000000.0, {}))); simulation->add(new wrench::FileRegistryService(hostname)); @@ -558,7 +563,8 @@ class PilotJobScratchSpaceTestWMS : public wrench::WMS { switch (event->type) { case wrench::WorkflowExecutionEvent::STANDARD_JOB_COMPLETION: { // success, check if the scratch space size is not full again or not, it should not be - if (pilot_job->getComputeService()->getFreeScratchSpaceSize() == 3000.0) { + double free_space_size = pilot_job->getComputeService()->getFreeScratchSpaceSize(); + if (free_space_size == 3000.0) { throw std::runtime_error( "Pilot Job is expected to clear its scratch space only after all the standard job finishes"); } @@ -582,7 +588,8 @@ class PilotJobScratchSpaceTestWMS : public wrench::WMS { case wrench::WorkflowExecutionEvent::PILOT_JOB_EXPIRATION: { // success, check if the scratch space size is full again or not, it should be full wrench::S4U_Simulation::sleep(10); //sleep for some time to ensure everything is deleted - if (pilot_job->getComputeService()->getFreeScratchSpaceSize() != 3000.0) { + double free_space_size = pilot_job->getComputeService()->getFreeScratchSpaceSize(); + if (free_space_size != 3000.0) { throw std::runtime_error( "Scratch space should be full after this pilot job expires but it is not now"); } @@ -756,7 +763,7 @@ class ScratchSpaceRaceConditionTestWMS : public wrench::WMS { } }; -TEST_F(ScratchSpaceTest, DISABLED_RaceConditionTest) { +TEST_F(ScratchSpaceTest, RaceConditionTest) { DO_TEST_WITH_FORK(do_RaceConditionTest_test); } @@ -766,7 +773,7 @@ void ScratchSpaceTest::do_RaceConditionTest_test() { auto *simulation = new wrench::Simulation(); int argc = 1; auto argv = (char **) calloc(1, sizeof(char *)); - argv[0] = strdup("cloud_service_test"); + argv[0] = strdup("scratch_space_test"); ASSERT_NO_THROW(simulation->init(&argc, argv)); @@ -807,6 +814,208 @@ void ScratchSpaceTest::do_RaceConditionTest_test() { // Running a "run a single task" simulation ASSERT_NO_THROW(simulation->launch()); + delete simulation; + free(argv[0]); + free(argv); +} + + +/**********************************************************************/ +/** Directories Test (For both Sratch and Non-Scratch) **/ +/**********************************************************************/ + +class ScratchNonScratchDirectoriesTestWMS : public wrench::WMS { + +public: + ScratchNonScratchDirectoriesTestWMS(ScratchSpaceTest *test, + const std::set &compute_services, + const std::set &storage_services, + std::string &hostname) : + wrench::WMS(nullptr, nullptr, compute_services, storage_services, {}, nullptr, hostname, "test") { + this->test = test; + } + +private: + + ScratchSpaceTest *test; + + int main() { + + //NonScratch have only / directory but other directories can be created + //Scratch have /, / directories + + // Create a data movement manager and this should only copy from / to / of two non scratch space + std::shared_ptr data_movement_manager = this->createDataMovementManager(); + + // Create a job manager + std::shared_ptr job_manager = this->createJobManager(); + + // Get a reference to the file + wrench::WorkflowFile *file1 = this->getWorkflow()->getFileByID("input1"); + // Get a reference to the file + wrench::WorkflowFile *file2 = this->getWorkflow()->getFileByID("input2"); + + //check if this file is staged in / directory of non-scratch + if(!test->storage_service1->lookupFile(file1, nullptr)) { //nullptr is referring to no job's directory + throw std::runtime_error( + "The file1 was supposed to be staged in / directory but is not" + ); + } + //check if this file is staged in / directory of non-scratch + if(!test->storage_service2->lookupFile(file2, nullptr)) { //nullptr is referring to no job's directory + throw std::runtime_error( + "The file2 was supposed to be staged in / directory but is not" + ); + } + + // Create a task + wrench::WorkflowTask *task1 = this->getWorkflow()->addTask("task1", 10, 1, 1, 1.0, 0); // 10 seconds + task1->addInputFile(file1); + + // Create a first job that: + // - copies file "input" to the scratch space + // - runs task1 + wrench::StandardJob *job1 = job_manager->createStandardJob( + {task1}, {}, + {std::make_tuple(file1, this->test->storage_service1, wrench::ComputeService::SCRATCH)}, + {}, {}); + + // Submit both jobs + job_manager->submitJob(job1, this->test->compute_service); + + // Wait for workflow execution events + for (auto job : {job1}) { + std::unique_ptr event; + try { + event = this->getWorkflow()->waitForNextExecutionEvent(); + } catch (wrench::WorkflowExecutionException &e) { + throw std::runtime_error("Error while getting and execution event: " + e.getCause()->toString()); + } + switch (event->type) { + case wrench::WorkflowExecutionEvent::STANDARD_JOB_COMPLETION: { + // success, do nothing for now + break; + } + default: { + throw std::runtime_error("Unexpected workflow execution event: " + std::to_string((int) (event->type))); + } + } + } + + //the file1 should still be non-scratch space, the job should only delete file from it's scratch job's directory + //check if this file is staged in / directory of non-scratch + if(!test->storage_service1->lookupFile(file1, nullptr)) { //nullptr is referring to no job's directory + throw std::runtime_error( + "The file1 again was supposed to be staged in / directory but is not" + ); + } + + //try to copy file1 from job1's directory of storage service1 into storage service2 in / directory, this should fail + bool success = false; + try { + this->test->storage_service2->copyFile(file1, this->test->storage_service1, job1, nullptr); + }catch(wrench::WorkflowExecutionException) { + success = true; + } + if(!success) { + throw std::runtime_error( + "Non-scratch space have / directory unless created by copying something into a new directory name" + ); + } + + //try to copy file1 from / directory of storage service1 into storage service2 in job1's directory, this should succeed + success = true; + try { + this->test->storage_service2->copyFile(file1, this->test->storage_service1, nullptr, job1); + }catch(wrench::WorkflowExecutionException) { + success = false; + } + if(!success) { + throw std::runtime_error( + "We should have been able to copy from / directory of non-scratch to a new directory into another non-scratch space" + ); + } + + //try to copy file2 from / directory of stroage service2 into storage service1 in / directory, it should succeed + success = true; + try { + this->test->storage_service1->copyFile(file2, this->test->storage_service2, nullptr, nullptr); + }catch(wrench::WorkflowExecutionException) { + success = false; + } + if(!success) { + throw std::runtime_error( + "We should have been able to copy from / of one non-scratch space to / of another non-scratch space" + ); + } + + + return 0; + } +}; + +TEST_F(ScratchSpaceTest, ScratchNonScratchDirectoriesTest) { + DO_TEST_WITH_FORK(do_DirectoriesTest_test); +} + +void ScratchSpaceTest::do_DirectoriesTest_test() { + + // Create and initialize a simulation + auto *simulation = new wrench::Simulation(); + int argc = 1; + auto argv = (char **) calloc(1, sizeof(char *)); + argv[0] = strdup("scratch_space_test"); + + ASSERT_NO_THROW(simulation->init(&argc, argv)); + + // Setting up the platform + ASSERT_NO_THROW(simulation->instantiatePlatform(platform_file_path)); + + // Get a hostname + std::string hostname = simulation->getHostnameList()[0]; + + // Create a Storage Service (note the BOGUS property, which is for testing puposes + // and doesn't matter because we do not stop the service) + ASSERT_NO_THROW(storage_service1 = simulation->add( + new wrench::SimpleStorageService(hostname, 100.0, + {{wrench::SimpleStorageServiceMessagePayload::STOP_DAEMON_MESSAGE_PAYLOAD, "BOGUS"}}))); + + // Create a Storage Service (note the BOGUS property, which is for testing puposes + // and doesn't matter because we do not stop the service) + ASSERT_NO_THROW(storage_service2 = simulation->add( + new wrench::SimpleStorageService(hostname, 100.0, + {{wrench::SimpleStorageServiceMessagePayload::STOP_DAEMON_MESSAGE_PAYLOAD, "BOGUS"}}))); + + // Create a Cloud Service + ASSERT_NO_THROW(compute_service = simulation->add( + new wrench::MultihostMulticoreComputeService(hostname, {"Host1"}, 100, {}, {}))); + + // Create a WMS + wrench::WMS *wms = nullptr; + ASSERT_NO_THROW(wms = simulation->add( + new ScratchNonScratchDirectoriesTestWMS(this, {compute_service}, {storage_service1}, hostname))); + + + wrench::Workflow *workflow = new wrench::Workflow(); + ASSERT_NO_THROW(wms->addWorkflow(workflow)); + + // Create a file registry + ASSERT_NO_THROW(simulation->add(new wrench::FileRegistryService(hostname))); + + // Create a file + wrench::WorkflowFile *file1 = nullptr; + ASSERT_NO_THROW(file1 = workflow->addFile("input1", 1)); + // Create a file + wrench::WorkflowFile *file2 = nullptr; + ASSERT_NO_THROW(file2 = workflow->addFile("input2", 1)); + // Staging the input_file on the storage service + ASSERT_NO_THROW(simulation->stageFile(file1, storage_service1)); + // Staging the input_file on the storage service + ASSERT_NO_THROW(simulation->stageFile(file2, storage_service2)); + + // Running a "run a single task" simulation + ASSERT_NO_THROW(simulation->launch()); + delete simulation; free(argv[0]); free(argv); diff --git a/test/simulation/SimpleStorageService/DataMovementManagerCopyRegisterTest.cpp b/test/simulation/SimpleStorageService/DataMovementManagerCopyRegisterTest.cpp index 0d6a349396..fae6abb0fa 100644 --- a/test/simulation/SimpleStorageService/DataMovementManagerCopyRegisterTest.cpp +++ b/test/simulation/SimpleStorageService/DataMovementManagerCopyRegisterTest.cpp @@ -253,7 +253,7 @@ class DataMovementManagerCopyRegisterTestWMS : public wrench::WMS { throw std::runtime_error("File registry service should not have been updated"); } - if (!this->test->dst_storage_service->lookupFile(this->test->src2_file_2)) { + if (!this->test->dst_storage_service->lookupFile(this->test->src2_file_2, nullptr)) { throw std::runtime_error("Asynchronous file copy should have completed even though the FileRegistryService was down."); } diff --git a/test/simulation/SimpleStorageService/InternalNetworkConnectionTest.cpp b/test/simulation/SimpleStorageService/InternalNetworkConnectionTest.cpp index fc07d7e415..da714186a9 100644 --- a/test/simulation/SimpleStorageService/InternalNetworkConnectionTest.cpp +++ b/test/simulation/SimpleStorageService/InternalNetworkConnectionTest.cpp @@ -58,33 +58,33 @@ void InternalNetworkConnectionTest::do_Constructor_test() { wrench::WorkflowFile *file = workflow->addFile("file", 10); // Bogus type - ASSERT_THROW(conn = new wrench::NetworkConnection(3, nullptr, "", ""), std::invalid_argument); - ASSERT_THROW(conn = new wrench::NetworkConnection(4, nullptr, "", ""), std::invalid_argument); - ASSERT_THROW(conn = new wrench::NetworkConnection(5, nullptr, "", ""), std::invalid_argument); - ASSERT_THROW(conn = new wrench::NetworkConnection(6, nullptr, "", ""), std::invalid_argument); + ASSERT_THROW(conn = new wrench::NetworkConnection(3, nullptr, "/", "", ""), std::invalid_argument); + ASSERT_THROW(conn = new wrench::NetworkConnection(4, nullptr, "/", "", ""), std::invalid_argument); + ASSERT_THROW(conn = new wrench::NetworkConnection(5, nullptr, "/", "", ""), std::invalid_argument); + ASSERT_THROW(conn = new wrench::NetworkConnection(6, nullptr, "/", "", ""), std::invalid_argument); // Empty mailbox_name - ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_DATA, file, "", "ack"), std::invalid_argument); + ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_DATA, file, "/", "", "ack"), std::invalid_argument); // Empty file - ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_DATA, nullptr, "mailbox_name", "ack"), std::invalid_argument); - ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::OUTGOING_DATA, nullptr, "mailbox_name", ""), std::invalid_argument); + ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_DATA, nullptr, "/", "mailbox_name", "ack"), std::invalid_argument); + ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::OUTGOING_DATA, nullptr, "/", "mailbox_name", ""), std::invalid_argument); // Ack mailbox_name should be empty - ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::OUTGOING_DATA, nullptr, "mailbox_name", "ack"), std::invalid_argument); - ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_CONTROL, nullptr, "mailbox_name", "ack"), std::invalid_argument); + ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::OUTGOING_DATA, nullptr, "/", "mailbox_name", "ack"), std::invalid_argument); + ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_CONTROL, nullptr, "/", "mailbox_name", "ack"), std::invalid_argument); // Non-Empty file - ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_CONTROL, file, "mailbox_name", ""), std::invalid_argument); + ASSERT_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_CONTROL, file, "/", "mailbox_name", ""), std::invalid_argument); - ASSERT_NO_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_CONTROL, nullptr, "mailbox_name", "")); + ASSERT_NO_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_CONTROL, nullptr, "/", "mailbox_name", "")); delete conn; - ASSERT_NO_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_DATA, file, "mailbox_name", "ack")); + ASSERT_NO_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::INCOMING_DATA, file, "/", "mailbox_name", "ack")); delete conn; - ASSERT_NO_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::OUTGOING_DATA, file, "mailbox_name", "")); + ASSERT_NO_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::OUTGOING_DATA, file, "/", "mailbox_name", "")); delete conn; - ASSERT_NO_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::OUTGOING_DATA, file, "mailbox_name", "")); + ASSERT_NO_THROW(conn = new wrench::NetworkConnection(wrench::NetworkConnection::OUTGOING_DATA, file, "/", "mailbox_name", "")); ASSERT_THROW(conn->getMessage(), std::runtime_error); delete conn; diff --git a/test/simulation/SimpleStorageService/SimpleStorageServiceFunctionalTest.cpp b/test/simulation/SimpleStorageService/SimpleStorageServiceFunctionalTest.cpp index 1f8665ddc5..650dba0fad 100644 --- a/test/simulation/SimpleStorageService/SimpleStorageServiceFunctionalTest.cpp +++ b/test/simulation/SimpleStorageService/SimpleStorageServiceFunctionalTest.cpp @@ -123,9 +123,9 @@ class SimpleStorageServiceBasicFunctionalityTestWMS : public wrench::WMS { // Do a few queries to storage services for (auto f : {this->test->file_1, this->test->file_10, this->test->file_100, this->test->file_500}) { - if ((!this->test->storage_service_1000->lookupFile(f)) || - (this->test->storage_service_100->lookupFile(f)) || - (this->test->storage_service_500->lookupFile(f))) { + if ((!this->test->storage_service_1000->lookupFile(f, nullptr)) || + (this->test->storage_service_100->lookupFile(f, nullptr)) || + (this->test->storage_service_500->lookupFile(f, nullptr))) { throw std::runtime_error("Some storage services do/don't have the files that they shouldn't/should have"); } } @@ -168,7 +168,7 @@ class SimpleStorageServiceBasicFunctionalityTestWMS : public wrench::WMS { // Make sure the copy didn't happen success = false; - if (this->test->storage_service_100->lookupFile(this->test->file_500)) { + if (this->test->storage_service_100->lookupFile(this->test->file_500, nullptr)) { success = true; } if (success) { @@ -203,7 +203,7 @@ class SimpleStorageServiceBasicFunctionalityTestWMS : public wrench::WMS { // Read a file on a storage service try { - this->test->storage_service_100->readFile(this->test->file_10); + this->test->storage_service_100->readFile(this->test->file_10, nullptr); } catch (wrench::WorkflowExecutionException &e) { throw std::runtime_error("Should be able to read a file available on a storage service"); } @@ -211,7 +211,7 @@ class SimpleStorageServiceBasicFunctionalityTestWMS : public wrench::WMS { // Read a file on a storage service that doesn't have that file success = true; try { - this->test->storage_service_100->readFile(this->test->file_100); + this->test->storage_service_100->readFile(this->test->file_100, nullptr); } catch (wrench::WorkflowExecutionException &e) { success = false; } @@ -317,7 +317,7 @@ class SimpleStorageServiceBasicFunctionalityTestWMS : public wrench::WMS { } // Check that the copy has happened.. - if (!this->test->storage_service_100->lookupFile(this->test->file_1)) { + if (!this->test->storage_service_100->lookupFile(this->test->file_1, nullptr)) { throw std::runtime_error("Asynchronous file copy operation didn't copy the file"); } @@ -405,7 +405,7 @@ class SimpleStorageServiceBasicFunctionalityTestWMS : public wrench::WMS { // Try to do stuff with a shutdown service success = true; try { - this->test->storage_service_100->lookupFile(this->test->file_1); + this->test->storage_service_100->lookupFile(this->test->file_1, nullptr); } catch (wrench::WorkflowExecutionException &e) { success = false; // Check Exception @@ -426,7 +426,7 @@ class SimpleStorageServiceBasicFunctionalityTestWMS : public wrench::WMS { success = true; try { - this->test->storage_service_100->readFile(this->test->file_1); + this->test->storage_service_100->readFile(this->test->file_1, nullptr); } catch (wrench::WorkflowExecutionException &e) { success = false; // Check Exception @@ -447,7 +447,7 @@ class SimpleStorageServiceBasicFunctionalityTestWMS : public wrench::WMS { success = true; try { - this->test->storage_service_100->writeFile(this->test->file_1); + this->test->storage_service_100->writeFile(this->test->file_1, nullptr); } catch (wrench::WorkflowExecutionException &e) { success = false; // Check Exception @@ -620,15 +620,15 @@ class SimpleStorageServiceSynchronousFileCopyTestWMS : public wrench::WMS { } // Do the file copy again, which should fail - success = true; + success = false; try { data_movement_manager->doSynchronousFileCopy(this->test->file_500, this->test->storage_service_1000, this->test->storage_service_500); } catch (wrench::WorkflowExecutionException &e) { - success = false; + success = true; } if (!success) { - throw std::runtime_error("Should be able fo write a file that's already there"); + throw std::runtime_error("Should not be able to write a file beyond the storage capacity"); } return 0; diff --git a/test/simulation/SimpleStorageService/StorageServiceDeleteRegisterTest.cpp b/test/simulation/SimpleStorageService/StorageServiceDeleteRegisterTest.cpp index be43db23c4..adaecbd0c2 100644 --- a/test/simulation/SimpleStorageService/StorageServiceDeleteRegisterTest.cpp +++ b/test/simulation/SimpleStorageService/StorageServiceDeleteRegisterTest.cpp @@ -83,7 +83,7 @@ class SimpleStorageServiceDeleteRegisterTestWMS : public wrench::WMS { // delete file and don't unregister storage_service->deleteFile(file_1); - if (storage_service->lookupFile(file_1)) { + if (storage_service->lookupFile(file_1, nullptr)) { throw std::runtime_error("StorageService should have deleted file_1"); } @@ -93,7 +93,7 @@ class SimpleStorageServiceDeleteRegisterTestWMS : public wrench::WMS { // delete file and unregister storage_service->deleteFile(file_2, file_registry_service); - if (storage_service->lookupFile(file_2)) { + if (storage_service->lookupFile(file_2, nullptr)) { throw std::runtime_error("StorageService should have deleted file_2"); } diff --git a/test/simulation_message_constructors/MessageConstructorTest.cpp b/test/simulation_message_constructors/MessageConstructorTest.cpp index 22d2650c15..8c1d6a5069 100644 --- a/test/simulation_message_constructors/MessageConstructorTest.cpp +++ b/test/simulation_message_constructors/MessageConstructorTest.cpp @@ -217,16 +217,17 @@ TEST_F(MessageConstructorTest, StorageServiceMessages) { ASSERT_NO_THROW(new wrench::StorageServiceFreeSpaceAnswerMessage(0.1, 666)); ASSERT_THROW(new wrench::StorageServiceFreeSpaceAnswerMessage(-0.1, 666), std::invalid_argument); - ASSERT_NO_THROW(new wrench::StorageServiceFileLookupRequestMessage("mailbox", file, 666)); - ASSERT_THROW(new wrench::StorageServiceFileLookupRequestMessage("", file, 666), std::invalid_argument); - ASSERT_THROW(new wrench::StorageServiceFileLookupRequestMessage("mailbox", nullptr, 666), std::invalid_argument); + std::string root_dir = "/"; + ASSERT_NO_THROW(new wrench::StorageServiceFileLookupRequestMessage("mailbox", file, root_dir, 666)); + ASSERT_THROW(new wrench::StorageServiceFileLookupRequestMessage("", file, root_dir, 666), std::invalid_argument); + ASSERT_THROW(new wrench::StorageServiceFileLookupRequestMessage("mailbox", nullptr, root_dir, 666), std::invalid_argument); ASSERT_NO_THROW(new wrench::StorageServiceFileLookupAnswerMessage(file, true, 666)); ASSERT_THROW(new wrench::StorageServiceFileLookupAnswerMessage(nullptr, true, 666), std::invalid_argument); - ASSERT_NO_THROW(new wrench::StorageServiceFileDeleteRequestMessage("mailbox", file, 666)); - ASSERT_THROW(new wrench::StorageServiceFileDeleteRequestMessage("", file, 666), std::invalid_argument); - ASSERT_THROW(new wrench::StorageServiceFileDeleteRequestMessage("mailbox", nullptr, 666), std::invalid_argument); + ASSERT_NO_THROW(new wrench::StorageServiceFileDeleteRequestMessage("mailbox", file, root_dir, 666)); + ASSERT_THROW(new wrench::StorageServiceFileDeleteRequestMessage("", file, root_dir, 666), std::invalid_argument); + ASSERT_THROW(new wrench::StorageServiceFileDeleteRequestMessage("mailbox", nullptr, root_dir, 666), std::invalid_argument); ASSERT_NO_THROW(new wrench::StorageServiceFileDeleteAnswerMessage(file, storage_service, true, nullptr, 666)); ASSERT_NO_THROW(new wrench::StorageServiceFileDeleteAnswerMessage(file, storage_service, false, failure_cause, 666)); @@ -239,10 +240,10 @@ TEST_F(MessageConstructorTest, StorageServiceMessages) { ASSERT_THROW(new wrench::StorageServiceFileDeleteAnswerMessage(file, storage_service, true, failure_cause, 666), std::invalid_argument); - ASSERT_NO_THROW(new wrench::StorageServiceFileCopyRequestMessage("mailbox", file, storage_service, nullptr, 666)); - ASSERT_THROW(new wrench::StorageServiceFileCopyRequestMessage("", file, storage_service, nullptr, 666), std::invalid_argument); - ASSERT_THROW(new wrench::StorageServiceFileCopyRequestMessage("mailbox", nullptr, storage_service, nullptr, 666), std::invalid_argument); - ASSERT_THROW(new wrench::StorageServiceFileCopyRequestMessage("mailbox", file, nullptr, nullptr, 666), std::invalid_argument); + ASSERT_NO_THROW(new wrench::StorageServiceFileCopyRequestMessage("mailbox", file, storage_service, root_dir, root_dir, nullptr, 666)); + ASSERT_THROW(new wrench::StorageServiceFileCopyRequestMessage("", file, storage_service, root_dir, root_dir, nullptr, 666), std::invalid_argument); + ASSERT_THROW(new wrench::StorageServiceFileCopyRequestMessage("mailbox", nullptr, storage_service, root_dir, root_dir, nullptr, 666), std::invalid_argument); + ASSERT_THROW(new wrench::StorageServiceFileCopyRequestMessage("mailbox", file, nullptr, root_dir, root_dir, nullptr, 666), std::invalid_argument); ASSERT_NO_THROW(new wrench::StorageServiceFileCopyAnswerMessage(file, storage_service, nullptr, false, true, nullptr, 666)); ASSERT_NO_THROW(new wrench::StorageServiceFileCopyAnswerMessage(file, storage_service, nullptr, false, false, failure_cause, 666)); @@ -251,9 +252,9 @@ TEST_F(MessageConstructorTest, StorageServiceMessages) { ASSERT_THROW(new wrench::StorageServiceFileCopyAnswerMessage(file, storage_service, nullptr, false, true, failure_cause, 666), std::invalid_argument); ASSERT_THROW(new wrench::StorageServiceFileCopyAnswerMessage(file, storage_service, nullptr, false, false, nullptr, 666), std::invalid_argument); - ASSERT_NO_THROW(new wrench::StorageServiceFileWriteRequestMessage("mailbox", file, 666)); - ASSERT_THROW(new wrench::StorageServiceFileWriteRequestMessage("", file, 666), std::invalid_argument); - ASSERT_THROW(new wrench::StorageServiceFileWriteRequestMessage("mailbox", nullptr, 666), std::invalid_argument); + ASSERT_NO_THROW(new wrench::StorageServiceFileWriteRequestMessage("mailbox", file, root_dir, 666)); + ASSERT_THROW(new wrench::StorageServiceFileWriteRequestMessage("", file, root_dir, 666), std::invalid_argument); + ASSERT_THROW(new wrench::StorageServiceFileWriteRequestMessage("mailbox", nullptr, root_dir, 666), std::invalid_argument); ASSERT_NO_THROW(new wrench::StorageServiceFileWriteAnswerMessage(file, storage_service, true, nullptr, "mailbox", 666)); ASSERT_NO_THROW(new wrench::StorageServiceFileWriteAnswerMessage(file, storage_service, false, failure_cause, "mailbox", 666)); @@ -263,10 +264,10 @@ TEST_F(MessageConstructorTest, StorageServiceMessages) { ASSERT_THROW(new wrench::StorageServiceFileWriteAnswerMessage(file, storage_service, true, failure_cause, "mailbox", 666), std::invalid_argument); ASSERT_THROW(new wrench::StorageServiceFileWriteAnswerMessage(file, storage_service, false, nullptr, "mailbox", 666), std::invalid_argument); - ASSERT_NO_THROW(new wrench::StorageServiceFileReadRequestMessage("mailbox", "mailbox", file, 666)); - ASSERT_THROW(new wrench::StorageServiceFileReadRequestMessage("", "mailbox", file, 666), std::invalid_argument); - ASSERT_THROW(new wrench::StorageServiceFileReadRequestMessage("mailbox", "", file, 666), std::invalid_argument); - ASSERT_THROW(new wrench::StorageServiceFileReadRequestMessage("", "mailbox", nullptr, 666), std::invalid_argument); + ASSERT_NO_THROW(new wrench::StorageServiceFileReadRequestMessage("mailbox", "mailbox", file, root_dir, 666)); + ASSERT_THROW(new wrench::StorageServiceFileReadRequestMessage("", "mailbox", file, root_dir, 666), std::invalid_argument); + ASSERT_THROW(new wrench::StorageServiceFileReadRequestMessage("mailbox", "", file, root_dir, 666), std::invalid_argument); + ASSERT_THROW(new wrench::StorageServiceFileReadRequestMessage("", "mailbox", nullptr, root_dir, 666), std::invalid_argument); ASSERT_NO_THROW(new wrench::StorageServiceFileReadAnswerMessage(file, storage_service, true, nullptr, 666)); ASSERT_NO_THROW(new wrench::StorageServiceFileReadAnswerMessage(file, storage_service, false, failure_cause, 666));