Skip to content

Commit

Permalink
Fix a bunch of typing problems in Buffer, Sample, Fold and Reduce ope…
Browse files Browse the repository at this point in the history
…rator.
  • Loading branch information
renggli committed Mar 14, 2021
1 parent 27fe9b0 commit 1c6efe4
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 151 deletions.
3 changes: 2 additions & 1 deletion lib/operators.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export 'src/operators/exhaust.dart' show ExhaustAllOperator, ExhaustMapOperator;
export 'src/operators/finalize.dart' show FinalizeOperator;
export 'src/operators/first.dart' show FirstOperator;
export 'src/operators/flat_map.dart' show FlattenObservable, FlatMapOperator;
export 'src/operators/fold.dart' show FoldOperator;
export 'src/operators/ignore_elements.dart' show IgnoreElementsOperator;
export 'src/operators/is_empty.dart' show IsEmptyOperator;
export 'src/operators/last.dart' show LastOperator;
Expand All @@ -26,10 +27,10 @@ export 'src/operators/merge.dart' show MergeAllOperator, MergeMapOperator;
export 'src/operators/multicast.dart' show MulticastOperator;
export 'src/operators/observe_on.dart' show ObserveOnOperator;
export 'src/operators/publish.dart' show PublishOperator;
export 'src/operators/reduce.dart' show ReduceOperator;
export 'src/operators/ref_count.dart' show RefCountOperator;
export 'src/operators/repeat.dart' show RepeatOperator;
export 'src/operators/sample.dart' show SampleOperator;
export 'src/operators/scan.dart' show ScanOperator;
export 'src/operators/single.dart' show SingleOperator;
export 'src/operators/skip.dart' show SkipOperator;
export 'src/operators/skip_while.dart' show SkipWhileOperator;
Expand Down
22 changes: 11 additions & 11 deletions lib/src/operators/buffer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ extension BufferOperator<T> on Observable<T> {
/// - the buffer reaches [maxLength], or
/// - the buffer reaches [maxAge].
///
Observable<List<T>> buffer(
Observable<List<T>> buffer<R>(
{Scheduler? scheduler,
Observable? trigger,
Observable<R>? trigger,
int? maxLength,
Duration? maxAge}) =>
BufferObservable<T>(
BufferObservable<T, R>(
this, scheduler ?? defaultScheduler, trigger, maxLength, maxAge);
}

class BufferObservable<T> extends Observable<List<T>> {
class BufferObservable<T, R> extends Observable<List<T>> {
final Observable<T> delegate;
final Scheduler scheduler;
final Observable? trigger;
final Observable<R>? trigger;
final int? maxLength;
final Duration? maxAge;

Expand All @@ -36,14 +36,14 @@ class BufferObservable<T> extends Observable<List<T>> {
@override
Disposable subscribe(Observer<List<T>> observer) {
final subscriber =
BufferSubscriber<T>(observer, scheduler, trigger, maxLength, maxAge);
BufferSubscriber<T, R>(observer, scheduler, trigger, maxLength, maxAge);
subscriber.add(delegate.subscribe(subscriber));
return subscriber;
}
}

class BufferSubscriber<T> extends Subscriber<T>
implements InnerEvents<T, void> {
class BufferSubscriber<T, R> extends Subscriber<T>
implements InnerEvents<R, void> {
final Scheduler scheduler;
final int? maxLength;
final Duration? maxAge;
Expand All @@ -52,11 +52,11 @@ class BufferSubscriber<T> extends Subscriber<T>
DateTime? bufferBirth;

BufferSubscriber(Observer<List<T>> observer, this.scheduler,
Observable? trigger, this.maxLength, this.maxAge)
Observable<R>? trigger, this.maxLength, this.maxAge)
: super(observer) {
reset();
if (trigger != null) {
add(InnerObserver<dynamic, void>(this, trigger, null));
add(InnerObserver<R, void>(this, trigger, null));
}
}

Expand All @@ -76,7 +76,7 @@ class BufferSubscriber<T> extends Subscriber<T>
}

@override
void notifyNext(Disposable disposable, void state, T value) => flush();
void notifyNext(Disposable disposable, void state, R value) => flush();

@override
void notifyError(Disposable disposable, void state, Object error,
Expand Down
47 changes: 47 additions & 0 deletions lib/src/operators/fold.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import '../core/observable.dart';
import '../core/observer.dart';
import '../core/subscriber.dart';
import '../disposables/disposable.dart';
import '../events/event.dart';
import '../shared/functions.dart';

extension FoldOperator<T> on Observable<T> {
/// Combines a sequence of values by repeatedly applying [transform], starting
/// with the provided [initialValue].
Observable<R> fold<R>(R initialValue, Map2<R, T, R> transform) =>
FoldObservable<T, R>(this, transform, initialValue);
}

class FoldObservable<T, R> extends Observable<R> {
final Observable<T> delegate;
final Map2<R, T, R> transform;
final R seedValue;

FoldObservable(this.delegate, this.transform, this.seedValue);

@override
Disposable subscribe(Observer<R> observer) {
final subscriber = FoldSubscriber<T, R>(observer, transform, seedValue);
subscriber.add(delegate.subscribe(subscriber));
return subscriber;
}
}

class FoldSubscriber<T, R> extends Subscriber<T> {
final Map2<R, T, R> transform;
R seedValue;

FoldSubscriber(Observer<R> destination, this.transform, this.seedValue)
: super(destination);

@override
void onNext(T value) {
final transformEvent = Event.map2(transform, seedValue, value);
if (transformEvent.isError) {
doError(transformEvent.error, transformEvent.stackTrace);
} else {
seedValue = transformEvent.value;
}
doNext(seedValue);
}
}
51 changes: 51 additions & 0 deletions lib/src/operators/reduce.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import '../core/observable.dart';
import '../core/observer.dart';
import '../core/subscriber.dart';
import '../disposables/disposable.dart';
import '../events/event.dart';
import '../shared/functions.dart';

extension ReduceOperator<T> on Observable<T> {
/// Combines a sequence of values by repeatedly applying [transform].
Observable<T> reduce(Map2<T, T, T> transform) =>
ReduceObservable<T>(this, transform);
}

class ReduceObservable<T> extends Observable<T> {
final Observable<T> delegate;
final Map2<T, T, T> transform;

ReduceObservable(this.delegate, this.transform);

@override
Disposable subscribe(Observer<T> observer) {
final subscriber = ReduceSubscriber<T>(observer, transform);
subscriber.add(delegate.subscribe(subscriber));
return subscriber;
}
}

class ReduceSubscriber<T> extends Subscriber<T> {
final Map2<T, T, T> transform;
bool hasSeed = false;
late T seedValue;

ReduceSubscriber(Observer<T> destination, this.transform)
: super(destination);

@override
void onNext(T value) {
if (hasSeed) {
final transformEvent = Event.map2(transform, seedValue, value);
if (transformEvent.isError) {
doError(transformEvent.error, transformEvent.stackTrace);
} else {
seedValue = transformEvent.value;
}
} else {
seedValue = value;
hasSeed = true;
}
doNext(seedValue);
}
}
2 changes: 1 addition & 1 deletion lib/src/operators/sample.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SampleSubscriber<T, R> extends Subscriber<T>

SampleSubscriber(Observer<T> observer, Observable<R> trigger)
: super(observer) {
add(InnerObserver(this, trigger, null));
add(InnerObserver<R, void>(this, trigger, null));
}

@override
Expand Down
60 changes: 0 additions & 60 deletions lib/src/operators/scan.dart

This file was deleted.

0 comments on commit 1c6efe4

Please sign in to comment.