Skip to content

Commit

Permalink
@uppy/utils: migrate RateLimitedQueue to TS (#4981)
Browse files Browse the repository at this point in the history
Co-authored-by: Antoine du Hamel <antoine@transloadit.com>
  • Loading branch information
Murderlon and aduh95 committed Mar 7, 2024
1 parent d6ae3b4 commit 49bd8cb
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 50 deletions.
4 changes: 2 additions & 2 deletions packages/@uppy/tus/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ export default class Tus<M extends Meta, B extends Body> extends BasePlugin<

// Create a new tus upload
return new Promise<tus.Upload | string>((resolve, reject) => {
let queuedRequest: RateLimitedQueue.QueueEntry
let qRequest: () => void
let queuedRequest: ReturnType<RateLimitedQueue['run']>
let qRequest: () => () => void
let upload: tus.Upload

const opts = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { describe, expect, it } from 'vitest'
import { RateLimitedQueue } from './RateLimitedQueue.js'
import { RateLimitedQueue } from './RateLimitedQueue.ts'
import delay from './delay.ts'

describe('RateLimitedQueue', () => {
let pending = 0
function fn() {
async function fn() {
pending++
return delay(15).then(() => pending--)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,47 +1,71 @@
function createCancelError (cause) {
function createCancelError(cause?: string) {
return new Error('Cancelled', { cause })
}

function abortOn (signal) {
function abortOn(
this: { abort: (cause: string) => void; then?: Promise<any>['then'] },
signal?: AbortSignal,
) {
if (signal != null) {
const abortPromise = () => this.abort(signal.reason)
signal.addEventListener('abort', abortPromise, { once: true })
const removeAbortListener = () => { signal.removeEventListener('abort', abortPromise) }
const removeAbortListener = () => {
signal.removeEventListener('abort', abortPromise)
}
this.then?.(removeAbortListener, removeAbortListener)
}

return this
}

type Handler = {
shouldBeRequeued?: boolean
fn: () => (...args: any[]) => Promise<void> | void
priority: number
abort: (cause?: unknown) => void
done: () => void
}

type QueueOptions = {
priority?: number
}

interface AbortablePromise<T> extends Promise<T> {
abort(cause?: unknown): void
abortOn: typeof abortOn
}

export class RateLimitedQueue {
#activeRequests = 0

#queuedHandlers = []
#queuedHandlers: Handler[] = []

#paused = false

#pauseTimer
#pauseTimer: ReturnType<typeof setTimeout>

#downLimit = 1

#upperLimit
#upperLimit: number

#rateLimitingTimer: ReturnType<typeof setTimeout>

#rateLimitingTimer
limit: number

constructor (limit) {
constructor(limit?: number) {
if (typeof limit !== 'number' || limit === 0) {
this.limit = Infinity
} else {
this.limit = limit
}
}

#call (fn) {
#call(fn: Handler['fn']) {
this.#activeRequests += 1

let done = false

let cancelActive
let cancelActive: (cause?: unknown) => void
try {
cancelActive = fn()
} catch (err) {
Expand All @@ -50,7 +74,7 @@ export class RateLimitedQueue {
}

return {
abort: (cause) => {
abort: (cause?: unknown) => {
if (done) return
done = true
this.#activeRequests -= 1
Expand All @@ -67,14 +91,14 @@ export class RateLimitedQueue {
}
}

#queueNext () {
#queueNext() {
// Do it soon but not immediately, this allows clearing out the entire queue synchronously
// one by one without continuously _advancing_ it (and starting new tasks before immediately
// aborting them)
queueMicrotask(() => this.#next())
}

#next () {
#next() {
if (this.#paused || this.#activeRequests >= this.limit) {
return
}
Expand All @@ -86,20 +110,25 @@ export class RateLimitedQueue {
// so that cancelling it does the Right Thing (and doesn't just try
// to dequeue an already-running request).
const next = this.#queuedHandlers.shift()
if (next == null) {
throw new Error('Invariant violation: next is null')
}
const handler = this.#call(next.fn)
next.abort = handler.abort
next.done = handler.done
}

#queue (fn, options = {}) {
const handler = {
#queue(fn: Handler['fn'], options?: QueueOptions) {
const handler: Handler = {
fn,
priority: options.priority || 0,
priority: options?.priority || 0,
abort: () => {
this.#dequeue(handler)
},
done: () => {
throw new Error('Cannot mark a queued request as done: this indicates a bug')
throw new Error(
'Cannot mark a queued request as done: this indicates a bug',
)
},
}

Expand All @@ -114,22 +143,27 @@ export class RateLimitedQueue {
return handler
}

#dequeue (handler) {
#dequeue(handler: Handler) {
const index = this.#queuedHandlers.indexOf(handler)
if (index !== -1) {
this.#queuedHandlers.splice(index, 1)
}
}

run (fn, queueOptions) {
run(
fn: Handler['fn'],
queueOptions?: QueueOptions,
): Handler | Omit<Handler, 'fn' | 'priority'> {
if (!this.#paused && this.#activeRequests < this.limit) {
return this.#call(fn)
}
return this.#queue(fn, queueOptions)
}

wrapSyncFunction (fn, queueOptions) {
return (...args) => {
wrapSyncFunction(fn: () => void, queueOptions: QueueOptions) {
return (
...args: Parameters<Handler['fn']>
): { abortOn: typeof abortOn; abort: Handler['abort'] } => {
const queuedRequest = this.run(() => {
fn(...args)
queueMicrotask(() => queuedRequest.done())
Expand All @@ -138,47 +172,53 @@ export class RateLimitedQueue {

return {
abortOn,
abort () {
abort() {
queuedRequest.abort()
},
}
}
}

wrapPromiseFunction (fn, queueOptions) {
return (...args) => {
let queuedRequest
wrapPromiseFunction<T extends (...args: any[]) => any>(
fn: T,
queueOptions?: QueueOptions,
) {
return (...args: Parameters<T>): AbortablePromise<ReturnType<T>> => {
let queuedRequest: ReturnType<RateLimitedQueue['run']>
const outerPromise = new Promise((resolve, reject) => {
queuedRequest = this.run(() => {
let cancelError
let cancelError: ReturnType<typeof createCancelError>
let innerPromise
try {
innerPromise = Promise.resolve(fn(...args))
} catch (err) {
innerPromise = Promise.reject(err)
}

innerPromise.then((result) => {
if (cancelError) {
reject(cancelError)
} else {
queuedRequest.done()
resolve(result)
}
}, (err) => {
if (cancelError) {
reject(cancelError)
} else {
queuedRequest.done()
reject(err)
}
})
innerPromise.then(
(result) => {
if (cancelError) {
reject(cancelError)
} else {
queuedRequest.done()
resolve(result)
}
},
(err) => {
if (cancelError) {
reject(cancelError)
} else {
queuedRequest.done()
reject(err)
}
},
)

return (cause) => {
cancelError = createCancelError(cause)
}
}, queueOptions)
})
}) as AbortablePromise<ReturnType<T>>

outerPromise.abort = (cause) => {
queuedRequest.abort(cause)
Expand All @@ -189,7 +229,7 @@ export class RateLimitedQueue {
}
}

resume () {
resume(): void {
this.#paused = false
clearTimeout(this.#pauseTimer)
for (let i = 0; i < this.limit; i++) {
Expand All @@ -205,7 +245,7 @@ export class RateLimitedQueue {
* @param {number | null } [duration] Duration for the pause to happen, in milliseconds.
* If omitted, the queue won't resume automatically.
*/
pause (duration = null) {
pause(duration: number | null = null): void {
this.#paused = true
clearTimeout(this.#pauseTimer)
if (duration != null) {
Expand All @@ -223,7 +263,7 @@ export class RateLimitedQueue {
*
* @param {number} duration in milliseconds.
*/
rateLimit (duration) {
rateLimit(duration: number): void {
clearTimeout(this.#rateLimitingTimer)
this.pause(duration)
if (this.limit > 1 && Number.isFinite(this.limit)) {
Expand All @@ -250,7 +290,9 @@ export class RateLimitedQueue {
}
}

get isPaused () { return this.#paused }
get isPaused(): boolean {
return this.#paused
}
}

export const internalRateLimitedQueue = Symbol('__queue')

0 comments on commit 49bd8cb

Please sign in to comment.