Skip to content

Commit

Permalink
[yarpl] Reduce operator for Flowable (#411)
Browse files Browse the repository at this point in the history
* [yarpl] Reduce operator for Flowable

* Reduce Flowable operator now utilizes different template for accumulation and value. Reduce operator's implementation now matches with the desired behavior. TODO - I will add unit tests for Reduce Observable.

* Fix the Reduce Observable and add unit tests for validating it

* trying to make is_trivially_assignable to work

* removed is_trivially_assignable, so it will check is_assignable

* fixing yarpl dependency (#436)

* fixup! updating the ReduceOperator class to comply with recent updates in the parent class
  • Loading branch information
phoad authored and lehecka committed May 19, 2017
1 parent c3566df commit e0867a9
Show file tree
Hide file tree
Showing 6 changed files with 333 additions and 18 deletions.
11 changes: 11 additions & 0 deletions yarpl/include/yarpl/flowable/Flowable.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class Flowable : public virtual Refcounted {
template <typename Function>
auto filter(Function&& function);

template <typename Function>
auto reduce(Function&& function);

auto take(int64_t);

auto subscribeOn(Scheduler&);
Expand Down Expand Up @@ -332,6 +335,14 @@ auto Flowable<T>::filter(Function&& function) {
Reference<Flowable<T>>(this), std::forward<Function>(function)));
}

template <typename T>
template <typename Function>
auto Flowable<T>::reduce(Function&& function) {
using D = typename std::result_of<Function(T, T)>::type;
return Reference<Flowable<D>>(new ReduceOperator<T, D, Function>(
Reference<Flowable<T>>(this), std::forward<Function>(function)));
}

template <typename T>
auto Flowable<T>::take(int64_t limit) {
return Reference<Flowable<T>>(
Expand Down
79 changes: 70 additions & 9 deletions yarpl/include/yarpl/flowable/FlowableOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ class FlowableOperator : public Flowable<D> {
Refcounted::decRef(*this);
}

protected:
void onComplete() override {
subscriber_->onComplete();
upstream_.reset(); // breaking the cycle
Refcounted::decRef(*this);
}

private:
void onSubscribe(
Reference<::yarpl::flowable::Subscription> subscription) override {
Expand All @@ -81,12 +88,6 @@ class FlowableOperator : public Flowable<D> {
Reference<::yarpl::flowable::Subscription>(this));
}

void onComplete() override {
subscriber_->onComplete();
upstream_.reset(); // breaking the cycle
Refcounted::decRef(*this);
}

void onError(const std::exception_ptr error) override {
subscriber_->onError(error);
upstream_.reset(); // breaking the cycle
Expand Down Expand Up @@ -137,7 +138,7 @@ class MapOperator : public FlowableOperator<U, D> {
Subscription(
Reference<Flowable<D>> flowable,
Reference<Subscriber<D>> subscriber)
: FlowableOperator<U, D>::Subscription(
: Super(
std::move(flowable),
std::move(subscriber)) {}

Expand Down Expand Up @@ -175,7 +176,7 @@ class FilterOperator : public FlowableOperator<U, U> {
Subscription(
Reference<Flowable<U>> flowable,
Reference<Subscriber<U>> subscriber)
: FlowableOperator<U, U>::Subscription(
: Super(
std::move(flowable),
std::move(subscriber)) {}

Expand All @@ -187,8 +188,68 @@ class FilterOperator : public FlowableOperator<U, U> {
Super::request(1l);
}
}
};

private:
F function_;
};

template <
typename U,
typename D,
typename F,
typename = typename std::enable_if<std::is_assignable<D, U>::value>,
typename = typename std::enable_if<std::is_callable<F(D, U), D>::value>::type>
class ReduceOperator : public FlowableOperator<U, D> {
public:
ReduceOperator(Reference<Flowable<U>> upstream, F&& function)
: FlowableOperator<U, D>(std::move(upstream)),
function_(std::forward<F>(function)) {}

void subscribe(Reference<Subscriber<D>> subscriber) override {
FlowableOperator<U, D>::upstream_->subscribe(
// Note: implicit cast to a reference to a subscriber.
Reference<Subscription>(new Subscription(
Reference<Flowable<D>>(this), std::move(subscriber))));
}

private:
class Subscription : public FlowableOperator<U, D>::Subscription {
using Super = typename FlowableOperator<U,D>::Subscription;
public:
Subscription(
Reference<Flowable<D>> flowable,
Reference<Subscriber<D>> subscriber)
: Super(
std::move(flowable),
std::move(subscriber)),
accInitialized_(false) {
}

void request(int64_t /* delta */) override {
// Request all of the items
Super::request(FlowableOperator<U, D>::NO_FLOW_CONTROL);
}

void onNext(U value) override {
auto* reduce = Super::template getFlowableAs<ReduceOperator>();
if (accInitialized_) {
acc_ = reduce->function_(std::move(acc_), std::move(value));
} else {
acc_ = std::move(value);
accInitialized_ = true;
}
}

void onComplete() override {
if (accInitialized_) {
Super::subscriberOnNext(std::move(acc_));
}
Super::onComplete();
}

private:
bool accInitialized_;
D acc_;
};

F function_;
Expand Down
11 changes: 11 additions & 0 deletions yarpl/include/yarpl/observable/Observable.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ class Observable : public virtual Refcounted {
template <typename Function>
auto filter(Function&& function);

template <typename Function>
auto reduce(Function&& function);

auto take(int64_t);

auto subscribeOn(Scheduler&);
Expand Down Expand Up @@ -141,6 +144,14 @@ auto Observable<T>::filter(Function&& function) {
Reference<Observable<T>>(this), std::forward<Function>(function)));
}

template <typename T>
template <typename Function>
auto Observable<T>::reduce(Function&& function) {
using D = typename std::result_of<Function(T, T)>::type;
return Reference<Observable<D>>(new ReduceOperator<T, D, Function>(
Reference<Observable<T>>(this), std::forward<Function>(function)));
}

template <typename T>
auto Observable<T>::take(int64_t limit) {
return Reference<Observable<T>>(
Expand Down
76 changes: 67 additions & 9 deletions yarpl/include/yarpl/observable/ObservableOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,20 @@ class ObservableOperator : public Observable<D> {
observer_->onNext(std::move(value));
}

private:
protected:
void onComplete() override {
observer_->onComplete();
upstream_.reset(); // breaking the cycle
}

private:
void onSubscribe(
Reference<::yarpl::observable::Subscription> subscription) override {
upstream_ = std::move(subscription);
observer_->onSubscribe(
Reference<::yarpl::observable::Subscription>(this));
}

void onComplete() override {
observer_->onComplete();
upstream_.reset(); // breaking the cycle
}

void onError(const std::exception_ptr error) override {
observer_->onError(error);
upstream_.reset(); // breaking the cycle
Expand Down Expand Up @@ -129,7 +130,7 @@ class MapOperator : public ObservableOperator<U, D> {
Subscription(
Reference<Observable<D>> observable,
Reference<Observer<D>> observer)
: ObservableOperator<U, D>::Subscription(
: Super(
std::move(observable),
std::move(observer)) {}

Expand Down Expand Up @@ -167,7 +168,7 @@ class FilterOperator : public ObservableOperator<U, U> {
Subscription(
Reference<Observable<U>> observable,
Reference<Observer<U>> observer)
: ObservableOperator<U, U>::Subscription(
: Super(
std::move(observable),
std::move(observer)) {}

Expand All @@ -182,6 +183,63 @@ class FilterOperator : public ObservableOperator<U, U> {
F function_;
};

template<
typename U,
typename D,
typename F,
typename = typename std::enable_if<std::is_assignable<D, U>::value>,
typename = typename std::enable_if<std::is_callable<F(D, U), D>::value>::type>
class ReduceOperator : public ObservableOperator<U, D> {
public:
ReduceOperator(Reference<Observable<U>> upstream, F &&function)
: ObservableOperator<U, D>(std::move(upstream)),
function_(std::forward<F>(function)) {}

void subscribe(Reference<Observer<D>> subscriber) override {
ObservableOperator<U, D>::upstream_->subscribe(
// Note: implicit cast to a reference to a subscriber.
Reference<Subscription>(new Subscription(
Reference<Observable<D>>(this), std::move(subscriber))));
}

private:
class Subscription : public ObservableOperator<U, D>::Subscription {
using Super = typename ObservableOperator<U, D>::Subscription;

public:
Subscription(
Reference <Observable<D>> flowable,
Reference <Observer<D>> subscriber)
: Super(
std::move(flowable),
std::move(subscriber)),
accInitialized_(false) {}

void onNext(U value) override {
auto* reduce = Super::template getObservableAs<ReduceOperator>();
if (accInitialized_) {
acc_ = reduce->function_(std::move(acc_), std::move(value));
} else {
acc_ = std::move(value);
accInitialized_ = true;
}
}

void onComplete() override {
if (accInitialized_) {
Super::observerOnNext(std::move(acc_));
}
Super::onComplete();
}

private:
bool accInitialized_;
D acc_;
};

F function_;
};

template <typename T>
class TakeOperator : public ObservableOperator<T, T> {
public:
Expand All @@ -202,7 +260,7 @@ class TakeOperator : public ObservableOperator<T, T> {
Reference<Observable<T>> observable,
int64_t limit,
Reference<Observer<T>> observer)
: ObservableOperator<T, T>::Subscription(
: Super(
std::move(observable),
std::move(observer)),
limit_(limit) {}
Expand Down

0 comments on commit e0867a9

Please sign in to comment.