From 94c33b927f3cefe5d92aa167af1ebc00a8954d4d Mon Sep 17 00:00:00 2001
From: Lenny
Date: Fri, 30 May 2025 14:03:26 -0400
Subject: [PATCH 1/5] fix: make empty bucket use queue to remove underlying objects asynchronously

---
 src/storage/events/index.ts                   |  1 +
 .../lifecycle/object-admin-delete-batch.ts    | 87 +++++++++++++++++++
 src/storage/events/workers.ts                 |  2 +
 src/storage/storage.ts                        | 13 +--
 src/test/bucket.test.ts                       | 31 +++++++
 5 files changed, 129 insertions(+), 5 deletions(-)
 create mode 100644 src/storage/events/lifecycle/object-admin-delete-batch.ts

diff --git a/src/storage/events/index.ts b/src/storage/events/index.ts
index 2f73e808..23ce3246 100644
--- a/src/storage/events/index.ts
+++ b/src/storage/events/index.ts
@@ -4,6 +4,7 @@ export * from './lifecycle/object-created'
 export * from './lifecycle/object-updated'
 export * from './lifecycle/object-removed'
 export * from './lifecycle/object-admin-delete'
+export * from './lifecycle/object-admin-delete-batch'
 export * from './objects/backup-object'
 export * from './migrations/run-migrations'
 export * from './migrations/reset-migrations'
diff --git a/src/storage/events/lifecycle/object-admin-delete-batch.ts b/src/storage/events/lifecycle/object-admin-delete-batch.ts
new file mode 100644
index 00000000..998784d8
--- /dev/null
+++ b/src/storage/events/lifecycle/object-admin-delete-batch.ts
@@ -0,0 +1,87 @@
+import { BaseEvent } from '../base-event'
+import { getConfig } from '../../../config'
+import { Job, SendOptions, WorkOptions } from 'pg-boss'
+import { logger, logSchema } from '@internal/monitoring'
+import { Storage } from '../../index'
+import { BasePayload } from '@internal/queue'
+
+export interface ObjectDeleteBatchEvent extends BasePayload {
+  prefixes: string[]
+  bucketId: string
+}
+
+const { storageS3Bucket, adminDeleteQueueTeamSize, adminDeleteConcurrency } = getConfig()
+
+export class ObjectAdminDeleteBatch extends BaseEvent<ObjectDeleteBatchEvent> {
+  static queueName = 'object:admin:delete-batch'
+
+  static getWorkerOptions(): WorkOptions {
+    return {}
+  }
+
+  static getSendOptions(): SendOptions {
+    return {
+      priority: 10,
+      expireInSeconds: 30,
+    }
+  }
+
+  static async handle(job: Job<ObjectDeleteBatchEvent>) {
+    let storage: Storage | undefined = undefined
+
+    const { prefixes, bucketId } = job.data
+    if (prefixes.length < 1) {
+      return
+    }
+
+    try {
+      storage = await this.createStorage(job.data)
+
+      logSchema.event(logger, `[Admin]: ObjectAdminDeleteBatch ${bucketId} ${prefixes.length}`, {
+        jodId: job.id,
+        type: 'event',
+        event: 'ObjectAdminDeleteBatch',
+        payload: JSON.stringify(job.data),
+        objectPath: bucketId,
+        resources: prefixes,
+        tenantId: job.data.tenant.ref,
+        project: job.data.tenant.ref,
+        reqId: job.data.reqId,
+      })
+
+      await storage.backend.deleteObjects(storageS3Bucket, prefixes)
+    } catch (e) {
+      logger.error(
+        {
+          error: e,
+          jodId: job.id,
+          type: 'event',
+          event: 'ObjectAdminDeleteBatch',
+          payload: JSON.stringify(job.data),
+          objectPath: bucketId,
+          resources: prefixes,
+          tenantId: job.data.tenant.ref,
+          project: job.data.tenant.ref,
+          reqId: job.data.reqId,
+        },
+        `[Admin]: ObjectAdminDeleteBatch ${bucketId} ${prefixes.length} - FAILED`
+      )
+      throw e
+    } finally {
+      if (storage) {
+        const tenant = storage.db.tenant()
+        storage.db
+          .destroyConnection()
+          .then(() => {
+            // no-op
+          })
+          .catch((e) => {
+            logger.error(
+              { error: e },
+              `[Admin]: ObjectAdminDeleteBatch ${tenant.ref} - FAILED DISPOSING CONNECTION`
+            )
+          })
+      }
+    }
+  }
+}
diff --git a/src/storage/events/workers.ts b/src/storage/events/workers.ts
index 6489bf4b..bee7a7ba 100644
--- a/src/storage/events/workers.ts
+++ b/src/storage/events/workers.ts
@@ -6,10 +6,12 @@ import { BackupObjectEvent } from './objects/backup-object'
 import { ResetMigrationsOnTenant } from './migrations/reset-migrations'
 import { JwksCreateSigningSecret } from './jwks/jwks-create-signing-secret'
 import { UpgradePgBossV10 } from './pgboss/upgrade-v10'
+import { ObjectAdminDeleteBatch } from './lifecycle/object-admin-delete-batch'
 
 export function registerWorkers() {
   Queue.register(Webhook)
   Queue.register(ObjectAdminDelete)
+  Queue.register(ObjectAdminDeleteBatch)
   Queue.register(RunMigrationsOnTenants)
   Queue.register(BackupObjectEvent)
   Queue.register(ResetMigrationsOnTenant)
diff --git a/src/storage/storage.ts b/src/storage/storage.ts
index 07de3018..8e5855b3 100644
--- a/src/storage/storage.ts
+++ b/src/storage/storage.ts
@@ -6,9 +6,9 @@ import { getFileSizeLimit, mustBeValidBucketName, parseFileSizeToBytes } from './limits'
 import { getConfig } from '../config'
 import { ObjectStorage } from './object'
 import { InfoRenderer } from '@storage/renderer/info'
-import { logger, logSchema } from '@internal/monitoring'
+import { ObjectAdminDeleteBatch } from './events'
 
-const { requestUrlLengthLimit, storageS3Bucket } = getConfig()
+const { requestUrlLengthLimit } = getConfig()
 
 /**
  * Storage
@@ -205,15 +205,18 @@ export class Storage {
       )
 
       if (deleted && deleted.length > 0) {
-        const params = deleted.reduce((all, { name, version }) => {
+        const prefixes = deleted.reduce((all, { name, version }) => {
           const fileName = withOptionalVersion(`${this.db.tenantId}/${bucketId}/${name}`, version)
           all.push(fileName)
           all.push(fileName + '.info')
           return all
         }, [] as string[])
         // delete files from s3 asynchronously
-        this.backend.deleteObjects(storageS3Bucket, params).catch((e) => {
-          logSchema.error(logger, 'Failed to delete objects from s3', { type: 's3', error: e })
+        await ObjectAdminDeleteBatch.send({
+          prefixes,
+          bucketId,
+          tenant: this.db.tenant(),
+          reqId: this.db.reqId,
         })
       }
diff --git a/src/test/bucket.test.ts b/src/test/bucket.test.ts
index 69c559d2..edeb2479 100644
--- a/src/test/bucket.test.ts
+++ b/src/test/bucket.test.ts
@@ -440,6 +440,23 @@ describe('testing EMPTY bucket', () => {
   test('user is able to empty a bucket with a service key', async () => {
     const bucketId = 'bucket3'
+
+    // confirm there are items in the bucket before empty
+    const responseList = await appInstance.inject({
+      method: 'POST',
+      url: '/object/list/' + bucketId,
+      headers: {
+        authorization: `Bearer ${process.env.SERVICE_KEY}`,
+      },
+      payload: {
+        prefix: '',
+        limit: 10,
+        offset: 0,
+      },
+    })
+    expect(responseList.statusCode).toBe(200)
+    expect(responseList.json()).toHaveLength(2)
+
     const response = await appInstance.inject({
       method: 'POST',
       url: `/bucket/${bucketId}/empty`,
@@ -450,6 +467,20 @@ describe('testing EMPTY bucket', () => {
     expect(response.statusCode).toBe(200)
     const responseJSON = JSON.parse(response.body)
     expect(responseJSON.message).toBe('Successfully emptied')
+
+    // confirm the bucket is actually empty after
+    const responseList2 = await appInstance.inject({
+      method: 'POST',
+      url: '/object/list/' + bucketId,
+      headers: {
+        authorization: `Bearer ${process.env.SERVICE_KEY}`,
+      },
+      payload: {
+        prefix: '',
+      },
+    })
+    expect(responseList2.statusCode).toBe(200)
+    expect(responseList2.json()).toHaveLength(0)
   })
 
   test('user is able to delete a bucket', async () => {

From c52a33a608998dfe676865df64826ee6fe856db6 Mon Sep 17 00:00:00 2001
From: Lenny
Date: Mon, 2 Jun 2025 17:41:10 -0400
Subject: [PATCH 2/5] add default limit of 200,000 objects max for empty bucket
---
 src/config.ts                | 5 +++++
 src/internal/errors/codes.ts | 8 ++++++++
 src/storage/storage.ts       | 7 ++++++-
 3 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/src/config.ts b/src/config.ts
index 8142ff40..e0a62610 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -97,6 +97,7 @@ type StorageConfigType = {
   responseSMaxAge: number
   anonKeyAsync: Promise<string>
   serviceKeyAsync: Promise<string>
+  emptyBucketMax: number
   storageBackendType: StorageBackendType
   tenantId: string
   requestUrlLengthLimit: number
@@ -313,6 +314,10 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
     ),
     // Storage
     storageBackendType: getOptionalConfigFromEnv('STORAGE_BACKEND') as StorageBackendType,
+    emptyBucketMax: parseInt(
+      getOptionalConfigFromEnv('STORAGE_EMPTY_BUCKET_MAX') || '200000',
+      10
+    ),
 
     // Storage - File
     storageFilePath: getOptionalConfigFromEnv(
diff --git a/src/internal/errors/codes.ts b/src/internal/errors/codes.ts
index cd887aff..0c6142f2 100644
--- a/src/internal/errors/codes.ts
+++ b/src/internal/errors/codes.ts
@@ -49,6 +49,14 @@ export const ERRORS = {
       message: `The bucket you tried to delete is not empty`,
       originalError: e,
     }),
+  UnableToEmptyBucket: (bucket: string, e?: Error) =>
+    new StorageBackendError({
+      code: ErrorCode.InvalidRequest,
+      resource: bucket,
+      httpStatusCode: 409,
+      message: `Unable to empty the bucket because it contains too many objects`,
+      originalError: e,
+    }),
   NoSuchBucket: (bucket: string, e?: Error) =>
     new StorageBackendError({
       code: ErrorCode.NoSuchBucket,
diff --git a/src/storage/storage.ts b/src/storage/storage.ts
index 8e5855b3..1c434f4c 100644
--- a/src/storage/storage.ts
+++ b/src/storage/storage.ts
@@ -8,7 +8,7 @@ import { ObjectStorage } from './object'
 import { InfoRenderer } from '@storage/renderer/info'
 import { ObjectAdminDeleteBatch } from './events'
 
-const { requestUrlLengthLimit } = getConfig()
+const { requestUrlLengthLimit, emptyBucketMax } = getConfig()
 
 /**
  * Storage
@@ -187,6 +187,11 @@ export class Storage {
   async emptyBucket(bucketId: string) {
     await this.findBucket(bucketId, 'name')
 
+    const count = await this.countObjects(bucketId)
+    if (count > emptyBucketMax) {
+      throw ERRORS.UnableToEmptyBucket(bucketId)
+    }
+
     while (true) {
       const objects = await this.db.listObjects(
         bucketId,

From 749d25bc117a0d833e830f009656ab7642209533 Mon Sep 17 00:00:00 2001
From: Lenny
Date: Thu, 5 Jun 2025 13:55:58 -0400
Subject: [PATCH 3/5] Limit count on empty and delete bucket for better efficiency

---
 src/config.ts                   |  5 +--
 src/storage/database/adapter.ts |  2 +-
 src/storage/database/knex.ts    | 54 +++++++++++++++++++--------------
 src/storage/storage.ts          | 12 ++------
 src/test/bucket.test.ts         | 36 ++++++++++++++++++++++
 5 files changed, 72 insertions(+), 37 deletions(-)

diff --git a/src/config.ts b/src/config.ts
index e0a62610..24641c30 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -314,10 +314,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
     ),
     // Storage
     storageBackendType: getOptionalConfigFromEnv('STORAGE_BACKEND') as StorageBackendType,
-    emptyBucketMax: parseInt(
-      getOptionalConfigFromEnv('STORAGE_EMPTY_BUCKET_MAX') || '200000',
-      10
-    ),
+    emptyBucketMax: parseInt(getOptionalConfigFromEnv('STORAGE_EMPTY_BUCKET_MAX') || '200000', 10),
 
     // Storage - File
     storageFilePath: getOptionalConfigFromEnv(
diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts
index a81d8a12..24bbf3b5 100644
--- a/src/storage/database/adapter.ts
+++ b/src/storage/database/adapter.ts
@@ -75,7 +75,7 @@ export interface Database {
     filters?: Filters
   ): Promise<Obj[]>
 
-  countObjectsInBucket(bucketId: string): Promise<number>
+  countObjectsInBucket(bucketId: string, limit?: number): Promise<number>
 
   deleteBucket(bucketId: string | string[]): Promise<Bucket[]>
diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts
index eb511b4c..cf74bfcf 100644
--- a/src/storage/database/knex.ts
+++ b/src/storage/database/knex.ts
@@ -160,17 +160,21 @@ export class StorageKnexDB implements Database {
     return result
   }
 
-  async countObjectsInBucket(bucketId: string) {
+  async countObjectsInBucket(bucketId: string, limit?: number): Promise<number> {
+    // if we have a limit use select to only scan up to that limit
+    if (limit !== undefined) {
+      const result = await this.runQuery('CountObjectsInBucketWithLimit', (knex) => {
+        return knex.from('objects').where('bucket_id', bucketId).limit(limit).select(knex.raw('1'))
+      })
+      return result.length
+    }
+
+    // do full count if there is no limit
     const result = await this.runQuery('CountObjectsInBucket', (knex) => {
-      return knex
-        .from<{ count: number }>('objects')
-        .where('bucket_id', bucketId)
-        .limit(10)
-        .count()
-        .first()
+      return knex.from('objects').where('bucket_id', bucketId).count().first<{ count: number }>()
     })
 
-    return (result?.count as number) || 0
+    return result?.count || 0
   }
 
   async deleteBucket(bucketId: string | string[]) {
@@ -612,7 +616,10 @@ export class StorageKnexDB implements Database {
   async mustLockObject(bucketId: string, objectName: string, version?: string) {
     return this.runQuery('MustLockObject', async (knex) => {
       const hash = hashStringToInt(`${bucketId}/${objectName}${version ? `/${version}` : ''}`)
-      const result = await knex.raw(`SELECT pg_try_advisory_xact_lock(?);`, [hash])
+      const result = await knex.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(
+        `SELECT pg_try_advisory_xact_lock(?);`,
+        [hash]
+      )
       const lockAcquired = result.rows.shift()?.pg_try_advisory_xact_lock || false
 
       if (!lockAcquired) {
@@ -631,7 +638,7 @@ export class StorageKnexDB implements Database {
   ) {
     return this.runQuery('WaitObjectLock', async (knex) => {
       const hash = hashStringToInt(`${bucketId}/${objectName}${version ? `/${version}` : ''}`)
-      const query = knex.raw(`SELECT pg_advisory_xact_lock(?)`, [hash])
+      const query = knex.raw<void>(`SELECT pg_advisory_xact_lock(?)`, [hash])
 
       if (opts?.timeout) {
         let timeoutInterval: undefined | NodeJS.Timeout
@@ -661,18 +668,21 @@ export class StorageKnexDB implements Database {
   async searchObjects(bucketId: string, prefix: string, options: SearchObjectOption) {
     return this.runQuery('SearchObjects', async (knex) => {
-      const result = await knex.raw('select * from storage.search(?,?,?,?,?,?,?,?)', [
-        prefix,
-        bucketId,
-        options.limit || 100,
-        prefix.split('/').length,
-        options.offset || 0,
-        options.search || '',
-        options.sortBy?.column ?? 'name',
-        options.sortBy?.order ?? 'asc',
-      ])
+      const result = await knex.raw<{ rows: Obj[] }>(
+        'select * from storage.search(?,?,?,?,?,?,?,?)',
+        [
+          prefix,
+          bucketId,
+          options.limit || 100,
+          prefix.split('/').length,
+          options.offset || 0,
+          options.search || '',
+          options.sortBy?.column ?? 'name',
+          options.sortBy?.order ?? 'asc',
+        ]
+      )
 
-      return (result as any).rows
+      return result.rows
     })
   }
@@ -807,7 +817,7 @@ export class StorageKnexDB implements Database {
 
     if (typeof columns === 'object') {
       value.forEach((column: string) => {
-        delete (columns as Record<string, any>)[column]
+        delete (columns as Record<string, unknown>)[column]
       })
     }
   }
diff --git a/src/storage/storage.ts b/src/storage/storage.ts
index 1c434f4c..7ddf41f1 100644
--- a/src/storage/storage.ts
+++ b/src/storage/storage.ts
@@ -146,14 +146,6 @@ export class Storage {
     return this.db.updateBucket(id, bucketData)
   }
 
-  /**
-   * Counts objects in a bucket
-   * @param id
-   */
-  countObjects(id: string) {
-    return this.db.countObjectsInBucket(id)
-  }
-
   /**
    * Delete a specific bucket if empty
    * @param id
@@ -164,7 +156,7 @@ export class Storage {
       forUpdate: true,
     })
 
-    const countObjects = await db.asSuperUser().countObjectsInBucket(id)
+    const countObjects = await db.asSuperUser().countObjectsInBucket(id, 1)
 
     if (countObjects && countObjects > 0) {
       throw ERRORS.BucketNotEmpty(id)
@@ -187,7 +179,7 @@ export class Storage {
   async emptyBucket(bucketId: string) {
     await this.findBucket(bucketId, 'name')
 
-    const count = await this.countObjects(bucketId)
+    const count = await this.db.countObjectsInBucket(bucketId, emptyBucketMax + 1)
     if (count > emptyBucketMax) {
       throw ERRORS.UnableToEmptyBucket(bucketId)
     }
diff --git a/src/test/bucket.test.ts b/src/test/bucket.test.ts
index edeb2479..e9fabeaa 100644
--- a/src/test/bucket.test.ts
+++ b/src/test/bucket.test.ts
@@ -3,6 +3,9 @@ import dotenv from 'dotenv'
 import app from '../app'
 import { S3Backend } from '../storage/backend'
 import { FastifyInstance } from 'fastify'
+import { getPostgresConnection, getServiceKeyUser } from '@internal/database'
+import { StorageKnexDB } from '@storage/database'
+import { getConfig } from '../config'
 
 dotenv.config({ path: '.env.test' })
 
 const anonKey = process.env.ANON_KEY || ''
@@ -362,6 +365,39 @@ describe('testing public bucket functionality', () => {
   })
 })
 
+describe('testing count objects in bucket', () => {
+  const { tenantId } = getConfig()
+  let db: StorageKnexDB
+
+  beforeAll(async () => {
+    const superUser = await getServiceKeyUser(tenantId)
+    const pg = await getPostgresConnection({
+      superUser,
+      user: superUser,
+      tenantId: tenantId,
+      host: 'localhost',
+    })
+
+    db = new StorageKnexDB(pg, {
+      host: 'localhost',
+      tenantId,
+    })
+  })
+
+  it('should return correct object count', async () => {
+    await expect(db.countObjectsInBucket('bucket2')).resolves.toBe(27)
+  })
+
+  it('should return limited object count', async () => {
+    await expect(db.countObjectsInBucket('bucket2', 22)).resolves.toBe(22)
+  })
+
+  it('should return full object count if limit is greater than total', async () => {
+    await expect(db.countObjectsInBucket('bucket2', 999)).resolves.toBe(27)
+  })
+
+  it('should return 0 object count if there are no objects with provided bucket id', async () => {
+    await expect(db.countObjectsInBucket('this-is-not-a-bucket-at-all', 999)).resolves.toBe(0)
+  })
+})
+
 describe('testing DELETE bucket', () => {
   test('user is able to delete a bucket', async () => {
     const bucketId = 'bucket4'

From b01d090bcc23066929310cf75c9f7f03114554e5 Mon Sep 17 00:00:00 2001
From: Lenny
Date: Tue, 24 Jun 2025 08:19:58 -0700
Subject: [PATCH 4/5] list objects in worker when emptying bucket

---
 src/http/routes/bucket/emptyBucket.ts         |   9 +-
 src/storage/events/index.ts                   |   4 +-
 .../lifecycle/object-admin-delete-batch.ts    |  87 -----------
 .../objects/object-admin-delete-all-before.ts | 141 ++++++++++++++++++
 .../object-admin-delete.ts                    |   0
 src/storage/events/workers.ts                 |   6 +-
 src/storage/storage.ts                        |  69 +++------
 src/test/bucket.test.ts                       |  10 +-
 src/test/db/02-dummy-data.sql                 |   3 +
 9 files changed, 184 insertions(+), 145 deletions(-)
 delete mode 100644 src/storage/events/lifecycle/object-admin-delete-batch.ts
 create mode 100644 src/storage/events/objects/object-admin-delete-all-before.ts
 rename src/storage/events/{lifecycle => objects}/object-admin-delete.ts (100%)

diff --git a/src/http/routes/bucket/emptyBucket.ts b/src/http/routes/bucket/emptyBucket.ts
index 4ae3482b..a997dfb7 100644
--- a/src/http/routes/bucket/emptyBucket.ts
+++ b/src/http/routes/bucket/emptyBucket.ts
@@ -14,7 +14,10 @@ const emptyBucketParamsSchema = {
 const successResponseSchema = {
   type: 'object',
   properties: {
-    message: { type: 'string', examples: ['Successfully emptied'] },
+    message: {
+      type: 'string',
+      examples: ['Empty bucket has been queued. Completion may take up to an hour.'],
+    },
   },
 }
 interface emptyBucketRequestInterface extends AuthenticatedRequest {
@@ -41,7 +44,9 @@ export default async function routes(fastify: FastifyInstance) {
 
       await request.storage.emptyBucket(bucketId)
 
-      return response.status(200).send(createResponse('Successfully emptied'))
+      return response
+        .status(200)
+        .send(createResponse('Empty bucket has been queued. Completion may take up to an hour.'))
     }
   )
 }
diff --git a/src/storage/events/index.ts b/src/storage/events/index.ts
index 23ce3246..0a0ad545 100644
--- a/src/storage/events/index.ts
+++ b/src/storage/events/index.ts
@@ -3,8 +3,8 @@ export * from './base-event'
 export * from './lifecycle/object-created'
 export * from './lifecycle/object-updated'
 export * from './lifecycle/object-removed'
-export * from './lifecycle/object-admin-delete'
-export * from './lifecycle/object-admin-delete-batch'
+export * from './objects/object-admin-delete'
+export * from './objects/object-admin-delete-all-before'
 export * from './objects/backup-object'
 export * from './migrations/run-migrations'
 export * from './migrations/reset-migrations'
diff --git a/src/storage/events/lifecycle/object-admin-delete-batch.ts b/src/storage/events/lifecycle/object-admin-delete-batch.ts
deleted file mode 100644
index 998784d8..00000000
--- a/src/storage/events/lifecycle/object-admin-delete-batch.ts
+++ /dev/null
@@ -1,87 +0,0 @@
-import { BaseEvent } from '../base-event'
-import { getConfig } from '../../../config'
-import { Job, SendOptions, WorkOptions } from 'pg-boss'
-import { logger, logSchema } from '@internal/monitoring'
-import { Storage } from '../../index'
-import { BasePayload } from '@internal/queue'
-
-export interface ObjectDeleteBatchEvent extends BasePayload {
-  prefixes: string[]
-  bucketId: string
-}
-
-const { storageS3Bucket, adminDeleteQueueTeamSize, adminDeleteConcurrency } = getConfig()
-
-export class ObjectAdminDeleteBatch extends BaseEvent<ObjectDeleteBatchEvent> {
-  static queueName = 'object:admin:delete-batch'
-
-  static getWorkerOptions(): WorkOptions {
-    return {}
-  }
-
-  static getSendOptions(): SendOptions {
-    return {
-      priority: 10,
-      expireInSeconds: 30,
-    }
-  }
-
-  static async handle(job: Job<ObjectDeleteBatchEvent>) {
-    let storage: Storage | undefined = undefined
-
-    const { prefixes, bucketId } = job.data
-    if (prefixes.length < 1) {
-      return
-    }
-
-    try {
-      storage = await this.createStorage(job.data)
-
-      logSchema.event(logger, `[Admin]: ObjectAdminDeleteBatch ${bucketId} ${prefixes.length}`, {
-        jodId: job.id,
-        type: 'event',
-        event: 'ObjectAdminDeleteBatch',
-        payload: JSON.stringify(job.data),
-        objectPath: bucketId,
-        resources: prefixes,
-        tenantId: job.data.tenant.ref,
-        project: job.data.tenant.ref,
-        reqId: job.data.reqId,
-      })
-
-      await storage.backend.deleteObjects(storageS3Bucket, prefixes)
-    } catch (e) {
-      logger.error(
-        {
-          error: e,
-          jodId: job.id,
-          type: 'event',
-          event: 'ObjectAdminDeleteBatch',
-          payload: JSON.stringify(job.data),
-          objectPath: bucketId,
-          resources: prefixes,
-          tenantId: job.data.tenant.ref,
-          project: job.data.tenant.ref,
-          reqId: job.data.reqId,
-        },
-        `[Admin]: ObjectAdminDeleteBatch ${bucketId} ${prefixes.length} - FAILED`
-      )
-      throw e
-    } finally {
-      if (storage) {
-        const tenant = storage.db.tenant()
-        storage.db
-          .destroyConnection()
-          .then(() => {
-            // no-op
-          })
-          .catch((e) => {
-            logger.error(
-              { error: e },
-              `[Admin]: ObjectAdminDeleteBatch ${tenant.ref} - FAILED DISPOSING CONNECTION`
-            )
-          })
-      }
-    }
-  }
-}
diff --git a/src/storage/events/objects/object-admin-delete-all-before.ts b/src/storage/events/objects/object-admin-delete-all-before.ts
new file mode 100644
index 00000000..ef70e8ed
--- /dev/null
+++ b/src/storage/events/objects/object-admin-delete-all-before.ts
@@ -0,0 +1,141 @@
+import { BaseEvent } from '../base-event'
+import { getConfig } from '../../../config'
+import { Job, SendOptions, WorkOptions } from 'pg-boss'
+import { logger, logSchema } from '@internal/monitoring'
+import { Storage } from '../../index'
+import { BasePayload } from '@internal/queue'
+import { withOptionalVersion } from '@storage/backend'
+
+const DELETE_JOB_TIME_LIMIT_MS = 10_000
+
+export interface ObjectDeleteAllBeforeEvent extends BasePayload {
+  before: string
+  bucketId: string
+}
+
+const { storageS3Bucket, requestUrlLengthLimit } = getConfig()
+
+export class ObjectAdminDeleteAllBefore extends BaseEvent<ObjectDeleteAllBeforeEvent> {
+  static queueName = 'object:admin:delete-all-before'
+
+  static getWorkerOptions(): WorkOptions {
+    return {}
+  }
+
+  static getSendOptions(payload: ObjectDeleteAllBeforeEvent): SendOptions {
+    return {
+      singletonKey: `${payload.tenant.ref}/${payload.bucketId}`,
+      priority: 10,
+      expireInSeconds: 30,
+    }
+  }
+
+  static async handle(job: Job<ObjectDeleteAllBeforeEvent>) {
+    let storage: Storage | undefined = undefined
+
+    const tenantId = job.data.tenant.ref
+    const bucketId = job.data.bucketId
+    const before = new Date(job.data.before)
+
+    try {
+      storage = await this.createStorage(job.data)
+
+      logSchema.event(
+        logger,
+        `[Admin]: ObjectAdminDeleteAllBefore ${bucketId} ${before.toUTCString()}`,
+        {
+          jodId: job.id,
+          type: 'event',
+          event: 'ObjectAdminDeleteAllBefore',
+          payload: JSON.stringify(job.data),
+          objectPath: bucketId,
+          tenantId,
+          project: tenantId,
+          reqId: job.data.reqId,
+        }
+      )
+
+      const batchLimit = Math.floor(requestUrlLengthLimit / (36 + 3))
+
+      let moreObjectsToDelete = false
+      const start = Date.now()
+      while (Date.now() - start < DELETE_JOB_TIME_LIMIT_MS) {
+        moreObjectsToDelete = false
+        const objects = await storage.db.listObjects(bucketId, 'id, name', batchLimit + 1, before)
+
+        const backend = storage.backend
+        if (objects && objects.length > 0) {
+          if (objects.length > batchLimit) {
+            objects.pop()
+            moreObjectsToDelete = true
+          }
+
+          await storage.db.withTransaction(async (trx) => {
+            const deleted = await trx.deleteObjects(
+              bucketId,
+              objects.map(({ id }) => id!),
+              'id'
+            )
+
+            if (deleted && deleted.length > 0) {
+              const prefixes: string[] = []
+
+              for (const { name, version } of deleted) {
+                const fileName = withOptionalVersion(`${tenantId}/${bucketId}/${name}`, version)
+                prefixes.push(fileName)
+                prefixes.push(fileName + '.info')
+              }
+
+              await backend.deleteObjects(storageS3Bucket, prefixes)
+            }
+          })
+        }
+
+        if (!moreObjectsToDelete) {
+          break
+        }
+      }
+
+      if (moreObjectsToDelete) {
+        // delete next batch
+        await ObjectAdminDeleteAllBefore.send({
+          before,
+          bucketId,
+          tenant: job.data.tenant,
+          reqId: job.data.reqId,
+        })
+      }
+    } catch (e) {
+      logger.error(
+        {
+          error: e,
+          jodId: job.id,
+          type: 'event',
+          event: 'ObjectAdminDeleteAllBefore',
+          payload: JSON.stringify(job.data),
+          objectPath: bucketId,
+          tenantId,
+          project: tenantId,
+          reqId: job.data.reqId,
+        },
+        `[Admin]: ObjectAdminDeleteAllBefore ${bucketId} ${before.toUTCString()} - FAILED`
+      )
+      throw e
+    } finally {
+      if (storage) {
+        const tenant = storage.db.tenant()
+        storage.db
+          .destroyConnection()
+          .then(() => {
+            // no-op
+          })
+          .catch((e) => {
+            logger.error(
+              { error: e },
+              `[Admin]: ObjectAdminDeleteAllBefore ${tenant.ref} - FAILED DISPOSING CONNECTION`
+            )
+          })
+      }
+    }
+  }
+}
diff --git a/src/storage/events/lifecycle/object-admin-delete.ts b/src/storage/events/objects/object-admin-delete.ts
similarity index 100%
rename from src/storage/events/lifecycle/object-admin-delete.ts
rename to src/storage/events/objects/object-admin-delete.ts
diff --git a/src/storage/events/workers.ts b/src/storage/events/workers.ts
index bee7a7ba..742c205f 100644
--- a/src/storage/events/workers.ts
+++ b/src/storage/events/workers.ts
@@ -1,17 +1,17 @@
 import { Queue } from '@internal/queue'
 import { Webhook } from './lifecycle/webhook'
-import { ObjectAdminDelete } from './lifecycle/object-admin-delete'
+import { ObjectAdminDelete } from './objects/object-admin-delete'
 import { RunMigrationsOnTenants } from './migrations/run-migrations'
 import { BackupObjectEvent } from './objects/backup-object'
 import { ResetMigrationsOnTenant } from './migrations/reset-migrations'
 import { JwksCreateSigningSecret } from './jwks/jwks-create-signing-secret'
 import { UpgradePgBossV10 } from './pgboss/upgrade-v10'
-import { ObjectAdminDeleteBatch } from './lifecycle/object-admin-delete-batch'
+import { ObjectAdminDeleteAllBefore } from './objects/object-admin-delete-all-before'
 
 export function registerWorkers() {
   Queue.register(Webhook)
   Queue.register(ObjectAdminDelete)
-  Queue.register(ObjectAdminDeleteBatch)
+  Queue.register(ObjectAdminDeleteAllBefore)
   Queue.register(RunMigrationsOnTenants)
   Queue.register(BackupObjectEvent)
   Queue.register(ResetMigrationsOnTenant)
diff --git a/src/storage/storage.ts b/src/storage/storage.ts
index 7ddf41f1..c8846df4 100644
--- a/src/storage/storage.ts
+++ b/src/storage/storage.ts
@@ -1,4 +1,4 @@
-import { StorageBackendAdapter, withOptionalVersion } from './backend'
+import { StorageBackendAdapter } from './backend'
 import { Database, FindBucketFilters } from './database'
 import { ERRORS } from '@internal/errors'
 import { AssetRenderer, HeadRenderer, ImageRenderer } from './renderer'
@@ -6,9 +6,9 @@ import { getFileSizeLimit, mustBeValidBucketName, parseFileSizeToBytes } from './limits'
 import { getConfig } from '../config'
 import { ObjectStorage } from './object'
 import { InfoRenderer } from '@storage/renderer/info'
-import { ObjectAdminDeleteBatch } from './events'
+import { ObjectAdminDeleteAllBefore } from './events'
 
-const { requestUrlLengthLimit, emptyBucketMax } = getConfig()
+const { emptyBucketMax } = getConfig()
 
 /**
  * Storage
@@ -175,8 +175,9 @@ export class Storage {
   /**
    * Deletes all files in a bucket
    * @param bucketId
+   * @param before limit to files before the specified time (defaults to now)
    */
-  async emptyBucket(bucketId: string) {
+  async emptyBucket(bucketId: string, before: Date = new Date()) {
     await this.findBucket(bucketId, 'name')
 
     const count = await this.db.countObjectsInBucket(bucketId, emptyBucketMax + 1)
@@ -184,52 +185,24 @@ export class Storage {
     if (count > emptyBucketMax) {
       throw ERRORS.UnableToEmptyBucket(bucketId)
     }
 
-    while (true) {
-      const objects = await this.db.listObjects(
-        bucketId,
-        'id, name',
-        Math.floor(requestUrlLengthLimit / (36 + 3))
-      )
-
-      if (!(objects && objects.length > 0)) {
-        break
-      }
+    const objects = await this.db.listObjects(bucketId, 'id, name', 1, before)
+    if (!objects || objects.length < 1) {
+      // the bucket is already empty
+      return
+    }
 
-      const deleted = await this.db.deleteObjects(
-        bucketId,
-        objects.map(({ id }) => id!),
-        'id'
-      )
-
-      if (deleted && deleted.length > 0) {
-        const prefixes = deleted.reduce((all, { name, version }) => {
-          const fileName = withOptionalVersion(`${this.db.tenantId}/${bucketId}/${name}`, version)
-          all.push(fileName)
-          all.push(fileName + '.info')
-          return all
-        }, [] as string[])
-        // delete files from s3 asynchronously
-        await ObjectAdminDeleteBatch.send({
-          prefixes,
-          bucketId,
-          tenant: this.db.tenant(),
-          reqId: this.db.reqId,
-        })
-      }
+    // ensure delete permissions
+    await this.db.testPermission((db) => {
+      return db.deleteObject(bucketId, objects[0].id!)
+    })
 
-      if (deleted?.length !== objects.length) {
-        const deletedNames = new Set(deleted?.map(({ name }) => name))
-        const remainingNames = objects
-          .filter(({ name }) => !deletedNames.has(name))
-          .map(({ name }) => name)
-
-        throw ERRORS.AccessDenied(
-          `Cannot delete: ${remainingNames.join(
-            ' ,'
-          )}, you may have SELECT but not DELETE permissions`
-        )
-      }
-    }
+    // use queue to recursively delete all objects created before the specified time
+    await ObjectAdminDeleteAllBefore.send({
+      before,
+      bucketId,
+      tenant: this.db.tenant(),
+      reqId: this.db.reqId,
+    })
   }
 
   validateMimeType(mimeType: string[]) {
diff --git a/src/test/bucket.test.ts b/src/test/bucket.test.ts
index e9fabeaa..d39d6e69 100644
--- a/src/test/bucket.test.ts
+++ b/src/test/bucket.test.ts
@@ -471,11 +471,13 @@ describe('testing EMPTY bucket', () => {
     })
     expect(response.statusCode).toBe(200)
     const responseJSON = JSON.parse(response.body)
-    expect(responseJSON.message).toBe('Successfully emptied')
+    expect(responseJSON.message).toBe(
+      'Empty bucket has been queued. Completion may take up to an hour.'
+    )
   })
 
   test('user is able to empty a bucket with a service key', async () => {
-    const bucketId = 'bucket3'
+    const bucketId = 'bucket3a'
 
     // confirm there are items in the bucket before empty
     const responseList = await appInstance.inject({
@@ -502,7 +504,9 @@ describe('testing EMPTY bucket', () => {
     })
     expect(response.statusCode).toBe(200)
     const responseJSON = JSON.parse(response.body)
-    expect(responseJSON.message).toBe('Successfully emptied')
+    expect(responseJSON.message).toBe(
+      'Empty bucket has been queued. Completion may take up to an hour.'
+    )
 
     // confirm the bucket is actually empty after
     const responseList2 = await appInstance.inject({
diff --git a/src/test/db/02-dummy-data.sql b/src/test/db/02-dummy-data.sql
index 45241e49..475ebb53 100644
--- a/src/test/db/02-dummy-data.sql
+++ b/src/test/db/02-dummy-data.sql
@@ -12,6 +12,7 @@ INSERT INTO "auth"."users" ("instance_id", "id", "aud", "role", "email", "encryp
 INSERT INTO "storage"."buckets" ("id", "name", "owner", "created_at", "updated_at", "public", "file_size_limit", "allowed_mime_types") VALUES
 ('bucket2', 'bucket2', '4d56e902-f0a0-4662-8448-a4d9e643c142', '2021-02-17 04:43:32.770206+00', '2021-02-17 04:43:32.770206+00', false, NULL, NULL),
 ('bucket3', 'bucket3', '4d56e902-f0a0-4662-8448-a4d9e643c142', '2021-02-17 04:43:32.770206+00', '2021-02-17 04:43:32.770206+00', false, NULL, NULL),
+('bucket3a', 'bucket3a', '4d56e902-f0a0-4662-8448-a4d9e643c142', '2021-02-17 04:43:32.770206+00', '2021-02-17 04:43:32.770206+00', false, NULL, NULL),
 ('bucket4', 'bucket4', '317eadce-631a-4429-a0bb-f19a7a517b4a', '2021-02-25 09:23:01.58385+00', '2021-02-25 09:23:01.58385+00', false, NULL, NULL),
 ('bucket5', 'bucket5', '317eadce-631a-4429-a0bb-f19a7a517b4a', '2021-02-27 03:04:25.6386+00', '2021-02-27 03:04:25.6386+00', false, NULL, NULL),
 ('bucket6', 'bucket6', '317eadce-631a-4429-a0bb-f19a7a517b4a', '2021-02-27 03:04:25.6386+00', '2021-02-27 03:04:25.6386+00', false, NULL, NULL),
@@ -46,6 +47,8 @@ INSERT INTO "storage"."objects" ("id", "bucket_id", "name", "owner", "created_at
 ('b39ae4ab-802b-4c42-9271-3f908c34363c', 'bucket2', 'private/sadcat-upload3.png', '317eadce-631a-4429-a0bb-f19a7a517b4a', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '{"mimetype": "image/svg+xml", "size": 1234}', NULL),
 ('8098E1AC-C744-4368-86DF-71B60CCDE221', 'bucket3', 'sadcat-upload3.png', '317eadce-631a-4429-a0bb-f19a7a517b4a', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '{"mimetype": "image/svg+xml", "size": 1234}', NULL),
 ('D3EB488E-94F4-46CD-86D3-242C13B95BAC', 'bucket3', 'sadcat-upload2.png', '317eadce-631a-4429-a0bb-f19a7a517b4a', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '{"mimetype": "image/svg+xml", "size": 1234}', NULL),
+('8098E1AC-C744-4368-86DF-71B60CCDE222', 'bucket3a', 'sadcat-upload33.png', '317eadce-631a-4429-a0bb-f19a7a517b4a', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '{"mimetype": "image/svg+xml", "size": 1234}', NULL),
+('D3EB488E-94F4-46CD-86D3-242C13B95BAD', 'bucket3a', 'sadcat-upload22.png', '317eadce-631a-4429-a0bb-f19a7a517b4a', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '{"mimetype": "image/svg+xml", "size": 1234}', NULL),
 ('746180e8-8029-4134-8a21-48ab35485d81', 'public-bucket', 'favicon.ico', '317eadce-631a-4429-a0bb-f19a7a517b4a', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '{"mimetype": "image/svg+xml", "size": 1234}', NULL),
 ('ea2e2806-9ded-4882-8c26-e172a29ed063', 'public-bucket-2', 'favicon.ico', '317eadce-631a-4429-a0bb-f19a7a517b4a', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '2021-03-01 08:53:29.567975+00', '{"size": 1234, "mimetype": "image/svg+xml", "eTag": "abc", "lastModified": "Wed, 12 Oct 2022 11:17:02 GMT", "contentLength": 3746, "cacheControl": "no-cache"}', NULL);
 ;

From db76afa08be4d9afd36de5c00156b511211347d0 Mon Sep 17 00:00:00 2001
From: Lenny
Date: Wed, 2 Jul 2025 09:42:51 -0400
Subject: [PATCH 5/5] fix: allow 10,000 parts for s3 uploads

---
 src/http/routes/s3/commands/upload-part.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/http/routes/s3/commands/upload-part.ts b/src/http/routes/s3/commands/upload-part.ts
index 57c5c3cf..7f113670 100644
--- a/src/http/routes/s3/commands/upload-part.ts
+++ b/src/http/routes/s3/commands/upload-part.ts
@@ -18,7 +18,7 @@ const UploadPartInput = {
     type: 'object',
     properties: {
       uploadId: { type: 'string' },
-      partNumber: { type: 'number', minimum: 1, maximum: 5000 },
+      partNumber: { type: 'number', minimum: 1, maximum: 10000 },
    },
     required: ['uploadId', 'partNumber'],
  },
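
Note on the bounded count in PATCH 3: a plain SELECT count(*) has to visit every matching row, so on a bucket with millions of objects it scans millions of rows just to answer "are there more than N?". Selecting a constant under a LIMIT lets Postgres stop after at most N rows. A minimal standalone sketch of the same idea, assuming a configured knex instance whose search path exposes the storage `objects` table (illustrative names, not the repo's exact API):

    import { Knex } from 'knex'

    // Counts rows for a bucket, but never scans past `limit` rows.
    // Emits: select 1 from "objects" where "bucket_id" = ? limit ?
    async function countUpTo(db: Knex, bucketId: string, limit: number): Promise<number> {
      const rows = await db
        .from('objects')
        .where('bucket_id', bucketId)
        .limit(limit)
        .select(db.raw('1'))
      return rows.length // equals `limit` whenever at least `limit` rows exist
    }

    // Usage in the spirit of emptyBucket: probe for emptyBucketMax + 1 rows;
    // anything above emptyBucketMax means "too many objects to empty".
    // if ((await countUpTo(db, bucketId, emptyBucketMax + 1)) > emptyBucketMax) { throw ... }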
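
Note on the worker in PATCH 4: it is a drain-with-deadline loop. The handler deletes batches for at most DELETE_JOB_TIME_LIMIT_MS, and if work remains it re-sends the same job, so one huge bucket never monopolizes a worker slot; the singletonKey keeps duplicate sends for the same tenant/bucket from piling up. A pg-boss sketch of just that control flow, with a hypothetical `deleteOneBatch` callback standing in for the real list-delete-purge batch logic:

    import PgBoss from 'pg-boss'

    const TIME_LIMIT_MS = 10_000

    async function drainWithDeadline(
      boss: PgBoss,
      job: PgBoss.Job<{ bucketId: string }>,
      deleteOneBatch: () => Promise<boolean> // resolves true if more rows may remain
    ) {
      const start = Date.now()
      let more = true
      while (more && Date.now() - start < TIME_LIMIT_MS) {
        more = await deleteOneBatch()
      }
      if (more) {
        // Re-enqueue ourselves; singletonKey collapses concurrent duplicates.
        await boss.send('object:admin:delete-all-before', job.data, {
          singletonKey: job.data.bucketId,
        })
      }
    }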
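
Note on PATCH 5: S3 itself accepts part numbers 1 through 10,000, so the previous schema cap of 5,000 rejected part numbers the backing store would have allowed. The arithmetic matters for large objects: reaching S3's 5 TiB object ceiling with 10,000 parts needs parts of about 5 TiB / 10,000 ≈ 525 MiB, whereas a 5,000-part cap forces roughly 1 GiB parts or halves the largest object a client can assemble at a given part size.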