Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rx): A new method of piping has been added #7229

Merged
merged 7 commits into from May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions spec-dtslint/Observable-spec.ts
@@ -1,5 +1,4 @@
import { Observable, of, OperatorFunction } from 'rxjs';
import { mapTo } from 'rxjs/operators';
import { Observable, of, OperatorFunction, map, filter } from 'rxjs';

function a<I extends string, O extends string>(input: I, output: O): OperatorFunction<I, O>;
function a<I, O extends string>(output: O): OperatorFunction<I, O>;
Expand Down Expand Up @@ -30,7 +29,7 @@ function a<I, O extends string>(output: O): OperatorFunction<I, O>;
* @param {string} output The `OperatorFunction` output type parameter
*/
function a<I, O extends string>(inputOrOutput: I | O, output?: O): OperatorFunction<I, O> {
return mapTo<I, O>(output === undefined ? inputOrOutput as O : output);
return map(() => output === undefined ? inputOrOutput as O : output);
}

describe('pipe', () => {
Expand Down Expand Up @@ -126,4 +125,10 @@ describe('pipe', () => {
const customOperator = () => <T>(a: Observable<T>) => a;
const o = of('foo').pipe(customOperator()); // $ExpectType Observable<string>
});

it('should infer properly for any reasonable pipe chain', () => {
const o1 = of('foo').pipe(source => source.toString(), s => s.length, n => n + 1); // $ExpectType number
const o2 = of(123).pipe(map(n => n + '?'), source => source.subscribe()); // $ExpectType Subscription
const o3 = of('test').pipe(map(n => n + ':' + n), filter(n => n < 30)); // $ExpectError
})
});
56 changes: 56 additions & 0 deletions spec-dtslint/util/rx-spec.ts
@@ -0,0 +1,56 @@
import { rx, map, of, toArray, filter } from 'rxjs';
import { A, B, C, D, E, F, G, H, I, J } from '../helpers'

it('should infer conversions from ObservableInputs', () => {
const o1 = rx([1, 2, 3]); // $ExpectType Observable<number>
const o2 = rx(new Set<number>()); // $ExpectType Observable<number>
const o3 = rx(new Map<string, number>()); // $ExpectType Observable<[string, number]>
const o4 = rx(of(1, 2, 3)); // $ExpectType Observable<number>
const o5 = rx(Promise.resolve(1)); // $ExpectType Observable<number>
const o6 = rx(Promise.resolve([1, 2, 3])); // $ExpectType Observable<number[]>

function* test() {
yield 1;
yield 2;
yield 3;
}

const o7 = rx(test()); // $ExpectType Observable<1 | 2 | 3>

async function* test2() {
yield 1;
yield 2;
yield 3;
}

const o8 = rx(test2()); // $ExpectType Observable<1 | 2 | 3>
const o9 = rx({}); // $ExpectError
});

it('should compose with pipeable functions, passing an Observable to the first of those functions', () => {
const o1 = rx([1, 2, 3], map(n => n + 1)); // $ExpectType Observable<number>
const o2 = rx([1, 2, 3], map(n => n + 1), filter(n => n < 3)); // $ExpectType Observable<number>
const o3 = rx([1, 2, 3], map(n => n + 1), filter(n => n < 3), toArray()); // $ExpectType Observable<number[]>
const o4 = rx([1, 2, 3], map(n => n + 1), filter(n => n < 3), toArray(), map(n => n.length)); // $ExpectType Observable<number>

// Even with unary functions that are not RxJS operators
const o5 = rx([1, 2, 3], map(n => n + 1), toArray(), source => Object.keys(source), keys => keys.length); // $ExpectType number

// Maybe as a means of subscription
const o6 = rx([1, 2, 3], map(n => n + 1), toArray(), source => source.subscribe()); // $ExpectType Subscription
})


