Skip to content

Commit

Permalink
Add parallelTransformReduce and parallelForEachError
Browse files Browse the repository at this point in the history
parallelTransformReduce is modelled on the C++17 pstl API of
std::transform_reduce, except our wrappers do not use execution policy
parameters.

parallelForEachError allows loops that contain potentially failing
operations to propagate errors out of the loop. This was one of the
major challenges I encountered while parallelizing PDB type merging in
LLD. Parallelizing a loop with parallelForEachError is not behavior
preserving: the loop will no longer stop on the first error, it will
continue working and report all errors it encounters in a list.

I plan to use this to propagate errors out of LLD's
coff::TpiSource::remapTpiWithGHashes, which currently stores errors an
error in the TpiSource object.

Differential Revision: https://reviews.llvm.org/D90639
  • Loading branch information
rnk committed Nov 3, 2020
1 parent ca01a6b commit c0a922b
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 6 deletions.
104 changes: 98 additions & 6 deletions llvm/include/llvm/Support/Parallel.h
Expand Up @@ -11,6 +11,7 @@

#include "llvm/ADT/STLExtras.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Error.h"
#include "llvm/Support/MathExtras.h"
#include "llvm/Support/Threading.h"

Expand Down Expand Up @@ -120,13 +121,17 @@ void parallel_sort(RandomAccessIterator Start, RandomAccessIterator End,
llvm::Log2_64(std::distance(Start, End)) + 1);
}

// TaskGroup has a relatively high overhead, so we want to reduce
// the number of spawn() calls. We'll create up to 1024 tasks here.
// (Note that 1024 is an arbitrary number. This code probably needs
// improving to take the number of available cores into account.)
enum { MaxTasksPerGroup = 1024 };

