Skip to content

Add Connection::atomicAdd interface #532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
@@ -492,6 +492,12 @@ class Connection {
/// @param newValue The new value to write.
virtual void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) = 0;

/// Atomically add an 8-byte value to a destination RegisteredMemory.
/// @param dst The destination RegisteredMemory.
/// @param dstOffset The offset in bytes from the start of the destination RegisteredMemory.
/// @param value The value to add.
virtual void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) = 0;

/// Flush any pending writes to the remote process.
/// @param timeoutUsec Timeout in microseconds. Default: -1 (no timeout)
virtual void flush(int64_t timeoutUsec = -1) = 0;
1 change: 1 addition & 0 deletions include/mscclpp/gpu.hpp
Original file line number Diff line number Diff line change
@@ -77,6 +77,7 @@ constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWri
#define cudaIpcGetMemHandle(...) hipIpcGetMemHandle(__VA_ARGS__)
#define cudaIpcOpenMemHandle(...) hipIpcOpenMemHandle(__VA_ARGS__)
#define cudaIpcCloseMemHandle(...) hipIpcCloseMemHandle(__VA_ARGS__)
#define cudaLaunchKernel(...) hipLaunchKernel(__VA_ARGS__)

#define cuGetErrorString(...) hipDrvGetErrorString(__VA_ARGS__)
#define cuMemAddressReserve(...) hipMemAddressReserve(__VA_ARGS__)
2 changes: 1 addition & 1 deletion include/mscclpp/port_channel.hpp
Original file line number Diff line number Diff line change
@@ -73,7 +73,7 @@ class ProxyService : public BaseProxyService {
/// Stop the proxy service.
void stopProxy();

private:
protected:
std::vector<std::shared_ptr<Host2DeviceSemaphore>> semaphores_;
std::vector<RegisteredMemory> memories_;
std::shared_ptr<Proxy> proxy_;
1 change: 1 addition & 0 deletions python/mscclpp/core_py.cpp
Original file line number Diff line number Diff line change
@@ -149,6 +149,7 @@ void register_core(nb::module_& m) {
self->updateAndSync(dst, dstOffset, (uint64_t*)src, newValue);
},
nb::arg("dst"), nb::arg("dstOffset"), nb::arg("src"), nb::arg("newValue"))
.def("atomic_add", &Connection::atomicAdd, nb::arg("dst"), nb::arg("dstOffset"), nb::arg("value"))
.def("flush", &Connection::flush, nb::call_guard<nb::gil_scoped_release>(), nb::arg("timeoutUsec") = (int64_t)3e7)
.def("transport", &Connection::transport)
.def("remote_transport", &Connection::remoteTransport)
30 changes: 30 additions & 0 deletions src/connection.cc
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
#include <thread>

#include "api.h"
#include "connection_kernels.hpp"
#include "debug.h"
#include "endpoint.hpp"

@@ -95,6 +96,17 @@ void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
#endif
}

void CudaIpcConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) {
validateTransport(dst, remoteTransport());

uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.data()) + dstOffset);
void* args[] = {reinterpret_cast<void**>(&dstPtr), &value};

stream_->launch(connectionAtomicAddKernelFunc(), dim3(1), dim3(1), args, 0);

INFO(MSCCLPP_P2P, "CudaIpcConnection atomicAdd: value %lu to %p", value, dstPtr);
}

void CudaIpcConnection::flush(int64_t timeoutUsec) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
@@ -196,6 +208,19 @@ void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint6
#endif
}

void IBConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) {
validateTransport(dst, remoteTransport());
auto dstTransportInfo = getImpl(dst)->getTransportInfo(remoteTransport());
if (dstTransportInfo.ibLocal) {
throw Error("dst is local, which is not supported", ErrorCode::InvalidUsage);
}

auto dstMrInfo = dstTransportInfo.ibMrInfo;
qp_->stageAtomicAdd(dstTransportInfo_.ibMr, dstMrInfo, /*wrId=*/0, dstOffset, value, /*signaled=*/true);
qp_->postSend();
INFO(MSCCLPP_NET, "IBConnection atomicAdd: value %lu to %p", value, (uint8_t*)dstMrInfo.addr + dstOffset);
}

void IBConnection::flush(int64_t timeoutUsec) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
@@ -361,6 +386,11 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
#endif
}

void EthernetConnection::atomicAdd([[maybe_unused]] RegisteredMemory dst, [[maybe_unused]] uint64_t dstOffset,
[[maybe_unused]] uint64_t value) {
throw mscclpp::Error("EthernetConnection does not support atomicAdd", ErrorCode::InvalidUsage);
}

void EthernetConnection::flush(int64_t) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
19 changes: 19 additions & 0 deletions src/connection_kernels.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include <mscclpp/atomic_device.hpp>

#include "connection_kernels.hpp"

namespace mscclpp {

__global__ void connectionAtomicAddKernel(uint64_t* dst, uint64_t value) {
atomicFetchAdd(dst, value, memoryOrderRelaxed);
}

const void* connectionAtomicAddKernelFunc() {
static const void* func = reinterpret_cast<const void*>(&connectionAtomicAddKernel);
return func;
}

} // namespace mscclpp
5 changes: 5 additions & 0 deletions src/context.cc
Original file line number Diff line number Diff line change
@@ -35,6 +35,11 @@ void CudaIpcStream::memcpyH2D(void *dst, const void *src, size_t nbytes) {
dirty_ = true;
}

