Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove operate #7249

Merged
merged 5 commits into from Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
79 changes: 40 additions & 39 deletions src/internal/operators/audit.ts
@@ -1,7 +1,7 @@
import { Subscriber } from '../Subscriber';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';

import { operate } from '../util/lift';
jakovljevic-mladen marked this conversation as resolved.
Show resolved Hide resolved
import { Observable } from '../Observable';
import { from } from '../observable/from';
import { createOperatorSubscriber } from './OperatorSubscriber';

Expand Down Expand Up @@ -51,46 +51,47 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
* emissions from the source Observable.
*/
export function audit<T>(durationSelector: (value: T) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let hasValue = false;
let lastValue: T | null = null;
let durationSubscriber: Subscriber<any> | null = null;
let isComplete = false;
return (source) =>
new Observable((subscriber) => {
benlesh marked this conversation as resolved.
Show resolved Hide resolved
let hasValue = false;
let lastValue: T | null = null;
let durationSubscriber: Subscriber<any> | null = null;
let isComplete = false;

const endDuration = () => {
durationSubscriber?.unsubscribe();
durationSubscriber = null;
if (hasValue) {
hasValue = false;
const value = lastValue!;
lastValue = null;
subscriber.next(value);
}
isComplete && subscriber.complete();
};
const endDuration = () => {
durationSubscriber?.unsubscribe();
durationSubscriber = null;
if (hasValue) {
hasValue = false;
const value = lastValue!;
lastValue = null;
subscriber.next(value);
}
isComplete && subscriber.complete();
};

const cleanupDuration = () => {
durationSubscriber = null;
isComplete && subscriber.complete();
};
const cleanupDuration = () => {
durationSubscriber = null;
isComplete && subscriber.complete();
};

source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
hasValue = true;
lastValue = value;
if (!durationSubscriber) {
from(durationSelector(value)).subscribe(
(durationSubscriber = createOperatorSubscriber(subscriber, endDuration, cleanupDuration))
);
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
hasValue = true;
lastValue = value;
if (!durationSubscriber) {
from(durationSelector(value)).subscribe(
(durationSubscriber = createOperatorSubscriber(subscriber, endDuration, cleanupDuration))
);
}
},
() => {
isComplete = true;
(!hasValue || !durationSubscriber || durationSubscriber.closed) && subscriber.complete();
}
},
() => {
isComplete = true;
(!hasValue || !durationSubscriber || durationSubscriber.closed) && subscriber.complete();
}
)
);
});
)
);
});
}
67 changes: 34 additions & 33 deletions src/internal/operators/buffer.ts
@@ -1,5 +1,5 @@
import { OperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { Observable } from '../Observable';
import { noop } from '../util/noop';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { from } from '../observable/from';
Expand Down Expand Up @@ -43,39 +43,40 @@ import { from } from '../observable/from';
* of values.
*/
export function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFunction<T, T[]> {
return operate((source, subscriber) => {
// The current buffered values.
let currentBuffer: T[] = [];
return (source) =>
new Observable((subscriber) => {
// The current buffered values.
let currentBuffer: T[] = [];

// Subscribe to the closing notifier first.
from(closingNotifier).subscribe(
createOperatorSubscriber(
subscriber,
() => {
// Start a new buffer and emit the previous one.
const b = currentBuffer;
currentBuffer = [];
subscriber.next(b);
},
noop
)
);
// Subscribe to the closing notifier first.
from(closingNotifier).subscribe(
createOperatorSubscriber(
subscriber,
() => {
// Start a new buffer and emit the previous one.
const b = currentBuffer;
currentBuffer = [];
subscriber.next(b);
},
noop
)
);

// Subscribe to our source.
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
() => {
subscriber.next(currentBuffer);
subscriber.complete();
}
)
);
// Subscribe to our source.
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
() => {
subscriber.next(currentBuffer);
subscriber.complete();
}
)
);

return () => {
// Ensure buffered values are released on finalization.
currentBuffer = null!;
};
});
return () => {
// Ensure buffered values are released on finalization.
currentBuffer = null!;
};
});
}
103 changes: 52 additions & 51 deletions src/internal/operators/bufferCount.ts
@@ -1,5 +1,5 @@
import { OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { Observable } from '../Observable';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { arrRemove } from '../util/arrRemove';

Expand Down Expand Up @@ -59,62 +59,63 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
// opening and closing on the bufferSize itself.
startBufferEvery = startBufferEvery ?? bufferSize;

return operate((source, subscriber) => {
let buffers: T[][] = [];
let count = 0;
return (source) =>
new Observable((subscriber) => {
let buffers: T[][] = [];
let count = 0;

source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
let toEmit: T[][] | null = null;
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
let toEmit: T[][] | null = null;

// Check to see if we need to start a buffer.
// This will start one at the first value, and then
// a new one every N after that.
if (count++ % startBufferEvery! === 0) {
buffers.push([]);
}
// Check to see if we need to start a buffer.
// This will start one at the first value, and then
// a new one every N after that.
if (count++ % startBufferEvery! === 0) {
buffers.push([]);
}

// Push our value into our active buffers.
for (const buffer of buffers) {
buffer.push(value);
// Check to see if we're over the bufferSize
// if we are, record it so we can emit it later.
// If we emitted it now and removed it, it would
// mutate the `buffers` array while we're looping
// over it.
if (bufferSize <= buffer.length) {
toEmit = toEmit ?? [];
toEmit.push(buffer);
// Push our value into our active buffers.
for (const buffer of buffers) {
buffer.push(value);
// Check to see if we're over the bufferSize
// if we are, record it so we can emit it later.
// If we emitted it now and removed it, it would
// mutate the `buffers` array while we're looping
// over it.
if (bufferSize <= buffer.length) {
toEmit = toEmit ?? [];
toEmit.push(buffer);
}
}
}

if (toEmit) {
// We have found some buffers that are over the
// `bufferSize`. Emit them, and remove them from our
// buffers list.
for (const buffer of toEmit) {
arrRemove(buffers, buffer);
if (toEmit) {
// We have found some buffers that are over the
// `bufferSize`. Emit them, and remove them from our
// buffers list.
for (const buffer of toEmit) {
arrRemove(buffers, buffer);
subscriber.next(buffer);
}
}
},
() => {
// When the source completes, emit all of our
// active buffers.
for (const buffer of buffers) {
subscriber.next(buffer);
}
subscriber.complete();
},
// Pass all errors through to consumer.
undefined,
() => {
// Clean up our memory when we finalize
buffers = null!;
}
},
() => {
// When the source completes, emit all of our
// active buffers.
for (const buffer of buffers) {
subscriber.next(buffer);
}
subscriber.complete();
},
// Pass all errors through to consumer.
undefined,
() => {
// Clean up our memory when we finalize
buffers = null!;
}
)
);
});
)
);
});
}