diff --git a/packages/core/src/LinkedList.js b/packages/core/src/LinkedList.js index ce97eb12..b4602132 100644 --- a/packages/core/src/LinkedList.js +++ b/packages/core/src/LinkedList.js @@ -1,6 +1,7 @@ /** @license MIT License (c) copyright 2010-2016 original author or authors */ /** @author Brian Cavalier */ /** @author John Hann */ +import { disposeAll } from '@most/disposable' /** * Doubly linked list @@ -57,20 +58,16 @@ export default class LinkedList { * or rejects if an error occurs while disposing */ dispose () { - if (this.isEmpty()) { - return Promise.resolve() - } - - const promises = [] + const disposables = [] let x = this.head this.head = null this.length = 0 while (x !== null) { - promises.push(x.dispose()) + disposables.push(x) x = x.next } - return Promise.all(promises) + return disposeAll(disposables).dispose() } } diff --git a/packages/core/src/combinator/mergeConcurrently.js b/packages/core/src/combinator/mergeConcurrently.js index 8d2fafc7..bf12f54c 100644 --- a/packages/core/src/combinator/mergeConcurrently.js +++ b/packages/core/src/combinator/mergeConcurrently.js @@ -2,7 +2,7 @@ /** @author Brian Cavalier */ /** @author John Hann */ -import { disposeOnce, tryDispose } from '@most/disposable' +import { disposeOnce, tryDispose, disposeBoth } from '@most/disposable' import LinkedList from '../LinkedList' import { id as identity } from '@most/prelude' import { schedulerRelativeTo } from '@most/scheduler' @@ -34,6 +34,7 @@ class Outer { this.pending = [] this.current = new LinkedList() this.disposable = disposeOnce(source.run(this, scheduler)) + this.disposer = disposeBoth(this.disposable, this.current) this.active = true } @@ -77,8 +78,7 @@ class Outer { dispose () { this.active = false this.pending.length = 0 - this.disposable.dispose() - this.current.dispose() + return this.disposer.dispose() } _endInner (t, inner) { diff --git a/packages/core/src/combinator/multicast.js b/packages/core/src/combinator/multicast.js index 0faee9c6..eaf9a9bc 100644 --- a/packages/core/src/combinator/multicast.js +++ b/packages/core/src/combinator/multicast.js @@ -83,7 +83,7 @@ export class MulticastDisposable { dispose () { if (this.source.remove(this.sink) === 0) { - this.source.dispose() + return this.source.dispose() } } } diff --git a/packages/core/src/combinator/switch.js b/packages/core/src/combinator/switch.js index c6cbbad8..1ad3dad9 100644 --- a/packages/core/src/combinator/switch.js +++ b/packages/core/src/combinator/switch.js @@ -106,6 +106,6 @@ class Segment { } _dispose (t) { - tryDispose(t + this.min, this.disposable, this.sink) + return tryDispose(t + this.min, this.disposable, this.sink) } } diff --git a/packages/core/test/helper/testEnv.js b/packages/core/test/helper/testEnv.js index 1c33965d..db980c54 100644 --- a/packages/core/test/helper/testEnv.js +++ b/packages/core/test/helper/testEnv.js @@ -7,7 +7,7 @@ import { propagateEventTask, propagateEndTask } from '../../src/scheduler/Propag import VirtualTimer from './VirtualTimer' import { runEffects } from '../../src/runEffects' import { tap } from '../../src/combinator/transform' -import { disposeWith, disposeNone } from '@most/disposable' +import { disposeWith, disposeNone, disposeAll } from '@most/disposable' export function newEnv () { const timer = new VirtualTimer() @@ -62,6 +62,4 @@ const appendEvent = (sink, scheduler) => (s, event) => { return { tasks: s.tasks.concat(task), time: Math.max(s.time, event.time) } } -const cancelAll = tasks => Promise.all(tasks.map(cancelOne)) - -const cancelOne = task => task.dispose() +const cancelAll = tasks => disposeAll(tasks).dispose() diff --git a/packages/core/test/newStream-test.js b/packages/core/test/newStream-test.js index ddd839a7..a82e445c 100644 --- a/packages/core/test/newStream-test.js +++ b/packages/core/test/newStream-test.js @@ -5,7 +5,7 @@ import { newStream } from '../src/source/newStream' describe('newStream', () => { it('should create new stream from RunStream function', () => { - const run = (sink, scheduler) => ({ dispose: () => Promise.resolve() }) + const run = (sink, scheduler) => ({ dispose: () => {} }) const s = newStream(run) is(run, s.run) diff --git a/packages/disposable/src/disposeAll.js b/packages/disposable/src/disposeAll.js index d61da308..dadbcf94 100644 --- a/packages/disposable/src/disposeAll.js +++ b/packages/disposable/src/disposeAll.js @@ -15,26 +15,39 @@ class DisposeAll { } dispose () { - throwIfErrors(disposeCollectErrors(this.disposables)) + return throwIfErrors(disposeCollectErrors(this.disposables)) } } // Dispose all, safely collecting errors into an array const disposeCollectErrors = disposables => - reduce(appendIfError, [], disposables) + reduce((errors, d) => { + if (typeof errors.then === 'function') { + return errors.then((errors) => appendIfError(errors, d)) + } + return appendIfError(errors, d) + }, [], disposables) // Call dispose and if throws, append thrown error to errors const appendIfError = (errors, d) => { try { - d.dispose() + const result = d.dispose() + if (result && typeof result.then === 'function') { + return result.then(() => Promise.resolve(errors), (e) => { + return Promise.resolve([...errors, e]) + }) + } } catch (e) { - errors.push(e) + return [...errors, e] } return errors } // Throw DisposeAllError if errors is non-empty const throwIfErrors = errors => { + if (typeof errors.then === 'function') { + return errors + } if (errors.length > 0) { throw new DisposeAllError(`${errors.length} errors`, errors) } diff --git a/packages/disposable/src/disposeOnce.js b/packages/disposable/src/disposeOnce.js index 6caca152..bff0cbcb 100644 --- a/packages/disposable/src/disposeOnce.js +++ b/packages/disposable/src/disposeOnce.js @@ -9,13 +9,15 @@ class DisposeOnce { constructor (disposable) { this.disposed = false this.disposable = disposable + this.result = null } dispose () { if (!this.disposed) { this.disposed = true - this.disposable.dispose() + this.result = this.disposable.dispose() this.disposable = undefined } + return this.result } } diff --git a/packages/disposable/src/disposeWith.js b/packages/disposable/src/disposeWith.js index eac6fb60..10e2e2e6 100644 --- a/packages/disposable/src/disposeWith.js +++ b/packages/disposable/src/disposeWith.js @@ -18,6 +18,6 @@ class DisposeWith { } dispose () { - this._dispose(this._resource) + return this._dispose(this._resource) } } diff --git a/packages/disposable/src/tryDispose.js b/packages/disposable/src/tryDispose.js index c2c46876..35470522 100644 --- a/packages/disposable/src/tryDispose.js +++ b/packages/disposable/src/tryDispose.js @@ -5,7 +5,12 @@ import { curry3 } from '@most/prelude' // the error to sink.error with the provided Time value export const tryDispose = curry3((t, disposable, sink) => { try { - disposable.dispose() + const result = disposable.dispose() + if (result && typeof result.catch === 'function') { + return result.catch((e) => { + sink.error(t, e) + }) + } } catch (e) { sink.error(t, e) }