Skip to content

Commit

Permalink
refactor: create new module structure
Browse files Browse the repository at this point in the history
BREAKING CHANGE: applyEach now always return a function
BREAKING CHANGE: Queue constructor now takes options instead of list
BREAKING CHANGE: Queue 'length', 'running' and 'idle' now are properties
BREAKING CHANGE: removing Queues factory functions
  • Loading branch information
SomaticIT committed Nov 8, 2020
1 parent 1b75405 commit d6fea97
Show file tree
Hide file tree
Showing 155 changed files with 7,779 additions and 4,390 deletions.
72 changes: 66 additions & 6 deletions index.ts
@@ -1,6 +1,66 @@
export * from "./src/collections";
export * from "./src/flow";
export * from "./src/nextTick";
export * from "./src/ProgressPromise";
export * from "./src/queue";
export * from "./src/utils";
export * from "./lib/_types";

export { default as apply } from "./lib/apply";
export { default as applyEach } from "./lib/applyEach";
export { default as applyEachSeries } from "./lib/applyEachSeries";
export { default as applyOn } from "./lib/applyOn";
export { default as cbpromisify } from "./lib/cbpromisify";
export { default as compose } from "./lib/compose";
export { default as concat } from "./lib/concat";
export { default as concatSeries } from "./lib/concatSeries";
export { default as defer } from "./lib/defer";
export { default as denodify } from "./lib/denodify";
export { default as dir } from "./lib/dir";
export { default as doUntil } from "./lib/doUntil";
export { default as doWhilst } from "./lib/doWhilst";
export { default as each } from "./lib/each";
export { default as eachLimit } from "./lib/eachLimit";
export { default as eachSeries } from "./lib/eachSeries";
export { default as every } from "./lib/every";
export { default as exec } from "./lib/exec";
export { default as execOn } from "./lib/execOn";
export { default as filter } from "./lib/filter";
export { default as filterSeries } from "./lib/filterSeries";
export { default as find } from "./lib/find";
export { default as findSeries } from "./lib/findSeries";
export { default as forEach } from "./lib/forEach";
export { default as forEachLimit } from "./lib/forEachLimit";
export { default as forEachSeries } from "./lib/forEachSeries";
export { default as forever } from "./lib/forever";
export { default as immediate } from "./lib/immediate";
export { default as log } from "./lib/log";
export { default as map } from "./lib/map";
export { default as mapLimit } from "./lib/mapLimit";
export { default as mapSeries } from "./lib/mapSeries";
export { default as memoize } from "./lib/memoize";
export { default as nextTick } from "./lib/nextTick";
export { default as parallel } from "./lib/parallel";
export { default as parallelLimit } from "./lib/parallelLimit";
export { default as partial } from "./lib/partial";
export { default as partialOn } from "./lib/partialOn";
export { default as PriorityQueue } from "./lib/PriorityQueue";
export { default as PriorityTaskQueue } from "./lib/PriorityTaskQueue";
export { default as ProgressPromise } from "./lib/ProgressPromise";
export { default as promisify } from "./lib/promisify";
export { default as Queue } from "./lib/Queue";
export { default as QueueError } from "./lib/QueueError";
export { default as reduce } from "./lib/reduce";
export { default as reduceRight } from "./lib/reduceRight";
export { default as reject } from "./lib/reject";
export { default as rejectSeries } from "./lib/rejectSeries";
export { default as resolve } from "./lib/resolve";
export { default as retry } from "./lib/retry";
export { default as seq } from "./lib/seq";
export { default as series } from "./lib/series";
export { default as some } from "./lib/some";
export { default as sortBy } from "./lib/sortBy";
export { default as tap } from "./lib/tap";
export { default as tapOn } from "./lib/tapOn";
export { default as TaskQueue } from "./lib/TaskQueue";
export { default as timeout } from "./lib/timeout";
export { default as times } from "./lib/times";
export { default as timesSeries } from "./lib/timesSeries";
export { default as uncallbackify } from "./lib/uncallbackify";
export { default as until } from "./lib/until";
export { default as waterfall } from "./lib/waterfall";
export { default as whilst } from "./lib/whilst";
6 changes: 5 additions & 1 deletion jest.config.js
Expand Up @@ -2,6 +2,10 @@ module.exports = {
preset: "ts-jest",
testEnvironment: "node",
testMatch: [
"tests/**/*.spec.ts"
"<rootDir>/tests/**/*.spec.ts"
],
collectCoverageFrom: [
"*.ts",
"lib/**/*.ts"
]
};
121 changes: 121 additions & 0 deletions lib/PriorityQueue.ts
@@ -0,0 +1,121 @@
import type { Deferred, QueueOptions } from "./_types";
import type { QueueItem, QueueWorker } from "./_internal";