template <class IterTy, class FuncTy>
void parallel_for_each(IterTy Begin, IterTy End, FuncTy Fn) {
// TaskGroup has a relatively high overhead, so we want to reduce
// the number of spawn() calls. We'll create up to 1024 tasks here.
// (Note that 1024 is an arbitrary number. This code probably needs
// improving to take the number of available cores into account.)
ptrdiff_t TaskSize = std::distance(Begin, End) / 1024;
// Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
// overhead on large inputs.
ptrdiff_t TaskSize = std::distance(Begin, End) / MaxTasksPerGroup;
if (TaskSize == 0)
TaskSize = 1;

Expand All @@ -140,7 +145,9 @@ void parallel_for_each(IterTy Begin, IterTy End, FuncTy Fn) {

template <class IndexTy, class FuncTy>
void parallel_for_each_n(IndexTy Begin, IndexTy End, FuncTy Fn) {
ptrdiff_t TaskSize = (End - Begin) / 1024;
// Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
// overhead on large inputs.
ptrdiff_t TaskSize = (End - Begin) / MaxTasksPerGroup;
if (TaskSize == 0)
TaskSize = 1;

Expand All @@ -156,6 +163,50 @@ void parallel_for_each_n(IndexTy Begin, IndexTy End, FuncTy Fn) {
Fn(J);
}

template <class IterTy, class ResultTy, class ReduceFuncTy,
class TransformFuncTy>
ResultTy parallel_transform_reduce(IterTy Begin, IterTy End, ResultTy Init,
ReduceFuncTy Reduce,
TransformFuncTy Transform) {
// Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
// overhead on large inputs.
size_t NumInputs = std::distance(Begin, End);
if (NumInputs == 0)
return std::move(Init);
size_t NumTasks = std::min(static_cast<size_t>(MaxTasksPerGroup), NumInputs);
std::vector<ResultTy> Results(NumTasks, Init);
{
// Each task processes either TaskSize or TaskSize+1 inputs. Any inputs
// remaining after dividing them equally amongst tasks are distributed as
// one extra input over the first tasks.
TaskGroup TG;
size_t TaskSize = NumInputs / NumTasks;
size_t RemainingInputs = NumInputs % NumTasks;
IterTy TBegin = Begin;
for (size_t TaskId = 0; TaskId < NumTasks; ++TaskId) {
IterTy TEnd = TBegin + TaskSize + (TaskId < RemainingInputs ? 1 : 0);
TG.spawn([=, &Transform, &Reduce, &Results] {
// Reduce the result of transformation eagerly within each task.
ResultTy R = Init;
for (IterTy It = TBegin; It != TEnd; ++It)
R = Reduce(R, Transform(*It));
Results[TaskId] = R;
});
TBegin = TEnd;
}
assert(TBegin == End);
}

// Do a final reduction. There are at most 1024 tasks, so this only adds
// constant single-threaded overhead for large inputs. Hopefully most
// reductions are cheaper than the transformation.
ResultTy FinalResult = std::move(Results.front());
for (ResultTy &PartialResult :
makeMutableArrayRef(Results.data() + 1, Results.size() - 1))
FinalResult = Reduce(FinalResult, std::move(PartialResult));
return std::move(FinalResult);
}

#endif

} // namespace detail
Expand Down Expand Up @@ -198,6 +249,22 @@ void parallelForEachN(size_t Begin, size_t End, FuncTy Fn) {
Fn(I);
}

template <class IterTy, class ResultTy, class ReduceFuncTy,
class TransformFuncTy>
ResultTy parallelTransformReduce(IterTy Begin, IterTy End, ResultTy Init,
ReduceFuncTy Reduce,
TransformFuncTy Transform) {
#if LLVM_ENABLE_THREADS
if (parallel::strategy.ThreadsRequested != 1) {
return parallel::detail::parallel_transform_reduce(Begin, End, Init, Reduce,
Transform);
}
#endif
for (IterTy I = Begin; I != End; ++I)
Init = Reduce(std::move(Init), Transform(*I));
return std::move(Init);
}

// Range wrappers.
template <class RangeTy,
class Comparator = std::less<decltype(*std::begin(RangeTy()))>>
Expand All @@ -210,6 +277,31 @@ void parallelForEach(RangeTy &&R, FuncTy Fn) {
parallelForEach(std::begin(R), std::end(R), Fn);
}

template <class RangeTy, class ResultTy, class ReduceFuncTy,
class TransformFuncTy>
ResultTy parallelTransformReduce(RangeTy &&R, ResultTy Init,
ReduceFuncTy Reduce,
TransformFuncTy Transform) {
return parallelTransformReduce(std::begin(R), std::end(R), Init, Reduce,
Transform);
}

// Parallel for-each, but with error handling.
template <class RangeTy, class FuncTy>
Error parallelForEachError(RangeTy &&R, FuncTy Fn) {
// The transform_reduce algorithm requires that the initial value be copyable.
// Error objects are uncopyable. We only need to copy initial success values,
// so work around this mismatch via the C API. The C API represents success
// values with a null pointer. The joinErrors discards null values and joins
// multiple errors into an ErrorList.
return unwrap(parallelTransformReduce(
std::begin(R), std::end(R), wrap(Error::success()),
[](LLVMErrorRef Lhs, LLVMErrorRef Rhs) {
return wrap(joinErrors(unwrap(Lhs), unwrap(Rhs)));
},
[&Fn](auto &&V) { return wrap(Fn(V)); }));
}

} // namespace llvm

#endif // LLVM_SUPPORT_PARALLEL_H
43 changes: 43 additions & 0 deletions llvm/unittests/Support/ParallelTest.cpp
Expand Up @@ -49,4 +49,47 @@ TEST(Parallel, parallel_for) {
ASSERT_EQ(range[2049], 1u);
}

TEST(Parallel, TransformReduce) {
// Sum an empty list, check that it works.
auto identity = [](uint32_t v) { return v; };
uint32_t sum = parallelTransformReduce(ArrayRef<uint32_t>(), 0U,
std::plus<uint32_t>(), identity);
EXPECT_EQ(sum, 0U);

// Sum the lengths of these strings in parallel.
const char *strs[] = {"a", "ab", "abc", "abcd", "abcde", "abcdef"};
size_t lenSum =
parallelTransformReduce(strs, static_cast<size_t>(0), std::plus<size_t>(),
[](const char *s) { return strlen(s); });
EXPECT_EQ(lenSum, static_cast<size_t>(21));

// Check that we handle non-divisible task sizes as above.
uint32_t range[2050];
std::fill(std::begin(range), std::end(range), 1);
sum = parallelTransformReduce(range, 0U, std::plus<uint32_t>(), identity);
EXPECT_EQ(sum, 2050U);

std::fill(std::begin(range), std::end(range), 2);
sum = parallelTransformReduce(range, 0U, std::plus<uint32_t>(), identity);
EXPECT_EQ(sum, 4100U);

// Avoid one large task.
uint32_t range2[3060];
std::fill(std::begin(range2), std::end(range2), 1);
sum = parallelTransformReduce(range2, 0U, std::plus<uint32_t>(), identity);
EXPECT_EQ(sum, 3060U);
}

TEST(Parallel, ForEachError) {
int nums[] = {1, 2, 3, 4, 5, 6};
Error e = parallelForEachError(nums, [](int v) -> Error {
if ((v & 1) == 0)
return createStringError(std::errc::invalid_argument, "asdf");
return Error::success();
});
EXPECT_TRUE(e.isA<ErrorList>());
std::string errText = toString(std::move(e));
EXPECT_EQ(errText, std::string("asdf\nasdf\nasdf"));
}

#endif

0 comments on commit c0a922b

Please sign in to comment.