Skip to content

Commit

Permalink
More disposable cleanups.
Browse files Browse the repository at this point in the history
  • Loading branch information
renggli committed Nov 19, 2019
1 parent d50ed52 commit 0d584ef
Show file tree
Hide file tree
Showing 32 changed files with 272 additions and 291 deletions.
4 changes: 2 additions & 2 deletions lib/disposables.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
library rx.disposables;

export 'src/disposables/anonymous.dart';
export 'src/disposables/action.dart';
export 'src/disposables/composite.dart';
export 'src/disposables/disposable.dart';
export 'src/disposables/empty.dart';
export 'src/disposables/disposed.dart';
export 'src/disposables/errors.dart';
export 'src/disposables/sequential.dart';
export 'src/disposables/stateful.dart';
Expand Down
16 changes: 7 additions & 9 deletions lib/src/constructors/create.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
library rx.constructors.create;

import '../core/events.dart';
import '../core/observable.dart';
import '../core/observer.dart';
import '../core/subscriber.dart';
Expand All @@ -9,22 +8,21 @@ import '../shared/functions.dart';

/// Creates an observable sequence from a specified subscribe method
/// implementation.
Observable<T> create<T>(Map1<Subscriber<T>, dynamic> subscribeFunction) =>
CreateObservable<T>(subscribeFunction);
Observable<T> create<T>(Callback1<Subscriber<T>> callback) =>
CreateObservable<T>(callback);

