Skip to content

Commit

Permalink
Abstracting function into a whole class to support clearing of semaph…
Browse files Browse the repository at this point in the history
…ores connected to timeouts
  • Loading branch information
jdalrymple committed Dec 19, 2023
1 parent 9deb498 commit a4d1b1a
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 91 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ jobs:
with:
name: build-artifacts
path: dist
- name: Deploy
- name: Dry Run Deployment
env:
GITHUB_TOKEN: ${{ secrets.GH_TOKEN }}
NPM_TOKEN: ${{ secrets.NPM_TOKEN }}
Expand Down
24 changes: 17 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,25 @@ Acquire a token from the semaphore, thus decrement the number of available execu

Release the semaphore, thus increment the number of free execution slots. If `initFn` is used then the `token` returned by `acquire()` should be given as an argument when calling this function.

#### `createRateLimiter(rptu, { timeUnit, uniformDistribution })`
### Rate Limit

Creates a rate limiter function that blocks with a promise whenever the rate limit is hit and resolves the promise once the call rate is within the limit.
#### Constructor(rate, { interval, uniformDistribution })

Creates a rate limit instance.

| Name | Type | Optional | Default | Description |
| ----------------------------- | ------- | -------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `rptu` | Integer | No | | Number of tasks allowed per `timeUnit` |
| `options.timeUnit` | Integer | Yes | 1000 | Defines the width of the rate limiting window in milliseconds |
| `options.uniformDistribution` | Boolean | Yes | False | Enforces a discrete uniform distribution over time. Setting the `uniformDistribution` option is mainly useful in a situation where the flow of rate limit function calls is continuous and and occurring faster than `timeUnit` (e.g. reading a file) and not enabling it would cause the maximum number of calls to resolve immediately (thus exhaust the limit immediately) and therefore the next bunch of calls would need to wait for `timeUnit` milliseconds. However if the flow is sparse then this option may make the code run slower with no advantages. |
| `rate` | Integer | No | | Number of tasks allowed per `interval` |
| `options.interval` | Integer | Yes | 1000 | Defines the width of the rate limiting window in milliseconds |
| `options.uniformDistribution` | Boolean | Yes | False | Enforces a discrete uniform distribution over time. Setting the `uniformDistribution` option is mainly useful in a situation where the flow of rate limit function calls is continuous and and occurring faster than `interval` (e.g. reading a file) and not enabling it would cause the maximum number of calls to resolve immediately (thus exhaust the limit immediately) and therefore the next bunch of calls would need to wait for `interval` milliseconds. However if the flow is sparse then this option may make the code run slower with no advantages. |

#### `async rateLimit.apply()`

Acquires a semaphore and connects a timeout for its release. If the rate limit is reached, the execution process is halted until an available semaphore is released.

#### `rateLimit.reset()`

Releases all acquired semaphores immediately and resets the timeouts connected to them.

## Examples

