diff --git a/.circleci/config.yml b/.circleci/config.yml index 0d7d4ee..1647bf6 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -67,6 +67,7 @@ workflows: branches: only: - develop + - pm-2539 # Production builds are exectuted only on tagged commits to the # master branch. diff --git a/.github/workflows/trivy.yaml b/.github/workflows/trivy.yaml new file mode 100644 index 0000000..7b9fa48 --- /dev/null +++ b/.github/workflows/trivy.yaml @@ -0,0 +1,34 @@ +name: Trivy Scanner + +permissions: + contents: read + security-events: write +on: + push: + branches: + - main + - dev + pull_request: +jobs: + trivy-scan: + name: Use Trivy + runs-on: ubuntu-24.04 + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Run Trivy scanner in repo mode + uses: aquasecurity/trivy-action@0.33.1 + with: + scan-type: "fs" + ignore-unfixed: true + format: "sarif" + output: "trivy-results.sarif" + severity: "CRITICAL,HIGH,UNKNOWN" + scanners: vuln,secret,misconfig,license + github-pat: ${{ secrets.GITHUB_TOKEN }} + + - name: Upload Trivy scan results to GitHub Security tab + uses: github/codeql-action/upload-sarif@v3 + with: + sarif_file: "trivy-results.sarif" diff --git a/config/default.js b/config/default.js index d4ea797..e3b872f 100644 --- a/config/default.js +++ b/config/default.js @@ -121,5 +121,6 @@ module.exports = { HASHING_KEYS: { USERFLOW: process.env.USERFLOW_PRIVATE_KEY - } + }, + MEMBER_SERVICE_PRISMA_TIMEOUT: process.env.MEMBER_SERVICE_PRISMA_TIMEOUT ? parseInt(process.env.MEMBER_SERVICE_PRISMA_TIMEOUT, 10) : 10000, } diff --git a/src/common/prisma.js b/src/common/prisma.js index 50f8fa1..ca758cc 100644 --- a/src/common/prisma.js +++ b/src/common/prisma.js @@ -4,8 +4,12 @@ const { Prisma } = require('../../prisma/generated/client') const { PrismaClient: SkillsPrismaClient } = require('../../prisma/generated/skills-client') +const config = require('config') const clientOptions = { + transactionOptions: { + timeout: config.MEMBER_SERVICE_PRISMA_TIMEOUT, + }, log: [ { level: 'query', emit: 'event' }, { level: 'info', emit: 'event' }, diff --git a/src/scripts/migrate-dynamo-data.js b/src/scripts/migrate-dynamo-data.js index f4b467c..24a80ad 100644 --- a/src/scripts/migrate-dynamo-data.js +++ b/src/scripts/migrate-dynamo-data.js @@ -1,7 +1,7 @@ const path = require('path') const fs = require('fs') const readline = require('readline') -const { concat, isArray, isBoolean, isEmpty, isEqual, isInteger, find, omit, pick, isNumber, forEach, map, uniqBy, isString, cloneDeep } = require('lodash') +const { concat, isArray, isBoolean, isEmpty, isEqual, isInteger, find, omit, pick, isNumber, forEach, map, uniqBy, isString, cloneDeep, flattenDeep } = require('lodash') const { v4: uuidv4 } = require('uuid') const config = require('./config') const prismaManager = require('../common/prisma') @@ -17,6 +17,12 @@ const TRANSACTION_RETRY_DELAY_MS = 1000 const DEFAULT_RATING_COLOR = '#EF3A3A' const DEFAULT_SRM_ID = 101 const DEFAULT_MARATHON_MATCH_ID = 102 +const DRY_RUN = process.env.MEMBER_MIGRATION_DRY_RUN === 'true' +const LOG_LEVELS = { + INFO: 'INFO', + WARN: 'WARN', + ERROR: 'ERROR' +} const MEMBER_FIELDS = ['userId', 'handle', 'handleLower', 'firstName', 'lastName', 'tracks', 'status', 'addresses', 'description', 'email', 'country', 'homeCountryCode', 'competitionCountryCode', 'photoURL', 'verified', 'maxRating', @@ -48,64 +54,644 @@ const DATE_ONLY_REGEX = /^\d{4}-\d{2}-\d{2}$/ const FALLBACK_RECORD_DATE_FIELDS = ['lastLoginDate', 'modified', 'modifiedAt', 'modified_on', 'modifiedOn', 'lastModified', 'lastModifiedAt', 'lastModifiedOn', 'timestamp', 'lastActivityDate'] const NULL_BYTE_REGEX = /\u0000/g +const SKILL_IMPORT_LOG_PATH = path.join(MIGRATE_DIR, 'skill-import.log') +let skillImportLogStream + +const CHILD_HISTORY_LOG_PATH = path.join(MIGRATE_DIR, 'child-history.ndjson') +let childHistoryLogStream + +const destructiveSteps = new Set(['0', '1', '2', '3', '5', '6']) +const destructiveFlagFromArgs = process.argv.slice(2).some(arg => arg === '--full-reset' || arg.startsWith('--full-reset=')) +const allowDestructiveClears = destructiveFlagFromArgs || process.env.ALLOW_DESTRUCTIVE === 'true' +const ALLOW_STALE_DELETIONS = process.env.ALLOW_STALE_DELETIONS === 'true' +const migrationRuntimeState = { + step: null, + dateFilter: null +} +const destructiveApprovals = new Map() + +function logWithLevel (level, message, context = null) { + const timestamp = new Date().toISOString() + const contextSuffix = context ? ` | ${JSON.stringify(context)}` : '' + const output = `[${level}] ${timestamp} ${message}${contextSuffix}` + if (level === LOG_LEVELS.ERROR) { + console.error(output) + } else if (level === LOG_LEVELS.WARN) { + console.warn(output) + } else { + console.log(output) + } +} + +function logInfo (message, context) { + logWithLevel(LOG_LEVELS.INFO, message, context) +} + +function logWarn (message, context) { + logWithLevel(LOG_LEVELS.WARN, message, context) +} + +function logError (message, context) { + logWithLevel(LOG_LEVELS.ERROR, message, context) +} + +async function executeWrite (description, operation, context = {}) { + if (DRY_RUN) { + logInfo(`DRY_RUN active, skipping write: ${description}`, context) + return null + } + try { + return await operation() + } catch (err) { + logError(`Failed to execute write: ${description}`, { ...context, error: err?.message }) + throw err + } +} + +function compactObject (obj) { + if (!obj) { + return {} + } + const result = {} + for (const [key, value] of Object.entries(obj)) { + if (value !== undefined) { + result[key] = value + } + } + return result +} + +function buildAddressKey (address) { + if (!address) { + return 'unknown' + } + const typePart = (address.type || 'unknown').trim().toLowerCase() + const streetPart = (address.streetAddr1 || '').trim().toLowerCase() + return `${typePart}::${streetPart}` +} + +function createTraitHash (item) { + if (!item) { + return '' + } + const ignoreKeys = new Set(['id', 'memberTraitId', 'createdAt', 'updatedAt', 'createdBy', 'updatedBy']) + const normalized = {} + Object.keys(item) + .filter(key => !ignoreKeys.has(key)) + .sort() + .forEach(key => { + normalized[key] = item[key] + }) + return JSON.stringify(normalized) +} + +const skillCaches = { + categoriesById: new Map(), + skillsById: new Map(), + skillLevelsById: new Map(), + skillLevelsByName: new Map(), + displayModesById: new Map(), + displayModesByName: new Map() +} + +function resetSkillCaches () { + skillCaches.categoriesById.clear() + skillCaches.skillsById.clear() + skillCaches.skillLevelsById.clear() + skillCaches.skillLevelsByName.clear() + skillCaches.displayModesById.clear() + skillCaches.displayModesByName.clear() +} + +function normalizeUserId (value) { + if (value === null || value === undefined) { + return null + } + + if (typeof value === 'number' && Number.isFinite(value)) { + return Math.trunc(value) + } + + if (typeof value === 'bigint') { + return Number(value) + } + + if (typeof value === 'string' && value.trim()) { + const parsed = Number(value) + if (Number.isFinite(parsed)) { + return Math.trunc(parsed) + } + } + + return null +} + +function appendSkillImportLog (message) { + if (!skillImportLogStream) { + const logDir = path.dirname(SKILL_IMPORT_LOG_PATH) + if (!fs.existsSync(logDir)) { + fs.mkdirSync(logDir, { recursive: true }) + } + skillImportLogStream = fs.createWriteStream(SKILL_IMPORT_LOG_PATH, { flags: 'a' }) + } + skillImportLogStream.write(`${new Date().toISOString()} ${message}\n`) +} + +function ensureChildHistoryStream () { + if (!childHistoryLogStream) { + const logDir = path.dirname(CHILD_HISTORY_LOG_PATH) + if (!fs.existsSync(logDir)) { + fs.mkdirSync(logDir, { recursive: true }) + } + childHistoryLogStream = fs.createWriteStream(CHILD_HISTORY_LOG_PATH, { flags: 'a' }) + } + return childHistoryLogStream +} + +function toSerializable (value) { + if (value === null || value === undefined) { + return value + } + if (typeof value === 'bigint') { + return value.toString() + } + if (value instanceof Date) { + return value.toISOString() + } + if (Array.isArray(value)) { + return value.map(entry => toSerializable(entry)) + } + if (typeof value === 'object') { + const normalized = {} + Object.keys(value).forEach((key) => { + normalized[key] = toSerializable(value[key]) + }) + return normalized + } + return value +} + +function appendChildHistoryLog (entry) { + const stream = ensureChildHistoryStream() + stream.write(`${JSON.stringify(toSerializable(entry))}\n`) +} + +function recordChildHistory (entityName, records, action, context = {}) { + if (!Array.isArray(records) || records.length === 0) { + return + } + const timestamp = new Date().toISOString() + const baseContext = { + ...context, + step: migrationRuntimeState.step || null, + dateFilter: migrationRuntimeState.dateFilter ? migrationRuntimeState.dateFilter.toISOString() : null + } + records.forEach((record) => { + appendChildHistoryLog({ + timestamp, + action, + entityName, + recordId: record?.id ?? null, + userId: record?.userId ?? baseContext.userId ?? null, + validFrom: record?.createdAt ? new Date(record.createdAt).toISOString() : null, + validTo: timestamp, + context: baseContext, + snapshot: toSerializable(record) + }) + }) +} + +async function handleStaleRecords (entityName, txModel, staleRecords, context = {}) { + if (!Array.isArray(staleRecords) || staleRecords.length === 0) { + return { action: 'none', count: 0 } + } + + const logContext = { + ...context, + entityName, + step: migrationRuntimeState.step || null, + dateFilter: migrationRuntimeState.dateFilter ? migrationRuntimeState.dateFilter.toISOString() : null, + staleIds: staleRecords.map(record => record.id) + } + + const initialAction = ALLOW_STALE_DELETIONS ? 'pending-delete' : 'retained' + recordChildHistory(entityName, staleRecords, initialAction, context) + + if (migrationRuntimeState.dateFilter) { + logInfo('Incremental migration run detected; retaining stale records for review', logContext) + return { action: 'skipped', count: staleRecords.length, reason: 'incremental-run' } + } + + if (!ALLOW_STALE_DELETIONS) { + logInfo('Destructive stale deletions disabled; logging retained records for review', logContext) + return { action: 'skipped', count: staleRecords.length, reason: 'configuration-disabled' } + } + + await executeWrite(`${entityName}.deleteMany`, () => txModel.deleteMany({ + where: { + id: { + in: staleRecords.map(record => record.id) + } + } + }), logContext) + + logInfo('Deleted stale records', logContext) + recordChildHistory(entityName, staleRecords, 'deleted', context) + return { action: 'deleted', count: staleRecords.length } +} + +function isIncrementalRunActive () { + return Boolean(migrationRuntimeState.dateFilter) +} + +function destructiveConfirmationToken (step) { + return `ERASE STEP ${step}` +} + +async function ensureDestructiveApproval ({ step, askQuestion, description }) { + if (destructiveApprovals.has(step)) { + return destructiveApprovals.get(step).confirmed === true + } + const token = destructiveConfirmationToken(step) + const answer = (await askQuestion(`Destructive action "${description}" will clear existing data. Type "${token}" to continue, or press Enter to cancel: `)).trim() + if (answer !== token) { + logWarn('Destructive confirmation not granted; skipping destructive operation', { step, description }) + destructiveApprovals.set(step, { confirmed: false }) + return false + } + destructiveApprovals.set(step, { confirmed: true }) + return true +} + +async function withDestructiveGuard ({ step, askQuestion, description }, operation) { + const context = { + step, + description, + dateFilter: migrationRuntimeState.dateFilter ? migrationRuntimeState.dateFilter.toISOString() : null + } + + if (!destructiveSteps.has(step)) { + await operation() + return true + } + + if (isIncrementalRunActive()) { + logInfo('Incremental run detected; destructive clear skipped', context) + return false + } + + if (!allowDestructiveClears) { + logWarn('Destructive clears disabled; rerun with --full-reset or ALLOW_DESTRUCTIVE=true to enable', context) + return false + } + + if (!askQuestion || typeof askQuestion !== 'function') { + throw new Error('Destructive operations require an interactive confirmation handler') + } + + const approved = await ensureDestructiveApproval({ step, askQuestion, description }) + if (!approved) { + return false + } + + await operation() + return true +} + +async function ensureSkillCategory (tx, category) { + if (!category || !category.id || !category.name) { + return null + } + + if (skillCaches.categoriesById.has(category.id)) { + return skillCaches.categoriesById.get(category.id) + } + + let existing = await tx.skillCategory.findUnique({ + where: { id: category.id } + }) + + if (!existing) { + existing = await tx.skillCategory.findFirst({ + where: { name: category.name } + }) + } + + if (!existing) { + existing = await tx.skillCategory.create({ + data: { + id: category.id, + name: category.name, + description: category.description || null + } + }) + } + + skillCaches.categoriesById.set(existing.id, existing) + return existing +} + +async function ensureSkillDefinition (tx, skill, categoryId) { + if (!skill || !skill.id || !skill.name) { + return null + } + + if (skillCaches.skillsById.has(skill.id)) { + return skillCaches.skillsById.get(skill.id) + } + + let existing = await tx.skill.findUnique({ + where: { id: skill.id } + }) + + if (!existing) { + existing = await tx.skill.findFirst({ + where: { name: skill.name } + }) + } + + if (!existing) { + const data = { + id: skill.id, + name: skill.name, + description: skill.description || null + } + + if (categoryId) { + data.categoryId = categoryId + } + + existing = await tx.skill.create({ data }) + } else if (categoryId && existing.categoryId !== categoryId) { + existing = await tx.skill.update({ + where: { id: existing.id }, + data: { categoryId } + }) + } + + skillCaches.skillsById.set(existing.id, existing) + return existing +} + +async function ensureSkillLevel (tx, level) { + if (!level || (!level.id && !level.name)) { + return null + } + + const idKey = level.id + const nameKey = level.name + + if (idKey && skillCaches.skillLevelsById.has(idKey)) { + return skillCaches.skillLevelsById.get(idKey) + } + + if (nameKey && skillCaches.skillLevelsByName.has(nameKey)) { + return skillCaches.skillLevelsByName.get(nameKey) + } + + let existing = null + + if (idKey) { + existing = await tx.userSkillLevel.findUnique({ + where: { id: idKey } + }) + } + + if (!existing && nameKey) { + existing = await tx.userSkillLevel.findFirst({ + where: { name: nameKey } + }) + } + + if (!existing) { + const data = { + name: nameKey || String(idKey), + description: level.description || null + } + + if (idKey) { + data.id = idKey + } + + existing = await tx.userSkillLevel.create({ data }) + } + + if (existing.id) { + skillCaches.skillLevelsById.set(existing.id, existing) + } + + if (existing.name) { + skillCaches.skillLevelsByName.set(existing.name, existing) + } + + return existing +} + +async function ensureSkillDisplayMode (tx, displayMode) { + const fallbackName = 'principal' + const payload = displayMode && (displayMode.id || displayMode.name) ? displayMode : { name: fallbackName } + const idKey = payload.id + const nameKey = payload.name || fallbackName + + if (idKey && skillCaches.displayModesById.has(idKey)) { + return skillCaches.displayModesById.get(idKey) + } + + if (nameKey && skillCaches.displayModesByName.has(nameKey)) { + return skillCaches.displayModesByName.get(nameKey) + } + + let existing = null + + if (idKey) { + existing = await tx.userSkillDisplayMode.findUnique({ + where: { id: idKey } + }) + } + + if (!existing && nameKey) { + existing = await tx.userSkillDisplayMode.findFirst({ + where: { name: nameKey } + }) + } + + if (!existing) { + const data = { + name: nameKey, + description: payload.description || null + } + + if (idKey) { + data.id = idKey + } + + existing = await tx.userSkillDisplayMode.create({ data }) + } + + if (existing.id) { + skillCaches.displayModesById.set(existing.id, existing) + } + + if (existing.name) { + skillCaches.displayModesByName.set(existing.name, existing) + } + + return existing +} + +async function syncMemberSkills (userId, memberSkills, handle = null) { + if (!Array.isArray(memberSkills) || memberSkills.length === 0) { + return + } + + const normalizedUserId = normalizeUserId(userId) + if (normalizedUserId === null) { + return + } + + await skillsPrisma.$transaction(async (skillsTx) => { + const existingUserSkills = await skillsTx.userSkill.findMany({ + where: { userId: normalizedUserId }, + select: { + id: true, + skillId: true, + userSkillLevelId: true, + userSkillDisplayModeId: true + } + }) + const existingMap = new Map() + existingUserSkills.forEach(record => { + const key = `${record.skillId}:${record.userSkillLevelId}` + existingMap.set(key, record) + }) + const processedSkillIds = new Set() + + for (const skill of memberSkills) { + if (!skill || !skill.id || !skill.name || !skill.category) { + continue + } + + const categoryRecord = await ensureSkillCategory(skillsTx, skill.category) + if (!categoryRecord) { + continue + } + + const skillRecord = await ensureSkillDefinition(skillsTx, skill, categoryRecord.id) + if (!skillRecord) { + continue + } + + const displayModeRecord = await ensureSkillDisplayMode(skillsTx, skill.displayMode) + if (!displayModeRecord) { + continue + } + + const levelCandidates = isArray(skill.levels) ? skill.levels : [] + const filteredLevels = levelCandidates.filter(level => level && (level.id || level.name)) + const uniqueLevels = uniqBy(filteredLevels, level => level.id || level.name) + + if (uniqueLevels.length === 0) { + continue + } + + let skillProcessed = false + for (const level of uniqueLevels) { + const levelRecord = await ensureSkillLevel(skillsTx, level) + if (!levelRecord) { + continue + } + + const compositeKey = `${skillRecord.id}:${levelRecord.id}` + const existingEntry = existingMap.get(compositeKey) + + if (existingEntry) { + if (existingEntry.userSkillDisplayModeId !== displayModeRecord.id) { + const updated = await skillsTx.userSkill.update({ + where: { id: existingEntry.id }, + data: { userSkillDisplayModeId: displayModeRecord.id } + }) + existingMap.set(compositeKey, updated) + } + } else { + const created = await skillsTx.userSkill.create({ + data: { + userId: normalizedUserId, + skillId: skillRecord.id, + userSkillLevelId: levelRecord.id, + userSkillDisplayModeId: displayModeRecord.id + } + }) + existingMap.set(compositeKey, created) + } + skillProcessed = true + } + if (skillProcessed) { + processedSkillIds.add(skillRecord.id) + } + } + + if (processedSkillIds.size > 0) { + const identifier = handle || `userId:${normalizedUserId}` + appendSkillImportLog(`Imported ${processedSkillIds.size} skills for user ${identifier}`) + } + }) +} + /** * Clear All DB. */ async function clearDB () { + const context = { step: '0', operation: 'full-reset' } console.log('Clearing address and financial data') - // delete address and financial data - await prisma.memberAddress.deleteMany() - await prisma.memberFinancial.deleteMany() - // delete stats + await executeWrite('memberAddress.deleteMany', () => prisma.memberAddress.deleteMany(), context) + await executeWrite('memberFinancial.deleteMany', () => prisma.memberFinancial.deleteMany(), context) + console.log('Clearing member stats data') - await prisma.memberCopilotStats.deleteMany() - await prisma.memberMarathonStats.deleteMany() - await prisma.memberDesignStatsItem.deleteMany() - await prisma.memberDesignStats.deleteMany() - await prisma.memberDevelopStatsItem.deleteMany() - await prisma.memberDevelopStats.deleteMany() - await prisma.memberSrmChallengeDetail.deleteMany() - await prisma.memberSrmDivisionDetail.deleteMany() - await prisma.memberSrmStats.deleteMany() - await prisma.memberStats.deleteMany() - await prisma.memberDataScienceStats.deleteMany() - // delete stats history + await executeWrite('memberCopilotStats.deleteMany', () => prisma.memberCopilotStats.deleteMany(), context) + await executeWrite('memberMarathonStats.deleteMany', () => prisma.memberMarathonStats.deleteMany(), context) + await executeWrite('memberDesignStatsItem.deleteMany', () => prisma.memberDesignStatsItem.deleteMany(), context) + await executeWrite('memberDesignStats.deleteMany', () => prisma.memberDesignStats.deleteMany(), context) + await executeWrite('memberDevelopStatsItem.deleteMany', () => prisma.memberDevelopStatsItem.deleteMany(), context) + await executeWrite('memberDevelopStats.deleteMany', () => prisma.memberDevelopStats.deleteMany(), context) + await executeWrite('memberSrmChallengeDetail.deleteMany', () => prisma.memberSrmChallengeDetail.deleteMany(), context) + await executeWrite('memberSrmDivisionDetail.deleteMany', () => prisma.memberSrmDivisionDetail.deleteMany(), context) + await executeWrite('memberSrmStats.deleteMany', () => prisma.memberSrmStats.deleteMany(), context) + await executeWrite('memberStats.deleteMany', () => prisma.memberStats.deleteMany(), context) + await executeWrite('memberDataScienceStats.deleteMany', () => prisma.memberDataScienceStats.deleteMany(), context) + console.log('Clearing member stats history data') - await prisma.memberDataScienceHistoryStats.deleteMany() - await prisma.memberDevelopHistoryStats.deleteMany() - await prisma.memberHistoryStats.deleteMany() - // delete traits + await executeWrite('memberDataScienceHistoryStats.deleteMany', () => prisma.memberDataScienceHistoryStats.deleteMany(), context) + await executeWrite('memberDevelopHistoryStats.deleteMany', () => prisma.memberDevelopHistoryStats.deleteMany(), context) + await executeWrite('memberHistoryStats.deleteMany', () => prisma.memberHistoryStats.deleteMany(), context) + console.log('Clearing member traits data') - await prisma.memberTraitBasicInfo.deleteMany() - await prisma.memberTraitCommunity.deleteMany() - await prisma.memberTraitDevice.deleteMany() - await prisma.memberTraitEducation.deleteMany() - await prisma.memberTraitLanguage.deleteMany() - await prisma.memberTraitOnboardChecklist.deleteMany() - await prisma.memberTraitPersonalization.deleteMany() - await prisma.memberTraitServiceProvider.deleteMany() - await prisma.memberTraitSoftware.deleteMany() - await prisma.memberTraitWork.deleteMany() - await prisma.memberTraits.deleteMany() - // delete member skills + await executeWrite('memberTraitBasicInfo.deleteMany', () => prisma.memberTraitBasicInfo.deleteMany(), context) + await executeWrite('memberTraitCommunity.deleteMany', () => prisma.memberTraitCommunity.deleteMany(), context) + await executeWrite('memberTraitDevice.deleteMany', () => prisma.memberTraitDevice.deleteMany(), context) + await executeWrite('memberTraitEducation.deleteMany', () => prisma.memberTraitEducation.deleteMany(), context) + await executeWrite('memberTraitLanguage.deleteMany', () => prisma.memberTraitLanguage.deleteMany(), context) + await executeWrite('memberTraitOnboardChecklist.deleteMany', () => prisma.memberTraitOnboardChecklist.deleteMany(), context) + await executeWrite('memberTraitPersonalization.deleteMany', () => prisma.memberTraitPersonalization.deleteMany(), context) + await executeWrite('memberTraitServiceProvider.deleteMany', () => prisma.memberTraitServiceProvider.deleteMany(), context) + await executeWrite('memberTraitSoftware.deleteMany', () => prisma.memberTraitSoftware.deleteMany(), context) + await executeWrite('memberTraitWork.deleteMany', () => prisma.memberTraitWork.deleteMany(), context) + await executeWrite('memberTraits.deleteMany', () => prisma.memberTraits.deleteMany(), context) + console.log('Clearing member skills data') - await prisma.memberSkillLevel.deleteMany() - await prisma.memberSkill.deleteMany() - // delete member - console.log('Clearing maxRating and member data') - await prisma.memberMaxRating.deleteMany() - await prisma.member.deleteMany() + await executeWrite('skills.userSkill.deleteMany', () => skillsPrisma.userSkill.deleteMany(), context) + console.log('Clearing skill reference data') + await executeWrite('skills.userSkillDisplayMode.deleteMany', () => skillsPrisma.userSkillDisplayMode.deleteMany(), context) + await executeWrite('skills.userSkillLevel.deleteMany', () => skillsPrisma.userSkillLevel.deleteMany(), context) + await executeWrite('skills.skill.deleteMany', () => skillsPrisma.skill.deleteMany(), context) + await executeWrite('skills.skillCategory.deleteMany', () => skillsPrisma.skillCategory.deleteMany(), context) + resetSkillCaches() - // delete skills - console.log('Clearing skill data') - await skillsPrisma.skillLevel.deleteMany() - await skillsPrisma.skill.deleteMany() - await skillsPrisma.skillCategory.deleteMany() - await prisma.displayMode.deleteMany() + console.log('Clearing maxRating and member data') + await executeWrite('memberMaxRating.deleteMany', () => prisma.memberMaxRating.deleteMany(), context) + await executeWrite('member.deleteMany', () => prisma.member.deleteMany(), context) - // delete distribution console.log('Clearing rating distribution data') - await prisma.distributionStats.deleteMany() + await executeWrite('distributionStats.deleteMany', () => prisma.distributionStats.deleteMany(), context) console.log('All done') } @@ -391,6 +977,175 @@ function sanitizeNullBytesDeep (target) { return removed } +function ensureDirectoryReadable (dirPath) { + const errors = [] + try { + const stats = fs.statSync(dirPath) + if (!stats.isDirectory()) { + errors.push('Path is not a directory') + } + } catch (err) { + errors.push(err.message) + } + return errors +} + +function validateMigrationConfiguration () { + const errors = [] + const warnings = [] + + if (!MIGRATE_DIR) { + errors.push('MIGRATE_DIR is not configured') + } else { + const dirErrors = ensureDirectoryReadable(MIGRATE_DIR) + if (dirErrors.length > 0) { + errors.push(`MIGRATE_DIR is not accessible: ${dirErrors.join('; ')}`) + } + } + + const requiredEnvVars = ['DATABASE_URL'] + requiredEnvVars.forEach((envVar) => { + if (!process.env[envVar]) { + warnings.push(`Environment variable ${envVar} is not set`) + } + }) + + const result = { + isValid: errors.length === 0, + errors, + warnings + } + + if (!result.isValid) { + logError('Migration configuration validation failed', { errors }) + } else { + logInfo('Migration configuration validated', { warnings }) + } + + return result +} + +async function getMemberIntegritySnapshot () { + const [memberCount, addressCount, traitCount] = await Promise.all([ + prisma.member.count(), + prisma.memberAddress.count(), + prisma.memberTraits.count() + ]) + + return { + memberCount, + addressCount, + traitCount, + capturedAt: new Date().toISOString() + } +} + +async function validateIncrementalMigrationData (sampleRecord = null, dateFilter = null) { + const warnings = [] + const errors = [] + + if (dateFilter && Number.isNaN(dateFilter.getTime())) { + errors.push('Date filter is invalid or not a real date') + } + + if (dateFilter && dateFilter > new Date()) { + warnings.push('Date filter is in the future; no records are expected to match') + } + + const requiredFields = ['userId', 'handle', 'handleLower', 'email'] + if (sampleRecord) { + requiredFields.forEach((field) => { + if (!sampleRecord[field]) { + errors.push(`Missing required field ${field} in source record`) + } + }) + } else { + warnings.push('No sample record provided for validation; required field checks were skipped') + } + + try { + await prisma.$queryRawUnsafe('SELECT 1') + } catch (err) { + errors.push(`Database connectivity check failed: ${err.message}`) + } + + const result = { + isValid: errors.length === 0, + warnings, + errors + } + + if (!result.isValid) { + logError('Incremental migration validation failed', { errors }) + } else { + logInfo('Incremental migration validation completed', { warnings }) + } + + return result +} + +async function verifyDataIntegrity (beforeSnapshot, afterSnapshot, context = {}) { + const issues = [] + if (beforeSnapshot && afterSnapshot) { + if (afterSnapshot.memberCount < beforeSnapshot.memberCount) { + issues.push('Member count decreased after migration') + } + if (afterSnapshot.addressCount < beforeSnapshot.addressCount) { + issues.push('Address count decreased after migration') + } + if (afterSnapshot.traitCount < beforeSnapshot.traitCount) { + issues.push('Trait count decreased after migration') + } + } + + let orphanAddresses = 0 + let orphanTraits = 0 + + try { + orphanAddresses = await prisma.memberAddress.count({ + where: { + member: { + is: null + } + } + }) + } catch (err) { + logWarn('Failed to detect orphan member addresses', { error: err.message }) + } + + try { + orphanTraits = await prisma.memberTraits.count({ + where: { + member: { + is: null + } + } + }) + } catch (err) { + logWarn('Failed to detect orphan member traits', { error: err.message }) + } + + const summary = { + beforeSnapshot, + afterSnapshot, + orphanAddresses, + orphanTraits, + context + } + + if (issues.length > 0 || orphanAddresses > 0 || orphanTraits > 0) { + logWarn('Data integrity verification detected potential issues', { ...summary, issues }) + } else { + logInfo('Data integrity verification passed', summary) + } + + return { + issues, + orphanAddresses, + orphanTraits + } +} + /** * Import the Dynamo members from file * @param {String} filename filename @@ -516,6 +1271,12 @@ async function importDynamoMember (filename, dateFilter = null) { await createMembers(batchItems) total += batchItems.length } + logInfo('Dynamo member import completed', { + filename, + processed: count, + inserted: total, + skipped + }) console.log(`\nIt has inserted ${total} items totally, skipped ${skipped} items`) console.log(`Finished reading the file: ${filename}\n`) @@ -1410,30 +2171,111 @@ async function importElasticSearchMember (filename, dateFilter = null) { process.stdout.write(`Migrate Progress: ${percentage}%, read ${count} items, skipped ${skipped}`) } - count += 1 - const dataItem = JSON.parse(line.trim()) + count += 1 + const dataItem = JSON.parse(line.trim()) + + if (!shouldProcessRecord(dataItem._source, dateFilter)) { + skipped += 1 + continue + } + + const dbItem = await prisma.member.findFirst({ + where: { + userId: dataItem._source.userId + }, + include: { + addresses: true + } + }) + + const dataObj = await fixMemberUpdateData(dataItem._source, dbItem || {}) + if (!dataObj) { + continue + } + + await updateMembersWithTraitsAndSkills(dataObj) + total += 1 + } + + console.log(`\nIt has updated ${total} items totally, skipped ${skipped} items`) + + console.log(`Finished reading the file: ${filename}\n`) +} + +/** + * Update member status values from ElasticSearch snapshot + * @param {String} filename filename + * @param {Date|null} [dateFilter=null] optional date filter threshold + */ +async function updateMemberStatusFromElasticSearch (filename, dateFilter = null) { + const memberElasticFilePath = path.join(MIGRATE_DIR, filename) + + const lineCount = await countFileLines(memberElasticFilePath) + console.log(`${filename} has ${lineCount} lines in total`) + + const rlRead = readline.createInterface({ + input: fs.createReadStream(memberElasticFilePath), + crlfDelay: Infinity // To recognize all instances of CR LF as a single line break + }) + + let currentLine = 0 + let count = 0 + let total = 0 + let skipped = 0 + let unchanged = 0 + for await (const line of rlRead) { + currentLine += 1 + if (currentLine % 10 === 0) { + const percentage = ((currentLine / lineCount) * 100).toFixed(2) + process.stdout.clearLine() + process.stdout.cursorTo(0) + process.stdout.write(`Migrate Progress: ${percentage}%, read ${count} items, skipped ${skipped}, unchanged ${unchanged}`) + } + + count += 1 + const parsedLine = JSON.parse(line.trim()) + const dataItem = parsedLine._source || parsedLine + + if (!shouldProcessRecord(dataItem, dateFilter)) { + skipped += 1 + continue + } + + const normalizedUserId = normalizeUserId(dataItem.userId) + const statusValue = dataItem.status + if (!normalizedUserId || !isString(statusValue)) { + skipped += 1 + continue + } - if (!shouldProcessRecord(dataItem._source, dateFilter)) { + let normalizedStatus = statusValue + if (normalizedStatus === 'INACTIVE') { + normalizedStatus = 'INACTIVE_USER_REQUEST' + } else if (!MEMBER_STATUS.includes(normalizedStatus)) { skipped += 1 continue } - const dbItem = await prisma.member.findFirst({ + const updateResult = await prisma.member.updateMany({ where: { - userId: dataItem._source.userId + userId: normalizedUserId, + NOT: { + status: normalizedStatus + } }, - include: { - addresses: true + data: { + status: normalizedStatus } }) - const dataObj = await fixMemberUpdateData(dataItem._source, dbItem || {}) - - await updateMembersWithTraitsAndSkills(dataObj) - total += 1 + if (updateResult.count > 0) { + total += updateResult.count + } else { + unchanged += 1 + } } - console.log(`\nIt has updated ${total} items totally, skipped ${skipped} items`) + console.log(`\nIt has updated ${total} items totally, skipped ${skipped} items, unchanged ${unchanged} items`) console.log(`Finished reading the file: ${filename}\n`) } @@ -1566,10 +2408,23 @@ async function fixMemberUpdateData (memberItem, dbItem) { }] } } else if (memberItem.traits.traitId === 'hobby') { - memberItemUpdate.memberTraits.hobby = [] + const hobbyValues = [] forEach(memberItem.traits.data, traitData => { - memberItemUpdate.memberTraits.hobby.push(traitData.hobby) + if (!traitData || traitData.hobby === undefined || traitData.hobby === null) { + return + } + const rawValues = isArray(traitData.hobby) ? flattenDeep(traitData.hobby) : [traitData.hobby] + forEach(rawValues, hobbyValue => { + if (isString(hobbyValue) && hobbyValue.trim()) { + hobbyValues.push(hobbyValue) + } else if (hobbyValue !== undefined && hobbyValue !== null) { + console.warn(`[WARN] Skipping invalid hobby value for user ${memberItem.userId}: ${JSON.stringify(hobbyValue)}`) + } + }) }) + if (!isEmpty(hobbyValues)) { + memberItemUpdate.memberTraits.hobby = hobbyValues + } } else if (memberItem.traits.traitId === 'subscription') { memberItemUpdate.memberTraits.subscription = [] forEach(memberItem.traits.data, traitData => { @@ -1700,239 +2555,254 @@ async function fixMemberUpdateData (memberItem, dbItem) { * @param {Object} memberObj member item */ async function updateMembersWithTraitsAndSkills (memberObj) { - if (!isEmpty(omit(memberObj, ['userId', 'createdAt', 'createdBy', 'updatedAt', 'updatedBy', 'lastLoginDate']))) { - await prisma.$transaction(async (tx) => { - const onlyMemberObj = omit(memberObj, ['userId', 'createdAt', 'createdBy', 'updatedAt', 'updatedBy', 'lastLoginDate', 'skills', 'maxRating', 'addresses', 'memberTraits', 'memberSkills']) - - if (onlyMemberObj.status === 'INACTIVE') { - onlyMemberObj.status = 'INACTIVE_USER_REQUEST' - } else if (!MEMBER_STATUS.find(status => status === onlyMemberObj.status)) { - onlyMemberObj.status = 'UNKNOWN' - } - - if (!isEmpty(onlyMemberObj)) { - await tx.member.update({ - where: { - userId: memberObj.userId - }, - data: onlyMemberObj - }) - } + const hasUpdatableFields = !isEmpty(omit(memberObj, ['userId', 'createdAt', 'createdBy', 'updatedAt', 'updatedBy', 'lastLoginDate'])) + const context = { userId: memberObj.userId } - if (memberObj.maxRating && memberObj.maxRating.rating && memberObj.maxRating.track && memberObj.maxRating.subTrack) { - const existMaxRating = await tx.memberMaxRating.findFirst({ - where: { userId: memberObj.userId } - }) + if (hasUpdatableFields) { + try { + await executeWithTransactionRetry(() => prisma.$transaction(async (tx) => { + const onlyMemberObj = omit(memberObj, ['userId', 'createdAt', 'createdBy', 'updatedAt', 'updatedBy', 'lastLoginDate', 'skills', 'maxRating', 'addresses', 'memberTraits', 'memberSkills']) - if (existMaxRating) { - // update current maxRating - await tx.memberMaxRating.update({ - where: { - id: existMaxRating.id - }, - data: { - ...memberObj.maxRating, - userId: memberObj.userId, - updatedBy: CREATED_BY - } - }) - } else { - // create new maxRating - await tx.memberMaxRating.create({ - data: { - ...memberObj.maxRating, - ratingColor: memberObj.maxRating.ratingColor || DEFAULT_RATING_COLOR, - userId: memberObj.userId, - createdBy: CREATED_BY - } - }) + if (onlyMemberObj.status === 'INACTIVE') { + onlyMemberObj.status = 'INACTIVE_USER_REQUEST' + } else if (!MEMBER_STATUS.find(status => status === onlyMemberObj.status)) { + onlyMemberObj.status = 'UNKNOWN' } - } - if (memberObj.addresses && memberObj.addresses.length > 0) { - // clear current addresses - await tx.memberAddress.deleteMany({ - where: { userId: memberObj.userId } - }) - // create new addresses - await tx.memberAddress.createMany({ - data: map(memberObj.addresses, t => ({ - ...t, + const memberUpdateData = compactObject(onlyMemberObj) + + if (!isEmpty(memberUpdateData) || (memberObj.userId && memberObj.handle && memberObj.handleLower && memberObj.email)) { + const createDefaults = compactObject({ userId: memberObj.userId, - createdBy: CREATED_BY - })) - }) - } + handle: memberObj.handle, + handleLower: memberObj.handleLower, + email: memberObj.email, + status: memberUpdateData.status || memberObj.status || 'UNKNOWN', + firstName: memberObj.firstName, + lastName: memberObj.lastName, + country: memberObj.country, + homeCountryCode: memberObj.homeCountryCode, + competitionCountryCode: memberObj.competitionCountryCode, + photoURL: memberObj.photoURL, + tracks: memberObj.tracks, + availableForGigs: memberObj.availableForGigs, + namesAndHandleAppearance: memberObj.namesAndHandleAppearance, + createdBy: memberObj.createdBy || CREATED_BY, + updatedBy: memberObj.updatedBy || memberUpdateData.updatedBy || CREATED_BY + }) - if (memberObj.memberTraits) { - let memberTraitsDB = await tx.memberTraits.findFirst({ - where: { - userId: memberObj.userId - } - }) + const requiredForCreate = ['userId', 'handle', 'handleLower', 'email'] + const missingRequiredForCreate = requiredForCreate.filter(field => !createDefaults[field]) - if (!memberTraitsDB) { - memberTraitsDB = await tx.memberTraits.create({ - data: { - userId: memberObj.userId, - createdBy: CREATED_BY + if (missingRequiredForCreate.length > 0) { + logWarn('Skipping member upsert due to missing required fields', { ...context, missingFields: missingRequiredForCreate }) + if (!isEmpty(memberUpdateData)) { + await executeWrite('member.update', () => tx.member.update({ + where: { userId: memberObj.userId }, + data: memberUpdateData + }), context) } - }) - } - - if ((memberObj.memberTraits.subscriptions && memberObj.memberTraits.subscriptions.length > 0) || (memberObj.memberTraits.hobby && memberObj.memberTraits.hobby.length > 0)) { - const toUpdateObj = {} - if (memberObj.memberTraits.subscriptions && memberObj.memberTraits.subscriptions.length > 0) { - toUpdateObj.subscriptions = memberObj.memberTraits.subscriptions - } - if (memberObj.memberTraits.hobby && memberObj.memberTraits.hobby.length > 0) { - toUpdateObj.hobby = memberObj.memberTraits.hobby + } else { + const createData = { ...createDefaults, ...memberUpdateData } + await executeWrite('member.upsert', () => tx.member.upsert({ + where: { userId: memberObj.userId }, + update: memberUpdateData, + create: createData + }), context) } - await tx.memberTraits.update({ - where: { - id: memberTraitsDB.id - }, - data: toUpdateObj - }) } - await updateTraitElement(memberObj.memberTraits.device, tx.memberTraitDevice, memberTraitsDB.id, CREATED_BY) - await updateTraitElement(memberObj.memberTraits.software, tx.memberTraitSoftware, memberTraitsDB.id, CREATED_BY) - await updateTraitElement(memberObj.memberTraits.serviceProvider, tx.memberTraitServiceProvider, memberTraitsDB.id, CREATED_BY) - await updateTraitElement(memberObj.memberTraits.work, tx.memberTraitWork, memberTraitsDB.id, CREATED_BY) - await updateTraitElement(memberObj.memberTraits.education, tx.memberTraitEducation, memberTraitsDB.id, CREATED_BY) - await updateTraitElement(memberObj.memberTraits.basicInfo, tx.memberTraitBasicInfo, memberTraitsDB.id, CREATED_BY) - await updateTraitElement(memberObj.memberTraits.language, tx.memberTraitLanguage, memberTraitsDB.id, CREATED_BY) - await updateTraitElement(memberObj.memberTraits.checklist, tx.memberTraitOnboardChecklist, memberTraitsDB.id, CREATED_BY) - await updateTraitElement(memberObj.memberTraits.personalization, tx.memberTraitPersonalization, memberTraitsDB.id, CREATED_BY) - await updateTraitElement(memberObj.memberTraits.community, tx.memberTraitCommunity, memberTraitsDB.id, CREATED_BY) - } + if (memberObj.maxRating && memberObj.maxRating.rating && memberObj.maxRating.track && memberObj.maxRating.subTrack) { + const existingMaxRating = await tx.memberMaxRating.findFirst({ + where: { userId: memberObj.userId } + }) - if (memberObj.memberSkills) { - await tx.memberSkill.deleteMany({ - where: { + const maxRatingData = compactObject({ + ...memberObj.maxRating, + ratingColor: memberObj.maxRating.ratingColor || DEFAULT_RATING_COLOR, userId: memberObj.userId - } - }) + }) - let allCategories = await tx.skillCategory.findMany() - let allDisplayModes = await tx.displayMode.findMany() - let allSkillLevels = await tx.skillLevel.findMany() - let allSkills = await tx.skill.findMany() - let newCategories = [] - let newDisplayModes = [] - let newSkillLevels = [] - let newSkills = [] - memberObj.memberSkills.forEach(elemItem => { - if (elemItem.category && !find(allCategories, category => category.id === elemItem.category.id)) { - newCategories.push({ - ...elemItem.category, - createdBy: CREATED_BY - }) - } - if (elemItem.displayMode && !find(allDisplayModes, displayMode => displayMode.id === elemItem.displayMode.id)) { - newDisplayModes.push({ - ...elemItem.displayMode, - createdBy: CREATED_BY - }) - } - if (elemItem.levels && elemItem.levels.length > 0) { - elemItem.levels.forEach(levelItem => { - if (!find(allSkillLevels, level => levelItem.id === level.id)) { - newSkillLevels.push({ - ...levelItem, - createdBy: CREATED_BY - }) + if (existingMaxRating) { + const diff = {} + MAX_RATING_FIELDS.forEach((field) => { + if (maxRatingData[field] !== undefined && !isEqual(existingMaxRating[field], maxRatingData[field])) { + diff[field] = maxRatingData[field] } }) + if (!isEmpty(diff)) { + diff.updatedBy = CREATED_BY + await executeWrite('memberMaxRating.update', () => tx.memberMaxRating.update({ + where: { id: existingMaxRating.id }, + data: diff + }), context) + } + } else { + await executeWrite('memberMaxRating.create', () => tx.memberMaxRating.create({ + data: { + ...maxRatingData, + createdBy: memberObj.maxRating.createdBy || CREATED_BY + } + }), context) } - if (!find(allSkills, skill => skill.id === elemItem.id)) { - newSkills.push({ - id: elemItem.id, - name: elemItem.name, - categoryId: elemItem.category.id, - createdBy: CREATED_BY - }) - } - }) - - newCategories = uniqBy(newCategories, elemItem => elemItem.id) - newDisplayModes = uniqBy(newDisplayModes, elemItem => elemItem.id) - newSkillLevels = uniqBy(newSkillLevels, elemItem => elemItem.id) - newSkills = uniqBy(newSkills, elemItem => elemItem.id) - if (newCategories.length > 0) { - await tx.skillCategory.createMany({ - data: newCategories - }) - } - if (newDisplayModes.length > 0) { - await tx.displayMode.createMany({ - data: newDisplayModes - }) } - if (newSkillLevels.length > 0) { - await tx.skillLevel.createMany({ - data: newSkillLevels - }) + + if (memberObj.addresses && memberObj.addresses.length > 0) { + await syncMemberAddresses(tx, memberObj.userId, memberObj.addresses) } - if (newSkills.length > 0) { - await tx.skill.createMany({ - data: newSkills + + if (memberObj.memberTraits) { + let memberTraitsDB = await tx.memberTraits.findFirst({ + where: { + userId: memberObj.userId + } }) - } - const toCreateArr = memberObj.memberSkills.map(elemItem => { - const skillObj = { - id: uuidv4(), - skill: { - connect: { - id: elemItem.id - } - }, - member: { - connect: { - userId: memberObj.userId - } - }, - createdBy: CREATED_BY - } - if (elemItem.levels) { - skillObj.levels = { - create: elemItem.levels.map(level => ({ - skillLevel: { - connect: { - id: level.id - } - }, + if (!memberTraitsDB) { + const createdMemberTraits = await executeWrite('memberTraits.create', () => tx.memberTraits.create({ + data: { + userId: memberObj.userId, createdBy: CREATED_BY - })) - } - } - if (elemItem.displayMode) { - skillObj.displayMode = { - connect: { - id: elemItem.displayMode.id } + }), context) + memberTraitsDB = createdMemberTraits || memberTraitsDB + if (!memberTraitsDB) { + memberTraitsDB = { id: null } } } - return skillObj - }) - for (const skillObj of toCreateArr) { - const exist = await tx.memberSkill.findFirst({ - where: { - userId: memberObj.userId, - skillId: skillObj.skill.connect.id + if (memberTraitsDB) { + if ((memberObj.memberTraits.subscriptions && memberObj.memberTraits.subscriptions.length > 0) || (memberObj.memberTraits.hobby && memberObj.memberTraits.hobby.length > 0)) { + const toUpdateObj = {} + if (memberObj.memberTraits.subscriptions && memberObj.memberTraits.subscriptions.length > 0) { + toUpdateObj.subscriptions = memberObj.memberTraits.subscriptions + } + if (memberObj.memberTraits.hobby && memberObj.memberTraits.hobby.length > 0) { + const sanitizedHobbies = [] + const hobbySource = isArray(memberObj.memberTraits.hobby) ? flattenDeep(memberObj.memberTraits.hobby) : [memberObj.memberTraits.hobby] + forEach(hobbySource, hobbyValue => { + if (isString(hobbyValue) && hobbyValue.trim()) { + sanitizedHobbies.push(hobbyValue) + } + }) + if (isEmpty(sanitizedHobbies)) { + logWarn('Skipping hobby update due to invalid values', context) + } else { + toUpdateObj.hobby = sanitizedHobbies + } + } + if (!isEmpty(toUpdateObj)) { + await executeWrite('memberTraits.update', () => tx.memberTraits.update({ + where: { + id: memberTraitsDB.id + }, + data: toUpdateObj + }), context) + } } - }) - if (!exist) { - await tx.memberSkill.create({ - data: skillObj - }) + + await updateTraitElement(memberObj.memberTraits.device, tx.memberTraitDevice, memberTraitsDB.id, CREATED_BY, 'memberTraitDevice', context) + await updateTraitElement(memberObj.memberTraits.software, tx.memberTraitSoftware, memberTraitsDB.id, CREATED_BY, 'memberTraitSoftware', context) + await updateTraitElement(memberObj.memberTraits.serviceProvider, tx.memberTraitServiceProvider, memberTraitsDB.id, CREATED_BY, 'memberTraitServiceProvider', context) + await updateTraitElement(memberObj.memberTraits.work, tx.memberTraitWork, memberTraitsDB.id, CREATED_BY, 'memberTraitWork', context) + await updateTraitElement(memberObj.memberTraits.education, tx.memberTraitEducation, memberTraitsDB.id, CREATED_BY, 'memberTraitEducation', context) + await updateTraitElement(memberObj.memberTraits.basicInfo, tx.memberTraitBasicInfo, memberTraitsDB.id, CREATED_BY, 'memberTraitBasicInfo', context) + await updateTraitElement(memberObj.memberTraits.language, tx.memberTraitLanguage, memberTraitsDB.id, CREATED_BY, 'memberTraitLanguage', context) + await updateTraitElement(memberObj.memberTraits.checklist, tx.memberTraitOnboardChecklist, memberTraitsDB.id, CREATED_BY, 'memberTraitOnboardChecklist', context) + await updateTraitElement(memberObj.memberTraits.personalization, tx.memberTraitPersonalization, memberTraitsDB.id, CREATED_BY, 'memberTraitPersonalization', context) + await updateTraitElement(memberObj.memberTraits.community, tx.memberTraitCommunity, memberTraitsDB.id, CREATED_BY, 'memberTraitCommunity', context) } } - } + }, { + timeout: TRANSACTION_TIMEOUT_MS + })) + } catch (err) { + logError('Failed to update member with traits and skills', { ...context, error: err?.message }) + throw err + } + } + + if (memberObj.memberSkills && memberObj.memberSkills.length > 0) { + try { + await syncMemberSkills(memberObj.userId, memberObj.memberSkills, memberObj.handle) + } catch (err) { + logError('Failed to sync member skills', { ...context, error: err?.message }) + throw err + } + } +} + +async function syncMemberAddresses (tx, userId, addresses = []) { + if (!Array.isArray(addresses) || addresses.length === 0) { + logInfo('No addresses provided for synchronization', { userId }) + return + } + + const existingAddresses = await tx.memberAddress.findMany({ + where: { userId } + }) + + const processedIds = new Set() + const normalizedIncoming = addresses.map((address) => { + const clonedAddress = cloneDeep(address || {}) + sanitizeNullBytesDeep(clonedAddress) + const normalized = normalizeAddressFieldStrings(clonedAddress) + return { + ...normalized, + userId, + createdBy: normalized.createdBy || CREATED_BY + } + }) + + for (const incoming of normalizedIncoming) { + const addressKey = buildAddressKey(incoming) + const candidateById = incoming.id ? existingAddresses.find(existing => `${existing.id}` === `${incoming.id}`) : null + let matchingAddress = candidateById || existingAddresses.find(existing => buildAddressKey(existing) === addressKey) + + const updatePayload = compactObject({ + streetAddr1: incoming.streetAddr1, + streetAddr2: incoming.streetAddr2, + city: incoming.city, + zip: incoming.zip ? `${incoming.zip}` : incoming.zip, + stateCode: incoming.stateCode, + type: incoming.type, + updatedBy: CREATED_BY }) + + if (matchingAddress) { + processedIds.add(matchingAddress.id) + const diff = {} + ADDRESS_FIELDS.forEach((field) => { + const incomingValue = updatePayload[field] ?? null + const existingValue = matchingAddress[field] ?? null + if (!isEqual(existingValue, incomingValue)) { + diff[field] = incomingValue + } + }) + + if (!isEmpty(diff)) { + diff.updatedBy = CREATED_BY + await executeWrite('memberAddress.update', () => tx.memberAddress.update({ + where: { id: matchingAddress.id }, + data: diff + }), { userId, addressKey }) + } + } else { + const createPayload = compactObject({ + ...updatePayload, + userId, + createdBy: incoming.createdBy || CREATED_BY + }) + + await executeWrite('memberAddress.create', () => tx.memberAddress.create({ + data: createPayload + }), { userId, addressKey }) + } + } + + const addressesToRemove = existingAddresses.filter(address => !processedIds.has(address.id)) + if (addressesToRemove.length > 0) { + await handleStaleRecords('memberAddress', tx.memberAddress, addressesToRemove, { userId }) } } @@ -1942,34 +2812,91 @@ async function updateMembersWithTraitsAndSkills (memberObj) { * @param {Object} txObject prisma tx instance * @param {String} memberTraitId the memberTraitId * @param {String} createdBy the createdBy + * @param {String} entityName name for logging context + * @param {Object} context parent context */ -async function updateTraitElement (objArr, txObject, memberTraitId, createdBy) { - if (objArr && objArr.length > 0) { - const toUpdateArr = objArr.map(elemItem => { - const traitItem = { ...(elemItem || {}) } +async function updateTraitElement (objArr, txObject, memberTraitId, createdBy, entityName = 'traitElement', context = {}) { + if (!objArr || objArr.length === 0) { + return + } - if (Object.prototype.hasOwnProperty.call(traitItem, 'industry')) { - const { industry } = traitItem - if (!WORK_INDUSTRY_TYPES.includes(industry)) { - traitItem.industry = null - } - } + const effectiveContext = { ...context, memberTraitId, entityName } - return { - ...traitItem, - memberTraitId, - createdBy - } - }) + if (DRY_RUN) { + logInfo('DRY_RUN active; trait element synchronization skipped', effectiveContext) + return + } - await txObject.deleteMany({ - where: { - memberTraitId + const existingElements = await txObject.findMany({ + where: { memberTraitId } + }) + + const processedIds = new Set() + + for (const elemItem of objArr) { + const traitItem = cloneDeep(elemItem || {}) + sanitizeNullBytesDeep(traitItem) + + if (Object.prototype.hasOwnProperty.call(traitItem, 'industry')) { + const { industry } = traitItem + if (!WORK_INDUSTRY_TYPES.includes(industry)) { + traitItem.industry = null } + } + + const preparedItem = { + ...traitItem, + memberTraitId, + createdBy: traitItem.createdBy || createdBy + } + + const traitHash = createTraitHash(preparedItem) + let matchingElement = null + + if (preparedItem.id) { + matchingElement = existingElements.find(record => `${record.id}` === `${preparedItem.id}`) + } + if (!matchingElement) { + matchingElement = existingElements.find(record => createTraitHash(record) === traitHash) + } + + const upsertPayload = compactObject({ + ...preparedItem }) - await txObject.createMany({ - data: toUpdateArr - }) + delete upsertPayload.id + delete upsertPayload.createdAt + delete upsertPayload.updatedAt + + if (matchingElement) { + processedIds.add(matchingElement.id) + const diff = {} + Object.keys(upsertPayload).forEach((key) => { + if (key === 'memberTraitId' || key === 'createdBy') { + return + } + const incomingValue = upsertPayload[key] + const existingValue = matchingElement[key] + if (!isEqual(existingValue, incomingValue)) { + diff[key] = incomingValue + } + }) + if (!isEmpty(diff)) { + diff.updatedBy = createdBy + await executeWrite(`${entityName}.update`, () => txObject.update({ + where: { id: matchingElement.id }, + data: diff + }), effectiveContext) + } + } else { + await executeWrite(`${entityName}.create`, () => txObject.create({ + data: upsertPayload + }), effectiveContext) + } + } + + const staleElements = existingElements.filter(record => !processedIds.has(record.id)) + if (staleElements.length > 0) { + await handleStaleRecords(entityName, txObject, staleElements, effectiveContext) } } @@ -2282,7 +3209,7 @@ async function updateOrCreateModel (itemData, existingData, txModel, parentId, o * @param {Object} parentId the parent Id object * @param {String} operatorId the operator Id */ -async function updateArrayItems (updateItems, existingItems, txModel, parentId, operatorId) { +async function updateArrayItems (updateItems, existingItems, txModel, parentId, operatorId, entityName) { const toUpdate = [] const toCreate = [] if (updateItems.length === 0) { @@ -2327,13 +3254,10 @@ async function updateArrayItems (updateItems, existingItems, txModel, parentId, })) }) - await txModel.deleteMany({ - where: { - id: { - in: toDeleteIds - } - } - }) + const staleRecords = existingItems.filter(item => toDeleteIds.includes(item.id)) + if (staleRecords.length > 0) { + await handleStaleRecords(entityName || 'memberStatsItems', txModel, staleRecords, { ...parentId, operatorId }) + } } /** @@ -2345,7 +3269,7 @@ async function updateArrayItems (updateItems, existingItems, txModel, parentId, * @param {Object} parentId the parent Id object * @param {String} operatorId the operator Id */ -async function updateArrayDivisionItems (updateD1Items, updateD2Items, existingItems, txModel, parentId, operatorId) { +async function updateArrayDivisionItems (updateD1Items, updateD2Items, existingItems, txModel, parentId, operatorId, entityName) { const toUpdate = [] const toCreate = [] if ((!updateD1Items || updateD1Items.length === 0) && (!updateD2Items || updateD2Items.length === 0)) { @@ -2407,13 +3331,10 @@ async function updateArrayDivisionItems (updateD1Items, updateD2Items, existingI })) }) - await txModel.deleteMany({ - where: { - id: { - in: toDeleteIds - } - } - }) + const staleRecords = existingItems.filter(item => toDeleteIds.includes(item.id)) + if (staleRecords.length > 0) { + await handleStaleRecords(entityName || 'memberSrmDivisionDetail', txModel, staleRecords, { ...parentId, operatorId }) + } } /** @@ -2424,7 +3345,7 @@ async function updateArrayDivisionItems (updateD1Items, updateD2Items, existingI * @param {Object} parentId the parent Id object * @param {String} operatorId the operator Id */ -async function updateArrayLevelItems (updateItems, existingItems, txModel, parentId, operatorId) { +async function updateArrayLevelItems (updateItems, existingItems, txModel, parentId, operatorId, entityName) { const toUpdate = [] const toCreate = [] if (updateItems.length === 0) { @@ -2469,13 +3390,10 @@ async function updateArrayLevelItems (updateItems, existingItems, txModel, paren })) }) - await txModel.deleteMany({ - where: { - id: { - in: toDeleteIds - } - } - }) + const staleRecords = existingItems.filter(item => toDeleteIds.includes(item.id)) + if (staleRecords.length > 0) { + await handleStaleRecords(entityName || 'memberSrmChallengeDetail', txModel, staleRecords, { ...parentId, operatorId }) + } } /** @@ -2544,7 +3462,7 @@ async function updateMemberStat (data, member, operatorId) { const developStatsId = memberStatDB.develop.id const existingItems = memberStatDB.develop.items || [] - await updateArrayItems(data.develop.items, existingItems, tx.memberDevelopStatsItem, { developStatsId }, operatorId) + await updateArrayItems(data.develop.items, existingItems, tx.memberDevelopStatsItem, { developStatsId }, operatorId, 'memberDevelopStatsItem') } } @@ -2561,7 +3479,7 @@ async function updateMemberStat (data, member, operatorId) { const designStatsId = memberStatDB.design.id const existingItems = memberStatDB.design.items || [] - await updateArrayItems(data.design.items, existingItems, tx.memberDesignStatsItem, { designStatsId }, operatorId) + await updateArrayItems(data.design.items, existingItems, tx.memberDesignStatsItem, { designStatsId }, operatorId, 'memberDesignStatsItem') } } @@ -2585,13 +3503,13 @@ async function updateMemberStat (data, member, operatorId) { if (data.dataScience.srm.challengeDetails) { const existingItems = memberStatDB.dataScience.srm.challengeDetails || [] - await updateArrayLevelItems(data.dataScience.srm.challengeDetails, existingItems, tx.memberSrmChallengeDetail, { srmStatsId }, operatorId) + await updateArrayLevelItems(data.dataScience.srm.challengeDetails, existingItems, tx.memberSrmChallengeDetail, { srmStatsId }, operatorId, 'memberSrmChallengeDetail') } if (data.dataScience.srm.division1 || data.dataScience.srm.division2) { const existingItems = memberStatDB.dataScience.srm.divisions || [] - await updateArrayDivisionItems(data.dataScience.srm.division1, data.dataScience.srm.division2, existingItems, tx.memberSrmDivisionDetail, { srmStatsId }, operatorId) + await updateArrayDivisionItems(data.dataScience.srm.division1, data.dataScience.srm.division2, existingItems, tx.memberSrmDivisionDetail, { srmStatsId }, operatorId, 'memberSrmDivisionDetail') } } @@ -2660,6 +3578,7 @@ async function importDistributionStats () { ratingRange3800To3899: 0, ratingRange3900To3999: 0 } + const maxDistributionBucket = 39 while (current <= total) { const records = await tx.memberMaxRating.findMany({ @@ -2679,9 +3598,10 @@ async function importDistributionStats () { distributionValue = cloneDeep(distributionStat) } - const idxVal = Math.floor(record.rating / 100) + const idxValRaw = Math.floor(record.rating / 100) + const idxVal = Math.max(0, Math.min(idxValRaw, maxDistributionBucket)) const ratingKey = idxVal === 0 ? 'ratingRange0To099' : `ratingRange${idxVal}00To${idxVal}99` - distributionValue[ratingKey] += 1 + distributionValue[ratingKey] = (distributionValue[ratingKey] ?? 0) + 1 uniqueMap.set(mapKey, distributionValue) }) @@ -2689,27 +3609,197 @@ async function importDistributionStats () { } if (uniqueMap.size > 0) { - const dataArray = [] - uniqueMap.forEach((value, key) => { - const tracks = key.split('-') - const data = { + for (const [key, value] of uniqueMap.entries()) { + const [track, subTrack] = key.split('-') + const createData = { ...value, - track: tracks[0], - subTrack: tracks[1], + track, + subTrack, createdBy: CREATED_BY } - dataArray.push(data) - }) - await tx.distributionStats.createMany({ - data: dataArray - }) + const updateData = { + ...value, + updatedBy: CREATED_BY + } + + await executeWrite('distributionStats.upsert', () => tx.distributionStats.upsert({ + where: { + track_subTrack: { + track, + subTrack + } + }, + create: createData, + update: updateData + }), { + track, + subTrack, + step: migrationRuntimeState.step + }) + } } console.log(`Finished counted ${uniqueMap.size} distributionStats records\n`) }) } +async function runMigrationStep (step, dateFilter, askQuestion) { + const logContext = { step, dateFilter: dateFilter ? dateFilter.toISOString() : null } + migrationRuntimeState.step = step + migrationRuntimeState.dateFilter = dateFilter + destructiveApprovals.delete(step) + + const shouldRunIntegrityCheck = ['1', '2', '3', '4', '5', '7'].includes(step) + + try { + if (shouldRunIntegrityCheck) { + const validationResult = await validateIncrementalMigrationData(null, dateFilter) + if (!validationResult.isValid) { + throw new Error('Incremental migration validation failed. Resolve the reported issues before re-running.') + } + } + + let beforeSnapshot = null + if (shouldRunIntegrityCheck) { + beforeSnapshot = await getMemberIntegritySnapshot() + } + + logInfo('Starting migration step', logContext) + + switch (step) { + case '0': { + const executed = await withDestructiveGuard({ step, askQuestion, description: 'Full database reset (step 0)' }, async () => { + console.log('Clearing all DB data...') + await clearDB() + }) + if (!executed) { + logInfo('Full reset skipped; existing data retained', logContext) + } + break + } + case '1': { + const executed = await withDestructiveGuard({ step, askQuestion, description: 'Member profile cleanup (step 1)' }, async () => { + console.log('Clearing member data...') + const cleanupContext = { ...logContext, operation: 'pre-import-clear' } + await executeWrite('memberAddress.deleteMany', () => prisma.memberAddress.deleteMany(), cleanupContext) + await executeWrite('memberMaxRating.deleteMany', () => prisma.memberMaxRating.deleteMany(), cleanupContext) + await executeWrite('member.deleteMany', () => prisma.member.deleteMany(), cleanupContext) + }) + if (!executed) { + logInfo('Retaining existing member records for step 1 import', logContext) + } + const memberDynamoFilename = 'MemberProfile.json' + await importDynamoMember(memberDynamoFilename, dateFilter) + break + } + case '2': { + const executed = await withDestructiveGuard({ step, askQuestion, description: 'Member trait and skill cleanup (step 2)' }, async () => { + console.log('Clearing member trait and skill data...') + const cleanupContext = { ...logContext, operation: 'pre-import-clear' } + await executeWrite('memberTraitBasicInfo.deleteMany', () => prisma.memberTraitBasicInfo.deleteMany(), cleanupContext) + await executeWrite('memberTraitCommunity.deleteMany', () => prisma.memberTraitCommunity.deleteMany(), cleanupContext) + await executeWrite('memberTraitDevice.deleteMany', () => prisma.memberTraitDevice.deleteMany(), cleanupContext) + await executeWrite('memberTraitEducation.deleteMany', () => prisma.memberTraitEducation.deleteMany(), cleanupContext) + await executeWrite('memberTraitLanguage.deleteMany', () => prisma.memberTraitLanguage.deleteMany(), cleanupContext) + await executeWrite('memberTraitOnboardChecklist.deleteMany', () => prisma.memberTraitOnboardChecklist.deleteMany(), cleanupContext) + await executeWrite('memberTraitPersonalization.deleteMany', () => prisma.memberTraitPersonalization.deleteMany(), cleanupContext) + await executeWrite('memberTraitServiceProvider.deleteMany', () => prisma.memberTraitServiceProvider.deleteMany(), cleanupContext) + await executeWrite('memberTraitSoftware.deleteMany', () => prisma.memberTraitSoftware.deleteMany(), cleanupContext) + await executeWrite('memberTraitWork.deleteMany', () => prisma.memberTraitWork.deleteMany(), cleanupContext) + await executeWrite('memberTraits.deleteMany', () => prisma.memberTraits.deleteMany(), cleanupContext) + }) + if (!executed) { + logInfo('Retaining existing trait data for step 2 import', logContext) + } + const memberElasticsearchFilename = 'members-2020-01.json' + await importElasticSearchMember(memberElasticsearchFilename, dateFilter) + break + } + case '3': { + const executed = await withDestructiveGuard({ step, askQuestion, description: 'Member stats cleanup (step 3)' }, async () => { + console.log('Clearing member stats data...') + const cleanupContext = { ...logContext, operation: 'pre-import-clear' } + await executeWrite('memberCopilotStats.deleteMany', () => prisma.memberCopilotStats.deleteMany(), cleanupContext) + await executeWrite('memberMarathonStats.deleteMany', () => prisma.memberMarathonStats.deleteMany(), cleanupContext) + await executeWrite('memberDesignStatsItem.deleteMany', () => prisma.memberDesignStatsItem.deleteMany(), cleanupContext) + await executeWrite('memberDesignStats.deleteMany', () => prisma.memberDesignStats.deleteMany(), cleanupContext) + await executeWrite('memberDevelopStatsItem.deleteMany', () => prisma.memberDevelopStatsItem.deleteMany(), cleanupContext) + await executeWrite('memberDevelopStats.deleteMany', () => prisma.memberDevelopStats.deleteMany(), cleanupContext) + await executeWrite('memberSrmChallengeDetail.deleteMany', () => prisma.memberSrmChallengeDetail.deleteMany(), cleanupContext) + await executeWrite('memberSrmDivisionDetail.deleteMany', () => prisma.memberSrmDivisionDetail.deleteMany(), cleanupContext) + await executeWrite('memberSrmStats.deleteMany', () => prisma.memberSrmStats.deleteMany(), cleanupContext) + await executeWrite('memberDataScienceStats.deleteMany', () => prisma.memberDataScienceStats.deleteMany(), cleanupContext) + await executeWrite('memberStats.deleteMany', () => prisma.memberStats.deleteMany(), cleanupContext) + }) + if (!executed) { + logInfo('Retaining existing stats data for step 3 import', logContext) + } + const memberStateDynamoFilename = 'MemberStats.json' + await importDynamoMemberStat(memberStateDynamoFilename, dateFilter) + break + } + case '4': { + const memberStatElasticsearchFilename = 'memberstats-2020-01.json' + await importElasticSearchMemberStat(memberStatElasticsearchFilename, dateFilter) + break + } + case '5': { + const executed = await withDestructiveGuard({ step, askQuestion, description: 'Member stats history cleanup (step 5)' }, async () => { + console.log('Clearing member stats history data...') + const cleanupContext = { ...logContext, operation: 'pre-import-clear' } + await executeWrite('memberDataScienceHistoryStats.deleteMany', () => prisma.memberDataScienceHistoryStats.deleteMany(), cleanupContext) + await executeWrite('memberDevelopHistoryStats.deleteMany', () => prisma.memberDevelopHistoryStats.deleteMany(), cleanupContext) + await executeWrite('memberHistoryStats.deleteMany', () => prisma.memberHistoryStats.deleteMany(), cleanupContext) + }) + if (!executed) { + logInfo('Retaining existing stats history for step 5 import', logContext) + } + const memberStateHistoryDynamoFilename = 'MemberStatsHistory.json' + await importDynamoMemberStatHistory(memberStateHistoryDynamoFilename, dateFilter) + const memberStatePrivateDynamoFilename = 'MemberStatsHistory_Private.json' + await importDynamoMemberStatHistoryPrivate(memberStatePrivateDynamoFilename, dateFilter) + break + } + case '6': { + const executed = await withDestructiveGuard({ step, askQuestion, description: 'Distribution stats cleanup (step 6)' }, async () => { + console.log('Clearing distribution stats data...') + const cleanupContext = { ...logContext, operation: 'pre-import-clear' } + await executeWrite('distributionStats.deleteMany', () => prisma.distributionStats.deleteMany(), cleanupContext) + }) + if (!executed) { + logInfo('Retaining existing distribution stats; incoming data will be merged', logContext) + } + await importDistributionStats() + break + } + case '7': { + const memberElasticsearchFilename = 'members-2020-01.json' + await updateMemberStatusFromElasticSearch(memberElasticsearchFilename, dateFilter) + break + } + default: + throw new Error(`Unsupported step "${step}"`) + } + + if (shouldRunIntegrityCheck) { + const afterSnapshot = await getMemberIntegritySnapshot() + await verifyDataIntegrity(beforeSnapshot, afterSnapshot, logContext) + } + + logInfo('Finished migration step', { step }) + } finally { + migrationRuntimeState.step = null + migrationRuntimeState.dateFilter = null + } +} + async function main () { + const configValidation = validateMigrationConfiguration() + if (!configValidation.isValid) { + console.error('Migration configuration is invalid. Review logs for details before re-running.') + process.exit(1) + } + console.log('This script is migrating data into DB') console.log('The data number is huge, about 5,000,000 ~ 10,000,000 lines for each file') console.log('Each steps will run very long time (about 1.5h ~ 6h+)') @@ -2722,119 +3812,58 @@ async function main () { console.log('4. Update ElasticSearch MemberStat') console.log('5. Import Dynamo MemberStatHistory') console.log('6. Import Distribution Stats') + console.log('7. Update ElasticSearch Member Status') console.log('') + console.log('Destructive clears require --full-reset or ALLOW_DESTRUCTIVE=true and an explicit confirmation. Incremental runs retain existing data by default.') const rl = readline.createInterface({ input: process.stdin, output: process.stdout }) - rl.question('Please select your step to run (0-6): ', async (step) => { - if (step !== '0' && step !== '1' && step !== '2' && step !== '3' && step !== '4' && step !== '5' && step !== '6') { - rl.close() - } else { - console.log(`Running step ${step} ...`) - rl.question('Please confirm (yes/no): ', async (answer) => { - const answerLower = answer.toLowerCase() - if (answerLower === 'yes' || answerLower === 'y') { - const executeStep = async (dateFilter) => { - console.log('') - if (step === '0') { - console.log('Clearing all DB data...') - await clearDB() - } else if (step === '1') { - console.log('Clearing member data...') - await prisma.memberAddress.deleteMany() - await prisma.memberMaxRating.deleteMany() - await prisma.member.deleteMany() - - const memberDynamoFilename = 'MemberProfile.json' - await importDynamoMember(memberDynamoFilename, dateFilter) - } else if (step === '2') { - console.log('Clearing member trait and skill data...') - await prisma.memberTraitBasicInfo.deleteMany() - await prisma.memberTraitCommunity.deleteMany() - await prisma.memberTraitDevice.deleteMany() - await prisma.memberTraitEducation.deleteMany() - await prisma.memberTraitLanguage.deleteMany() - await prisma.memberTraitOnboardChecklist.deleteMany() - await prisma.memberTraitPersonalization.deleteMany() - await prisma.memberTraitServiceProvider.deleteMany() - await prisma.memberTraitSoftware.deleteMany() - await prisma.memberTraitWork.deleteMany() - await prisma.memberTraits.deleteMany() - - await prisma.memberSkillLevel.deleteMany() - await prisma.memberSkill.deleteMany() - await skillsPrisma.skillLevel.deleteMany() - await skillsPrisma.skill.deleteMany() - await skillsPrisma.skillCategory.deleteMany() - await prisma.displayMode.deleteMany() - - const memberElasticsearchFilename = 'members-2020-01.json' - await importElasticSearchMember(memberElasticsearchFilename, dateFilter) - } else if (step === '3') { - console.log('Clearing member stats data...') - await prisma.memberCopilotStats.deleteMany() - await prisma.memberMarathonStats.deleteMany() - await prisma.memberDesignStatsItem.deleteMany() - await prisma.memberDesignStats.deleteMany() - await prisma.memberDevelopStatsItem.deleteMany() - await prisma.memberDevelopStats.deleteMany() - await prisma.memberSrmChallengeDetail.deleteMany() - await prisma.memberSrmDivisionDetail.deleteMany() - await prisma.memberSrmStats.deleteMany() - await prisma.memberDataScienceStats.deleteMany() - await prisma.memberStats.deleteMany() - - const memberStateDynamoFilename = 'MemberStats.json' - await importDynamoMemberStat(memberStateDynamoFilename, dateFilter) - } else if (step === '4') { - const memberStatElasticsearchFilename = 'memberstats-2020-01.json' - await importElasticSearchMemberStat(memberStatElasticsearchFilename, dateFilter) - } else if (step === '5') { - console.log('Clearing member stats history data...') - await prisma.memberDataScienceHistoryStats.deleteMany() - await prisma.memberDevelopHistoryStats.deleteMany() - await prisma.memberHistoryStats.deleteMany() - - const memberStateHistoryDynamoFilename = 'MemberStatsHistory.json' - await importDynamoMemberStatHistory(memberStateHistoryDynamoFilename, dateFilter) - - const memberStatePrivateDynamoFilename = 'MemberStatsHistory_Private.json' - await importDynamoMemberStatHistoryPrivate(memberStatePrivateDynamoFilename, dateFilter) - } else if (step === '6') { - console.log('Clearing distribution stats data...') - await prisma.distributionStats.deleteMany() - - await importDistributionStats() - } + const askQuestion = (prompt) => new Promise(resolve => rl.question(prompt, resolve)) - console.log('Script is finished.') - rl.close() - } + let selectedStep = null + try { + selectedStep = (await askQuestion('Please select your step to run (0-7): ')).trim() + const validSteps = new Set(['0', '1', '2', '3', '4', '5', '6', '7']) + if (!validSteps.has(selectedStep)) { + console.log('Unsupported step selected. Script is finished.') + return + } - const needsDateFilter = ['1', '2', '3', '4', '5'].includes(step) - if (needsDateFilter) { - rl.question('Enter date filter (YYYY-MM-DD UTC; timestamp-less records will be skipped, or press Enter to skip): ', async (dateFilterInput) => { - const dateFilter = parseDateFilter(dateFilterInput) - if (dateFilter) { - console.log(`Filtering records with recognized timestamps on or after ${dateFilter.toISOString()} (UTC). Records missing timestamps will be skipped.`) - } else if (dateFilterInput && dateFilterInput.trim()) { - console.log('Invalid date filter input. Expected YYYY-MM-DD; continuing without filtering.') - } - await executeStep(dateFilter) - }) - } else { - await executeStep(null) - } + console.log(`Running step ${selectedStep} ...`) + const confirmation = (await askQuestion('Please confirm (yes/no): ')).trim().toLowerCase() + if (confirmation !== 'yes' && confirmation !== 'y') { + console.log('Script is finished.') + return + } + + let dateFilter = null + if (['1', '2', '3', '4', '5', '7'].includes(selectedStep)) { + const dateFilterInput = (await askQuestion('Enter date filter (YYYY-MM-DD UTC; timestamp-less records will be skipped, or press Enter to skip): ')).trim() + if (dateFilterInput) { + const parsed = parseDateFilter(dateFilterInput) + if (parsed) { + console.log(`Filtering records with recognized timestamps on or after ${parsed.toISOString()} (UTC). Records missing timestamps will be skipped.`) + dateFilter = parsed } else { - console.log('Script is finished.') - rl.close() + console.log('Invalid date filter input. Expected YYYY-MM-DD; continuing without filtering.') } - }) + } } - }) + + console.log('') + await runMigrationStep(selectedStep, dateFilter, askQuestion) + console.log('Script is finished.') + } catch (err) { + logError('Migration step failed', { step: selectedStep, error: err?.message }) + const label = selectedStep ?? 'unknown' + console.error(`Migration step ${label} failed: ${err.message}`) + process.exitCode = 1 + } finally { + rl.close() + } } if (require.main === module) { diff --git a/src/services/SearchService.js b/src/services/SearchService.js index 82389a9..652f893 100644 --- a/src/services/SearchService.js +++ b/src/services/SearchService.js @@ -156,15 +156,24 @@ async function searchMembers (currentUser, query) { if (currentUser == null) { throw new errors.UnauthorizedError('Authentication token is required to query users by email') } - if (!helper.hasSearchByEmailRole(currentUser)) { + if (!currentUser.isMachine && !helper.hasSearchByEmailRole(currentUser)) { throw new errors.BadRequestError('Admin role is required to query users by email') } } // search for the members based on query + // Allow sanitized responses for explicit lookups even without elevated privileges. + const isExplicitMemberLookup = + query.userId != null || + (_.isArray(query.userIds) && query.userIds.length > 0) || + (!_.isEmpty(query.handle)) || + (_.isArray(query.handles) && query.handles.length > 0) || + (!_.isEmpty(query.handleLower)) || + (_.isArray(query.handlesLower) && query.handlesLower.length > 0) + const canBypassStatusRestriction = currentUser && (currentUser.isMachine || helper.hasAdminRole(currentUser)) const prismaFilter = prismaHelper.buildSearchMemberFilter(query, { - restrictStatus: !canBypassStatusRestriction + restrictStatus: !(canBypassStatusRestriction || isExplicitMemberLookup) }) logger.debug(`searchMembers: prisma filter ${stringifyForLog(prismaFilter)}`) const searchData = await fillMembers(prismaFilter, query, fields)