Skip to content

Commit 18c680e

Browse files
committed
chore: wip
1 parent 71fddf7 commit 18c680e

File tree

1 file changed

+159
-0
lines changed

1 file changed

+159
-0
lines changed

src/queue.ts

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,4 +706,163 @@ export class Queue<T = any> {
706706
const dlq = this.getDeadLetterQueue()
707707
return dlq.clear()
708708
}
709+
710+
/**
711+
* Remove multiple jobs from the queue in a single operation
712+
* @param jobIds Array of job IDs to remove
713+
* @returns Number of jobs successfully removed
714+
*/
715+
async bulkRemove(jobIds: string[]): Promise<number> {
716+
if (!jobIds.length)
717+
return 0
718+
719+
try {
720+
// Use pipeline for better performance
721+
await this.redisClient.send('MULTI', [])
722+
723+
// Keep track of successful removes
724+
let removedCount = 0
725+
726+
for (const jobId of jobIds) {
727+
const jobKey = this.getJobKey(jobId)
728+
729+
// Check if job exists first
730+
const exists = await this.redisClient.exists(jobKey)
731+
if (!exists)
732+
continue
733+
734+
// Remove from all possible lists
735+
const statusLists = ['active', 'waiting', 'completed', 'failed', 'dependency-wait']
736+
for (const list of statusLists) {
737+
await this.redisClient.send('LREM', [this.getKey(list), '0', jobId])
738+
await this.redisClient.send('SREM', [this.getKey(list), jobId])
739+
}
740+
741+
// Remove from delayed set
742+
await this.redisClient.send('ZREM', [this.getKey('delayed'), jobId])
743+
744+
// Remove dependent jobs links
745+
const dependentKey = `${jobKey}:dependents`
746+
747+
// Remove the job hash and the dependent key
748+
await this.redisClient.send('DEL', [jobKey, dependentKey])
749+
750+
removedCount++
751+
752+
// Emit event
753+
this.events.emitJobRemoved(jobId)
754+
}
755+
756+
// Execute all commands
757+
await this.redisClient.send('EXEC', [])
758+
759+
this.logger.debug(`Bulk removed ${removedCount} jobs from queue ${this.name}`)
760+
return removedCount
761+
}
762+
catch (err) {
763+
this.logger.error(`Error in bulk remove operation for queue ${this.name}: ${(err as Error).message}`)
764+
return 0
765+
}
766+
}
767+
768+
/**
769+
* Pause multiple jobs (move them from waiting/delayed to paused state)
770+
* @param jobIds Array of job IDs to pause
771+
* @returns Number of jobs successfully paused
772+
*/
773+
async bulkPause(jobIds: string[]): Promise<number> {
774+
if (!jobIds.length)
775+
return 0
776+
777+
try {
778+
// Use pipeline for better performance
779+
await this.redisClient.send('MULTI', [])
780+
781+
let pausedCount = 0
782+
783+
for (const jobId of jobIds) {
784+
const jobKey = this.getJobKey(jobId)
785+
786+
// Check if job exists
787+
const exists = await this.redisClient.exists(jobKey)
788+
if (!exists)
789+
continue
790+
791+
// Check if it's in waiting or delayed
792+
const isWaiting = await this.redisClient.send('LREM', [this.getKey('waiting'), '0', jobId])
793+
const isDelayed = await this.redisClient.send('ZREM', [this.getKey('delayed'), jobId])
794+
795+
if ((isWaiting && isWaiting > 0) || (isDelayed && isDelayed > 0)) {
796+
// Add to paused list
797+
await this.redisClient.send('LPUSH', [this.getKey('paused'), jobId])
798+
799+
// Update job status
800+
await this.redisClient.send('HSET', [jobKey, 'status', 'paused'])
801+
802+
pausedCount++
803+
this.logger.debug(`Job ${jobId} paused`)
804+
}
805+
}
806+
807+
// Execute all commands
808+
await this.redisClient.send('EXEC', [])
809+
810+
this.logger.debug(`Bulk paused ${pausedCount} jobs in queue ${this.name}`)
811+
return pausedCount
812+
}
813+
catch (err) {
814+
this.logger.error(`Error in bulk pause operation for queue ${this.name}: ${(err as Error).message}`)
815+
return 0
816+
}
817+
}
818+
819+
/**
820+
* Resume multiple paused jobs (move them from paused to waiting state)
821+
* @param jobIds Array of job IDs to resume
822+
* @returns Number of jobs successfully resumed
823+
*/
824+
async bulkResume(jobIds: string[]): Promise<number> {
825+
if (!jobIds.length)
826+
return 0
827+
828+
try {
829+
// Use pipeline for better performance
830+
await this.redisClient.send('MULTI', [])
831+
832+
let resumedCount = 0
833+
834+
for (const jobId of jobIds) {
835+
const jobKey = this.getJobKey(jobId)
836+
837+
// Check if job exists
838+
const exists = await this.redisClient.exists(jobKey)
839+
if (!exists)
840+
continue
841+
842+
// Check if it's in paused list
843+
const isPaused = await this.redisClient.send('LREM', [this.getKey('paused'), '0', jobId])
844+
845+
if (isPaused && isPaused > 0) {
846+
// Add back to waiting list
847+
await this.redisClient.send('LPUSH', [this.getKey('waiting'), jobId])
848+
849+
// Update job status
850+
await this.redisClient.send('HSET', [jobKey, 'status', 'waiting'])
851+
852+
resumedCount++
853+
this.logger.debug(`Job ${jobId} resumed`)
854+
}
855+
}
856+
857+
// Execute all commands
858+
await this.redisClient.send('EXEC', [])
859+
860+
this.logger.debug(`Bulk resumed ${resumedCount} jobs in queue ${this.name}`)
861+
return resumedCount
862+
}
863+
catch (err) {
864+
this.logger.error(`Error in bulk resume operation for queue ${this.name}: ${(err as Error).message}`)
865+
return 0
866+
}
867+
}
709868
}

0 commit comments

Comments
 (0)