118 debugimprove multi gpu scheduling #150

Open
wants to merge 27 commits into
base: dev
24abe71
BUGFIX: Make crosspy optional
wlruys Oct 20, 2023
646e2a1
UPDATE: Build dependency changes
wlruys Oct 20, 2023
80617ef
UPDATE: Build dependency changes
wlruys Oct 20, 2023
d6556cb
UPDATE: Build dependency changes
wlruys Oct 20, 2023
ac574f2
COMPILATION: Removing clang support to fix Docker compilation
wlruys Oct 20, 2023
f5ae447
FIX: MacOS compilation
wlruys Oct 22, 2023
0c9b9e5
Migrate compilation to scikit-build-core (#142)
wlruys Oct 24, 2023
23f8fb7
Add HIP backend to runahead scheduling (#143)
wlruys Oct 24, 2023
0361ff0
BUGFIX: update makefile to new build system
Oct 25, 2023
0cfde64
Add Python linter/formatter. Tests on a Cython Linter. (#145)
wlruys Nov 1, 2023
95d18d3
Style: lint & reformat cython (#146)
wlruys Nov 1, 2023
432593f
ci: add workflow for cython/cpp linting (#147)
wlruys Nov 1, 2023
521bcf4
fix: keyword name change in get_stream
wlruys Nov 2, 2023
abf7a39
fix: update variable name in get_stream
wlruys Nov 2, 2023
015be5c
Update README.md
wlruys Nov 8, 2023
03f0d11
Update README.md
wlruys Nov 8, 2023
963d52f
adding priority queue
ShreyaTalati Nov 22, 2023
e2b22ca
resolving conflicts
ShreyaTalati Nov 22, 2023
8b536bb
resolving conflicts
ShreyaTalati Nov 22, 2023
68e93f2
adding variable values
ShreyaTalati Nov 22, 2023
ca41ad3
adding relative start time of the task
ShreyaTalati Nov 24, 2023
44f0877
style: apply ruff format
ShreyaTalati Nov 24, 2023
eb271b2
removing global start time, fixing comparator error
ShreyaTalati Nov 27, 2023
c10ec63
removing global start time, fixing comparator error
ShreyaTalati Nov 27, 2023
e9a0cbe
style: apply ruff format
ShreyaTalati Nov 27, 2023
605726e
adding task num for priority
ShreyaTalati Nov 28, 2023
086ecff
adding 1 to num_dependents
ShreyaTalati Dec 7, 2023
168 changes: 168 additions & 0 deletions src/c/backend/include/containers.hpp
@@ -1,4 +1,4 @@
/*! @file containers.hpp

[cpp-linter notice, line 1] Run clang-format on src/c/backend/include/containers.hpp: file does not conform to LLVM style guidelines (lines 43, 44, 46, 48, 490, 491, 492, 493, 494, 495, 496).
* @brief Provides interface for thread-safe containers.
*
*
@@ -21,7 +21,7 @@
#include <mutex>
#include <string>

#include <assert.h>

[cpp-linter warning] /src/c/backend/include/containers.hpp:24:10 [modernize-deprecated-headers]: inclusion of deprecated C++ header 'assert.h'; consider using 'cassert' instead
#include <chrono>

/* Header only implementations of thread-safe data structures and helpers */
@@ -35,7 +35,19 @@
// as well as std::atomic_flag.
// template <> auto constexpr is_atomic<std::atomic_flag> = true;

// struct taskStructure { // Structure declaration
// int priority; // Member (int variable)
// int wait_time;
// InnerTask *task; // Member (string variable)
// bool operator() <(taskStructure& b){
// if(priority < b.priority) {
// return true; // the order is correct and NO swapping of elements takes place
// }
// return false; // the order is NOT correct and swapping of elements takes place
// }
// };

template <typename T> class ProtectedVector {

[cpp-linter warning] /src/c/backend/include/containers.hpp:50:29 [cppcoreguidelines-special-member-functions]: class 'ProtectedVector' defines a copy constructor but does not define a destructor, a copy assignment operator, a move constructor or a move assignment operator

private:
std::vector<T> vec = std::vector<T>();
@@ -52,14 +64,14 @@
this->length.exchange(other.length);
}

ProtectedVector(std::string name) { this->name = name; }

[cpp-linter warning] /src/c/backend/include/containers.hpp:67:31 [performance-unnecessary-value-param]: the parameter 'name' is copied for each invocation but only used as a const reference; consider making it a const reference

ProtectedVector(std::string name, std::vector<T> vec) {

[cpp-linter warning] /src/c/backend/include/containers.hpp:69:31 [performance-unnecessary-value-param]: the parameter 'name' is copied for each invocation but only used as a const reference; consider making it a const reference
this->name = name;
this->vec = vec;
}

ProtectedVector(std::string name, size_t size) {

[cpp-linter warning] /src/c/backend/include/containers.hpp:74:31 [performance-unnecessary-value-param]: the parameter 'name' is copied for each invocation but only used as a const reference; consider making it a const reference
this->name = name;
this->vec.reserve(size);
}
@@ -112,25 +124,25 @@
this->length--;
}

int atomic_size() { return this->length.load(); }

[cpp-linter warning] /src/c/backend/include/containers.hpp:127:7 [modernize-use-trailing-return-type]: use a trailing return type for this function

size_t size() {

[cpp-linter warning] /src/c/backend/include/containers.hpp:129:10 [modernize-use-trailing-return-type]: use a trailing return type for this function
this->mtx.lock();
int size = this->vec.size();
this->mtx.unlock();
return size;
}

size_t size_unsafe() { return this->vec.size(); }

[cpp-linter warning] /src/c/backend/include/containers.hpp:136:10 [modernize-use-trailing-return-type]: use a trailing return type for this function

T operator[](size_t i) {

[cpp-linter warning] /src/c/backend/include/containers.hpp:138:5 [modernize-use-trailing-return-type]: use a trailing return type for this function
this->mtx.lock();
auto val = this->vec[i];
this->mtx.unlock();
return val;
}

T at(size_t i) {

[cpp-linter warning] /src/c/backend/include/containers.hpp:145:5 [modernize-use-trailing-return-type]: use a trailing return type for this function
this->mtx.lock();
T val = this->vec.at(i);
this->mtx.unlock();
@@ -475,4 +487,160 @@
inline bool empty_unsafe() { return this->q.empty(); }
};

template <typename T> class ComparePriority {
public:
// Ordering predicate for std::priority_queue: returns true when a should sit
// below b, so top() returns the element with the largest priority value.
// Use > instead if lower values should be scheduled first.
bool operator()(T a, T b) const { return a->priority < b->priority; }
};
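
For reference, a minimal standalone sketch of how `std::priority_queue` interprets a less-than comparator such as `ComparePriority`. `Task` is a hypothetical stand-in for `InnerTask`, and `HighestFirst`, `LowestFirst` and `top_priority` are illustrative names that are not part of this PR.

```cpp
#include <cassert>
#include <queue>
#include <vector>

// Hypothetical stand-in for InnerTask; only the priority field matters here.
struct Task {
  int priority;
};

// Same comparison as ComparePriority in this diff: "a sorts below b".
struct HighestFirst {
  bool operator()(const Task *a, const Task *b) const {
    return a->priority < b->priority;
  }
};

// Illustrative alternative: flipping the comparison puts the smallest
// priority value at top().
struct LowestFirst {
  bool operator()(const Task *a, const Task *b) const {
    return a->priority > b->priority;
  }
};

// Pushes every task and returns the priority value that ends up at top().
template <typename Compare> int top_priority(std::vector<Task> &tasks) {
  assert(!tasks.empty());
  std::priority_queue<Task *, std::vector<Task *>, Compare> pq;
  for (auto &t : tasks) {
    pq.push(&t);
  }
  return pq.top()->priority;
}
```

With tasks of priority 3, 1 and 2, `top_priority<HighestFirst>` yields 3 and `top_priority<LowestFirst>` yields 1, so the choice of comparison decides whether a "low priority first" scheme is actually what the queue implements.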

template <typename T> class ProtectedPriorityQueue {

private:
std::priority_queue<T, std::vector<T>, ComparePriority<T>> pq;
std::atomic<int> length{0};
std::mutex mtx;
std::string name;

public:
ProtectedPriorityQueue() = default;

ProtectedPriorityQueue(std::string name) {
this->mtx.lock();
this->name = name;
this->mtx.unlock();
}

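// The parameter type must match the member's full type, including the comparator.
ProtectedPriorityQueue(std::string name, std::priority_queue<T, std::vector<T>, ComparePriority<T>> pq) {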
this->mtx.lock();
this->name = name;
this->pq = pq;
this->mtx.unlock();
}

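ProtectedPriorityQueue(std::string name, size_t size) {
this->mtx.lock();
this->name = name;
// std::priority_queue has no reserve(); reserve on the underlying vector
// and hand it to the adaptor instead.
std::vector<T> storage;
storage.reserve(size);
this->pq = std::priority_queue<T, std::vector<T>, ComparePriority<T>>(
ComparePriority<T>(), std::move(storage));
this->mtx.unlock();
}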

void lock() { this->mtx.lock(); }

void unlock() { this->mtx.unlock(); }

void push(T a) {
this->mtx.lock();
this->pq.push(a);
this->mtx.unlock();
this->length++;
}

inline void push_unsafe(T a) {
this->pq.push(a);
this->length++;
}

void push(std::vector<T> &a) {
this->mtx.lock();
for (auto val : a) {
this->push_unsafe(val);
}
this->mtx.unlock();
}

inline void push_unsafe(std::vector<T> &a) {
for (auto val : a) {
this->push_unsafe(val);
}
}

void pop() {
this->mtx.lock();
this->pq.pop();
this->mtx.unlock();
this->length--;
}

inline void pop_unsafe() {
this->pq.pop();
this->length--;
}

size_t atomic_size() { return this->length.load(); }

size_t size() {
this->mtx.lock();
size_t size = this->pq.size();
this->mtx.unlock();
return size;
}

size_t size_unsafe() { return this->pq.size(); }

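// NOTE: std::priority_queue exposes only top(); it has no operator[] or at(),
// so operator[], at() and at_unsafe() below fail to compile once instantiated.
T operator[](size_t i) {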
this->mtx.lock();
auto val = this->pq[i];
this->mtx.unlock();
return val;
}

T at(size_t i) {
this->mtx.lock();
T val = this->pq.at(i);
this->mtx.unlock();
return val;
}

inline T at_unsafe(size_t i) { return this->pq.at(i); }

T front() {
this->mtx.lock();
T val = this->pq.top();
this->mtx.unlock();
return val;
}

inline T front_unsafe() { return this->pq.top(); }

T front_and_pop() {
this->mtx.lock();
T val = this->front_unsafe();
this->pop_unsafe();
this->mtx.unlock();
return val;
}

inline T front_and_pop_unsafe() {
T val = this->front_unsafe();
this->pop_unsafe();
return val;
}

// Add implementation of clear()
// void clear() {
// this->mtx.lock();
// this->pq.clear();
// this->mtx.unlock();
// this->length = 0;
// }

// inline void clear_unsafe() {
// this->pq.clear();
// this->length = 0;
// }

bool empty() {
this->mtx.lock();
bool empty = this->pq.empty();
this->mtx.unlock();
return empty;
}

inline bool empty_unsafe() { return this->pq.empty(); }
};

#endif // PARLA_CONTAINERS_HPP
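
As a point of comparison for `ProtectedPriorityQueue`, here is a minimal sketch of the same idea written with `std::lock_guard`, so the mutex is released on every exit path. `GuardedPriorityQueue` and `try_pop` are hypothetical names used only for illustration; this is not the PR's class, and it assumes the default `std::less` ordering unless a comparator is supplied.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical sketch: a mutex-guarded wrapper around std::priority_queue.
template <typename T, typename Compare = std::less<T>>
class GuardedPriorityQueue {
  using Queue = std::priority_queue<T, std::vector<T>, Compare>;

public:
  GuardedPriorityQueue() = default;

  // std::priority_queue has no reserve(); reserve on a vector and move it
  // into the adaptor. Moving a vector normally carries its capacity along.
  explicit GuardedPriorityQueue(std::size_t capacity) {
    std::vector<T> storage;
    storage.reserve(capacity);
    pq_ = Queue(Compare(), std::move(storage));
  }

  void push(T value) {
    std::lock_guard<std::mutex> guard(mtx_);
    pq_.push(std::move(value));
  }

  // Reads top() and pops under a single lock. Returns false on an empty
  // queue instead of calling top() on it.
  bool try_pop(T &out) {
    std::lock_guard<std::mutex> guard(mtx_);
    if (pq_.empty()) {
      return false;
    }
    out = pq_.top();
    pq_.pop();
    return true;
  }

  std::size_t size() {
    std::lock_guard<std::mutex> guard(mtx_);
    return pq_.size();
  }

private:
  Queue pq_;
  std::mutex mtx_;
};
```

Taking the size from the container under the lock avoids keeping a separate atomic counter in sync with the queue, at the cost of a lock acquisition per query.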
30 changes: 22 additions & 8 deletions src/c/backend/include/device_queues.hpp
@@ -1,4 +1,4 @@
/*! @file device_queues.hpp

[cpp-linter notice, line 1] Run clang-format on src/c/backend/include/device_queues.hpp: file does not conform to LLVM style guidelines (lines 8, 38, 42, 43, 44).
* @brief Provides interface for task queues and collections of task queues for
* multidevice tasks.
*/
@@ -7,6 +7,7 @@

#include "device_manager.hpp"
#include "runtime.hpp"
#include "containers.hpp"

// TODO(wlr): FIXME Change these back to smart pointers. I'm leaking memory
// here...
@@ -21,7 +22,7 @@
* queue supervises the phase of.
*/
template <typename ResourceCategory> class DeviceQueue {
- using MixedQueue_t = TaskQueue;
+ using MixedQueue_t = PriorityTaskQueue;
using MDQueue_t = TaskQueue;

public:
@@ -33,17 +34,29 @@
*/
Device *get_device() { return device; }

/**
* Calculates priority for the task. The scheduling algorithm follows a low priority scheme.
* @param task the task to set priority for
*/
void set_priority(InnerTask *task) {
// Inversely proportional: the more dependents a task has, the earlier it
// should be scheduled. Add 1 so a task with zero dependents does not divide by zero.
int num_dependents = task->dependents.size() + 1;
// Directly proportional: the more GPUs a task requires, the later it should
// be scheduled.
int num_gpus_required = task->assigned_devices.size();
// TODO: normalize and change this. Note that the ratio is an integer division
// and truncates toward zero.
int priority = total_num_tasks + (num_gpus_required / num_dependents);
std::cout << total_num_tasks << std::endl;
task->set_priority(priority);
// critical path length to most recently spawned task
// estimated completion time
}

/**
* Enqueues a task on this device.
* @param task the task to enqueue
*/
void enqueue(InnerTask *task) {
// std::cout << "DeviceQueue::enqueue() - " << task->get_name() <<
// std::endl;

// std::cout << "Mixed Queue size: " << mixed_queue.size() << std::endl;
- this->mixed_queue.push_back(task);
+ this->set_priority(task);
+ this->mixed_queue.push(task);
num_tasks++;
total_num_tasks++;
};

/**
@@ -115,7 +128,7 @@
std::cout << "Moving task to waiting queue: " << head->get_name()
<< std::endl;
waiting_queue.push_back(head);
- mixed_queue.pop_front();
+ mixed_queue.pop();

// TODO(wlr): Should num_tasks include waiting tasks?
// (2)
@@ -142,7 +155,7 @@
InnerTask *task = front();
if (task != nullptr) {
// std::cout << "Popping task: " << task->get_name() << std::endl;
- mixed_queue.pop_front();
+ mixed_queue.pop();
// Set removed status so task can be pruned from other queues
task->set_removed<ResourceCategory>(true);
num_tasks--;
@@ -158,6 +171,7 @@
MixedQueue_t mixed_queue;
MDQueue_t waiting_queue;
std::atomic<int> num_tasks{0};
std::atomic<int> total_num_tasks{0};
};

// TODO(wlr): I don't know what to name this.
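
To make the effect of the integer division in `set_priority()` concrete, the sketch below mirrors the expression from this diff and contrasts it with one possible scaled variant. `priority_int`, `priority_scaled` and `kScale` are hypothetical names chosen for illustration; the scaled form is only one assumption about what "normalize and change this" could mean.

```cpp
#include <cassert>

// Mirrors the expression in set_priority(): all operands are int, so the
// ratio is truncated before it is added to the running task count.
inline int priority_int(int total_num_tasks, int num_gpus_required,
                        int num_dependents_plus_one) {
  assert(num_dependents_plus_one > 0);
  return total_num_tasks + (num_gpus_required / num_dependents_plus_one);
}

// Hypothetical variant: scale both terms before dividing so that fractional
// differences in the GPU/dependent ratio survive. kScale is illustrative.
constexpr int kScale = 1000;

inline int priority_scaled(int total_num_tasks, int num_gpus_required,
                           int num_dependents_plus_one) {
  assert(num_dependents_plus_one > 0);
  return total_num_tasks * kScale +
         (num_gpus_required * kScale) / num_dependents_plus_one;
}
```

With the unscaled form, a task needing 1 GPU and a task needing 2 GPUs both get the same priority when each has 2 dependents (divisor 3), because both ratios truncate to 0. The scaled form keeps them apart.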
1 change: 1 addition & 0 deletions src/c/backend/include/phases.hpp
@@ -380,6 +380,7 @@ class RuntimeReserver : virtual public SchedulerPhase {
}

void enqueue(InnerTask *task);
// void enqueue(taskStructure *task_with_priority);
void enqueue(std::vector<InnerTask *> &tasks);
void run(SchedulerPhase *next_phase);
size_t get_count();
16 changes: 16 additions & 0 deletions src/c/backend/include/runtime.hpp
@@ -1,4 +1,4 @@
#ifndef PARLA_BACKEND_HPP

[cpp-linter notice, line 1] Run clang-format on src/c/backend/include/runtime.hpp: file does not conform to LLVM style guidelines (lines 40, 70, 71, 73, 75, 1029, 1088, 1127, 1128, 1132, 1133, 1139, 1140, 1167).
#define PARLA_BACKEND_HPP

/**
@@ -44,6 +44,7 @@
#include "profiling.hpp"
#include "resource_requirements.hpp"
#include "memory_manager.hpp"
// # include "task.hpp"

using namespace std::chrono_literals;
using namespace parray;
@@ -59,10 +60,25 @@
class InnerWorker;
class InnerScheduler;

// struct taskStructure { // Structure declaration
// int priority; // Member (int variable)
// int wait_time;
// InnerTask *task; // Member (string variable)
// // bool operator()< (const taskStructure& b)
// // {
// // return a.priority < b.priority;
// // // if(priority < b.priority) {
// // // return true; // the order is correct and NO swapping of elements takes place
// // // }
// // // return false; // the order is NOT correct and swapping of elements takes place
// // }
// };

// Type Aliases for common containers
using WorkerQueue = ProtectedQueue<InnerWorker *>;
using WorkerList = ProtectedVector<InnerWorker *>;
using TaskQueue = ProtectedQueue<InnerTask *>;
using PriorityTaskQueue = ProtectedPriorityQueue<InnerTask *>;
using TaskList = ProtectedVector<InnerTask *>;
using SpaceList = ProtectedVector<TaskBarrier *>;
using PointerList = ProtectedVector<uintptr_t>;
1 change: 0 additions & 1 deletion src/python/parla/cython/cyparray.pyx
@@ -7,7 +7,6 @@

from .cyparray cimport InnerPArray
from .cyparray_state cimport CyPArrayState

# a Cython wrapper class around C++ PArray
cdef class CyPArray:

1 change: 0 additions & 1 deletion src/python/parla/cython/device_manager.pyx
@@ -83,7 +83,6 @@ class PrintableFrozenSet(frozenset):
def __repr__(self):
return self.get_name()


# TODO(wlr): - Allow device manager to initialize non-contiguous gpu ids.
# TODO(wlr): - Provide a way to iterate over these real device ids

2 changes: 1 addition & 1 deletion src/python/parla/cython/tasks.pyx
@@ -1786,4 +1786,4 @@ class BackendTaskSpace(TaskSpace):
return return_list

def wait(self):
- self.inner_space.wait()
+ self.inner_space.wait()