Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions packages/core/src/LinkedList.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import { disposeAll } from '@most/disposable'

/**
* Doubly linked list
Expand Down Expand Up @@ -57,20 +58,16 @@ export default class LinkedList {
* or rejects if an error occurs while disposing
*/
dispose () {
if (this.isEmpty()) {
return Promise.resolve()
}

const promises = []
const disposables = []
let x = this.head
this.head = null
this.length = 0

while (x !== null) {
promises.push(x.dispose())
disposables.push(x)
x = x.next
}

return Promise.all(promises)
return disposeAll(disposables).dispose()
}
}
6 changes: 3 additions & 3 deletions packages/core/src/combinator/mergeConcurrently.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/** @author Brian Cavalier */
/** @author John Hann */

import { disposeOnce, tryDispose } from '@most/disposable'
import { disposeOnce, tryDispose, disposeBoth } from '@most/disposable'
import LinkedList from '../LinkedList'
import { id as identity } from '@most/prelude'
import { schedulerRelativeTo } from '@most/scheduler'
Expand Down Expand Up @@ -34,6 +34,7 @@ class Outer {
this.pending = []
this.current = new LinkedList()
this.disposable = disposeOnce(source.run(this, scheduler))
this.disposer = disposeBoth(this.disposable, this.current)
this.active = true
}

Expand Down Expand Up @@ -77,8 +78,7 @@ class Outer {
dispose () {
this.active = false
this.pending.length = 0
this.disposable.dispose()
this.current.dispose()
return this.disposer.dispose()
}

_endInner (t, inner) {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/combinator/multicast.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export class MulticastDisposable {

dispose () {
if (this.source.remove(this.sink) === 0) {
this.source.dispose()
return this.source.dispose()
}
}
}
2 changes: 1 addition & 1 deletion packages/core/src/combinator/switch.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,6 @@ class Segment {
}

_dispose (t) {
tryDispose(t + this.min, this.disposable, this.sink)
return tryDispose(t + this.min, this.disposable, this.sink)
}
}
6 changes: 2 additions & 4 deletions packages/core/test/helper/testEnv.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { propagateEventTask, propagateEndTask } from '../../src/scheduler/Propag
import VirtualTimer from './VirtualTimer'
import { runEffects } from '../../src/runEffects'
import { tap } from '../../src/combinator/transform'
import { disposeWith, disposeNone } from '@most/disposable'
import { disposeWith, disposeNone, disposeAll } from '@most/disposable'

export function newEnv () {
const timer = new VirtualTimer()
Expand Down Expand Up @@ -62,6 +62,4 @@ const appendEvent = (sink, scheduler) => (s, event) => {
return { tasks: s.tasks.concat(task), time: Math.max(s.time, event.time) }
}

const cancelAll = tasks => Promise.all(tasks.map(cancelOne))

const cancelOne = task => task.dispose()
const cancelAll = tasks => disposeAll(tasks).dispose()
2 changes: 1 addition & 1 deletion packages/core/test/newStream-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { newStream } from '../src/source/newStream'

describe('newStream', () => {
it('should create new stream from RunStream function', () => {
const run = (sink, scheduler) => ({ dispose: () => Promise.resolve() })
const run = (sink, scheduler) => ({ dispose: () => {} })
const s = newStream(run)

is(run, s.run)
Expand Down
21 changes: 17 additions & 4 deletions packages/disposable/src/disposeAll.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,39 @@ class DisposeAll {
}

dispose () {
throwIfErrors(disposeCollectErrors(this.disposables))
return throwIfErrors(disposeCollectErrors(this.disposables))
}
}

// Dispose all, safely collecting errors into an array
const disposeCollectErrors = disposables =>
reduce(appendIfError, [], disposables)
reduce((errors, d) => {
if (typeof errors.then === 'function') {
return errors.then((errors) => appendIfError(errors, d))
}
return appendIfError(errors, d)
}, [], disposables)

// Call dispose and if throws, append thrown error to errors
const appendIfError = (errors, d) => {
try {
d.dispose()
const result = d.dispose()
if (result && typeof result.then === 'function') {
return result.then(() => Promise.resolve(errors), (e) => {
return Promise.resolve([...errors, e])
})
}
} catch (e) {
errors.push(e)
return [...errors, e]
}
return errors
}

// Throw DisposeAllError if errors is non-empty
const throwIfErrors = errors => {
if (typeof errors.then === 'function') {
return errors
}
if (errors.length > 0) {
throw new DisposeAllError(`${errors.length} errors`, errors)
}
Expand Down
4 changes: 3 additions & 1 deletion packages/disposable/src/disposeOnce.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ class DisposeOnce {
constructor (disposable) {
this.disposed = false
this.disposable = disposable
this.result = null
}

dispose () {
if (!this.disposed) {
this.disposed = true
this.disposable.dispose()
this.result = this.disposable.dispose()
this.disposable = undefined
}
return this.result
}
}
2 changes: 1 addition & 1 deletion packages/disposable/src/disposeWith.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ class DisposeWith {
}

dispose () {
this._dispose(this._resource)
return this._dispose(this._resource)
}
}
7 changes: 6 additions & 1 deletion packages/disposable/src/tryDispose.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import { curry3 } from '@most/prelude'
// the error to sink.error with the provided Time value
export const tryDispose = curry3((t, disposable, sink) => {
try {
disposable.dispose()
const result = disposable.dispose()
if (result && typeof result.catch === 'function') {
return result.catch((e) => {
sink.error(t, e)
})
}
} catch (e) {
sink.error(t, e)
}
Expand Down