From 0b67a9f047c21c742881eef7b9c594489971a5a0 Mon Sep 17 00:00:00 2001 From: Travis Rich Date: Mon, 20 Apr 2026 13:56:47 -0400 Subject: [PATCH 1/4] feat: Implement concurrent processing for community export tasks --- workers/tasks/communityExport.tsx | 81 +++++++++++++++++++------------ 1 file changed, 50 insertions(+), 31 deletions(-) diff --git a/workers/tasks/communityExport.tsx b/workers/tasks/communityExport.tsx index 5f0dd4e8e..3d2f45430 100644 --- a/workers/tasks/communityExport.tsx +++ b/workers/tasks/communityExport.tsx @@ -57,6 +57,21 @@ import { import { getNotesData } from './export/notes'; import SimpleNotesList from './export/SimpleNotesList'; +const runWithConcurrency = async ( + items: T[], + concurrency: number, + fn: (item: T) => Promise, +) => { + let index = 0; + const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => { + while (index < items.length) { + const item = items[index++]; + await fn(item); + } + }); + await Promise.all(workers); +}; + type FacetsProps = CascadedFacetsForScopes< 'CitationStyle' | 'License' | 'NodeLabels' | 'PubEdgeDisplay' | 'PubHeaderTheme' >['pub'][string]; @@ -278,14 +293,14 @@ const processHtmlAssets = async ( }); if (!response.ok) { console.error( - `[archive] Failed to download asset ${entry.normalizedUrl}: ${response.status}`, + `[communityExport] Failed to download asset ${entry.normalizedUrl}: ${response.status}`, ); return; } const buffer = Buffer.from(await response.arrayBuffer()); archiveStream.append(buffer, { name: entry.localPath }); } catch (e) { - console.error(`[archive] Error downloading asset ${entry.normalizedUrl}:`, e); + console.error(`[communityExport] Error downloading asset ${entry.normalizedUrl}:`, e); } }); @@ -339,23 +354,29 @@ const generatePubHtmlFiles = async ( order: [['createdAt', 'ASC']], }); - console.log(`[archive] generatePubHtmlFiles: ${allPubs.length} pubs to process`); + console.log(`[communityExport] generatePubHtmlFiles: ${allPubs.length} pubs to process`); // Track downloaded assets across all pubs to avoid duplicates const seenAssets = new Map(); + const PUB_CONCURRENCY = 10; + // Process in batches to limit memory usage for (let offset = 0; offset < allPubs.length; offset += batchSize) { const batch = allPubs.slice(offset, offset + batchSize); - console.log(`[archive] generatePubHtmlFiles: batch ${offset}–${offset + batch.length}`); + console.log( + `[communityExport] generatePubHtmlFiles: batch ${offset}–${offset + batch.length}`, + ); // Fetch facets for the batch const pubIds = batch.map((p) => p.id); // biome-ignore lint/performance/noAwaitInLoops: intentionally sequential — batched to limit memory const facets = await fetchFacetsForScopeIds({ pub: pubIds }); - // biome-ignore lint/performance/noAwaitInLoops: sequential per-pub processing required for shared archive stream - for (const pub of batch) { + // Process pubs concurrently within the batch to overlap network I/O + // (asset downloads and Firebase draft fetches) + // biome-ignore lint/performance/noAwaitInLoops: intentionally sequential across batches to limit memory + await runWithConcurrency(batch, PUB_CONCURRENCY, async (pub) => { const pubJson = pub.toJSON() as any; const pubFacets = facets.pub[pub.id]; const pubTitle = pubJson.title || pub.slug; @@ -367,18 +388,16 @@ const generatePubHtmlFiles = async ( // Generate HTML for each release (from frozen Doc snapshots — no Firebase needed) if (pubJson.releases?.length) { - // biome-ignore lint/performance/noAwaitInLoops: sequential — each release appends to shared archive stream for (let i = 0; i < pubJson.releases.length; i++) { const release = pubJson.releases[i]; const docContent = release.doc?.content; if (!docContent) { console.log( - `[archive] generatePubHtmlFiles: no doc for release ${i + 1} of pub ${pub.slug}`, + `[communityExport] generatePubHtmlFiles: no doc for release ${i + 1} of pub ${pub.slug}`, ); continue; } try { - // biome-ignore lint/performance/noAwaitInLoops: sequential — appends to shared archive stream const releaseBody = await getReleaseHtml( pubFacets, docContent, @@ -399,7 +418,7 @@ const generatePubHtmlFiles = async ( }); } catch (e) { console.error( - `[archive] generatePubHtmlFiles: error rendering release ${i + 1} for pub ${pub.slug}:`, + `[communityExport] generatePubHtmlFiles: error rendering release ${i + 1} for pub ${pub.slug}:`, e, ); } @@ -429,15 +448,15 @@ const generatePubHtmlFiles = async ( } } catch (e) { console.error( - `[archive] generatePubHtmlFiles: error rendering draft for pub ${pub.slug}:`, + `[communityExport] generatePubHtmlFiles: error rendering draft for pub ${pub.slug}:`, e, ); } } - } + }); } - console.log('[archive] generatePubHtmlFiles: done'); + console.log('[communityExport] generatePubHtmlFiles: done'); }; const createPubStream = async (pubs: Pub[], batchSize = 100) => { @@ -458,7 +477,7 @@ const createPubStream = async (pubs: Pub[], batchSize = 100) => { // Use a per-batch transaction to avoid holding a connection for the // entire streaming duration - console.log(`[archive] createPubStream: starting batch transaction (offset=${offset})`); + console.log(`[communityExport] createPubStream: starting batch transaction (offset=${offset})`); const [foundPubs, _draftDocs, batchFacets] = await sequelize.transaction( async (trx) => { const pubIds = pubIdSlice.map((p) => p.id); @@ -490,12 +509,12 @@ const createPubStream = async (pubs: Pub[], batchSize = 100) => { ); console.log( - `[archive] createPubStream: batch transaction done (offset=${offset}, found=${foundPubs.length})`, + `[communityExport] createPubStream: batch transaction done (offset=${offset}, found=${foundPubs.length})`, ); // Fetch draft docs in parallel (outside the DB transaction since // these hit Firebase/PG checkpoints) - console.log(`[archive] createPubStream: fetching ${pubIdSlice.length} draft docs`); + console.log(`[communityExport] createPubStream: fetching ${pubIdSlice.length} draft docs`); const drafts = await Promise.all( pubIdSlice.map(async (p) => { const firebasePath = p.draft?.firebasePath; @@ -664,10 +683,10 @@ const createPubsJsonTransform = () => { }; const getCommunityData = async (communityId: string) => { - console.log(`[archive] getCommunityData: starting for community ${communityId}`); + console.log(`[communityExport] getCommunityData: starting for community ${communityId}`); // fetch all community data in one transaction const result = await sequelize.transaction(async (trx) => { - console.log('[archive] getCommunityData: transaction started'); + console.log('[communityExport] getCommunityData: transaction started'); const [ community, customScripts, @@ -758,13 +777,13 @@ const getCommunityData = async (communityId: string) => { }), ]); - console.log('[archive] getCommunityData: main queries done, fetching facets'); + console.log('[communityExport] getCommunityData: main queries done, fetching facets'); const facets = await fetchFacetsForScopeIds({ community: [communityId], collection: collections.map((c) => c.id), }); - console.log('[archive] getCommunityData: facets done, building result'); + console.log('[communityExport] getCommunityData: facets done, building result'); return { community: { @@ -784,12 +803,12 @@ const getCommunityData = async (communityId: string) => { }; }); - console.log('[archive] getCommunityData: transaction complete'); + console.log('[communityExport] getCommunityData: transaction complete'); return result; }; const getPubs = async (communityId: string) => { - console.log(`[archive] getPubs: starting for community ${communityId}`); + console.log(`[communityExport] getPubs: starting for community ${communityId}`); const pubs = await Pub.findAll({ where: { communityId }, attributes: ['id', 'slug'], @@ -810,7 +829,7 @@ const getPubs = async (communityId: string) => { limit: 1_000_000, }); - console.log(`[archive] getPubs: found ${pubs.length} pubs`); + console.log(`[communityExport] getPubs: found ${pubs.length} pubs`); return pubs; }; @@ -828,7 +847,7 @@ export const communityExportTask = async ({ const startTime = performance.now(); devTools.getMemoryStats(); - console.log(`[archive] communityExportTask: starting for community ${communityId}, key=${key}`); + console.log(`[communityExport] communityExportTask: starting for community ${communityId}, key=${key}`); // get community data + all pubs first @@ -837,7 +856,7 @@ export const communityExportTask = async ({ getPubs(communityId), ]); - console.log(`[archive] communityExportTask: data fetched, ${pubs.length} pubs`); + console.log(`[communityExport] communityExportTask: data fetched, ${pubs.length} pubs`); // Capture title before communityData is nulled during streaming const communityTitle = communityData.community?.title ?? 'your community'; @@ -874,11 +893,11 @@ export const communityExportTask = async ({ // Wait until the pubs JSON stream is fully processed before // generating HTML files and starting S3 upload. pubsJsonStream.on('end', async () => { - console.log('[archive] Pubs JSON stream ended, generating HTML files'); + console.log('[communityExport] Pubs JSON stream ended, generating HTML files'); try { await generatePubHtmlFiles(communityId, archiveStream); } catch (e) { - console.error('[archive] Error generating HTML files:', e); + console.error('[communityExport] Error generating HTML files:', e); } archiveStream.finalize(); // free up memory @@ -900,12 +919,12 @@ export const communityExportTask = async ({ throw e; } - console.log(`[archive] Uploaded archive to ${s3Key}`); + console.log(`[communityExport] Uploaded archive to ${s3Key}`); // Generate a presigned URL (7-day expiry, rewritten to assets.pubpub.org) - console.log('[archive] Generating presigned URL'); + console.log('[communityExport] Generating presigned URL'); const downloadUrl = await exportsClient.getPresignedUrl(s3Key); - console.log('[archive] Presigned URL generated'); + console.log('[communityExport] Presigned URL generated'); // set final result in worker task (replacing progress info) if (workerTaskId) { @@ -931,7 +950,7 @@ export const communityExportTask = async ({ downloadUrl, }); } catch (emailError) { - console.error(`[archive] Failed to send export-ready email:`, emailError); + console.error(`[communityExport] Failed to send export-ready email:`, emailError); } } From 88019650eaa6316a6aad914de39fc13a25fc0220 Mon Sep 17 00:00:00 2001 From: Travis Rich Date: Mon, 20 Apr 2026 13:58:39 -0400 Subject: [PATCH 2/4] refactor: Improve logging and add comments for concurrency handling in community export --- workers/tasks/communityExport.tsx | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/workers/tasks/communityExport.tsx b/workers/tasks/communityExport.tsx index 3d2f45430..ae1c3059e 100644 --- a/workers/tasks/communityExport.tsx +++ b/workers/tasks/communityExport.tsx @@ -66,6 +66,7 @@ const runWithConcurrency = async ( const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => { while (index < items.length) { const item = items[index++]; + // biome-ignore lint/performance/noAwaitInLoops: intentional — this is a concurrency limiter, each worker processes items sequentially await fn(item); } }); @@ -398,6 +399,7 @@ const generatePubHtmlFiles = async ( continue; } try { + // biome-ignore lint/performance/noAwaitInLoops: sequential per-release — each appends to shared archive stream const releaseBody = await getReleaseHtml( pubFacets, docContent, @@ -477,7 +479,9 @@ const createPubStream = async (pubs: Pub[], batchSize = 100) => { // Use a per-batch transaction to avoid holding a connection for the // entire streaming duration - console.log(`[communityExport] createPubStream: starting batch transaction (offset=${offset})`); + console.log( + `[communityExport] createPubStream: starting batch transaction (offset=${offset})`, + ); const [foundPubs, _draftDocs, batchFacets] = await sequelize.transaction( async (trx) => { const pubIds = pubIdSlice.map((p) => p.id); @@ -514,7 +518,9 @@ const createPubStream = async (pubs: Pub[], batchSize = 100) => { // Fetch draft docs in parallel (outside the DB transaction since // these hit Firebase/PG checkpoints) - console.log(`[communityExport] createPubStream: fetching ${pubIdSlice.length} draft docs`); + console.log( + `[communityExport] createPubStream: fetching ${pubIdSlice.length} draft docs`, + ); const drafts = await Promise.all( pubIdSlice.map(async (p) => { const firebasePath = p.draft?.firebasePath; @@ -847,7 +853,9 @@ export const communityExportTask = async ({ const startTime = performance.now(); devTools.getMemoryStats(); - console.log(`[communityExport] communityExportTask: starting for community ${communityId}, key=${key}`); + console.log( + `[communityExport] communityExportTask: starting for community ${communityId}, key=${key}`, + ); // get community data + all pubs first From a0a1ca96897d16a62c6b0791e113d7d3d6f9b5e5 Mon Sep 17 00:00:00 2001 From: Travis Rich Date: Mon, 20 Apr 2026 14:43:07 -0400 Subject: [PATCH 3/4] feat: Implement retry logic for WorkerTask updates with exponential backoff --- workers/queue.ts | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/workers/queue.ts b/workers/queue.ts index 5e30001bc..4280a1388 100644 --- a/workers/queue.ts +++ b/workers/queue.ts @@ -82,12 +82,29 @@ const processTask = (channel) => async (message) => { const { hostname, ...restOutput } = output || {}; - await WorkerTask.update( - { ...rest, output: hostname ? restOutput : output }, - { - where: { id: taskData.id }, - }, - ); + const MAX_RETRIES = 3; + for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) { + try { + // biome-ignore lint/performance/noAwaitInLoops: intentional retry loop + await WorkerTask.update( + { ...rest, output: hostname ? restOutput : output }, + { + where: { id: taskData.id }, + }, + ); + break; + } catch (err) { + console.error( + `Failed to update WorkerTask ${taskData.id} (attempt ${attempt}/${MAX_RETRIES}):`, + err, + ); + if (attempt === MAX_RETRIES) { + throw err; + } + // biome-ignore lint/performance/noAwaitInLoops: intentional retry backoff + await new Promise((r) => setTimeout(r, 1000 * attempt)); + } + } if (hostname) { schedulePurgeWorker(hostname); From aa406c541c5102d756e4f650e8ef2ce89d075fa3 Mon Sep 17 00:00:00 2001 From: Travis Rich Date: Mon, 20 Apr 2026 14:56:34 -0400 Subject: [PATCH 4/4] feat: Enhance account export task with community and publication URLs --- workers/tasks/accountExport.ts | 211 ++++++++++++++++++++++++++++----- 1 file changed, 182 insertions(+), 29 deletions(-) diff --git a/workers/tasks/accountExport.ts b/workers/tasks/accountExport.ts index 7784b9d6d..0f908f3ee 100644 --- a/workers/tasks/accountExport.ts +++ b/workers/tasks/accountExport.ts @@ -4,9 +4,9 @@ import { PassThrough } from 'stream'; import { ActivityItem, AuthToken, + Collection, CollectionAttribution, Community, - CommunityBan, Discussion, Member, Pub, @@ -27,6 +27,20 @@ import { import { sendAccountExportReadyEmail } from 'server/utils/email'; import { exportsClient } from 'server/utils/s3'; import { updateWorkerTask } from 'server/workerTask/queries'; +import { communityUrl, pubUrl } from 'utils/canonicalUrls'; + +const makePubUrl = ( + community: { subdomain: string; domain?: string | null } | null, + pub: { slug: string } | null, +) => { + if (!community || !pub) return null; + return pubUrl(community as any, pub); +}; + +const makeCommunityUrl = (community: { subdomain: string; domain?: string | null } | null) => { + if (!community) return null; + return communityUrl(community as any); +}; /** * Fetches all data associated with a user account and packages it into a @@ -93,7 +107,6 @@ export const accountExportTask = async ({ notificationPreferences, activity, scopeVisits, - communityBans, authTokens, zoteroIntegration, dismissables, @@ -101,12 +114,18 @@ export const accountExportTask = async ({ Member.findAll({ where: { userId }, include: [ - { model: Community, as: 'community', attributes: ['id', 'title', 'subdomain'] }, + { + model: Community, + as: 'community', + attributes: ['id', 'title', 'subdomain', 'domain'], + }, { model: Pub, as: 'pub', attributes: ['id', 'title', 'slug'] }, ], }).then((rows) => rows.map((r) => { - const json = r.toJSON(); + const json = r.toJSON() as any; + const community = json.community ?? null; + const pub = json.pub ?? null; return { id: json.id, permissions: json.permissions, @@ -115,8 +134,10 @@ export const accountExportTask = async ({ pubId: json.pubId, collectionId: json.collectionId, communityId: json.communityId, - community: (json as any).community ?? null, - pub: (json as any).pub ?? null, + community, + pub, + communityUrl: makeCommunityUrl(community), + pubUrl: makePubUrl(community, pub), createdAt: json.createdAt, }; }), @@ -125,11 +146,24 @@ export const accountExportTask = async ({ PubAttribution.findAll({ where: { userId }, include: [ - { model: Pub, as: 'pub', attributes: ['id', 'title', 'slug', 'communityId'] }, + { + model: Pub, + as: 'pub', + attributes: ['id', 'title', 'slug', 'communityId'], + include: [ + { + model: Community, + as: 'community', + attributes: ['id', 'title', 'subdomain', 'domain'], + }, + ], + }, ], }).then((rows) => rows.map((r) => { - const json = r.toJSON(); + const json = r.toJSON() as any; + const pub = json.pub ?? null; + const community = pub?.community ?? null; return { id: json.id, name: json.name, @@ -139,7 +173,10 @@ export const accountExportTask = async ({ affiliation: json.affiliation, orcid: json.orcid, pubId: json.pubId, - pub: (json as any).pub ?? null, + pub: pub ? { id: pub.id, title: pub.title, slug: pub.slug } : null, + community, + communityUrl: makeCommunityUrl(community), + pubUrl: makePubUrl(community, pub), createdAt: json.createdAt, }; }), @@ -147,24 +184,86 @@ export const accountExportTask = async ({ CollectionAttribution.findAll({ where: { userId }, + include: [ + { + model: Collection, + as: 'collection', + attributes: ['id', 'title', 'slug', 'communityId'], + include: [ + { + model: Community, + as: 'community', + attributes: ['id', 'title', 'subdomain', 'domain'], + }, + ], + }, + ], }).then((rows) => - rows.map((r) => ({ - id: r.id, - name: r.name, - order: r.order, - isAuthor: r.isAuthor, - roles: r.roles, - affiliation: r.affiliation, - orcid: r.orcid, - collectionId: r.collectionId, - createdAt: r.createdAt, - })), + rows.map((r) => { + const json = r.toJSON() as any; + const collection = json.collection ?? null; + const community = collection?.community ?? null; + return { + id: json.id, + name: json.name, + order: json.order, + isAuthor: json.isAuthor, + roles: json.roles, + affiliation: json.affiliation, + orcid: json.orcid, + collectionId: json.collectionId, + collection: collection + ? { id: collection.id, title: collection.title, slug: collection.slug } + : null, + community, + communityUrl: makeCommunityUrl(community), + createdAt: json.createdAt, + }; + }), ), Discussion.findAll({ where: { userId }, attributes: ['id', 'title', 'number', 'isClosed', 'labels', 'pubId', 'createdAt'], - }).then((rows) => rows.map((r) => r.toJSON())), + include: [ + { + model: Pub, + as: 'pub', + attributes: ['id', 'title', 'slug', 'communityId'], + include: [ + { + model: Community, + as: 'community', + attributes: ['id', 'title', 'subdomain', 'domain'], + }, + ], + }, + ], + }).then((rows) => + rows.map((r) => { + const json = r.toJSON() as any; + const pub = json.pub ?? null; + const community = pub?.community ?? null; + return { + id: json.id, + title: json.title, + number: json.number, + isClosed: json.isClosed, + labels: json.labels, + pubId: json.pubId, + pub: pub ? { id: pub.id, title: pub.title, slug: pub.slug } : null, + community: community + ? { + id: community.id, + title: community.title, + subdomain: community.subdomain, + } + : null, + pubUrl: makePubUrl(community, pub), + createdAt: json.createdAt, + }; + }), + ), ThreadComment.findAll({ where: { userId }, @@ -183,7 +282,46 @@ export const accountExportTask = async ({ 'reviewContent', 'createdAt', ], - }).then((rows) => rows.map((r) => r.toJSON())), + include: [ + { + model: Pub, + as: 'pub', + attributes: ['id', 'title', 'slug', 'communityId'], + include: [ + { + model: Community, + as: 'community', + attributes: ['id', 'title', 'subdomain', 'domain'], + }, + ], + }, + ], + }).then((rows) => + rows.map((r) => { + const json = r.toJSON() as any; + const pub = json.pub ?? null; + const community = pub?.community ?? null; + return { + id: json.id, + title: json.title, + number: json.number, + status: json.status, + releaseRequested: json.releaseRequested, + pubId: json.pubId, + reviewContent: json.reviewContent, + pub: pub ? { id: pub.id, title: pub.title, slug: pub.slug } : null, + community: community + ? { + id: community.id, + title: community.title, + subdomain: community.subdomain, + } + : null, + pubUrl: makePubUrl(community, pub), + createdAt: json.createdAt, + }; + }), + ), ReviewEvent.findAll({ where: { userId }, @@ -195,6 +333,7 @@ export const accountExportTask = async ({ attributes: ['id', 'type', 'data', 'threadId', 'createdAt'], }).then((rows) => rows.map((r) => r.toJSON())), + // Release has no BelongsTo Pub association, so we look up pub info separately Release.findAll({ where: { userId }, attributes: ['id', 'noteText', 'pubId', 'historyKey', 'createdAt'], @@ -221,16 +360,31 @@ export const accountExportTask = async ({ where: { userId }, }).then((rows) => rows.map((r) => r.toJSON())), - CommunityBan.findAll({ - where: { userId }, - attributes: ['id', 'communityId', 'createdAt'], - }).then((rows) => rows.map((r) => r.toJSON())), - // Include auth token metadata but redact the actual token values AuthToken.findAll({ where: { userId }, attributes: ['id', 'communityId', 'expiresAt', 'createdAt'], - }).then((rows) => rows.map((r) => r.toJSON())), + include: [ + { + model: Community, + as: 'community', + attributes: ['id', 'title', 'subdomain', 'domain'], + }, + ], + }).then((rows) => + rows.map((r) => { + const json = r.toJSON() as any; + const community = json.community ?? null; + return { + id: json.id, + communityId: json.communityId, + community, + communityUrl: makeCommunityUrl(community), + expiresAt: json.expiresAt, + createdAt: json.createdAt, + }; + }), + ), ZoteroIntegration.findOne({ where: { userId }, @@ -266,7 +420,6 @@ export const accountExportTask = async ({ addJson('notification-preferences.json', notificationPreferences); addJson('activity.json', activity); addJson('scope-visits.json', scopeVisits); - addJson('community-bans.json', communityBans); addJson('auth-tokens.json', authTokens); addJson('zotero-integration.json', zoteroIntegration); addJson('dismissables.json', dismissables);