Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/http/routes/admin/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export default async function routes(fastify: FastifyInstance) {
.table('pgboss_v10.job')
.where('state', 'active')
.where('name', 'tenants-migrations')
.orderBy('createdon', 'desc')
.orderBy('created_on', 'desc')
.limit(2000)

return reply.send(data)
Expand All @@ -79,7 +79,7 @@ export default async function routes(fastify: FastifyInstance) {
.table('pgboss_v10.job')
.where('state', 'active')
.where('name', 'tenants-migrations')
.orderBy('createdon', 'desc')
.orderBy('created_on', 'desc')
.update({ state: 'completed' })
.limit(2000)

Expand Down
3 changes: 2 additions & 1 deletion src/http/routes/admin/tenants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,8 @@ export default async function routes(fastify: FastifyInstance) {
migrations_version: await lastLocalMigrationName(),
migrations_status: TenantMigrationStatus.COMPLETED,
})
} catch {
} catch (e) {
request.executionError = e as Error
progressiveMigrations.addTenant(tenantId)
}

Expand Down
2 changes: 2 additions & 0 deletions src/internal/database/migrations/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ export async function tenantHasMigrations(tenantId: string, migration: keyof typ
if (migrationVersion) {
return DBMigration[migrationVersion] >= DBMigration[migration]
}

return false
}

return true
Expand Down
1 change: 1 addition & 0 deletions src/internal/monitoring/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ interface ErrorLog {
interface InfoLog {
type: string
project?: string
metadata?: string
}

export const logSchema = {
Expand Down
26 changes: 26 additions & 0 deletions src/internal/queue/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,32 @@ export class Event<T extends Omit<BasePayload, '$version'>> {
return true
}

/**
* See issue https://github.com/timgit/pg-boss/issues/535
* @param queueName
* @param singletonKey
* @param jobId
*/
static async deleteIfActiveExists(queueName: string, singletonKey: string, jobId: string) {
if (!pgQueueEnable) {
return Promise.resolve()
}

await Queue.getDb().executeSql(
`DELETE FROM pgboss_v10.job
WHERE id = $1
AND EXISTS(
SELECT 1 FROM pgboss_v10.job
WHERE id != $2
AND state < 'active'
AND name = $3
AND singleton_key = $4
)
`,
[jobId, jobId, queueName, singletonKey]
)
}

async invoke(): Promise<string | void | null> {
const constructor = this.constructor as typeof Event

Expand Down
48 changes: 39 additions & 9 deletions src/internal/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type SubclassOfBaseClass = (new (payload: any) => Event<any>) & {
export abstract class Queue {
protected static events: SubclassOfBaseClass[] = []
private static pgBoss?: PgBoss
private static pgBossDb?: PgBoss.Db

static async start(opts: {
signal?: AbortSignal
Expand Down Expand Up @@ -55,15 +56,17 @@ export abstract class Queue {
url = multitenantDatabaseUrl
}

Queue.pgBossDb = new QueueDB({
min: 0,
max: pgQueueMaxConnections,
connectionString: url,
statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
})

Queue.pgBoss = new PgBoss({
connectionString: url,

db: new QueueDB({
min: 0,
max: pgQueueMaxConnections,
connectionString: url,
statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
}),
db: Queue.pgBossDb,
schema: 'pgboss_v10',
application_name: 'storage-pgboss',
...(pgQueueDeleteAfterHours
Expand Down Expand Up @@ -139,6 +142,14 @@ export abstract class Queue {
return this.pgBoss
}

static getDb() {
if (!this.pgBossDb) {
throw new Error('pg boss not initialised')
}

return this.pgBossDb
}

static register<T extends SubclassOfBaseClass>(event: T) {
Queue.events.push(event)
}
Expand Down Expand Up @@ -220,8 +231,8 @@ export abstract class Queue {

// // normal queue
await this.pgBoss?.createQueue(queueName, {
name: queueName,
...queueOptions,
name: queueName,
deadLetter: deadLetterName,
})
} catch {
Expand Down Expand Up @@ -249,6 +260,15 @@ export abstract class Queue {
event.getWorkerOptions().batchSize ||
queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))

logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
type: 'queue',
metadata: JSON.stringify({
queueName: event.getQueueName(),
batchSize: batchSize,
pollingInterval: pollingInterval,
}),
})

let started = false
const interval = setInterval(async () => {
if (started) {
Expand Down Expand Up @@ -289,12 +309,12 @@ export abstract class Queue {
name: event.getQueueName(),
})
} catch (e) {
await this.pgBoss?.fail(event.getQueueName(), job.id)

QueueJobRetryFailed.inc({
name: event.getQueueName(),
})

await this.pgBoss?.fail(event.getQueueName(), job.id)

try {
const dbJob: JobWithMetadata | null =
(job as JobWithMetadata).priority !== undefined
Expand Down Expand Up @@ -329,6 +349,16 @@ export abstract class Queue {
}
})
)
} catch (e) {
logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
type: 'queue',
error: e,
metadata: JSON.stringify({
queueName: event.getQueueName(),
batchSize,
pollingInterval,
}),
})
} finally {
started = false
}
Expand Down
21 changes: 18 additions & 3 deletions src/storage/events/migrations/run-migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ interface RunMigrationsPayload extends BasePayload {
}

export class RunMigrationsOnTenants extends BaseEvent<RunMigrationsPayload> {
static queueName = 'tenants-migrations-v2'
static queueName = 'tenants-migrations'
static allowSync = false

static getQueueOptions(): Queue {
return {
name: this.queueName,
policy: 'singleton',
policy: 'stately',
} as const
}

Expand All @@ -36,7 +36,7 @@ export class RunMigrationsOnTenants extends BaseEvent<RunMigrationsPayload> {
static getSendOptions(payload: RunMigrationsPayload): SendOptions {
return {
singletonKey: `migrations_${payload.tenantId}`,
singletonMinutes: 5,
singletonHours: 1,
retryLimit: 3,
retryDelay: 5,
priority: 10,
Expand Down Expand Up @@ -93,6 +93,21 @@ export class RunMigrationsOnTenants extends BaseEvent<RunMigrationsPayload> {
} else {
await updateTenantMigrationsState(tenantId, { state: TenantMigrationStatus.FAILED })
}

try {
// get around pg-boss not allowing to have a stately queue in a state
// where there is a job in created state and retry state
const singletonKey = job.singletonKey || ''
await this.deleteIfActiveExists(this.getQueueName(), singletonKey, job.id)
} catch (e) {
logSchema.error(logger, `[Migrations] Error deleting job ${job.id}`, {
type: 'migrations',
error: e,
project: tenantId,
})
return
}

throw e
}
}
Expand Down
Loading