152 changes: 79 additions & 73 deletions libc/src/__support/RPC/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,6 @@ struct Header {
uint16_t opcode;
};

/// The data payload for the associated packet. We provide enough space for each
/// thread in the cooperating lane to have a buffer.
template <uint32_t lane_size = gpu::LANE_SIZE> struct Payload {
Buffer slot[lane_size];
};

/// A packet used to share data between the client and server across an entire
/// lane. We use a lane as the minimum granularity for execution.
template <uint32_t lane_size = gpu::LANE_SIZE> struct alignas(64) Packet {
Header header;
Payload<lane_size> payload;
};

/// The maximum number of parallel ports that the RPC interface can support.
constexpr uint64_t MAX_PORT_COUNT = 4096;

Expand All @@ -71,7 +58,7 @@ constexpr uint64_t MAX_PORT_COUNT = 4096;
/// - The client will always start with a 'send' operation.
/// - The server will always start with a 'recv' operation.
/// - Every 'send' or 'recv' call is mirrored by the other process.
template <bool Invert, typename Packet> struct Process {
template <bool Invert> struct Process {
LIBC_INLINE Process() = default;
LIBC_INLINE Process(const Process &) = delete;
LIBC_INLINE Process &operator=(const Process &) = delete;
Expand All @@ -82,7 +69,8 @@ template <bool Invert, typename Packet> struct Process {
uint32_t port_count = 0;
cpp::Atomic<uint32_t> *inbox = nullptr;
cpp::Atomic<uint32_t> *outbox = nullptr;
Packet *packet = nullptr;
Header *header = nullptr;
Buffer *packet = nullptr;

static constexpr uint64_t NUM_BITS_IN_WORD = sizeof(uint32_t) * 8;
cpp::Atomic<uint32_t> lock[MAX_PORT_COUNT / NUM_BITS_IN_WORD] = {0};
Expand All @@ -92,7 +80,9 @@ template <bool Invert, typename Packet> struct Process {
advance(buffer, inbox_offset(port_count)))),
outbox(reinterpret_cast<cpp::Atomic<uint32_t> *>(
advance(buffer, outbox_offset(port_count)))),
packet(reinterpret_cast<Packet *>(
header(reinterpret_cast<Header *>(
advance(buffer, header_offset(port_count)))),
packet(reinterpret_cast<Buffer *>(
advance(buffer, buffer_offset(port_count)))) {}

/// Allocate a memory buffer sufficient to store the following equivalent
Expand All @@ -101,10 +91,12 @@ template <bool Invert, typename Packet> struct Process {
/// struct Equivalent {
/// Atomic<uint32_t> primary[port_count];
/// Atomic<uint32_t> secondary[port_count];
/// Packet buffer[port_count];
/// Header header[port_count];
/// Buffer packet[port_count][lane_size];
/// };
LIBC_INLINE static constexpr uint64_t allocation_size(uint32_t port_count) {
return buffer_offset(port_count) + buffer_bytes(port_count);
LIBC_INLINE static constexpr uint64_t allocation_size(uint32_t port_count,
uint32_t lane_size) {
return buffer_offset(port_count) + buffer_bytes(port_count, lane_size);
}

/// Retrieve the inbox state from memory shared between processes.
Expand Down Expand Up @@ -144,6 +136,13 @@ template <bool Invert, typename Packet> struct Process {
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
}

/// Returns a pointer to the first buffer belonging to port \p index. The packet
/// storage is a single flat array holding \p lane_size consecutive buffers per
/// port, so the returned pointer can be indexed by lane to reach each buffer
/// of that port.
LIBC_INLINE Buffer *get_packet(uint32_t index, uint32_t lane_size) {
  const uint32_t first_slot = index * lane_size;
  return packet + first_slot;
}

/// Determines if this process needs to wait for ownership of the buffer. We
/// invert the condition on one of the processes to indicate that if one
/// process owns the buffer then the other does not.
Expand Down Expand Up @@ -219,8 +218,9 @@ template <bool Invert, typename Packet> struct Process {
}

/// Number of bytes to allocate for the buffer containing the packets.
LIBC_INLINE static constexpr uint64_t buffer_bytes(uint32_t port_count) {
return port_count * sizeof(Packet);
LIBC_INLINE static constexpr uint64_t buffer_bytes(uint32_t port_count,
uint32_t lane_size) {
return port_count * lane_size * sizeof(Buffer);
}

/// Offset of the inbox in memory. This is the same as the outbox if inverted.
Expand All @@ -233,9 +233,15 @@ template <bool Invert, typename Packet> struct Process {
return Invert ? 0 : mailbox_bytes(port_count);
}

/// Offset of the array of per-port headers. The headers are placed after the
/// inbox and outbox mailboxes, with the offset rounded up to the alignment of
/// Header.
LIBC_INLINE static constexpr uint64_t header_offset(uint32_t port_count) {
return align_up(2 * mailbox_bytes(port_count), alignof(Header));
}

/// Offset of the buffer containing the packets after the inbox, outbox, and
/// headers.
LIBC_INLINE static constexpr uint64_t buffer_offset(uint32_t port_count) {
return align_up(2 * mailbox_bytes(port_count), alignof(Packet));
return align_up(header_offset(port_count) + port_count * sizeof(Header),
alignof(Buffer));
}

/// Conditionally set the n-th bit in the atomic bitfield.
Expand All @@ -262,39 +268,39 @@ template <bool Invert, typename Packet> struct Process {
};

/// Invokes a function across every active buffer across the total lane size.
template <uint32_t lane_size>
static LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
Packet<lane_size> &packet) {
uint32_t lane_size, uint64_t lane_mask,
Buffer *slot) {
if constexpr (is_process_gpu()) {
fn(&packet.payload.slot[gpu::get_lane_id()]);
fn(&slot[gpu::get_lane_id()]);
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
if (packet.header.mask & 1ul << i)
fn(&packet.payload.slot[i]);
if (lane_mask & (1ul << i))
fn(&slot[i]);
}
}

/// Alternate version that also provides the index of the current lane.
template <uint32_t lane_size>
static LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
Packet<lane_size> &packet) {
uint32_t lane_size, uint64_t lane_mask,
Buffer *slot) {
if constexpr (is_process_gpu()) {
fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
fn(&slot[gpu::get_lane_id()], gpu::get_lane_id());
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
if (packet.header.mask & 1ul << i)
fn(&packet.payload.slot[i], i);
if (lane_mask & (1ul << i))
fn(&slot[i], i);
}
}

/// The port provides the interface to communicate between the multiple
/// processes. A port is conceptually an index into the memory provided by the
/// underlying process that is guarded by a lock bit.
template <bool T, typename S> struct Port {
LIBC_INLINE Port(Process<T, S> &process, uint64_t lane_mask, uint32_t index,
uint32_t out)
: process(process), lane_mask(lane_mask), index(index), out(out),
receive(false), owns_buffer(true) {}
template <bool T> struct Port {
LIBC_INLINE Port(Process<T> &process, uint64_t lane_mask, uint32_t lane_size,
uint32_t index, uint32_t out)
: process(process), lane_mask(lane_mask), lane_size(lane_size),
index(index), out(out), receive(false), owns_buffer(true) {}
LIBC_INLINE ~Port() = default;

private:
Expand All @@ -305,7 +311,7 @@ template <bool T, typename S> struct Port {

friend struct Client;
template <uint32_t U> friend struct Server;
friend class cpp::optional<Port<T, S>>;
friend class cpp::optional<Port<T>>;

public:
template <typename U> LIBC_INLINE void recv(U use);
Expand All @@ -319,7 +325,7 @@ template <bool T, typename S> struct Port {
LIBC_INLINE void recv_n(void **dst, uint64_t *size, A &&alloc);

LIBC_INLINE uint16_t get_opcode() const {
return process.packet[index].header.opcode;
return process.header[index].opcode;
}

/// Returns the index of the port this object refers to, as a 16-bit value.
LIBC_INLINE uint16_t get_index() const {
  // The stored index is a uint32_t; spell out the narrowing to the return
  // type instead of relying on the implicit conversion.
  return static_cast<uint16_t>(index);
}
Expand All @@ -333,8 +339,9 @@ template <bool T, typename S> struct Port {
}

private:
Process<T, S> &process;
Process<T> &process;
uint64_t lane_mask;
uint32_t lane_size;
uint32_t index;
uint32_t out;
bool receive;
Expand All @@ -351,15 +358,14 @@ struct Client {
LIBC_INLINE Client(uint32_t port_count, void *buffer)
: process(port_count, buffer) {}

using Port = rpc::Port<false, Packet<gpu::LANE_SIZE>>;
using Port = rpc::Port<false>;
template <uint16_t opcode> LIBC_INLINE Port open();

private:
Process<false, Packet<gpu::LANE_SIZE>> process;
Process<false> process;
};
static_assert(cpp::is_trivially_copyable<Client>::value &&
sizeof(Process<false, Packet<1>>) ==
sizeof(Process<false, Packet<32>>),
sizeof(Process<true>) == sizeof(Process<false>),
"The client is not trivially copyable from the server");

/// The RPC server used to respond to the client.
Expand All @@ -372,38 +378,35 @@ template <uint32_t lane_size> struct Server {
LIBC_INLINE Server(uint32_t port_count, void *buffer)
: process(port_count, buffer) {}

using Port = rpc::Port<true, Packet<lane_size>>;
using Port = rpc::Port<true>;
LIBC_INLINE cpp::optional<Port> try_open(uint32_t start = 0);
LIBC_INLINE Port open();

LIBC_INLINE static uint64_t allocation_size(uint32_t port_count) {
return Process<true, Packet<lane_size>>::allocation_size(port_count);
return Process<true>::allocation_size(port_count, lane_size);
}

private:
Process<true, Packet<lane_size>> process;
Process<true> process;
};

/// Applies \p fill to the shared buffer and initiates a send operation.
template <bool T, typename S>
template <typename F>
LIBC_INLINE void Port<T, S>::send(F fill) {
template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
uint32_t in = owns_buffer ? out ^ T : process.load_inbox(lane_mask, index);

// We need to wait until we own the buffer before sending.
process.wait_for_ownership(lane_mask, index, out, in);

// Apply the \p fill function to initialize the buffer and release the memory.
invoke_rpc(fill, process.packet[index]);
invoke_rpc(fill, lane_size, process.header[index].mask,
process.get_packet(index, lane_size));
out = process.invert_outbox(index, out);
owns_buffer = false;
receive = false;
}

/// Applies \p use to the shared buffer and acknowledges the send.
template <bool T, typename S>
template <typename U>
LIBC_INLINE void Port<T, S>::recv(U use) {
template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
// We only exchange ownership of the buffer during a receive if we are waiting
// for a previous receive to finish.
if (receive) {
Expand All @@ -417,42 +420,43 @@ LIBC_INLINE void Port<T, S>::recv(U use) {
process.wait_for_ownership(lane_mask, index, out, in);

// Apply the \p use function to read the memory out of the buffer.
invoke_rpc(use, process.packet[index]);
invoke_rpc(use, lane_size, process.header[index].mask,
process.get_packet(index, lane_size));
receive = true;
owns_buffer = true;
}

/// Combines a send and receive into a single function.
template <bool T, typename S>
template <bool T>
template <typename F, typename U>
LIBC_INLINE void Port<T, S>::send_and_recv(F fill, U use) {
LIBC_INLINE void Port<T>::send_and_recv(F fill, U use) {
send(fill);
recv(use);
}

/// Combines a receive and send operation into a single function. The \p work
/// function modifies the buffer in-place and the send is only used to initiate
/// the copy back.
template <bool T, typename S>
template <bool T>
template <typename W>
LIBC_INLINE void Port<T, S>::recv_and_send(W work) {
LIBC_INLINE void Port<T>::recv_and_send(W work) {
recv(work);
send([](Buffer *) { /* no-op */ });
}

/// Helper routine to simplify the interface when sending from the GPU using
/// thread private pointers to the underlying value.
template <bool T, typename S>
LIBC_INLINE void Port<T, S>::send_n(const void *src, uint64_t size) {
template <bool T>
LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
const void **src_ptr = &src;
uint64_t *size_ptr = &size;
send_n(src_ptr, size_ptr);
}

/// Sends an arbitrarily sized data buffer \p src across the shared channel in
/// multiples of the packet length.
template <bool T, typename S>
LIBC_INLINE void Port<T, S>::send_n(const void *const *src, uint64_t *size) {
template <bool T>
LIBC_INLINE void Port<T>::send_n(const void *const *src, uint64_t *size) {
uint64_t num_sends = 0;
send([&](Buffer *buffer, uint32_t id) {
reinterpret_cast<uint64_t *>(buffer->data)[0] = lane_value(size, id);
Expand All @@ -465,7 +469,7 @@ LIBC_INLINE void Port<T, S>::send_n(const void *const *src, uint64_t *size) {
rpc_memcpy(&buffer->data[1], lane_value(src, id), len);
});
uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
uint64_t mask = process.packet[index].header.mask;
uint64_t mask = process.header[index].mask;
while (gpu::ballot(mask, idx < num_sends)) {
send([=](Buffer *buffer, uint32_t id) {
uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
Expand All @@ -481,9 +485,9 @@ LIBC_INLINE void Port<T, S>::send_n(const void *const *src, uint64_t *size) {
/// Receives an arbitrarily sized data buffer across the shared channel in
/// multiples of the packet length. The \p alloc function is called with the
/// size of the data so that we can initialize the size of the \p dst buffer.
template <bool T, typename S>
template <bool T>
template <typename A>
LIBC_INLINE void Port<T, S>::recv_n(void **dst, uint64_t *size, A &&alloc) {
LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
uint64_t num_recvs = 0;
recv([&](Buffer *buffer, uint32_t id) {
lane_value(size, id) = reinterpret_cast<uint64_t *>(buffer->data)[0];
Expand All @@ -498,7 +502,7 @@ LIBC_INLINE void Port<T, S>::recv_n(void **dst, uint64_t *size, A &&alloc) {
rpc_memcpy(lane_value(dst, id), &buffer->data[1], len);
});
uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
uint64_t mask = process.packet[index].header.mask;
uint64_t mask = process.header[index].mask;
while (gpu::ballot(mask, idx < num_recvs)) {
recv([=](Buffer *buffer, uint32_t id) {
uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
Expand All @@ -515,8 +519,10 @@ LIBC_INLINE void Port<T, S>::recv_n(void **dst, uint64_t *size, A &&alloc) {
/// only open a port if we find an index that is in a valid sending state. That
/// is, there are send operations pending that haven't been serviced on this
/// port. Each port instance uses an associated \p opcode to tell the server
/// what to do.
template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
/// what to do. The Client interface provides the appropriate lane size to the
/// port using the platform's returned value.
template <uint16_t opcode>
[[clang::convergent]] LIBC_INLINE Client::Port Client::open() {
// Repeatedly perform a naive linear scan for a port that can be opened to
// send data.
for (uint32_t index = gpu::get_cluster_id();; ++index) {
Expand All @@ -540,11 +546,11 @@ template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
}

if (gpu::is_first_lane(lane_mask)) {
process.packet[index].header.opcode = opcode;
process.packet[index].header.mask = lane_mask;
process.header[index].opcode = opcode;
process.header[index].mask = lane_mask;
}
gpu::sync_lane(lane_mask);
return Port(process, lane_mask, index, out);
return Port(process, lane_mask, gpu::get_lane_size(), index, out);
}
}

Expand Down Expand Up @@ -577,7 +583,7 @@ template <uint32_t lane_size>
continue;
}

return Port(process, lane_mask, index, out);
return Port(process, lane_mask, lane_size, index, out);
}
return cpp::nullopt;
}
Expand Down
10 changes: 3 additions & 7 deletions libc/test/src/__support/RPC/rpc_smoke_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,16 @@
namespace {
enum { lane_size = 8, port_count = 4 };

struct Packet {
uint64_t unused;
};

using ProcAType = LIBC_NAMESPACE::rpc::Process<false, Packet>;
using ProcBType = LIBC_NAMESPACE::rpc::Process<true, Packet>;
using ProcAType = LIBC_NAMESPACE::rpc::Process<false>;
using ProcBType = LIBC_NAMESPACE::rpc::Process<true>;

static_assert(ProcAType::inbox_offset(port_count) ==
ProcBType::outbox_offset(port_count));

static_assert(ProcAType::outbox_offset(port_count) ==
ProcBType::inbox_offset(port_count));

enum { alloc_size = ProcAType::allocation_size(port_count) };
enum { alloc_size = ProcAType::allocation_size(port_count, 1) };

alignas(64) char buffer[alloc_size] = {0};
} // namespace
Expand Down
56 changes: 18 additions & 38 deletions libc/utils/gpu/server/rpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,62 +396,42 @@ const void *rpc_get_client_buffer(uint32_t device_id) {

/// Reports the size in bytes of the rpc::Client object.
uint64_t rpc_get_client_size() {
  return static_cast<uint64_t>(sizeof(rpc::Client));
}

using ServerPort = std::variant<rpc::Server<1>::Port *, rpc::Server<32>::Port *,
rpc::Server<64>::Port *>;
using ServerPort = std::variant<rpc::Server<0>::Port *>;

ServerPort get_port(rpc_port_t ref) {
if (ref.lane_size == 1)
return reinterpret_cast<rpc::Server<1>::Port *>(ref.handle);
else if (ref.lane_size == 32)
return reinterpret_cast<rpc::Server<32>::Port *>(ref.handle);
else if (ref.lane_size == 64)
return reinterpret_cast<rpc::Server<64>::Port *>(ref.handle);
else
__builtin_unreachable();
return reinterpret_cast<rpc::Server<0>::Port *>(ref.handle);
}

void rpc_send(rpc_port_t ref, rpc_port_callback_ty callback, void *data) {
auto port = get_port(ref);
std::visit(
[=](auto &port) {
port->send([=](rpc::Buffer *buffer) {
callback(reinterpret_cast<rpc_buffer_t *>(buffer), data);
});
},
port);
auto port = reinterpret_cast<rpc::Server<0>::Port *>(ref.handle);
port->send([=](rpc::Buffer *buffer) {
callback(reinterpret_cast<rpc_buffer_t *>(buffer), data);
});
}

void rpc_send_n(rpc_port_t ref, const void *const *src, uint64_t *size) {
auto port = get_port(ref);
std::visit([=](auto &port) { port->send_n(src, size); }, port);
auto port = reinterpret_cast<rpc::Server<0>::Port *>(ref.handle);
port->send_n(src, size);
}

void rpc_recv(rpc_port_t ref, rpc_port_callback_ty callback, void *data) {
auto port = get_port(ref);
std::visit(
[=](auto &port) {
port->recv([=](rpc::Buffer *buffer) {
callback(reinterpret_cast<rpc_buffer_t *>(buffer), data);
});
},
port);
auto port = reinterpret_cast<rpc::Server<0>::Port *>(ref.handle);
port->recv([=](rpc::Buffer *buffer) {
callback(reinterpret_cast<rpc_buffer_t *>(buffer), data);
});
}

void rpc_recv_n(rpc_port_t ref, void **dst, uint64_t *size, rpc_alloc_ty alloc,
void *data) {
auto port = get_port(ref);
auto port = reinterpret_cast<rpc::Server<0>::Port *>(ref.handle);
auto alloc_fn = [=](uint64_t size) { return alloc(size, data); };
std::visit([=](auto &port) { port->recv_n(dst, size, alloc_fn); }, port);
port->recv_n(dst, size, alloc_fn);
}

void rpc_recv_and_send(rpc_port_t ref, rpc_port_callback_ty callback,
void *data) {
auto port = get_port(ref);
std::visit(
[=](auto &port) {
port->recv_and_send([=](rpc::Buffer *buffer) {
callback(reinterpret_cast<rpc_buffer_t *>(buffer), data);
});
},
port);
auto port = reinterpret_cast<rpc::Server<0>::Port *>(ref.handle);
port->recv_and_send([=](rpc::Buffer *buffer) {
callback(reinterpret_cast<rpc_buffer_t *>(buffer), data);
});
}