Skip to content

Commit

Permalink
Add initial support for reactive values.
Browse files Browse the repository at this point in the history
  • Loading branch information
renggli committed Mar 31, 2024
1 parent 660549c commit 277518c
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 0 deletions.
3 changes: 3 additions & 0 deletions lib/reactive.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export 'src/reactive/computed.dart' show Computed;
export 'src/reactive/mutable.dart' show Mutable;
export 'src/reactive/value.dart' show Value;
10 changes: 10 additions & 0 deletions lib/src/core/errors.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,13 @@ class TimeoutError extends Error {
@override
String toString() => 'TimeoutError{message: $message}';
}

/// An error thrown when there are circular dependencies.
class CircularDependencyError extends Error {
CircularDependencyError([this.message = 'Circular dependencies detected.']);

final String message;

@override
String toString() => 'CircularDependencyError{message: $message}';
}
80 changes: 80 additions & 0 deletions lib/src/reactive/computed.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import 'package:more/functional.dart';

import '../../core.dart';
import 'value.dart';

class Computed<T> extends Value<T> {
Computed(this._callback) {
_update();
}

final Map0<T> _callback;
late final ComputedObserver<T> _observer = ComputedObserver<T>(this);

State _state = State.initializing;
T? _value;
Object? _error;
StackTrace? _stackTrace;

@override
T get value {
if (active != null) subscribe(active as Observer<T>);
switch (_state) {
case State.initializing:
case State.computing:
throw CircularDependencyError();
case State.computed:
return _value as T;
case State.errored:
throw UnhandledError(_error!, _stackTrace!);
}
}

void _update() {
final previous = active;
active = _observer;
_state = State.computing;
_value = null;
_error = null;
_stackTrace = null;
try {
final value = _callback();
_state = State.computed;
update(_value = value);
} catch (error, stackTrace) {
_state = State.errored;
_error = error;
_stackTrace = stackTrace;
} finally {
active = previous;
}
}
}

enum State {
initializing,
computing,
computed,
errored;
}

class ComputedObserver<T> implements Observer<Never> {
ComputedObserver(this._computed);

final Computed<T> _computed;

@override
void next(Object? value) => _computed._update();

@override
void error(Object error, StackTrace stackTrace) => _computed._update();

@override
void complete() => _computed._update();

@override
bool get isDisposed => throw UnimplementedError();

@override
void dispose() => throw UnimplementedError();
}
19 changes: 19 additions & 0 deletions lib/src/reactive/mutable.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import '../core/observer.dart';
import 'value.dart';

class Mutable<T> extends Value<T> {
Mutable(this._value);

T _value;

@override
T get value {
if (active != null) subscribe(active as Observer<T>);
return _value;
}

set value(T value) {
if (value == _value) return;
update(_value = value);
}
}
30 changes: 30 additions & 0 deletions lib/src/reactive/value.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import 'package:meta/meta.dart';

import '../core/observable.dart';
import '../core/observer.dart';
import '../disposables/collection.dart';
import '../disposables/disposable.dart';

/// Abstract reactive value
abstract class Value<T> implements Observable<T> {
/// The currently held value.
T get value;

/// The observers monitoring changes of this value.
final Set<Observer<T>> _observers = {};

/// Subscribes to changes of this value.
@override
Disposable subscribe(Observer<T> observer) =>
CollectionDisposable<Observer<T>>.forSet(_observers, observer);

/// Update all registered observers of this value.
@protected
void update(T value) {
for (final observer in _observers) {
observer.next(value);
}
}
}

Observer<Never>? active;
2 changes: 2 additions & 0 deletions test/all_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'converters_test.dart' as converters_test;
import 'disposables_test.dart' as disposables_test;
import 'events_test.dart' as events_test;
import 'operators_test.dart' as operators_test;
import 'reactive_test.dart' as reactive_test;
import 'schedulers_test.dart' as schedulers_test;
import 'shared_test.dart' as shared_test;
import 'store_test.dart' as store_test;
Expand All @@ -17,6 +18,7 @@ void main() {
group('disposables', disposables_test.main);
group('events', events_test.main);
group('operators', operators_test.main);
group('reactive', reactive_test.main);
group('schedulers', schedulers_test.main);
group('shared', shared_test.main);
group('store', store_test.main);
Expand Down
83 changes: 83 additions & 0 deletions test/reactive_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import 'package:rx/core.dart';
import 'package:rx/reactive.dart';
import 'package:test/test.dart';

void main() {
group('mutable', () {
test('basic', () {
final ref = Mutable(0);
final log = <int>[];
ref.subscribe(Observer.next(log.add));
expect(log, isEmpty);
ref.value = 1;
expect(log, [1]);
});
test('sequence', () {
final ref = Mutable(0);
final log = <int>[];
ref.subscribe(Observer.next(log.add));
expect(log, isEmpty);
ref.value = 1;
ref.value = 2;
ref.value = 3;
expect(log, [1, 2, 3]);
});
test('unmodified', () {
final ref = Mutable(0);
final log = <int>[];
ref.subscribe(Observer.next(log.add));
expect(log, isEmpty);
ref.value = 0;
expect(log, isEmpty);
});
});
group('computed', () {
test('no dependencies', () {
final ref = Computed(() => 42);
expect(ref.value, 42);
});
test('single dependency', () {
final dep = Mutable(1);
final ref = Computed(() => dep.value);
final log = <int>[];
ref.subscribe(Observer.next(log.add));
expect(ref.value, 1);
dep.value = 2;
expect(ref.value, 2);
expect(log, [2]);
});
test('double dependency', () {
final dep1 = Mutable('John'), dep2 = Mutable('Doe');
final ref = Computed(() => '${dep1.value} ${dep2.value}');
final log = <String>[];
ref.subscribe(Observer.next(log.add));
expect(ref.value, 'John Doe');
dep1.value = 'Jane';
dep2.value = 'Roe';
expect(ref.value, 'Jane Roe');
expect(log, ['Jane Doe', 'Jane Roe']);
});
test('linear dependency', () {
final dep1 = Mutable(2), dep2 = Computed(() => dep1.value * dep1.value);
final ref = Computed(() => dep2.value.toString());
final log = <String>[];
ref.subscribe(Observer.next(log.add));
expect(ref.value, '4');
dep1.value = 3;
expect(ref.value, '9');
expect(log, ['9']);
});
test('dynamic dependency', () {
final depBool = Mutable(false);
final depTrue = Mutable(1), depFalse = Mutable(2);
final ref =
Computed(() => depBool.value ? depTrue.value : depFalse.value);
final log = <int>[];
ref.subscribe(Observer.next(log.add));
expect(ref.value, 2);
depBool.value = true;
expect(ref.value, 1);
expect(log, [1]);
});
});
}

0 comments on commit 277518c

Please sign in to comment.