Skip to content

Commit

Permalink
rebase: main
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Dec 13, 2023
2 parents dda00f0 + fa49076 commit d0c7ef1
Show file tree
Hide file tree
Showing 41 changed files with 1,851 additions and 434 deletions.
2 changes: 2 additions & 0 deletions .yarnrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ plugins:
spec: "@yarnpkg/plugin-typescript"
- path: .yarn/plugins/@yarnpkg/plugin-interactive-tools.cjs
spec: "@yarnpkg/plugin-interactive-tools"

yarnPath: .yarn/releases/yarn-3.2.3.cjs
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo
| ------------------------ | --------------------------------- | ----------------------------- | ------------------------------- |
| [Creation][] ||||
| [Creation With Upload][] ||||
| [Expiration][] || ||
| [Expiration][] || ||
| [Checksum][] ||||
| [Termination][] ||||
| [Concatenation][] ||||
Expand Down
25 changes: 10 additions & 15 deletions packages/file-store/configstores/FileConfigstore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import fs from 'node:fs/promises'
import path from 'node:path'
import {Upload} from '@tus/server'
import PQueue from 'p-queue'

import {Configstore} from './Types'

Expand All @@ -11,40 +10,36 @@ import {Configstore} from './Types'
*/
export class FileConfigstore implements Configstore {
directory: string
queue: PQueue

constructor(path: string) {
this.directory = path
this.queue = new PQueue({concurrency: 1})
}

async get(key: string): Promise<Upload | undefined> {
try {
const buffer = await this.queue.add(() => fs.readFile(this.resolve(key), 'utf8'))
const buffer = await fs.readFile(this.resolve(key), 'utf8')
return JSON.parse(buffer as string)
} catch {
return undefined
}
}

async set(key: string, value: Upload): Promise<void> {
await this.queue.add(() => fs.writeFile(this.resolve(key), JSON.stringify(value)))
await fs.writeFile(this.resolve(key), JSON.stringify(value))
}

async delete(key: string): Promise<void> {
await this.queue.add(() => fs.rm(this.resolve(key)))
await fs.rm(this.resolve(key))
}

async list(): Promise<Array<string>> {
return this.queue.add(async () => {
const files = await fs.readdir(this.directory)
const sorted = files.sort((a, b) => a.localeCompare(b))
const name = (file: string) => path.basename(file, '.json')
// To only return tus file IDs we check if the file has a corresponding JSON info file
return sorted.filter(
(file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1])
)
})
const files = await fs.readdir(this.directory)
const sorted = files.sort((a, b) => a.localeCompare(b))
const name = (file: string) => path.basename(file, '.json')
// To only return tus file IDs we check if the file has a corresponding JSON info file
return sorted.filter(
(file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1])
)
}

