Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 293 additions & 1 deletion prisma/migrate.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PrismaClient } from '@prisma/client';
import { PrismaClient, Prisma } from '@prisma/client';
import * as path from 'path';
import * as fs from 'fs';
import * as readline from 'readline';
Expand Down Expand Up @@ -33,6 +33,7 @@ const DEFAULT_DATA_DIR = '/mnt/export/review_tables';
// Directory containing the exported data. The DATA_DIR environment variable
// overrides the default; because `||` is used, an empty value also falls back.
const DATA_DIR = process.env.DATA_DIR || DEFAULT_DATA_DIR;
// General-purpose batch size. NOTE(review): its consumers are outside this
// excerpt — confirm where it is applied.
const batchSize = 1000;
// Progress messages are emitted every `logSize` processed records.
const logSize = 20000;
// Number of distinct legacy submission ids buffered before the
// isFileSubmission synchronization flushes one UPDATE statement.
const submissionIsFileBatchSize = 500;
const DEFAULT_ES_DATA_FILE = path.join(
'/home/ubuntu',
'submissions-api.data.json',
Expand Down Expand Up @@ -64,6 +65,55 @@ if (shouldRepairMaps) {
);
}

// Canonical name of the targeted migration that synchronizes only the
// submission "isFileSubmission" flag.
const MIGRATION_TARGET_SUBMISSION_IS_FILE = 'submission-is-file';
// Set of canonical target names; also used to list the supported targets in
// the error raised for an unknown target.
const SUPPORTED_MIGRATION_TARGETS = new Set<string>([
MIGRATION_TARGET_SUBMISSION_IS_FILE,
]);
// Accepted spellings mapped to their canonical target name. Keys are written
// in lowercase because normalizeTarget() trims and lowercases the user input
// before looking it up in this table.
const MIGRATION_TARGET_ALIASES: Record<string, string> = {
[MIGRATION_TARGET_SUBMISSION_IS_FILE]: MIGRATION_TARGET_SUBMISSION_IS_FILE,
'submission-is-file-flag': MIGRATION_TARGET_SUBMISSION_IS_FILE,
'submission-isfilesubmission': MIGRATION_TARGET_SUBMISSION_IS_FILE,
'submission:isfile': MIGRATION_TARGET_SUBMISSION_IS_FILE,
'submission:isfilesubmission': MIGRATION_TARGET_SUBMISSION_IS_FILE,
'is-file-submission': MIGRATION_TARGET_SUBMISSION_IS_FILE,
submission_is_file: MIGRATION_TARGET_SUBMISSION_IS_FILE,
};

// Resolve the migration targets requested on the command line (everything
// after `node <script>`), separating recognized names from unrecognized ones.
const rawTargets = extractTargets(process.argv.slice(2));
const normalizedTargets: string[] = [];
const unknownTargets: string[] = [];

rawTargets.forEach((rawName) => {
  const canonicalName = normalizeTarget(rawName);
  if (!canonicalName) {
    unknownTargets.push(rawName);
    return;
  }
  normalizedTargets.push(canonicalName);
});

// Any unrecognized name aborts start-up and lists what is supported.
if (unknownTargets.length > 0) {
  const supportedList = Array.from(SUPPORTED_MIGRATION_TARGETS).join(', ');
  throw new Error(
    `Unknown migration target(s): ${unknownTargets.join(', ')}. Supported targets: ${supportedList}`,
  );
}

// Aliases of the same target collapse into one entry in the set.
const requestedTargets = new Set<string>(normalizedTargets);
const requestedTargetList = Array.from(requestedTargets).join(', ');

// No target at all means the complete migration should run.
const runFullMigration = requestedTargets.size === 0;

// More than one distinct target is rejected (this also implies a targeted run).
if (requestedTargets.size > 1) {
  throw new Error(
    `Multiple migration targets are not currently supported. Requested targets: ${requestedTargetList}`,
  );
}

const runSubmissionIsFileTarget = requestedTargets.has(
  MIGRATION_TARGET_SUBMISSION_IS_FILE,
);

if (!runFullMigration) {
  console.log(`Executing targeted migration: ${requestedTargetList}`);
}

const errorSummary = new Map<
string,
{ count: number; files: Set<string>; examples: string[] }
Expand Down Expand Up @@ -108,6 +158,73 @@ const shouldProcessRecord = (
);
};

/**
 * Coerces a loosely-typed value into a boolean.
 *
 * - booleans are returned unchanged;
 * - the numbers 1 and 0 map to true and false;
 * - strings are trimmed and lowercased, then 'true', '1', 'yes', 'y' map to
 *   true and 'false', '0', 'no', 'n', 'null', 'undefined', '' map to false;
 * - everything else (including other numbers and unrecognized strings) uses
 *   JavaScript truthiness via `Boolean(value)`.
 *
 * @param value Value to coerce.
 * @returns The boolean interpretation of `value`.
 */
