Skip to content

Commit

Permalink
fix: 🐛 make concurrency() decorator apply per class instance
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Mar 19, 2024
1 parent c50e19f commit 839c898
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
59 changes: 59 additions & 0 deletions src/__tests__/concurrencyDecorator.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,65 @@ test('can execute one function with limit 10', async () => {
expect(res).toStrictEqual([123, 456, 1]);
});

test(`limits concurrency separately per method`, async () => {
const resA: number[] = [];
const resB: number[] = [];
class A {
@concurrency(2)
async a(timeout: number) {
await tick(timeout);
resA.push(timeout);
}
@concurrency(3)
async b(timeout: number) {
await tick(timeout);
resB.push(timeout);
}
}
const promises: Promise<any>[] = [];
const a = new A();
promises.push(a.a(3));
promises.push(a.a(4));
promises.push(a.a(10));
promises.push(a.b(2));
promises.push(a.b(3));
promises.push(a.b(4));
await tick(5);
expect(resA).toStrictEqual([3, 4]);
expect(resB).toStrictEqual([2, 3, 4]);
await Promise.all(promises);
expect(resA).toStrictEqual([3, 4, 10]);
expect(resB).toStrictEqual([2, 3, 4]);
});

test(`limits concurrency separately per instance`, async () => {
const resA: number[] = [];
const resB: number[] = [];
class A {
constructor (protected res: number[]) {}
@concurrency(2)
async a(timeout: number) {
await tick(timeout);
this.res.push(timeout);
}
}
const promises: Promise<any>[] = [];
const a = new A(resA);
const b = new A(resB);
promises.push(a.a(4));
promises.push(a.a(4));
promises.push(a.a(10));
promises.push(b.a(3));
promises.push(b.a(3));
promises.push(b.a(11));
await tick(5);
expect(resA).toStrictEqual([4, 4]);
expect(resB).toStrictEqual([3, 3]);
await Promise.all(promises);
expect(resA).toStrictEqual([4, 4, 10]);
expect(resB).toStrictEqual([3, 3, 11]);
});

describe('limits concurrency to 1', () => {
for (let i = 0; i < 10; i++) {
test(`${i + 1}`, async () => {
Expand Down
10 changes: 7 additions & 3 deletions src/concurrencyDecorator.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import {concurrency as _concurrency} from './concurrency';

const instances = new WeakMap<any, WeakMap<any, any>>();

/**
* A class method decorator that limits the concurrency of the method to the
* given number of parallel executions. All invocations are queued and executed
* in the order they were called.
*/
export function concurrency<This, Args extends any[], Return>(limit: number) {
return (
target: (this: This, ...args: Args) => Promise<Return>,
fn: (this: This, ...args: Args) => Promise<Return>,
context?: ClassMethodDecoratorContext<This, (this: This, ...args: Args) => Promise<Return>>,
) => {
const limiter = _concurrency(limit);
return async function (this: This, ...args: Args): Promise<Return> {
return limiter(async () => await target.call(this, ...args));
let map = instances.get(this);
if (!map) instances.set(this, map = new WeakMap<any, any>());
if (!map.has(fn)) map.set(fn, _concurrency(limit));
return map.get(fn)!(async () => await fn.call(this, ...args));
};
};
}
3 changes: 0 additions & 3 deletions src/mutex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ import {codeMutex} from './codeMutex';
/**
* Executes only one instance of give code at a time. For parallel calls, it
* returns the result of the ongoing execution.
*
* {@link mutex} can be used as a class method decorator or a higher order
* function.
*/
export function mutex<This, Args extends any[], Return>(
target: (this: This, ...args: Args) => Promise<Return>,
Expand Down

0 comments on commit 839c898

Please sign in to comment.