Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions src/storage/file.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { mkdir, readdir, readFile, unlink, watch, rm } from 'node:fs/promises'
import { mkdir, readdir, readFile, rename, unlink, watch, rm } from 'node:fs/promises'
import { join } from 'node:path'
import { EventEmitter } from 'node:events'
import fastWriteAtomic from 'fast-write-atomic'
Expand Down Expand Up @@ -194,25 +194,27 @@ export class FileStorage implements Storage {
const files = await this.#getQueueFiles()
if (files.length === 0) return null

// Try to claim the first file
const file = files[0]
const srcPath = join(this.#queuePath, file)
const id = this.#extractIdFromFilename(file)
const dstPath = join(this.#processingPath, workerId, `${id}.msg`)
// Ensure processing directory exists
await mkdir(join(this.#processingPath, workerId), { recursive: true })

try {
// Ensure processing directory exists
await mkdir(join(this.#processingPath, workerId), { recursive: true })

// Read and move atomically
const message = await readFile(srcPath)
await writeFileAtomic(dstPath, message)
await unlink(srcPath)
return message
} catch {
// Another worker may have claimed it
return null
// Try to claim files until one succeeds
for (const file of files) {
const srcPath = join(this.#queuePath, file)
const id = this.#extractIdFromFilename(file)
const dstPath = join(this.#processingPath, workerId, `${id}.msg`)

try {
// Atomic rename - only one worker can succeed on same filesystem
await rename(srcPath, dstPath)
// Only read after successful claim
return await readFile(dstPath)
} catch {
// Another worker claimed this file, try the next one
continue
}
}

return null
}

async #getQueueFiles (): Promise<string[]> {
Expand Down