Skip to content

Commit

Permalink
[libc] Support concurrent RPC port access on the GPU
Browse files Browse the repository at this point in the history
Previously we used a single port to implement the RPC. This was
sufficient for single threaded tests but can potentially cause deadlocks
when using multiple threads. The reason for this is that GPUs make no
forward progress guarantees. Therefore one group of threads waiting on
another group of threads can spin forever because there is no guarantee
that the other threads will continue executing. The typical workaround
for this is to allocate enough memory that a sufficiently large number
of work groups can make progress. As long as this number is somewhat
close to the amount of total concurrency we can obtain reliable
execution around a shared resource.

This patch enables using multiple ports by widening the arrays to a
predetermined size and indexes into them. Empty ports are currently
obtained via a trivial linear scan. This should be improved in the
future for performance reasons. Portions of D148191 were applied to
achieve parallel support.

Depends on D149581

Reviewed By: JonChesterfield

Differential Revision: https://reviews.llvm.org/D149598
  • Loading branch information
jhuber6 committed May 5, 2023
1 parent a1be6f0 commit aea866c
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 122 deletions.
162 changes: 91 additions & 71 deletions libc/src/__support/RPC/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ struct alignas(64) Packet {
Payload payload;
};

// TODO: This should be configured by the server and passed in. The general rule
// of thumb is that you should have at least as many ports as the maximum
// number of concurrent work items on the GPU to mitigate the lack of forward
// progress guarantees on the GPU.
constexpr uint64_t default_port_count = 64;

/// A common process used to synchronize communication between a client and a
/// server. The process contains an inbox and an outbox used for signaling
/// ownership of the shared buffer between both sides.
Expand Down Expand Up @@ -96,22 +102,31 @@ template <bool InvertInbox> struct Process {
LIBC_INLINE Process &operator=(const Process &) = default;
LIBC_INLINE ~Process() = default;

uint64_t port_count;
uint32_t lane_size;
cpp::Atomic<uint32_t> *lock;
cpp::Atomic<uint32_t> *inbox;
cpp::Atomic<uint32_t> *outbox;
Packet *buffer;
Packet *packet;

/// Initialize the communication channels.
LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
void *outbox, void *buffer) {
*this = {
lane_size,
reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
reinterpret_cast<Packet *>(buffer),
};
/// Initialize the communication channels from loader-provided shared state.
/// \p port_count is the number of independent ports (one lock/inbox/outbox
/// slot each) available for concurrent use; \p lane_size is presumably the
/// SIMT width used to index per-lane payload slots — confirm against callers.
/// The remaining pointers are type-erased views of the shared memory regions
/// and are reinterpreted to their concrete types here.
LIBC_INLINE void reset(uint64_t port_count, uint32_t lane_size, void *lock,
void *inbox, void *outbox, void *packet) {
// Aggregate-assign all members at once so no field is left stale.
*this = {port_count,
lane_size,
reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
reinterpret_cast<Packet *>(packet)};
}

/// The length of the packet is flexible because the server needs to look up
/// the lane size at runtime. This helper indexes at the proper offset.
LIBC_INLINE Packet &get_packet(uint64_t index) {
// Packets are stored contiguously, but each packet's true size is only known
// at runtime: a Header followed by `lane_size` Buffer slots, rounded up to
// Packet's alignment so every element in the array stays properly aligned.
// Index with byte arithmetic rather than Packet pointer arithmetic, since
// sizeof(Packet) does not reflect the runtime lane count.
return *reinterpret_cast<Packet *>(
reinterpret_cast<uint8_t *>(packet) +
index * align_up(sizeof(Header) + lane_size * sizeof(Buffer),
alignof(Packet)));
}

/// Inverting the bits loaded from the inbox in exactly one of the pair of
Expand Down Expand Up @@ -190,25 +205,25 @@ template <bool InvertInbox> struct Process {

/// Invokes a function across every active buffer across the total lane size.
LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
uint32_t index) {
Packet &packet) {
if constexpr (is_process_gpu()) {
fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
fn(&packet.payload.slot[gpu::get_lane_id()]);
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
if (buffer[index].header.mask & 1ul << i)
fn(&buffer[index].payload.slot[i]);
if (packet.header.mask & 1ul << i)
fn(&packet.payload.slot[i]);
}
}

/// Alternate version that also provides the index of the current lane.
LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
uint32_t index) {
Packet &packet) {
if constexpr (is_process_gpu()) {
fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
if (buffer[index].header.mask & 1ul << i)
fn(&buffer[index].payload.slot[i], i);
if (packet.header.mask & 1ul << i)
fn(&packet.payload.slot[i], i);
}
}
};
Expand All @@ -234,7 +249,7 @@ template <bool T> struct Port {
template <typename A> LIBC_INLINE void recv_n(A alloc);

LIBC_INLINE uint16_t get_opcode() const {
return process.buffer[index].header.opcode;
return process.get_packet(index).header.opcode;
}

LIBC_INLINE void close() { process.unlock(lane_mask, index); }
Expand Down Expand Up @@ -281,7 +296,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
}

