Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/3990 buffer emit remainder on complete #6042

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
137 changes: 105 additions & 32 deletions spec/operators/buffer-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { buffer, mergeMap, take } from 'rxjs/operators';
/** @prettier */
import { buffer, mergeMap, take, window, toArray } from 'rxjs/operators';
import { EMPTY, NEVER, throwError, of, Subject } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
Expand All @@ -16,40 +17,78 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot(' -a-b-c-d-e-f-g-h-i-|');
const b = hot(' -----B-----B-----B-|');
const expected = '-----x-----y-----z-|';
const expected = '-----x-----y-----z-(F|)';
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you're going to see through a lot of these tests is I had to add the last emission of an empty buffer for cases when the closingNotifier was not complete when the source completed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but TBH I am a little disappointed that you didn't use an emoji:

      const a = hot('   -a-b-c-d-e-f-g-h-i-|');
      const b = hot('   -----B-----B-----B-|');
      const expected = '-----x-----y-----z-(🦖|)';
      const expectedValues = {
        x: ['a', 'b', 'c'],
        y: ['d', 'e', 'f'],
        z: ['g', 'h', 'i'],
        '🦖': [],
      };

const expectedValues = {
x: ['a', 'b', 'c'],
y: ['d', 'e', 'f'],
z: ['g', 'h', 'i']
z: ['g', 'h', 'i'],
F: [],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
});

it('should emit a final buffer if the closingNotifier is already complete', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot(' -a-b-c-d-e-f-g-h-i-|');
const b = hot(' -----B-----B--|');
const expected = '-----x-----y-------(F|)';
const expectedValues = {
x: ['a', 'b', 'c'],
y: ['d', 'e', 'f'],
F: ['g', 'h', 'i'],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
});

it('should emit all buffered values if the source completes before the closingNotifier does', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const source = hot('---^---a---b---c---d---e--f----|');
const sourceSubs = ' ^---------------------------!';
const closer = hot('---^-------------B----------------');
const closerSubs = ' ^---------------------------!';
const expected = ' --------------x-------------(F|)';

const result = source.pipe(buffer(closer));

const expectedValues = {
x: ['a', 'b', 'c'],
F: ['d', 'e', 'f'],
};

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(closer.subscriptions).toBe(closerSubs);
});
});

it('should work with empty and empty selector', () => {
testScheduler.run(({ expectObservable }) => {
const a = EMPTY;
const b = EMPTY;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] });
});
});

it('should work with empty and non-empty selector', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = EMPTY;
const b = hot('-----a-----');
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] });
});
});

it('should work with non-empty and empty selector', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const b = EMPTY;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = ' --------------------------------(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, {
F: ['3', '4', '5', '6', '7', '8', '9', '0'],
});
});
});

Expand All @@ -66,7 +105,7 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ expectObservable }) => {
const a = NEVER;
const b = EMPTY;
const expected = '|';
const expected = '-';
expectObservable(a.pipe(buffer(b))).toBe(expected);
});
});
Expand All @@ -75,8 +114,8 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ expectObservable }) => {
const a = EMPTY;
const b = NEVER;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] });
});
});

Expand Down Expand Up @@ -121,31 +160,32 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const b = hot('--------^--a-------b---cd---------e---f---|');
const expected = ' ---a-------b---cd---------e---f-|';
const expected = ' ---a-------b---cd---------e---f-(F|)';
const expectedValues = {
a: ['3'],
b: ['4', '5'],
c: ['6'],
d: [] as string[],
e: ['7', '8', '9'],
f: ['0']
f: ['0'],
F: [],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
});