const toBoolean = (value: unknown): boolean => {
  switch (typeof value) {
    case 'boolean':
      return value;
    case 'number':
      if (value === 1) {
        return true;
      }
      if (value === 0) {
        return false;
      }
      break;
    case 'string': {
      const token = value.trim().toLowerCase();
      const isAffirmative =
        token === 'true' || token === '1' || token === 'yes' || token === 'y';
      if (isAffirmative) {
        return true;
      }
      const isNegative =
        token === 'false' ||
        token === '0' ||
        token === 'no' ||
        token === 'n' ||
        token === 'null' ||
        token === 'undefined' ||
        token === '';
      if (isNegative) {
        return false;
      }
      break;
    }
    default:
      break;
  }
  // Fallback: plain truthiness of the original (untrimmed) value.
  return Boolean(value);
};

/**
 * Collects migration target names from command-line arguments.
 *
 * Recognized forms are `--target <list>`, `--targets <list>`,
 * `--target=<list>` and `--targets=<list>`, where `<list>` is a
 * comma-separated list of names. Names are trimmed and empty items are
 * dropped; all other arguments are ignored.
 *
 * @param args Command-line arguments (without the node/script entries).
 * @returns The target names in the order they were given.
 * @throws Error when a target flag is present but yields no target name
 *   (missing value, empty value, or a value made only of commas/whitespace).
 *   Failing here matters because an empty result is interpreted by the caller
 *   as "no target requested", i.e. a full migration.
 */
function extractTargets(args: string[]): string[] {
  const targetNames: string[] = [];

  // Parses one flag value and appends its names; rejects values that contain
  // no usable name so `--target=` behaves like a bare `--target`.
  const collect = (flag: string, rawValue: string | undefined): void => {
    const names = (rawValue ?? '')
      .split(',')
      .map((item) => item.trim())
      .filter((item) => item.length > 0);
    if (names.length === 0) {
      throw new Error(
        `${flag} requires a value with comma-separated migration target names.`,
      );
    }
    targetNames.push(...names);
  };

  for (let i = 0; i < args.length; i += 1) {
    const arg = args[i];
    if (arg === '--target' || arg === '--targets') {
      collect(arg, args[i + 1]);
      // The following argument was consumed as this flag's value.
      i += 1;
      continue;
    }
    const match = arg.match(/^--targets?=(.*)$/);
    if (match) {
      collect(arg.slice(0, arg.indexOf('=')), match[1]);
    }
  }
  return targetNames;
}

/**
 * Maps a user-supplied migration target name to its canonical name.
 *
 * The input is trimmed and lowercased, then looked up in
 * MIGRATION_TARGET_ALIASES.
 *
 * @param target Raw target name as typed by the user.
 * @returns The canonical target name, or `undefined` when the input is empty
 *   or is not a known alias.
 */
function normalizeTarget(target: string): string | undefined {
  if (!target) {
    return undefined;
  }
  const normalized = target.trim().toLowerCase();
  // Only own keys count as aliases. Without this guard, inputs such as
  // "constructor" or "tostring" would resolve to members inherited from
  // Object.prototype and be treated as recognized targets.
  if (
    !Object.prototype.hasOwnProperty.call(MIGRATION_TARGET_ALIASES, normalized)
  ) {
    return undefined;
  }
  return MIGRATION_TARGET_ALIASES[normalized];
}

/**
 * Builds a bracketed log prefix: `[type][file]`, or `[type][subtype][file]`
 * when a non-empty subtype is given.
 */
const buildLogPrefix = (type: string, file: string, subtype?: string) => {
  const segments = subtype ? [type, subtype, file] : [type, file];
  return segments.map((segment) => `[${segment}]`).join('');
};