// Apply the \p fill function to initialize the buffer and release the memory.
process.invoke_rpc(fill, index);
process.invoke_rpc(fill, process.get_packet(index));
out = !out;
atomic_thread_fence(cpp::MemoryOrder::RELEASE);
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
Expand All @@ -299,7 +314,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);

// Apply the \p use function to read the memory out of the buffer.
process.invoke_rpc(use, index);
process.invoke_rpc(use, process.get_packet(index));
out = !out;
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
}
Expand Down Expand Up @@ -340,7 +355,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
inline_memcpy(buffer->data, ptr + idx, len);
});
}
gpu::sync_lane(process.buffer[index].header.mask);
gpu::sync_lane(process.get_packet(index).header.mask);
}

/// Receives an arbitrarily sized data buffer across the shared channel in
Expand Down Expand Up @@ -396,32 +411,34 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
/// participating thread.
[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
Client::try_open(uint16_t opcode) {
constexpr uint64_t index = 0;
const uint64_t lane_mask = gpu::get_lane_mask();

// Attempt to acquire the lock on this index.
if (!try_lock(lane_mask, index))
return cpp::nullopt;

// The mailbox state must be read with the lock held.
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);

uint32_t in = load_inbox(index);
uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);

// Once we acquire the index we need to check if we are in a valid sending
// state.
if (buffer_unavailable(in, out)) {
unlock(lane_mask, index);
return cpp::nullopt;
}
// Perform a naive linear scan for a port that can be opened to send data.
for (uint64_t index = 0; index < port_count; ++index) {
// Attempt to acquire the lock on this index.
uint64_t lane_mask = gpu::get_lane_mask();
if (!try_lock(lane_mask, index))
continue;

// The mailbox state must be read with the lock held.
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);

uint32_t in = load_inbox(index);
uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);

// Once we acquire the index we need to check if we are in a valid sending
// state.
if (buffer_unavailable(in, out)) {
unlock(lane_mask, index);
continue;
}

if (is_first_lane(lane_mask)) {
buffer[index].header.opcode = opcode;
buffer[index].header.mask = lane_mask;
if (is_first_lane(lane_mask)) {
get_packet(index).header.opcode = opcode;
get_packet(index).header.mask = lane_mask;
}
gpu::sync_lane(lane_mask);
return Port(*this, lane_mask, index, out);
}
gpu::sync_lane(lane_mask);
return Port(*this, lane_mask, index, out);
return cpp::nullopt;
}

LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
Expand All @@ -436,33 +453,36 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
/// port if it has a pending receive operation
[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
Server::try_open() {
constexpr uint64_t index = 0;
const uint64_t lane_mask = gpu::get_lane_mask();

uint32_t in = load_inbox(index);
uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);

// The server is passive, if there is no work pending don't bother
// opening a port.
if (buffer_unavailable(in, out))
return cpp::nullopt;

// Attempt to acquire the lock on this index.
if (!try_lock(lane_mask, index))
return cpp::nullopt;

// The mailbox state must be read with the lock held.
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);

in = load_inbox(index);
out = outbox[index].load(cpp::MemoryOrder::RELAXED);
// Perform a naive linear scan for a port that has a pending request.
for (uint64_t index = 0; index < port_count; ++index) {
uint32_t in = load_inbox(index);
uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);

// The server is passive, if there is no work pending don't bother
// opening a port.
if (buffer_unavailable(in, out))
continue;

// Attempt to acquire the lock on this index.
uint64_t lane_mask = gpu::get_lane_mask();
// Attempt to acquire the lock on this index.
if (!try_lock(lane_mask, index))
continue;

// The mailbox state must be read with the lock held.
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);

in = load_inbox(index);
out = outbox[index].load(cpp::MemoryOrder::RELAXED);

if (buffer_unavailable(in, out)) {
unlock(lane_mask, index);
continue;
}

if (buffer_unavailable(in, out)) {
unlock(lane_mask, index);
return cpp::nullopt;
return Port(*this, lane_mask, index, out);
}

return Port(*this, lane_mask, index, out);
return cpp::nullopt;
}

