Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions gloo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ set(GLOO_HDRS)
# Compiled sources in root directory
list(APPEND GLOO_SRCS
"${CMAKE_CURRENT_SOURCE_DIR}/algorithm.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/allgather.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce_local.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/context.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/gather.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/types.cc"
)

list(APPEND GLOO_HDRS
"${CMAKE_CURRENT_SOURCE_DIR}/algorithm.h"
"${CMAKE_CURRENT_SOURCE_DIR}/allgather.h"
"${CMAKE_CURRENT_SOURCE_DIR}/allgather_ring.h"
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce_halving_doubling.h"
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce_bcube.h"
Expand All @@ -23,6 +27,7 @@ list(APPEND GLOO_HDRS
"${CMAKE_CURRENT_SOURCE_DIR}/barrier_all_to_all.h"
"${CMAKE_CURRENT_SOURCE_DIR}/barrier_all_to_one.h"
"${CMAKE_CURRENT_SOURCE_DIR}/broadcast_one_to_all.h"
"${CMAKE_CURRENT_SOURCE_DIR}/gather.h"
"${CMAKE_CURRENT_SOURCE_DIR}/reduce_scatter.h"
"${CMAKE_CURRENT_SOURCE_DIR}/context.h"
"${CMAKE_CURRENT_SOURCE_DIR}/math.h"
Expand Down
109 changes: 109 additions & 0 deletions gloo/allgather.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "gloo/allgather.h"

#include <cstring>

#include "gloo/common/logging.h"
#include "gloo/types.h"

namespace gloo {

void allgather(const std::shared_ptr<Context>& context, AllgatherOptions& opts) {
std::unique_ptr<transport::UnboundBuffer> tmpInBuffer;
std::unique_ptr<transport::UnboundBuffer> tmpOutBuffer;
transport::UnboundBuffer* in = nullptr;
transport::UnboundBuffer* out = nullptr;
const auto slot = Slot::build(kAllgatherSlotPrefix, opts.tag);

// Sanity checks
GLOO_ENFORCE(opts.elementSize > 0);
const auto recvRank = (context->size + context->rank - 1) % context->size;
GLOO_ENFORCE(context->getPair(recvRank), "pair missing (rank ", recvRank, ")");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rephrasing pair missing to something that is easier to be understood

const auto sendRank = (context->size + context->rank + 1) % context->size;
GLOO_ENFORCE(context->getPair(sendRank), "pair missing (rank ", sendRank, ")");

// Figure out pointer to input buffer
if (opts.inBuffer) {
in = opts.inBuffer.get();
} else if (opts.inPtr != nullptr) {
GLOO_ENFORCE(opts.inElements > 0);
tmpInBuffer =
context->createUnboundBuffer(opts.inPtr, opts.inElements * opts.elementSize);
in = tmpInBuffer.get();
}

// Figure out pointer to output buffer
if (opts.outBuffer) {
out = opts.outBuffer.get();
} else {
GLOO_ENFORCE(opts.outPtr != nullptr);
GLOO_ENFORCE(opts.outElements > 0);
tmpOutBuffer =
context->createUnboundBuffer(opts.outPtr, opts.outElements * opts.elementSize);
out = tmpOutBuffer.get();
}

GLOO_ENFORCE_EQ(out->size, in->size * context->size);

// If the input buffer is specified, this is NOT an in place operation,
// and the output buffer needs to be primed with the input.
if (in != nullptr) {
memcpy(
(uint8_t*) out->ptr + context->rank * opts.inElements * opts.elementSize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use C++ case, please

(uint8_t*) in->ptr,
opts.inElements * opts.elementSize);
}

// The chunk size may not be divisible by 2; use dynamic lookup.
std::array<size_t, 2> chunkSize;
chunkSize[0] = (opts.inElements * opts.elementSize) / 2;
chunkSize[1] = (opts.inElements * opts.elementSize) - chunkSize[0];
std::array<size_t, 2> chunkOffset;
chunkOffset[0] = 0;
chunkOffset[1] = chunkSize[0];

for (auto i = 0; i < (context->size - 1) * 2; i++) {
size_t sendOffset =
(((context->size + context->rank - (i / 2))
* opts.inElements
* opts.elementSize)
+ chunkOffset[i & 0x1])
% (opts.outElements * opts.elementSize);
size_t recvOffset =
(((context->size + context->rank - 1 - (i / 2))
* opts.inElements
* opts.elementSize)
+ chunkOffset[i & 0x1])
% (opts.outElements * opts.elementSize);
size_t size = chunkSize[i & 0x1];
if (i < 2) {
out->send(sendRank, slot, sendOffset, size);
out->recv(recvRank, slot, recvOffset, size);
continue;
}

// Wait for pending operations to complete to synchronize with the
// previous iteration. Because we kick off two operations before
// getting here we always wait for the next-to-last operation.
out->waitSend();
out->waitRecv();
out->send(sendRank, slot, sendOffset, size);
out->recv(recvRank, slot, recvOffset, size);
}

// Wait for completes
for (auto i = 0; i < 2; i++) {
out->waitSend();
out->waitRecv();
}
}

} // namespace gloo
42 changes: 42 additions & 0 deletions gloo/allgather.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#pragma once

#include "gloo/context.h"
#include "gloo/transport/unbound_buffer.h"

namespace gloo {

struct AllgatherOptions {
// The input and output can either be specified as a unbound buffer
// (that can be cached and reused by the caller), or a literal
// pointer and number of elements stored at that pointer.
//
// The operation is executed in place on the output if the input is
// set to null. The input for this process is assumed to be at the
// location in the output buffer where it would otherwise be.
std::unique_ptr<transport::UnboundBuffer> inBuffer;
void* inPtr;
size_t inElements;
std::unique_ptr<transport::UnboundBuffer> outBuffer;
void* outPtr;
size_t outElements;

// Number of bytes per element.
size_t elementSize;

// Tag for this gather operation.
// Must be unique across operations executing in parallel.
uint32_t tag;
};

void allgather(const std::shared_ptr<Context>& context, AllgatherOptions& opts);

} // namespace gloo
81 changes: 81 additions & 0 deletions gloo/gather.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "gloo/gather.h"

#include <cstring>

#include "gloo/common/logging.h"
#include "gloo/types.h"

namespace gloo {

void gather(const std::shared_ptr<Context>& context, GatherOptions& opts) {
std::unique_ptr<transport::UnboundBuffer> tmpInBuffer;
std::unique_ptr<transport::UnboundBuffer> tmpOutBuffer;
transport::UnboundBuffer* in = nullptr;
transport::UnboundBuffer* out = nullptr;
const auto slot = Slot::build(kGatherSlotPrefix, opts.tag);

// Sanity checks
GLOO_ENFORCE(opts.elementSize > 0);

// Figure out pointer to input buffer
if (opts.inBuffer) {
in = opts.inBuffer.get();
} else {
GLOO_ENFORCE(opts.inPtr != nullptr);
GLOO_ENFORCE(opts.inElements > 0);
tmpInBuffer =
context->createUnboundBuffer(opts.inPtr, opts.inElements * opts.elementSize);
in = tmpInBuffer.get();
}

if (context->rank == opts.root) {
const size_t chunkSize = in->size;

// Figure out pointer to output buffer (only for root rank)
if (opts.outBuffer) {
out = opts.outBuffer.get();
} else {
GLOO_ENFORCE(opts.outPtr != nullptr);
GLOO_ENFORCE(opts.outElements > 0);
tmpOutBuffer =
context->createUnboundBuffer(opts.outPtr, opts.outElements * opts.elementSize);
out = tmpOutBuffer.get();
}

// Ensure the output buffer has the right size.
GLOO_ENFORCE(in->size * context->size == out->size);

// Post receive operations from peers into out buffer
for (size_t i = 0; i < context->size; i++) {
if (i == context->rank) {
continue;
}
out->recv(i, slot, i * chunkSize, chunkSize);
}

// Copy local input to output
memcpy((char*) out->ptr + (context->rank * chunkSize), in->ptr, chunkSize);

// Wait for receive operations to complete
for (size_t i = 0; i < context->size; i++) {
if (i == context->rank) {
continue;
}
out->waitRecv();
}
} else {
in->send(opts.root, slot);
in->waitSend();
}
}

} // namespace gloo
41 changes: 41 additions & 0 deletions gloo/gather.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#pragma once

#include "gloo/context.h"
#include "gloo/transport/unbound_buffer.h"

namespace gloo {

struct GatherOptions {
// The input and output buffers can either be specified as a unbound
// buffer (that can be cached and reused by the caller), or a
// literal pointer and number of elements stored at that pointer.
std::unique_ptr<transport::UnboundBuffer> inBuffer;
void* inPtr;
size_t inElements;
std::unique_ptr<transport::UnboundBuffer> outBuffer;
void* outPtr;
size_t outElements;

// Number of bytes per element.
size_t elementSize;

// Rank of receiving process.
int root;

// Tag for this gather operation.
// Must be unique across operations executing in parallel.
uint32_t tag;
};

void gather(const std::shared_ptr<Context>& context, GatherOptions& opts);

} // namespace gloo
2 changes: 2 additions & 0 deletions gloo/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
set(GLOO_TEST_SRCS
"${CMAKE_CURRENT_SOURCE_DIR}/allgather_test.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce_builder_test.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce_test.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/barrier_test.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/broadcast_builder_test.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/broadcast_test.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/gather_test.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/linux_test.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/main.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/send_recv_test.cc"
Expand Down
71 changes: 71 additions & 0 deletions gloo/test/allgather_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <thread>
#include <vector>

#include "gloo/allgather.h"
#include "gloo/allgather_ring.h"
#include "gloo/common/common.h"
#include "gloo/test/base_test.h"
Expand Down Expand Up @@ -109,6 +110,76 @@ INSTANTIATE_TEST_CASE_P(
::testing::ValuesIn(genMemorySizes()),
::testing::Range(1, 4)));

using NewParam = std::tuple<int, int>;

class AllgatherNewTest : public BaseTest,
public ::testing::WithParamInterface<NewParam> {};

TEST_P(AllgatherNewTest, Default) {
auto contextSize = std::get<0>(GetParam());
auto dataSize = std::get<1>(GetParam());

auto validate = [dataSize](
const std::shared_ptr<Context>& context,
Fixture<uint64_t>& output) {
const auto ptr = output.getPointer();
const auto stride = context->size;
for (auto j = 0; j < context->size; j++) {
for (auto k = 0; k < dataSize; k++) {
ASSERT_EQ(j + k * stride, ptr[k + j * dataSize])
<< "Mismatch at index " << (k + j * dataSize);
}
}
};

spawn(contextSize, [&](std::shared_ptr<Context> context) {
auto input = Fixture<uint64_t>(context, 1, dataSize);
auto output = Fixture<uint64_t>(context, 1, contextSize * dataSize);

// Run with raw pointers and sizes in options
{
input.assignValues();
output.clear();

AllgatherOptions opts;
opts.inPtr = input.getPointer();
opts.inElements = dataSize;
opts.outPtr = output.getPointer();
opts.outElements = contextSize * dataSize;
opts.elementSize = sizeof(uint64_t);
input.assignValues();
output.clear();
allgather(context, opts);
validate(context, output);
}

// Run with (optionally cached) unbound buffers in options
{
input.assignValues();
output.clear();

AllgatherOptions opts;
opts.inBuffer = context->createUnboundBuffer(
input.getPointer(),
dataSize * sizeof(uint64_t));
opts.outBuffer = context->createUnboundBuffer(
output.getPointer(),
contextSize * dataSize * sizeof(uint64_t));
opts.elementSize = sizeof(uint64_t);
allgather(context, opts);
validate(context, output);
}
});
}

INSTANTIATE_TEST_CASE_P(
AllgatherNewDefault,
AllgatherNewTest,
::testing::Combine(
::testing::Values(2, 4, 7),
::testing::ValuesIn(genMemorySizes())));


} // namespace
} // namespace test
} // namespace gloo
Loading