Skip to content

Commit

Permalink
Merge branch 'batch_fcfs_cbf'
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Mar 17, 2020
2 parents f002e72 + f5ad673 commit d7a88a0
Show file tree
Hide file tree
Showing 46 changed files with 3,128 additions and 1,446 deletions.
22 changes: 11 additions & 11 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ addons:
secure: "a3gk/Phr7yPccMhzBp6LRwEkIXWwSMbeC20sm0N5O/eZ63gN+4sp371pQcUugoPIXBa1jLMeJLL0we950EY9/kFPMVTdscHJ1OOwrM98R65dTtayxLOhDf1D8GTnrOIdpuH9vMatIulRoW0TYUhD1Ay2wIJWToXAJRM4W3bgzAOfsaM45PYeR21tlIuplazZtNys7XFVBO/4F4dHvG1tC5DqiaAM6GBHWqFivdr6vOVzZRZ1+ZvNWv0qk0zy4uxF6W4C+kdBEqhX6Ad2/AuFxIEe5zwdgqMGIu3pSySLVmcrIAGUjBqkR84iQQjlbniSEUyv89jpTvyJ9D95UVlbJ9PB38SYLtfyboDgXb2Sm/M3v0gqPcF6ZX8RP9MaIBEN6HfB4XpPYN4CI6Pwkzkz+7gdFPIl6o5lqbjrvXkeccBH2pWIHrL33pBkIB/wQEKA552jZbB9DbU+A2lobJcFREO5PO5EPWh2WUFB9/C0cOTHPY2k8W92mvsjXlWNqmUTqKwQsldOLrX45sPTkFFCAMU0hOVrjL4qtVO8l/jJitRvtKqXe3ZK1dbZmJU1Rh2G7KDS6+m4t2kIgoxtYDOPdHft38s6NTo/639TPEisn2kLO4sjx/sk2KXh7ja6vKa/eNKMxUBLTbv0x28xMiKN/WEtSTzQKsLWpYLlMUNGUTM="

env:
- DIST=ubuntu-xenial COMPILER=clang BATSCHED=off
- DIST=ubuntu-xenial COMPILER=clang-6 BATSCHED=off
- DIST=ubuntu-xenial COMPILER=gcc5 BATSCHED=off
- DIST=ubuntu-xenial COMPILER=gcc6 BATSCHED=off
- DIST=ubuntu-bionic COMPILER=gcc7 BATSCHED=off
- DIST=ubuntu-bionic COMPILER=gcc7-batsched BATSCHED=on
- DIST=ubuntu-bionic COMPILER=clang BATSCHED=off
- DIST=debian-stretch COMPILER=gcc6 BATSCHED=off
- DIST=debian-buster COMPILER=gcc8 BATSCHED=off
- DIST=ubuntu-xenial COMPILER=clang batsched=off
- DIST=ubuntu-xenial COMPILER=clang-6 batsched=off
- DIST=ubuntu-xenial COMPILER=gcc5 batsched=off
- DIST=ubuntu-xenial COMPILER=gcc6 batsched=off
- DIST=ubuntu-bionic COMPILER=gcc7 batsched=off
- DIST=ubuntu-bionic COMPILER=gcc7-batsched batsched=on
- DIST=ubuntu-bionic COMPILER=clang batsched=off
- DIST=debian-stretch COMPILER=gcc6 batsched=off
- DIST=debian-buster COMPILER=gcc8 batsched=off

services:
- docker
Expand All @@ -42,14 +42,14 @@ before_install:
script:
# building wrench
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then
docker exec -w /home/wrench/wrench/build -it wrench cmake -DENABLE_BATSCHED=${BATSCHED} -DCMAKE_VERBOSE_MAKEFILE=ON -DCOVERAGE=1 ..;
docker exec -w /home/wrench/wrench/build -it wrench cmake -DENABLE_BATSCHED=${batsched} -DCMAKE_VERBOSE_MAKEFILE=ON -DCOVERAGE=1 ..;
docker exec -w /home/wrench/wrench/build -it wrench make all unit_tests doc-gh;
travis_wait sleep infinity & docker exec -w /home/wrench/wrench/build -it wrench ./unit_tests;
fi