import Queue from "./Queue";
import defer from "./defer";


export default class PriorityQueue<T, U> extends Queue<T, U> {
public defaultPriority = 1;

/**
* Creates a new PriorityQueue.
*
* @param worker The worker function to apply on each item in PriorityQueue
* @param limit The maximum number of concurrent workers to launch
* @param options The options for the PriorityQueue
*/
constructor(worker: QueueWorker<T, U>, limit?: number, options?: QueueOptions) {
super(worker, limit, options);
}

public push(priority: number, data?: T): Promise<U>;
public push(priority: number, datas: T[]): Promise<U[]>;
public push(priority: number, ...datas: T[]): Promise<U[]>;
public push(data: T): Promise<U>;
public push(datas: T[]): Promise<U[]>;
public push(...datas: T[]): Promise<U[]>;
public push(...datas: any[]): Promise<U | U[]> {
let priority = this.defaultPriority;
if (typeof datas[0] === "number" && datas.length > 1) {
priority = datas.shift();
}

if (datas.length === 1 && Array.isArray(datas[0])) {
datas = datas[0];
}

return this.insertAt(datas, priority);
}

public unshift(priority: number, data?: T): Promise<U>;
public unshift(priority: number, datas: T[]): Promise<U[]>;
public unshift(priority: number, ...datas: T[]): Promise<U[]>;
public unshift(data: T): Promise<U>;
public unshift(datas: T[]): Promise<U[]>;
public unshift(...datas: T[]): Promise<U[]>;
public unshift(...datas: any[]): Promise<U | U[]> {
let priority = this.defaultPriority;
if (typeof datas[0] === "number") {
priority = datas.shift();
}

if (datas.length === 1 && Array.isArray(datas[0])) {
datas = datas[0];
}

return this.insertAt(datas, priority);
}

private insertAt(datas: T[], priority: number): Promise<U | U[]> {
const length = datas.length;
if (length === 0) {
return Promise.resolve([]);
}

const index = this.binarySearch(this.items, { priority }, this.compareTasks) + 1;

const dfd = defer<U | U[]>();
if (!this.started) {
this.started = true;
}

const iterator = createIterator(length, dfd, priority);
this.items.splice(index, 0, ...datas.map(iterator, this));

if (this.onsaturated && this.items.length >= this.limit) {
this.onsaturated();
}

for (let i = this.limit; i > 0; i--) {
this.process();
}

return dfd.promise;


function createIterator(count: number, dfd: Deferred<U | U[]>, priority: number): (data: T) => QueueItem<T, U> {
const errors: any[] = [];
const results: U[] = [];

return function (this: PriorityQueue<T, U>, data: T): QueueItem<T, U> {
const item = this.createItem(data, results, errors, count, dfd.resolve, dfd.reject);
item.priority = priority;
return item;
};
}
}

private binarySearch(seq: Array<QueueItem<T, U>>, item: { priority?: number }, compare: (a: { priority?: number }, b: { priority?: number }) => number): number {
let beg = -1;
let end = seq.length - 1;

let mid: number;
while (beg < end) {
mid = beg + ((end - beg + 1) >>> 1);

if (compare(item, seq[mid]) >= 0) {
beg = mid;
}
else {
end = mid - 1;
}
}

return beg;
}

private compareTasks(a: { priority?: number }, b: { priority?: number }): number {
return (a.priority || 0) - (b.priority || 0);
}
}
17 changes: 17 additions & 0 deletions lib/PriorityTaskQueue.ts
@@ -0,0 +1,17 @@
import type { AsyncTask, QueueOptions } from "./_types";

import PriorityQueue from "./PriorityQueue";

export default class PriorityTaskQueue<T> extends PriorityQueue<AsyncTask<T>, T> {

/**
* Creates a new PriorityTaskQueue.
*
* @param limit The maximum number of concurrent tasks to launch
* @param options The options for the PriorityTaskQueue
*/
constructor(limit?: number, options?: QueueOptions) {
super(item => item(), limit, options);
}

}

0 comments on commit d6fea97

Please sign in to comment.