Expand Down Expand Up @@ -689,6 +806,7 @@ function convertSubmissionES(esData): any {
legacyChallengeId,
submissionPhaseId: String(esData.submissionPhaseId),
fileType: esData.fileType,
isFileSubmission: toBoolean(esData.isFileSubmission),
esId: esData.id,
submittedDate: esData.submittedDate ? new Date(esData.submittedDate) : null,
updatedBy: esData.updatedBy ?? null,
Expand Down Expand Up @@ -717,6 +835,171 @@ function convertSubmissionES(esData): any {
return submission;
}

/**
 * Targeted migration that synchronizes only the "isFileSubmission" column of
 * the "submission" table from the ElasticSearch export file (ES_DATA_FILE).
 *
 * The export is read line by line; each non-empty line is parsed as JSON and
 * only entries whose `_source.resource` is 'submission' are considered.
 * Entries without a `legacySubmissionId`, or rejected by
 * shouldProcessRecord(), are skipped and counted. Accepted entries are
 * buffered by legacySubmissionId and written in batches of
 * `submissionIsFileBatchSize` through a single UPDATE ... FROM (VALUES ...)
 * statement. A summary of processed, updated, missing and skipped records is
 * logged at the end.
 *
 * @throws Error when the export file does not exist; rethrows any database
 *   error raised while updating a batch.
 */
async function migrateSubmissionIsFileFlagOnly(): Promise<void> {
  console.log(
    `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Using ElasticSearch export file: ${ES_DATA_FILE}`,
  );
  if (!fs.existsSync(ES_DATA_FILE)) {
    throw new Error(
      `ElasticSearch export file not found at ${ES_DATA_FILE}. Set ES_DATA_FILE to override the default.`,
    );
  }
  console.log(
    `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Starting isFileSubmission synchronization...`,
  );

  // Keyed by legacySubmissionId, so a repeated id inside one batch keeps only
  // the value from its last occurrence.
  const pendingUpdates = new Map<string, boolean>();
  const updatedLegacyIds = new Set<string>();
  // At most five ids that matched no database row, kept for the summary.
  const missingExamples: string[] = [];

  let totalUpdatedRows = 0;
  let totalMissing = 0;
  let processedRecords = 0;
  let skippedMissingLegacyId = 0;
  let skippedOutsideWindow = 0;
  let parseErrors = 0;
  let lineNumber = 0;

  // Writes the buffered values in one statement and records which ids matched.
  const flushPending = async () => {
    if (pendingUpdates.size === 0) {
      return;
    }
    // Snapshot and clear before awaiting so the buffer is empty for new lines.
    const entries = Array.from(pendingUpdates.entries());
    pendingUpdates.clear();
    const valueTuples = entries.map(
      ([legacySubmissionId, isFileSubmission]) =>
        Prisma.sql`(${legacySubmissionId}, ${isFileSubmission})`,
    );
    let updatedRows: Array<{ legacySubmissionId: string }> = [];
    try {
      updatedRows = await prisma.$queryRaw<
        Array<{ legacySubmissionId: string }>
      >(Prisma.sql`
        WITH input("legacySubmissionId", "isFileSubmission") AS (
          VALUES ${Prisma.join(valueTuples)}
        )
        UPDATE "submission" AS s
        SET "isFileSubmission" = input."isFileSubmission"
        FROM input
        WHERE s."legacySubmissionId" = input."legacySubmissionId"
        RETURNING s."legacySubmissionId" AS "legacySubmissionId"
      `);
    } catch (err) {
      console.error(
        `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Failed to update a batch of ${entries.length} submissions.`,
      );
      throw err;
    }
    totalUpdatedRows += updatedRows.length;
    const batchUpdatedSet = new Set(
      updatedRows.map((row) => String(row.legacySubmissionId)),
    );
    batchUpdatedSet.forEach((id) => updatedLegacyIds.add(id));
    // Ids sent in this batch that were not returned by the UPDATE.
    const batchMissing = entries.length - batchUpdatedSet.size;
    totalMissing += batchMissing;
    if (batchMissing > 0 && missingExamples.length < 5) {
      for (const [legacyId] of entries) {
        if (!batchUpdatedSet.has(legacyId)) {
          missingExamples.push(legacyId);
          if (missingExamples.length >= 5) {
            break;
          }
        }
      }
    }
  };

  const fileStream = fs.createReadStream(ES_DATA_FILE);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });

  try {
    for await (const line of rl) {
      lineNumber += 1;
      if (!line) {
        continue;
      }
      let parsed: any;
      try {
        parsed = JSON.parse(line);
      } catch (err: unknown) {
        parseErrors += 1;
        // Only the first five parse failures are logged individually.
        if (parseErrors <= 5) {
          const reason = err instanceof Error ? err.message : String(err);
          console.warn(
            `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Failed to parse line ${lineNumber}: ${reason}`,
          );
        }
        continue;
      }
      const source = parsed?._source;
      if (!source || source.resource !== 'submission') {
        continue;
      }
      if (source.legacySubmissionId == null) {
        skippedMissingLegacyId += 1;
        continue;
      }
      const createdAudit = source.created ?? source.submittedDate ?? null;
      const updatedAudit = source.updated ?? null;
      if (!shouldProcessRecord(createdAudit, updatedAudit)) {
        skippedOutsideWindow += 1;
        continue;
      }
      const legacySubmissionId = String(source.legacySubmissionId);
      const isFileSubmission = toBoolean(source.isFileSubmission);
      pendingUpdates.set(legacySubmissionId, isFileSubmission);
      processedRecords += 1;
      if (pendingUpdates.size >= submissionIsFileBatchSize) {
        await flushPending();
      }
      if (processedRecords > 0 && processedRecords % logSize === 0) {
        console.log(
          `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Queued ${processedRecords} submissions so far...`,
        );
      }
    }
  } finally {
    // Release the reader and the underlying file handle even when a batch
    // update throws mid-file; both calls are harmless after a normal end.
    rl.close();
    fileStream.destroy();
  }

  // Write whatever is left in the final, partially filled batch.
  await flushPending();

  const distinctUpdated = updatedLegacyIds.size;

  console.log(
    `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Processed ${processedRecords} submission records.`,
  );
  console.log(
    `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Updated ${totalUpdatedRows} row(s) affecting ${distinctUpdated} submission(s).`,
  );
  if (totalMissing > 0) {
    console.warn(
      `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Unable to locate ${totalMissing} submission(s) in the database.`,
    );
    if (missingExamples.length > 0) {
      console.warn(
        `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Missing legacySubmissionId examples: ${missingExamples.join(', ')}`,
      );
    }
  }
  if (skippedMissingLegacyId > 0) {
    console.warn(
      `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Skipped ${skippedMissingLegacyId} record(s) without a legacySubmissionId.`,
    );
  }
  if (skippedOutsideWindow > 0) {
    console.log(
      `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Skipped ${skippedOutsideWindow} record(s) outside the incremental window.`,
    );
  }
  // Totals are reported only beyond the five failures already logged one by one.
  if (parseErrors > 5) {
    console.warn(
      `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Encountered ${parseErrors} parse error(s) while reading the export.`,
    );
  }
  console.log(
    `[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] isFileSubmission synchronization completed.`,
  );
}