void CudaIpcStream::launch(const void *func, dim3 gridDim, dim3 blockDim, void **args, size_t sharedMem) {
setStreamIfNeeded();
MSCCLPP_CUDATHROW(cudaLaunchKernel(func, gridDim, blockDim, args, sharedMem, *stream_));
}

void CudaIpcStream::sync() {
setStreamIfNeeded();
if (dirty_) {
9 changes: 9 additions & 0 deletions src/include/connection.hpp
Original file line number Diff line number Diff line change
@@ -29,8 +29,11 @@ class CudaIpcConnection : public Connection {

void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) override;

void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;

void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;

void flush(int64_t timeoutUsec) override;
};

@@ -52,8 +55,11 @@ class IBConnection : public Connection {

void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) override;

void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;

void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;

void flush(int64_t timeoutUsec) override;
};

@@ -83,8 +89,11 @@ class EthernetConnection : public Connection {

void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) override;

void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;

void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;

void flush(int64_t timeoutUsec) override;
};

13 changes: 13 additions & 0 deletions src/include/connection_kernels.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef MSCCLPP_CONNECTION_KERNEL_HPP_
#define MSCCLPP_CONNECTION_KERNEL_HPP_

namespace mscclpp {

const void *connectionAtomicAddKernelFunc();

} // namespace mscclpp

#endif // MSCCLPP_CONNECTION_KERNEL_HPP_
2 changes: 2 additions & 0 deletions src/include/context.hpp
Original file line number Diff line number Diff line change
@@ -28,6 +28,8 @@ class CudaIpcStream {

void memcpyH2D(void *dst, const void *src, size_t nbytes);

void launch(const void *func, dim3 gridDim, dim3 blockDim, void **args, size_t sharedMem);

void sync();

operator cudaStream_t() const { return *stream_; }
96 changes: 95 additions & 1 deletion test/mp_unit/communicator_tests.cu
Original file line number Diff line number Diff line change
@@ -280,4 +280,98 @@ TEST_F(CommunicatorTest, WriteWithHostSemaphores) {

ASSERT_TRUE(testWriteCorrectness());
communicator->bootstrap()->barrier();
}
}

TEST_F(CommunicatorTest, ConcurrentAtomicAdd) {
if (gEnv->rank >= numRanksToUse) return;

// Check if any connections support atomicAdd
bool hasAtomicAddSupport = false;
int numAtomicAddConnections = 0;
for (auto& entry : connections) {
auto& conn = entry.second;
if (conn->transport() == mscclpp::Transport::CudaIpc || mscclpp::AllIBTransports.has(conn->transport()) ||
conn->transport() == mscclpp::Transport::Nvls) {
hasAtomicAddSupport = true;
if (entry.first == 0) { // Only count connections to rank 0
numAtomicAddConnections++;
}
}
}

if (!hasAtomicAddSupport) {
GTEST_SKIP() << "No connections support atomicAdd in this configuration";
return;
}

// Initialize device buffers with zeros for atomic add test
size_t dataCount = deviceBufferSize / sizeof(uint64_t);
for (int n = 0; n < (int)devicePtr.size(); n++) {
std::vector<uint64_t> hostBuffer(dataCount, 0);
mscclpp::gpuMemcpy<uint64_t>(reinterpret_cast<uint64_t*>(devicePtr[n].get()), hostBuffer.data(), dataCount,
cudaMemcpyHostToDevice);
}
communicator->bootstrap()->barrier();

// All ranks (except rank 0) atomic add to offset 0 in rank 0's memory
if (gEnv->rank != 0) {
auto it = connections.find(0);
if (it != connections.end()) {
auto& conn = it->second;
auto& rank0Memory = remoteMemory[0].at(0);

// Check if connection to rank 0 supports atomicAdd
if (conn->transport() == mscclpp::Transport::CudaIpc || mscclpp::AllIBTransports.has(conn->transport()) ||
conn->transport() == mscclpp::Transport::Nvls) {
try {
// Each rank adds its rank value to offset 0 in rank 0's memory
uint64_t valueToAdd = gEnv->rank;
conn->atomicAdd(rank0Memory, 0, valueToAdd);
conn->flush();
} catch (const mscclpp::Error& e) {
if (e.getErrorCode() == mscclpp::ErrorCode::InvalidUsage) {
// Connection doesn't support atomicAdd, skip
} else {
throw;
}
}
}
}
}
communicator->bootstrap()->barrier();

// Only rank 0 checks the result
if (gEnv->rank == 0) {
// Calculate expected sum: sum of ranks that successfully added (excluding rank 0)
uint64_t expectedSum = 0;
for (int i = 1; i < gEnv->worldSize; i++) {
auto it = connections.find(i);
if (it != connections.end()) {
auto& conn = it->second;
if (conn->transport() == mscclpp::Transport::CudaIpc || mscclpp::AllIBTransports.has(conn->transport()) ||
conn->transport() == mscclpp::Transport::Nvls) {
expectedSum += i; // Each rank i adds its rank value
}
}
}

// Poll until the atomic additions are complete
bool ready = false;
int niter = 0;
do {
std::vector<uint64_t> hostBuffer(dataCount, 0);
mscclpp::gpuMemcpy<uint64_t>(hostBuffer.data(), reinterpret_cast<uint64_t*>(devicePtr[0].get()), dataCount,
cudaMemcpyDeviceToHost);

uint64_t actualSum = hostBuffer[0];
ready = (actualSum == expectedSum);

niter++;
if (niter == 10000) {
FAIL() << "Polling is stuck. Expected sum: " << expectedSum << ", Actual sum: " << actualSum;
}
} while (!ready);
}

communicator->bootstrap()->barrier();
}
Loading
Oops, something went wrong.