From 7879ba95eac4eb574163df0eac00b50d8a1be11e Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 3 Oct 2018 17:07:13 -0700 Subject: [PATCH 1/3] Add gather collective This is the first "new style" collective that doesn't use the pattern where it is initialized once, holds some state, and is used many times. The new style is intended for use where collectives can be called at any time, such that initialization can no longer be amortized because reuse may no longer apply. --- gloo/CMakeLists.txt | 3 ++ gloo/gather.cc | 81 ++++++++++++++++++++++++++++++++++++ gloo/gather.h | 41 +++++++++++++++++++ gloo/test/CMakeLists.txt | 1 + gloo/test/base_test.h | 4 ++ gloo/test/gather_test.cc | 88 ++++++++++++++++++++++++++++++++++++++++ gloo/types.cc | 36 ++++++++++++++++ gloo/types.h | 48 ++++++++++++++++++++++ 8 files changed, 302 insertions(+) create mode 100644 gloo/gather.cc create mode 100644 gloo/gather.h create mode 100644 gloo/test/gather_test.cc create mode 100644 gloo/types.cc diff --git a/gloo/CMakeLists.txt b/gloo/CMakeLists.txt index 4631d2162..592db5bd5 100644 --- a/gloo/CMakeLists.txt +++ b/gloo/CMakeLists.txt @@ -9,6 +9,8 @@ list(APPEND GLOO_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/algorithm.cc" "${CMAKE_CURRENT_SOURCE_DIR}/allreduce_local.cc" "${CMAKE_CURRENT_SOURCE_DIR}/context.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/gather.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/types.cc" ) list(APPEND GLOO_HDRS @@ -23,6 +25,7 @@ list(APPEND GLOO_HDRS "${CMAKE_CURRENT_SOURCE_DIR}/barrier_all_to_all.h" "${CMAKE_CURRENT_SOURCE_DIR}/barrier_all_to_one.h" "${CMAKE_CURRENT_SOURCE_DIR}/broadcast_one_to_all.h" + "${CMAKE_CURRENT_SOURCE_DIR}/gather.h" "${CMAKE_CURRENT_SOURCE_DIR}/reduce_scatter.h" "${CMAKE_CURRENT_SOURCE_DIR}/context.h" "${CMAKE_CURRENT_SOURCE_DIR}/math.h" diff --git a/gloo/gather.cc b/gloo/gather.cc new file mode 100644 index 000000000..5d1bdf5d4 --- /dev/null +++ b/gloo/gather.cc @@ -0,0 +1,81 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "gloo/gather.h" + +#include + +#include "gloo/common/logging.h" +#include "gloo/types.h" + +namespace gloo { + +void gather(const std::shared_ptr& context, GatherOptions& opts) { + std::unique_ptr tmpInBuffer; + std::unique_ptr tmpOutBuffer; + transport::UnboundBuffer* in = nullptr; + transport::UnboundBuffer* out = nullptr; + const auto slot = Slot::build(kGatherSlotPrefix, opts.tag); + + // Sanity checks + GLOO_ENFORCE(opts.elementSize > 0); + + // Figure out pointer to input buffer + if (opts.inBuffer) { + in = opts.inBuffer.get(); + } else { + GLOO_ENFORCE(opts.inPtr != nullptr); + GLOO_ENFORCE(opts.inElements > 0); + tmpInBuffer = + context->createUnboundBuffer(opts.inPtr, opts.inElements * opts.elementSize); + in = tmpInBuffer.get(); + } + + if (context->rank == opts.root) { + const size_t chunkSize = in->size; + + // Figure out pointer to output buffer (only for root rank) + if (opts.outBuffer) { + out = opts.outBuffer.get(); + } else { + GLOO_ENFORCE(opts.outPtr != nullptr); + GLOO_ENFORCE(opts.outElements > 0); + tmpOutBuffer = + context->createUnboundBuffer(opts.outPtr, opts.outElements * opts.elementSize); + out = tmpOutBuffer.get(); + } + + // Ensure the output buffer has the right size. + GLOO_ENFORCE(in->size * context->size == out->size); + + // Post receive operations from peers into out buffer + for (size_t i = 0; i < context->size; i++) { + if (i == context->rank) { + continue; + } + out->recv(i, slot, i * chunkSize, chunkSize); + } + + // Copy local input to output + memcpy((char*) out->ptr + (context->rank * chunkSize), in->ptr, chunkSize); + + // Wait for receive operations to complete + for (size_t i = 0; i < context->size; i++) { + if (i == context->rank) { + continue; + } + out->waitRecv(); + } + } else { + in->send(opts.root, slot); + in->waitSend(); + } +} + +} // namespace gloo diff --git a/gloo/gather.h b/gloo/gather.h new file mode 100644 index 000000000..f1bf9ae91 --- /dev/null +++ b/gloo/gather.h @@ -0,0 +1,41 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include "gloo/context.h" +#include "gloo/transport/unbound_buffer.h" + +namespace gloo { + +struct GatherOptions { + // The input and output buffers can either be specified as a unbound + // buffer (that can be cached and reused by the caller), or a + // literal pointer and number of elements stored at that pointer. + std::unique_ptr inBuffer; + void* inPtr; + size_t inElements; + std::unique_ptr outBuffer; + void* outPtr; + size_t outElements; + + // Number of bytes per element. + size_t elementSize; + + // Rank of receiving process. + int root; + + // Tag for this gather operation. + // Must be unique across operations executing in parallel. + uint32_t tag; +}; + +void gather(const std::shared_ptr& context, GatherOptions& opts); + +} // namespace gloo diff --git a/gloo/test/CMakeLists.txt b/gloo/test/CMakeLists.txt index ec7ec15e2..a126b3b15 100644 --- a/gloo/test/CMakeLists.txt +++ b/gloo/test/CMakeLists.txt @@ -4,6 +4,7 @@ set(GLOO_TEST_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/barrier_test.cc" "${CMAKE_CURRENT_SOURCE_DIR}/broadcast_builder_test.cc" "${CMAKE_CURRENT_SOURCE_DIR}/broadcast_test.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/gather_test.cc" "${CMAKE_CURRENT_SOURCE_DIR}/linux_test.cc" "${CMAKE_CURRENT_SOURCE_DIR}/main.cc" "${CMAKE_CURRENT_SOURCE_DIR}/send_recv_test.cc" diff --git a/gloo/test/base_test.h b/gloo/test/base_test.h index d74a48983..091214520 100644 --- a/gloo/test/base_test.h +++ b/gloo/test/base_test.h @@ -172,6 +172,10 @@ class Fixture { } } + T* getPointer() const { + return srcs.front().get(); + } + std::vector getPointers() const { std::vector out; for (const auto& src : srcs) { diff --git a/gloo/test/gather_test.cc b/gloo/test/gather_test.cc new file mode 100644 index 000000000..13acf74af --- /dev/null +++ b/gloo/test/gather_test.cc @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "gloo/gather.h" +#include "gloo/test/base_test.h" + +namespace gloo { +namespace test { +namespace { + +// Test parameterization. +using Param = std::tuple; + +// Test fixture. +class GatherTest : public BaseTest, + public ::testing::WithParamInterface { +}; + +TEST_P(GatherTest, Default) { + auto contextSize = std::get<0>(GetParam()); + auto dataSize = std::get<1>(GetParam()); + + spawn(contextSize, [&](std::shared_ptr context) { + auto input = Fixture(context, 1, dataSize); + auto output = Fixture(context, 1, contextSize * dataSize); + + // Initialize fixture with globally unique values + input.assignValues(); + + GatherOptions opts; + opts.inPtr = input.getPointer(); + opts.inElements = dataSize; + opts.elementSize = sizeof(uint64_t); + + // Take turns being root + for (auto i = 0; i < context->size; i++) { + // Set output pointer only when root + if (i == context->rank) { + opts.outPtr = output.getPointer(); + opts.outElements = dataSize * contextSize; + } else { + opts.outPtr = nullptr; + opts.outElements = 0; + } + + opts.root = i; + gather(context, opts); + + // Validate result if root + if (i == context->rank) { + const auto ptr = output.getPointer(); + const auto stride = context->size; + for (auto j = 0; j < context->size; j++) { + for (auto k = 0; k < dataSize; k++) { + ASSERT_EQ(j + k * stride, ptr[k + j * dataSize]) + << "Mismatch at index " << (k + j * dataSize); + } + } + } + } + }); +} + +std::vector genMemorySizes() { + std::vector v; + v.push_back(1); + v.push_back(10); + v.push_back(100); + v.push_back(1000); + return v; +} + +INSTANTIATE_TEST_CASE_P( + GatherDefault, + GatherTest, + ::testing::Combine( + ::testing::Values(2, 4, 7), + ::testing::ValuesIn(genMemorySizes()))); + +} // namespace +} // namespace test +} // namespace gloo diff --git a/gloo/types.cc b/gloo/types.cc new file mode 100644 index 000000000..72c50b2ca --- /dev/null +++ b/gloo/types.cc @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "gloo/types.h" + +#include +#include + +namespace gloo { + +Slot Slot::build(uint8_t prefix, uint32_t tag) { + uint64_t u64prefix = ((uint64_t) prefix) << 56; + uint64_t u64tag = (((uint64_t) tag) & 0xffffffff) << 24; + return Slot(u64prefix || u64tag, 0); +} + +const Slot Slot::operator+(uint8_t i) const { + // Maximum of 8 bits for use in a single collective operation. + // To avoid conflicts between them, raise if it overflows. + auto delta = delta_ + i; + if (delta > 0xff) { + std::stringstream ss; + ss << "Slot overflow: delta " << delta << " > 0xff"; + throw std::runtime_error(ss.str()); + } + + return Slot(base_, delta); +} + +} // namespace gloo diff --git a/gloo/types.h b/gloo/types.h index bd678e669..d2ef9c2ce 100644 --- a/gloo/types.h +++ b/gloo/types.h @@ -27,6 +27,54 @@ namespace gloo { +// Unlike old style collectives that are class instances that hold +// some state, the new style collectives do not need initialization +// before they can run. Instead of asking the context for a series of +// slots and storing them for later use and reuse, the new style +// collectives take a slot (or tag) argument that allows for +// concurrent execution of multiple collectives on the same context. +// +// This tag is what determines the slot numbers for the send and recv +// operations that the collectives end up executing. A single +// collective may have many send and recv operations running in +// parallel, so instead of using the specified tag verbatim, we use it +// as a prefix. Also, to avoid conflicts between collectives with the +// same tag, we have another tag prefix per collective type. Out of +// the 64 bits we can use for a slot, we use 8 of them to identify a +// collective, 32 to identify the collective tag, another 8 for use by +// the collective operation itself (allowing for 256 independent send +// and recv operations against the same point to point pair), and +// leave 16 bits unused. +// +// Below, you find constexprs for the prefix per collective type, as +// well as a way to compute slots when executing a collective. The +// slot class below captures both a prefix and a delta on that prefix +// to support addition with bounds checking. It is usable as an +// uint64_t, but one that cannot overflow beyond the bits allocated +// for use within a collective. +// + +constexpr uint8_t kGatherSlotPrefix = 0x01; + +class Slot { + public: + static Slot build(uint8_t prefix, uint32_t tag); + + operator uint64_t() const { + return base_ + delta_; + } + + const Slot operator+(uint8_t i) const; + + protected: + explicit Slot(uint64_t base, uint64_t delta) + : base_(base), delta_(delta) { + } + + const uint64_t base_; + const uint64_t delta_; +}; + struct float16; float16 cpu_float2half_rn(float f); float cpu_half2float(float16 h); From fa6f694c94697fa56696c9c13f9bccfbd914f986 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 8 Oct 2018 16:54:11 -0700 Subject: [PATCH 2/3] Add new style allgather collective Like gather in the previous commit, this is a single function and doesn't hold on to any state. This implements a ring allgather. --- gloo/CMakeLists.txt | 2 + gloo/allgather.cc | 109 ++++++++++++++++++++++++++++++++++++ gloo/allgather.h | 42 ++++++++++++++ gloo/test/CMakeLists.txt | 1 + gloo/test/allgather_test.cc | 71 +++++++++++++++++++++++ gloo/test/base_test.h | 8 +++ gloo/types.h | 1 + 7 files changed, 234 insertions(+) create mode 100644 gloo/allgather.cc create mode 100644 gloo/allgather.h diff --git a/gloo/CMakeLists.txt b/gloo/CMakeLists.txt index 592db5bd5..f468eebfc 100644 --- a/gloo/CMakeLists.txt +++ b/gloo/CMakeLists.txt @@ -7,6 +7,7 @@ set(GLOO_HDRS) # Compiled sources in root directory list(APPEND GLOO_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/algorithm.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/allgather.cc" "${CMAKE_CURRENT_SOURCE_DIR}/allreduce_local.cc" "${CMAKE_CURRENT_SOURCE_DIR}/context.cc" "${CMAKE_CURRENT_SOURCE_DIR}/gather.cc" @@ -15,6 +16,7 @@ list(APPEND GLOO_SRCS list(APPEND GLOO_HDRS "${CMAKE_CURRENT_SOURCE_DIR}/algorithm.h" + "${CMAKE_CURRENT_SOURCE_DIR}/allgather.h" "${CMAKE_CURRENT_SOURCE_DIR}/allgather_ring.h" "${CMAKE_CURRENT_SOURCE_DIR}/allreduce_halving_doubling.h" "${CMAKE_CURRENT_SOURCE_DIR}/allreduce_bcube.h" diff --git a/gloo/allgather.cc b/gloo/allgather.cc new file mode 100644 index 000000000..2000d29f2 --- /dev/null +++ b/gloo/allgather.cc @@ -0,0 +1,109 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "gloo/allgather.h" + +#include + +#include "gloo/common/logging.h" +#include "gloo/types.h" + +namespace gloo { + +void allgather(const std::shared_ptr& context, AllgatherOptions& opts) { + std::unique_ptr tmpInBuffer; + std::unique_ptr tmpOutBuffer; + transport::UnboundBuffer* in = nullptr; + transport::UnboundBuffer* out = nullptr; + const auto slot = Slot::build(kAllgatherSlotPrefix, opts.tag); + + // Sanity checks + GLOO_ENFORCE(opts.elementSize > 0); + const auto recvRank = (context->size + context->rank - 1) % context->size; + GLOO_ENFORCE(context->getPair(recvRank), "pair missing (rank ", recvRank, ")"); + const auto sendRank = (context->size + context->rank + 1) % context->size; + GLOO_ENFORCE(context->getPair(sendRank), "pair missing (rank ", sendRank, ")"); + + // Figure out pointer to input buffer + if (opts.inBuffer) { + in = opts.inBuffer.get(); + } else if (opts.inPtr != nullptr) { + GLOO_ENFORCE(opts.inElements > 0); + tmpInBuffer = + context->createUnboundBuffer(opts.inPtr, opts.inElements * opts.elementSize); + in = tmpInBuffer.get(); + } + + // Figure out pointer to output buffer + if (opts.outBuffer) { + out = opts.outBuffer.get(); + } else { + GLOO_ENFORCE(opts.outPtr != nullptr); + GLOO_ENFORCE(opts.outElements > 0); + tmpOutBuffer = + context->createUnboundBuffer(opts.outPtr, opts.outElements * opts.elementSize); + out = tmpOutBuffer.get(); + } + + GLOO_ENFORCE_EQ(out->size, in->size * context->size); + + // If the input buffer is specified, this is NOT an in place operation, + // and the output buffer needs to be primed with the input. + if (in != nullptr) { + memcpy( + (uint8_t*) out->ptr + context->rank * opts.inElements * opts.elementSize, + (uint8_t*) in->ptr, + opts.inElements * opts.elementSize); + } + + // The chunk size may not be divisible by 2; use dynamic lookup. + std::array chunkSize; + chunkSize[0] = (opts.inElements * opts.elementSize) / 2; + chunkSize[1] = (opts.inElements * opts.elementSize) - chunkSize[0]; + std::array chunkOffset; + chunkOffset[0] = 0; + chunkOffset[1] = chunkSize[0]; + + for (auto i = 0; i < (context->size - 1) * 2; i++) { + size_t sendOffset = + (((context->size + context->rank - (i / 2)) + * opts.inElements + * opts.elementSize) + + chunkOffset[i & 0x1]) + % (opts.outElements * opts.elementSize); + size_t recvOffset = + (((context->size + context->rank - 1 - (i / 2)) + * opts.inElements + * opts.elementSize) + + chunkOffset[i & 0x1]) + % (opts.outElements * opts.elementSize); + size_t size = chunkSize[i & 0x1]; + if (i < 2) { + out->send(sendRank, slot, sendOffset, size); + out->recv(recvRank, slot, recvOffset, size); + continue; + } + + // Wait for pending operations to complete to synchronize with the + // previous iteration. Because we kick off two operations before + // getting here we always wait for the next-to-last operation. + out->waitSend(); + out->waitRecv(); + out->send(sendRank, slot, sendOffset, size); + out->recv(recvRank, slot, recvOffset, size); + } + + // Wait for completes + for (auto i = 0; i < 2; i++) { + out->waitSend(); + out->waitRecv(); + } +} + +} // namespace gloo diff --git a/gloo/allgather.h b/gloo/allgather.h new file mode 100644 index 000000000..8c166b800 --- /dev/null +++ b/gloo/allgather.h @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include "gloo/context.h" +#include "gloo/transport/unbound_buffer.h" + +namespace gloo { + +struct AllgatherOptions { + // The input and output can either be specified as a unbound buffer + // (that can be cached and reused by the caller), or a literal + // pointer and number of elements stored at that pointer. + // + // The operation is executed in place on the output if the input is + // set to null. The input for this process is assumed to be at the + // location in the output buffer where it would otherwise be. + std::unique_ptr inBuffer; + void* inPtr; + size_t inElements; + std::unique_ptr outBuffer; + void* outPtr; + size_t outElements; + + // Number of bytes per element. + size_t elementSize; + + // Tag for this gather operation. + // Must be unique across operations executing in parallel. + uint32_t tag; +}; + +void allgather(const std::shared_ptr& context, AllgatherOptions& opts); + +} // namespace gloo diff --git a/gloo/test/CMakeLists.txt b/gloo/test/CMakeLists.txt index a126b3b15..b84c3b261 100644 --- a/gloo/test/CMakeLists.txt +++ b/gloo/test/CMakeLists.txt @@ -1,4 +1,5 @@ set(GLOO_TEST_SRCS + "${CMAKE_CURRENT_SOURCE_DIR}/allgather_test.cc" "${CMAKE_CURRENT_SOURCE_DIR}/allreduce_builder_test.cc" "${CMAKE_CURRENT_SOURCE_DIR}/allreduce_test.cc" "${CMAKE_CURRENT_SOURCE_DIR}/barrier_test.cc" diff --git a/gloo/test/allgather_test.cc b/gloo/test/allgather_test.cc index eddcdebc7..87b82c324 100644 --- a/gloo/test/allgather_test.cc +++ b/gloo/test/allgather_test.cc @@ -11,6 +11,7 @@ #include #include +#include "gloo/allgather.h" #include "gloo/allgather_ring.h" #include "gloo/common/common.h" #include "gloo/test/base_test.h" @@ -109,6 +110,76 @@ INSTANTIATE_TEST_CASE_P( ::testing::ValuesIn(genMemorySizes()), ::testing::Range(1, 4))); +using NewParam = std::tuple; + +class AllgatherNewTest : public BaseTest, + public ::testing::WithParamInterface {}; + +TEST_P(AllgatherNewTest, Default) { + auto contextSize = std::get<0>(GetParam()); + auto dataSize = std::get<1>(GetParam()); + + auto validate = [dataSize]( + const std::shared_ptr& context, + Fixture& output) { + const auto ptr = output.getPointer(); + const auto stride = context->size; + for (auto j = 0; j < context->size; j++) { + for (auto k = 0; k < dataSize; k++) { + ASSERT_EQ(j + k * stride, ptr[k + j * dataSize]) + << "Mismatch at index " << (k + j * dataSize); + } + } + }; + + spawn(contextSize, [&](std::shared_ptr context) { + auto input = Fixture(context, 1, dataSize); + auto output = Fixture(context, 1, contextSize * dataSize); + + // Run with raw pointers and sizes in options + { + input.assignValues(); + output.clear(); + + AllgatherOptions opts; + opts.inPtr = input.getPointer(); + opts.inElements = dataSize; + opts.outPtr = output.getPointer(); + opts.outElements = contextSize * dataSize; + opts.elementSize = sizeof(uint64_t); + input.assignValues(); + output.clear(); + allgather(context, opts); + validate(context, output); + } + + // Run with (optionally cached) unbound buffers in options + { + input.assignValues(); + output.clear(); + + AllgatherOptions opts; + opts.inBuffer = context->createUnboundBuffer( + input.getPointer(), + dataSize * sizeof(uint64_t)); + opts.outBuffer = context->createUnboundBuffer( + output.getPointer(), + contextSize * dataSize * sizeof(uint64_t)); + opts.elementSize = sizeof(uint64_t); + allgather(context, opts); + validate(context, output); + } + }); +} + +INSTANTIATE_TEST_CASE_P( + AllgatherNewDefault, + AllgatherNewTest, + ::testing::Combine( + ::testing::Values(2, 4, 7), + ::testing::ValuesIn(genMemorySizes()))); + + } // namespace } // namespace test } // namespace gloo diff --git a/gloo/test/base_test.h b/gloo/test/base_test.h index 091214520..9cea0c117 100644 --- a/gloo/test/base_test.h +++ b/gloo/test/base_test.h @@ -136,6 +136,14 @@ class Fixture { } } + void clear() { + for (auto i = 0; i < srcs.size(); i++) { + for (auto j = 0; j < count; j++) { + srcs[i][j] = 0; + } + } + } + void checkBroadcastResult(Fixture& fixture, int root, int rootPointer) { // Expected is set to the expected value at ptr[0] const auto expected = root * fixture.srcs.size() + rootPointer; diff --git a/gloo/types.h b/gloo/types.h index d2ef9c2ce..33af1c2d5 100644 --- a/gloo/types.h +++ b/gloo/types.h @@ -55,6 +55,7 @@ namespace gloo { // constexpr uint8_t kGatherSlotPrefix = 0x01; +constexpr uint8_t kAllgatherSlotPrefix = 0x02; class Slot { public: From 561d083a45e797835e2dc6c601dc0e6335a899ea Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 9 Oct 2018 15:14:29 -0700 Subject: [PATCH 3/3] Add new style reduce collective This implements a ring reduction resulting the result scattered across processes, followed by a gather operation to the root process. The halving/doubling implementation is slated to be ported to this stateless style later and provided as an option through RedudeOptions. --- gloo/CMakeLists.txt | 2 + gloo/context.h | 1 - gloo/reduce.cc | 257 +++++++++++++++++++++++++++++++++++++++ gloo/reduce.h | 56 +++++++++ gloo/test/CMakeLists.txt | 1 + gloo/test/reduce_test.cc | 104 ++++++++++++++++ gloo/types.h | 1 + 7 files changed, 421 insertions(+), 1 deletion(-) create mode 100644 gloo/reduce.cc create mode 100644 gloo/reduce.h create mode 100644 gloo/test/reduce_test.cc diff --git a/gloo/CMakeLists.txt b/gloo/CMakeLists.txt index f468eebfc..9d02ab69c 100644 --- a/gloo/CMakeLists.txt +++ b/gloo/CMakeLists.txt @@ -11,6 +11,7 @@ list(APPEND GLOO_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/allreduce_local.cc" "${CMAKE_CURRENT_SOURCE_DIR}/context.cc" "${CMAKE_CURRENT_SOURCE_DIR}/gather.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/reduce.cc" "${CMAKE_CURRENT_SOURCE_DIR}/types.cc" ) @@ -28,6 +29,7 @@ list(APPEND GLOO_HDRS "${CMAKE_CURRENT_SOURCE_DIR}/barrier_all_to_one.h" "${CMAKE_CURRENT_SOURCE_DIR}/broadcast_one_to_all.h" "${CMAKE_CURRENT_SOURCE_DIR}/gather.h" + "${CMAKE_CURRENT_SOURCE_DIR}/reduce.h" "${CMAKE_CURRENT_SOURCE_DIR}/reduce_scatter.h" "${CMAKE_CURRENT_SOURCE_DIR}/context.h" "${CMAKE_CURRENT_SOURCE_DIR}/math.h" diff --git a/gloo/context.h b/gloo/context.h index af19723d7..f156a4a7f 100644 --- a/gloo/context.h +++ b/gloo/context.h @@ -50,7 +50,6 @@ class Context { std::shared_ptr transportContext_; int slot_; std::chrono::milliseconds timeout_; - }; } // namespace gloo diff --git a/gloo/reduce.cc b/gloo/reduce.cc new file mode 100644 index 000000000..2090d5209 --- /dev/null +++ b/gloo/reduce.cc @@ -0,0 +1,257 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "gloo/reduce.h" + +#include +#include + +#include "gloo/common/logging.h" +#include "gloo/types.h" + +namespace gloo { + +namespace { + +template +T roundUp(T value, T multiple) { + T remainder = value % multiple; + if (remainder == 0) { + return value; + } + return value + multiple - remainder; +} + +} // namespace + +void reduce(const std::shared_ptr& context, ReduceOptions& opts) { + std::unique_ptr tmpInBuffer; + std::unique_ptr tmpOutBuffer; + transport::UnboundBuffer* in = nullptr; + transport::UnboundBuffer* out = nullptr; + const auto slot = Slot::build(kReduceSlotPrefix, opts.tag); + + // Sanity checks + GLOO_ENFORCE(opts.elements > 0); + GLOO_ENFORCE(opts.elementSize > 0); + GLOO_ENFORCE(opts.root >= 0 && opts.root < context->size); + GLOO_ENFORCE(opts.reduce != nullptr); + const auto recvRank = (context->size + context->rank + 1) % context->size; + GLOO_ENFORCE( + context->getPair(recvRank), "connection to rank ", recvRank, " missing"); + const auto sendRank = (context->size + context->rank - 1) % context->size; + GLOO_ENFORCE( + context->getPair(sendRank), "connection to rank ", sendRank, " missing"); + + // Figure out pointer to output buffer + if (opts.outBuffer) { + out = opts.outBuffer.get(); + } else { + GLOO_ENFORCE(opts.outPtr != nullptr); + tmpOutBuffer = context->createUnboundBuffer( + opts.outPtr, opts.elements * opts.elementSize); + out = tmpOutBuffer.get(); + } + + // Figure out pointer to input buffer + if (opts.inBuffer) { + in = opts.inBuffer.get(); + } else if (opts.inPtr != nullptr) { + tmpInBuffer = context->createUnboundBuffer( + opts.inPtr, opts.elements * opts.elementSize); + in = tmpInBuffer.get(); + } else { + in = out; + } + + GLOO_ENFORCE_EQ(in->size, opts.elements * opts.elementSize); + GLOO_ENFORCE_EQ(out->size, opts.elements * opts.elementSize); + + // The ring algorithm works as follows. + // + // The given input is split into a number of chunks equal to the + // number of processes. Once the algorithm has finished, every + // process hosts one chunk of reduced output, in sequential order + // (rank 0 has chunk 0, rank 1 has chunk 1, etc.). As the input may + // not be divisible by the number of processes, the chunk on the + // final ranks may have partial output or may be empty. + // + // As a chunk is passed along the ring and contains the reduction of + // successively more ranks, we have to alternate between performing + // I/O for that chunk and computing the reduction between the + // received chunk and the local chunk. To avoid this alternating + // pattern, we split up a chunk into multiple segments (>= 2), and + // ensure we have one segment in flight while computing a reduction + // on the other. The segment size has an upper bound to minimize + // memory usage and avoid poor cache behavior. This means we may + // have many segments per chunk when dealing with very large inputs. + // + // The nomenclature here is reflected in the variable naming below + // (one chunk per rank and many segments per chunk). + // + const size_t totalBytes = opts.elements * opts.elementSize; + + // The number of bytes per segment must be a multiple of the bytes + // per element for the reduction to work; round up if necessary. + const size_t segmentBytes = roundUp( + std::min( + // Rounded division to have >= 2 segments per chunk. + (totalBytes + (context->size * 2 - 1)) / (context->size * 2), + // Configurable segment size limit + opts.maxSegmentSize + ), + opts.elementSize); + + // Compute how many segments make up the input buffer. + // + // Round up to the nearest multiple of the context size such that + // there is an equal number of segments per process and execution is + // symmetric across processes. + // + // The minimum is twice the context size, because the algorithm + // below overlaps sending/receiving a segment with computing the + // reduction of the another segment. + // + const size_t numSegments = roundUp( + std::max( + (totalBytes + (segmentBytes - 1)) / segmentBytes, + (size_t)context->size * 2), + (size_t)context->size); + GLOO_ENFORCE_EQ(numSegments % context->size, 0); + GLOO_ENFORCE_GE(numSegments, context->size * 2); + const size_t numSegmentsPerRank = numSegments / context->size; + const size_t chunkBytes = numSegmentsPerRank * segmentBytes; + + // Allocate scratch space to hold two chunks + std::vector tmpAllocation(segmentBytes * 2); + std::unique_ptr tmpBuffer = + context->createUnboundBuffer(tmpAllocation.data(), segmentBytes * 2); + transport::UnboundBuffer* tmp = tmpBuffer.get(); + + // Use dynamic lookup for chunk offset in the temporary buffer. + // With two operations in flight we need two offsets. + // They can be indexed using the loop counter. + std::array segmentOffset; + segmentOffset[0] = 0; + segmentOffset[1] = segmentBytes; + + // Function computes the offsets and lengths of the chunks to be + // sent and received for a given chunk iteration. + auto fn = [&](size_t i) { + struct { + size_t sendOffset; + size_t recvOffset; + ssize_t sendLength; + ssize_t recvLength; + } result; + + // Compute segment index to send from (to rank - 1) and segment + // index to receive into (from rank + 1). Multiply by the number + // of bytes in a chunk to get to an offset. The offset is allowed + // to be out of range (>= totalBytes) and this is taken into + // account when computing the associated length. + result.sendOffset = + ((((context->rank + 1) * numSegmentsPerRank) + i) * segmentBytes) + % (numSegments * segmentBytes); + result.recvOffset = + ((((context->rank + 2) * numSegmentsPerRank) + i) * segmentBytes) + % (numSegments * segmentBytes); + + // If the segment is entirely in range, the following statement is + // equal to segmentBytes. If it isn't, it will be less, or even + // negative. This is why the ssize_t typecasts are needed. + result.sendLength = std::min( + (ssize_t)segmentBytes, (ssize_t)totalBytes - (ssize_t)result.sendOffset); + result.recvLength = std::min( + (ssize_t)segmentBytes, (ssize_t)totalBytes - (ssize_t)result.recvOffset); + + return result; + }; + + for (auto i = 0; i < numSegments; i++) { + if (i >= 2) { + // Compute send and receive offsets and lengths two iterations + // ago. Needed so we know when to wait for an operation and when + // to ignore (when the offset was out of bounds), and know where + // to reduce the contents of the temporary buffer. + auto prev = fn(i - 2); + if (prev.recvLength > 0) { + tmp->waitRecv(); + opts.reduce( + (uint8_t*)out->ptr + prev.recvOffset, + (const uint8_t*)in->ptr + prev.recvOffset, + (const uint8_t*)tmp->ptr + segmentOffset[i & 0x1], + prev.recvLength / opts.elementSize); + } + if (prev.sendLength > 0) { + if ((i - 2) < numSegmentsPerRank) { + in->waitSend(); + } else { + out->waitSend(); + } + } + } + + // Issue new send and receive operation in all but the final two + // iterations. At that point we have already sent all data we + // needed to and only have to wait for the final segments to be + // reduced into the output. + if (i < (numSegments - 2)) { + // Compute send and receive offsets and lengths for this iteration. + auto cur = fn(i); + if (cur.recvLength > 0) { + tmp->recv(recvRank, slot, segmentOffset[i & 0x1], cur.recvLength); + } + if (cur.sendLength > 0) { + if (i < numSegmentsPerRank) { + in->send(sendRank, slot, cur.sendOffset, cur.sendLength); + } else { + out->send(sendRank, slot, cur.sendOffset, cur.sendLength); + } + } + } + } + + // Gather to root rank. + // + // Beware: totalBytes <= (numSegments * segmentBytes), which is + // incompatible with the generic gather algorithm where the + // contribution is identical across processes. + // + if (context->rank == opts.root) { + size_t numRecv = 0; + for (size_t rank = 0; rank < context->size; rank++) { + if (rank == context->rank) { + continue; + } + size_t recvOffset = rank * numSegmentsPerRank * segmentBytes; + ssize_t recvLength = std::min( + (ssize_t)chunkBytes, + (ssize_t)totalBytes - (ssize_t)recvOffset); + if (recvLength > 0) { + out->recv(rank, slot, recvOffset, recvLength); + numRecv++; + } + } + for (size_t i = 0; i < numRecv; i++) { + out->waitRecv(); + } + } else { + size_t sendOffset = context->rank * numSegmentsPerRank * segmentBytes; + ssize_t sendLength = std::min( + (ssize_t)chunkBytes, + (ssize_t)totalBytes - (ssize_t)sendOffset); + if (sendLength > 0) { + out->send(opts.root, slot, sendOffset, sendLength); + out->waitSend(); + } + } +} + +} // namespace gloo diff --git a/gloo/reduce.h b/gloo/reduce.h new file mode 100644 index 000000000..c4a502122 --- /dev/null +++ b/gloo/reduce.h @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include "gloo/context.h" +#include "gloo/transport/unbound_buffer.h" + +namespace gloo { + +struct ReduceOptions { + // The input and output buffers can either be specified as a unbound + // buffer (that can be cached and reused by the caller), or a + // literal pointer and number of elements stored at that pointer. + std::unique_ptr inBuffer; + void* inPtr = nullptr; + std::unique_ptr outBuffer; + void* outPtr = nullptr; + + // Number of elements. + size_t elements = 0; + + // Number of bytes per element. + size_t elementSize = 0; + + // Rank of process to reduce to. + int root = 0; + + // Reduction function (output, input 1, input 2, number of elements). + std::function reduce; + + // Tag for this gather operation. + // Must be unique across operations executing in parallel. + uint32_t tag = 0; + + // This is the maximum size of each I/O operation (send/recv) of which + // two are in flight at all times. A smaller value leads to more + // overhead and a larger value leads to poor cache behavior. + static constexpr size_t kMaxSegmentSize = 1024 * 1024; + + // Internal use only. This is used to exercise code paths where we + // have more than 2 segments per rank without making the tests slow + // (because they would require millions of elements if the default + // were not configurable). + size_t maxSegmentSize = kMaxSegmentSize; +}; + +void reduce(const std::shared_ptr& context, ReduceOptions& opts); + +} // namespace gloo diff --git a/gloo/test/CMakeLists.txt b/gloo/test/CMakeLists.txt index b84c3b261..010a8f314 100644 --- a/gloo/test/CMakeLists.txt +++ b/gloo/test/CMakeLists.txt @@ -8,6 +8,7 @@ set(GLOO_TEST_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/gather_test.cc" "${CMAKE_CURRENT_SOURCE_DIR}/linux_test.cc" "${CMAKE_CURRENT_SOURCE_DIR}/main.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/reduce_test.cc" "${CMAKE_CURRENT_SOURCE_DIR}/send_recv_test.cc" ) diff --git a/gloo/test/reduce_test.cc b/gloo/test/reduce_test.cc new file mode 100644 index 000000000..11977c432 --- /dev/null +++ b/gloo/test/reduce_test.cc @@ -0,0 +1,104 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "gloo/math.h" +#include "gloo/reduce.h" +#include "gloo/test/base_test.h" + +namespace gloo { +namespace test { +namespace { + +// Test parameterization. +using Param = std::tuple; + +// Test fixture. +class ReduceTest : public BaseTest, + public ::testing::WithParamInterface { +}; + +TEST_P(ReduceTest, Default) { + auto contextSize = std::get<0>(GetParam()); + auto dataSize = std::get<1>(GetParam()); + auto inPlace = std::get<2>(GetParam()); + + spawn(contextSize, [&](std::shared_ptr context) { + auto input = Fixture(context, 1, dataSize); + auto output = Fixture(context, 1, dataSize); + + ReduceOptions opts; + + if (inPlace) { + opts.outPtr = output.getPointer(); + } else { + opts.inPtr = input.getPointer(); + opts.outPtr = output.getPointer(); + } + + opts.elements = dataSize; + opts.elementSize = sizeof(uint64_t); + opts.reduce = [] (void* a, const void* b, const void *c, size_t n) { + auto ua = (uint64_t*)a; + const auto ub = (const uint64_t*)b; + const auto uc = (const uint64_t*)c; + for (size_t i = 0; i < n; i++) { + ua[i] = ub[i] + uc[i]; + } + }; + + // A small maximum segment size triggers code paths where we'll + // have a number of segments larger than the lower bound of + // twice the context size. + opts.maxSegmentSize = 128; + + // Take turns being root + for (opts.root = 0; opts.root < context->size; opts.root++) { + if (inPlace) { + output.assignValues(); + } else { + input.assignValues(); + output.clear(); + } + reduce(context, opts); + + // Validate result if this process was root + if (context->rank == opts.root) { + const auto base = (contextSize * (contextSize - 1)) / 2; + const auto ptr = output.getPointer(); + const auto stride = context->size; + for (auto j = 0; j < dataSize; j++) { + ASSERT_EQ(j * stride * stride + base, ptr[j]) + << "Mismatch at index " << j; + } + } + } + }); +} + +std::vector genMemorySizes() { + std::vector v; + v.push_back(1); + v.push_back(10); + v.push_back(100); + v.push_back(1000); + v.push_back(10000); + return v; +} + +INSTANTIATE_TEST_CASE_P( + ReduceDefault, + ReduceTest, + ::testing::Combine( + ::testing::Values(2, 4, 7), + ::testing::ValuesIn(genMemorySizes()), + ::testing::Values(true, false))); + +} // namespace +} // namespace test +} // namespace gloo diff --git a/gloo/types.h b/gloo/types.h index 33af1c2d5..e9aa3e9a8 100644 --- a/gloo/types.h +++ b/gloo/types.h @@ -56,6 +56,7 @@ namespace gloo { constexpr uint8_t kGatherSlotPrefix = 0x01; constexpr uint8_t kAllgatherSlotPrefix = 0x02; +constexpr uint8_t kReduceSlotPrefix = 0x03; class Slot { public: