Skip to content

Commit

Permalink
ARROW-11935: [C++] Add push generator
Browse files Browse the repository at this point in the history
A push generator has a producer end which pushes values to a queue, and a consumer end (the generator itself) which yields futures that receive the values pushed by the producer.

Closes apache#9714 from pitrou/ARROW-11935-push-gen

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
pitrou authored and GeorgeAp committed Jun 7, 2021
1 parent 37b1adb commit fb4b257
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 75 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/csv/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/io/memory.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/filesystem/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/future.h"

Expand Down
104 changes: 104 additions & 0 deletions cpp/src/arrow/testing/future_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "arrow/testing/gtest_util.h"
#include "arrow/util/future.h"

// This macro should be called by futures that are expected to
// complete pretty quickly. 2 seconds is the default max wait
// here. Anything longer than that and it's a questionable
// unit test anyways.
#define ASSERT_FINISHES_IMPL(fut) \
do { \
ASSERT_TRUE(fut.Wait(10)); \
if (!fut.is_finished()) { \
FAIL() << "Future did not finish in a timely fashion"; \
} \
} while (false)

#define ASSERT_FINISHES_OK(expr) \
do { \
auto&& _fut = (expr); \
ASSERT_TRUE(_fut.Wait(10)); \
if (!_fut.is_finished()) { \
FAIL() << "Future did not finish in a timely fashion"; \
} \
auto _st = _fut.status(); \
if (!_st.ok()) { \
FAIL() << "'" ARROW_STRINGIFY(expr) "' failed with " << _st.ToString(); \
} \
} while (false)

#define ASSERT_FINISHES_AND_RAISES(ENUM, expr) \
do { \
auto&& fut = (expr); \
ASSERT_FINISHES_IMPL(fut); \
ASSERT_RAISES(ENUM, fut.status()); \
} while (false)

#define ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, future_name) \
auto future_name = (rexpr); \
ASSERT_FINISHES_IMPL(future_name); \
ASSERT_OK_AND_ASSIGN(lhs, future_name.result());

#define ASSERT_FINISHES_OK_AND_ASSIGN(lhs, rexpr) \
ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, \
ARROW_ASSIGN_OR_RAISE_NAME(_fut, __COUNTER__))

#define ASSERT_FINISHES_OK_AND_EQ(expected, expr) \
do { \
ASSERT_FINISHES_OK_AND_ASSIGN(auto _actual, (expr)); \
ASSERT_EQ(expected, _actual); \
} while (0)

namespace arrow {

template <typename T>
void AssertNotFinished(const Future<T>& fut) {
ASSERT_FALSE(IsFutureFinished(fut.state()));
}

template <typename T>
void AssertFinished(const Future<T>& fut) {
ASSERT_TRUE(IsFutureFinished(fut.state()));
}

// Assert the future is successful *now*
template <typename T>
void AssertSuccessful(const Future<T>& fut) {
if (IsFutureFinished(fut.state())) {
ASSERT_EQ(fut.state(), FutureState::SUCCESS);
ASSERT_OK(fut.status());
} else {
FAIL() << "Expected future to be completed successfully but it was still pending";
}
}

// Assert the future is failed *now*
template <typename T>
void AssertFailed(const Future<T>& fut) {
if (IsFutureFinished(fut.state())) {
ASSERT_EQ(fut.state(), FutureState::FAILURE);
ASSERT_FALSE(fut.status().ok());
} else {
FAIL() << "Expected future to have failed but it was still pending";
}
}

} // namespace arrow
43 changes: 2 additions & 41 deletions cpp/src/arrow/testing/gtest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/macros.h"
#include "arrow/util/type_fwd.h"

// NOTE: failing must be inline in the macros below, to get correct file / line number
// reporting on test failures.
Expand Down Expand Up @@ -134,48 +135,8 @@
ASSERT_EQ(expected, _actual); \
} while (0)

// This macro should be called by futures that are expected to
// complete pretty quickly. 2 seconds is the default max wait
// here. Anything longer than that and it's a questionable
// unit test anyways.
#define ASSERT_FINISHES_IMPL(fut) \
do { \
ASSERT_TRUE(fut.Wait(10)); \
if (!fut.is_finished()) { \
FAIL() << "Future did not finish in a timely fashion"; \
} \
} while (false)

#define ASSERT_FINISHES_OK(expr) \
do { \
auto&& _fut = (expr); \
ASSERT_TRUE(_fut.Wait(10)); \
if (!_fut.is_finished()) { \
FAIL() << "Future did not finish in a timely fashion"; \
} \
auto _st = _fut.status(); \
if (!_st.ok()) { \
FAIL() << "'" ARROW_STRINGIFY(expr) "' failed with " << _st.ToString(); \
} \
} while (false)

#define ASSERT_FINISHES_ERR(ENUM, expr) \
do { \
auto&& fut = (expr); \
ASSERT_FINISHES_IMPL(fut); \
ASSERT_RAISES(ENUM, fut.status()); \
} while (false)

#define ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, future_name) \
auto future_name = (rexpr); \
ASSERT_FINISHES_IMPL(future_name); \
ASSERT_OK_AND_ASSIGN(lhs, future_name.result());

#define ASSERT_FINISHES_OK_AND_ASSIGN(lhs, rexpr) \
ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, \
ARROW_ASSIGN_OR_RAISE_NAME(_fut, __COUNTER__))

namespace arrow {

// ----------------------------------------------------------------------
// Useful testing::Types declarations

Expand Down
106 changes: 106 additions & 0 deletions cpp/src/arrow/util/async_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
// under the License.

#pragma once

#include <cassert>
#include <deque>
#include <queue>

#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
#include "arrow/util/queue.h"
#include "arrow/util/thread_pool.h"
Expand All @@ -36,6 +40,11 @@ Future<T> AsyncGeneratorEnd() {
return Future<T>::MakeFinished(IterationTraits<T>::End());
}

template <typename T>
bool IsGeneratorEnd(const T& value) {
return value == IterationTraits<T>::End();
}

/// Iterates through a generator of futures, visiting the result of each one and
/// returning a future that completes when all have been visited
template <typename T>
Expand Down Expand Up @@ -336,6 +345,103 @@ class ReadaheadGenerator {
std::queue<Future<T>> readahead_queue_;
};

/// \brief A generator where the producer pushes items on a queue.
///
/// No back-pressure is applied, so this generator is mostly useful when
/// producing the values is neither CPU- nor memory-expensive (e.g. fetching
/// filesystem metadata).
///
/// This generator is not async-reentrant.
template <typename T>
class PushGenerator {
struct State {
util::Mutex mutex;
std::deque<Result<T>> result_q;
util::optional<Future<T>> consumer_fut;
bool finished = false;
};

public:
/// Producer API for PushGenerator
class Producer {
public:
explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) {}

/// Push a value on the queue
void Push(Result<T> result) {
auto lock = state_->mutex.Lock();
if (state_->finished) {
// Closed early
return;
}
if (state_->consumer_fut.has_value()) {
auto fut = std::move(state_->consumer_fut.value());
state_->consumer_fut.reset();
lock.Unlock(); // unlock before potentially invoking a callback
fut.MarkFinished(std::move(result));
return;
}
state_->result_q.push_back(std::move(result));
}

/// \brief Tell the consumer we have finished producing
///
/// It is allowed to call this and later call Push() again ("early close").
/// In this case, calls to Push() after the queue is closed are silently
/// ignored. This can help implementing non-trivial cancellation cases.
void Close() {
auto lock = state_->mutex.Lock();
if (state_->finished) {
// Already closed
return;
}
state_->finished = true;
if (state_->consumer_fut.has_value()) {
auto fut = std::move(state_->consumer_fut.value());
state_->consumer_fut.reset();
lock.Unlock(); // unlock before potentially invoking a callback
fut.MarkFinished(IterationTraits<T>::End());
}
}

bool is_closed() const {
auto lock = state_->mutex.Lock();
return state_->finished;
}

private:
const std::shared_ptr<State> state_;
};

PushGenerator() : state_(std::make_shared<State>()) {}

/// Read an item from the queue
Future<T> operator()() {
auto lock = state_->mutex.Lock();
assert(!state_->consumer_fut.has_value()); // Non-reentrant
if (!state_->result_q.empty()) {
auto fut = Future<T>::MakeFinished(std::move(state_->result_q.front()));
state_->result_q.pop_front();
return fut;
}
if (state_->finished) {
return AsyncGeneratorEnd<T>();
}
auto fut = Future<T>::Make();
state_->consumer_fut = fut;
return fut;
}

/// \brief Return producer-side interface
///
/// The returned object must be used by the producer to push values on the queue.
/// Only a single Producer object should be instantiated.
Producer producer() { return Producer{state_}; }

private:
const std::shared_ptr<State> state_;
};

/// \brief Creates a generator that pulls reentrantly from a source
/// This generator will pull reentrantly from a source, ensuring that max_readahead
/// requests are active at any given time.
Expand Down
33 changes: 1 addition & 32 deletions cpp/src/arrow/util/future_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"
Expand Down Expand Up @@ -70,38 +71,6 @@ struct IterationTraits<MoveOnlyDataType> {
static MoveOnlyDataType End() { return MoveOnlyDataType(-1); }
};

template <typename T>
void AssertNotFinished(const Future<T>& fut) {
ASSERT_FALSE(IsFutureFinished(fut.state()));
}

template <typename T>
void AssertFinished(const Future<T>& fut) {
ASSERT_TRUE(IsFutureFinished(fut.state()));
}

// Assert the future is successful *now*
template <typename T>
void AssertSuccessful(const Future<T>& fut) {
if (IsFutureFinished(fut.state())) {
ASSERT_EQ(fut.state(), FutureState::SUCCESS);
ASSERT_OK(fut.status());
} else {
FAIL() << "Expected future to be completed successfully but it was still pending";
}
}

// Assert the future is failed *now*
template <typename T>
void AssertFailed(const Future<T>& fut) {
if (IsFutureFinished(fut.state())) {
ASSERT_EQ(fut.state(), FutureState::FAILURE);
ASSERT_FALSE(fut.status().ok());
} else {
FAIL() << "Expected future to have failed but it was still pending";
}
}

template <typename T>
struct IteratorResults {
std::vector<T> values;
Expand Down

0 comments on commit fb4b257

Please sign in to comment.