Skip to content

Commit

Permalink
Replace async-await-queue with mutex based queue (#694)
Browse files Browse the repository at this point in the history
* selectively use supported ts libs for supported browser targets

* reenable babel

* update readme

* fix grammar

* add compat action to check es compatibility of build artificats

* replace async-await-queue with mutex based queue

* rename

* Create rude-snails-yell.md

* revert clearing queued requests

* add snapshot

* rename
  • Loading branch information
lukasIO committed May 10, 2023
1 parent 41d9002 commit 2b09b7c
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .changeset/rude-snails-yell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-client": patch
---

Replace async-await-queue with mutex based queue
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
"compat": "eslint --no-eslintrc --config ./.eslintrc.dist.cjs ./dist/livekit-client.umd.js"
},
"dependencies": {
"async-await-queue": "^1.2.1",
"events": "^3.3.0",
"loglevel": "^1.8.0",
"protobufjs": "^7.0.0",
Expand Down
99 changes: 99 additions & 0 deletions src/AsyncQueue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { AsyncQueue } from './AsyncQueue';
import { sleep } from './room/utils';

describe('asyncQueue', () => {
it('runs multiple tasks in order', async () => {
const queue = new AsyncQueue();
const tasksExecuted: number[] = [];

for (let i = 0; i < 5; i++) {
queue.run(async () => {
await sleep(50);
tasksExecuted.push(i);
});
}
await queue.flush();
expect(tasksExecuted).toMatchObject([0, 1, 2, 3, 4]);
});
it('runs tasks sequentially and not in parallel', async () => {
const queue = new AsyncQueue();
const results: number[] = [];
for (let i = 0; i < 5; i++) {
queue.run(async () => {
results.push(i);
await sleep(10);
results.push(i);
});
}
await queue.flush();
expect(results).toMatchObject([0, 0, 1, 1, 2, 2, 3, 3, 4, 4]);
});
it('continues executing tasks if one task throws an error', async () => {
const queue = new AsyncQueue();

let task1threw = false;
let task2Executed = false;

queue
.run(async () => {
await sleep(100);
throw Error('task 1 throws');
})
.catch(() => {
task1threw = true;
});

await queue
.run(async () => {
task2Executed = true;
})
.catch(() => {
fail('task 2 should not have thrown');
});

expect(task1threw).toBeTruthy();
expect(task2Executed).toBeTruthy();
});
it('returns the result of the task', async () => {
const queue = new AsyncQueue();

const result = await queue.run(async () => {
await sleep(10);
return 'result';
});

expect(result).toBe('result');
});
it('returns only when the enqueued task and all previous tasks have completed', async () => {
const queue = new AsyncQueue();
const tasksExecuted: number[] = [];
for (let i = 0; i < 10; i += 1) {
queue.run(async () => {
await sleep(10);
tasksExecuted.push(i);
return i;
});
}

const result = await queue.run(async () => {
await sleep(10);
tasksExecuted.push(999);
return 'result';
});

expect(result).toBe('result');
expect(tasksExecuted).toMatchObject([...new Array(10).fill(0).map((_, idx) => idx), 999]);
});
it('can handle queue sizes of up to 10_000 tasks', async () => {
const queue = new AsyncQueue();
const tasksExecuted: number[] = [];

for (let i = 0; i < 10_000; i++) {
queue.run(async () => {
tasksExecuted.push(i);
});
}
await queue.flush();
expect(tasksExecuted).toMatchObject(new Array(10_000).fill(0).map((_, idx) => idx));
});
});
57 changes: 57 additions & 0 deletions src/AsyncQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Mutex } from './room/utils';

type QueueTask<T> = () => PromiseLike<T>;

enum QueueTaskStatus {
'WAITING',
'RUNNING',
'COMPLETED',
}

type QueueTaskInfo = {
id: number;
enqueuedAt: number;
executedAt?: number;
status: QueueTaskStatus;
};

export class AsyncQueue {
private pendingTasks: Map<number, QueueTaskInfo>;

private taskMutex: Mutex;

private nextTaskIndex: number;

constructor() {
this.pendingTasks = new Map();
this.taskMutex = new Mutex();
this.nextTaskIndex = 0;
}

async run<T>(task: QueueTask<T>) {
const taskInfo: QueueTaskInfo = {
id: this.nextTaskIndex++,
enqueuedAt: Date.now(),
status: QueueTaskStatus.WAITING,
};
this.pendingTasks.set(taskInfo.id, taskInfo);
const unlock = await this.taskMutex.lock();
try {
taskInfo.executedAt = Date.now();
taskInfo.status = QueueTaskStatus.RUNNING;
return await task();
} finally {
taskInfo.status = QueueTaskStatus.COMPLETED;
this.pendingTasks.delete(taskInfo.id);
unlock();
}
}

async flush() {
return this.run(async () => {});
}

snapshot() {
return Array.from(this.pendingTasks.values());
}
}
6 changes: 3 additions & 3 deletions src/api/SignalClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Queue from 'async-await-queue';
import 'webrtc-adapter';
import { AsyncQueue } from '../AsyncQueue';
import log from '../logger';
import {
ClientInfo,
Expand Down Expand Up @@ -87,7 +87,7 @@ export class SignalClient {

isReconnecting: boolean;

requestQueue: Queue;
requestQueue: AsyncQueue;

queuedRequests: Array<() => Promise<void>>;

Expand Down Expand Up @@ -154,7 +154,7 @@ export class SignalClient {
this.isConnected = false;
this.isReconnecting = false;
this.useJSON = useJSON;
this.requestQueue = new Queue();
this.requestQueue = new AsyncQueue();
this.queuedRequests = [];
this.closingLock = new Mutex();
}
Expand Down
5 changes: 0 additions & 5 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2616,11 +2616,6 @@ arrify@^1.0.1:
resolved "https://registry.npmjs.org/arrify/-/arrify-1.0.1.tgz"
integrity sha512-3CYzex9M9FGQjCGMGyi6/31c8GJbgb0qGyrx5HWxPd0aCwh4cB2YjMb2Xf9UuoogrMrlO9cTqnB5rI5GHZTcUA==

async-await-queue@^1.2.1:
version "1.2.1"
resolved "https://registry.npmjs.org/async-await-queue/-/async-await-queue-1.2.1.tgz"
integrity sha512-v2j+/EMzAnuJZ8I4570KJMFhi6G9g3WZyFh6cPnmQSJh3nLao77XpRt01kyFegQazPSKvue1yaIYDp/NfV/b0g==

async@^3.2.4:
version "3.2.4"
resolved "https://registry.npmjs.org/async/-/async-3.2.4.tgz"
Expand Down

0 comments on commit 2b09b7c

Please sign in to comment.