it('should handle a large number of unary functions appropriately', () => {
const r0 = rx([1, 2, 3]); // $ExpectType Observable<number>
const r1 = rx([1, 2, 3], () => new A()); // $ExpectType A
const r2 = rx([1, 2, 3], () => new A(), () => new B()); // $ExpectType B
const r3 = rx([1, 2, 3], () => new A(), () => new B(), () => new C()); // $ExpectType C
const r4 = rx([1, 2, 3], () => new A(), () => new B(), () => new C(), () => new D()); // $ExpectType D
const r5 = rx([1, 2, 3], () => new A(), () => new B(), () => new C(), () => new D(), () => new E()); // $ExpectType E
const r6 = rx([1, 2, 3], () => new A(), () => new B(), () => new C(), () => new D(), () => new E(), () => new F()); // $ExpectType F
const r7 = rx([1, 2, 3], () => new A(), () => new B(), () => new C(), () => new D(), () => new E(), () => new F(), () => new G()); // $ExpectType G
const r8 = rx([1, 2, 3], () => new A(), () => new B(), () => new C(), () => new D(), () => new E(), () => new F(), () => new G(), () => new H()); // $ExpectType H
const r9 = rx([1, 2, 3], () => new A(), () => new B(), () => new C(), () => new D(), () => new E(), () => new F(), () => new G(), () => new H(), () => new I()); // $ExpectType unknown
const r10 = rx([1, 2, 3], () => new A(), () => new B(), () => new C(), () => new D(), () => new E(), () => new F(), () => new G(), () => new H(), () => new I(), () => new J()); // $ExpectType unknown
})
9 changes: 9 additions & 0 deletions spec/Observable-spec.ts
Expand Up @@ -638,6 +638,15 @@ describe('Observable', () => {
const result = source.pipe();
expect(result).to.equal(source);
});

it('should allow any kind of piped function', () => {
const source = of('test');
const result = source.pipe(
source => source instanceof Observable,
isObservable => isObservable ? 'Well hello, there.' : 'Huh?'
);
expect(result).to.equal('Well hello, there.');
})
});

