Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(groupBy): Adds subjectSelector argument to groupBy #2023

Merged
merged 4 commits into from Oct 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 22 additions & 0 deletions spec/operators/groupBy-spec.ts
Expand Up @@ -5,6 +5,7 @@ declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;
const ReplaySubject = Rx.ReplaySubject;

/** @test {groupBy} */
describe('Observable.prototype.groupBy', () => {
Expand Down Expand Up @@ -98,6 +99,27 @@ describe('Observable.prototype.groupBy', () => {
expect(resultingGroups).to.deep.equal(expectedGroups);
});

it('should group values with a subject selector', (done: MochaDone) => {
const expectedGroups = [
{ key: 1, values: [3] },
{ key: 0, values: [2] }
];

Observable.of(1, 2, 3)
.groupBy((x: number) => x % 2, null, null, () => new ReplaySubject(1))
// Ensure each inner group reaches the destination after the first event
// has been next'd to the group
.delay(5)
.subscribe((g: any) => {
const expectedGroup = expectedGroups.shift();
expect(g.key).to.equal(expectedGroup.key);

g.subscribe((x: any) => {
expect(x).to.deep.equal(expectedGroup.values.shift());
});
}, null, done);
});

it('should handle an empty Observable', () => {
const e1 = cold('|');
const e1subs = '(^!)';
Expand Down
20 changes: 12 additions & 8 deletions src/operator/groupBy.ts
Expand Up @@ -31,11 +31,13 @@ import { FastMap } from '../util/FastMap';
export function groupBy<T, K>(this: Observable<T>, keySelector: (value: T) => K): Observable<GroupedObservable<K, T>>;
export function groupBy<T, K>(this: Observable<T>, keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable<K, T>) => Observable<any>): Observable<GroupedObservable<K, T>>;
export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): Observable<GroupedObservable<K, R>>;
export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, subjectSelector?: () => Subject<R>): Observable<GroupedObservable<K, R>>;
/* tslint:disable:max-line-length */
export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) => K,
elementSelector?: ((value: T) => R) | void,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): Observable<GroupedObservable<K, R>> {
return this.lift(new GroupByOperator(this, keySelector, elementSelector, durationSelector));
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
subjectSelector?: () => Subject<R>): Observable<GroupedObservable<K, R>> {
return this.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector));
}

export interface RefCountSubscription {
Expand All @@ -46,15 +48,15 @@ export interface RefCountSubscription {
}

class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> {
constructor(public source: Observable<T>,
private keySelector: (value: T) => K,
constructor(private keySelector: (value: T) => K,
private elementSelector?: ((value: T) => R) | void,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>) {
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
private subjectSelector?: () => Subject<R>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rename this to subjectFactory since it's not taking any input into this function

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

}

call(subscriber: Subscriber<GroupedObservable<K, R>>, source: any): any {
return source._subscribe(new GroupBySubscriber(
subscriber, this.keySelector, this.elementSelector, this.durationSelector
subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector
));
}
}
Expand All @@ -72,7 +74,8 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
constructor(destination: Subscriber<GroupedObservable<K, R>>,
private keySelector: (value: T) => K,
private elementSelector?: ((value: T) => R) | void,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>) {
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
private subjectSelector?: () => Subject<R>) {
super(destination);
}

Expand Down Expand Up @@ -109,7 +112,8 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
}

if (!group) {
groups.set(key, group = new Subject<R>());
group = this.subjectSelector ? this.subjectSelector() : new Subject<R>();
groups.set(key, group);
const groupedObservable = new GroupedObservable(key, group, this);
this.destination.next(groupedObservable);
if (this.durationSelector) {
Expand Down