diff --git a/src/scripts/migrate-dynamo-data.js b/src/scripts/migrate-dynamo-data.js index 24a80ad..2659cb7 100644 --- a/src/scripts/migrate-dynamo-data.js +++ b/src/scripts/migrate-dynamo-data.js @@ -72,7 +72,14 @@ const destructiveApprovals = new Map() function logWithLevel (level, message, context = null) { const timestamp = new Date().toISOString() - const contextSuffix = context ? ` | ${JSON.stringify(context)}` : '' + let contextSuffix = '' + if (context) { + try { + contextSuffix = ` | ${JSON.stringify(context, (_, value) => (typeof value === 'bigint' ? value.toString() : value))}` + } catch (serializeErr) { + contextSuffix = ` | ${JSON.stringify({ serializationError: serializeErr.message })}` + } + } const output = `[${level}] ${timestamp} ${message}${contextSuffix}` if (level === LOG_LEVELS.ERROR) { console.error(output) @@ -95,6 +102,14 @@ function logError (message, context) { logWithLevel(LOG_LEVELS.ERROR, message, context) } +function isBigIntSerializationError (err) { + if (!err) { + return false + } + const message = err.message || `${err}` + return typeof message === 'string' && message.toLowerCase().includes('serialize a bigint') +} + async function executeWrite (description, operation, context = {}) { if (DRY_RUN) { logInfo(`DRY_RUN active, skipping write: ${description}`, context) @@ -1169,6 +1184,7 @@ async function importDynamoMember (filename, dateFilter = null) { let total = 0 // count skipped items due to date filter let skipped = 0 + let skippedDueToErrors = 0 // store the temp json object string let stringObject = '' // store batch items @@ -1446,6 +1462,29 @@ function isInvalidUtf8Error (err) { return message.includes('invalid byte sequence for encoding "UTF8"') || message.includes('0x00') } +function isUniqueConstraintError (err) { + if (!err) { + return false + } + if (err.code === 'P2002') { + return true + } + const message = err.message || '' + return message.includes('Unique constraint failed') +} + +function logUniqueConstraintSkip (memberItem, err) { + const identifier = compactObject({ + userId: memberItem?.userId, + handle: memberItem?.handle, + handleLower: memberItem?.handleLower + }) + logWarn('Skipping member due to unique constraint violation', { + ...identifier, + target: err?.meta?.target + }) +} + async function createMembersIndividually (memberItems) { for (const memberItem of memberItems) { try { @@ -1457,6 +1496,10 @@ async function createMembersIndividually (memberItems) { timeout: TRANSACTION_TIMEOUT_MS })) } catch (err) { + if (isUniqueConstraintError(err)) { + logUniqueConstraintSkip(memberItem, err) + continue + } if (isInvalidUtf8Error(err)) { console.warn(`Skipping member ${memberItem.userId || memberItem.handleLower || 'unknown'} due to invalid UTF-8 data`) continue @@ -1495,6 +1538,11 @@ async function createMembers (memberItems) { await createMembersIndividually(memberItems) return } + if (isUniqueConstraintError(err)) { + console.warn('Batch insert failed due to unique constraint violation. Falling back to per-member inserts.') + await createMembersIndividually(memberItems) + return + } throw err } } @@ -2193,15 +2241,198 @@ async function importElasticSearchMember (filename, dateFilter = null) { continue } - await updateMembersWithTraitsAndSkills(dataObj) - total += 1 + const updated = await updateMembersWithTraitsAndSkills(dataObj) + if (updated) { + total += 1 + } else { + skippedDueToErrors += 1 + } } console.log(`\nIt has updated ${total} items totally, skipped ${skipped} items`) + if (skippedDueToErrors > 0) { + console.log(`Skipped due to errors: ${skippedDueToErrors}`) + } console.log(`Finished reading the file: ${filename}\n`) } +/** + * Import only the Dynamo basic_info traits and upsert them. + * @param {String} filename filename + * @param {Date|null} [dateFilter=null] optional date filter threshold + */ +async function importDynamoBasicInfoTraits (filename, dateFilter = null) { + const traitFilePath = path.join(MIGRATE_DIR, filename) + + const lineCount = await countFileLines(traitFilePath) + console.log(`${filename} has ${lineCount} lines in total`) + + const rlRead = readline.createInterface({ + input: fs.createReadStream(traitFilePath), + crlfDelay: Infinity + }) + + let currentLine = 0 + let count = 0 + let processed = 0 + let skipped = 0 + let errors = 0 + let stringObject = '' + + const processBasicInfoRecord = async (dataItem) => { + if (dataItem.traitId !== 'basic_info') { + return + } + + if (!shouldProcessRecord(dataItem, dateFilter)) { + skipped += 1 + return + } + + let traitsPayload = dataItem.traits + if (typeof traitsPayload === 'string') { + try { + traitsPayload = JSON.parse(traitsPayload) + } catch (err) { + errors += 1 + logWarn('Skipping basic_info trait due to invalid JSON payload', { + userId: dataItem.userId, + error: err?.message + }) + return + } + } + + const traitEntries = Array.isArray(traitsPayload?.data) ? traitsPayload.data : [] + if (!traitEntries.length) { + skipped += 1 + return + } + + const normalizedEntries = [] + let targetUserId = normalizeUserId(dataItem.userId) + + for (const rawEntry of traitEntries) { + const normalizedEntry = pick(rawEntry || {}, TRAIT_BASIC_INFO) + const normalizedUserId = normalizeUserId(normalizedEntry.userId || targetUserId) + if (!normalizedUserId) { + continue + } + targetUserId = targetUserId || normalizedUserId + normalizedEntries.push({ + userId: normalizedUserId, + country: normalizedEntry.country || '', + primaryInterestInTopcoder: normalizedEntry.primaryInterestInTopcoder || '', + tshirtSize: normalizedEntry.tshirtSize || null, + gender: normalizedEntry.gender || null, + shortBio: normalizedEntry.shortBio || '', + birthDate: _convert2Date(normalizedEntry.birthDate), + currentLocation: normalizedEntry.currentLocation || null, + createdBy: CREATED_BY + }) + } + + if (normalizedEntries.length === 0 || !targetUserId) { + skipped += 1 + return + } + + const payload = { + userId: targetUserId, + memberTraits: { + basicInfo: normalizedEntries.map(entry => ({ + ...entry, + userId: targetUserId + })) + } + } + + try { + const updated = await updateMembersWithTraitsAndSkills(payload) + if (updated) { + processed += 1 + } else { + errors += 1 + logWarn('Basic info trait update returned falsy result', { userId: targetUserId }) + } + } catch (err) { + errors += 1 + logError('Failed to upsert basic_info trait', { userId: targetUserId, error: err?.message }) + } + } + + for await (const line of rlRead) { + currentLine += 1 + if (currentLine % 10 === 0) { + const percentage = ((currentLine / lineCount) * 100).toFixed(2) + process.stdout.clearLine() + process.stdout.cursorTo(0) + process.stdout.write(`Migrate Progress: ${percentage}%, read ${count}, processed ${processed}, skipped ${skipped}, errors ${errors}`) + } + + let trimmedLine = line.trim() + if (!trimmedLine || trimmedLine === ',' || trimmedLine === '[' || trimmedLine === ']' || trimmedLine === '],') { + continue + } + + if (trimmedLine.startsWith('[')) { + trimmedLine = trimmedLine.substring(1).trim() + if (!trimmedLine) { + continue + } + } + if (trimmedLine.endsWith(']')) { + trimmedLine = trimmedLine.substring(0, trimmedLine.length - 1).trim() + if (!trimmedLine) { + continue + } + } + + if (!stringObject) { + stringObject = trimmedLine + } else { + stringObject += trimmedLine + } + + let jsonCandidate = stringObject + if (jsonCandidate.endsWith(',')) { + jsonCandidate = jsonCandidate.slice(0, -1) + } + + let dataItem + try { + dataItem = JSON.parse(jsonCandidate) + } catch (err) { + continue + } + + stringObject = '' + count += 1 + await processBasicInfoRecord(dataItem) + } + + if (stringObject) { + let jsonCandidate = stringObject + if (jsonCandidate.endsWith(',')) { + jsonCandidate = jsonCandidate.slice(0, -1) + } + if (jsonCandidate) { + try { + const dataItem = JSON.parse(jsonCandidate) + count += 1 + await processBasicInfoRecord(dataItem) + } catch (err) { + errors += 1 + logWarn('Skipping trailing basic_info trait due to invalid JSON payload', { error: err?.message }) + } + } + } + + console.log(`\nProcessed ${processed} basic_info traits, skipped ${skipped}, errors ${errors}`) + console.log(`Finished reading the file: ${filename}\n`) +} + /** * Update member status values from ElasticSearch snapshot * @param {String} filename filename @@ -2357,9 +2588,12 @@ async function fixMemberUpdateData (memberItem, dbItem) { } } else if (memberItem.traits.traitId === 'basic_info') { const traitData = pick(memberItem.traits.data[0], TRAIT_BASIC_INFO) - if (traitData.userId && traitData.country && traitData.primaryInterestInTopcoder && traitData.shortBio) { + if (traitData.userId) { memberItemUpdate.memberTraits.basicInfo = [{ ...traitData, + country: traitData.country || '', + primaryInterestInTopcoder: traitData.primaryInterestInTopcoder || '', + shortBio: traitData.shortBio || '', birthDate: _convert2Date(traitData.birthDate) }] } @@ -2718,6 +2952,10 @@ async function updateMembersWithTraitsAndSkills (memberObj) { })) } catch (err) { logError('Failed to update member with traits and skills', { ...context, error: err?.message }) + if (isBigIntSerializationError(err)) { + logWarn('Skipping member update due to BigInt serialization error', context) + return false + } throw err } } @@ -2727,9 +2965,15 @@ async function updateMembersWithTraitsAndSkills (memberObj) { await syncMemberSkills(memberObj.userId, memberObj.memberSkills, memberObj.handle) } catch (err) { logError('Failed to sync member skills', { ...context, error: err?.message }) + if (isBigIntSerializationError(err)) { + logWarn('Skipping member skill sync due to BigInt serialization error', context) + return false + } throw err } } + + return true } async function syncMemberAddresses (tx, userId, addresses = []) { @@ -3649,7 +3893,7 @@ async function runMigrationStep (step, dateFilter, askQuestion) { migrationRuntimeState.dateFilter = dateFilter destructiveApprovals.delete(step) - const shouldRunIntegrityCheck = ['1', '2', '3', '4', '5', '7'].includes(step) + const shouldRunIntegrityCheck = ['1', '2', '3', '4', '5', '7', '8'].includes(step) try { if (shouldRunIntegrityCheck) { @@ -3777,6 +4021,11 @@ async function runMigrationStep (step, dateFilter, askQuestion) { await updateMemberStatusFromElasticSearch(memberElasticsearchFilename, dateFilter) break } + case '8': { + const memberTraitFilename = 'MemberProfileTrait.json' + await importDynamoBasicInfoTraits(memberTraitFilename, dateFilter) + break + } default: throw new Error(`Unsupported step "${step}"`) } @@ -3813,6 +4062,7 @@ async function main () { console.log('5. Import Dynamo MemberStatHistory') console.log('6. Import Distribution Stats') console.log('7. Update ElasticSearch Member Status') + console.log('8. Import Dynamo Basic Info Traits') console.log('') console.log('Destructive clears require --full-reset or ALLOW_DESTRUCTIVE=true and an explicit confirmation. Incremental runs retain existing data by default.') @@ -3825,8 +4075,8 @@ async function main () { let selectedStep = null try { - selectedStep = (await askQuestion('Please select your step to run (0-7): ')).trim() - const validSteps = new Set(['0', '1', '2', '3', '4', '5', '6', '7']) + selectedStep = (await askQuestion('Please select your step to run (0-8): ')).trim() + const validSteps = new Set(['0', '1', '2', '3', '4', '5', '6', '7', '8']) if (!validSteps.has(selectedStep)) { console.log('Unsupported step selected. Script is finished.') return @@ -3840,7 +4090,7 @@ async function main () { } let dateFilter = null - if (['1', '2', '3', '4', '5', '7'].includes(selectedStep)) { + if (['1', '2', '3', '4', '5', '7', '8'].includes(selectedStep)) { const dateFilterInput = (await askQuestion('Enter date filter (YYYY-MM-DD UTC; timestamp-less records will be skipped, or press Enter to skip): ')).trim() if (dateFilterInput) { const parsed = parseDateFilter(dateFilterInput) diff --git a/src/scripts/update-member-traits.js b/src/scripts/update-member-traits.js index b8dc72e..409e430 100644 --- a/src/scripts/update-member-traits.js +++ b/src/scripts/update-member-traits.js @@ -24,6 +24,7 @@ async function updateTraitsFromFile (filename) { let updated = 0 let skippedNoMember = 0 let skippedNoTraits = 0 + let skippedDueToErrors = 0 for await (const line of rl) { const trimmed = line.trim() @@ -67,8 +68,12 @@ async function updateTraitsFromFile (filename) { memberTraits: updateData.memberTraits } - await updateMembersWithTraitsAndSkills(traitUpdate) - updated += 1 + const updatedSuccessfully = await updateMembersWithTraitsAndSkills(traitUpdate) + if (updatedSuccessfully) { + updated += 1 + } else { + skippedDueToErrors += 1 + } if (processed % 100 === 0) { console.log(`Processed ${processed} lines, updated ${updated} members`) @@ -80,6 +85,9 @@ async function updateTraitsFromFile (filename) { console.log(`Members updated: ${updated}`) console.log(`Skipped (no matching member): ${skippedNoMember}`) console.log(`Skipped (no trait changes): ${skippedNoTraits}`) + if (skippedDueToErrors > 0) { + console.log(`Skipped due to errors: ${skippedDueToErrors}`) + } } async function main () {