private resolve(key: string): string {
Expand Down
5 changes: 2 additions & 3 deletions packages/file-store/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"$schema": "https://json.schemastore.org/package.json",
"name": "@tus/file-store",
"version": "1.0.1",
"version": "1.1.0",
"description": "Local file storage for @tus/server",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand All @@ -21,8 +21,7 @@
"test": "mocha test.ts --exit --extension ts --require ts-node/register"
},
"dependencies": {
"debug": "^4.3.4",
"p-queue": "^6.6.2"
"debug": "^4.3.4"
},
"devDependencies": {
"@tus/server": "workspace:^",
Expand Down
30 changes: 29 additions & 1 deletion packages/s3-store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo
| ------------------------ | --------------- |
| [Creation][] ||
| [Creation With Upload][] ||
| [Expiration][] | |
| [Expiration][] | |
| [Checksum][] ||
| [Termination][] ||
| [Concatenation][] ||
Expand All @@ -88,6 +88,32 @@ The tus protocol supports optional [extensions][]. Below is a table of the suppo

After a multipart upload is aborted, no additional parts can be uploaded using that upload ID. The storage consumed by any previously uploaded parts will be freed. However, if any part uploads are currently in progress, those part uploads might or might not succeed. As a result, it might be necessary to set an [S3 Lifecycle configuration](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html) to abort incomplete multipart uploads.

### Expiration

Unlike other stores, the expiration extension on the S3 store does not need to call [`server.cleanUpExpiredUploads()`][cleanExpiredUploads].
The store creates a `Tus-Completed` tag for all objects, including `.part` and `.info` files, to indicate whether an upload is finished.
This means you could set up a [lifecycle][lifecyle] policy to automatically clean them up without a CRON job.

```json
{
"Rules": [
{
"Filter": {
"Tag": {
"Key": "Tus-Complete",
"Value": "false"
}
},
"Expiration": {
"Days": 2
}
}
]
}
```

If you want more granularity, it is still possible to configure a CRON job to call [`server.cleanUpExpiredUploads()`][cleanExpiredUploads] yourself.

## Examples

### Example: using `credentials` to fetch credentials inside a AWS container
Expand Down Expand Up @@ -137,3 +163,5 @@ See [`contributing.md`](https://github.com/tus/tus-node-server/blob/main/.github
[checksum]: https://tus.io/protocols/resumable-upload.html#checksum
[termination]: https://tus.io/protocols/resumable-upload.html#termination
[concatenation]: https://tus.io/protocols/resumable-upload.html#concatenation
[cleanExpiredUploads]: https://github.com/tus/tus-node-server/tree/main/packages/server#servercleanupexpireduploads
[lifecyle]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html
127 changes: 123 additions & 4 deletions packages/s3-store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Options = {
// The server calculates the optimal part size, which takes this size into account,
// but may increase it to not exceed the S3 10K parts limit.
partSize?: number
expirationPeriodInMilliseconds?: number
// Options to pass to the AWS S3 SDK.
s3ClientConfig: S3ClientConfig & {bucket: string}
}
Expand Down Expand Up @@ -69,6 +70,7 @@ export class S3Store extends DataStore {
private cache: Map<string, MetadataValue> = new Map()
private client: S3
private preferredPartSize: number
private expirationPeriodInMilliseconds = 0
public maxMultipartParts = 10_000 as const
public minPartSize = 5_242_880 as const // 5MiB
public maxUploadSize = 5_497_558_138_880 as const // 5TiB
Expand All @@ -82,9 +84,11 @@ export class S3Store extends DataStore {
'creation-with-upload',
'creation-defer-length',
'termination',
'expiration',
]
this.bucket = bucket
this.preferredPartSize = partSize || 8 * 1024 * 1024
this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
this.client = new S3(restS3ClientConfig)
}

Expand All @@ -98,8 +102,9 @@ export class S3Store extends DataStore {
log(`[${upload.id}] saving metadata`)
await this.client.putObject({
Bucket: this.bucket,
Key: `${upload.id}.info`,
Key: this.infoKey(upload.id),
Body: JSON.stringify(upload),
Tagging: `Tus-Completed=false`,
Metadata: {
'upload-id': uploadId,
'tus-version': TUS_RESUMABLE,
Expand All @@ -108,6 +113,20 @@ export class S3Store extends DataStore {
log(`[${upload.id}] metadata file saved`)
}

/**
 * Rewrites the `${id}.info` object with the finished upload state and flips
 * the `Tus-Completed` tag to `true`, so a lifecycle rule filtering on that
 * tag will no longer expire this upload's objects.
 */
private async completeMetadata(upload: Upload) {
  const metadata = await this.getMetadata(upload.id)
  const request = {
    Bucket: this.bucket,
    Key: this.infoKey(upload.id),
    Body: JSON.stringify(upload),
    Tagging: `Tus-Completed=true`,
    Metadata: {
      'upload-id': metadata['upload-id'],
      'tus-version': TUS_RESUMABLE,
    },
  }
  await this.client.putObject(request)
}

/**
* Retrieves upload metadata previously saved in `${file_id}.info`.
* There's a small and simple caching mechanism to avoid multiple
Expand All @@ -121,7 +140,7 @@ export class S3Store extends DataStore {

const {Metadata, Body} = await this.client.getObject({
Bucket: this.bucket,
Key: `${id}.info`,
Key: this.infoKey(id),
})
const file = JSON.parse((await Body?.transformToString()) as string)
this.cache.set(id, {
Expand All @@ -132,11 +151,16 @@ export class S3Store extends DataStore {
size: file.size ? Number.parseInt(file.size, 10) : undefined,
offset: Number.parseInt(file.offset, 10),
metadata: file.metadata,
creation_date: file.creation_date,
}),
})
return this.cache.get(id) as MetadataValue
}

/** Object key under which the JSON metadata (`Upload`) for `id` is stored. */
private infoKey(id: string) {
  return id + '.info'
}

private partKey(id: string, isIncomplete = false) {
if (isIncomplete) {
id += '.part'
Expand Down Expand Up @@ -173,6 +197,7 @@ export class S3Store extends DataStore {
Bucket: this.bucket,
Key: this.partKey(id, true),
Body: readStream,
Tagging: 'Tus-Completed=false',
})
log(`[${id}] finished uploading incomplete part`)
return data.ETag as string
Expand Down Expand Up @@ -452,6 +477,8 @@ export class S3Store extends DataStore {
request.ContentType = upload.metadata.contentType
}

upload.creation_date = new Date().toISOString()

const res = await this.client.createMultipartUpload(request)
await this.saveMetadata(upload, res.UploadId as string)
log(`[${upload.id}] multipart upload created (${res.UploadId})`)
Expand Down Expand Up @@ -495,6 +522,7 @@ export class S3Store extends DataStore {
try {
const parts = await this.retrieveParts(id)
await this.finishMultipartUpload(metadata, parts)
await this.completeMetadata(metadata.file)
this.clearCache(id)
} catch (error) {
log(`[${id}] failed to finish upload`, error)
Expand Down Expand Up @@ -558,7 +586,7 @@ export class S3Store extends DataStore {

file.size = upload_length

return this.saveMetadata(file, uploadId)
await this.saveMetadata(file, uploadId)
}

public async remove(id: string): Promise<void> {
Expand All @@ -582,10 +610,101 @@ export class S3Store extends DataStore {
await this.client.deleteObjects({
Bucket: this.bucket,
Delete: {
Objects: [{Key: id}, {Key: `${id}.info`}],
Objects: [{Key: id}, {Key: this.infoKey(id)}],
},
})

this.clearCache(id)
}

/**
 * Computes the moment at which an upload created at `created_at`
 * (an ISO-8601 timestamp) is considered expired.
 */
protected getExpirationDate(created_at: string) {
  const createdAtMs = new Date(created_at).getTime()
  return new Date(createdAtMs + this.getExpiration())
}

/** Configured expiration period in milliseconds; 0 means expiration is disabled. */
getExpiration(): number {
return this.expirationPeriodInMilliseconds
}

/**
 * Removes every multipart upload whose creation time plus the configured
 * expiration period lies in the past. For each expired upload the associated
 * `.info` and `.part` objects are batch-deleted and the multipart upload is
 * aborted. Pages through `listMultipartUploads` until the listing is no
 * longer truncated.
 *
 * @returns the number of S3 objects deleted (not the number of uploads).
 *          Always 0 when the expiration period is 0 (expiration disabled).
 */
async deleteExpired(): Promise<number> {
  if (this.getExpiration() === 0) {
    return 0
  }

  let removedCount = 0
  let nextKey: string | undefined
  let nextUploadId: string | undefined
  let morePages = true

  while (morePages) {
    const page: AWS.ListMultipartUploadsCommandOutput =
      await this.client.listMultipartUploads({
        Bucket: this.bucket,
        KeyMarker: nextKey,
        UploadIdMarker: nextUploadId,
      })

    const expired = (page.Uploads ?? []).filter((upload) => {
      const started = upload.Initiated
      return (
        started &&
        new Date().getTime() >
          this.getExpirationDate(started.toISOString()).getTime()
      )
    })

    // Each expired upload leaves behind a metadata file and (possibly) an
    // incomplete-part file; collect both keys for batch deletion.
    const doomedKeys = expired.flatMap((upload) => [
      {key: this.infoKey(upload.Key as string)},
      {key: this.partKey(upload.Key as string, true)},
    ])

    // DeleteObjects accepts at most 1000 keys per request, so chunk the list.
    const deleteBatches: Promise<AWS.DeleteObjectsCommandOutput>[] = []
    for (let start = 0; start < doomedKeys.length; start += 1000) {
      const batch = doomedKeys.slice(start, start + 1000)
      deleteBatches.push(
        this.client.deleteObjects({
          Bucket: this.bucket,
          Delete: {
            Objects: batch.map(({key}) => ({Key: key})),
          },
        })
      )
    }

    // Abort the multipart uploads themselves in parallel with the deletions.
    const aborts = expired.map((upload) =>
      this.client.abortMultipartUpload({
        Bucket: this.bucket,
        Key: upload.Key,
        UploadId: upload.UploadId,
      })
    )

    const [batchResults] = await Promise.all([
      Promise.all(deleteBatches),
      ...aborts,
    ])

    for (const result of batchResults) {
      removedCount += result.Deleted?.length ?? 0
    }

    morePages = Boolean(page.IsTruncated)
    if (morePages) {
      nextKey = page.NextKeyMarker
      nextUploadId = page.NextUploadIdMarker
    }
  }

  return removedCount
}
}
2 changes: 1 addition & 1 deletion packages/s3-store/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"$schema": "https://json.schemastore.org/package.json",
"name": "@tus/s3-store",
"version": "1.1.1",
"version": "1.2.0",
"description": "AWS S3 store for @tus/server",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down

0 comments on commit d0c7ef1

Please sign in to comment.