From 2b09b7cd45f5dad132363c0f79b375fa0e71ee48 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Wed, 10 May 2023 11:08:53 +0200 Subject: [PATCH] Replace async-await-queue with mutex based queue (#694) * selectively use supported ts libs for supported browser targets * reenable babel * update readme * fix grammar * add compat action to check es compatibility of build artificats * replace async-await-queue with mutex based queue * rename * Create rude-snails-yell.md * revert clearing queued requests * add snapshot * rename --- .changeset/rude-snails-yell.md | 5 ++ package.json | 1 - src/AsyncQueue.test.ts | 99 ++++++++++++++++++++++++++++++++++ src/AsyncQueue.ts | 57 ++++++++++++++++++++ src/api/SignalClient.ts | 6 +-- yarn.lock | 5 -- 6 files changed, 164 insertions(+), 9 deletions(-) create mode 100644 .changeset/rude-snails-yell.md create mode 100644 src/AsyncQueue.test.ts create mode 100644 src/AsyncQueue.ts diff --git a/.changeset/rude-snails-yell.md b/.changeset/rude-snails-yell.md new file mode 100644 index 0000000000..8a1db65d68 --- /dev/null +++ b/.changeset/rude-snails-yell.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Replace async-await-queue with mutex based queue diff --git a/package.json b/package.json index 569ac7271b..ee3e987b43 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,6 @@ "compat": "eslint --no-eslintrc --config ./.eslintrc.dist.cjs ./dist/livekit-client.umd.js" }, "dependencies": { - "async-await-queue": "^1.2.1", "events": "^3.3.0", "loglevel": "^1.8.0", "protobufjs": "^7.0.0", diff --git a/src/AsyncQueue.test.ts b/src/AsyncQueue.test.ts new file mode 100644 index 0000000000..ca71ecd408 --- /dev/null +++ b/src/AsyncQueue.test.ts @@ -0,0 +1,99 @@ +import { AsyncQueue } from './AsyncQueue'; +import { sleep } from './room/utils'; + +describe('asyncQueue', () => { + it('runs multiple tasks in order', async () => { + const queue = new AsyncQueue(); + const tasksExecuted: number[] = []; + + for (let i = 0; i < 5; i++) { + queue.run(async () => { + await sleep(50); + tasksExecuted.push(i); + }); + } + await queue.flush(); + expect(tasksExecuted).toMatchObject([0, 1, 2, 3, 4]); + }); + it('runs tasks sequentially and not in parallel', async () => { + const queue = new AsyncQueue(); + const results: number[] = []; + for (let i = 0; i < 5; i++) { + queue.run(async () => { + results.push(i); + await sleep(10); + results.push(i); + }); + } + await queue.flush(); + expect(results).toMatchObject([0, 0, 1, 1, 2, 2, 3, 3, 4, 4]); + }); + it('continues executing tasks if one task throws an error', async () => { + const queue = new AsyncQueue(); + + let task1threw = false; + let task2Executed = false; + + queue + .run(async () => { + await sleep(100); + throw Error('task 1 throws'); + }) + .catch(() => { + task1threw = true; + }); + + await queue + .run(async () => { + task2Executed = true; + }) + .catch(() => { + fail('task 2 should not have thrown'); + }); + + expect(task1threw).toBeTruthy(); + expect(task2Executed).toBeTruthy(); + }); + it('returns the result of the task', async () => { + const queue = new AsyncQueue(); + + const result = await queue.run(async () => { + await sleep(10); + return 'result'; + }); + + expect(result).toBe('result'); + }); + it('returns only when the enqueued task and all previous tasks have completed', async () => { + const queue = new AsyncQueue(); + const tasksExecuted: number[] = []; + for (let i = 0; i < 10; i += 1) { + queue.run(async () => { + await sleep(10); + tasksExecuted.push(i); + return i; + }); + } + + const result = await queue.run(async () => { + await sleep(10); + tasksExecuted.push(999); + return 'result'; + }); + + expect(result).toBe('result'); + expect(tasksExecuted).toMatchObject([...new Array(10).fill(0).map((_, idx) => idx), 999]); + }); + it('can handle queue sizes of up to 10_000 tasks', async () => { + const queue = new AsyncQueue(); + const tasksExecuted: number[] = []; + + for (let i = 0; i < 10_000; i++) { + queue.run(async () => { + tasksExecuted.push(i); + }); + } + await queue.flush(); + expect(tasksExecuted).toMatchObject(new Array(10_000).fill(0).map((_, idx) => idx)); + }); +}); diff --git a/src/AsyncQueue.ts b/src/AsyncQueue.ts new file mode 100644 index 0000000000..1d5302602f --- /dev/null +++ b/src/AsyncQueue.ts @@ -0,0 +1,57 @@ +import { Mutex } from './room/utils'; + +type QueueTask = () => PromiseLike; + +enum QueueTaskStatus { + 'WAITING', + 'RUNNING', + 'COMPLETED', +} + +type QueueTaskInfo = { + id: number; + enqueuedAt: number; + executedAt?: number; + status: QueueTaskStatus; +}; + +export class AsyncQueue { + private pendingTasks: Map; + + private taskMutex: Mutex; + + private nextTaskIndex: number; + + constructor() { + this.pendingTasks = new Map(); + this.taskMutex = new Mutex(); + this.nextTaskIndex = 0; + } + + async run(task: QueueTask) { + const taskInfo: QueueTaskInfo = { + id: this.nextTaskIndex++, + enqueuedAt: Date.now(), + status: QueueTaskStatus.WAITING, + }; + this.pendingTasks.set(taskInfo.id, taskInfo); + const unlock = await this.taskMutex.lock(); + try { + taskInfo.executedAt = Date.now(); + taskInfo.status = QueueTaskStatus.RUNNING; + return await task(); + } finally { + taskInfo.status = QueueTaskStatus.COMPLETED; + this.pendingTasks.delete(taskInfo.id); + unlock(); + } + } + + async flush() { + return this.run(async () => {}); + } + + snapshot() { + return Array.from(this.pendingTasks.values()); + } +} diff --git a/src/api/SignalClient.ts b/src/api/SignalClient.ts index 8adc85462c..d9e300ac50 100644 --- a/src/api/SignalClient.ts +++ b/src/api/SignalClient.ts @@ -1,5 +1,5 @@ -import Queue from 'async-await-queue'; import 'webrtc-adapter'; +import { AsyncQueue } from '../AsyncQueue'; import log from '../logger'; import { ClientInfo, @@ -87,7 +87,7 @@ export class SignalClient { isReconnecting: boolean; - requestQueue: Queue; + requestQueue: AsyncQueue; queuedRequests: Array<() => Promise>; @@ -154,7 +154,7 @@ export class SignalClient { this.isConnected = false; this.isReconnecting = false; this.useJSON = useJSON; - this.requestQueue = new Queue(); + this.requestQueue = new AsyncQueue(); this.queuedRequests = []; this.closingLock = new Mutex(); } diff --git a/yarn.lock b/yarn.lock index 8e9fec097a..38c1a2337c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2616,11 +2616,6 @@ arrify@^1.0.1: resolved "https://registry.npmjs.org/arrify/-/arrify-1.0.1.tgz" integrity sha512-3CYzex9M9FGQjCGMGyi6/31c8GJbgb0qGyrx5HWxPd0aCwh4cB2YjMb2Xf9UuoogrMrlO9cTqnB5rI5GHZTcUA== -async-await-queue@^1.2.1: - version "1.2.1" - resolved "https://registry.npmjs.org/async-await-queue/-/async-await-queue-1.2.1.tgz" - integrity sha512-v2j+/EMzAnuJZ8I4570KJMFhi6G9g3WZyFh6cPnmQSJh3nLao77XpRt01kyFegQazPSKvue1yaIYDp/NfV/b0g== - async@^3.2.4: version "3.2.4" resolved "https://registry.npmjs.org/async/-/async-3.2.4.tgz"