class CreateObservable<T> extends Observable<T> {
final Map1<Subscriber<T>, dynamic> callback;
final Callback1<Subscriber<T>> callback;

CreateObservable(this.callback);

@override
Disposable subscribe(Observer<T> observer) {
final subscriber = Subscriber<T>(observer);
final event = Event.map1(callback, subscriber);
if (event is ErrorEvent) {
subscriber.error(event.error, event.stackTrace);
} else {
subscriber.add(Disposable.of(event.value));
try {
callback(subscriber);
} catch (error, stackTrace) {
subscriber.error(error, stackTrace);
}
return subscriber;
}
Expand Down
1 change: 0 additions & 1 deletion lib/src/constructors/defer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ import 'empty.dart';
Observable<T> defer<T>(Map0<Observable<T>> callback) => create<T>((subscriber) {
final observable = callback() ?? empty<T>();
subscriber.add(observable.subscribe(subscriber));
return subscriber;
});
3 changes: 2 additions & 1 deletion lib/src/constructors/never.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ library rx.constructors.never;
import '../core/observable.dart';
import '../core/observer.dart';
import '../disposables/disposable.dart';
import '../disposables/disposed.dart';

/// An [Observable] that emits no items and never completes.
Observable<T> never<T>() => NeverObservable<T>();
Expand All @@ -11,5 +12,5 @@ class NeverObservable<T> with Observable<T> {
const NeverObservable();

@override
Disposable subscribe(Observer<void> observer) => Disposable.empty();
Disposable subscribe(Observer<T> observer) => const DisposedDisposable();
}
6 changes: 4 additions & 2 deletions lib/src/constructors/timer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ library rx.constructors.timer;
import '../core/observable.dart';
import '../core/observer.dart';
import '../core/scheduler.dart';
import '../disposables/action.dart';
import '../disposables/composite.dart';
import '../disposables/disposable.dart';
import '../schedulers/settings.dart';

Expand All @@ -23,8 +25,8 @@ class TimerObservable with Observable<int> {

@override
Disposable subscribe(Observer<int> observer) {
final subscription = Disposable.composite();
subscription.add(Disposable.create(() => observer.complete()));
final subscription = CompositeDisposable();
subscription.add(ActionDisposable(() => observer.complete()));
subscription.add(scheduler.scheduleRelative(delay, () {
observer.next(0);
if (period == null) {
Expand Down
11 changes: 6 additions & 5 deletions lib/src/converters/future_to_observable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'dart:async';
import '../core/observable.dart';
import '../core/observer.dart';
import '../disposables/disposable.dart';
import '../disposables/stateful.dart';

extension FutureToObservable<T> on Future<T> {
/// An [Observable] that listens to the completion of a [Future].
Expand All @@ -18,7 +19,7 @@ class FutureObservable<T> with Observable<T> {

@override
Disposable subscribe(Observer<T> observer) {
final subscription = Disposable.stateful();
final subscription = StatefulDisposable();
future.then(
(value) => _onValue(subscription, observer, value),
onError: (error, stackTrace) =>
Expand All @@ -27,17 +28,17 @@ class FutureObservable<T> with Observable<T> {
return subscription;
}

void _onValue(Disposable subscription, Observer<T> observer, T value) {
if (subscription.isDisposed) {
void _onValue(Disposable disposable, Observer<T> observer, T value) {
if (disposable.isDisposed) {
return;
}
observer.next(value);
observer.complete();
}

void _onError(Disposable subscription, Observer<T> observer, Object error,
void _onError(Disposable disposable, Observer<T> observer, Object error,
StackTrace stackTrace) {
if (subscription.isDisposed) {
if (disposable.isDisposed) {
return;
}
observer.error(error, stackTrace);
Expand Down
14 changes: 7 additions & 7 deletions lib/src/converters/observable_to_future.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@ import 'dart:async';
import '../core/errors.dart';
import '../core/observable.dart';
import '../core/observer.dart';
import '../disposables/disposable.dart';
import '../disposables/composite.dart';

extension ObservableToFuture<T> on Observable<T> {
/// A [Future] that completes with the first value of an [Observable].
Future<T> toFuture() {
final subscriptions = Disposable.composite();
final disposable = CompositeDisposable();
final completer = Completer<T>();
final observer = Observer<T>(
next: (value) {
completer.complete(value);
subscriptions.dispose();
disposable.dispose();
},
error: (error, [stackTrace]) {
completer.completeError(error, stackTrace);
subscriptions.dispose();
disposable.dispose();
},
complete: () {
completer.completeError(TooFewError());
subscriptions.dispose();
disposable.dispose();
},
);
subscriptions.add(observer);
subscriptions.add(subscribe(observer));
disposable.add(observer);
disposable.add(subscribe(observer));
return completer.future;
}
}
19 changes: 8 additions & 11 deletions lib/src/converters/observable_to_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,21 @@ import 'dart:async' show Stream, StreamController;

import '../core/observable.dart';
import '../core/observer.dart';
import '../disposables/disposable.dart';
import '../disposables/sequential.dart';

extension ObservableToStream<T> on Observable<T> {
/// A [Stream] that listens to an [Observable].
Stream<T> toStream() {
var subscription = Disposable.empty();
final disposable = SequentialDisposable();
final controller = StreamController<T>();
final observer = Observer<T>(
next: controller.add,
error: controller.addError,
complete: controller.close,
);
controller.onListen = () {
if (subscription.isDisposed) {
subscription = subscribe(observer);
}
disposable.current = subscribe(Observer<T>(
next: controller.add,
error: controller.addError,
complete: controller.close,
));
};
controller.onCancel = subscription.dispose;
controller.onCancel = disposable.dispose;
return controller.stream;
}
}
3 changes: 2 additions & 1 deletion lib/src/converters/stream_to_observable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:async' show Stream;

import '../core/observable.dart';
import '../core/observer.dart';
import '../disposables/action.dart';
import '../disposables/disposable.dart';

extension StreamToObservable<T> on Stream<T> {
Expand All @@ -20,6 +21,6 @@ class StreamObservable<T> with Observable<T> {
Disposable subscribe(Observer<T> observer) {
final subscription = stream.listen(observer.next,
onError: observer.error, onDone: observer.complete);
return Disposable.create(subscription.cancel);
return ActionDisposable(subscription.cancel);
}
}
8 changes: 5 additions & 3 deletions lib/src/core/subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ library rx.core.subject;

import 'package:meta/meta.dart';

import '../disposables/action.dart';
import '../disposables/disposable.dart';
import '../disposables/disposed.dart';
import '../disposables/errors.dart';
import 'observable.dart';
import 'observer.dart';
Expand Down Expand Up @@ -81,20 +83,20 @@ class Subject<T>
@protected
Disposable subscribeToActive(Observer observer) {
_observers.add(observer);
return Disposable.create(() => _observers.remove(observer));
return ActionDisposable(() => _observers.remove(observer));
}

@protected
Disposable subscribeToError(
Observer observer, Object error, StackTrace stackTrace) {
observer.error(error, stackTrace);
return Disposable.empty();
return const DisposedDisposable();
}

@protected
Disposable subscribeToComplete(Observer observer) {
observer.complete();
return Disposable.empty();
return const DisposedDisposable();
}

@override
Expand Down
2 changes: 1 addition & 1 deletion lib/src/core/subscriber.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Subscriber<T> extends CompositeDisposable
@protected
final Observer destination;

Subscriber(this.destination);
Subscriber(this.destination) : super();

/// Receives the next value.
@override
Expand Down
13 changes: 13 additions & 0 deletions lib/src/disposables/action.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
library rx.disposables.action;

import '../shared/functions.dart';
import 'disposable.dart';
import 'reference.dart';

/// A [Disposable] with a callback that is called exactly once on disposal.
class ActionDisposable extends ReferenceDisposable<Callback0> {
ActionDisposable(Callback0 callback) : super(callback);

@override
void onDispose(Callback0 value) => value();
}
24 changes: 0 additions & 24 deletions lib/src/disposables/anonymous.dart

This file was deleted.

42 changes: 23 additions & 19 deletions lib/src/disposables/composite.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,37 @@ import 'disposable.dart';
import 'errors.dart';
import 'stateful.dart';

/// A [Disposable] container that holds other disposables.
class CompositeDisposable extends StatefulDisposable {
final Set<Disposable> _subscriptions = {};
final Set<Disposable> _disposables = Set.identity();

CompositeDisposable();

CompositeDisposable.of(Iterable<Disposable> subscriptions) {
subscriptions.forEach(add);
CompositeDisposable([Iterable<Disposable> disposables]) : super() {
if (disposables != null && disposables.isNotEmpty) {
disposables.forEach(add);
}
}

Set<Disposable> get subscriptions => {..._subscriptions};
Set<Disposable> get disposables => {..._disposables};

void add(Disposable subscription) {
ArgumentError.checkNotNull(subscription, 'subscription');
void add(Disposable disposable) {
ArgumentError.checkNotNull(disposable, 'disposable');
if (isDisposed) {
subscription.dispose();
disposable.dispose();
return;
}
if (subscription.isDisposed) {
if (disposable.isDisposed) {
return;
}
_subscriptions.add(subscription);
_disposables.add(disposable);
}

void remove(Disposable subscription) {
ArgumentError.checkNotNull(subscription, 'subscription');
if (_subscriptions.remove(subscription)) {
subscription.dispose();
void remove(Disposable disposable) {
ArgumentError.checkNotNull(disposable, 'disposable');
if (isDisposed) {
return;
}
if (_disposables.remove(disposable)) {
disposable.dispose();
}
}

Expand All @@ -39,13 +43,13 @@ class CompositeDisposable extends StatefulDisposable {
if (isDisposed) {
return;
}
final subscriptions = _subscriptions.toList();
final disposables = _disposables.toList();
super.dispose();
_subscriptions.clear();
_disposables.clear();
final errors = [];
for (final subscription in subscriptions) {
for (final disposable in disposables) {
try {
subscription.dispose();
disposable.dispose();
} catch (error) {
errors.add(error);
}
Expand Down

0 comments on commit 0d584ef

Please sign in to comment.