it('should not swallow internal errors', (done) => {
Expand Down
2 changes: 1 addition & 1 deletion spec/util/pipe-spec.ts
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { pipe } from 'rxjs';
import { map, Observable, pipe, rx } from 'rxjs';

describe('pipe', () => {
it('should exist', () => {
Expand Down
36 changes: 36 additions & 0 deletions spec/util/rx-spec.ts
@@ -0,0 +1,36 @@
import { expect } from "chai";
import { map, Observable, rx } from "rxjs";

describe('rx', () => {
it('should work like pipe, convert the first argument to an observable', () => {
const a = [1, 2, 3];
const results: any[] = [];

rx(a, map(x => x + 1)).subscribe({
next: value => results.push(value),
complete: () => {
results.push('done');
}
})
expect(results).to.deep.equal([2, 3, 4, 'done'])
});

it('should simply convert the first argument to an observable if it is the only thing provided', () => {
const a = [1, 2, 3];
const results: any[] = [];

rx(a).subscribe({
next: value => results.push(value),
complete: () => {
results.push('done');
}
})
expect(results).to.deep.equal([1, 2, 3, 'done'])
});

it('should allow any kind of custom piping', () => {
const a = [1, 2, 3];
const result = rx(a, map(x => x + 1), source => source instanceof Observable)
expect(result).to.be.true;
});
});
1 change: 1 addition & 0 deletions src/index.ts
Expand Up @@ -38,6 +38,7 @@ export { Subscription } from './internal/Subscription';
export { Subscriber } from './internal/Subscriber';

/* Utils */
export { rx } from './internal/util/rx';
export { pipe } from './internal/util/pipe';
export { noop } from './internal/util/noop';
export { identity } from './internal/util/identity';
Expand Down
128 changes: 67 additions & 61 deletions src/internal/Observable.ts
@@ -1,9 +1,8 @@
import { Subscriber } from './Subscriber';
import { Subscription } from './Subscription';
import { TeardownLogic, OperatorFunction, Subscribable, Observer } from './types';
import { TeardownLogic, UnaryFunction, Subscribable, Observer, OperatorFunction } from './types';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';

/**
* A representation of any set of values over any amount of time. This is the most basic building block
* of RxJS.
Expand Down Expand Up @@ -256,72 +255,79 @@ export class Observable<T> implements Subscribable<T> {

/* tslint:disable:max-line-length */
pipe(): Observable<T>;
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Observable<B>;
pipe<A, B, C>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>): Observable<C>;
pipe<A, B, C, D>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>
): Observable<D>;
pipe<A>(op1: UnaryFunction<Observable<T>, A>): A;
pipe<A, B>(op1: UnaryFunction<Observable<T>, A>, op2: UnaryFunction<A, B>): B;
pipe<A, B, C>(op1: UnaryFunction<Observable<T>, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>): C;
pipe<A, B, C, D>(op1: UnaryFunction<Observable<T>, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>): D;
pipe<A, B, C, D, E>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>
): Observable<E>;
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>
): E;
pipe<A, B, C, D, E, F>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>
): Observable<F>;
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>
): F;
pipe<A, B, C, D, E, F, G>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>,
op7: OperatorFunction<F, G>
): Observable<G>;
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>
): G;
pipe<A, B, C, D, E, F, G, H>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>,
op7: OperatorFunction<F, G>,
op8: OperatorFunction<G, H>
): Observable<H>;
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>
): H;
pipe<A, B, C, D, E, F, G, H, I>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>,
op7: OperatorFunction<F, G>,
op8: OperatorFunction<G, H>,
op9: OperatorFunction<H, I>
): Observable<I>;
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>,
op9: UnaryFunction<H, I>
): I;
pipe<A, B, C, D, E, F, G, H, I>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>,
op7: OperatorFunction<F, G>,
op8: OperatorFunction<G, H>,
op9: OperatorFunction<H, I>,
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>,
op9: UnaryFunction<H, I>,
...operations: OperatorFunction<any, any>[]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we replace the any usage here with unknown?

): Observable<unknown>;
pipe<A, B, C, D, E, F, G, H, I>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really wish we either had a dedicated pipe operator in js or at least had some way to use recursive/conditional types powerful enough to support not needing to do this.

op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>,
op9: UnaryFunction<H, I>,
...operations: UnaryFunction<any, any>[]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we replace the any usage here with unknown?

): unknown;
/* tslint:enable:max-line-length */

/**
Expand All @@ -344,7 +350,7 @@ export class Observable<T> implements Subscribable<T> {
* .subscribe(x => console.log(x));
* ```
*/
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
pipe(...operations: UnaryFunction<any, any>[]): unknown {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we replace the any usage here with unknown?

return pipeFromArray(operations)(this);
}
}
8 changes: 4 additions & 4 deletions src/internal/util/pipe.ts
Expand Up @@ -73,14 +73,14 @@ export function pipe<T, A, B, C, D, E, F, G, H, I>(
* pipe() can be called on one or more functions, each of which can take one argument ("UnaryFunction")
* and uses it to return a value.
* It returns a function that takes one argument, passes it to the first UnaryFunction, and then
* passes the result to the next one, passes that result to the next one, and so on.
* passes the result to the next one, passes that result to the next one, and so on.
*/
export function pipe(...fns: Array<UnaryFunction<any, any>>): UnaryFunction<any, any> {
return pipeFromArray(fns);
}

/** @internal */
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
export function pipeFromArray(fns: UnaryFunction<unknown, unknown>[]): UnaryFunction<unknown, unknown> {
if (fns.length === 0) {
return identity as UnaryFunction<any, any>;
}
Expand All @@ -89,7 +89,7 @@ export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunct
return fns[0];
}

return function piped(input: T): R {
return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
return function piped(input) {
return fns.reduce((prev, fn) => fn(prev), input);
};
}