Expand Down Expand Up @@ -151,10 +161,10 @@ function foo() {
import { RateLimit } from 'sema4';

async function bar() {
const lim = RateLimit(5); // Limit to 5 tasks per default timeUnit
const rl = new RateLimit(5); // Limit to 5 tasks per default time interval

for (let i = 0; i < n; i++) {
await lim();
await rl.apply();
// Perform some async tasks here...
}
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
48 changes: 36 additions & 12 deletions examples/rate-limiting.ts → examples/rate-limiting.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ async function foo() {
console.log('Naive requests per second rate limiting');

const n = 50;
const lim = RateLimit(5);
const rl = new RateLimit(5);
const start = process.hrtime();

for (let i = 0; i < n; i++) {
await lim();
await rl.apply();

process.stdout.write('.');
}
Expand All @@ -23,19 +23,41 @@ async function foo() {
}

async function bar() {
console.log('Custom rate limit time unit');
console.log('Custom rate limit time unit');

const n = 20;
const rl = new RateLimit(5, { interval: 60 * 1000 });
const start = process.hrtime();

for (let i = 0; i < n; i++) {
await rl.limit();
process.stdout.write('.');
}
process.stdout.write('\n');

const hrt = process.hrtime(start);
const elapsed = (hrt[0] * 1000 + hrt[1] / 1e6) / 1000;
const rps = n / (elapsed / 60);
console.log(rps.toFixed(3) + ' req/min');
}

async function baz() {
console.log('Cleaning up rl after usage');

const n = 20;
const lim = RateLimit(5, { timeUnit: 60 * 1000 });
const n = 4;
const rl = new RateLimit(5, { interval: 60 * 1000 });
const start = process.hrtime();
const to = []

for (let i = 0; i < n; i++) {
await lim();
process.stdout.write('s');

process.stdout.write('.');
await rl.limit();

process.stdout.write('. ');
}

process.stdout.write('\n');
rl.reset()

const hrt = process.hrtime(start);
const elapsed = (hrt[0] * 1000 + hrt[1] / 1e6) / 1000;
Expand All @@ -44,15 +66,16 @@ async function bar() {
console.log(rps.toFixed(3) + ' req/min');
}

async function baz() {

async function qux() {
console.log('Uniform distribution of requests over time');

const n = 50;
const lim = RateLimit(5, { uniformDistribution: true });
const rl = new RateLimit(5, { uniformDistribution: true });
const start = process.hrtime();

for (let i = 0; i < n; i++) {
await lim();
await rl.limit();

process.stdout.write('.');
}
Expand All @@ -66,8 +89,9 @@ async function baz() {
console.log(rps.toFixed(3) + ' req/s');
}

foo()
bar()
.then(bar)
.then(qux)
.then(baz)
.catch((e) => console.log(e))
.then(() => console.log('READY'));
23 changes: 12 additions & 11 deletions src/Deque.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ export const MIN_CAPACITY = 4;
export const MAX_CAPACITY = 1073741824;
export const RESIZE_MULTIPLER = 1;

function arrayMove(src: any[], srcIndex: number, dst: any[], dstIndex: number, len: number) {
for (let j = 0; j < len; ++j) {
dst[j + dstIndex] = src[j + srcIndex];
src[j + srcIndex] = void 0;
export function arrayMove(
list: any[],
srcIndex: number,
dstIndex: number,
numberOfElements: number,
) {
for (let j = 0; j < numberOfElements; ++j) {
list[j + dstIndex] = list[j + srcIndex];
list[j + srcIndex] = void 0;
}
}

Expand Down Expand Up @@ -60,9 +65,7 @@ export class Deque<T> {
pop(): T | void {
const length = this._length;

if (length === 0) {
return void 0;
}
if (length === 0) return;

const i = (this._front + length - 1) & (this._capacity - 1);
const ret = this.arr[i];
Expand All @@ -76,9 +79,7 @@ export class Deque<T> {
shift() {
const length = this._length;

if (length === 0) {
return void 0;
}
if (length === 0) return;

const front = this._front;
const ret = this.arr[front];
Expand Down Expand Up @@ -110,7 +111,7 @@ export class Deque<T> {

if (front + length > oldCapacity) {
const moveItemsCount = (front + length) & (oldCapacity - 1);
arrayMove(this.arr, 0, this.arr, oldCapacity, moveItemsCount);
arrayMove(this.arr, 0, oldCapacity, moveItemsCount);
}
}
}
52 changes: 52 additions & 0 deletions src/RateLimit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Sema } from './Sema';

export class RateLimit {
private sema: Sema;

private delay: number;

private timeouts: NodeJS.Timeout[] = [];

constructor(
rate: number,
{
interval = 1000,
uniformDistribution = false,
}: {
interval?: number;
uniformDistribution?: boolean;
} = {},
) {
if (!Number.isInteger(rate) || rate < 0) {
throw new TypeError(
'The rate-limit-per-time-unit (rptu) should be an integer and greater than zero',
);
}

this.sema = new Sema(uniformDistribution ? 1 : rate);
this.delay = uniformDistribution ? interval / rate : interval;
}

async apply(): Promise<NodeJS.Timeout> {
await this.sema.acquire();

const tm = setTimeout(() => this.sema.release(), this.delay);

this.timeouts.push(tm);

return tm;
}

reset(): number {
const timeoutsToClear = this.timeouts.length;

this.timeouts.forEach((t) => {
clearTimeout(t);
this.sema.release();
});

this.timeouts = [];

return timeoutsToClear;
}
}
26 changes: 0 additions & 26 deletions src/Utils.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { Sema } from './Sema';
export { createRateLimiter } from './Utils';
export { RateLimit } from './RateLimit';
22 changes: 21 additions & 1 deletion test/Deque.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
import { Deque, MAX_CAPACITY, getCapacity } from '../src/Deque';
import { Deque, MAX_CAPACITY, arrayMove, getCapacity } from '../src/Deque';

describe('MAX_CAPACITY', () => {
it('should restrict max capacity to 1gb', () => {
expect(MAX_CAPACITY).toBe(1073741824);
});
});

describe('arrayMove', () => {
it('should add to the list starting at the index dstIndex and from the list starting from srcIndex for a numberOfElements number of elements', () => {
const list = [1, 2, 3, 4];

arrayMove(list, 2, 0, 2);

expect(list).toHaveLength(4);
expect(list).toMatchObject([3, 4, undefined, undefined]);
});

it('should clear source array starting from srcIndex for a len number of elements', () => {
const list = [1, 2, 3, 4];

arrayMove(list, 0, 0, 2);

expect(list).toHaveLength(4);
expect(list).toMatchObject([undefined, undefined, 3, 4]);
});
});

describe('getCapacity', () => {
it('should accept a initial capacity for the queue, with a default of 4 if passed value is less than 4', () => {
const d = getCapacity(2);
Expand Down
Loading

0 comments on commit a4d1b1a

Please sign in to comment.