Skip to content

Commit

Permalink
feat(fs/s3): compute sensible chunk size for uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp authored and julien-f committed Oct 23, 2023
1 parent 5048485 commit 37b2113
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 11 deletions.
4 changes: 3 additions & 1 deletion @xen-orchestra/backups/RemoteAdapter.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -681,11 +681,13 @@ export class RemoteAdapter {
}
}

async outputStream(path, input, { checksum = true, validator = noop } = {}) {
async outputStream(path, input, { checksum = true, maxStreamLength, streamLength, validator = noop } = {}) {
const container = watchStreamSize(input)
await this._handler.outputStream(path, input, {
checksum,
dirMode: this._dirMode,
maxStreamLength,
streamLength,
async validator() {
await input.task
return validator.apply(this, arguments)
Expand Down
2 changes: 2 additions & 0 deletions @xen-orchestra/backups/_runners/_vmRunners/FullRemote.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote
writer =>
writer.run({
stream: forkStreamUnpipe(stream),
// stream will be forked and transformed, it's not safe to attach additional properties to it
streamLength: stream.length,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
Expand Down
12 changes: 11 additions & 1 deletion @xen-orchestra/backups/_runners/_vmRunners/FullXapi.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,23 @@ export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
useSnapshot: false,
})
)

const vdis = await exportedVm.$getDisks()
let maxStreamLength = 1024 * 1024 // Ovf file and tar headers are a few KB, let's stay safe
for (const vdiRef of vdis) {
const vdi = await this._xapi.getRecord(vdiRef)
// at most the xva will take the physical usage of the disk
// the resulting stream can be smaller due to the smaller block size for xva than vhd, and compression of xcp-ng
maxStreamLength += vdi.physical_utilisation
}

const sizeContainer = watchStreamSize(stream)

const timestamp = Date.now()

await this._callWriters(
writer =>
writer.run({
maxStreamLength,
sizeContainer,
stream: forkStreamUnpipe(stream),
timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
)
}

async _run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
async _run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
const settings = this._settings
const job = this._job
const scheduleId = this._scheduleId
Expand Down Expand Up @@ -65,6 +65,8 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {

await Task.run({ name: 'transfer' }, async () => {
await adapter.outputStream(dataFilename, stream, {
maxStreamLength,
streamLength,
validator: tmpPath => adapter.isValidXva(tmpPath),
})
return { size: sizeContainer.size }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { AbstractWriter } from './_AbstractWriter.mjs'

export class AbstractFullWriter extends AbstractWriter {
async run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
async run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
try {
return await this._run({ timestamp, sizeContainer, stream, vm, vmSnapshot })
return await this._run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot })
} finally {
// ensure stream is properly closed
stream.destroy()
Expand Down
4 changes: 3 additions & 1 deletion @xen-orchestra/fs/src/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export default class RemoteHandlerAbstract {
* @param {number} [options.dirMode]
* @param {(this: RemoteHandlerAbstract, path: string) => Promise<undefined>} [options.validator] Function that will be called before the data is commited to the remote, if it fails, file should not exist
*/
async outputStream(path, input, { checksum = true, dirMode, validator } = {}) {
async outputStream(path, input, { checksum = true, dirMode, maxStreamLength, streamLength, validator } = {}) {
path = normalizePath(path)
let checksumStream

Expand All @@ -201,6 +201,8 @@ export default class RemoteHandlerAbstract {
}
await this._outputStream(path, input, {
dirMode,
maxStreamLength,
streamLength,
validator,
})
if (checksum) {
Expand Down
17 changes: 12 additions & 5 deletions @xen-orchestra/fs/src/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import { pRetry } from 'promise-toolbox'

// limits: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5GB
const MAX_PART_NUMBER = 10000
const MIN_PART_SIZE = 5 * 1024 * 1024
const { warn } = createLogger('xo:fs:s3')

export default class S3Handler extends RemoteHandlerAbstract {
Expand Down Expand Up @@ -221,15 +223,20 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
}

async _outputStream(path, input, { validator }) {
async _outputStream(path, input, { streamLength, maxStreamLength = streamLength, validator }) {
// S3 storage is limited to 10K parts; each part is limited to 5GB, and the total upload must be smaller than 5TB
// a bigger partSize increases the memory consumption of aws/lib-storage exponentially
const MAX_PART = 10000
const PART_SIZE = 5 * 1024 * 1024
const MAX_SIZE = MAX_PART * PART_SIZE
let partSize
if (maxStreamLength === undefined) {
warn(`Writing ${path} to a S3 remote without a max size set will cut it to 50GB`, { path })
partSize = MIN_PART_SIZE // min size for S3
} else {
partSize = Math.min(Math.max(Math.ceil(maxStreamLength / MAX_PART_NUMBER), MIN_PART_SIZE), MAX_PART_SIZE)
}

// ensure we don't try to upload a stream too big for this partSize
let readCounter = 0
const MAX_SIZE = MAX_PART_NUMBER * partSize
const streamCutter = new Transform({
transform(chunk, encoding, callback) {
readCounter += chunk.length
Expand All @@ -252,7 +259,7 @@ export default class S3Handler extends RemoteHandlerAbstract {
...this.#createParams(path),
Body,
},
partSize: PART_SIZE,
partSize,
leavePartsOnError: false,
})

Expand Down

0 comments on commit 37b2113

Please sign in to comment.