Skip to content

Commit

Permalink
fix(distinctUntilChanged): Ensure reentrant code is compared properly (
Browse files Browse the repository at this point in the history
…#6014)

- Fixes an issue where reentrant code would not be compared properly against previous emissions
- Adds better code comments
- Updates documentation with better text and examples
  • Loading branch information
benlesh committed Feb 11, 2021
1 parent b0ce55b commit 0ebcf17
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 51 deletions.
4 changes: 2 additions & 2 deletions api_guard/dist/types/operators/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ export declare function dematerialize<N extends ObservableNotification<any>>():

export declare function distinct<T, K>(keySelector?: (value: T) => K, flushes?: Observable<any>): MonoTypeOperatorFunction<T>;

export declare function distinctUntilChanged<T>(compare?: (x: T, y: T) => boolean): MonoTypeOperatorFunction<T>;
export declare function distinctUntilChanged<T, K>(compare: (x: K, y: K) => boolean, keySelector: (x: T) => K): MonoTypeOperatorFunction<T>;
export declare function distinctUntilChanged<T>(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction<T>;
export declare function distinctUntilChanged<T, K>(comparator: (previous: K, current: K) => boolean, keySelector: (value: T) => K): MonoTypeOperatorFunction<T>;

export declare function distinctUntilKeyChanged<T>(key: keyof T): MonoTypeOperatorFunction<T>;
export declare function distinctUntilKeyChanged<T, K extends keyof T>(key: K, compare: (x: T[K], y: T[K]) => boolean): MonoTypeOperatorFunction<T>;
Expand Down
32 changes: 31 additions & 1 deletion spec/operators/distinctUntilChanged-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/** @prettier */
import { expect } from 'chai';
import { distinctUntilChanged, mergeMap, take } from 'rxjs/operators';
import { of, Observable } from 'rxjs';
import { of, Observable, Subject } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -298,4 +298,34 @@ describe('distinctUntilChanged', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

// This test is to cover a corner case where someone might write
// synchronous, reentrant code. At the time this test was authored,
// the operator was written in such a way that it would allow
// the duplicate non-distinct values to be emitted repeatedly.
it('should work properly with reentrant streams', () => {
const subject = new Subject<number | undefined>();
const results: any[] = [];
let count = 0;

subject.pipe(distinctUntilChanged()).subscribe((n) => {
results.push(n);

// Protect against an infinite loop in this test.
// That shouldn't happen.
if (++count > 2) {
throw new Error('this should have only been hit once');
}

// If we reenter with the same value, it should not
// emit again.
subject.next(1);
});

// Start with 1.
subject.next(1);

// It should only have emitted one value.
expect(results).to.deep.equal([1]);
});
});
199 changes: 151 additions & 48 deletions src/internal/operators/distinctUntilChanged.ts
Original file line number Diff line number Diff line change
@@ -1,78 +1,181 @@
import { MonoTypeOperatorFunction } from '../types';
import { identity } from '../util/identity';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';

/* tslint:disable:max-line-length */
export function distinctUntilChanged<T>(compare?: (x: T, y: T) => boolean): MonoTypeOperatorFunction<T>;
export function distinctUntilChanged<T, K>(compare: (x: K, y: K) => boolean, keySelector: (x: T) => K): MonoTypeOperatorFunction<T>;
/* tslint:enable:max-line-length */

/**
* Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.
*
* If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.
* The comparator function shourld return true if the values are the same, and false if they are different.
* Returns a result {@link Observable} that emits all values pushed by the source observable if they
* are distinct in comparison to the last value the result observable emitted.
*
* If a comparator function is not provided, an equality check is used by default.
* 1. It will always emit the first value from the source.
* 2. For all subsequent values pushed by the source, they will be compared to the previously emitted values
* using the provided `comparator` or an `===` equality check.
* 3. If the value pushed by the source is determined to be unequal by this check, that value is emitted and
* becomes the new "previously emitted value" internally.
*
* ## Example
* A simple example with numbers
*
* A very basic example with no `comparator`. Note that `1` is emitted more than once,
* because it's distinct in comparison to the _previously emitted_ value,
* not in comparison to _all other emitted values_.
*
* ```ts
* import { of } from 'rxjs';
* import { distinctUntilChanged } from 'rxjs/operators';
*
* of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4).pipe(
* distinctUntilChanged(),
* )
* .subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4
* of(1, 1, 1, 2, 2, 2, 1, 1, 3, 3).pipe(
* distinctUntilChanged()
* )
* .subscribe(console.log);
* // Logs: 1, 2, 1, 3
* ```
*
* An example using a compare function
* ```typescript
* ## Example
*
* With a `comparator`, you can do custom comparisons. Let's say
* you only want to emit a value when all of its components have
* changed:
*
* ```ts
* import { of } from 'rxjs';
* import { distinctUntilChanged } from 'rxjs/operators';
*
* interface Person {
* age: number,
* name: string
* }
*
*of(
* { age: 4, name: 'Foo'},
* { age: 7, name: 'Bar'},
* { age: 5, name: 'Foo'},
* { age: 6, name: 'Foo'},
* ).pipe(
* distinctUntilChanged((p: Person, q: Person) => p.name === q.name),
* )
* .subscribe(x => console.log(x));
*
* // displays:
* // { age: 4, name: 'Foo' }
* // { age: 7, name: 'Bar' }
* // { age: 5, name: 'Foo' }
* const totallyDifferentBuilds$ = of(
* { engineVersion: '1.1.0', transmissionVersion: '1.2.0' },
* { engineVersion: '1.1.0', transmissionVersion: '1.4.0' },
* { engineVersion: '1.3.0', transmissionVersion: '1.4.0' },
* { engineVersion: '1.3.0', transmissionVersion: '1.5.0' },
* { engineVersion: '2.0.0', transmissionVersion: '1.5.0' }
* ).pipe(
* distinctUntilChanged((prev, curr) => {
* return (
* prev.engineVersion === curr.engineVersion ||
* prev.transmissionVersion === curr.transmissionVersion
* );
* })
* );
*
* totallyDifferentBuilds$.subscribe(console.log);
*
* // Logs:
* // {engineVersion: "1.1.0", transmissionVersion: "1.2.0"}
* // {engineVersion: "1.3.0", transmissionVersion: "1.4.0"}
* // {engineVersion: "2.0.0", transmissionVersion: "1.5.0"}
* ```
*
* ## Example
*
* You can also provide a custom `comparator` to check that emitted
* changes are only in one direction. Let's say you only want to get
* the next record temperature:
*
* ```ts
* import { of } from "rxjs";
* import { distinctUntilChanged } from "rxjs/operators";
*
* const temps$ = of(30, 31, 20, 34, 33, 29, 35, 20);
*
* const recordHighs$ = temps$.pipe(
* distinctUntilChanged((prevHigh, temp) => {
* // If the current temp is less than
* // or the same as the previous record,
* // the record hasn't changed.
* return temp <= prevHigh;
* })
* );
*
* recordHighs$.subscribe(console.log);
* // Logs: 30, 31, 34, 35
* ```
*
* @see {@link distinct}
* @see {@link distinctUntilKeyChanged}
* @param comparator A function used to compare the previous and current values for
* equality. Defaults to a `===` check.
*/
export function distinctUntilChanged<T>(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction<T>;

/**
* Returns a result {@link Observable} that emits all values pushed by the source observable if they
* are distinct in comparison to the last value the result observable emitted.
*
* 1. It will always emit the first value from the source.
* 2. The `keySelector` will be run against all values, including the first value.
* 3. For all values after the first, the selected key will be compared against the key selected from
* the previously emitted value using the `comparator`.
* 4. If the keys are determined to be unequal by this check, the value (not the key), is emitted
* and the selected key from that value is saved for future comparisons against other keys.
*
* ## Example
*
* Selecting update events only when the `updatedBy` field shows
* the account changed hands...
*
* ```ts
* // A stream of updates to a given account
* const accountUpdates$ = of(
* { updatedBy: "blesh", data: [] },
* { updatedBy: "blesh", data: [] },
* { updatedBy: "ncjamieson", data: [] },
* { updatedBy: "ncjamieson", data: [] },
* { updatedBy: "blesh", data: [] }
* );
*
* // We only want the events where it changed hands
* const changedHands$ = accountUpdates$.pipe(
* distinctUntilChanged(undefined, update => update.updatedBy)
* );
*
* @param {function} [compare] Optional comparison function called to test if an item is distinct from the previous item in the source.
* A return value of true indicates that it is the same, and a return value of false means they are different.
* @return {Observable} An Observable that emits items from the source Observable with distinct values.
* changedHands$.subscribe(console.log);
* // Logs:
* // {updatedBy: "blesh", data: Array[0]}
* // {updatedBy: "ncjamieson", data: Array[0]}
* // {updatedBy: "blesh", data: Array[0]}
* ```
*
* @param comparator A function used to compare the previous and current keys for
* equality. Defaults to a `===` check.
* @param keySelector Used to select a key value to be passed to the `comparator`.
*/
export function distinctUntilChanged<T, K>(compare?: (a: K, b: K) => boolean, keySelector?: (x: T) => K): MonoTypeOperatorFunction<T> {
compare = compare ?? defaultCompare;
export function distinctUntilChanged<T, K>(
comparator: (previous: K, current: K) => boolean,
keySelector: (value: T) => K
): MonoTypeOperatorFunction<T>;

export function distinctUntilChanged<T, K>(
comparator?: (previous: K, current: K) => boolean,
keySelector: (value: T) => K = identity as (value: T) => K
): MonoTypeOperatorFunction<T> {
// We've been allowing `null` do be passed as the `compare`, so we can't do
// a default value for the parameter, because that will only work
// for `undefined`.
comparator = comparator ?? defaultCompare;

return operate((source, subscriber) => {
let prev: any;
// The previous key, used to compare against keys selected
// from new arrivals to determine "distinctiveness".
let previousKey: K;
// Whether or not this is the first value we've gotten.
let first = true;

source.subscribe(
new OperatorSubscriber(subscriber, (value) => {
const key: any = keySelector ? keySelector(value) : value;
if (first || !compare!(prev, key)) {
// We always call the key selector.
const currentKey = keySelector(value);

// If it's the first value, we always emit it.
// Otherwise, we compare this key to the previous key, and
// if the comparer returns false, we emit.
if (first || !comparator!(previousKey, currentKey)) {
// Update our state *before* we emit the value
// as emission can be the source of re-entrant code
// in functional libraries like this. We only really
// need to do this if it's the first value, or if the
// key we're tracking in previous needs to change.
first = false;
previousKey = currentKey;

// Emit the value!
subscriber.next(value);
}
prev = key;
first = false;
})
);
});
Expand Down

0 comments on commit 0ebcf17

Please sign in to comment.