Skip to content

Commit

Permalink
[libc] Fix RPC interface when sending and recieving aribtrary packets
Browse files Browse the repository at this point in the history
The interface exported by the RPC library allows users to simply send
and recieve fixed sized packets without worrying about the data motion
underneath. However, this was broken in the current implementation. We
can think of the send and recieve implementations in terms of waiting
for ownership of the buffer, using the buffer, and posting ownership to
the other side. Our implementation of `recv` was incorrect in the
following scenarios.

recv -> send // we still own the buffer and should give away ownership
recv -> close // The other side is not waiting for data, this will
                 result in multiple openings of the same port

This patch attempts to fix this with an admittedly hacky fix where we
track if the previous implementation was a recv and post conditionally.

Reviewed By: JonChesterfield

Differential Revision: https://reviews.llvm.org/D150327
  • Loading branch information
jhuber6 committed May 10, 2023
1 parent f9759d0 commit c8c19e1
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 3 deletions.
21 changes: 18 additions & 3 deletions libc/src/__support/RPC/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum Opcode : uint16_t {
PRINT_TO_STDERR = 1,
EXIT = 2,
TEST_INCREMENT = 3,
TEST_INTERFACE = 4,
};

/// A fixed size channel used to communicate between the RPC client and server.
Expand Down Expand Up @@ -252,7 +253,8 @@ template <bool InvertInbox> struct Process {
template <bool T> struct Port {
LIBC_INLINE Port(Process<T> &process, uint64_t lane_mask, uint64_t index,
uint32_t out)
: process(process), lane_mask(lane_mask), index(index), out(out) {}
: process(process), lane_mask(lane_mask), index(index), out(out),
receive(false) {}
LIBC_INLINE ~Port() = default;

private:
Expand All @@ -278,13 +280,20 @@ template <bool T> struct Port {
return process.get_packet(index).header.opcode;
}

LIBC_INLINE void close() { process.unlock(lane_mask, index); }
LIBC_INLINE void close() {
// If the server last did a receive it needs to exchange ownership before
// closing the port.
if (receive && T)
out = process.invert_outbox(index, out);
process.unlock(lane_mask, index);
}

private:
Process<T> &process;
uint64_t lane_mask;
uint64_t index;
uint32_t out;
bool receive;
};

/// The RPC client used to make requests to the server.
Expand Down Expand Up @@ -325,10 +334,16 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
process.invoke_rpc(fill, process.get_packet(index));
atomic_thread_fence(cpp::MemoryOrder::RELEASE);
out = process.invert_outbox(index, out);
receive = false;
}

/// Applies \p use to the shared buffer and acknowledges the send.
template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
// We only exchange ownership of the buffer during a receive if we are waiting
// for a previous receive to finish.
if (receive)
out = process.invert_outbox(index, out);

uint32_t in = process.load_inbox(index);

// We need to wait until we own the buffer before receiving.
Expand All @@ -340,7 +355,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {

// Apply the \p use function to read the memory out of the buffer.
process.invoke_rpc(use, process.get_packet(index));
out = process.invert_outbox(index, out);
receive = true;
}

/// Combines a send and receive into a single function.
Expand Down
7 changes: 7 additions & 0 deletions libc/test/integration/startup/gpu/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,10 @@ add_integration_test(
SRCS
init_fini_array_test.cpp
)

add_integration_test(
startup_rpc_interface_test
SUITE libc-startup-tests
SRCS
rpc_interface_test.cpp
)
43 changes: 43 additions & 0 deletions libc/test/integration/startup/gpu/rpc_interface_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//===-- Loader test to check the RPC interface with the loader ------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "src/__support/GPU/utils.h"
#include "src/__support/RPC/rpc_client.h"
#include "test/IntegrationTest/test.h"

using namespace __llvm_libc;

// Test to ensure that we can use aribtrary combinations of sends and recieves
// as long as they are mirrored.
static void test_interface(bool end_with_send) {
uint64_t cnt = 0;
rpc::Client::Port port = rpc::client.open<rpc::TEST_INTERFACE>();
port.send([&](rpc::Buffer *buffer) { buffer->data[0] = end_with_send; });
port.send([&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; });
port.recv([&](rpc::Buffer *buffer) { cnt = buffer->data[0]; });
port.send([&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; });
port.recv([&](rpc::Buffer *buffer) { cnt = buffer->data[0]; });
port.send([&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; });
port.send([&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; });
port.recv([&](rpc::Buffer *buffer) { cnt = buffer->data[0]; });
port.recv([&](rpc::Buffer *buffer) { cnt = buffer->data[0]; });
if (end_with_send)
port.send([&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; });
else
port.recv([&](rpc::Buffer *buffer) { cnt = buffer->data[0]; });
port.close();

ASSERT_TRUE(cnt == 9 && "Invalid number of increments");
}

TEST_MAIN(int argc, char **argv, char **envp) {
test_interface(true);
test_interface(false);

return 0;
}
35 changes: 35 additions & 0 deletions libc/utils/gpu/loader/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,41 @@ void handle_server() {
});
break;
}
case __llvm_libc::rpc::Opcode::TEST_INTERFACE: {
uint64_t cnt = 0;
bool end_with_recv;
port->recv([&](__llvm_libc::rpc::Buffer *buffer) {
end_with_recv = buffer->data[0];
});
port->recv(
[&](__llvm_libc::rpc::Buffer *buffer) { cnt = buffer->data[0]; });
port->send([&](__llvm_libc::rpc::Buffer *buffer) {
buffer->data[0] = cnt = cnt + 1;
});
port->recv(
[&](__llvm_libc::rpc::Buffer *buffer) { cnt = buffer->data[0]; });
port->send([&](__llvm_libc::rpc::Buffer *buffer) {
buffer->data[0] = cnt = cnt + 1;
});
port->recv(
[&](__llvm_libc::rpc::Buffer *buffer) { cnt = buffer->data[0]; });
port->recv(
[&](__llvm_libc::rpc::Buffer *buffer) { cnt = buffer->data[0]; });
port->send([&](__llvm_libc::rpc::Buffer *buffer) {
buffer->data[0] = cnt = cnt + 1;
});
port->send([&](__llvm_libc::rpc::Buffer *buffer) {
buffer->data[0] = cnt = cnt + 1;
});
if (end_with_recv)
port->recv(
[&](__llvm_libc::rpc::Buffer *buffer) { cnt = buffer->data[0]; });
else
port->send([&](__llvm_libc::rpc::Buffer *buffer) {
buffer->data[0] = cnt = cnt + 1;
});
break;
}
default:
port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
}
Expand Down

0 comments on commit c8c19e1

Please sign in to comment.