it(' work with selector completed', () => {
// Buffshoulder Boundaries onCompletedBoundaries (RxJS 4)
it('should work with selector completed', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const subs = ' ^----------------! ';
const subs = ' ^-------------------------------!';
const b = hot('--------^--a-------b---cd| ');
const expected = ' ---a-------b---cd| ';
const expected = ' ---a-------b---cd---------------(F|)';
const expectedValues = {
a: ['3'],
b: ['4', '5'],
c: ['6'],
d: [] as string[]
d: [] as string[],
F: ['7', '8', '9', '0'],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
expectSubscriptions(a.subscriptions).toBe(subs);
Expand All @@ -161,7 +201,7 @@ describe('Observable.prototype.buffer', () => {
const expected = ' ---a-------b--- ';
const expectedValues = {
a: ['3'],
b: ['4', '5']
b: ['4', '5'],
};
expectObservable(a.pipe(buffer(b)), unsub).toBe(expected, expectedValues);
expectSubscriptions(a.subscriptions).toBe(subs);
Expand All @@ -177,13 +217,13 @@ describe('Observable.prototype.buffer', () => {
const unsub = ' --------------! ';
const expectedValues = {
a: ['3'],
b: ['4', '5']
b: ['4', '5'],
};

const result = a.pipe(
mergeMap((x: any) => of(x)),
buffer(b),
mergeMap((x: any) => of(x)),
mergeMap((x: any) => of(x))
);

expectObservable(result, unsub).toBe(expected, expectedValues);
Expand All @@ -194,13 +234,13 @@ describe('Observable.prototype.buffer', () => {
it('should work with non-empty and selector error', () => {
// Buffer Boundaries onErrorSource (RxJS 4)
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('--1--2--^--3-----#', {'3': 3}, new Error('too bad'));
const a = hot('--1--2--^--3-----#', { '3': 3 }, new Error('too bad'));
const subs = ' ^--------!';
const b = hot('--------^--a--b---');
const expected = ' ---a--b--#';
const expectedValues = {
a: [3],
b: [] as string[]
b: [] as string[],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues, new Error('too bad'));
expectSubscriptions(a.subscriptions).toBe(subs);
Expand All @@ -227,7 +267,7 @@ describe('Observable.prototype.buffer', () => {
const expectedValues = {
a: ['3'],
b: ['4', '5'],
c: ['6']
c: ['6'],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues, new Error('too bad'));
expectSubscriptions(a.subscriptions).toBe(subs);
Expand All @@ -244,7 +284,7 @@ describe('Observable.prototype.buffer', () => {
const expected = ' ---a-------b--- ';
const expectedValues = {
a: ['3'],
b: ['4', '5']
b: ['4', '5'],
};

expectObservable(a.pipe(buffer(b)), unsub).toBe(expected, expectedValues);
Expand Down Expand Up @@ -272,18 +312,51 @@ describe('Observable.prototype.buffer', () => {
const results: any[] = [];
const subject = new Subject<number>();

const source = subject.pipe(
buffer(subject)
).subscribe({
next: value => results.push(value),
complete: () => results.push('complete')
subject.pipe(buffer(subject)).subscribe({
next: (value) => results.push(value),
complete: () => results.push('complete'),
});

subject.next(1);
expect(results).to.deep.equal([[1]]);
subject.next(2);
expect(results).to.deep.equal([[1], [2]]);
subject.complete();
expect(results).to.deep.equal([[1], [2], 'complete']);
expect(results).to.deep.equal([[1], [2], [], 'complete']);
});

describe('equivalence with the window operator', () => {
const cases = [
{
source: ' -a-b-c-d-e-f-g-h-i-|',
notifier: ' -----B-----B-----B-|',
},
{
source: ' -a-b-c-d-e-f-g-h-i-|',
notifier: ' -----B-----B--| ',
},
{
source: ' -a-b-c-d-e---------|',
notifier: ' -----B-----B-----B-|',
},
{
source: ' -a-b-c-d-e-f-g-h-i-|',
notifier: ' -------------------|',
},
];
cases.forEach(({ source, notifier }, index) => {
it(`should be equivalent for case ${index}`, () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot(source);
const b = hot(notifier);
expectObservable(a.pipe(buffer(b))).toEqual(
a.pipe(
window(b),
mergeMap((w) => w.pipe(toArray()))
)
);
});
});
});
});
});
12 changes: 6 additions & 6 deletions spec/operators/window-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ describe('window operator', () => {

it('should be able to split a never Observable into timely empty windows', () => {
const source = hot('^--------');
const sourceSubs = '^ !';
const sourceSubs = '^ ';
const closings = cold('--x--x--|');
const closingSubs = '^ !';
const expected = 'a-b--c--|';
const expected = 'a-b--c---';
const a = cold('--| ');
const b = cold( '---| ');
const c = cold( '---|');
const c = cold( '----');
const expectedValues = { a: a, b: b, c: c };

const result = source.pipe(window(closings));
Expand Down Expand Up @@ -234,13 +234,13 @@ describe('window operator', () => {

it('should complete the resulting Observable when window closings completes', () => {
const source = hot('-1-2-^3-4-5-6-7-8-9-|');
const subs = '^ ! ';
const subs = '^ !';
const closings = hot('---^---x---x---| ');
const closingSubs = '^ ! ';
const expected = 'a---b---c---| ';
const expected = 'a---b---c------|';
const a = cold( '-3-4| ');
const b = cold( '-5-6| ');
const c = cold( '-7-8| ');
const c = cold( '-7-8-9-|');
const expectedValues = { a: a, b: b, c: c };

const result = source.pipe(window(closings));
Expand Down
33 changes: 26 additions & 7 deletions src/internal/operators/buffer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Observable } from '../Observable';
import { OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { noop } from '../util/noop';
import { OperatorSubscriber } from './OperatorSubscriber';

/**
Expand Down Expand Up @@ -43,19 +44,37 @@ import { OperatorSubscriber } from './OperatorSubscriber';
*/
export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]> {
return operate((source, subscriber) => {
// The current buffered values.
let currentBuffer: T[] = [];

// Subscribe to our source.
source.subscribe(new OperatorSubscriber(subscriber, (value) => currentBuffer.push(value)));
source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
// Pass all errors to the consumer.
undefined,
() => {
subscriber.next(currentBuffer);
subscriber.complete();
}
)
);

// Subscribe to the closing notifier.
closingNotifier.subscribe(
new OperatorSubscriber(subscriber, () => {
// Start a new buffer and emit the previous one.
const b = currentBuffer;
currentBuffer = [];
subscriber.next(b);
})
new OperatorSubscriber(
subscriber,
() => {
// Start a new buffer and emit the previous one.
const b = currentBuffer;
currentBuffer = [];
subscriber.next(b);
},
// Pass all errors to the consumer.
undefined,
noop
)
);

return () => {
Expand Down