LIBC_INLINE Server::Port Server::open() {
Expand Down
5 changes: 5 additions & 0 deletions libc/src/__support/RPC/rpc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ LIBC_INLINE constexpr bool is_process_gpu() {
#endif
}

/// Round \p val up to the nearest multiple of \p align.
template <typename V, typename A> LIBC_INLINE V align_up(V val, A align) {
  // Bias by (alignment - 1) so truncating division rounds upwards, then
  // scale back to a multiple of the alignment.
  const V alignment = static_cast<V>(align);
  return ((val + alignment - V(1)) / alignment) * alignment;
}

} // namespace rpc
} // namespace __llvm_libc

Expand Down
5 changes: 3 additions & 2 deletions libc/startup/gpu/amdgpu/start.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ extern "C" int main(int argc, char **argv, char **envp);

namespace __llvm_libc {

static cpp::Atomic<uint32_t> lock = 0;
static cpp::Atomic<uint32_t> lock[rpc::default_port_count] = {0};

extern "C" uintptr_t __init_array_start[];
extern "C" uintptr_t __init_array_end[];
Expand Down Expand Up @@ -43,7 +43,8 @@ extern "C" [[gnu::visibility("protected"), clang::amdgpu_kernel]] void
_begin(int argc, char **argv, char **env, void *in, void *out, void *buffer) {
// We need to set up the RPC client first in case any of the constructors
// require it.
__llvm_libc::rpc::client.reset(__llvm_libc::gpu::get_lane_size(),
__llvm_libc::rpc::client.reset(__llvm_libc::rpc::default_port_count,
__llvm_libc::gpu::get_lane_size(),
&__llvm_libc::lock, in, out, buffer);

// We want the fini array callbacks to be run after other atexit
Expand Down
5 changes: 3 additions & 2 deletions libc/startup/gpu/nvptx/start.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ extern "C" int main(int argc, char **argv, char **envp);

namespace __llvm_libc {

static cpp::Atomic<uint32_t> lock = 0;
static cpp::Atomic<uint32_t> lock[rpc::default_port_count] = {0};

extern "C" {
// Nvidia's 'nvlink' linker does not provide these symbols. We instead need
Expand Down Expand Up @@ -47,7 +47,8 @@ extern "C" [[gnu::visibility("protected"), clang::nvptx_kernel]] void
_begin(int argc, char **argv, char **env, void *in, void *out, void *buffer) {
// We need to set up the RPC client first in case any of the constructors
// require it.
__llvm_libc::rpc::client.reset(__llvm_libc::gpu::get_lane_size(),
__llvm_libc::rpc::client.reset(__llvm_libc::rpc::default_port_count,
__llvm_libc::gpu::get_lane_size(),
&__llvm_libc::lock, in, out, buffer);

// We want the fini array callbacks to be run after other atexit
Expand Down
72 changes: 38 additions & 34 deletions libc/utils/gpu/loader/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,47 +19,51 @@

static __llvm_libc::rpc::Server server;

static __llvm_libc::cpp::Atomic<uint32_t> lock;
static __llvm_libc::cpp::Atomic<uint32_t>
lock[__llvm_libc::rpc::default_port_count] = {0};

/// Queries the RPC client at least once and performs server-side work if there
/// are any active requests.
void handle_server() {
auto port = server.try_open();
if (!port)
return;
// Continue servicing the client until there is no work left and we return.
for (;;) {
auto port = server.try_open();
if (!port)
return;

switch (port->get_opcode()) {
case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
port->recv_n([&](uint64_t size, uint32_t id) {
str_size[id] = size;
strs[id] = new char[size];
return strs[id];
});
for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
if (strs[i]) {
fwrite(strs[i], str_size[i], 1, stderr);
delete[] strs[i];
switch (port->get_opcode()) {
case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
port->recv_n([&](uint64_t size, uint32_t id) {
str_size[id] = size;
strs[id] = new char[size];
return strs[id];
});
for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
if (strs[i]) {
fwrite(strs[i], str_size[i], 1, stderr);
delete[] strs[i];
}
}
break;
}
break;
}
case __llvm_libc::rpc::Opcode::EXIT: {
port->recv([](__llvm_libc::rpc::Buffer *buffer) {
exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
});
break;
}
case __llvm_libc::rpc::Opcode::TEST_INCREMENT: {
port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) {
reinterpret_cast<uint64_t *>(buffer->data)[0] += 1;
});
break;
}
default:
port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
case __llvm_libc::rpc::Opcode::EXIT: {
port->recv([](__llvm_libc::rpc::Buffer *buffer) {
exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
});
break;
}
case __llvm_libc::rpc::Opcode::TEST_INCREMENT: {
port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) {
reinterpret_cast<uint64_t *>(buffer->data)[0] += 1;
});
break;
}
default:
port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
}
port->close();
}
port->close();
}
#endif

0 comments on commit aea866c

Please sign in to comment.