diff --git a/apps/backend/src/__tests__/downloads-service.test.ts b/apps/backend/src/__tests__/downloads-service.test.ts index ff12025..9e333ee 100644 --- a/apps/backend/src/__tests__/downloads-service.test.ts +++ b/apps/backend/src/__tests__/downloads-service.test.ts @@ -2,8 +2,9 @@ import type { Release } from '../lib/release' import { mkdtemp, rm, writeFile } from 'node:fs/promises' import { tmpdir } from 'node:os' import { join } from 'node:path' -import { afterEach, beforeEach, describe, expect, test } from 'bun:test' +import { afterEach, beforeEach, describe, expect, spyOn, test } from 'bun:test' import { openDatabase } from '../database/connection' +import { FetchError } from '../lib/errors/FetchError' import { DownloadsRepository } from '../modules/downloads/downloads.repository' import { DownloadsService } from '../modules/downloads/downloads.service' import { createTorrentStub } from '../modules/torznab/torrent' @@ -31,6 +32,18 @@ afterEach(async () => { await rm(tempDir, { recursive: true, force: true }) }) +function downloadsConfig(overrides: Partial> = {}) { + return { + watchPath, + completedPath, + maxConcurrentDownloads: 2, + maxDownloadAttempts: 3, + retryBaseDelayMs: 0, + retryMaxDelayMs: 0, + ...overrides, + } +} + function fakePeer(overrides: Partial> = {}) { return { id: 'peer-1', @@ -49,9 +62,9 @@ function fakeDestination() { return { isInitialized: true, canDestination: true, name: 'Radarr', triggerImport: async () => {} } } -async function writeTorrent(filename = 'movie.torrent') { +async function writeTorrent(filename = 'movie.torrent', itemId = 'movie:1') { const filePath = join(watchPath, filename) - await writeFile(filePath, createTorrentStub({ name: release.title, size: release.size, peerId: 'peer-1', itemId: 'movie:1' })) + await writeFile(filePath, createTorrentStub({ name: release.title, size: release.size, peerId: 'peer-1', itemId })) return filePath } @@ -68,12 +81,11 @@ describe('DownloadsService download progress persistence', () => { await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) }, }) - const service = new DownloadsService({ completedPath }, [peer as any], [fakeDestination() as any], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') - // The service forwards dest/part/size/torrentFilename into downloadFile. expect(calls).toHaveLength(1) expect(calls[0].destPath).toBe(join(completedPath, release.filename)) expect(calls[0].options.partPath).toBe(`${join(completedPath, release.filename)}.part`) @@ -82,14 +94,9 @@ describe('DownloadsService download progress persistence', () => { const downloads = repository.list() expect(downloads).toHaveLength(1) - expect(downloads[0]?.torrentFilename).toBe('movie.torrent') - expect(downloads[0]?.filename).toBe(release.filename) - expect(downloads[0]?.destPath).toBe(join(completedPath, release.filename)) - expect(downloads[0]?.partPath).toBe(`${join(completedPath, release.filename)}.part`) - expect(downloads[0]?.releaseSize).toBe(10) - expect(downloads[0]?.expectedBytes).toBe(10) - expect(downloads[0]?.downloadedBytes).toBe(10) expect(downloads[0]?.status).toBe('import_queued') + expect(downloads[0]?.downloadedBytes).toBe(10) + expect(downloads[0]?.attempts).toBe(1) handle.close() }) @@ -99,7 +106,7 @@ describe('DownloadsService download progress persistence', () => { const peer = fakePeer({ getRelease: async () => { throw new Error('metadata failed') } }) - const service = new DownloadsService({ completedPath }, [peer as any], [], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') @@ -108,7 +115,7 @@ describe('DownloadsService download progress persistence', () => { handle.close() }) - test('rejects a peer release with a path-traversal filename and does not write outside completedPath', async () => { + test('rejects a peer release with a path-traversal filename', async () => { const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) const repository = new DownloadsRepository(handle.db) const writtenPaths: string[] = [] @@ -118,36 +125,224 @@ describe('DownloadsService download progress persistence', () => { writtenPaths.push(destPath) }, }) - const service = new DownloadsService({ completedPath }, [peer as any], [fakeDestination() as any], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') - // The unsafe name must never reach downloadFile / be written to disk. expect(writtenPaths).toHaveLength(0) - const evilOutside = join(tempDir, 'evil.mkv') - expect(await Bun.file(evilOutside).exists()).toBe(false) - expect(await Bun.file(`${evilOutside}.part`).exists()).toBe(false) - expect(repository.list()).toHaveLength(0) handle.close() }) - test('marks an existing row failed when download fails after metadata resolves', async () => { + test('marks an existing row failed when a permanent download error occurs', async () => { const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) const repository = new DownloadsRepository(handle.db) + let calls = 0 const peer = fakePeer({ downloadFile: async () => { - throw new Error('download failed') + calls++ + throw new FetchError('not found', new Response(null, { status: 404 })) } }) - const service = new DownloadsService({ completedPath }, [peer as any], [], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') + expect(calls).toBe(1) // 404 is permanent — no retry const downloads = repository.list() - expect(downloads).toHaveLength(1) expect(downloads[0]?.status).toBe('failed') - expect(downloads[0]?.error).toContain('download failed') + handle.close() + }) + + test('retries a transient failure then succeeds', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + let calls = 0 + const peer = fakePeer({ + downloadFile: async (_itemId: string, _destPath: string, options: any) => { + calls++ + if (calls === 1) + throw new FetchError('busy', new Response(null, { status: 503 })) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + const filePath = await writeTorrent() + + await service.processTorrentFile(filePath, 'movie.torrent') + + expect(calls).toBe(2) + const downloads = repository.list() + expect(downloads[0]?.status).toBe('import_queued') + expect(downloads[0]?.attempts).toBe(2) + handle.close() + }) + + test('persists a resume reset from a restart event', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + // Spy, because a realistic download ends with a `completed` event whose + // markCompleted() clears the error markResumeReset() set — so asserting the + // final row state cannot prove the reset ran. + const resetSpy = spyOn(repository, 'markResumeReset') + const peer = fakePeer({ + downloadFile: async (_itemId: string, _destPath: string, options: any) => { + await options.onProgress({ type: 'headers', expectedBytes: 10, expectedBytesSource: 'content_length', expectedBytesMismatch: false }) + await options.onProgress({ type: 'restart', reason: 'range_ignored', discardedBytes: 4 }) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + const filePath = await writeTorrent() + + await service.processTorrentFile(filePath, 'movie.torrent') + + expect(resetSpy).toHaveBeenCalledTimes(1) + expect(repository.list()[0]?.status).toBe('import_queued') + resetSpy.mockRestore() + handle.close() + }) + + test('limits concurrent downloads to maxConcurrentDownloads', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + let active = 0 + let maxActive = 0 + const peer = { + id: 'peer-1', + name: 'Friend Jack', + url: 'http://peer.test', + // Distinct filename per item so each maps to a distinct destPath. + getRelease: async (itemId: string) => ({ ...release, id: `remote:${itemId}`, filename: `${itemId.replace(':', '_')}.mkv` }), + downloadFile: async (_itemId: string, _destPath: string, options: any) => { + active++ + maxActive = Math.max(maxActive, active) + await Bun.sleep(20) + active-- + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + } + const service = new DownloadsService(downloadsConfig({ maxConcurrentDownloads: 1 }), [peer as any], [fakeDestination() as any], repository) + const a = await writeTorrent('a.torrent', 'movie:1') + const b = await writeTorrent('b.torrent', 'movie:2') + + await Promise.all([ + service.processTorrentFile(a, 'a.torrent'), + service.processTorrentFile(b, 'b.torrent'), + ]) + + expect(maxActive).toBe(1) + expect(repository.list().filter(d => d.status === 'import_queued')).toHaveLength(2) + handle.close() + }) + + test('resumeStaleDownloads re-drives a stale downloading row to import_queued', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const calls: string[] = [] + const peer = fakePeer({ + downloadFile: async (_itemId: string, destPath: string, options: any) => { + calls.push(destPath) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + // Seed a stale `downloading` row, as if Jack crashed mid-download. + repository.create({ + torrentFilename: 'movie.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath: join(completedPath, release.filename), + partPath: `${join(completedPath, release.filename)}.part`, + releaseSize: release.size, + release, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + + const resumed = await service.resumeStaleDownloads() + // resumeStaleDownloads fires in the background; wait for the row to settle. + for (let i = 0; i < 50 && repository.list()[0]?.status !== 'import_queued'; i++) + await Bun.sleep(10) + + expect(resumed).toBe(1) + expect(calls).toEqual([join(completedPath, release.filename)]) + expect(repository.list()[0]?.status).toBe('import_queued') + handle.close() + }) + + test('marks superseded duplicate stale rows (same destPath) failed and re-drives only one', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const calls: string[] = [] + const peer = fakePeer({ + downloadFile: async (_itemId: string, destPath: string, options: any) => { + calls.push(destPath) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + const destPath = join(completedPath, release.filename) + const base = { + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath, + partPath: `${destPath}.part`, + releaseSize: release.size, + release, + } + repository.create({ ...base, torrentFilename: 'first.torrent' }) + repository.create({ ...base, torrentFilename: 'second.torrent' }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + + const resumed = await service.resumeStaleDownloads() + for (let i = 0; i < 50 && !repository.list().some(d => d.status === 'import_queued'); i++) + await Bun.sleep(10) + + expect(resumed).toBe(1) + expect(calls).toEqual([destPath]) // only one of the two same-destPath rows is re-driven + const rows = repository.list() + expect(rows.filter(d => d.status === 'import_queued')).toHaveLength(1) + expect(rows.find(d => d.status === 'failed')?.error).toContain('superseded') + handle.close() + }) + + test('releases the re-enqueue claim after a successful resume so the filename can be processed again', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const calls: string[] = [] + const peer = fakePeer({ + downloadFile: async (itemId: string, _destPath: string, options: any) => { + calls.push(itemId) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + repository.create({ + torrentFilename: 'movie.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath: join(completedPath, release.filename), + partPath: `${join(completedPath, release.filename)}.part`, + releaseSize: release.size, + release, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + + await service.resumeStaleDownloads() + for (let i = 0; i < 50 && repository.list()[0]?.status !== 'import_queued'; i++) + await Bun.sleep(10) + expect(calls).toHaveLength(1) + + // A later legitimate re-drop of the same torrent filename must NOT be skipped + // by a stale re-enqueue claim once the resume has completed. + const filePath = await writeTorrent('movie.torrent') + await service.processTorrentFile(filePath, 'movie.torrent') + + expect(calls).toHaveLength(2) + expect(repository.list().filter(d => d.status === 'import_queued')).toHaveLength(2) handle.close() }) }) diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index db0c1e5..dc15b1a 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -31,10 +31,6 @@ const destinations = connectors.servers.filter(s => s.canDestination) const database = await openDatabase({ appConfigPath: envs.APP_CONFIG_PATH }) const downloadsRepository = new DownloadsRepository(database.db) -const reconciledDownloads = await downloadsRepository.reconcileStaleDownloads() - -if (reconciledDownloads > 0) - logger.warn({ downloads: reconciledDownloads, databasePath: database.path }, 'Reconciled stale downloads from previous Jack run') const app = getApp(envs, config, connectors, { downloadsRepository }) const server = Bun.serve({ @@ -100,13 +96,25 @@ if (config.jack) { } } -// Start blackhole watcher +// Start blackhole watcher (and re-drive interrupted downloads from a prior run) let blackholeWatcher: BlackholeWatcher | null = null if (config.downloads) { const downloadsService = new DownloadsService(config.downloads, connectors.peers, destinations, downloadsRepository) + // Active re-enqueue: resume stale `downloading` rows in place before the + // watcher scans, so the leftover .torrent stubs are not re-processed as new rows. + const resumed = await downloadsService.resumeStaleDownloads() + if (resumed > 0) + logger.warn({ downloads: resumed, databasePath: database.path }, 'Re-enqueued interrupted downloads from previous Jack run') + blackholeWatcher = new BlackholeWatcher(config.downloads, downloadsService) await blackholeWatcher.start() } +else { + // No downloads config means stale rows cannot be resumed — mark them failed. + const failed = await downloadsRepository.reconcileStaleDownloads() + if (failed > 0) + logger.warn({ downloads: failed, databasePath: database.path }, 'Marked stale downloads failed (no downloads config to resume them)') +} process.on('SIGINT', async () => { logger.info('SIGINT received, exiting') diff --git a/apps/backend/src/modules/downloads/downloads.service.ts b/apps/backend/src/modules/downloads/downloads.service.ts index 1a0932a..39fdfa6 100644 --- a/apps/backend/src/modules/downloads/downloads.service.ts +++ b/apps/backend/src/modules/downloads/downloads.service.ts @@ -1,25 +1,50 @@ import type { AppConfig } from '../../lib/config' import type { ArrServerConnector } from '../../lib/servers/arr/base' import type { PeerConnector, PeerDownloadProgressEvent } from '../../lib/servers/peer' -import type { DownloadsRepository } from './downloads.repository' +import type { DownloadRecord, DownloadsRepository } from './downloads.repository' import { Buffer } from 'node:buffer' import { unlink } from 'node:fs/promises' import { basename, join } from 'node:path' +import { retry } from '../../lib/retry' +import { Semaphore } from '../../lib/semaphore' import { withSpan } from '../../lib/tracing' import { logger } from '../../logger' import { parseTorrentStub } from '../torznab/torrent' +import { downloadRetryAfterMs, isTransientDownloadError } from './retry-policy' + +type DownloadsServiceConfig = NonNullable export class DownloadsService { + private readonly semaphore: Semaphore + // Dest paths with a download in flight — guards two concurrent live drops that + // resolve to the same destination (no duplicate rows / writers). + private readonly active = new Set() + // Torrent filenames owned by the startup re-enqueue. Their leftover stubs are + // skipped by the watcher's initial scan for the rest of the run, so a re-drive + // that fails fast cannot be re-processed into a duplicate row. + private readonly reenqueued = new Set() + constructor( - private readonly config: Pick, 'completedPath'>, + private readonly config: DownloadsServiceConfig, private readonly peers: PeerConnector[], private readonly destinations: ArrServerConnector[], private readonly downloadsRepository?: DownloadsRepository, - ) {} + ) { + this.semaphore = new Semaphore(config.maxConcurrentDownloads) + } async processTorrentFile(filePath: string, filename: string) { try { await withSpan('blackhole.process_torrent', { 'torrent.filename': filename }, async (span) => { + // The startup re-enqueue owns this stub — it is being (or will be) + // re-driven from the persisted row. Skip it so we never create a + // duplicate row, even if that re-drive already failed and cleared `active`. + if (this.reenqueued.has(filename)) { + span.setAttribute('torrent.reenqueued', true) + logger.debug({ torrentFilename: filename }, 'Stub owned by startup re-enqueue; skipping watcher processing') + return + } + const file = Bun.file(filePath) if (!await file.exists()) { span.setAttribute('torrent.exists', false) @@ -54,8 +79,6 @@ export class DownloadsService { // `release.filename` is peer-controlled and only validated as a string. // Force it to a plain basename inside `completedPath` so a value like // `../../evil.mkv` or an absolute path cannot escape the directory. - // Reject (rather than silently rewrite) anything that is not already a - // plain filename, so a malicious peer cannot smuggle in path separators. const safeName = basename(release.filename) const isSafeName = safeName.length > 0 && safeName !== '.' && safeName !== '..' && !safeName.includes('/') && !safeName.includes('\\') @@ -68,7 +91,12 @@ export class DownloadsService { const partPath = `${destPath}.part` span.setAttributes({ 'release.filename': safeName, 'release.size': release.size }) - const download = this.downloadsRepository?.create({ + if (this.active.has(destPath)) { + logger.debug({ torrentFilename: filename, destPath }, 'A download for this destination is already active; skipping duplicate') + return + } + + const created = this.downloadsRepository?.create({ torrentFilename: filename, peerId, peerName: peer.name ?? peer.url, @@ -80,56 +108,150 @@ export class DownloadsService { release, }) - const onProgress = async (event: PeerDownloadProgressEvent) => { - if (!download) - return + const record: DownloadRecord = created ?? { + id: -1, + torrentFilename: filename, + peerId, + peerName: peer.name ?? peer.url, + itemId, + filename: safeName, + destPath, + partPath, + releaseSize: release.size, + release, + expectedBytes: null, + expectedBytesSource: null, + expectedBytesMismatch: false, + downloadedBytes: 0, + attempts: 0, + status: 'downloading', + startedAt: '', + updatedAt: '', + completedAt: null, + error: null, + } - if (event.type === 'headers') { - this.downloadsRepository?.setExpectedBytes(download.id, event.expectedBytes, event.expectedBytesSource, event.expectedBytesMismatch) - return - } + await this.runDownload(record) + }) + } + catch (err) { + const message = err instanceof Error ? err.message : String(err) + logger.error({ torrentFilename: filename, filename, error: message }, 'Failed to process torrent') + } + } - if (event.type === 'progress') { - this.downloadsRepository?.updateProgress(download.id, event.downloadedBytes) - return - } + /** Re-drive stale `downloading` rows from a prior run, resuming from their .part files. */ + async resumeStaleDownloads(): Promise { + const repo = this.downloadsRepository + if (!repo) + return 0 + // Dedupe by destPath: a prior run could leave more than one stale row for the + // same destination (they share the same .part), but only one can be resumed. + // Re-driving two would make the second silently early-return in runDownload + // and stay stuck in `downloading`, so mark the superseded ones failed instead. + const seen = new Set() + const resumable: DownloadRecord[] = [] + for (const record of repo.listStaleDownloads()) { + if (seen.has(record.destPath)) { + repo.markFailed(record.id, 'superseded by another resumable download for the same destination') + continue + } + seen.add(record.destPath) + resumable.push(record) + } + // Claim every resumable stub up-front (synchronously, before the watcher + // starts) so the initial scan skips them regardless of re-drive timing/outcome. + for (const record of resumable) + this.reenqueued.add(record.torrentFilename) + for (const record of resumable) { + // Fire-and-forget: the semaphore caps concurrency, and the stub is already + // claimed in `reenqueued` so the watcher won't duplicate it. + void this.runDownload(record).catch((err) => { + const message = err instanceof Error ? err.message : String(err) + logger.error({ torrentFilename: record.torrentFilename, error: message }, 'Failed to resume stale download') + }) + } + if (resumable.length > 0) + logger.info({ downloads: resumable.length }, 'Re-enqueued interrupted downloads') + return resumable.length + } - if (event.type === 'restart') { - // Resume validation failed; the download restarts from byte 0. - // Persisting this is handled in a later phase. - return - } + private async runDownload(record: DownloadRecord): Promise { + if (this.active.has(record.destPath)) + return + this.active.add(record.destPath) + try { + await this.semaphore.run(() => this.downloadWithRetry(record)) + } + finally { + this.active.delete(record.destPath) + } + } - this.downloadsRepository?.markCompleted(download.id, event.downloadedBytes) - } + private async downloadWithRetry(record: DownloadRecord): Promise { + const repo = this.downloadsRepository + const peer = this.peers.find(p => p.id === record.peerId) + if (!peer) { + repo?.markFailed(record.id, `Peer ${record.peerId} not found`) + logger.error({ torrentFilename: record.torrentFilename, peerId: record.peerId }, 'Cannot run download: peer not found') + return + } - // Everything after the row is created is wrapped so any failure - // (download, stub unlink, or import trigger) marks the row failed - // instead of leaving it stuck in `completed`/`downloading`. - // `import_queued` means: the file downloaded AND triggerImport was - // attempted (best-effort per destination — see triggerImport below). - try { - await peer.downloadFile(itemId, destPath, { torrentFilename: filename, partPath, releaseSize: release.size, onProgress }) - await unlink(filePath) - await this.triggerImport(filename) - - if (download) - this.downloadsRepository?.markImportQueued(download.id) - } - catch (err) { - if (download) { - const message = err instanceof Error ? err.message : String(err) - this.downloadsRepository?.markFailed(download.id, message) - } - throw err - } + const onProgress = async (event: PeerDownloadProgressEvent) => { + if (event.type === 'headers') { + repo?.setExpectedBytes(record.id, event.expectedBytes, event.expectedBytesSource, event.expectedBytesMismatch) + return + } + if (event.type === 'progress') { + repo?.updateProgress(record.id, event.downloadedBytes) + return + } + if (event.type === 'restart') { + repo?.markResumeReset(record.id) + return + } + repo?.markCompleted(record.id, event.downloadedBytes) + } + + const stubPath = join(this.config.watchPath, record.torrentFilename) - logger.info({ torrentFilename: filename, filename: safeName }, 'Download complete, triggered import') + try { + await retry(async () => { + repo?.incrementAttempts(record.id) + await peer.downloadFile(record.itemId, record.destPath, { + torrentFilename: record.torrentFilename, + partPath: record.partPath, + releaseSize: record.releaseSize, + onProgress, + }) + }, { + maxAttempts: this.config.maxDownloadAttempts, + baseDelayMs: this.config.retryBaseDelayMs, + maxDelayMs: this.config.retryMaxDelayMs, + isRetryable: isTransientDownloadError, + retryAfterMs: downloadRetryAfterMs, + onRetry: ({ attempt, delayMs, error }) => { + const message = error instanceof Error ? error.message : String(error) + logger.warn({ torrentFilename: record.torrentFilename, attempt, delayMs, error: message }, 'Retrying peer download after transient failure') + }, }) + + await unlink(stubPath).catch(() => {}) + await this.triggerImport(record.torrentFilename) + repo?.markImportQueued(record.id) + // Release the startup-re-enqueue claim now that the stub is gone, so a + // later legitimate re-drop of the same filename isn't silently skipped. + // Only on success: a failed re-drive keeps its stub, so it stays claimed + // (and is re-driven on the next restart) to avoid in-session hammering. + this.reenqueued.delete(record.torrentFilename) + logger.info({ torrentFilename: record.torrentFilename, filename: record.filename }, 'Download complete, triggered import') } catch (err) { const message = err instanceof Error ? err.message : String(err) - logger.error({ torrentFilename: filename, filename, error: message }, 'Failed to process torrent') + // The .part file is preserved by downloadFile on failure, so a later + // restart re-enqueue can resume from it. + repo?.markFailed(record.id, message) + logger.error({ torrentFilename: record.torrentFilename, filename: record.filename, error: message }, 'Download failed') } }