Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@uppy/utils: migrate RateLimitedQueue to TS #4981

Merged
merged 5 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/@uppy/tus/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ export default class Tus<M extends Meta, B extends Body> extends BasePlugin<

// Create a new tus upload
return new Promise<tus.Upload | string>((resolve, reject) => {
let queuedRequest: RateLimitedQueue.QueueEntry
let qRequest: () => void
let queuedRequest: ReturnType<RateLimitedQueue['run']>
let qRequest: () => () => void
let upload: tus.Upload

const opts = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { describe, expect, it } from 'vitest'
import { RateLimitedQueue } from './RateLimitedQueue.js'
import { RateLimitedQueue } from './RateLimitedQueue.ts'
import delay from './delay.ts'

describe('RateLimitedQueue', () => {
let pending = 0
function fn() {
async function fn() {
pending++
return delay(15).then(() => pending--)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,47 +1,71 @@
function createCancelError (cause) {
function createCancelError(cause?: string) {
return new Error('Cancelled', { cause })
}

function abortOn (signal) {
function abortOn(
this: { abort: (cause: string) => void; then?: Promise<any>['then'] },
signal?: AbortSignal,
) {
if (signal != null) {
const abortPromise = () => this.abort(signal.reason)
signal.addEventListener('abort', abortPromise, { once: true })
const removeAbortListener = () => { signal.removeEventListener('abort', abortPromise) }
const removeAbortListener = () => {
signal.removeEventListener('abort', abortPromise)
}
this.then?.(removeAbortListener, removeAbortListener)
}

return this
}

type Handler = {
shouldBeRequeued?: boolean
fn: () => (...args: any[]) => Promise<void> | void
priority: number
abort: (cause?: unknown) => void
done: () => void
}

type QueueOptions = {
priority?: number
}

interface AbortablePromise<T> extends Promise<T> {
abort(cause?: unknown): void
abortOn: typeof abortOn
}

export class RateLimitedQueue {
#activeRequests = 0

#queuedHandlers = []
#queuedHandlers: Handler[] = []

#paused = false

#pauseTimer
#pauseTimer: ReturnType<typeof setTimeout>

#downLimit = 1

#upperLimit
#upperLimit: number

#rateLimitingTimer: ReturnType<typeof setTimeout>

#rateLimitingTimer
limit: number

constructor (limit) {
constructor(limit?: number) {
if (typeof limit !== 'number' || limit === 0) {
this.limit = Infinity
} else {
this.limit = limit
}
}

#call (fn) {
#call(fn: Handler['fn']) {
this.#activeRequests += 1

let done = false

let cancelActive
let cancelActive: (cause?: unknown) => void
try {
cancelActive = fn()
} catch (err) {
Expand All @@ -50,7 +74,7 @@ export class RateLimitedQueue {
}

return {
abort: (cause) => {
abort: (cause?: unknown) => {
if (done) return
done = true
this.#activeRequests -= 1
Expand All @@ -67,14 +91,14 @@ export class RateLimitedQueue {
}
}

#queueNext () {
#queueNext() {
// Do it soon but not immediately, this allows clearing out the entire queue synchronously
// one by one without continuously _advancing_ it (and starting new tasks before immediately
// aborting them)
queueMicrotask(() => this.#next())
}

#next () {
#next() {
if (this.#paused || this.#activeRequests >= this.limit) {
return
}
Expand All @@ -86,20 +110,25 @@ export class RateLimitedQueue {
// so that cancelling it does the Right Thing (and doesn't just try
// to dequeue an already-running request).
const next = this.#queuedHandlers.shift()
if (next == null) {
throw new Error('Invariant violation: next is null')
}
const handler = this.#call(next.fn)
next.abort = handler.abort
next.done = handler.done
}

#queue (fn, options = {}) {
const handler = {
#queue(fn: Handler['fn'], options?: QueueOptions) {
const handler: Handler = {
fn,
priority: options.priority || 0,
priority: options?.priority || 0,
abort: () => {
this.#dequeue(handler)
},
done: () => {
throw new Error('Cannot mark a queued request as done: this indicates a bug')
throw new Error(
'Cannot mark a queued request as done: this indicates a bug',
)
},
}

Expand All @@ -114,22 +143,27 @@ export class RateLimitedQueue {
return handler
}

#dequeue (handler) {
#dequeue(handler: Handler) {
const index = this.#queuedHandlers.indexOf(handler)
if (index !== -1) {
this.#queuedHandlers.splice(index, 1)
}
}

run (fn, queueOptions) {
run(
fn: Handler['fn'],
queueOptions?: QueueOptions,
): Handler | Omit<Handler, 'fn' | 'priority'> {
if (!this.#paused && this.#activeRequests < this.limit) {
return this.#call(fn)
}
return this.#queue(fn, queueOptions)
}

wrapSyncFunction (fn, queueOptions) {
return (...args) => {
wrapSyncFunction(fn: () => void, queueOptions: QueueOptions) {
return (
...args: Parameters<Handler['fn']>
): { abortOn: typeof abortOn; abort: Handler['abort'] } => {
const queuedRequest = this.run(() => {
fn(...args)
queueMicrotask(() => queuedRequest.done())
Expand All @@ -138,47 +172,53 @@ export class RateLimitedQueue {

return {
abortOn,
abort () {
abort() {
queuedRequest.abort()
},
}
}
}

wrapPromiseFunction (fn, queueOptions) {
return (...args) => {
let queuedRequest
wrapPromiseFunction<T extends (...args: any[]) => any>(
fn: T,
queueOptions?: QueueOptions,
) {
return (...args: Parameters<T>): AbortablePromise<ReturnType<T>> => {
let queuedRequest: ReturnType<RateLimitedQueue['run']>
const outerPromise = new Promise((resolve, reject) => {
queuedRequest = this.run(() => {
let cancelError
let cancelError: ReturnType<typeof createCancelError>
let innerPromise
try {
innerPromise = Promise.resolve(fn(...args))
} catch (err) {
innerPromise = Promise.reject(err)
}

innerPromise.then((result) => {
if (cancelError) {
reject(cancelError)
} else {
queuedRequest.done()
resolve(result)
}
}, (err) => {
if (cancelError) {
reject(cancelError)
} else {
queuedRequest.done()
reject(err)
}
})
innerPromise.then(
(result) => {
if (cancelError) {
reject(cancelError)
} else {
queuedRequest.done()
resolve(result)
}
},
(err) => {
if (cancelError) {
reject(cancelError)
} else {
queuedRequest.done()
reject(err)
}
},
)

return (cause) => {
cancelError = createCancelError(cause)
}
}, queueOptions)
})
}) as AbortablePromise<ReturnType<T>>

outerPromise.abort = (cause) => {
queuedRequest.abort(cause)
Expand All @@ -189,7 +229,7 @@ export class RateLimitedQueue {
}
}

resume () {
resume(): void {
this.#paused = false
clearTimeout(this.#pauseTimer)
for (let i = 0; i < this.limit; i++) {
Expand All @@ -205,7 +245,7 @@ export class RateLimitedQueue {
* @param {number | null } [duration] Duration for the pause to happen, in milliseconds.
* If omitted, the queue won't resume automatically.
*/
pause (duration = null) {
pause(duration: number | null = null): void {
this.#paused = true
clearTimeout(this.#pauseTimer)
if (duration != null) {
Expand All @@ -223,7 +263,7 @@ export class RateLimitedQueue {
*
* @param {number} duration in milliseconds.
*/
rateLimit (duration) {
rateLimit(duration: number): void {
clearTimeout(this.#rateLimitingTimer)
this.pause(duration)
if (this.limit > 1 && Number.isFinite(this.limit)) {
Expand All @@ -250,7 +290,9 @@ export class RateLimitedQueue {
}
}

get isPaused () { return this.#paused }
get isPaused(): boolean {
return this.#paused
}
}

export const internalRateLimitedQueue = Symbol('__queue')