Skip to content

Commit

Permalink
refactor: Don't return subscriber or subscription in operate unnece…
Browse files Browse the repository at this point in the history
…ssarily.
  • Loading branch information
benlesh committed Sep 23, 2020
1 parent 68fe174 commit 769a298
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/internal/operators/count.ts
Expand Up @@ -67,7 +67,7 @@ export function count<T>(predicate?: (value: T, index: number, source: Observabl
let index = 0;
let count = 0;

return source.subscribe(
source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => (!predicate || predicate(value, index++, source)) && count++,
Expand Down
6 changes: 3 additions & 3 deletions src/internal/operators/dematerialize.ts
Expand Up @@ -52,7 +52,7 @@ import { OperatorSubscriber } from './OperatorSubscriber';
* embedded in Notification objects emitted by the source Observable.
*/
export function dematerialize<N extends ObservableNotification<any>>(): OperatorFunction<N, ValueFromNotification<N>> {
return operate((source, subscriber) =>
source.subscribe(new OperatorSubscriber(subscriber, (notification) => observeNotification(notification, subscriber)))
);
return operate((source, subscriber) => {
source.subscribe(new OperatorSubscriber(subscriber, (notification) => observeNotification(notification, subscriber)));
});
}
14 changes: 5 additions & 9 deletions src/internal/operators/filter.ts
Expand Up @@ -58,15 +58,11 @@ export function filter<T>(predicate: (value: T, index: number) => boolean, thisA

// Subscribe to the source, all errors and completions are
// forwarded to the consumer.
return source.subscribe(
new OperatorSubscriber(subscriber, (value) => {
// Call the predicate with the appropriate `this` context,
// if the predicate returns `true`, then send the value
// to the consumer.
if (predicate.call(thisArg, value, index++)) {
subscriber.next(value);
}
})
source.subscribe(
// Call the predicate with the appropriate `this` context,
// if the predicate returns `true`, then send the value
// to the consumer.
new OperatorSubscriber(subscriber, (value) => predicate.call(thisArg, value, index++) && subscriber.next(value))
);
});
}
4 changes: 1 addition & 3 deletions src/internal/operators/refCount.ts
Expand Up @@ -106,12 +106,10 @@ export function refCount<T>(): MonoTypeOperatorFunction<T> {
subscriber.unsubscribe();
});

const subscription = source.subscribe(refCounter);
source.subscribe(refCounter);

if (!refCounter.closed) {
connection = (source as ConnectableObservable<T>).connect();
}

return subscription;
});
}
6 changes: 1 addition & 5 deletions src/internal/operators/skip.ts
Expand Up @@ -15,10 +15,6 @@ import { OperatorSubscriber } from './OperatorSubscriber';
export function skip<T>(count: number): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let seen = 0;
return source.subscribe(
new OperatorSubscriber(subscriber, (value) => {
count === seen ? subscriber.next(value) : seen++;
})
);
source.subscribe(new OperatorSubscriber(subscriber, (value) => (count === seen ? subscriber.next(value) : seen++)));
});
}
2 changes: 1 addition & 1 deletion src/internal/operators/take.ts
Expand Up @@ -53,7 +53,7 @@ export function take<T>(count: number): MonoTypeOperatorFunction<T> {
? () => EMPTY
: operate((source, subscriber) => {
let seen = 0;
return source.subscribe(
source.subscribe(
new OperatorSubscriber(subscriber, (value) => {
if (++seen <= count) {
subscriber.next(value);
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/takeWhile.ts
Expand Up @@ -53,7 +53,7 @@ export function takeWhile<T>(predicate: (value: T, index: number) => boolean, in
export function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive = false): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let index = 0;
return source.subscribe(
source.subscribe(
new OperatorSubscriber(subscriber, (value) => {
const result = predicate(value, index++);
(result || inclusive) && subscriber.next(value);
Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/tap.ts
Expand Up @@ -149,7 +149,8 @@ export function tap<T>(
onError = error ? wrap(error.bind(nextOrObserver)) : noop;
onComplete = complete ? wrap(complete.bind(nextOrObserver)) : noop;
}
return source.subscribe(new TapSubscriber(subscriber, onNext, onError, onComplete));

source.subscribe(new TapSubscriber(subscriber, onNext, onError, onComplete));
});
}

Expand Down
4 changes: 3 additions & 1 deletion src/internal/operators/zipWith.ts
Expand Up @@ -78,7 +78,9 @@ export function zip<T, TOther, R>(
* @deprecated Deprecated. Use {@link zipWith}.
*/
export function zip<T, R>(...sources: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): OperatorFunction<T, any> {
return operate((source, subscriber) => zipStatic(source, ...sources).subscribe(subscriber));
return operate((source, subscriber) => {
zipStatic(source, ...sources).subscribe(subscriber);
});
}

/**
Expand Down

0 comments on commit 769a298

Please sign in to comment.