Skip to content

Commit

Permalink
[libc] Implement a generic streaming interface in the RPC
Browse files Browse the repository at this point in the history
Currently we provide the `send_n` and `recv_n` functions. These were
somewhat divergent and not tested on the GPU. This patch changes the
support to be more common. We do this my making the CPU provide an array
equal the to at least the lane size while the GPU can rely on the
private memory address of its stack variables. This allows us to send
data back and forth generically.

Reviewed By: JonChesterfield

Differential Revision: https://reviews.llvm.org/D150379
  • Loading branch information
jhuber6 committed May 11, 2023
1 parent dc0d00c commit d21e507
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 55 deletions.
93 changes: 47 additions & 46 deletions libc/src/__support/RPC/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ enum Opcode : uint16_t {
EXIT = 2,
TEST_INCREMENT = 3,
TEST_INTERFACE = 4,
TEST_STREAM = 5,
};

/// A fixed size channel used to communicate between the RPC client and server.
Expand Down Expand Up @@ -318,8 +319,10 @@ template <bool T> struct Port {
template <typename F, typename U>
LIBC_INLINE void send_and_recv(F fill, U use);
template <typename W> LIBC_INLINE void recv_and_send(W work);
LIBC_INLINE void send_n(const void *const *src, uint64_t *size);
LIBC_INLINE void send_n(const void *src, uint64_t size);
template <typename A> LIBC_INLINE void recv_n(A alloc);
template <typename A>
LIBC_INLINE void recv_n(void **dst, uint64_t *size, A &&alloc);

LIBC_INLINE uint16_t get_opcode() const {
return process.get_packet(index).header.opcode;
Expand Down Expand Up @@ -424,67 +427,65 @@ LIBC_INLINE void Port<T>::recv_and_send(W work) {
/// Sends an arbitrarily sized data buffer \p src across the shared channel in
/// multiples of the packet length.
template <bool T>
LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
LIBC_INLINE void Port<T>::send_n(const void *const *src, uint64_t *size) {
// TODO: We could send the first bytes in this call and potentially save an
// extra send operation.
// TODO: We may need a way for the CPU to send different strings per thread.
send([=](Buffer *buffer) {
reinterpret_cast<uint64_t *>(buffer->data)[0] = size;
uint64_t num_sends = 0;
send([&](Buffer *buffer, uint32_t id) {
reinterpret_cast<uint64_t *>(buffer->data)[0] = lane_value(size, id);
num_sends = is_process_gpu() ? lane_value(size, id)
: max(lane_value(size, id), num_sends);
});
const uint8_t *ptr = reinterpret_cast<const uint8_t *>(src);
for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
send([=](Buffer *buffer) {
const uint64_t len =
size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx;
inline_memcpy(buffer->data, ptr + idx, len);
for (uint64_t idx = 0; idx < num_sends; idx += sizeof(Buffer::data)) {
send([=](Buffer *buffer, uint32_t id) {
const uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
? sizeof(Buffer::data)
: lane_value(size, id) - idx;
if (idx < lane_value(size, id))
inline_memcpy(
buffer->data,
reinterpret_cast<const uint8_t *>(lane_value(src, id)) + idx, len);
});
}
gpu::sync_lane(process.get_packet(index).header.mask);
}

/// Helper routine to simplify the interface when sending from the GPU using
/// thread private pointers to the underlying value.
template <bool T>
LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
static_assert(is_process_gpu(), "Only valid when running on the GPU");
const void **src_ptr = &src;
uint64_t *size_ptr = &size;
send_n(src_ptr, size_ptr);
}

/// Receives an arbitrarily sized data buffer across the shared channel in
/// multiples of the packet length. The \p alloc function is called with the
/// size of the data so that we can initialize the size of the \p dst buffer.
template <bool T>
template <typename A>
LIBC_INLINE void Port<T>::recv_n(A alloc) {
// The GPU handles thread private variables and masking implicitly through its
// execution model. If this is the CPU we need to manually handle the
// possibility that the sent data is of different length.
if constexpr (is_process_gpu()) {
uint64_t size = 0;
recv([&](Buffer *buffer) {
size = reinterpret_cast<uint64_t *>(buffer->data)[0];
});
uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size), gpu::get_lane_id());
for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
recv([=](Buffer *buffer) {
uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data)
: size - idx;
inline_memcpy(dst + idx, buffer->data, len);
});
}
return;
} else {
uint64_t size[MAX_LANE_SIZE];
uint8_t *dst[MAX_LANE_SIZE];
uint64_t max = 0;
recv([&](Buffer *buffer, uint32_t id) {
size[id] = reinterpret_cast<uint64_t *>(buffer->data)[0];
dst[id] = reinterpret_cast<uint8_t *>(alloc(size[id], id));
max = size[id] > max ? size[id] : max;
LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
uint64_t num_recvs = 0;
recv([&](Buffer *buffer, uint32_t id) {
lane_value(size, id) = reinterpret_cast<uint64_t *>(buffer->data)[0];
lane_value(dst, id) =
reinterpret_cast<uint8_t *>(alloc(lane_value(size, id)));
num_recvs = is_process_gpu() ? lane_value(size, id)
: max(lane_value(size, id), num_recvs);
});
for (uint64_t idx = 0; idx < num_recvs; idx += sizeof(Buffer::data)) {
recv([=](Buffer *buffer, uint32_t id) {
uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
? sizeof(Buffer::data)
: lane_value(size, id) - idx;
if (idx < lane_value(size, id))
inline_memcpy(reinterpret_cast<uint8_t *>(lane_value(dst, id)) + idx,
buffer->data, len);
});
for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) {
recv([=](Buffer *buffer, uint32_t id) {
uint64_t len = size[id] - idx > sizeof(Buffer::data)
? sizeof(Buffer::data)
: size[id] - idx;
if (idx < size[id])
inline_memcpy(dst[id] + idx, buffer->data, len);
});
}
return;
}
return;
}

/// Attempts to open a port to use as the client. The client can only open a
Expand Down
15 changes: 15 additions & 0 deletions libc/src/__support/RPC/rpc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ template <typename V, typename A> LIBC_INLINE V align_up(V val, A align) {
return ((val + V(align) - 1) / V(align)) * V(align);
}

/// Utility to provide a unified interface between the CPU and GPU's memory
/// model. On the GPU stack variables are always private to a lane so we can
/// simply use the variable passed in. On the CPU we need to allocate enough
/// space for the whole lane and index into it.
template <typename V> LIBC_INLINE V &lane_value(V *val, uint32_t id) {
if constexpr (is_process_gpu())
return *val;
return val[id];
}

/// Helper to get the maximum value.
template <typename T> LIBC_INLINE const T &max(const T &x, const T &y) {
return x < y ? y : x;
}

} // namespace rpc
} // namespace __llvm_libc

Expand Down
9 changes: 9 additions & 0 deletions libc/test/integration/startup/gpu/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,12 @@ add_integration_test(
SRCS
rpc_interface_test.cpp
)

add_integration_test(
startup_rpc_stream_test
SUITE libc-startup-tests
SRCS
rpc_stream_test.cpp
LOADER_ARGS
--threads-x 32
)
50 changes: 50 additions & 0 deletions libc/test/integration/startup/gpu/rpc_stream_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//===-- Loader test to check the RPC streaming interface with the loader --===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "src/__support/GPU/utils.h"
#include "src/__support/RPC/rpc_client.h"
#include "src/__support/integer_to_string.h"
#include "src/string/memory_utils/memcmp_implementations.h"
#include "src/string/memory_utils/memcpy_implementations.h"
#include "src/string/string_utils.h"
#include "test/IntegrationTest/test.h"

extern "C" void *malloc(uint64_t);

using namespace __llvm_libc;

static void test_stream() {
const char str[] = "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
"ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
"ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
"ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
"ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy";
uint64_t send_size = sizeof(str);
void *send_ptr = malloc(send_size);
void *recv_ptr;
uint64_t recv_size;

inline_memcpy(send_ptr, str, send_size);
ASSERT_TRUE(inline_memcmp(send_ptr, str, send_size) == 0 && "Data mismatch");
rpc::Client::Port port = rpc::client.open<rpc::TEST_STREAM>();
port.send_n(send_ptr, send_size);
port.recv_n(&recv_ptr, &recv_size,
[](uint64_t size) { return malloc(size); });
port.close();
ASSERT_TRUE(inline_memcmp(recv_ptr, str, recv_size) == 0 && "Data mismatch");
ASSERT_TRUE(recv_size == send_size && "Data size mismatch");

free(send_ptr);
free(recv_ptr);
}

TEST_MAIN(int argc, char **argv, char **envp) {
test_stream();

return 0;
}
25 changes: 16 additions & 9 deletions libc/utils/gpu/loader/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,13 @@ void handle_server() {

switch (port->get_opcode()) {
case rpc::Opcode::PRINT_TO_STDERR: {
uint64_t str_size[rpc::MAX_LANE_SIZE] = {0};
char *strs[rpc::MAX_LANE_SIZE] = {nullptr};
port->recv_n([&](uint64_t size, uint32_t id) {
str_size[id] = size;
strs[id] = new char[size];
return strs[id];
});
uint64_t sizes[rpc::MAX_LANE_SIZE] = {0};
void *strs[rpc::MAX_LANE_SIZE] = {nullptr};
port->recv_n(strs, sizes, [&](uint64_t size) { return new char[size]; });
for (uint64_t i = 0; i < rpc::MAX_LANE_SIZE; ++i) {
if (strs[i]) {
fwrite(strs[i], str_size[i], 1, stderr);
delete[] strs[i];
fwrite(strs[i], sizes[i], 1, stderr);
delete[] reinterpret_cast<uint8_t *>(strs[i]);
}
}
break;
Expand Down Expand Up @@ -78,6 +74,17 @@ void handle_server() {
[&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; });
break;
}
case rpc::Opcode::TEST_STREAM: {
uint64_t sizes[rpc::MAX_LANE_SIZE] = {0};
void *dst[rpc::MAX_LANE_SIZE] = {nullptr};
port->recv_n(dst, sizes, [](uint64_t size) { return new char[size]; });
port->send_n(dst, sizes);
for (uint64_t i = 0; i < rpc::MAX_LANE_SIZE; ++i) {
if (dst[i])
delete[] reinterpret_cast<uint8_t *>(dst[i]);
}
break;
}
default:
port->recv([](rpc::Buffer *buffer) {});
}
Expand Down

0 comments on commit d21e507

Please sign in to comment.