Skip to content

Commit

Permalink
feat(extra): implement new extra operator: dropUntil
Browse files Browse the repository at this point in the history
  • Loading branch information
staltz committed May 11, 2016
1 parent 2a7be24 commit e06d502
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Expand Up @@ -30,7 +30,7 @@ core.js.map
index.d.ts
index.js
index.js.map
extra/
/extra/

# Generated directories
.sass-cache/
Expand Down
2 changes: 1 addition & 1 deletion src/core.ts
Expand Up @@ -18,7 +18,7 @@ export interface InternalListener<T> {
_c: () => void;
}

const emptyListener: InternalListener<any> = {
export const emptyListener: InternalListener<any> = {
_n: noop,
_e: noop,
_c: noop,
Expand Down
69 changes: 69 additions & 0 deletions src/extra/dropUntil.ts
@@ -0,0 +1,69 @@
import {Operator, InternalListener, Stream, emptyListener} from '../core';

class OtherIL<T> implements InternalListener<any> {
constructor(private out: Stream<T>,
private op: DropUntilOperator<T>) {
}

_n(t: T) {
this.op.up();
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.op.up();
}
}

export class DropUntilOperator<T> implements Operator<T, T> {
public type = 'dropUntil';
private out: Stream<T> = null;
private oil: InternalListener<any> = emptyListener; // oil = other InternalListener
private on: boolean = false;

constructor(public o: Stream<any>, // o = other
public ins: Stream<T>) {
}

_start(out: Stream<T>): void {
this.out = out;
this.o._add(this.oil = new OtherIL(out, this));
this.ins._add(this);
}

_stop(): void {
this.ins._remove(this);
this.o._remove(this.oil);
this.out = null;
this.oil = null;
}

up(): void {
this.on = true;
this.o._remove(this.oil);
this.oil = null;
}

_n(t: T) {
if (!this.on) return;
this.out._n(t);
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.up();
this.out._c();
}
}

export default function dropUntil<T>(other: Stream<any>): (ins: Stream<T>) => Stream<T> {
return function dropUntilOperator(ins: Stream<T>): Stream<T> {
return new Stream<T>(new DropUntilOperator(other, ins));
};
}
42 changes: 42 additions & 0 deletions tests/extra/dropUntil.ts
@@ -0,0 +1,42 @@
import xs from '../../src/index';
import dropUntil from '../../src/extra/dropUntil';
import delay from '../../src/extra/delay';
import * as assert from 'assert';

describe('dropUntil (extra)', () => {
it('should start emitting the stream when another stream emits next', (done) => {
const source = xs.periodic(50).take(6);
const other = xs.periodic(220).take(1);
const stream = source.compose(dropUntil(other));
const expected = [4, 5];

stream.addListener({
next: (x: number) => {
assert.strictEqual(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.strictEqual(expected.length, 0);
done();
},
});
});

it('should complete the stream when another stream emits complete', (done) => {
const source = xs.periodic(50).take(6);
const other = xs.empty().compose(delay<any>(220));
const stream = source.compose(dropUntil(other));
const expected = [4, 5];

stream.addListener({
next: (x: number) => {
assert.strictEqual(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.strictEqual(expected.length, 0);
done();
},
});
});
});
1 change: 1 addition & 0 deletions tsconfig.json
Expand Up @@ -22,6 +22,7 @@
"src/extra/debounce.ts",
"src/extra/delay.ts",
"src/extra/dropRepeats.ts",
"src/extra/dropUntil.ts",
"src/extra/flattenSequentially.ts",
"src/extra/fromEvent.ts",
"src/extra/pairwise.ts",
Expand Down

0 comments on commit e06d502

Please sign in to comment.