Skip to content

Commit

Permalink
fix: publish variants returning ConnectableObservable not properly ut…
Browse files Browse the repository at this point in the history
…ilizing lift (#6003)

- Adds tests to show fixed issues
- Adds tests to show issues that simply can't be fixed, and support a case against removing operators that return ConnectableObservable, as well as possibly a case against lifting Subject.
- Moves logic that patched lift for ConnectableObservable to the constructor so it is used by all multicast operators
  • Loading branch information
benlesh committed Feb 8, 2021
1 parent 412d1fd commit 9acb950
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 14 deletions.
242 changes: 241 additions & 1 deletion spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { Observer, TeardownLogic } from '../src/internal/types';
import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap } from 'rxjs/operators';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share} from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';

Expand Down Expand Up @@ -810,6 +810,246 @@ describe('Observable.lift', () => {
);
});


it('should compose through publish and refCount', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
publish(),
refCount(),
map((x) => 10 * x)
);

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [10, 20, 30];

result.subscribe(
function (x) {
expect(x).to.equal(expected.shift());
},
() => {
done(new Error('should not be called'));
},
() => {
done();
}
);
});


it('should compose through publishLast and refCount', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
publishLast(),
refCount(),
map((x) => 10 * x)
);

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [30];

result.subscribe(
function (x) {
expect(x).to.equal(expected.shift());
},
() => {
done(new Error('should not be called'));
},
() => {
done();
}
);
});

it('should compose through publishBehavior and refCount', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
publishBehavior(0),
refCount(),
map((x) => 10 * x)
);

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [0, 10, 20, 30];

result.subscribe(
function (x) {
expect(x).to.equal(expected.shift());
},
() => {
done(new Error('should not be called'));
},
() => {
done();
}
);
});

it('should composes Subjects in the simple case', () => {
const subject = new Subject<number>();

const result = subject.pipe(
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct. (but you're advised not to do this)

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([100, 200, 300]);
});

/**
* Seriously, never do this. It's probably bad that we've allowed this. Fortunately, it's not
* a common practice, so maybe we can remove it?
*/
it('should demonstrate the horrors of sharing and lifting the Subject through', () => {
const subject = new Subject<number>();

const shared = subject.pipe(
share()
);

const result1 = shared.pipe(
map(x => x * 10)
) as any as Subject<number>; // Yes, this is correct.

const result2 = shared.pipe(
map(x => x - 10)
) as any as Subject<number>; // Yes, this is correct.
expect(result1 instanceof Subject).to.be.true;

const emitted1: any[] = [];
result1.subscribe(value => emitted1.push(value));

const emitted2: any[] = [];
result2.subscribe(value => emitted2.push(value));

// THIS IS HORRIBLE DON'T DO THIS.
result1.next(10);
result2.next(20); // Yuck
result1.next(30);

expect(emitted1).to.deep.equal([100, 200, 300]);
expect(emitted2).to.deep.equal([0, 10, 20]);
});

/**
* This section outlines one of the reasons that we need to get rid of operators that return
* Connectable observable. Likewise it also reveals a slight design flaw in `lift`. It
* probably should have never tried to compose through the Subject's observer methods.
* If you're a user and you're reading this... NEVER try to use this feature, it's likely
* to go away at some point.
*
* The problem is that you can have the Subject parts, or you can have the ConnectableObservable parts,
* but you can't have both.
*/
describe.skip('The lift through Connectable gaff', () => {
it('should compose through multicast and refCount, even if it is a Subject', () => {
const subject = new Subject<number>();

const result = subject.pipe(
multicast(() => new Subject<number>()),
refCount(),
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct.

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([100, 200, 300]);
});

it('should compose through publish and refCount, even if it is a Subject', () => {
const subject = new Subject<number>();

const result = subject.pipe(
publish(),
refCount(),
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct.

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([100, 200, 300]);
});


it('should compose through publishLast and refCount, even if it is a Subject', () => {
const subject = new Subject<number>();

const result = subject.pipe(
publishLast(),
refCount(),
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct.

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([100, 200, 300]);
});

it('should compose through publishBehavior and refCount, even if it is a Subject', () => {
const subject = new Subject<number>();

const result = subject.pipe(
publishBehavior(0),
refCount(),
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct.

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([0, 100, 200, 300]);
});
});

it('should compose through multicast with selector function', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
Expand Down
7 changes: 7 additions & 0 deletions src/internal/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { refCount as higherOrderRefCount } from '../operators/refCount';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { hasLift } from '../util/lift';

/**
* @class ConnectableObservable<T>
Expand All @@ -28,6 +29,12 @@ export class ConnectableObservable<T> extends Observable<T> {
*/
constructor(public source: Observable<T>, protected subjectFactory: () => Subject<T>) {
super();
// If we have lift, monkey patch that here. This is done so custom observable
// types will compose through multicast. Otherwise the resulting observable would
// simply be an instance of `ConnectableObservable`.
if (hasLift(source)) {
this.lift = source.lift;
}
}

protected _subscribe(subscriber: Subscriber<T>) {
Expand Down
14 changes: 1 addition & 13 deletions src/internal/operators/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Subject } from '../Subject';
import { Observable } from '../Observable';
import { ConnectableObservable } from '../observable/ConnectableObservable';
import { OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types';
import { hasLift } from '../util/lift';
import { isFunction } from '../util/isFunction';
import { connect } from './connect';

Expand Down Expand Up @@ -81,16 +80,5 @@ export function multicast<T, R>(
});
}

return (source: Observable<T>) => {
const connectable: any = new ConnectableObservable(source, subjectFactory);
// If we have lift, monkey patch that here. This is done so custom observable
// types will compose through multicast. Otherwise the resulting observable would
// simply be an instance of `ConnectableObservable`.
if (hasLift(source)) {
connectable.lift = source.lift;
}
connectable.source = source;
connectable.subjectFactory = subjectFactory;
return connectable;
};
return (source: Observable<T>) => new ConnectableObservable<any>(source, subjectFactory);
}

0 comments on commit 9acb950

Please sign in to comment.