after_success:
# coverage analysis and sonarcloud
- if [[ "$TRAVIS_OS_NAME" == "linux" && $DIST == "ubuntu-bionic" && $COMPILER == "gcc7" && $BATSCHED == "off" ]] || [[ "$TRAVIS_OS_NAME" == "linux" && $DIST == "ubuntu-bionic" && $COMPILER == "gcc7-batsched" && $BATSCHED == "on" ]]; then
- if [[ "$TRAVIS_OS_NAME" == "linux" && $DIST == "ubuntu-bionic" && $COMPILER == "gcc7" && $batsched == "off" ]] || [[ "$TRAVIS_OS_NAME" == "linux" && $DIST == "ubuntu-bionic" && $COMPILER == "gcc7-batsched" && $batsched == "on" ]]; then
docker exec -w /home/wrench/wrench/build -it wrench lcov --directory . --capture --output-file coverage.info;
docker exec -w /home/wrench/wrench/build -it wrench lcov --remove coverage.info '*/test/*' '*/examples/*' '*/include/*' --output-file coverage.info;
docker exec -w /home/wrench/wrench/build -it wrench coveralls-lcov --repo-token ${COVERALLS_TOKEN} coverage.info;
Expand Down
20 changes: 17 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,24 @@ set(SOURCE_FILES
src/wrench/services/compute/batch/BatchJob.cpp
src/wrench/services/compute/batch/BatchComputeServiceMessage.cpp
src/wrench/services/compute/batch/BatchComputeService.cpp
src/wrench/services/compute/batch/WorkloadTraceFileReplayer.cpp
src/wrench/services/compute/batch/WorkloadTraceFileReplayerEventReceiver.cpp
src/wrench/services/compute/batch/workload_helper_classes/WorkloadTraceFileReplayer.cpp
src/wrench/services/compute/batch/workload_helper_classes/WorkloadTraceFileReplayerEventReceiver.cpp
src/wrench/services/compute/batch/BatchComputeServiceProperty.cpp
src/wrench/services/compute/batch/BatchComputeServiceMessagePayload.cpp
src/wrench/services/compute/batch/BatschedNetworkListener.cpp
src/wrench/services/compute/batch/batch_schedulers/BatchScheduler.cpp
include/wrench/services/compute/batch/batch_schedulers/BatchScheduler.h
src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp
src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.h
src/wrench/services/compute/batch/batch_schedulers/homegrown/HomegrownBatchScheduler.cpp
include/wrench/services/compute/batch/batch_schedulers/homegrown/HomegrownBatchScheduler.h
src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.cpp
src/wrench/services/compute/batch/batch_schedulers/homegrown/fcfs/FCFSBatchScheduler.h
src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/CONSERVATIVEBFBatchScheduler.cpp
src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/CONSERVATIVEBFBatchScheduler.h
src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/NodeAvailabilityTimeLine.cpp
src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/NodeAvailabilityTimeLine.h
src/wrench/services/compute/batch/batch_schedulers/homegrown/conservative_bf/BatchJobSet.h
src/wrench/services/compute/htcondor/HTCondorComputeService.cpp
src/wrench/services/compute/htcondor/HTCondorCentralManagerService.cpp
src/wrench/services/compute/htcondor/HTCondorCentralManagerServiceMessage.cpp
Expand All @@ -241,7 +254,7 @@ set(SOURCE_FILES
src/wrench/util/PointerUtil.cpp
src/wrench/util/PointerUtil.cpp
src/wrench/util/MessageManager.cpp
src/wrench/services/compute/batch/TraceFileLoader.cpp
src/wrench/services/compute/batch/workload_helper_classes/TraceFileLoader.cpp
src/wrench/util/UnitParser.cpp
src/wrench/simulation/SimulationTimestampTypes.cpp
)
Expand Down Expand Up @@ -276,6 +289,7 @@ set(TEST_FILES
test/compute_services/BareMetalComputeService/BareMetalComputeServiceResourceInformationTest.cpp
test/compute_services/BatchService/BatchServiceTest.cpp
test/compute_services/BatchService/BatchServiceFCFSTest.cpp
test/compute_services/BatchService/BatchServiceCONSERVATIVEBFTest.cpp
test/compute_services/BatchService/BatchServiceTraceFileTest.cpp
test/compute_services/BatchService/BatchServiceOutputCSVFileTest.cpp
test/compute_services/BatchService/BatchServiceBatschedQueueWaitTimePredictionTest.cpp
Expand Down
10 changes: 5 additions & 5 deletions examples/simple-example/scheduler/BatchStandardJobScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
*
*/

#ifndef WRENCH_BATCHSCHEDULER_H
#define WRENCH_BATCHSCHEDULER_H
#ifndef WRENCH_BATCHSTANDARDJOBSCHEDULER_H
#define WRENCH_BATCHSTANDARDJOBSCHEDULER_H

#include <wrench-dev.h>

Expand All @@ -22,15 +22,15 @@ namespace wrench {

public:

BatchStandardJobScheduler(std::shared_ptr<StorageService> default_storage_service) :
explicit BatchStandardJobScheduler(std::shared_ptr<StorageService> default_storage_service) :
default_storage_service(default_storage_service) {}

/***********************/
/** \cond DEVELOPER */
/***********************/

void scheduleTasks(const std::set<std::shared_ptr<ComputeService>> &compute_services,
const std::vector<WorkflowTask *> &tasks);
const std::vector<WorkflowTask *> &tasks) override;

/***********************/
/** \endcond */
Expand All @@ -42,4 +42,4 @@ namespace wrench {
};
}

#endif //WRENCH_BATCHSCHEDULER_H
#endif //WRENCH_BATCHSTANDARDJOBSCHEDULER_H
95 changes: 31 additions & 64 deletions include/wrench/services/compute/batch/BatchComputeService.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "wrench/services/helpers/Alarm.h"
#include "wrench/workflow/job/StandardJob.h"
#include "wrench/workflow/job/WorkflowJob.h"
#include "wrench/services/compute/batch/batch_schedulers/BatchScheduler.h"

#include <deque>
#include <queue>
#include <set>
Expand All @@ -35,7 +37,7 @@ namespace wrench {
*
* In the current implementation of
* this service, like for many of its real-world counterparts, memory
* partitioning among jobs on the same host is not handled. When multiple jobs share hosts,
* partitioning among jobs onq the same host is not handled. When multiple jobs share hosts,
* which can happen when jobs require only a few cores per host and can thus
* be co-located on the same hosts in a non-exclusive fashion,
* each job simply runs as if it had access to the
Expand All @@ -61,11 +63,12 @@ namespace wrench {
{BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, "conservative_bf"},
// {BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, "easy_bf"},
// {BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, "easy_bf_fast"},

{BatchComputeServiceProperty::BATCH_QUEUE_ORDERING_ALGORITHM, "fcfs"},
#else
{BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, "FCFS"},
{BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, "fcfs"},
#endif
{BatchComputeServiceProperty::BATCH_RJMS_DELAY, "0"},
{BatchComputeServiceProperty::BATCH_RJMS_PADDING_DELAY, "5"},
{BatchComputeServiceProperty::SIMULATED_WORKLOAD_TRACE_FILE, ""},
{BatchComputeServiceProperty::USE_REAL_RUNTIMES_AS_REQUESTED_RUNTIMES_IN_WORKLOAD_TRACE_FILE, "false"},
{BatchComputeServiceProperty::IGNORE_INVALID_JOBS_IN_WORKLOAD_TRACE_FILE, "false"},
Expand Down Expand Up @@ -106,7 +109,7 @@ namespace wrench {
/***********************/
/** \cond DEVELOPER **/
/***********************/
std::map<std::string,double> getStartTimeEstimates(std::set<std::tuple<std::string,unsigned int,unsigned int, double>> resources);
std::map<std::string,double> getStartTimeEstimates(std::set<std::tuple<std::string,unsigned long,unsigned long, double>> resources);

/***********************/
/** \endcond **/
Expand All @@ -122,9 +125,10 @@ namespace wrench {

private:



friend class WorkloadTraceFileReplayer;
friend class FCFSBatchScheduler;
friend class CONSERVATIVEBFBatchScheduler;
friend class BatschedBatchScheduler;

BatchComputeService(std::string hostname,
std::vector<std::string> compute_hosts,
Expand Down Expand Up @@ -171,16 +175,15 @@ namespace wrench {
//alarms for pilot jobs (only one pilot job alarm)
std::map<std::string,std::shared_ptr<Alarm>> pilot_job_alarms;


/* Resources information in Batchservice */
/* Resources information in BatchService */
unsigned long total_num_of_nodes;
unsigned long num_cores_per_node;
std::map<std::string, unsigned long> nodes_to_cores_map;
std::vector<double> timeslots;
std::map<std::string, unsigned long> available_nodes_to_cores;
std::map<unsigned long, std::string> host_id_to_names;
std::vector<std::string> compute_hosts;
/*End Resources information in Batchservice */
/* End Resources information in BatchService */

// Vector of standard job executors
std::set<std::shared_ptr<StandardJobExecutor>> running_standard_job_executors;
Expand All @@ -189,16 +192,20 @@ namespace wrench {
std::set<std::shared_ptr<StandardJobExecutor>> finished_standard_job_executors;

// Master List of batch jobs
std::set<std::unique_ptr<BatchJob>> all_jobs;

//Queue of pending batch jobs
std::deque<BatchJob *> pending_jobs;
std::set<std::shared_ptr<BatchJob>> all_jobs;

//A set of running batch jobs
std::set<BatchJob *> running_jobs;
std::set<std::shared_ptr<BatchJob>> running_jobs;

// The batch queue
std::deque<std::shared_ptr<BatchJob>> batch_queue;

// A set of "waiting" batch jobs, i.e., jobs that are waiting to be sent to
// the scheduler (useful for batsched only)
std::set<std::shared_ptr<BatchJob>> waiting_jobs;

// A set of waiting jobs that have been submitted to batsched, but not scheduled
std::set<BatchJob *> waiting_jobs;
// Scheduler
std::unique_ptr<BatchScheduler> scheduler;


#ifdef ENABLE_BATSCHED
Expand All @@ -218,7 +225,7 @@ namespace wrench {

};
#else
std::set<std::string> scheduling_algorithms = {"FCFS"
std::set<std::string> scheduling_algorithms = {"fcfs", "conservative_bf",
};

//Batch queue ordering options
Expand All @@ -227,12 +234,13 @@ namespace wrench {

#endif


unsigned long generateUniqueJobID();

void removeJobFromRunningList(BatchJob *job);
void removeJobFromRunningList(std::shared_ptr<BatchJob> job);

void removeJobFromBatchQueue(std::shared_ptr<BatchJob> job);

void freeJobFromJobsList(BatchJob* job);
void removeBatchJobFromJobsList(std::shared_ptr<BatchJob> job);

int main() override;

Expand All @@ -250,10 +258,6 @@ namespace wrench {

void terminateRunningStandardJob(StandardJob *job);

std::map<std::string, std::tuple<unsigned long, double>> scheduleOnHosts(std::string host_selection_algorithm,
unsigned long, unsigned long, double);

BatchJob *pickJobForScheduling(std::string);

//Terminate the batch service (this is usually for pilot jobs when they act as a batch service)
void cleanup(bool has_returned_from_main, int return_value) override;
Expand All @@ -277,7 +281,7 @@ namespace wrench {
void processPilotJobTerminationRequest(PilotJob *job, std::string answer_mailbox);

// process a batch job tiemout event
void processAlarmJobTimeout(BatchJob *job);
void processAlarmJobTimeout(std::shared_ptr<BatchJob>job);

//Process pilot job timeout
void processPilotJobTimeout(PilotJob *job);
Expand All @@ -291,53 +295,16 @@ namespace wrench {
//send call back to the standard job submitters
void sendStandardJobFailureNotification(StandardJob *job, std::string job_id, std::shared_ptr<FailureCause> cause);

// Try to schedule a job
bool scheduleOneQueuedJob();

// process a job submission
void processJobSubmission(BatchJob *job, std::string answer_mailbox);

void processJobSubmission(std::shared_ptr<BatchJob>job, std::string answer_mailbox);

//start a job
void startJob(std::map<std::string, std::tuple<unsigned long, double>>, WorkflowJob *,
BatchJob *, unsigned long, unsigned long, unsigned long);


std::shared_ptr<BatchJob>, unsigned long, unsigned long, unsigned long);

//vector of network listeners (only useful when ENABLE_BATSCHED == on)

std::map<std::string,double> getStartTimeEstimatesForFCFS(std::set<std::tuple<std::string,unsigned int,unsigned int, double>>);


/** BATSCHED-related fields **/
std::set<std::shared_ptr<BatschedNetworkListener>> network_listeners;
pid_t pid;
unsigned long batsched_port;


#ifdef ENABLE_BATSCHED
friend class BatschedNetworkListener;

void startBatsched();
void stopBatsched();

std::map<std::string,double> getStartTimeEstimatesFromBatsched(std::set<std::tuple<std::string,unsigned int,unsigned int,double>>);

void startBatschedNetworkListener();

void notifyJobEventsToBatSched(std::string job_id, std::string status, std::string job_state,
std::string kill_reason, std::string even_type);

void appendJobInfoToCSVOutputFile(BatchJob *batch_job, std::string status);

void sendAllQueuedJobsToBatsched();

//process execute events from batsched
void processExecuteJobFromBatSched(std::string bat_sched_reply);

#endif // ENABLE_BATSCHED


};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ namespace wrench{
*/
class BatchComputeServiceJobRequestMessage : public BatchComputeServiceMessage {
public:
BatchComputeServiceJobRequestMessage(std::string answer_mailbox, BatchJob* job , double payload);
BatchComputeServiceJobRequestMessage(std::string answer_mailbox, std::shared_ptr<BatchJob> job , double payload);

/** @brief The mailbox to answer to */
std::string answer_mailbox;
/** @brief The batch job */
BatchJob* job;
std::shared_ptr<BatchJob> job;
};

/**
Expand All @@ -46,9 +46,9 @@ namespace wrench{
*/
class AlarmJobTimeOutMessage : public ServiceMessage {
public:
AlarmJobTimeOutMessage(BatchJob* job,double payload);
AlarmJobTimeOutMessage(std::shared_ptr<BatchJob> job,double payload);
/** @brief The batch job */
BatchJob* job;
std::shared_ptr<BatchJob> job;
};

#if 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ namespace wrench {
/**
* @brief The batch scheduling algorithm. Can be:
* - If ENABLE_BATSCHED is set to off / not set:
* - "FCFS": First Come First Serve
* - "fcfs": First Come First Serve
* - "conservative_bf": a home-grown implementation of FCFS with conservative backfilling, which only allocates resources at the node level
* - If ENABLE_BATSCHED is set to on:
* - whatever scheduling algorithm is supported by Batsched
* (by default: "conservative_bf", other options include
Expand All @@ -49,8 +50,8 @@ namespace wrench {

/**
* @brief The host selection algorithm. Can be:
* - If ENABLE_BATSCHED is set to off or not set: ignored
* - If ENABLE_BATSCHED is set to on:
* - If ENABLE_BATSCHED is set to on or if the BATCH_SCHEDULING_ALGORITHM is not fcfs: ignored
* - If ENABLE_BATSCHED is set to off or not set, and if the BATCH_SCHEDULING_ALGORITHM is fcfs:
* - FIRSTFIT (default)
* - BESTFIT
* - ROUNDROBIN
Expand Down Expand Up @@ -125,7 +126,7 @@ namespace wrench {
* if a job says it wants to run for (at most) 60 seconds, the system
* will actually assume the job wants to run for (at most) 60 + 5 seconds.
*/
DECLARE_PROPERTY_NAME(BATCH_RJMS_DELAY);
DECLARE_PROPERTY_NAME(BATCH_RJMS_PADDING_DELAY);

/** @brief Simulate computation as just a sleep instead of an actual compute thread. This is for scalability reason,
* and only simulation-valid
Expand Down

0 comments on commit d7a88a0

Please sign in to comment.