async function migrateElasticSearch() {
// migrate elastic search data
const filepath = ES_DATA_FILE;
Expand Down Expand Up @@ -3269,6 +3552,15 @@ async function migrateResourceSubmissions() {
}

async function migrate() {
if (!runFullMigration && runSubmissionIsFileTarget) {
await migrateSubmissionIsFileFlagOnly();
return;
}

if (!runFullMigration) {
throw new Error('No recognized migration targets were provided.');
}

if (!fs.existsSync(DATA_DIR)) {
throw new Error(
`DATA_DIR "${DATA_DIR}" does not exist. Set DATA_DIR to a valid export path.`,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Add isFileSubmission flag to submissions to differentiate file vs URL entries.
-- The column is NOT NULL with DEFAULT false, so rows that already exist take
-- the value false; IF NOT EXISTS leaves the table untouched when the column is
-- already present.
ALTER TABLE "submission"
ADD COLUMN IF NOT EXISTS "isFileSubmission" BOOLEAN NOT NULL DEFAULT false;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[⚠️ performance]
Consider adding an index on the isFileSubmission column if this field will be frequently queried or filtered on. This can improve query performance, especially on large datasets.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Improve review lookups that combine submission/phase filters with ordering.
-- IF NOT EXISTS keeps the statements safe to re-run when an index of the same
-- name is already present.
CREATE INDEX IF NOT EXISTS "review_submissionId_id_idx" ON "review"("submissionId", "id");
CREATE INDEX IF NOT EXISTS "review_phaseId_id_idx" ON "review"("phaseId", "id");
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Add a composite index to speed up paginated lookups by review item
CREATE INDEX IF NOT EXISTS "reviewItemComment_reviewItemId_sortOrder_id_idx"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[⚠️ performance]
Consider verifying that the order of columns in the composite index aligns with the query patterns used in the application. The order can significantly impact the performance of the index.

ON "reviews"."reviewItemComment"("reviewItemId", "sortOrder", "id");
4 changes: 4 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,10 @@ model review {

@@index([committed]) // Index for filtering by committed status
@@index([submissionId]) // Index for filtering by submission
@@index([submissionId, id]) // Helps ORDER BY id after filtering by submission
@@index([resourceId]) // Index for filtering by resource (reviewer)
@@index([phaseId]) // Index for filtering by phase
@@index([phaseId, id]) // Helps ORDER BY id after filtering by phase
@@index([scorecardId]) // Index for joining with scorecard table
@@index([status]) // Index for filtering by review status
@@index([status, phaseId])
Expand Down Expand Up @@ -228,6 +230,7 @@ model reviewItemComment {
appeal appeal?

@@index([reviewItemId]) // Index for joining with reviewItem table
@@index([reviewItemId, sortOrder, id]) // Helps ordered pagination on review item comments
@@index([id]) // Index for direct ID lookups
@@index([resourceId]) // Index for filtering by resource (commenter)
@@index([type]) // Index for filtering by comment type
Expand Down Expand Up @@ -397,6 +400,7 @@ model submission {
fileSize Int?
viewCount Int?
systemFileName String?
isFileSubmission Boolean @default(false)
thurgoodJobId String?
virusScan Boolean @default(false)

Expand Down
Loading