Skip to content

Commit

Permalink
feat: export State class facilitates possible sink state management.
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobbubu committed Oct 8, 2020
1 parent 0ee7622 commit acab392
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
7 changes: 4 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as pull from 'pull-stream'
import { Debug } from '@jacobbubu/debug'
import { SourceState } from './source-state'
import { trueToNull } from './utils'
import { State } from './state'

const getPushableName = (function() {
let counter = 1
Expand All @@ -12,6 +11,8 @@ type OnClose = (err?: pull.EndOrError) => void

const DefaultLogger = Debug.create('pushable')

export { State }

export enum BufferItemIndex {
Data = 0,
Cb
Expand Down Expand Up @@ -40,7 +41,7 @@ export function pushable<T>(name?: string | OnClose, onclose?: OnClose): Read<T>
_onclose = onclose
}

const _sourceState = new SourceState({ onEnd: _onclose })
const _sourceState = new State({ onEnd: _onclose })
let _cbs: pull.SourceCallback<T>[] = []

_name = name || getPushableName()
Expand Down
31 changes: 16 additions & 15 deletions src/source-state.ts → src/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,61 +2,62 @@ import * as pull from 'pull-stream'
import { trueToNull } from './utils'

export type OnEnd = (endOrError: pull.EndOrError) => void
export interface SourceStateOption {
export interface StateOption {
onEnd?: OnEnd
}

export class SourceState {
private _sourceEnding: pull.EndOrError = false
private _sourceAborting: pull.Abort = false
export class State {
private _ending: pull.EndOrError = false
private _aborting: pull.Abort = false
private _endReason: pull.EndOrError = false

constructor(private _opts: SourceStateOption = {}) {}
constructor(private _opts: StateOption = {}) {}

get finished() {
return this._endReason
}

get finishing() {
return this._sourceEnding || this._sourceAborting
return this._ending || this._aborting
}

get ending() {
return this._sourceEnding
return this._ending
}

get aborting() {
return this._sourceAborting
return this._aborting
}

get normal() {
return !this.finishing && !this.finished
}

askEnd(request: pull.EndOrError = true) {
if (this.finished || this._sourceEnding || this._sourceAborting) {
if (this.finished || this._ending || this._aborting) {
return false
}

this._sourceEnding = request
this._ending = request
return true
}

askAbort(request: pull.Abort = true) {
if (this.finished || this._sourceAborting) {
if (this.finished || this._aborting) {
// even in the ending state, abort has higher priority.
return false
}
this._sourceEnding = false
this._sourceAborting = request

this._ending = false
this._aborting = request
return true
}

ended(request: pull.EndOrError = true) {
const isFirst = !this._endReason
if (isFirst) {
this._sourceEnding = false
this._sourceAborting = false
this._ending = false
this._aborting = false
this._endReason = request
this._opts.onEnd?.(trueToNull(request))
}
Expand Down

0 comments on commit acab392

Please sign in to comment.