Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(VDI_exportContent): create xapi task during NBD export for VHD #7228

Merged
merged 10 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion @xen-orchestra/xapi/vdi.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import pCatch from 'promise-toolbox/catch'
import pRetry from 'promise-toolbox/retry'
import { createLogger } from '@xen-orchestra/log'
import { decorateClass } from '@vates/decorate-with'
import { finished } from 'node:stream'
import { strict as assert } from 'node:assert'

import extractOpaqueRef from './_extractOpaqueRef.mjs'
Expand Down Expand Up @@ -106,12 +107,16 @@ class Vdi {
stream = createNbdRawStream(nbdClient)
} else {
// raw export without nbd or vhd exports needs a resource stream
const vdiName = await this.getField('VDI', ref, 'name_label')
stream = await this.getResource(cancelToken, '/export_raw_vdi/', {
query,
task: await this.task_create(`Exporting content of VDI ${await this.getField('VDI', ref, 'name_label')}`),
task: await this.task_create(`Exporting content of VDI ${vdiName}`),
})
if (nbdClient !== undefined && format === VDI_FORMAT_VHD) {
const taskRef = await this.task_create(`Exporting content of VDI ${vdiName} using NBD`)
stream = await createNbdVhdStream(nbdClient, stream)
stream.on('progress', progress => this.call('task.set_progress', taskRef, progress))
finished(stream, () => this.task_destroy(taskRef))
}
}
return stream
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [HTTP] `http.useForwardedHeaders` setting can be enabled when XO is behind a reverse proxy to fetch clients IP addresses from `X-Forwarded-*` headers [Forum#67625](https://xcp-ng.org/forum/post/67625) (PR [#7233](https://github.com/vatesfr/xen-orchestra/pull/7233))
- [Backup]Use multiple link to speedup NBD backup (PR [#7216](https://github.com/vatesfr/xen-orchestra/pull/7216))
- [Backup] Show if disk is differential or full in incremental backups (PR [#7222](https://github.com/vatesfr/xen-orchestra/pull/7222))
- [VDI] Create XAPI task during NBD export (PR [#7228](https://github.com/vatesfr/xen-orchestra/pull/7228))

### Bug fixes

Expand Down
63 changes: 51 additions & 12 deletions packages/vhd-lib/createStreamNbd.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'
const { finished, Readable } = require('node:stream')
const { readChunkStrict, skipStrict } = require('@vates/read-chunk')
const { Readable } = require('node:stream')
const { unpackHeader } = require('./Vhd/_utils')
const {
FOOTER_SIZE,
Expand All @@ -14,6 +14,9 @@ const {
const { fuHeader, checksumStruct } = require('./_structs')
const assert = require('node:assert')

const MAX_DURATION_BETWEEN_PROGRESS_EMIT = 5e3
const MIN_TRESHOLD_PERCENT_BETWEEN_PROGRESS_EMIT = 1

exports.createNbdRawStream = function createRawStream(nbdClient) {
const exportSize = Number(nbdClient.exportSize)
const chunkSize = 2 * 1024 * 1024
Expand All @@ -31,7 +34,14 @@ exports.createNbdRawStream = function createRawStream(nbdClient) {
return stream
}

exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStream) {
exports.createNbdVhdStream = async function createVhdStream(
nbdClient,
sourceStream,
{
maxDurationBetweenProgressEmit = MAX_DURATION_BETWEEN_PROGRESS_EMIT,
minTresholdPercentBetweenProgressEmit = MIN_TRESHOLD_PERCENT_BETWEEN_PROGRESS_EMIT,
} = {}
) {
const bufFooter = await readChunkStrict(sourceStream, FOOTER_SIZE)

const header = unpackHeader(await readChunkStrict(sourceStream, HEADER_SIZE))
Expand Down Expand Up @@ -78,10 +88,35 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStr
}
}

const totalLength = (offsetSector + blockSizeInSectors + 1) /* end footer */ * SECTOR_SIZE

let lengthRead = 0
let lastUpdate = 0
let lastLengthRead = 0

function throttleEmitProgress() {
const now = Date.now()

if (
lengthRead - lastLengthRead > (minTresholdPercentBetweenProgressEmit / 100) * totalLength ||
(now - lastUpdate > maxDurationBetweenProgressEmit && lengthRead !== lastLengthRead)
) {
stream.emit('progress', lengthRead / totalLength)
lastUpdate = now
lastLengthRead = lengthRead
}
}

function trackAndGet(buffer) {
lengthRead += buffer.length
throttleEmitProgress()
return buffer
}

async function* iterator() {
yield bufFooter
yield rawHeader
yield bat
yield trackAndGet(bufFooter)
yield trackAndGet(rawHeader)
yield trackAndGet(bat)

let precBlocOffset = FOOTER_SIZE + HEADER_SIZE + batSize
for (let i = 0; i < PARENT_LOCATOR_ENTRIES; i++) {
Expand All @@ -91,7 +126,7 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStr
await skipStrict(sourceStream, parentLocatorOffset - precBlocOffset)
const data = await readChunkStrict(sourceStream, space)
precBlocOffset = parentLocatorOffset + space
yield data
yield trackAndGet(data)
}
}

Expand All @@ -106,16 +141,20 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStr
})
const bitmap = Buffer.alloc(SECTOR_SIZE, 255)
for await (const block of nbdIterator) {
yield bitmap // don't forget the bitmap before the block
yield block
yield trackAndGet(bitmap) // don't forget the bitmap before the block
yield trackAndGet(block)
}
yield bufFooter
yield trackAndGet(bufFooter)
}

const stream = Readable.from(iterator(), { objectMode: false })
stream.length = (offsetSector + blockSizeInSectors + 1) /* end footer */ * SECTOR_SIZE
stream.length = totalLength
stream._nbd = true
stream.on('error', () => nbdClient.disconnect())
stream.on('end', () => nbdClient.disconnect())
finished(stream, () => {
clearInterval(interval)
nbdClient.disconnect()
})
const interval = setInterval(throttleEmitProgress, maxDurationBetweenProgressEmit)

return stream
}
Loading