Skip to content

Commit

Permalink
feat(backups): implement mirror backup
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp authored and julien-f committed May 30, 2023
1 parent aa36629 commit 8b7b162
Show file tree
Hide file tree
Showing 19 changed files with 607 additions and 85 deletions.
3 changes: 3 additions & 0 deletions @xen-orchestra/backups/Backup.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
'use strict'

const { Metadata } = require('./_runners/Metadata.js')
const { VmsRemote } = require('./_runners/VmsRemote.js')
const { VmsXapi } = require('./_runners/VmsXapi.js')

exports.createRunner = function createRunner(opts) {
const { type } = opts.job
switch (type) {
case 'backup':
return new VmsXapi(opts)
case 'mirrorBackup':
return new VmsRemote(opts)
case 'metadataBackup':
return new Metadata(opts)
default:
Expand Down
98 changes: 98 additions & 0 deletions @xen-orchestra/backups/_runners/VmsRemote.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
'use strict'

const { asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable')
const { limitConcurrency } = require('limit-concurrency-decorator')

const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
const { Task } = require('../Task.js')
const createStreamThrottle = require('./_createStreamThrottle.js')
const { DEFAULT_SETTINGS, Abstract } = require('./_Abstract.js')
const { runTask } = require('./_runTask.js')
const { getAdaptersByRemote } = require('./_getAdaptersByRemote.js')
const { FullRemote } = require('./_vmRunners/FullRemote.js')
const { IncrementalRemote } = require('./_vmRunners/IncrementalRemote.js')

const DEFAULT_REMOTE_VM_SETTINGS = {
concurrency: 2,
copyRetention: 0,
deleteFirst: false,
exportRetention: 0,
healthCheckSr: undefined,
healthCheckVmsWithTags: [],
maxExportRate: 0,
maxMergedDeltasPerRun: Infinity,
timeout: 0,
validateVhdStreams: false,
vmTimeout: 0,
}

exports.VmsRemote = class RemoteVmsBackupRunner extends Abstract {
_computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS }
Object.assign(baseSettings, DEFAULT_REMOTE_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)
Object.assign(baseSettings, job.settings[''])
return baseSettings
}

async run() {
const job = this._job
const schedule = this._schedule
const settings = this._settings

const throttleStream = createStreamThrottle(settings.maxExportRate)

const config = this._config
await Disposable.use(
() => this._getAdapter(job.sourceRemote),
() => (settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined),
Disposable.all(
extractIdsFromSimplePattern(job.remotes).map(id => id !== job.sourceRemote && this._getAdapter(id))
),
async ({ adapter: sourceRemoteAdapter }, healthCheckSr, remoteAdapters) => {
// remove adapters that failed (already handled)
remoteAdapters = remoteAdapters.filter(_ => !!_)
if (remoteAdapters.length === 0) {
return
}

const vmsUuids = await sourceRemoteAdapter.listAllVms()

Task.info('vms', { vms: vmsUuids })

remoteAdapters = getAdaptersByRemote(remoteAdapters)
const allSettings = this._job.settings
const baseSettings = this._baseSettings

const handleVm = vmUuid => {
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }

const opts = {
baseSettings,
config,
job,
healthCheckSr,
remoteAdapters,
schedule,
settings: { ...settings, ...allSettings[vmUuid] },
sourceRemoteAdapter,
throttleStream,
vmUuid,
}
let vmBackup
if (job.mode === 'delta') {
vmBackup = new IncrementalRemote(opts)
} else if (job.mode === 'full') {
vmBackup = new FullRemote(opts)
} else {
throw new Error(`Job mode ${job.mode} not implemented for mirror backup`)
}

return runTask(taskStart, () => vmBackup.run())
}
const { concurrency } = settings
await asyncMapSettled(vmsUuids, !concurrency ? handleVm : limitConcurrency(concurrency)(handleVm))
}
)
}
}
53 changes: 53 additions & 0 deletions @xen-orchestra/backups/_runners/_vmRunners/FullRemote.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict'

const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { AbstractRemote } = require('./_AbstractRemote')
const { FullRemoteWriter } = require('../_writers/FullRemoteWriter')
const { forkStreamUnpipe } = require('../_forkStreamUnpipe')
const { watchStreamSize } = require('../../_watchStreamSize')
const { Task } = require('../../Task')

class FullRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() {
return FullRemoteWriter
}
async _run($defer) {
const transferList = await this._computeTransferList(({ mode }) => mode === 'full')

await this._callWriters(async writer => {
await writer.beforeBackup()
$defer(async () => {
await writer.afterBackup()
})
}, 'writer.beforeBackup()')
if (transferList.length > 0) {
for (const metadata of transferList) {
const stream = await this._sourceRemoteAdapter.readFullVmBackup(metadata)
const sizeContainer = watchStreamSize(stream)

// @todo shouldn't transfer backup if it will be deleted by retention policy (higher retention on source than destination)
await this._callWriters(
writer =>
writer.run({
stream: forkStreamUnpipe(stream),
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
sizeContainer,
}),
'writer.run()'
)
// for healthcheck
this._tags = metadata.vm.tags
}
} else {
Task.info('No new data to upload for this VM')
}
}
}

exports.FullRemote = FullRemoteVmBackupRunner
decorateMethodsWith(FullRemoteVmBackupRunner, {
_run: defer,
})
67 changes: 67 additions & 0 deletions @xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict'
const assert = require('node:assert')

const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { mapValues } = require('lodash')
const { Task } = require('../../Task')
const { AbstractRemote } = require('./_AbstractRemote')
const { IncrementalRemoteWriter } = require('../_writers/IncrementalRemoteWriter')
const { forkDeltaExport } = require('./_forkDeltaExport')
const isVhdDifferencingDisk = require('vhd-lib/isVhdDifferencingDisk')
const { asyncEach } = require('@vates/async-each')

class IncrementalRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() {
return IncrementalRemoteWriter
}
async _run($defer) {
const transferList = await this._computeTransferList(({ mode }) => mode === 'delta')
await this._callWriters(async writer => {
await writer.beforeBackup()
$defer(async () => {
await writer.afterBackup()
})
}, 'writer.beforeBackup()')

if (transferList.length > 0) {
for (const metadata of transferList) {
assert.strictEqual(metadata.mode, 'delta')

await this._callWriters(writer => writer.prepare({ isBase: metadata.isBase }), 'writer.prepare()')
const incrementalExport = await this._sourceRemoteAdapter.readIncrementalVmBackup(metadata, undefined, {
useChain: false,
})

const differentialVhds = {}

await asyncEach(Object.entries(incrementalExport.streams), async ([key, stream]) => {
differentialVhds[key] = await isVhdDifferencingDisk(stream)
})

incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream)
await this._callWriters(
writer =>
writer.transfer({
deltaExport: forkDeltaExport(incrementalExport),
differentialVhds,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
}),
'writer.transfer()'
)
await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()')
// for healthcheck
this._tags = metadata.vm.tags
}
} else {
Task.info('No new data to upload for this VM')
}
}
}

exports.IncrementalRemote = IncrementalRemoteVmBackupRunner
decorateMethodsWith(IncrementalRemoteVmBackupRunner, {
_run: defer,
})
11 changes: 10 additions & 1 deletion @xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const { Task } = require('../../Task.js')
const { watchStreamSize } = require('../../_watchStreamSize.js')
const { AbstractXapi } = require('./_AbstractXapi.js')
const { forkDeltaExport } = require('./_forkDeltaExport.js')
const isVhdDifferencingDisk = require('vhd-lib/isVhdDifferencingDisk')
const { asyncEach } = require('@vates/async-each')

const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup')

Expand Down Expand Up @@ -47,12 +49,18 @@ exports.IncrementalXapi = class IncrementalXapiVmBackupRunner extends AbstractXa
if (Object.values(deltaExport.streams).some(({ _nbd }) => _nbd)) {
Task.info('Transfer data using NBD')
}

const differentialVhds = {}
// since isVhdDifferencingDisk is reading and unshifting data in stream
// it should be done BEFORE any other stream transform
await asyncEach(Object.entries(deltaExport.streams), async ([key, stream]) => {
differentialVhds[key] = await isVhdDifferencingDisk(stream)
})
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))

if (this._settings.validateVhdStreams) {
deltaExport.streams = mapValues(deltaExport.streams, stream => pipeline(stream, vhdStreamValidator, noop))
}

