From 88421f7c8b53dcb74ff0bf68982227c1b81e94f2 Mon Sep 17 00:00:00 2001
From: fenos
Date: Fri, 10 Nov 2023 11:29:04 +0000
Subject: [PATCH] feat: s3 store expiration extension

---
 README.md                  |   2 +-
 packages/s3-store/index.ts | 114 +++++++++++++++++++++-
 test/docker-compose.yaml   |  22 +++++
 test/package.json          |   3 +-
 test/s3.e2e.ts             | 188 +++++++++++++++++++++++++++++++++++++
 5 files changed, 326 insertions(+), 3 deletions(-)
 create mode 100644 test/docker-compose.yaml
 create mode 100644 test/s3.e2e.ts

diff --git a/README.md b/README.md
index ba3b467e..c928436e 100644
--- a/README.md
+++ b/README.md
@@ -106,7 +106,7 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo
 | ------------------------ | --------------------------------- | ----------------------------- | ------------------------------- |
 | [Creation][]             | ✅                                 | ✅                             | ✅                               |
 | [Creation With Upload][] | ✅                                 | ✅                             | ✅                               |
-| [Expiration][]           | ✅                                 | ❌                             | ❌                               |
+| [Expiration][]           | ✅                                 | ✅                             | ❌                               |
 | [Checksum][]             | ❌                                 | ❌                             | ❌                               |
 | [Termination][]          | ✅                                 | ✅                             | ❌                               |
 | [Concatenation][]        | ❌                                 | ❌                             | ❌                               |
diff --git a/packages/s3-store/index.ts b/packages/s3-store/index.ts
index e81c5dd7..c82e7ee9 100644
--- a/packages/s3-store/index.ts
+++ b/packages/s3-store/index.ts
@@ -22,6 +22,7 @@ type Options = {
   // The server calculates the optimal part size, which takes this size into account,
   // but may increase it to not exceed the S3 10K parts limit.
   partSize?: number
+  expirationPeriodInMilliseconds?: number
   // Options to pass to the AWS S3 SDK.
   s3ClientConfig: S3ClientConfig & {bucket: string}
 }
@@ -69,6 +70,8 @@ export class S3Store extends DataStore {
   private cache: Map<string, MetadataValue> = new Map()
   private client: S3
   private preferredPartSize: number
+  private expirationPeriodInMilliseconds = 0
+  private setExpirationOnPartsUploads = false
   public maxMultipartParts = 10_000 as const
   public minPartSize = 5_242_880 as const // 5MiB
   public maxUploadSize = 5_497_558_138_880 as const // 5TiB
@@ -82,9 +85,11 @@
       'creation-with-upload',
       'creation-defer-length',
       'termination',
+      'expiration',
     ]
     this.bucket = bucket
     this.preferredPartSize = partSize || 8 * 1024 * 1024
+    this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
     this.client = new S3(restS3ClientConfig)
   }
@@ -100,6 +105,7 @@
       Bucket: this.bucket,
       Key: `${upload.id}.info`,
       Body: JSON.stringify(upload),
+      Tagging: `Tus-Completed=false`,
       Metadata: {
         'upload-id': uploadId,
         'tus-version': TUS_RESUMABLE,
       },
     })
     log(`[${upload.id}] metadata file saved`)
   }
@@ -108,6 +114,20 @@
+  private async completeMetadata(upload: Upload) {
+    const {'upload-id': uploadId} = await this.getMetadata(upload.id)
+    await this.client.putObject({
+      Bucket: this.bucket,
+      Key: `${upload.id}.info`,
+      Body: JSON.stringify(upload),
+      Tagging: `Tus-Completed=true`,
+      Metadata: {
+        'upload-id': uploadId,
+        'tus-version': TUS_RESUMABLE,
+      },
+    })
+  }
+
   /**
    * Retrieves upload metadata previously saved in `${file_id}.info`.
    * There's a small and simple caching mechanism to avoid multiple
@@ -132,6 +152,7 @@
         size: file.size ? Number.parseInt(file.size, 10) : undefined,
         offset: Number.parseInt(file.offset, 10),
         metadata: file.metadata,
+        creation_date: file.creation_date,
       }),
     })
     return this.cache.get(id) as MetadataValue
@@ -173,6 +194,7 @@
       Bucket: this.bucket,
       Key: this.partKey(id, true),
       Body: readStream,
+      Tagging: 'Tus-Completed=false',
     })
     log(`[${id}] finished uploading incomplete part`)
     return data.ETag as string
@@ -452,6 +474,8 @@
       request.ContentType = upload.metadata.contentType
     }
 
+    upload.creation_date = new Date().toISOString()
+
     const res = await this.client.createMultipartUpload(request)
     await this.saveMetadata(upload, res.UploadId as string)
     log(`[${upload.id}] multipart upload created (${res.UploadId})`)
@@ -495,6 +519,7 @@
       try {
         const parts = await this.retrieveParts(id)
         await this.finishMultipartUpload(metadata, parts)
+        await this.completeMetadata(metadata.file)
         this.clearCache(id)
       } catch (error) {
         log(`[${id}] failed to finish upload`, error)
@@ -558,7 +583,7 @@
 
     file.size = upload_length
 
-    this.saveMetadata(file, uploadId)
+    await this.saveMetadata(file, uploadId)
   }
 
   public async remove(id: string): Promise<void> {
@@ -588,4 +613,91 @@
 
     this.clearCache(id)
   }
+
+  protected getExpirationDate(created_at: string) {
+    const date = new Date(created_at)
+
+    return new Date(date.getTime() + this.getExpiration())
+  }
+
+  getExpiration(): number {
+    return this.expirationPeriodInMilliseconds
+  }
+
+  async deleteExpired(): Promise<number> {
+    let keyMarker: string | undefined = undefined
+    let uploadIdMarker: string | undefined = undefined
+    let isTruncated = true
+    let deleted = 0
+
+    while (isTruncated) {
+      const listResponse: AWS.ListMultipartUploadsCommandOutput =
+        await this.client.listMultipartUploads({
+          Bucket: this.bucket,
+          KeyMarker: keyMarker,
+          UploadIdMarker: uploadIdMarker,
+        })
+
+      const expiredUploads =
+        listResponse.Uploads?.filter((multiPartUpload) => {
+          const initiatedDate = multiPartUpload.Initiated
+          return (
+            initiatedDate &&
+            this.getExpirationDate(initiatedDate.toISOString()).getTime() <
+              new Date().getTime()
+          )
+        }) || []
+
+      const objectsToDelete = expiredUploads.reduce((all, expiredUpload) => {
+        all.push(
+          {
+            key: expiredUpload.Key + '.info',
+          },
+          {
+            key: expiredUpload.Key + '.part',
+          }
+        )
+        return all
+      }, [] as {key: string}[])
+
+      const deletions: Promise<AWS.DeleteObjectsCommandOutput>[] = []
+
+      // Batch delete 1000 items at a time
+      while (objectsToDelete.length > 0) {
+        const objects = objectsToDelete.splice(0, 1000)
+        deletions.push(
+          this.client.deleteObjects({
+            Bucket: this.bucket,
+            Delete: {
+              Objects: objects.map((object) => ({
+                Key: object.key,
+              })),
+            },
+          })
+        )
+      }
+
+      const [objectsDeleted] = await Promise.all([
+        Promise.all(deletions),
+        ...expiredUploads.map((expiredUpload) => {
+          return this.client.abortMultipartUpload({
+            Bucket: this.bucket,
+            Key: expiredUpload.Key,
+            UploadId: expiredUpload.UploadId,
+          })
+        }),
+      ])
+
+      deleted += objectsDeleted.reduce((all, acc) => all + (acc.Deleted?.length ?? 0), 0)
+
+      isTruncated = Boolean(listResponse.IsTruncated)
+
+      if (isTruncated) {
+        keyMarker = listResponse.NextKeyMarker
+        uploadIdMarker = listResponse.NextUploadIdMarker
+      }
+    }
+
+    return deleted
+  }
 }
diff --git a/test/docker-compose.yaml b/test/docker-compose.yaml
new file mode 100644
index 00000000..a78e1114
--- /dev/null
+++ b/test/docker-compose.yaml
@@ -0,0 +1,22 @@
+version: '3'
+services:
+  minio:
+    image: minio/minio
+    ports:
+      - '9000:9000'
+      - '9001:9001'
+    environment:
+      MINIO_ROOT_USER: s3-storage
+      MINIO_ROOT_PASSWORD: secret1234
+    command: server --console-address ":9001" /data
+
+  createbuckets:
+    image: minio/mc
+    depends_on:
+      - minio
+    entrypoint: >
+      /bin/sh -c "
+      /usr/bin/mc alias set s3-minio http://minio:9000 s3-storage secret1234;
+      /usr/bin/mc mb s3-minio/tus-node-server-bucket;
+      exit 0;
+      "
diff --git a/test/package.json b/test/package.json
index 4f15a025..51a0c8a1 100644
--- a/test/package.json
+++ b/test/package.json
@@ -4,7 +4,8 @@
   "private": true,
   "scripts": {
     "build": "tsc",
-    "test": "mocha e2e.test.ts --timeout 40000 --exit --extension ts --require ts-node/register"
+    "start:local-s3": "docker-compose up -d",
+    "test": "npm run start:local-s3 && mocha e2e.test.ts s3.e2e.ts --timeout 40000 --exit --extension ts --require ts-node/register"
   },
   "dependencies": {
     "@tus/file-store": "workspace:^",
diff --git a/test/s3.e2e.ts b/test/s3.e2e.ts
new file mode 100644
index 00000000..4465087f
--- /dev/null
+++ b/test/s3.e2e.ts
@@ -0,0 +1,188 @@
+import {S3Store} from '@tus/s3-store'
+import {Server, TUS_RESUMABLE} from '@tus/server'
+import {SuperAgentTest} from 'supertest'
+import request from 'supertest'
+import http from 'node:http'
+import {describe} from 'node:test'
+import {strict as assert} from 'node:assert'
+import {S3} from '@aws-sdk/client-s3'
+import sinon from 'sinon'
+
+const STORE_PATH = '/upload'
+
+interface S3Options {
+  partSize?: number
+  expirationPeriodInMilliseconds?: number
+}
+
+const s3Credentials = {
+  bucket: 'tus-node-server-bucket',
+  region: 'us-east-1',
+  endpoint: 'http://localhost:9000',
+  forcePathStyle: true,
+  credentials: {
+    accessKeyId: 's3-storage',
+    secretAccessKey: 'secret1234',
+  },
+}
+
+const s3Client = new S3(s3Credentials)
+
+const createStore = (options: S3Options = {}) =>
+  new S3Store({
+    ...options,
+    s3ClientConfig: s3Credentials,
+  })
+
+const createUpload = async (agent: SuperAgentTest, uploadLength: number) => {
+  const response = await agent
+    .post(STORE_PATH)
+    .set('Tus-Resumable', TUS_RESUMABLE)
+    .set('Upload-Length', uploadLength.toString())
+    .expect(201)
+
+  assert(Boolean(response.headers.location), 'location not returned')
+  const uploadId = response.headers.location.split('/').pop()
+  return {uploadId: uploadId as string, expires: response.headers['upload-expires']}
+}
+
+const allocMB = (mb: number) => Buffer.alloc(1024 * 1024 * mb)
+const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
+
+const stubListMultiPart = (store: S3Store, uploadId: string, expires: string) => {
+  // @ts-ignore
+  // Note: accessing a private property for stubbing purposes
+  sinon.stub(store.client, 'listMultipartUploads').callsFake(async () => {
+    const upload = await s3Client.getObject({
+      Bucket: s3Credentials.bucket,
+      Key: uploadId + '.info',
+    })
+
+    return {
+      Uploads: [
+        {
+          UploadId: upload.Metadata?.['upload-id'],
+          Key: uploadId,
+          Initiated: new Date(new Date(expires).getTime() - 1000),
+        },
+      ],
+    }
+  })
+}
+
+const patchUpload = async (
+  agent: SuperAgentTest,
+  uploadId: string,
+  data: Buffer,
+  offset = 0
+) => {
+  const res = await agent
+    .patch(`${STORE_PATH}/${uploadId}`)
+    .set('Tus-Resumable', TUS_RESUMABLE)
+    .set('Upload-Offset', offset.toString())
+    .set('Content-Type', 'application/offset+octet-stream')
+    .send(data)
+    .expect(204)
+
+  return {offset: parseInt(res.headers['upload-offset'], 10)}
+}
+
+describe('S3 Store E2E', () => {
+  describe('Expiration extension', () => {
+    let server: Server
+    let listener: http.Server
+    let agent: SuperAgentTest
+    let store: S3Store
+
+    before((done) => {
+      store = createStore({
+        expirationPeriodInMilliseconds: 1000,
+        partSize: 5_242_880,
+      })
+      server = new Server({
+        path: STORE_PATH,
+        datastore: store,
+      })
+      listener = server.listen()
+      agent = request.agent(listener)
+      done()
+    })
+
+    after((done) => {
+      listener.close(done)
+    })
+
+    it('should set Tus-Completed=false when the upload is not completed', async () => {
+      const data = allocMB(11)
+      const {uploadId} = await createUpload(agent, data.length)
+      await patchUpload(agent, uploadId, data.subarray(0, 1024 * 1024 * 6))
+
+      const {TagSet} = await s3Client.getObjectTagging({
+        Bucket: s3Credentials.bucket,
+        Key: uploadId + '.info',
+      })
+
+      assert(
+        TagSet?.find((tag) => tag.Key === 'Tus-Completed')?.Value === 'false',
+        'object tag Tus-Completed not set to "false"'
+      )
+    })
+
+    it('should set Tus-Completed=true when the upload is completed', async () => {
+      const data = allocMB(11)
+      const {uploadId} = await createUpload(agent, data.length)
+      const {offset} = await patchUpload(
+        agent,
+        uploadId,
+        data.subarray(0, 1024 * 1024 * 6)
+      )
+
+      await patchUpload(agent, uploadId, data.subarray(offset), offset)
+
+      const {TagSet} = await s3Client.getObjectTagging({
+        Bucket: s3Credentials.bucket,
+        Key: uploadId + '.info',
+      })
+
+      assert(
+        TagSet?.find((tag) => tag.Key === 'Tus-Completed')?.Value === 'true',
+        'object tag Tus-Completed not set to "true"'
+      )
+    })
+
+    it('calling deleteExpired will delete all expired objects', async () => {
+      const data = allocMB(11)
+      const {uploadId, expires} = await createUpload(agent, data.length)
+      await patchUpload(agent, uploadId, data.subarray(0, 1024 * 1024 * 6))
+
+      await wait(1100)
+
+      // Note: Minio is not compliant with the `listMultipartUploads` method of the S3 spec, see https://github.com/minio/minio/issues/13246
+      stubListMultiPart(store, uploadId, expires)
+
+      // .info file and .part should be deleted
+      const deletedFiles = await store.deleteExpired()
+      assert(deletedFiles === 2, `not all parts were deleted, deleted ${deletedFiles}`)
+    })
+
+    it('will not allow to upload to an expired url', async () => {
+      const data = allocMB(11)
+      const {uploadId} = await createUpload(agent, data.length)
+      const {offset} = await patchUpload(
+        agent,
+        uploadId,
+        data.subarray(0, 1024 * 1024 * 6)
+      )
+
+      await wait(1100)
+
+      await agent
+        .patch(`${STORE_PATH}/${uploadId}`)
+        .set('Tus-Resumable', TUS_RESUMABLE)
+        .set('Upload-Offset', offset.toString())
+        .set('Content-Type', 'application/offset+octet-stream')
+        .send(data.subarray(offset))
+        .expect(410)
+    })
+  })
+})
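
Note (not part of the patch): a minimal usage sketch of the expiration support this change adds. It relies only on names introduced above (the expirationPeriodInMilliseconds option, store.deleteExpired()) and the @tus/server wiring shown in the test file; the port, bucket, credentials, and the hourly sweep interval are illustrative assumptions, not values from the patch.

import {Server} from '@tus/server'
import {S3Store} from '@tus/s3-store'

const store = new S3Store({
  // Uploads older than this are treated as expired (24h here; example value only).
  expirationPeriodInMilliseconds: 24 * 60 * 60 * 1000,
  s3ClientConfig: {
    bucket: 'tus-node-server-bucket',      // assumed bucket name
    region: 'us-east-1',
    credentials: {accessKeyId: 'access-key', secretAccessKey: 'secret'}, // placeholders
  },
})

const server = new Server({path: '/upload', datastore: store})
server.listen({port: 1080}) // example port

// The store only finds and removes expired uploads; scheduling the sweep is left
// to the host application. A simple timer calls deleteExpired(), which aborts
// expired multipart uploads, deletes their .info/.part objects, and returns the
// number of objects removed.
setInterval(async () => {
  const deleted = await store.deleteExpired()
  console.log(`expiration sweep removed ${deleted} objects`)
}, 60 * 60 * 1000)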