Skip to content

Commit

Permalink
fix: Add opBuffer operator to pipes (#5475)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason3S committed Apr 15, 2024
1 parent 5aa563b commit b84a0ae
Show file tree
Hide file tree
Showing 14 changed files with 216 additions and 4 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const config = {
// 'import/order': 'error',
'simple-import-sort/imports': 'error',
'simple-import-sort/exports': 'error',
'unicorn/explicit-length-check': 'off',
},
},
{
Expand Down
9 changes: 9 additions & 0 deletions packages/cspell-pipe/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@
"./operators/index.js": {
"import": "./dist/operators/index.js"
},
"./async": {
"import": "./dist/async/index.js"
},
"./async/index": {
"import": "./dist/async/index.js"
},
"./async/index.js": {
"import": "./dist/async/index.js"
},
"./sync": {
"import": "./dist/sync/index.js"
},
Expand Down
1 change: 1 addition & 0 deletions packages/cspell-pipe/src/__snapshots__/index.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ exports[`Pipe API > pipe api 1`] = `
"isAsyncIterable",
"opAppend",
"opAwaitAsync",
"opBuffer",
"opConcatMap",
"opFilter",
"opFirst",
Expand Down
25 changes: 25 additions & 0 deletions packages/cspell-pipe/src/async/__snapshots__/index.test.ts.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html

exports[`Pipe Sync API > pipe api 1`] = `
[
"opAppend",
"opBuffer",
"opCombine",
"opConcatMap",
"opFilter",
"opFirst",
"opFlatten",
"opJoinStrings",
"opLast",
"opMap",
"opReduce",
"opSkip",
"opTake",
"opTap",
"opUnique",
"pipe",
"pipeAsync",
"reduce",
"toArray",
]
`;
9 changes: 9 additions & 0 deletions packages/cspell-pipe/src/async/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { describe, expect, test } from 'vitest';

import * as pipe from './index.js';

describe('Pipe Sync API', () => {
test('pipe api', () => {
expect(Object.keys(pipe).sort()).toMatchSnapshot();
});
});
21 changes: 21 additions & 0 deletions packages/cspell-pipe/src/async/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
export { toArrayAsync as toArray } from '../helpers/toArray.js';
export type { OperatorAsync as Operator } from '../operators/index.js';
export {
opAppendAsync as opAppend,
opBufferAsync as opBuffer,
opCombineAsync as opCombine,
opConcatMapAsync as opConcatMap,
opFilterAsync as opFilter,
opFirstAsync as opFirst,
opFlattenAsync as opFlatten,
opJoinStringsAsync as opJoinStrings,
opLastAsync as opLast,
opMapAsync as opMap,
opReduceAsync as opReduce,
opSkipAsync as opSkip,
opTakeAsync as opTake,
opTapAsync as opTap,
opUniqueAsync as opUnique,
} from '../operators/index.js';
export { pipeAsync as pipe, pipeAsync } from '../pipe.js';
export { reduceAsync as reduce } from '../reduce.js';
1 change: 1 addition & 0 deletions packages/cspell-pipe/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export { interleave, isAsyncIterable, toArray, toAsyncIterable, toDistributableI
export {
opAppend,
opAwaitAsync,
opBuffer,
opConcatMap,
opFilter,
opFirst,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ exports[`Operators > operators 1`] = `
"opAppendAsync",
"opAppendSync",
"opAwaitAsync",
"opBuffer",
"opBufferAsync",
"opBufferSync",
"opCombineAsync",
"opCombineSync",
"opConcatMap",
Expand Down
80 changes: 80 additions & 0 deletions packages/cspell-pipe/src/operators/buffer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { describe, expect, test } from 'vitest';

import { toArray } from '../helpers/index.js';
import { pipeAsync, pipeSync } from '../pipe.js';
import { opBuffer } from './buffer.js';
import { opTake } from './take.js';

describe('Validate buffer', () => {
const values = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten', 'eleven'];
const values2 = [
['one', 'two'],
['three', 'four'],
['five', 'six'],
['seven', 'eight'],
['nine', 'ten'],
['eleven'],
];
const values3 = [
['one', 'two', 'three'],
['four', 'five', 'six'],
['seven', 'eight', 'nine'],
['ten', 'eleven'],
];

test.each`
values | size | expected
${values} | ${1} | ${values.map((v) => [v])}
${values} | ${2} | ${values2}
${values} | ${3} | ${values3}
`('buffer $size', async ({ values, size, expected }) => {
const resultSync = toArray(pipeSync(values, opBuffer(size)));
expect(resultSync).toEqual(expected);

const resultAsync = await toArray(pipeAsync(values, opBuffer(size)));
expect(resultAsync).toEqual(expected);
});

test('buffer async array of promises', async () => {
const result = await toArray(pipeAsync(delay(mapP(values), 1), opBuffer(3)));
expect(result).toEqual(values3);
});

test('buffer stop early', async () => {
let i = 0;
let finallyCalled = false;
function* gen() {
try {
i = 0;
for (const v of values) {
++i;
yield v;
}
} finally {
finallyCalled = true;
}
}

const result = toArray(pipeSync(gen(), opBuffer(3), opTake(3)));
expect(i).toBe(3 * 3);
expect(finallyCalled).toBe(true);
expect(result).toEqual(values3.slice(0, 3));

finallyCalled = false;
const resultAsync = await toArray(pipeAsync(gen(), opBuffer(3), opTake(2)));
expect(i).toBe(3 * 2);
expect(finallyCalled).toBe(true);
expect(resultAsync).toEqual(values3.slice(0, 2));
});
});

function mapP<T>(v: T[]): Promise<T>[] {
return v.map((v) => Promise.resolve(v));
}

async function* delay<T>(values: T[], ms: number): AsyncIterable<T> {
for (const v of values) {
await new Promise((resolve) => setTimeout(resolve, ms));
yield v;
}
}
61 changes: 61 additions & 0 deletions packages/cspell-pipe/src/operators/buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { isAsyncIterable } from '../helpers/util.js';
import type { PipeFn } from '../internalTypes.js';

/**
* Buffer the input iterable into arrays of the given size.
* @param size - The size of the buffer.
* @returns A function that takes an async iterable and returns an async iterable of arrays of the given size.
*/
export function opBufferAsync<T>(size: number): (iter: AsyncIterable<T>) => AsyncIterable<T[]> {
async function* fn(iter: Iterable<T> | AsyncIterable<T>) {
let buffer: T[] = [];
for await (const v of iter) {
buffer.push(v);
if (buffer.length >= size) {
yield buffer;
buffer = [];
}
}

if (buffer.length > 0) {
yield buffer;
}
}

return fn;
}

/**
* @param size - The size of the buffer.
* @returns A function that takes an iterable and returns an iterable of arrays of the given size.
*/
export function opBufferSync<T>(size: number): (iter: Iterable<T>) => Iterable<T[]> {
function* fn(iter: Iterable<T>) {
let buffer: T[] = [];
for (const v of iter) {
buffer.push(v);
if (buffer.length >= size) {
yield buffer;
buffer = [];
}
}

if (buffer.length > 0) {
yield buffer;
}
}

return fn;
}

export function opBuffer<T>(size: number): PipeFn<T, T[]> {
const asyncFn = opBufferAsync<T>(size);
const syncFn = opBufferSync<T>(size);

function _(i: Iterable<T>): Iterable<T[]>;
function _(i: AsyncIterable<T>): AsyncIterable<T[]>;
function _(i: Iterable<T> | AsyncIterable<T>): Iterable<T[]> | AsyncIterable<T[]> {
return isAsyncIterable(i) ? asyncFn(i) : syncFn(i);
}
return _;
}
1 change: 1 addition & 0 deletions packages/cspell-pipe/src/operators/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export { opAppend, opAppendAsync, opAppendSync } from './append.js';
export { opAwaitAsync } from './await.js';
export { opBuffer, opBufferAsync, opBufferSync } from './buffer.js';
export { opCombineAsync, opCombineSync } from './combine.js';
export { opConcatMap, opConcatMapAsync, opConcatMapSync } from './concatMap.js';
export { opFilter, opFilterAsync, opFilterSync } from './filter.js';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
exports[`Pipe Sync API > pipe api 1`] = `
[
"opAppend",
"opBuffer",
"opCombine",
"opConcatMap",
"opFilter",
Expand Down
1 change: 1 addition & 0 deletions packages/cspell-pipe/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export { toArraySync as toArray } from '../helpers/toArray.js';
export type { OperatorSync as Operator } from '../operators/index.js';
export {
opAppendSync as opAppend,
opBufferSync as opBuffer,
opCombineSync as opCombine,
opConcatMapSync as opConcatMap,
opFilterSync as opFilter,
Expand Down
6 changes: 2 additions & 4 deletions packages/cspell/src/app/util/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ import assert from 'assert';
export function* prefetchIterable<T>(iterable: Iterable<T>, size: number): Iterable<T> {
assert(size >= 0);

const iter = iterable[Symbol.iterator]();

const buffer: T[] = [];

for (let next = iter.next(); !next.done; next = iter.next()) {
buffer.push(next.value);
for (const value of iterable) {
buffer.push(value);
if (buffer.length >= size - 1) {
const value = buffer[0];
buffer.shift();
Expand Down

0 comments on commit b84a0ae

Please sign in to comment.