Skip to content

Commit

Permalink
feat(tasks): series and parallel can take a task array or record as a…
Browse files Browse the repository at this point in the history
… first argument
  • Loading branch information
rafamel committed Feb 19, 2021
1 parent 55c6cfd commit 8ab3802
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 21 deletions.
33 changes: 33 additions & 0 deletions src/helpers/flatten.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Task } from '../definitions';
import { context } from '../tasks/transform/context';
import { Members, Empty, TypeGuard } from 'type-core';
import { into } from 'pipettes';

export function flatten(
task?: Task | Empty | Array<Task | Empty> | Members<Task | Empty>,
...tasks: Array<Task | Empty>
): Task[] {
return into(
null,
() => {
if (!task) return [];
if (!TypeGuard.isObject(task)) return [task];
return Object.entries(task || {}).map(([key, task]): Task | Empty => {
return task
? context(
(ctx) => ({
...ctx,
route: ctx.route.concat(key)
}),
task
)
: null;
});
},
(arr) => {
return arr
.concat(tasks)
.filter((task): task is Task.Async => Boolean(task));
}
);
}
27 changes: 15 additions & 12 deletions src/tasks/aggregate/parallel.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { Task, Context } from '../../definitions';
import { flatten } from '../../helpers/flatten';
import { run } from '../../utils/run';
import { log } from '../stdio/log';
import { Empty, NullaryFn } from 'type-core';
import { Empty, NullaryFn, Members } from 'type-core';
import { into } from 'pipettes';

export function parallel(...tasks: Array<Task | Empty>): Task.Async {
export function parallel(
task?: Task | Empty | Array<Task | Empty> | Members<Task | Empty>,
...tasks: Array<Task | Empty>
): Task.Async {
const items = flatten(task, ...tasks);

return async (ctx: Context): Promise<void> => {
into(ctx, log('debug', 'Run tasks in parallel'));

Expand All @@ -20,16 +26,13 @@ export function parallel(...tasks: Array<Task | Empty>): Task.Async {

try {
await Promise.all(
tasks.map((task, i) => {
return task
? run(task, {
...ctx,
route: ctx.route.concat(i),
cancellation: new Promise((resolve) => {
cbs.push(resolve);
})
})
: Promise.resolve();
items.map((task) => {
return run(task, {
...ctx,
cancellation: new Promise((resolve) => {
cbs.push(resolve);
})
});
})
);
} catch (err) {
Expand Down
19 changes: 10 additions & 9 deletions src/tasks/aggregate/series.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import { Task, Context } from '../../definitions';
import { flatten } from '../../helpers/flatten';
import { isCancelled } from '../../utils/is-cancelled';
import { run } from '../../utils/run';
import { log } from '../stdio/log';
import { Empty } from 'type-core';
import { Empty, Members } from 'type-core';
import { into } from 'pipettes';

export function series(...tasks: Array<Task | Empty>): Task.Async {
export function series(
task?: Task | Empty | Array<Task | Empty> | Members<Task | Empty>,
...tasks: Array<Task | Empty>
): Task.Async {
const items = flatten(task, ...tasks);

return async (ctx: Context): Promise<void> => {
into(ctx, log('debug', 'Run tasks in series'));

for (let i = 0; i < tasks.length; i++) {
for (const task of items) {
if (await isCancelled(ctx)) break;

const task = tasks[i];

if (task) {
await run(task, { ...ctx, route: ctx.route.concat(i) });
}
await run(task, ctx);
}
};
}

0 comments on commit 8ab3802

Please sign in to comment.