@tus/s3-store: add expiration extension (#513)
Co-authored-by: Merlijn Vos <merlijn@soverin.net>
fenos and Murderlon committed Nov 27, 2023
1 parent 612ac24 commit 6fded1e
Showing 6 changed files with 408 additions and 43 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -106,7 +106,7 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo
| ------------------------ | --------------------------------- | ----------------------------- | ------------------------------- |
| [Creation][]             | ✅                                 | ✅                             | ✅                               |
| [Creation With Upload][] | ✅                                 | ✅                             | ✅                               |
-| [Expiration][]           | ✅                                 | ❌                             | ❌                               |
+| [Expiration][]           | ✅                                 | ✅                             | ❌                               |
| [Checksum][]             | ❌                                 | ❌                             | ❌                               |
| [Termination][]          | ✅                                 | ✅                             | ❌                               |
| [Concatenation][]        | ❌                                 | ❌                             | ❌                               |
30 changes: 29 additions & 1 deletion packages/s3-store/README.md
@@ -79,7 +79,7 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo
| ------------------------ | --------------- |
| [Creation][]             | ✅              |
| [Creation With Upload][] | ✅              |
-| [Expiration][]           | ❌              |
+| [Expiration][]           | ✅              |
| [Checksum][]             | ❌              |
| [Termination][]          | ✅              |
| [Concatenation][]        | ❌              |
@@ -88,6 +88,32 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo

After a multipart upload is aborted, no additional parts can be uploaded using that upload ID. The storage consumed by any previously uploaded parts will be freed. However, if any part uploads are currently in progress, those part uploads might or might not succeed. As a result, it might be necessary to set an [S3 Lifecycle configuration](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html) to abort incomplete multipart uploads.
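
For reference, a minimal sketch of such a lifecycle rule (the rule ID and the seven-day window are illustrative, not values prescribed by this package):

```json
{
  "Rules": [
    {
      "ID": "abort-incomplete-tus-multipart-uploads",
      "Status": "Enabled",
      "Filter": {
        "Prefix": ""
      },
      "AbortIncompleteMultipartUpload": {
        "DaysAfterInitiation": 7
      }
    }
  ]
}
```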

### Expiration

Unlike other stores, the S3 store does not require you to call [`server.cleanUpExpiredUploads()`][cleanExpiredUploads] for the expiration extension.
The store creates a `Tus-Completed` tag for all objects, including `.part` and `.info` files, to indicate whether an upload is finished.
This means you can set up a [lifecycle][] policy to clean them up automatically, without a cron job.

```json
{
  "Rules": [
    {
      "Filter": {
        "Tag": {
          "Key": "Tus-Completed",
          "Value": "false"
        }
      },
      "Status": "Enabled",
      "Expiration": {
        "Days": 2
      }
    }
  ]
}
```

If you want more granularity, you can still run a cron job that calls [`server.cleanUpExpiredUploads()`][cleanExpiredUploads] yourself.
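
For example, a minimal sketch of such a setup (the bucket name, region, interval, and expiration period are illustrative, not defaults):

```ts
import {Server} from '@tus/server'
import {S3Store} from '@tus/s3-store'

const datastore = new S3Store({
  // Treat uploads as expired one day after creation (illustrative value).
  expirationPeriodInMilliseconds: 24 * 60 * 60 * 1000,
  s3ClientConfig: {
    bucket: 'my-tus-uploads', // hypothetical bucket name
    region: 'us-east-1',
  },
})

const server = new Server({path: '/files', datastore})

// Remove expired uploads every hour instead of relying on a lifecycle policy.
setInterval(async () => {
  const removed = await server.cleanUpExpiredUploads()
  console.log(`removed ${removed} expired uploads`)
}, 60 * 60 * 1000)
```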

## Examples

### Example: using `credentials` to fetch credentials inside an AWS container
@@ -137,3 +163,5 @@ See [`contributing.md`](https://github.com/tus/tus-node-server/blob/main/.github
[checksum]: https://tus.io/protocols/resumable-upload.html#checksum
[termination]: https://tus.io/protocols/resumable-upload.html#termination
[concatenation]: https://tus.io/protocols/resumable-upload.html#concatenation
[cleanExpiredUploads]: https://github.com/tus/tus-node-server/tree/main/packages/server#servercleanupexpireduploads
[lifecycle]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html
127 changes: 123 additions & 4 deletions packages/s3-store/index.ts
@@ -22,6 +22,7 @@ type Options = {
// The server calculates the optimal part size, which takes this size into account,
// but may increase it to not exceed the S3 10K parts limit.
partSize?: number
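// Time in milliseconds after which an upload is considered expired. When 0 (the default), uploads never expire.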
expirationPeriodInMilliseconds?: number
// Options to pass to the AWS S3 SDK.
s3ClientConfig: S3ClientConfig & {bucket: string}
}
@@ -69,6 +70,7 @@ export class S3Store extends DataStore {
private cache: Map<string, MetadataValue> = new Map()
private client: S3
private preferredPartSize: number
private expirationPeriodInMilliseconds = 0
public maxMultipartParts = 10_000 as const
public minPartSize = 5_242_880 as const // 5MiB
public maxUploadSize = 5_497_558_138_880 as const // 5TiB
@@ -82,9 +84,11 @@ export class S3Store extends DataStore {
'creation-with-upload',
'creation-defer-length',
'termination',
'expiration',
]
this.bucket = bucket
this.preferredPartSize = partSize || 8 * 1024 * 1024
this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
this.client = new S3(restS3ClientConfig)
}

@@ -98,8 +102,9 @@ export class S3Store extends DataStore {
log(`[${upload.id}] saving metadata`)
await this.client.putObject({
Bucket: this.bucket,
- Key: `${upload.id}.info`,
+ Key: this.infoKey(upload.id),
Body: JSON.stringify(upload),
Tagging: `Tus-Completed=false`,
Metadata: {
'upload-id': uploadId,
'tus-version': TUS_RESUMABLE,
@@ -108,6 +113,20 @@ export class S3Store extends DataStore {
log(`[${upload.id}] metadata file saved`)
}

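// Re-saves the `.info` object with `Tus-Completed=true` so a lifecycle rule scoped to `Tus-Completed=false` skips finished uploads.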
private async completeMetadata(upload: Upload) {
const {'upload-id': uploadId} = await this.getMetadata(upload.id)
await this.client.putObject({
Bucket: this.bucket,
Key: this.infoKey(upload.id),
Body: JSON.stringify(upload),
Tagging: `Tus-Completed=true`,
Metadata: {
'upload-id': uploadId,
'tus-version': TUS_RESUMABLE,
},
})
}

/**
* Retrieves upload metadata previously saved in `${file_id}.info`.
* There's a small and simple caching mechanism to avoid multiple
@@ -121,7 +140,7 @@ export class S3Store extends DataStore {

const {Metadata, Body} = await this.client.getObject({
Bucket: this.bucket,
- Key: `${id}.info`,
+ Key: this.infoKey(id),
})
const file = JSON.parse((await Body?.transformToString()) as string)
this.cache.set(id, {
@@ -132,11 +151,16 @@ export class S3Store extends DataStore {
size: file.size ? Number.parseInt(file.size, 10) : undefined,
offset: Number.parseInt(file.offset, 10),
metadata: file.metadata,
creation_date: file.creation_date,
}),
})
return this.cache.get(id) as MetadataValue
}

private infoKey(id: string) {
return `${id}.info`
}

private partKey(id: string, isIncomplete = false) {
if (isIncomplete) {
id += '.part'
@@ -173,6 +197,7 @@ export class S3Store extends DataStore {
Bucket: this.bucket,
Key: this.partKey(id, true),
Body: readStream,
Tagging: 'Tus-Completed=false',
})
log(`[${id}] finished uploading incomplete part`)
return data.ETag as string
@@ -452,6 +477,8 @@ export class S3Store extends DataStore {
request.ContentType = upload.metadata.contentType
}

upload.creation_date = new Date().toISOString()

const res = await this.client.createMultipartUpload(request)
await this.saveMetadata(upload, res.UploadId as string)
log(`[${upload.id}] multipart upload created (${res.UploadId})`)
@@ -495,6 +522,7 @@ export class S3Store extends DataStore {
try {
const parts = await this.retrieveParts(id)
await this.finishMultipartUpload(metadata, parts)
await this.completeMetadata(metadata.file)
this.clearCache(id)
} catch (error) {
log(`[${id}] failed to finish upload`, error)
@@ -558,7 +586,7 @@

file.size = upload_length

- this.saveMetadata(file, uploadId)
+ await this.saveMetadata(file, uploadId)
}

public async remove(id: string): Promise<void> {
@@ -582,10 +610,101 @@
await this.client.deleteObjects({
Bucket: this.bucket,
Delete: {
- Objects: [{Key: id}, {Key: `${id}.info`}],
+ Objects: [{Key: id}, {Key: this.infoKey(id)}],
},
})

this.clearCache(id)
}

protected getExpirationDate(created_at: string) {
const date = new Date(created_at)

return new Date(date.getTime() + this.getExpiration())
}

getExpiration(): number {
return this.expirationPeriodInMilliseconds
}

async deleteExpired(): Promise<number> {
if (this.getExpiration() === 0) {
return 0
}

let keyMarker: string | undefined = undefined
let uploadIdMarker: string | undefined = undefined
let isTruncated = true
let deleted = 0

while (isTruncated) {
const listResponse: AWS.ListMultipartUploadsCommandOutput =
await this.client.listMultipartUploads({
Bucket: this.bucket,
KeyMarker: keyMarker,
UploadIdMarker: uploadIdMarker,
})

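// Keep only the multipart uploads initiated longer ago than the expiration period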
const expiredUploads =
listResponse.Uploads?.filter((multiPartUpload) => {
const initiatedDate = multiPartUpload.Initiated
return (
initiatedDate &&
new Date().getTime() >
this.getExpirationDate(initiatedDate.toISOString()).getTime()
)
}) || []

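// Queue each expired upload's `.info` and incomplete `.part` objects for deletion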
const objectsToDelete = expiredUploads.reduce((all, expiredUpload) => {
all.push(
{
key: this.infoKey(expiredUpload.Key as string),
},
{
key: this.partKey(expiredUpload.Key as string, true),
}
)
return all
}, [] as {key: string}[])

const deletions: Promise<AWS.DeleteObjectsCommandOutput>[] = []

// Batch delete 1000 items at a time
while (objectsToDelete.length > 0) {
const objects = objectsToDelete.splice(0, 1000)
deletions.push(
this.client.deleteObjects({
Bucket: this.bucket,
Delete: {
Objects: objects.map((object) => ({
Key: object.key,
})),
},
})
)
}

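// Execute the batched deletions and abort every expired multipart upload in parallel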
const [objectsDeleted] = await Promise.all([
Promise.all(deletions),
...expiredUploads.map((expiredUpload) => {
return this.client.abortMultipartUpload({
Bucket: this.bucket,
Key: expiredUpload.Key,
UploadId: expiredUpload.UploadId,
})
}),
])

deleted += objectsDeleted.reduce((all, acc) => all + (acc.Deleted?.length ?? 0), 0)

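// Advance to the next page of multipart uploads, if any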
isTruncated = Boolean(listResponse.IsTruncated)

if (isTruncated) {
keyMarker = listResponse.NextKeyMarker
uploadIdMarker = listResponse.NextUploadIdMarker
}
}

return deleted
}
}
66 changes: 30 additions & 36 deletions test/e2e.test.ts
@@ -360,63 +360,57 @@ describe('EndToEnd', () => {
})

describe('PATCH', () => {
-    it('unfinished upload response contains header Upload-Expires', (done) => {
-      agent
+    it('unfinished upload response contains header Upload-Expires', async () => {
+      const res = await agent
        .post(STORE_PATH)
-        .set('Tus-Resumable', TUS_RESUMABLE)
        .set('Upload-Length', `${TEST_FILE_SIZE}`)
        .set('Upload-Metadata', TEST_METADATA)
+        .set('Tus-Resumable', TUS_RESUMABLE)
        .expect(201)
-        .end((_, res) => {
-          assert.equal('upload-expires' in res.headers, true)
-          file_id = res.headers.location.split('/').pop()
-        })
+
+      assert.equal('upload-expires' in res.headers, true)
+      file_id = res.headers.location.split('/').pop()

      const msg = 'tus test'
-      const write_stream = agent
+      const patch_res = await agent
        .patch(`${STORE_PATH}/${file_id}`)
        .set('Tus-Resumable', TUS_RESUMABLE)
        .set('Upload-Offset', '0')
        .set('Content-Type', 'application/offset+octet-stream')
-      write_stream.on('response', (res) => {
-        assert.equal(res.statusCode, 204)
-        assert.equal(res.header['tus-resumable'], TUS_RESUMABLE)
-        assert.equal(res.header['upload-offset'], `${msg.length}`)
-        assert.equal('upload-expires' in res.headers, true)
-        done()
-      })
-      write_stream.write(msg)
-      write_stream.end(() => {})
+        .send(msg)
+      assert.equal(patch_res.statusCode, 204)
+      assert.equal(patch_res.header['tus-resumable'], TUS_RESUMABLE)
+      assert.equal(patch_res.header['upload-offset'], `${msg.length}`)
+      assert.equal('upload-expires' in patch_res.headers, true)
    })

-    it('expired upload responds with 410 Gone', (done) => {
-      agent
+    it('expired upload responds with 410 Gone', async () => {
+      const res = await agent
        .post(STORE_PATH)
-        .set('Tus-Resumable', TUS_RESUMABLE)
        .set('Upload-Length', `${TEST_FILE_SIZE}`)
        .set('Upload-Metadata', TEST_METADATA)
+        .set('Tus-Resumable', TUS_RESUMABLE)
        .expect(201)
-        .end((_, res) => {
-          assert.equal('upload-expires' in res.headers, true)
-          file_id = res.headers.location.split('/').pop()
-
-          setTimeout(() => {
-            const msg = 'tus test'
-            const write_stream = agent
-              .patch(`${STORE_PATH}/${file_id}`)
-              .set('Tus-Resumable', TUS_RESUMABLE)
-              .set('Upload-Offset', '0')
-              .set('Content-Type', 'application/offset+octet-stream')
-            write_stream.on('response', (res) => {
-              assert.equal(res.statusCode, 410)
-              done()
-            })
-            write_stream.write(msg)
-            write_stream.end(() => {})
-          }, 51)
-        })
+
+      assert.equal('upload-expires' in res.headers, true)
+      file_id = res.headers.location.split('/').pop()
+
+      await new Promise<void>((resolve, reject) => {
+        setTimeout(() => {
+          const msg = 'tus test'
+          agent
+            .patch(`${STORE_PATH}/${file_id}`)
+            .set('Tus-Resumable', TUS_RESUMABLE)
+            .set('Upload-Offset', '0')
+            .set('Content-Type', 'application/offset+octet-stream')
+            .send(msg)
+            .expect(410)
+            .then(() => resolve())
+            .catch(reject)
+        }, 51)
+      })
    })
  })

2 changes: 1 addition & 1 deletion test/package.json
@@ -4,7 +4,7 @@
"private": true,
"scripts": {
"build": "tsc",
"test": "mocha e2e.test.ts --timeout 40000 --exit --extension ts --require ts-node/register"
"test": "mocha e2e.test.ts s3.e2e.ts --timeout 40000 --exit --extension ts --require ts-node/register"
},
"dependencies": {
"@tus/file-store": "workspace:^",
