Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Observable): RxJS doesn't even lift. #7202

Merged
merged 1 commit into from Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
281 changes: 0 additions & 281 deletions spec/Observable-spec.ts
Expand Up @@ -692,284 +692,3 @@ describe('Observable', () => {
});
});


/** @test {Observable} */
describe('Observable.lift', () => {
let rxTestScheduler: TestScheduler;

beforeEach(() => {
rxTestScheduler = new TestScheduler(observableMatcher);
});

class MyCustomObservable<T> extends Observable<T> {
static from<T>(source: any) {
const observable = new MyCustomObservable<T>();
observable.source = <Observable<T>>source;
return observable;
}
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new MyCustomObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = operator;
return observable;
}
}

it('should return Observable which calls FinalizationLogic of operator on unsubscription', (done) => {
const myOperator: Operator<any, any> = {
call: (subscriber: Subscriber<any>, source: any) => {
const subscription = source.subscribe((x: any) => subscriber.next(x));
return () => {
subscription.unsubscribe();
done();
};
},
};

(NEVER as any).lift(myOperator)
.subscribe()
.unsubscribe();

});

it('should be overridable in a custom Observable type that composes', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
map((x) => {
return 10 * x;
})
);

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [10, 20, 30];

result.subscribe(
{ next: function (x) {
expect(x).to.equal(expected.shift());
}, error: () => {
done(new Error('should not be called'));
}, complete: () => {
done();
} }
);
});


it('should composes Subjects in the simple case', () => {
const subject = new Subject<number>();

const result = subject.pipe(
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct. (but you're advised not to do this)

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([100, 200, 300]);
});

/**
* Seriously, never do this. It's probably bad that we've allowed this. Fortunately, it's not
* a common practice, so maybe we can remove it?
*/
it('should demonstrate the horrors of sharing and lifting the Subject through', () => {
const subject = new Subject<number>();

const shared = subject.pipe(
share()
);

const result1 = shared.pipe(
map(x => x * 10)
) as any as Subject<number>; // Yes, this is correct.

const result2 = shared.pipe(
map(x => x - 10)
) as any as Subject<number>; // Yes, this is correct.
expect(result1 instanceof Subject).to.be.true;

const emitted1: any[] = [];
result1.subscribe(value => emitted1.push(value));

const emitted2: any[] = [];
result2.subscribe(value => emitted2.push(value));

// THIS IS HORRIBLE DON'T DO THIS.
result1.next(10);
result2.next(20); // Yuck
result1.next(30);

expect(emitted1).to.deep.equal([100, 200, 300]);
expect(emitted2).to.deep.equal([0, 10, 20]);
});

it('should compose through combineLatestWith', () => {
rxTestScheduler.run(({ cold, expectObservable }) => {
const e1 = cold(' -a--b-----c-d-e-|');
const e2 = cold(' --1--2-3-4---| ');
const expected = '--A-BC-D-EF-G-H-|';

const result = MyCustomObservable.from(e1).pipe(
combineLatestWith(e2),
map(([a, b]) => String(a) + String(b))
);

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected, {
A: 'a1',
B: 'b1',
C: 'b2',
D: 'b3',
E: 'b4',
F: 'c4',
G: 'd4',
H: 'e4',
});
});
});

it('should compose through concatWith', () => {
rxTestScheduler.run(({ cold, expectObservable }) => {
const e1 = cold(' --a--b-|');
const e2 = cold(' --x---y--|');
const expected = '--a--b---x---y--|';

const result = MyCustomObservable.from(e1).pipe(concatWith(e2));

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected);
});
});
it('should compose through mergeWith', () => {
rxTestScheduler.run(({ cold, expectObservable }) => {
const e1 = cold(' -a--b-| ');
const e2 = cold(' --x--y-|');
const expected = '-ax-by-|';

const result = MyCustomObservable.from(e1).pipe(mergeWith(e2));

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected);
});
});

it('should compose through raceWith', () => {
rxTestScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const e1 = cold(' ---a-----b-----c----|');
const e1subs = ' ^-------------------!';
const e2 = cold(' ------x-----y-----z----|');
const e2subs = ' ^--!';
const expected = '---a-----b-----c----|';

const result = MyCustomObservable.from<string>(e1).pipe(
raceWith(e2)
);

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});

it('should compose through zipWith', () => {
rxTestScheduler.run(({ cold, expectObservable }) => {
const e1 = cold(' -a--b-----c-d-e-|');
const e2 = cold(' --1--2-3-4---| ');
const expected = '--A--B----C-D| ';

const result = MyCustomObservable.from(e1).pipe(zipWith(e2));

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected, {
A: ['a', '1'],
B: ['b', '2'],
C: ['c', '3'],
D: ['d', '4'],
});
});
});

