Skip to content

Commit

Permalink
refactor: remove last uses of wrappedLift
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Sep 23, 2020
1 parent bde8eda commit cea0cae
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 66 deletions.
97 changes: 47 additions & 50 deletions src/internal/operators/bufferToggle.ts
@@ -1,8 +1,7 @@
/** @prettier */
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { OperatorFunction, SubscribableOrPromise } from '../types';
import { wrappedLift } from '../util/lift';
import { operate } from '../util/lift';
import { from } from '../observable/from';
import { OperatorSubscriber } from './OperatorSubscriber';
import { noop } from '../util/noop';
Expand Down Expand Up @@ -57,57 +56,55 @@ export function bufferToggle<T, O>(
openings: SubscribableOrPromise<O>,
closingSelector: (value: O) => SubscribableOrPromise<any>
): OperatorFunction<T, T[]> {
return function bufferToggleOperatorFunction(source: Observable<T>) {
return wrappedLift(source, (subscriber, liftedSource) => {
const buffers: T[][] = [];
return operate((liftedSource, subscriber) => {
const buffers: T[][] = [];

// Subscribe to the openings notifier first
from(openings).subscribe(
new OperatorSubscriber(
subscriber,
(openValue) => {
const buffer: T[] = [];
buffers.push(buffer);
// We use this composite subscription, so that
// when the closing notifier emits, we can tear it down.
const closingSubscription = new Subscription();
// Subscribe to the openings notifier first
from(openings).subscribe(
new OperatorSubscriber(
subscriber,
(openValue) => {
const buffer: T[] = [];
buffers.push(buffer);
// We use this composite subscription, so that
// when the closing notifier emits, we can tear it down.
const closingSubscription = new Subscription();

// This is captured here, because we emit on both next or
// if the closing notifier completes without value.
// TODO: We probably want to not have closing notifiers emit!!
const emit = () => {
arrRemove(buffers, buffer);
subscriber.next(buffer);
closingSubscription.unsubscribe();
};
// This is captured here, because we emit on both next or
// if the closing notifier completes without value.
// TODO: We probably want to not have closing notifiers emit!!
const emit = () => {
arrRemove(buffers, buffer);
subscriber.next(buffer);
closingSubscription.unsubscribe();
};

// The line below will add the subscription to the parent subscriber *and* the closing subscription.
closingSubscription.add(from(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emit, undefined, emit)));
},
undefined,
noop
)
);
// The line below will add the subscription to the parent subscriber *and* the closing subscription.
closingSubscription.add(from(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emit, undefined, emit)));
},
undefined,
noop
)
);

liftedSource.subscribe(
new OperatorSubscriber(
subscriber,
(value) => {
// Value from our source. Add it to all pending buffers.
for (const buffer of buffers) {
buffer.push(value);
}
},
undefined,
() => {
// Source complete. Emit all pending buffers.
while (buffers.length > 0) {
subscriber.next(buffers.shift()!);
}
subscriber.complete();
liftedSource.subscribe(
new OperatorSubscriber(
subscriber,
(value) => {
// Value from our source. Add it to all pending buffers.
for (const buffer of buffers) {
buffer.push(value);
}
)
);
});
};
},
undefined,
() => {
// Source complete. Emit all pending buffers.
while (buffers.length > 0) {
subscriber.next(buffers.shift()!);
}
subscriber.complete();
}
)
);
});
}
31 changes: 15 additions & 16 deletions src/internal/operators/skipUntil.ts
@@ -1,7 +1,7 @@
/** @prettier */
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction } from '../types';
import { wrappedLift } from '../util/lift';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { from } from '../observable/from';
import { noop } from '../util/noop';
Expand Down Expand Up @@ -45,22 +45,21 @@ import { noop } from '../util/noop';
* @name skipUntil
*/
export function skipUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) =>
wrappedLift(source, (subscriber, liftedSource) => {
let taking = false;
return operate((source, subscriber) => {
let taking = false;

const skipSubscriber = new OperatorSubscriber(
subscriber,
() => {
skipSubscriber?.unsubscribe();
taking = true;
},
undefined,
noop
);
const skipSubscriber = new OperatorSubscriber(
subscriber,
() => {
skipSubscriber?.unsubscribe();
taking = true;
},
undefined,
noop
);

from(notifier).subscribe(skipSubscriber);
from(notifier).subscribe(skipSubscriber);

liftedSource.subscribe(new OperatorSubscriber(subscriber, (value) => taking && subscriber.next(value)));
});
source.subscribe(new OperatorSubscriber(subscriber, (value) => taking && subscriber.next(value)));
});
}

0 comments on commit cea0cae

Please sign in to comment.