deltaExport.streams = mapValues(deltaExport.streams, this._throttleStream)

const timestamp = Date.now()
Expand All @@ -61,6 +69,7 @@ exports.IncrementalXapi = class IncrementalXapiVmBackupRunner extends AbstractXa
writer =>
writer.transfer({
deltaExport: forkDeltaExport(deltaExport),
differentialVhds,
sizeContainers,
timestamp,
vm,
Expand Down
86 changes: 86 additions & 0 deletions @xen-orchestra/backups/_runners/_vmRunners/_AbstractRemote.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
'use strict'
const { Abstract } = require('./_Abstract')

const { getVmBackupDir } = require('../../_getVmBackupDir')
const { asyncEach } = require('@vates/async-each')
const { Disposable } = require('promise-toolbox')

exports.AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstract {
constructor({
config,
job,
healthCheckSr,
remoteAdapters,
schedule,
settings,
sourceRemoteAdapter,
throttleStream,
vmUuid,
}) {
super()
this.config = config
this.job = job
this.remoteAdapters = remoteAdapters
this.scheduleId = schedule.id
this.timestamp = undefined

this._healthCheckSr = healthCheckSr
this._sourceRemoteAdapter = sourceRemoteAdapter
this._throttleStream = throttleStream
this._vmUuid = vmUuid

const allSettings = job.settings
const writers = new Set()
this._writers = writers

const RemoteWriter = this._getRemoteWriter()
Object.entries(remoteAdapters).forEach(([remoteId, adapter]) => {
const targetSettings = {
...settings,
...allSettings[remoteId],
}
writers.add(new RemoteWriter({ adapter, config, job, vmUuid, remoteId, settings: targetSettings }))
})
}

async _computeTransferList(predicate) {
const vmBackups = await this._sourceRemoteAdapter.listVmBackups(this._vmUuid, predicate)
const localMetada = new Map()
Object.values(vmBackups).forEach(metadata => {
const timestamp = metadata.timestamp
localMetada.set(timestamp, metadata)
})
const nbRemotes = Object.keys(this.remoteAdapters).length
const remoteMetadatas = {}
await asyncEach(Object.values(this.remoteAdapters), async remoteAdapter => {
const remoteMetadata = await remoteAdapter.listVmBackups(this._vmUuid, predicate)
remoteMetadata.forEach(metadata => {
const timestamp = metadata.timestamp
remoteMetadatas[timestamp] = (remoteMetadatas[timestamp] ?? 0) + 1
})
})

let chain = []
const timestamps = [...localMetada.keys()]
timestamps.sort()
for (const timestamp of timestamps) {
if (remoteMetadatas[timestamp] !== nbRemotes) {
// this backup is not present in all the remote
// should be retransfered if not found later
chain.push(localMetada.get(timestamp))
} else {
// backup is present in local and remote : the chain has already been transferred
chain = []
}
}
return chain
}

async run() {
const handler = this._sourceRemoteAdapter._handler
await Disposable.use(await handler.lock(getVmBackupDir(this._vmUuid)), async () => {
await this._run()
await this._healthCheck()
})
}
}
9 changes: 6 additions & 3 deletions @xen-orchestra/backups/_runners/_writers/FullRemoteWriter.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ exports.FullRemoteWriter = class FullRemoteWriter extends MixinRemoteWriter(Abst
const scheduleId = this._scheduleId

const adapter = this._adapter

// TODO: clean VM backup directory
let metadata = await this._isAlreadyTransferred(timestamp)
if (metadata !== undefined) {
// @todo : should skip backup while being vigilant to not stuck the forked stream
Task.info('This backup has already been transfered')
}

const oldBackups = getOldEntries(
settings.exportRetention - 1,
Expand All @@ -46,7 +49,7 @@ exports.FullRemoteWriter = class FullRemoteWriter extends MixinRemoteWriter(Abst
const dataBasename = basename + '.xva'
const dataFilename = this._vmBackupDir + '/' + dataBasename

const metadata = {
metadata = {
jobId: job.id,
mode: job.mode,
scheduleId,
Expand Down
Loading

0 comments on commit 8b7b162

Please sign in to comment.