diff --git a/src/internal/database/migrations/progressive.ts b/src/internal/database/migrations/progressive.ts index 8e94c803..e4ca995f 100644 --- a/src/internal/database/migrations/progressive.ts +++ b/src/internal/database/migrations/progressive.ts @@ -1,4 +1,5 @@ import { areMigrationsUpToDate } from '@internal/database/migrations/migrate' +import { ErrorCode, isStorageError } from '@internal/errors' import { RunMigrationsOnTenants } from '@storage/events' import { getConfig } from '../../../config' import { logger, logSchema } from '../../monitoring' @@ -9,6 +10,8 @@ const { dbMigrationFreezeAt } = getConfig() export class ProgressiveMigrations { protected tenants: string[] = [] protected emittingJobs = false + protected inFlightCreateJobs?: Promise + protected pendingCreateJobsMax = 0 protected watchInterval: NodeJS.Timeout | undefined constructor(protected readonly options: { maxSize: number; interval: number; watch?: boolean }) { @@ -96,9 +99,33 @@ export class ProgressiveMigrations { }, this.options.interval) } - protected async createJobs(maxJobs: number) { + protected createJobs(maxJobs: number) { + this.pendingCreateJobsMax = Math.max(this.pendingCreateJobsMax, maxJobs) + + if (this.inFlightCreateJobs) { + return this.inFlightCreateJobs + } + this.emittingJobs = true - const tenantsBatch = this.tenants.splice(0, maxJobs) + this.inFlightCreateJobs = this.runCreateJobs().finally(() => { + this.emittingJobs = false + this.inFlightCreateJobs = undefined + this.pendingCreateJobsMax = 0 + }) + + return this.inFlightCreateJobs + } + + protected async runCreateJobs() { + while (this.pendingCreateJobsMax > 0) { + const maxJobs = this.pendingCreateJobsMax + this.pendingCreateJobsMax = 0 + await this.createJobsBatch(maxJobs) + } + } + + protected async createJobsBatch(maxJobs: number) { + const tenantsBatch = this.tenants.slice(0, maxJobs) const jobs = await Promise.allSettled( tenantsBatch.map(async (tenant) => { const tenantConfig = await getTenantConfig(tenant) @@ -127,15 +154,68 @@ export class ProgressiveMigrations { }) ) + const completedTenants = new Set() + const droppedTenants = new Set() + const retryableFailedTenants = new Set() const validJobs = jobs - .map((job) => { - if (job.status === 'fulfilled' && job.value) { - return job.value + .map((job, index) => { + const tenant = tenantsBatch[index] + + if (job.status === 'rejected') { + // If there are more terminal errors later, we need to extend this check. + if (isStorageError(ErrorCode.TenantNotFound, job.reason)) { + droppedTenants.add(tenant) + logSchema.warning( + logger, + `[Migrations] Failed to prepare migration job for tenant ${tenant}; dropping tenant from queue because it no longer exists`, + { + type: 'migrations', + error: job.reason, + project: tenant, + metadata: JSON.stringify({ + strategy: 'progressive', + }), + } + ) + return + } + + retryableFailedTenants.add(tenant) + logSchema.warning( + logger, + `[Migrations] Failed to prepare migration job for tenant ${tenant}; keeping tenant queued for retry`, + { + type: 'migrations', + error: job.reason, + project: tenant, + metadata: JSON.stringify({ + strategy: 'progressive', + }), + } + ) + return } + + completedTenants.add(tenant) + return job.value }) .filter((job) => job) - await RunMigrationsOnTenants.batchSend(validJobs as RunMigrationsOnTenants[]) - this.emittingJobs = false + if (validJobs.length > 0) { + await RunMigrationsOnTenants.batchSend(validJobs as RunMigrationsOnTenants[]) + } + + if (completedTenants.size > 0 || droppedTenants.size > 0 || retryableFailedTenants.size > 0) { + const remainingTenants = this.tenants.filter( + (tenant) => + !completedTenants.has(tenant) && + !droppedTenants.has(tenant) && + !retryableFailedTenants.has(tenant) + ) + const failedTenantsInQueue = this.tenants.filter((tenant) => + retryableFailedTenants.has(tenant) + ) + this.tenants = remainingTenants.concat(failedTenantsInQueue) + } } } diff --git a/src/test/progressive-migrations.test.ts b/src/test/progressive-migrations.test.ts new file mode 100644 index 00000000..7b78d714 --- /dev/null +++ b/src/test/progressive-migrations.test.ts @@ -0,0 +1,356 @@ +const mockBatchSend = jest.fn() +const mockWarning = jest.fn() +const mockError = jest.fn() + +jest.mock('../internal/database/tenant', () => ({ + getTenantConfig: jest.fn(), + TenantMigrationStatus: { + FAILED_STALE: 'FAILED_STALE', + }, +})) + +jest.mock('@internal/database/migrations/migrate', () => ({ + areMigrationsUpToDate: jest.fn(), +})) + +jest.mock('@storage/events', () => ({ + RunMigrationsOnTenants: class { + static batchSend = mockBatchSend + payload: Record + + constructor(payload: Record) { + this.payload = payload + } + }, +})) + +jest.mock('../internal/monitoring', () => ({ + logger: {}, + logSchema: { + info: jest.fn(), + warning: mockWarning, + error: mockError, + }, +})) + +import { areMigrationsUpToDate } from '@internal/database/migrations/migrate' +import { ERRORS } from '@internal/errors' +import { RunMigrationsOnTenants } from '@storage/events' +import { ProgressiveMigrations } from '../internal/database/migrations/progressive' +import { getTenantConfig } from '../internal/database/tenant' + +class TestProgressiveMigrations extends ProgressiveMigrations { + seed(...tenants: string[]) { + this.tenants.push(...tenants) + } + + pending() { + return [...this.tenants] + } + + isEmitting() { + return this.emittingJobs + } + + flush(maxJobs: number) { + return this.createJobs(maxJobs) + } +} + +function createDeferred() { + let resolve!: (value: T | PromiseLike) => void + let reject!: (reason?: unknown) => void + + const promise = new Promise((res, rej) => { + resolve = res + reject = rej + }) + + return { promise, resolve, reject } +} + +const mockGetTenantConfig = jest.mocked(getTenantConfig) +const mockAreMigrationsUpToDate = jest.mocked(areMigrationsUpToDate) +const mockRunMigrationsBatchSend = jest.mocked(RunMigrationsOnTenants.batchSend) + +describe('ProgressiveMigrations', () => { + beforeEach(() => { + jest.clearAllMocks() + + mockGetTenantConfig.mockResolvedValue({ + migrationStatus: undefined, + syncMigrationsDone: false, + } as Awaited>) + mockAreMigrationsUpToDate.mockResolvedValue(false) + }) + + it('keeps queued tenants and resets emittingJobs when batchSend fails', async () => { + mockRunMigrationsBatchSend + .mockRejectedValueOnce(new Error('queue unavailable')) + .mockResolvedValueOnce(undefined as never) + + const migrations = new TestProgressiveMigrations({ + maxSize: 10, + interval: 1000, + watch: false, + }) + + migrations.seed('tenant-a') + + await expect(migrations.flush(1)).rejects.toThrow('queue unavailable') + expect(migrations.pending()).toEqual(['tenant-a']) + expect(migrations.isEmitting()).toBe(false) + + await expect(migrations.flush(1)).resolves.toBeUndefined() + expect(migrations.pending()).toEqual([]) + expect(migrations.isEmitting()).toBe(false) + expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(2) + }) + + it('logs batch enqueue failures at the caller boundary', async () => { + mockRunMigrationsBatchSend.mockRejectedValueOnce(new Error('queue unavailable')) + + const migrations = new TestProgressiveMigrations({ + maxSize: 10, + interval: 1000, + watch: false, + }) + + migrations.seed('tenant-a') + + await expect(migrations.drain()).resolves.toBeUndefined() + + expect(migrations.pending()).toEqual(['tenant-a']) + expect(migrations.isEmitting()).toBe(false) + expect(mockError).toHaveBeenCalledTimes(1) + expect(mockError).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] Error creating migration jobs', + expect.objectContaining({ + type: 'migrations', + }) + ) + }) + + it('keeps new tenants queued while a batch is in flight and ignores duplicate adds', async () => { + const deferredBatch = createDeferred() + mockRunMigrationsBatchSend.mockReturnValueOnce(deferredBatch.promise as never) + + const migrations = new TestProgressiveMigrations({ + maxSize: 10, + interval: 1000, + watch: false, + }) + + migrations.seed('tenant-a') + + const flushPromise = migrations.flush(1) + await new Promise((resolve) => setImmediate(resolve)) + + expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(1) + expect(migrations.isEmitting()).toBe(true) + + migrations.addTenant('tenant-a') + migrations.addTenant('tenant-b') + + expect(migrations.pending()).toEqual(['tenant-a', 'tenant-b']) + + deferredBatch.resolve(undefined) + + await expect(flushPromise).resolves.toBeUndefined() + expect(migrations.pending()).toEqual(['tenant-b']) + expect(migrations.isEmitting()).toBe(false) + }) + + it('serializes drain with an in-flight batch and drains the remaining tenants after it finishes', async () => { + const deferredBatch = createDeferred() + mockRunMigrationsBatchSend + .mockReturnValueOnce(deferredBatch.promise as never) + .mockResolvedValueOnce(undefined as never) + + const migrations = new TestProgressiveMigrations({ + maxSize: 1, + interval: 1000, + watch: false, + }) + + migrations.seed('tenant-a', 'tenant-b') + + const flushPromise = migrations.flush(1) + await new Promise((resolve) => setImmediate(resolve)) + + const drainPromise = migrations.drain() + + expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(1) + expect(migrations.isEmitting()).toBe(true) + + deferredBatch.resolve(undefined) + + await expect(Promise.all([flushPromise, drainPromise])).resolves.toEqual([undefined, undefined]) + + expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(2) + expect( + (mockRunMigrationsBatchSend.mock.calls[0][0][0] as { payload: { tenantId: string } }).payload + ).toMatchObject({ + tenantId: 'tenant-a', + }) + expect( + (mockRunMigrationsBatchSend.mock.calls[1][0][0] as { payload: { tenantId: string } }).payload + ).toMatchObject({ + tenantId: 'tenant-b', + }) + expect(migrations.pending()).toEqual([]) + expect(migrations.isEmitting()).toBe(false) + }) + + it('starts a follow-up run when drain is requested in a late microtask after a batch settles', async () => { + const migrations = new TestProgressiveMigrations({ + maxSize: 1, + interval: 1000, + watch: false, + }) + + mockRunMigrationsBatchSend + .mockImplementationOnce(async () => { + queueMicrotask(() => { + migrations.addTenant('tenant-b') + void migrations.drain() + }) + }) + .mockResolvedValueOnce(undefined as never) + + migrations.seed('tenant-a') + + await expect(migrations.flush(1)).resolves.toBeUndefined() + await new Promise((resolve) => setImmediate(resolve)) + + expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(2) + expect( + (mockRunMigrationsBatchSend.mock.calls[0][0][0] as { payload: { tenantId: string } }).payload + ).toMatchObject({ + tenantId: 'tenant-a', + }) + expect( + (mockRunMigrationsBatchSend.mock.calls[1][0][0] as { payload: { tenantId: string } }).payload + ).toMatchObject({ + tenantId: 'tenant-b', + }) + expect(migrations.pending()).toEqual([]) + expect(migrations.isEmitting()).toBe(false) + }) + + it('moves prep-failed tenants to the back so later tenants can still be scheduled', async () => { + mockGetTenantConfig.mockImplementation(async (tenantId) => { + if (tenantId === 'tenant-b') { + throw new Error('tenant lookup failed') + } + + return { + migrationStatus: undefined, + syncMigrationsDone: false, + } as Awaited> + }) + + const migrations = new TestProgressiveMigrations({ + maxSize: 1, + interval: 1000, + watch: false, + }) + + migrations.seed('tenant-b', 'tenant-a') + + await expect(migrations.flush(1)).resolves.toBeUndefined() + expect(migrations.pending()).toEqual(['tenant-a', 'tenant-b']) + + await expect(migrations.flush(1)).resolves.toBeUndefined() + expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(1) + expect( + (mockRunMigrationsBatchSend.mock.calls[0][0][0] as { payload: { tenantId: string } }).payload + ).toMatchObject({ + tenantId: 'tenant-a', + }) + expect(migrations.pending()).toEqual(['tenant-b']) + }) + + it('keeps tenants queued when preparing a migration job fails', async () => { + mockGetTenantConfig.mockImplementation(async (tenantId) => { + if (tenantId === 'tenant-b') { + throw new Error('tenant lookup failed') + } + + return { + migrationStatus: undefined, + syncMigrationsDone: false, + } as Awaited> + }) + + const migrations = new TestProgressiveMigrations({ + maxSize: 10, + interval: 1000, + watch: false, + }) + + migrations.seed('tenant-a', 'tenant-b') + + await expect(migrations.flush(2)).resolves.toBeUndefined() + + expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(1) + expect(mockRunMigrationsBatchSend.mock.calls[0][0]).toHaveLength(1) + expect( + (mockRunMigrationsBatchSend.mock.calls[0][0][0] as { payload: { tenantId: string } }).payload + ).toMatchObject({ + tenantId: 'tenant-a', + }) + expect(migrations.pending()).toEqual(['tenant-b']) + expect(migrations.isEmitting()).toBe(false) + expect(mockWarning).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] Failed to prepare migration job for tenant tenant-b; keeping tenant queued for retry', + expect.objectContaining({ + type: 'migrations', + project: 'tenant-b', + }) + ) + }) + + it('drops tenants whose config no longer exists instead of retrying forever', async () => { + mockGetTenantConfig.mockImplementation(async (tenantId) => { + if (tenantId === 'tenant-b') { + throw ERRORS.MissingTenantConfig(tenantId) + } + + return { + migrationStatus: undefined, + syncMigrationsDone: false, + } as Awaited> + }) + + const migrations = new TestProgressiveMigrations({ + maxSize: 10, + interval: 1000, + watch: false, + }) + + migrations.seed('tenant-a', 'tenant-b') + + await expect(migrations.flush(2)).resolves.toBeUndefined() + + expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(1) + expect(mockRunMigrationsBatchSend.mock.calls[0][0]).toHaveLength(1) + expect( + (mockRunMigrationsBatchSend.mock.calls[0][0][0] as { payload: { tenantId: string } }).payload + ).toMatchObject({ + tenantId: 'tenant-a', + }) + expect(migrations.pending()).toEqual([]) + expect(migrations.isEmitting()).toBe(false) + expect(mockWarning).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] Failed to prepare migration job for tenant tenant-b; dropping tenant from queue because it no longer exists', + expect.objectContaining({ + type: 'migrations', + project: 'tenant-b', + }) + ) + }) +}) diff --git a/src/test/run-migrations-event.test.ts b/src/test/run-migrations-event.test.ts new file mode 100644 index 00000000..83435e03 --- /dev/null +++ b/src/test/run-migrations-event.test.ts @@ -0,0 +1,160 @@ +const mockGetTenantConfig = jest.fn() +const mockAreMigrationsUpToDate = jest.fn() +const mockRunMigrationsOnTenant = jest.fn() +const mockUpdateTenantMigrationsState = jest.fn() +const mockDeleteIfActiveExists = jest.fn() +const mockInfo = jest.fn() +const mockError = jest.fn() + +jest.mock('@internal/database', () => ({ + getTenantConfig: mockGetTenantConfig, + TenantMigrationStatus: { + COMPLETED: 'COMPLETED', + FAILED: 'FAILED', + FAILED_STALE: 'FAILED_STALE', + }, +})) + +jest.mock('@internal/database/migrations', () => ({ + areMigrationsUpToDate: mockAreMigrationsUpToDate, + runMigrationsOnTenant: mockRunMigrationsOnTenant, + updateTenantMigrationsState: mockUpdateTenantMigrationsState, +})) + +jest.mock('../storage/events/base-event', () => ({ + BaseEvent: class { + static deleteIfActiveExists = mockDeleteIfActiveExists + + static getQueueName(this: { queueName: string }) { + return this.queueName + } + }, +})) + +jest.mock('@internal/monitoring', () => ({ + logger: {}, + logSchema: { + info: mockInfo, + error: mockError, + warning: jest.fn(), + }, +})) + +import { TenantMigrationStatus } from '@internal/database' +import { ERRORS } from '@internal/errors' +import { RunMigrationsOnTenants } from '../storage/events/migrations/run-migrations' + +function makeJob(overrides?: Partial>) { + return { + id: 'job-1', + name: RunMigrationsOnTenants.getQueueName(), + retryCount: 0, + retryLimit: 3, + singletonKey: 'migrations_tenant-a', + data: { + tenantId: 'tenant-a', + upToMigration: 'storage-schema', + tenant: { + ref: 'tenant-a', + host: '', + }, + }, + ...overrides, + } +} + +describe('RunMigrationsOnTenants.handle', () => { + beforeEach(() => { + jest.clearAllMocks() + + mockGetTenantConfig.mockResolvedValue({ + databaseUrl: 'postgres://tenant-db', + }) + mockAreMigrationsUpToDate.mockResolvedValue(false) + mockRunMigrationsOnTenant.mockResolvedValue(undefined) + mockUpdateTenantMigrationsState.mockResolvedValue(undefined) + mockDeleteIfActiveExists.mockResolvedValue(undefined) + }) + + it('runs migrations and marks the tenant completed on success', async () => { + await expect(RunMigrationsOnTenants.handle(makeJob() as never)).resolves.toBeUndefined() + + expect(mockRunMigrationsOnTenant).toHaveBeenCalledWith({ + databaseUrl: 'postgres://tenant-db', + tenantId: 'tenant-a', + waitForLock: false, + upToMigration: 'storage-schema', + }) + expect(mockUpdateTenantMigrationsState).toHaveBeenCalledWith('tenant-a', { + migration: 'storage-schema', + state: TenantMigrationStatus.COMPLETED, + }) + expect(mockDeleteIfActiveExists).not.toHaveBeenCalled() + }) + + it('short-circuits when migrations are already up to date', async () => { + mockAreMigrationsUpToDate.mockResolvedValue(true) + + await expect(RunMigrationsOnTenants.handle(makeJob() as never)).resolves.toBeUndefined() + + expect(mockRunMigrationsOnTenant).not.toHaveBeenCalled() + expect(mockUpdateTenantMigrationsState).not.toHaveBeenCalled() + expect(mockDeleteIfActiveExists).not.toHaveBeenCalled() + }) + + it('returns without marking the tenant failed on lock timeout', async () => { + mockRunMigrationsOnTenant.mockRejectedValue(ERRORS.LockTimeout()) + + await expect(RunMigrationsOnTenants.handle(makeJob() as never)).resolves.toBeUndefined() + + expect(mockUpdateTenantMigrationsState).not.toHaveBeenCalled() + expect(mockDeleteIfActiveExists).not.toHaveBeenCalled() + expect(mockInfo).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] lock timeout for tenant tenant-a', + expect.objectContaining({ + type: 'migrations', + project: 'tenant-a', + }) + ) + }) + + it('marks the tenant FAILED and rethrows when a retryable failure happens', async () => { + mockRunMigrationsOnTenant.mockRejectedValue(new Error('migration failed')) + + await expect(RunMigrationsOnTenants.handle(makeJob() as never)).rejects.toThrow( + 'migration failed' + ) + + expect(mockUpdateTenantMigrationsState).toHaveBeenCalledWith('tenant-a', { + state: TenantMigrationStatus.FAILED, + }) + expect(mockDeleteIfActiveExists).toHaveBeenCalledWith( + RunMigrationsOnTenants.getQueueName(), + 'migrations_tenant-a', + 'job-1' + ) + }) + + it('marks the tenant FAILED_STALE on the final retry before rethrowing', async () => { + mockRunMigrationsOnTenant.mockRejectedValue(new Error('migration failed')) + + await expect( + RunMigrationsOnTenants.handle( + makeJob({ + retryCount: 3, + retryLimit: 3, + }) as never + ) + ).rejects.toThrow('migration failed') + + expect(mockUpdateTenantMigrationsState).toHaveBeenCalledWith('tenant-a', { + state: TenantMigrationStatus.FAILED_STALE, + }) + expect(mockDeleteIfActiveExists).toHaveBeenCalledWith( + RunMigrationsOnTenants.getQueueName(), + 'migrations_tenant-a', + 'job-1' + ) + }) +})