Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 87 additions & 7 deletions src/internal/database/migrations/progressive.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { areMigrationsUpToDate } from '@internal/database/migrations/migrate'
import { ErrorCode, isStorageError } from '@internal/errors'
import { RunMigrationsOnTenants } from '@storage/events'
import { getConfig } from '../../../config'
import { logger, logSchema } from '../../monitoring'
Expand All @@ -9,6 +10,8 @@ const { dbMigrationFreezeAt } = getConfig()
export class ProgressiveMigrations {
protected tenants: string[] = []
protected emittingJobs = false
protected inFlightCreateJobs?: Promise<void>
protected pendingCreateJobsMax = 0
protected watchInterval: NodeJS.Timeout | undefined

constructor(protected readonly options: { maxSize: number; interval: number; watch?: boolean }) {
Expand Down Expand Up @@ -96,9 +99,33 @@ export class ProgressiveMigrations {
}, this.options.interval)
}

protected async createJobs(maxJobs: number) {
protected createJobs(maxJobs: number) {
this.pendingCreateJobsMax = Math.max(this.pendingCreateJobsMax, maxJobs)

if (this.inFlightCreateJobs) {
return this.inFlightCreateJobs
}

this.emittingJobs = true
const tenantsBatch = this.tenants.splice(0, maxJobs)
this.inFlightCreateJobs = this.runCreateJobs().finally(() => {
this.emittingJobs = false
this.inFlightCreateJobs = undefined
this.pendingCreateJobsMax = 0
})

return this.inFlightCreateJobs
}

protected async runCreateJobs() {
while (this.pendingCreateJobsMax > 0) {
const maxJobs = this.pendingCreateJobsMax
this.pendingCreateJobsMax = 0
await this.createJobsBatch(maxJobs)
}
}

protected async createJobsBatch(maxJobs: number) {
const tenantsBatch = this.tenants.slice(0, maxJobs)
const jobs = await Promise.allSettled(
tenantsBatch.map(async (tenant) => {
const tenantConfig = await getTenantConfig(tenant)
Expand Down Expand Up @@ -127,15 +154,68 @@ export class ProgressiveMigrations {
})
)

const completedTenants = new Set<string>()
const droppedTenants = new Set<string>()
const retryableFailedTenants = new Set<string>()
const validJobs = jobs
.map((job) => {
if (job.status === 'fulfilled' && job.value) {
return job.value
.map((job, index) => {
const tenant = tenantsBatch[index]

if (job.status === 'rejected') {
// If there are more terminal errors later, we need to extend this check.
if (isStorageError(ErrorCode.TenantNotFound, job.reason)) {
droppedTenants.add(tenant)
logSchema.warning(
logger,
`[Migrations] Failed to prepare migration job for tenant ${tenant}; dropping tenant from queue because it no longer exists`,
{
type: 'migrations',
error: job.reason,
project: tenant,
metadata: JSON.stringify({
strategy: 'progressive',
}),
}
)
return
}

retryableFailedTenants.add(tenant)
logSchema.warning(
logger,
`[Migrations] Failed to prepare migration job for tenant ${tenant}; keeping tenant queued for retry`,
{
type: 'migrations',
error: job.reason,
project: tenant,
metadata: JSON.stringify({
strategy: 'progressive',
}),
}
)
return
}

completedTenants.add(tenant)
return job.value
})
.filter((job) => job)

await RunMigrationsOnTenants.batchSend(validJobs as RunMigrationsOnTenants[])
this.emittingJobs = false
if (validJobs.length > 0) {
await RunMigrationsOnTenants.batchSend(validJobs as RunMigrationsOnTenants[])
}

if (completedTenants.size > 0 || droppedTenants.size > 0 || retryableFailedTenants.size > 0) {
const remainingTenants = this.tenants.filter(
(tenant) =>
!completedTenants.has(tenant) &&
!droppedTenants.has(tenant) &&
!retryableFailedTenants.has(tenant)
)
const failedTenantsInQueue = this.tenants.filter((tenant) =>
retryableFailedTenants.has(tenant)
)
this.tenants = remainingTenants.concat(failedTenantsInQueue)
}
}
}
Loading
Loading