Skip to content

Commit

Permalink
feat(async): async functions in parallel are now called with an add…
Browse files Browse the repository at this point in the history
…itional index parameter

test(async): added test for the new index parameter in `parallel`
  • Loading branch information
UnKnoWn-Consortium committed Apr 10, 2023
1 parent 77cb220 commit b2b9816
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 22 deletions.
8 changes: 5 additions & 3 deletions cdn/radash.esm.js
Original file line number Diff line number Diff line change
Expand Up @@ -383,21 +383,23 @@ const parallel = async (limit, array, func) => {
index,
item
}));
const processor = async (res) => {
const processor = (queueIndex) => async (res) => {
const results2 = [];
while (true) {
const next = work.pop();
if (!next)
return res(results2);
const [error, result] = await tryit(func)(next.item);
const [error, result] = await tryit(func)(next.item, queueIndex);
results2.push({
error,
result,
index: next.index
});
}
};
const queues = list(1, limit).map(() => new Promise(processor));
const queues = list(1, limit).map(
(queueIndex) => new Promise(processor(queueIndex - 1))
);
const itemResults = await Promise.all(queues);
const [errors, results] = fork(
sort(itemResults.flat(), (r) => r.index),
Expand Down
8 changes: 5 additions & 3 deletions cdn/radash.js
Original file line number Diff line number Diff line change
Expand Up @@ -386,21 +386,23 @@ var radash = (function (exports) {
index,
item
}));
const processor = async (res) => {
const processor = (queueIndex) => async (res) => {
const results2 = [];
while (true) {
const next = work.pop();
if (!next)
return res(results2);
const [error, result] = await tryit(func)(next.item);
const [error, result] = await tryit(func)(next.item, queueIndex);
results2.push({
error,
result,
index: next.index
});
}
};
const queues = list(1, limit).map(() => new Promise(processor));
const queues = list(1, limit).map(
(queueIndex) => new Promise(processor(queueIndex - 1))
);
const itemResults = await Promise.all(queues);
const [errors, results] = fork(
sort(itemResults.flat(), (r) => r.index),
Expand Down
2 changes: 1 addition & 1 deletion cdn/radash.min.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "radash",
"version": "10.8.1",
"version": "10.8.2",
"description": "Functional utility library - modern, simple, typed, powerful",
"main": "dist/cjs/index.cjs",
"module": "dist/esm/index.mjs",
Expand Down
32 changes: 18 additions & 14 deletions src/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,28 +111,32 @@ export class AggregateError extends Error {
export const parallel = async <T, K>(
limit: number,
array: readonly T[],
func: (item: T) => Promise<K>
func: (item: T, index: number) => Promise<K>
): Promise<K[]> => {
const work = array.map((item, index) => ({
index,
item
}))
// Process array items
const processor = async (res: (value: WorkItemResult<K>[]) => void) => {
const results: WorkItemResult<K>[] = []
while (true) {
const next = work.pop()
if (!next) return res(results)
const [error, result] = await tryit(func)(next.item)
results.push({
error,
result: result as K,
index: next.index
})
const processor =
(queueIndex: number) =>
async (res: (value: WorkItemResult<K>[]) => void) => {
const results: WorkItemResult<K>[] = []
while (true) {
const next = work.pop()
if (!next) return res(results)
const [error, result] = await tryit(func)(next.item, queueIndex)
results.push({
error,
result: result as K,
index: next.index
})
}
}
}
// Create queues
const queues = list(1, limit).map(() => new Promise(processor))
const queues = list(1, limit).map(
queueIndex => new Promise(processor(queueIndex - 1))
)
// Wait for all queues to complete
const itemResults = (await Promise.all(queues)) as WorkItemResult<K>[][]
const [errors, results] = fork(
Expand Down
13 changes: 13 additions & 0 deletions src/tests/async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,19 @@ describe('async module', () => {
})
assert.deepEqual(Math.max(...tracking), 3)
})
test('returns queue index in the async function', async () => {
const [errors, results] = await _.try(async () => {
return _.parallel(5, _.list(1, 5), async (num, index) => {
await _.sleep(1000)
return `q_${index}`
})
})()
assert.isUndefined(errors)
assert.deepEqual(
new Set(results),
new Set(['q_0', 'q_1', 'q_2', 'q_3', 'q_4'])
)
})
})

describe('_.retry', () => {
Expand Down

0 comments on commit b2b9816

Please sign in to comment.