Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3): memory improvement #7086

Merged
merged 3 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion @xen-orchestra/backups/RemoteAdapter.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -681,11 +681,13 @@ export class RemoteAdapter {
}
}

async outputStream(path, input, { checksum = true, validator = noop } = {}) {
async outputStream(path, input, { checksum = true, maxStreamLength, streamLength, validator = noop } = {}) {
const container = watchStreamSize(input)
await this._handler.outputStream(path, input, {
checksum,
dirMode: this._dirMode,
maxStreamLength,
streamLength,
async validator() {
await input.task
return validator.apply(this, arguments)
Expand Down
2 changes: 2 additions & 0 deletions @xen-orchestra/backups/_runners/_vmRunners/FullRemote.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote
writer =>
writer.run({
stream: forkStreamUnpipe(stream),
// stream will be forked and transformed, it's not safe to attach additional properties to it
streamLength: stream.length,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
Expand Down
12 changes: 11 additions & 1 deletion @xen-orchestra/backups/_runners/_vmRunners/FullXapi.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,23 @@ export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
useSnapshot: false,
})
)

const vdis = await exportedVm.$getDisks()
let maxStreamLength = 1024 * 1024 // Ovf file and tar headers are a few KB, let's stay safe
for (const vdiRef of vdis) {
const vdi = await this._xapi.getRecord(vdiRef)
// at most the xva will take the physical usage of the disk
// the resulting stream can be smaller due to the smaller block size for xva than vhd, and compression of xcp-ng
maxStreamLength += vdi.physical_utilisation
}

const sizeContainer = watchStreamSize(stream)

const timestamp = Date.now()

await this._callWriters(
writer =>
writer.run({
maxStreamLength,
sizeContainer,
stream: forkStreamUnpipe(stream),
timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
)
}

async _run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
async _run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
const settings = this._settings
const job = this._job
const scheduleId = this._scheduleId
Expand Down Expand Up @@ -65,6 +65,8 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {

await Task.run({ name: 'transfer' }, async () => {
await adapter.outputStream(dataFilename, stream, {
maxStreamLength,
streamLength,
validator: tmpPath => adapter.isValidXva(tmpPath),
})
return { size: sizeContainer.size }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { AbstractWriter } from './_AbstractWriter.mjs'

export class AbstractFullWriter extends AbstractWriter {
async run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
async run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
try {
return await this._run({ timestamp, sizeContainer, stream, vm, vmSnapshot })
return await this._run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot })
} finally {
// ensure stream is properly closed
stream.destroy()
Expand Down
4 changes: 3 additions & 1 deletion @xen-orchestra/fs/src/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export default class RemoteHandlerAbstract {
* @param {number} [options.dirMode]
* @param {(this: RemoteHandlerAbstract, path: string) => Promise<undefined>} [options.validator] Function that will be called before the data is committed to the remote; if it fails, the file should not exist
*/
async outputStream(path, input, { checksum = true, dirMode, validator } = {}) {
async outputStream(path, input, { checksum = true, dirMode, maxStreamLength, streamLength, validator } = {}) {
path = normalizePath(path)
let checksumStream

Expand All @@ -201,6 +201,8 @@ export default class RemoteHandlerAbstract {
}
await this._outputStream(path, input, {
dirMode,
maxStreamLength,
streamLength,
validator,
})
if (checksum) {
Expand Down
56 changes: 50 additions & 6 deletions @xen-orchestra/fs/src/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
CreateMultipartUploadCommand,
DeleteObjectCommand,
GetObjectCommand,
GetObjectLockConfigurationCommand,
HeadObjectCommand,
ListObjectsV2Command,
PutObjectCommand,
Expand All @@ -17,7 +18,7 @@ import { getApplyMd5BodyChecksumPlugin } from '@aws-sdk/middleware-apply-body-ch
import { Agent as HttpAgent } from 'http'
import { Agent as HttpsAgent } from 'https'
import { createLogger } from '@xen-orchestra/log'
import { PassThrough, pipeline } from 'stream'
import { PassThrough, Transform, pipeline } from 'stream'
import { parse } from 'xo-remote-parser'
import copyStreamToBuffer from './_copyStreamToBuffer.js'
import guessAwsRegion from './_guessAwsRegion.js'
Expand All @@ -30,6 +31,8 @@ import { pRetry } from 'promise-toolbox'

// limits: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5GB
const MAX_PART_NUMBER = 10000
const MIN_PART_SIZE = 5 * 1024 * 1024
const { warn } = createLogger('xo:fs:s3')

export default class S3Handler extends RemoteHandlerAbstract {
Expand Down Expand Up @@ -71,9 +74,6 @@ export default class S3Handler extends RemoteHandlerAbstract {
}),
})

// Workaround for https://github.com/aws/aws-sdk-js-v3/issues/2673
this.#s3.middlewareStack.use(getApplyMd5BodyChecksumPlugin(this.#s3.config))

const parts = split(path)
this.#bucket = parts.shift()
this.#dir = join(...parts)
Expand Down Expand Up @@ -223,18 +223,44 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
}

async _outputStream(path, input, { validator }) {
async _outputStream(path, input, { streamLength, maxStreamLength = streamLength, validator }) {
// S3 storage is limited to 10K parts, each part is limited to 5GB, and the total upload must be smaller than 5TB
// a bigger partSize increases the memory consumption of aws/lib-storage exponentially
let partSize
if (maxStreamLength === undefined) {
warn(`Writing ${path} to a S3 remote without a max size set will cut it to 50GB`, { path })
partSize = MIN_PART_SIZE // min size for S3
} else {
partSize = Math.min(Math.max(Math.ceil(maxStreamLength / MAX_PART_NUMBER), MIN_PART_SIZE), MAX_PART_SIZE)
}

// ensure we don't try to upload a stream too big for this partSize
let readCounter = 0
const MAX_SIZE = MAX_PART_NUMBER * partSize
const streamCutter = new Transform({
julien-f marked this conversation as resolved.
Show resolved Hide resolved
transform(chunk, encoding, callback) {
readCounter += chunk.length
if (readCounter > MAX_SIZE) {
callback(new Error(`read ${readCounter} bytes, maximum size allowed is ${MAX_SIZE} `))
} else {
callback(null, chunk)
}
},
})

// Workaround for "ReferenceError: ReadableStream is not defined"
// https://github.com/aws/aws-sdk-js-v3/issues/2522
const Body = new PassThrough()
pipeline(input, Body, () => {})
pipeline(input, streamCutter, Body, () => {})
julien-f marked this conversation as resolved.
Show resolved Hide resolved

const upload = new Upload({
client: this.#s3,
params: {
...this.#createParams(path),
Body,
},
partSize,
leavePartsOnError: false,
})

await upload.done()
Expand Down Expand Up @@ -418,6 +444,24 @@ export default class S3Handler extends RemoteHandlerAbstract {

// No-op: S3 objects are not held open as stateful file descriptors by this
// handler, so there is nothing to release here; `fd` is intentionally unused.
async _closeFile(fd) {}

async _sync() {
await super._sync()
try {
// if Object Lock is enabled, each upload must come with a contentMD5 header
// the computation of this md5 is memory-intensive, especially when uploading a stream
const res = await this.#s3.send(new GetObjectLockConfigurationCommand({ Bucket: this.#bucket }))
if (res.ObjectLockConfiguration?.ObjectLockEnabled === 'Enabled') {
// Workaround for https://github.com/aws/aws-sdk-js-v3/issues/2673
// will automatically add the contentMD5 header to any upload to S3
this.#s3.middlewareStack.use(getApplyMd5BodyChecksumPlugin(this.#s3.config))
}
} catch (error) {
if (error.Code !== 'ObjectLockConfigurationNotFoundError') {
throw error
}
}
}

// Always store VHDs in the chunked "VHD directory" layout on this remote.
// NOTE(review): presumably because block-level storage suits S3 better than
// appending to/patching one huge monolithic object — TODO confirm intent.
useVhdDirectory() {
  return true
}
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

<!--packages-start-->

- @xen-orchestra/backups patch
- @xen-orchestra/fs patch
- @xen-orchestra/mixins minor
- @xen-orchestra/xapi minor
- xo-server minor
Expand Down