it('should allow injecting behaviors into all subscribers in an operator ' + 'chain when overridden', (done) => {
// The custom Subscriber
const log: Array<string> = [];

class LogSubscriber<T> extends Subscriber<T> {
next(value?: T): void {
log.push('next ' + value);
if (!this.isStopped) {
this._next(value!);
}
}
}

// The custom Operator
class LogOperator<T, R> implements Operator<T, R> {
constructor(private childOperator: Operator<T, R>) {}

call(subscriber: Subscriber<R>, source: any): TeardownLogic {
return this.childOperator.call(new LogSubscriber<R>(subscriber), source);
}
}

// The custom Observable
class LogObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new LogObservable<R>();
observable.source = this;
observable.operator = new LogOperator(operator);
return observable;
}
}

// Use the LogObservable
const result = new LogObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
map((x) => 10 * x),
filter((x) => x > 15),
count()
);

expect(result instanceof LogObservable).to.be.true;

const expected = [2];

result.subscribe(
{ next: function (x) {
expect(x).to.equal(expected.shift());
}, error: () => {
done(new Error('should not be called'));
}, complete: () => {
expect(log).to.deep.equal([
'next 10', // map
'next 20', // map
'next 20', // filter
'next 30', // map
'next 30', // filter
'next 2', // count
]);
done();
} }
);
});
});
17 changes: 0 additions & 17 deletions spec/helpers/interop-helper.ts
Expand Up @@ -9,10 +9,6 @@ import { Observable, Operator, Subject, Subscriber, Subscription } from 'rxjs';
export function asInteropObservable<T>(observable: Observable<T>): Observable<T> {
return new Proxy(observable, {
get(target: Observable<T>, key: string | number | symbol) {
if (key === 'lift') {
const { lift } = target as any;
return interopLift(lift);
}
if (key === 'subscribe') {
const { subscribe } = target;
return interopSubscribe(subscribe);
Expand All @@ -23,7 +19,6 @@ export function asInteropObservable<T>(observable: Observable<T>): Observable<T>
const { lift, subscribe, ...rest } = Object.getPrototypeOf(target);
return {
...rest,
lift: interopLift(lift),
subscribe: interopSubscribe(subscribe)
};
}
Expand Down Expand Up @@ -56,18 +51,6 @@ export function asInteropSubscriber<T>(subscriber: Subscriber<T>): Subscriber<T>
});
}

function interopLift<T, R>(lift: (operator: Operator<T, R>) => Observable<R>) {
return function (this: Observable<T>, operator: Operator<T, R>): Observable<R> {
const observable = lift.call(this, operator);
const { call } = observable.operator!;
observable.operator!.call = function (this: Operator<T, R>, subscriber: Subscriber<R>, source: any) {
return call.call(this, asInteropSubscriber(subscriber), source);
};
observable.source = asInteropObservable(observable.source!);
return asInteropObservable(observable);
};
}

function interopSubscribe<T>(subscribe: (...args: any[]) => Subscription) {
return function (this: Observable<T>, ...args: any[]): Subscription {
const [arg] = args;
Expand Down
2 changes: 1 addition & 1 deletion spec/operators/delay-spec.ts
Expand Up @@ -237,7 +237,7 @@ describe('delay', () => {
const result = e1.pipe(
repeatWhen((notifications) => {
const delayed = notifications.pipe(delay(t));
subscribeSpy = sinon.spy((delayed as any)['source'], 'subscribe');
subscribeSpy = sinon.spy(notifications as any, 'subscribe');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nit but I believe you can do sinon.spy<any>(notifications) instead

return delayed;
}),
skip(1),
Expand Down
47 changes: 0 additions & 47 deletions spec/operators/groupBy-spec.ts
Expand Up @@ -1463,53 +1463,6 @@ describe('groupBy operator', () => {
});
});

it('should not break lift() composability', (done) => {
class MyCustomObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new MyCustomObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = operator;
return observable;
}
}

const result = new MyCustomObservable((observer: Observer<number>) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
groupBy(
(x: number) => x % 2,
(x: number) => x + '!'
)
);

expect(result instanceof MyCustomObservable).to.be.true;

const expectedGroups = [
{ key: 1, values: ['1!', '3!'] },
{ key: 0, values: ['2!'] },
];

result.subscribe({
next: (g: any) => {
const expectedGroup = expectedGroups.shift()!;
expect(g.key).to.equal(expectedGroup.key);

g.subscribe((x: any) => {
expect(x).to.deep.equal(expectedGroup.values.shift());
});
},
error: (x) => {
done(new Error('should not be called'));
},
complete: () => {
done();
},
});
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
Expand Down