Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 258 additions & 8 deletions src/scripts/migrate-dynamo-data.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ const destructiveApprovals = new Map()

function logWithLevel (level, message, context = null) {
const timestamp = new Date().toISOString()
const contextSuffix = context ? ` | ${JSON.stringify(context)}` : ''
let contextSuffix = ''
if (context) {
try {
contextSuffix = ` | ${JSON.stringify(context, (_, value) => (typeof value === 'bigint' ? value.toString() : value))}`
} catch (serializeErr) {
contextSuffix = ` | ${JSON.stringify({ serializationError: serializeErr.message })}`
}
}
const output = `[${level}] ${timestamp} ${message}${contextSuffix}`
if (level === LOG_LEVELS.ERROR) {
console.error(output)
Expand All @@ -95,6 +102,14 @@ function logError (message, context) {
logWithLevel(LOG_LEVELS.ERROR, message, context)
}

/**
 * Check whether an error looks like a BigInt serialization failure.
 * The error's `message` is inspected when it is truthy; otherwise the
 * error's own string form is used. Matching is case-insensitive.
 * @param {*} err the caught error (may be null/undefined)
 * @returns {Boolean} true when the text contains "serialize a bigint"
 */
function isBigIntSerializationError (err) {
  if (!err) return false

  // Fall back to the stringified error when no usable message is present
  const text = err.message || `${err}`
  if (typeof text !== 'string') return false

  return text.toLowerCase().indexOf('serialize a bigint') !== -1
}

async function executeWrite (description, operation, context = {}) {
if (DRY_RUN) {
logInfo(`DRY_RUN active, skipping write: ${description}`, context)
Expand Down Expand Up @@ -1169,6 +1184,7 @@ async function importDynamoMember (filename, dateFilter = null) {
let total = 0
// count skipped items due to date filter
let skipped = 0
let skippedDueToErrors = 0
// store the temp json object string
let stringObject = ''
// store batch items
Expand Down Expand Up @@ -1446,6 +1462,29 @@ function isInvalidUtf8Error (err) {
return message.includes('invalid byte sequence for encoding "UTF8"') || message.includes('0x00')
}

/**
 * Determine whether an error represents a unique constraint violation.
 * An error matches when its `code` is 'P2002' (presumably the ORM's
 * unique-constraint error code - confirm against the client in use) or
 * when its string `message` contains 'Unique constraint failed'.
 * @param {*} err the caught error (may be null/undefined)
 * @returns {Boolean} true when the error is a unique constraint violation
 */
function isUniqueConstraintError (err) {
  if (!err) {
    return false
  }
  if (err.code === 'P2002') {
    return true
  }
  // This runs inside catch blocks, so it must never throw itself:
  // only call String methods when the message really is a string.
  const message = err.message
  return typeof message === 'string' && message.includes('Unique constraint failed')
}

/**
 * Emit a warning that a member was skipped because of a unique
 * constraint violation.
 * The warning context holds the member's userId/handle/handleLower
 * (passed through compactObject) plus `err.meta.target`.
 * @param {Object} memberItem the member that could not be inserted (may be nullish)
 * @param {Object} err the caught error (may be nullish)
 */
function logUniqueConstraintSkip (memberItem, err) {
  const { userId, handle, handleLower } = memberItem ?? {}
  const memberKeys = compactObject({ userId, handle, handleLower })
  const warningContext = { ...memberKeys, target: err?.meta?.target }
  logWarn('Skipping member due to unique constraint violation', warningContext)
}

async function createMembersIndividually (memberItems) {
for (const memberItem of memberItems) {
try {
Expand All @@ -1457,6 +1496,10 @@ async function createMembersIndividually (memberItems) {
timeout: TRANSACTION_TIMEOUT_MS
}))
} catch (err) {
if (isUniqueConstraintError(err)) {
logUniqueConstraintSkip(memberItem, err)
continue
}
if (isInvalidUtf8Error(err)) {
console.warn(`Skipping member ${memberItem.userId || memberItem.handleLower || 'unknown'} due to invalid UTF-8 data`)
continue
Expand Down Expand Up @@ -1495,6 +1538,11 @@ async function createMembers (memberItems) {
await createMembersIndividually(memberItems)
return
}
if (isUniqueConstraintError(err)) {
console.warn('Batch insert failed due to unique constraint violation. Falling back to per-member inserts.')
await createMembersIndividually(memberItems)
return
}
throw err
}
}
Expand Down Expand Up @@ -2193,15 +2241,198 @@ async function importElasticSearchMember (filename, dateFilter = null) {
continue
}

await updateMembersWithTraitsAndSkills(dataObj)
total += 1
const updated = await updateMembersWithTraitsAndSkills(dataObj)
if (updated) {
total += 1
} else {
skippedDueToErrors += 1
}
}

console.log(`\nIt has updated ${total} items totally, skipped ${skipped} items`)
if (skippedDueToErrors > 0) {
console.log(`Skipped due to errors: ${skippedDueToErrors}`)
}

console.log(`Finished reading the file: ${filename}\n`)
}

/**
 * Import only the Dynamo basic_info traits and upsert them.
 *
 * The file is read line by line. Lines are trimmed and concatenated until
 * the accumulated text parses as JSON; each parsed record whose traitId is
 * 'basic_info' is normalized and passed to updateMembersWithTraitsAndSkills.
 * Records with any other traitId are ignored without being counted.
 *
 * @param {String} filename filename, resolved relative to MIGRATE_DIR
 * @param {Date|null} [dateFilter=null] optional date filter threshold
 * @returns {Promise<void>} resolves once the whole file has been consumed
 */
async function importDynamoBasicInfoTraits (filename, dateFilter = null) {
  const traitFilePath = path.join(MIGRATE_DIR, filename)

  const lineCount = await countFileLines(traitFilePath)
  console.log(`${filename} has ${lineCount} lines in total`)

  const rlRead = readline.createInterface({
    input: fs.createReadStream(traitFilePath),
    crlfDelay: Infinity
  })

  // currentLine: physical lines read; count: JSON records parsed
  let currentLine = 0
  let count = 0
  let processed = 0
  let skipped = 0
  let errors = 0
  // accumulates the text of a record that spans several lines
  let stringObject = ''

  /**
   * Normalize one parsed record and upsert its basic_info entries.
   * Updates the processed/skipped/errors counters; never throws for
   * upsert failures (they are logged and counted instead).
   */
  const processBasicInfoRecord = async (dataItem) => {
    if (dataItem.traitId !== 'basic_info') {
      return
    }

    if (!shouldProcessRecord(dataItem, dateFilter)) {
      skipped += 1
      return
    }

    // The traits payload may arrive either as an object or as a JSON string
    let traitsPayload = dataItem.traits
    if (typeof traitsPayload === 'string') {
      try {
        traitsPayload = JSON.parse(traitsPayload)
      } catch (err) {
        errors += 1
        logWarn('Skipping basic_info trait due to invalid JSON payload', {
          userId: dataItem.userId,
          error: err?.message
        })
        return
      }
    }

    const traitEntries = Array.isArray(traitsPayload?.data) ? traitsPayload.data : []
    if (!traitEntries.length) {
      skipped += 1
      return
    }

    const normalizedEntries = []
    let targetUserId = normalizeUserId(dataItem.userId)

    for (const rawEntry of traitEntries) {
      const normalizedEntry = pick(rawEntry || {}, TRAIT_BASIC_INFO)
      // Prefer the entry's own userId, falling back to the record-level one
      const normalizedUserId = normalizeUserId(normalizedEntry.userId || targetUserId)
      if (!normalizedUserId) {
        continue
      }
      // If the record had no usable userId, adopt the first one found on an entry
      targetUserId = targetUserId || normalizedUserId
      normalizedEntries.push({
        userId: normalizedUserId,
        country: normalizedEntry.country || '',
        primaryInterestInTopcoder: normalizedEntry.primaryInterestInTopcoder || '',
        tshirtSize: normalizedEntry.tshirtSize || null,
        gender: normalizedEntry.gender || null,
        shortBio: normalizedEntry.shortBio || '',
        birthDate: _convert2Date(normalizedEntry.birthDate),
        currentLocation: normalizedEntry.currentLocation || null,
        createdBy: CREATED_BY
      })
    }

    if (normalizedEntries.length === 0 || !targetUserId) {
      skipped += 1
      return
    }

    // Every entry is written under the single target user id
    const payload = {
      userId: targetUserId,
      memberTraits: {
        basicInfo: normalizedEntries.map(entry => ({
          ...entry,
          userId: targetUserId
        }))
      }
    }

    try {
      const updated = await updateMembersWithTraitsAndSkills(payload)
      if (updated) {
        processed += 1
      } else {
        errors += 1
        logWarn('Basic info trait update returned falsy result', { userId: targetUserId })
      }
    } catch (err) {
      errors += 1
      logError('Failed to upsert basic_info trait', { userId: targetUserId, error: err?.message })
    }
  }

  for await (const line of rlRead) {
    currentLine += 1
    // clearLine/cursorTo only exist on TTY streams; when stdout is piped or
    // redirected they are undefined, so the in-place progress line is skipped.
    if (currentLine % 10 === 0 && process.stdout.isTTY) {
      const percentage = ((currentLine / lineCount) * 100).toFixed(2)
      process.stdout.clearLine()
      process.stdout.cursorTo(0)
      process.stdout.write(`Migrate Progress: ${percentage}%, read ${count}, processed ${processed}, skipped ${skipped}, errors ${errors}`)
    }

    // Ignore blank lines and bare array punctuation
    let trimmedLine = line.trim()
    if (!trimmedLine || trimmedLine === ',' || trimmedLine === '[' || trimmedLine === ']' || trimmedLine === '],') {
      continue
    }

    // Strip a leading '[' / trailing ']' from the line
    if (trimmedLine.startsWith('[')) {
      trimmedLine = trimmedLine.substring(1).trim()
      if (!trimmedLine) {
        continue
      }
    }
    if (trimmedLine.endsWith(']')) {
      trimmedLine = trimmedLine.substring(0, trimmedLine.length - 1).trim()
      if (!trimmedLine) {
        continue
      }
    }

    if (!stringObject) {
      stringObject = trimmedLine
    } else {
      stringObject += trimmedLine
    }

    // Drop the record separator before attempting to parse
    let jsonCandidate = stringObject
    if (jsonCandidate.endsWith(',')) {
      jsonCandidate = jsonCandidate.slice(0, -1)
    }

    let dataItem
    try {
      dataItem = JSON.parse(jsonCandidate)
    } catch (err) {
      // Not a complete record yet: keep accumulating following lines
      continue
    }

    stringObject = ''
    count += 1
    await processBasicInfoRecord(dataItem)
  }

  // Handle whatever is left in the buffer once the file ends
  if (stringObject) {
    let jsonCandidate = stringObject
    if (jsonCandidate.endsWith(',')) {
      jsonCandidate = jsonCandidate.slice(0, -1)
    }
    if (jsonCandidate) {
      try {
        const dataItem = JSON.parse(jsonCandidate)
        count += 1
        await processBasicInfoRecord(dataItem)
      } catch (err) {
        errors += 1
        logWarn('Skipping trailing basic_info trait due to invalid JSON payload', { error: err?.message })
      }
    }
  }

  console.log(`\nProcessed ${processed} basic_info traits, skipped ${skipped}, errors ${errors}`)
  console.log(`Finished reading the file: ${filename}\n`)
}

/**
* Update member status values from ElasticSearch snapshot
* @param {String} filename filename
Expand Down Expand Up @@ -2357,9 +2588,12 @@ async function fixMemberUpdateData (memberItem, dbItem) {
}
} else if (memberItem.traits.traitId === 'basic_info') {
const traitData = pick(memberItem.traits.data[0], TRAIT_BASIC_INFO)
if (traitData.userId && traitData.country && traitData.primaryInterestInTopcoder && traitData.shortBio) {
if (traitData.userId) {
memberItemUpdate.memberTraits.basicInfo = [{
...traitData,
country: traitData.country || '',
primaryInterestInTopcoder: traitData.primaryInterestInTopcoder || '',
shortBio: traitData.shortBio || '',
birthDate: _convert2Date(traitData.birthDate)
}]
}
Expand Down Expand Up @@ -2718,6 +2952,10 @@ async function updateMembersWithTraitsAndSkills (memberObj) {
}))
} catch (err) {
logError('Failed to update member with traits and skills', { ...context, error: err?.message })
if (isBigIntSerializationError(err)) {
logWarn('Skipping member update due to BigInt serialization error', context)
return false
}
throw err
}
}
Expand All @@ -2727,9 +2965,15 @@ async function updateMembersWithTraitsAndSkills (memberObj) {
await syncMemberSkills(memberObj.userId, memberObj.memberSkills, memberObj.handle)
} catch (err) {
logError('Failed to sync member skills', { ...context, error: err?.message })
if (isBigIntSerializationError(err)) {
logWarn('Skipping member skill sync due to BigInt serialization error', context)
return false
}
throw err
}
}

return true
}

async function syncMemberAddresses (tx, userId, addresses = []) {
Expand Down Expand Up @@ -3649,7 +3893,7 @@ async function runMigrationStep (step, dateFilter, askQuestion) {
migrationRuntimeState.dateFilter = dateFilter
destructiveApprovals.delete(step)

const shouldRunIntegrityCheck = ['1', '2', '3', '4', '5', '7'].includes(step)
const shouldRunIntegrityCheck = ['1', '2', '3', '4', '5', '7', '8'].includes(step)

try {
if (shouldRunIntegrityCheck) {
Expand Down Expand Up @@ -3777,6 +4021,11 @@ async function runMigrationStep (step, dateFilter, askQuestion) {
await updateMemberStatusFromElasticSearch(memberElasticsearchFilename, dateFilter)
break
}
case '8': {
const memberTraitFilename = 'MemberProfileTrait.json'
await importDynamoBasicInfoTraits(memberTraitFilename, dateFilter)
break
}
default:
throw new Error(`Unsupported step "${step}"`)
}
Expand Down Expand Up @@ -3813,6 +4062,7 @@ async function main () {
console.log('5. Import Dynamo MemberStatHistory')
console.log('6. Import Distribution Stats')
console.log('7. Update ElasticSearch Member Status')
console.log('8. Import Dynamo Basic Info Traits')
console.log('')
console.log('Destructive clears require --full-reset or ALLOW_DESTRUCTIVE=true and an explicit confirmation. Incremental runs retain existing data by default.')

Expand All @@ -3825,8 +4075,8 @@ async function main () {

let selectedStep = null
try {
selectedStep = (await askQuestion('Please select your step to run (0-7): ')).trim()
const validSteps = new Set(['0', '1', '2', '3', '4', '5', '6', '7'])
selectedStep = (await askQuestion('Please select your step to run (0-8): ')).trim()
const validSteps = new Set(['0', '1', '2', '3', '4', '5', '6', '7', '8'])
if (!validSteps.has(selectedStep)) {
console.log('Unsupported step selected. Script is finished.')
return
Expand All @@ -3840,7 +4090,7 @@ async function main () {
}

let dateFilter = null
if (['1', '2', '3', '4', '5', '7'].includes(selectedStep)) {
if (['1', '2', '3', '4', '5', '7', '8'].includes(selectedStep)) {
const dateFilterInput = (await askQuestion('Enter date filter (YYYY-MM-DD UTC; timestamp-less records will be skipped, or press Enter to skip): ')).trim()
if (dateFilterInput) {
const parsed = parseDateFilter(dateFilterInput)
Expand Down
Loading