feat: s3 store expiration extension
fenos committed Nov 22, 2023
1 parent 612ac24 commit dc5da78
Showing 5 changed files with 325 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -106,7 +106,7 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo
| ------------------------ | --------------------------------- | ----------------------------- | ------------------------------- |
| [Creation][]             | ✅ | ✅ | ✅ |
| [Creation With Upload][] | ✅ | ✅ | ✅ |
| [Expiration][]           | ✅ | ❌ | ❌ |
| [Expiration][]           | ✅ | ✅ | ❌ |
| [Checksum][]             | ❌ | ❌ | ❌ |
| [Termination][]          | ✅ | ✅ | ❌ |
| [Concatenation][]        | ❌ | ❌ | ❌ |
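For orientation, a minimal sketch of how the option introduced in this commit could be wired up. Only `expirationPeriodInMilliseconds` comes from the diff below; the bucket name, region, credentials, and the one-day period are placeholder assumptions, and the server setup follows the usual `@tus/server` quickstart pattern.

```ts
import {Server} from '@tus/server'
import {S3Store} from '@tus/s3-store'

const store = new S3Store({
  partSize: 8 * 1024 * 1024,
  // New in this commit: uploads older than this period are considered expired.
  expirationPeriodInMilliseconds: 24 * 60 * 60 * 1000, // 1 day (example value)
  s3ClientConfig: {
    bucket: 'my-tus-bucket', // placeholder
    region: 'us-east-1', // placeholder
    credentials: {
      accessKeyId: process.env.AWS_ACCESS_KEY_ID as string,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY as string,
    },
  },
})

const server = new Server({path: '/files', datastore: store})
server.listen({host: '127.0.0.1', port: 1080})
```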
113 changes: 112 additions & 1 deletion packages/s3-store/index.ts
@@ -22,6 +22,7 @@ type Options = {
// The server calculates the optimal part size, which takes this size into account,
// but may increase it to not exceed the S3 10K parts limit.
partSize?: number
expirationPeriodInMilliseconds?: number
// Options to pass to the AWS S3 SDK.
s3ClientConfig: S3ClientConfig & {bucket: string}
}
@@ -69,6 +70,7 @@ export class S3Store extends DataStore {
private cache: Map<string, MetadataValue> = new Map()
private client: S3
private preferredPartSize: number
private expirationPeriodInMilliseconds = 0
public maxMultipartParts = 10_000 as const
public minPartSize = 5_242_880 as const // 5MiB
public maxUploadSize = 5_497_558_138_880 as const // 5TiB
@@ -82,9 +84,11 @@
'creation-with-upload',
'creation-defer-length',
'termination',
'expiration',
]
this.bucket = bucket
this.preferredPartSize = partSize || 8 * 1024 * 1024
this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
this.client = new S3(restS3ClientConfig)
}

@@ -100,6 +104,7 @@
Bucket: this.bucket,
Key: `${upload.id}.info`,
Body: JSON.stringify(upload),
Tagging: `Tus-Completed=false`,
Metadata: {
'upload-id': uploadId,
'tus-version': TUS_RESUMABLE,
@@ -108,6 +113,20 @@
log(`[${upload.id}] metadata file saved`)
}

private async completeMetadata(upload: Upload) {
const {'upload-id': uploadId} = await this.getMetadata(upload.id)
await this.client.putObject({
Bucket: this.bucket,
Key: `${upload.id}.info`,
Body: JSON.stringify(upload),
Tagging: `Tus-Completed=true`,
Metadata: {
'upload-id': uploadId,
'tus-version': TUS_RESUMABLE,
},
})
}

/**
* Retrieves upload metadata previously saved in `${file_id}.info`.
* There's a small and simple caching mechanism to avoid multiple
@@ -132,6 +151,7 @@
size: file.size ? Number.parseInt(file.size, 10) : undefined,
offset: Number.parseInt(file.offset, 10),
metadata: file.metadata,
creation_date: file.creation_date,
}),
})
return this.cache.get(id) as MetadataValue
@@ -173,6 +193,7 @@
Bucket: this.bucket,
Key: this.partKey(id, true),
Body: readStream,
Tagging: 'Tus-Completed=false',
})
log(`[${id}] finished uploading incomplete part`)
return data.ETag as string
@@ -452,6 +473,8 @@
request.ContentType = upload.metadata.contentType
}

upload.creation_date = new Date().toISOString()

const res = await this.client.createMultipartUpload(request)
await this.saveMetadata(upload, res.UploadId as string)
log(`[${upload.id}] multipart upload created (${res.UploadId})`)
@@ -495,6 +518,7 @@
try {
const parts = await this.retrieveParts(id)
await this.finishMultipartUpload(metadata, parts)
await this.completeMetadata(metadata.file)
this.clearCache(id)
} catch (error) {
log(`[${id}] failed to finish upload`, error)
@@ -558,7 +582,7 @@

file.size = upload_length

this.saveMetadata(file, uploadId)
await this.saveMetadata(file, uploadId)
}

public async remove(id: string): Promise<void> {
@@ -588,4 +612,91 @@

this.clearCache(id)
}

protected getExpirationDate(created_at: string) {
const date = new Date(created_at)

return new Date(date.getTime() + this.getExpiration())
}

getExpiration(): number {
return this.expirationPeriodInMilliseconds
}

async deleteExpired(): Promise<number> {
let keyMarker: string | undefined = undefined
let uploadIdMarker: string | undefined = undefined
let isTruncated = true
let deleted = 0

while (isTruncated) {
const listResponse: AWS.ListMultipartUploadsCommandOutput =
await this.client.listMultipartUploads({
Bucket: this.bucket,
KeyMarker: keyMarker,
UploadIdMarker: uploadIdMarker,
})

const expiredUploads =
listResponse.Uploads?.filter((multiPartUpload) => {
const initiatedDate = multiPartUpload.Initiated
return (
initiatedDate &&
this.getExpirationDate(initiatedDate.toISOString()).getTime() <
new Date().getTime()
)
}) || []

const objectsToDelete = expiredUploads.reduce((all, expiredUpload) => {
all.push(
{
key: expiredUpload.Key + '.info',
},
{
key: expiredUpload.Key + '.part',
}
)
return all
}, [] as {key: string}[])

const deletions: Promise<AWS.DeleteObjectsCommandOutput>[] = []

// Batch delete 1000 items at a time
while (objectsToDelete.length > 0) {
const objects = objectsToDelete.splice(0, 1000)
deletions.push(
this.client.deleteObjects({
Bucket: this.bucket,
Delete: {
Objects: objects.map((object) => ({
Key: object.key,
})),
},
})
)
}

const [objectsDeleted] = await Promise.all([
Promise.all(deletions),
...expiredUploads.map((expiredUpload) => {
return this.client.abortMultipartUpload({
Bucket: this.bucket,
Key: expiredUpload.Key,
UploadId: expiredUpload.UploadId,
})
}),
])

deleted += objectsDeleted.reduce((all, acc) => all + (acc.Deleted?.length ?? 0), 0)

isTruncated = Boolean(listResponse.IsTruncated)

if (isTruncated) {
keyMarker = listResponse.NextKeyMarker
uploadIdMarker = listResponse.NextUploadIdMarker
}
}

return deleted
}
}
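Nothing in this file schedules the cleanup itself, so the caller is expected to invoke `deleteExpired()` periodically. A minimal sketch, assuming an `S3Store` instance like the one shown earlier (here just `store`) and an interval chosen by the application:

```ts
// Run the expiration sweep every hour (interval is an arbitrary example value).
const CLEANUP_INTERVAL_MS = 60 * 60 * 1000

setInterval(async () => {
  try {
    // deleteExpired() aborts expired multipart uploads, removes their
    // .info/.part objects, and returns how many objects were deleted.
    const deleted = await store.deleteExpired()
    console.log(`expiration sweep removed ${deleted} objects`)
  } catch (err) {
    console.error('expiration sweep failed', err)
  }
}, CLEANUP_INTERVAL_MS)
```

Note that the `Tus-Completed=false` / `Tus-Completed=true` tags written by `saveMetadata` and `completeMetadata` are not read by `deleteExpired` itself, which filters on the multipart upload's `Initiated` date; the tags appear to be there so that a bucket lifecycle rule can distinguish finished uploads from abandoned ones, though the commit does not ship such a rule.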
22 changes: 22 additions & 0 deletions test/docker-compose.yaml
@@ -0,0 +1,22 @@
version: '3'
services:
minio:
image: minio/minio
ports:
- '9000:9000'
- '9001:9001'
environment:
MINIO_ROOT_USER: s3-storage
MINIO_ROOT_PASSWORD: secret1234
command: server --console-address ":9001" /data

createbuckets:
image: minio/mc
depends_on:
- minio
entrypoint: >
/bin/sh -c "
/usr/bin/mc alias set s3-minio http://minio:9000 s3-storage secret1234;
/usr/bin/mc mb s3-minio/tus-node-server-bucket;
exit 0;
"
3 changes: 2 additions & 1 deletion test/package.json
@@ -4,7 +4,8 @@
"private": true,
"scripts": {
"build": "tsc",
"test": "mocha e2e.test.ts --timeout 40000 --exit --extension ts --require ts-node/register"
"start:local-s3": "docker-compose up -d",
"test": "npm run start:local-s3 && mocha e2e.test.ts s3.e2e.ts --timeout 40000 --exit --extension ts --require ts-node/register"
},
"dependencies": {
"@tus/file-store": "workspace:^",
