refactor(xo-server): extract vmware import method to its own mixin
fbeauchamp committed Apr 17, 2024
1 parent 2de0a2b commit acc7b3a
Showing 4 changed files with 271 additions and 244 deletions.
244 changes: 0 additions & 244 deletions packages/xo-server/src/xo-mixins/migrate-vm.mjs
@@ -1,16 +1,5 @@
import { decorateWith } from '@vates/decorate-with'
import { defer as deferrable } from 'golike-defer'
import { fromEvent } from 'promise-toolbox'
import { createRunner } from '@xen-orchestra/backups/Backup.mjs'
import { Task } from '@xen-orchestra/mixins/Tasks.mjs'
import { v4 as generateUuid } from 'uuid'
import { VDI_FORMAT_VHD } from '@xen-orchestra/xapi'
import asyncMapSettled from '@xen-orchestra/async-map/legacy.js'
import Esxi from '@xen-orchestra/vmware-explorer/esxi.mjs'
import openDeltaVmdkasVhd from '@xen-orchestra/vmware-explorer/openDeltaVmdkAsVhd.mjs'
import OTHER_CONFIG_TEMPLATE from '../xapi/other-config-template.mjs'
import VhdEsxiRaw from '@xen-orchestra/vmware-explorer/VhdEsxiRaw.mjs'
import { importVdi as importVdiThroughXva } from '@xen-orchestra/xva/importVdi.mjs'

export default class MigrateVm {
constructor(app) {
@@ -120,237 +109,4 @@ export default class MigrateVm {
}
}
}

#buildDiskChainByNode(disks, snapshots) {
let chain = []
if (snapshots && snapshots.current) {
const currentSnapshotId = snapshots.current

let currentSnapshot = snapshots.snapshots.find(({ uid }) => uid === currentSnapshotId)

chain = [currentSnapshot.disks]
while ((currentSnapshot = snapshots.snapshots.find(({ uid }) => uid === currentSnapshot.parent))) {
chain.push(currentSnapshot.disks)
}
chain.reverse()
}

chain.push(disks)

for (const disk of chain) {
if (disk.capacity > 2 * 1024 * 1024 * 1024 * 1024) {
/* 2 TiB */
throw new Error("Can't migrate disks larger than 2TiB")
}
}

const chainsByNodes = {}
chain.forEach(disks => {
disks.forEach(disk => {
chainsByNodes[disk.node] = chainsByNodes[disk.node] || []
chainsByNodes[disk.node].push(disk)
})
})

return chainsByNodes
}

#connectToEsxi(host, user, password, sslVerify) {
return Task.run({ properties: { name: `connecting to ${host}` } }, async () => {
const esxi = new Esxi(host, user, password, sslVerify)
await fromEvent(esxi, 'ready')
return esxi
})
}

async connectToEsxiAndList({ host, user, password, sslVerify }) {
const esxi = await this.#connectToEsxi(host, user, password, sslVerify)
return esxi.getAllVmMetadata()
}

@decorateWith(deferrable)
async migrationfromEsxi(
$defer,
{ host, user, password, sslVerify, sr: srId, network: networkId, vm: vmId, stopSource, dataStoreToHandlers }
) {
const app = this._app
const esxi = await this.#connectToEsxi(host, user, password, sslVerify)

const esxiVmMetadata = await Task.run({ properties: { name: `get metadata of ${vmId}` } }, async () => {
return esxi.getTransferableVmMetadata(vmId)
})

const { disks, firmware, memory, name_label, networks, nCpus, powerState, snapshots } = esxiVmMetadata
const isRunning = powerState !== 'poweredOff'

const chainsByNodes = await Task.run(
{ properties: { name: `build disks and snapshots chains for ${vmId}` } },
async () => {
return this.#buildDiskChainByNode(disks, snapshots)
}
)

const sr = app.getXapiObject(srId)
const xapi = sr.$xapi

const vm = await Task.run({ properties: { name: 'creating VM on XCP side' } }, async () => {
// got data, ready to start creating
const vm = await xapi._getOrWaitObject(
await xapi.VM_create({
...OTHER_CONFIG_TEMPLATE,
memory_dynamic_max: memory,
memory_dynamic_min: memory,
memory_static_max: memory,
memory_static_min: memory,
name_description: 'from esxi',
name_label,
VCPUs_at_startup: nCpus,
VCPUs_max: nCpus,
})
)
await Promise.all([
vm.update_HVM_boot_params('firmware', firmware),
vm.update_platform('device-model', 'qemu-upstream-' + (firmware === 'uefi' ? 'uefi' : 'compat')),
asyncMapSettled(['start', 'start_on'], op => vm.update_blocked_operations(op, 'Esxi migration in progress...')),
vm.set_name_label(`[Importing...] ${name_label}`),
])

const vifDevices = await xapi.call('VM.get_allowed_VIF_devices', vm.$ref)

await Promise.all(
networks.map((network, i) =>
xapi.VIF_create(
{
device: vifDevices[i],
network: app.getXapiObject(networkId).$ref,
VM: vm.$ref,
},
{
MAC: network.macAddress,
}
)
)
)
return vm
})
$defer.onFailure.call(xapi, 'VM_destroy', vm.$ref)

const vhds = await Promise.all(
Object.keys(chainsByNodes).map(async (node, userdevice) =>
Task.run({ properties: { name: `Cold import of disks ${node}` } }, async () => {
const chainByNode = chainsByNodes[node]
let vdi
let parentVhd, vhd
// if the VM is running we'll transfer everything before the last disk, which is the active disk
// the esxi api does not allow us to read an active disk
// later we'll stop the VM and transfer this snapshot
const nbColdDisks = isRunning ? chainByNode.length - 1 : chainByNode.length
for (let diskIndex = 0; diskIndex < nbColdDisks; diskIndex++) {
// the first one is a RAW disk ( full )
const disk = chainByNode[diskIndex]
const { fileName, path, datastore: datastoreName, isFull } = disk
if (isFull) {
vhd = await VhdEsxiRaw.open(datastoreName, path + '/' + fileName, {
thin: false,
esxi,
dataStoreToHandlers,
})
} else {
vhd = await openDeltaVmdkasVhd(datastoreName, path + '/' + fileName, parentVhd, {
lookMissingBlockInParent: true,
esxi,
dataStoreToHandlers,
})
}
vhd.label = fileName
parentVhd = vhd
}
if (nbColdDisks > 0 /* got a cold chain */) {
const { capacity, descriptionLabel, nameLabel } = chainByNode[nbColdDisks - 1]
// we don't need to read the BAT with the importVdiThroughXva process
const vdiMetadata = {
name_description: 'fromESXI' + descriptionLabel,
name_label: '[ESXI]' + nameLabel,
SR: sr.$ref,
virtual_size: capacity,
}
vdi = await importVdiThroughXva(vdiMetadata, vhd, xapi, sr)

// it can fail before the vdi is connected to the vm
$defer.onFailure.call(xapi, 'VDI_destroy', vdi.$ref)
await xapi.VBD_create({
VDI: vdi.$ref,
VM: vm.$ref,
device: `xvd${String.fromCharCode('a'.charCodeAt(0) + userdevice)}`,
userdevice: String(userdevice < 3 ? userdevice : userdevice + 1),
})
}
return { vdi, vhd }
})
)
)

if (isRunning && stopSource) {
// if the VM was running, we stop it and transfer the data in the active disk
await Task.run({ properties: { name: 'powering down source VM' } }, () => esxi.powerOff(vmId))

await Promise.all(
Object.keys(chainsByNodes).map(async (node, userdevice) => {
await Task.run({ properties: { name: `Transferring deltas of ${userdevice}` } }, async () => {
const chainByNode = chainsByNodes[node]
const disk = chainByNode[chainByNode.length - 1]
const { capacity, descriptionLabel, fileName, nameLabel, path, datastore: datastoreName, isFull } = disk
let { vdi, vhd: parentVhd } = vhds[userdevice]
let vhd
if (isFull) {
vhd = await VhdEsxiRaw.open(datastoreName, path + '/' + fileName, {
thin: false,
esxi,
dataStoreToHandlers,
})
// we don't need to read the BAT with the importVdiThroughXva process
const vdiMetadata = {
name_description: 'fromESXI' + descriptionLabel,
name_label: '[ESXI]' + nameLabel,
SR: sr.$ref,
virtual_size: capacity,
}
vdi = await importVdiThroughXva(vdiMetadata, vhd, xapi, sr)
// it can fail before the vdi is connected to the vm
$defer.onFailure.call(xapi, 'VDI_destroy', vdi.$ref)
await xapi.VBD_create({
VDI: vdi.$ref,
VM: vm.$ref,
device: `xvd${String.fromCharCode('a'.charCodeAt(0) + userdevice)}`,
userdevice: String(userdevice < 3 ? userdevice : userdevice + 1),
})
} else {
if (parentVhd === undefined) {
throw new Error(`Can't import delta of a running VM without its parent VHD`)
}
// we only want to transfer blocks present in the delta vhd, not the full vhd chain
vhd = await openDeltaVmdkasVhd(datastoreName, path + '/' + fileName, parentVhd, {
lookMissingBlockInParent: false,
esxi,
dataStoreToHandlers,
})
}
const stream = vhd.stream()

await vdi.$importContent(stream, { format: VDI_FORMAT_VHD })
})
})
)
}

await Task.run({ properties: { name: 'Finishing transfer' } }, async () => {
// remove the importing in label
await vm.set_name_label(esxiVmMetadata.name_label)

// remove lock on start
await asyncMapSettled(['start', 'start_on'], op => vm.update_blocked_operations(op, null))
})

return vm.uuid
}
}
33 changes: 33 additions & 0 deletions packages/xo-server/src/xo-mixins/vmware/buildChainByNode.mjs
@@ -0,0 +1,33 @@
export function buildDiskChainByNode(disks, snapshots) {
let chain = []
if (snapshots && snapshots.current) {
const currentSnapshotId = snapshots.current

let currentSnapshot = snapshots.snapshots.find(({ uid }) => uid === currentSnapshotId)

chain = [currentSnapshot.disks]
while ((currentSnapshot = snapshots.snapshots.find(({ uid }) => uid === currentSnapshot.parent))) {
chain.push(currentSnapshot.disks)
}
chain.reverse()
}

chain.push(disks)

for (const disk of chain) {
if (disk.capacity > 2 * 1024 * 1024 * 1024 * 1024) {
/* 2 TiB */
throw new Error("Can't migrate disks larger than 2TiB")
}
}

const chainsByNodes = {}
chain.forEach(disks => {
disks.forEach(disk => {
chainsByNodes[disk.node] = chainsByNodes[disk.node] || []
chainsByNodes[disk.node].push(disk)
})
})

return chainsByNodes
}
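Below is a minimal usage sketch of the extracted helper; it is not part of this commit, and the vmMetadata object is hypothetical, only mirroring the shape of the disks and snapshots metadata consumed above.

import { buildDiskChainByNode } from './vmware/buildChainByNode.mjs'

// hypothetical metadata, shaped like the result of esxi.getTransferableVmMetadata()
const vmMetadata = {
  disks: [{ node: 'scsi0:0', capacity: 10 * 1024 ** 3, fileName: 'vm-flat.vmdk', path: '[ds1] vm', datastore: 'ds1', isFull: true }],
  snapshots: { current: undefined, snapshots: [] },
}

// groups each disk and its snapshot deltas (oldest first) by VMware node,
// so every node can later be imported as a single XCP-ng disk chain
const chainsByNodes = buildDiskChainByNode(vmMetadata.disks, vmMetadata.snapshots)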
@@ -0,0 +1,90 @@
import { Task } from '@xen-orchestra/mixins/Tasks.mjs'
import { VDI_FORMAT_VHD } from '@xen-orchestra/xapi'
import openDeltaVmdkasVhd from '@xen-orchestra/vmware-explorer/openDeltaVmdkAsVhd.mjs'
import VhdEsxiRaw from '@xen-orchestra/vmware-explorer/VhdEsxiRaw.mjs'
import { importVdi as importVdiThroughXva } from '@xen-orchestra/xva/importVdi.mjs'
import { defer } from 'golike-defer'

async function _importDiskChain(
$defer,
{ esxi, dataStoreToHandlers, sr, vm, chainByNode, vdi, parentVhd, userdevice }
) {
let vhd
if (chainByNode.length === 0) {
return { vhd, vdi }
}
const isFullImport = chainByNode[0].isFull
for (let diskIndex = 0; diskIndex < chainByNode.length; diskIndex++) {
// the first one is a RAW disk ( full )
const disk = chainByNode[diskIndex]
const { fileName, path, datastore: datastoreName, isFull } = disk
if (isFull) {
vhd = await VhdEsxiRaw.open(datastoreName, path + '/' + fileName, {
thin: false,
esxi,
dataStoreToHandlers,
})
} else {
if (parentVhd === undefined) {
throw new Error(`Can't import delta of a running VM without its parent VHD`)
}
vhd = await openDeltaVmdkasVhd(datastoreName, path + '/' + fileName, parentVhd, {
lookMissingBlockInParent: isFullImport, // only look for missing blocks in the parent on a full import
esxi,
dataStoreToHandlers,
})
}
vhd.label = fileName
parentVhd = vhd
}
if (isFullImport) {
const { capacity, descriptionLabel, nameLabel } = chainByNode[chainByNode.length - 1]
// we don't need to read the BAT with the importVdiThroughXva process
const vdiMetadata = {
name_description: 'fromESXI' + descriptionLabel,
name_label: '[ESXI]' + nameLabel,
SR: sr.$ref,
virtual_size: capacity,
}
vdi = await importVdiThroughXva(vdiMetadata, vhd, sr.$xapi, sr)

// it can fail before the vdi is connected to the vm
$defer.onFailure.call(sr.$xapi, 'VDI_destroy', vdi.$ref)

await sr.$xapi.VBD_create({
VDI: vdi.$ref,
VM: vm.$ref,
device: `xvd${String.fromCharCode('a'.charCodeAt(0) + userdevice)}`,
userdevice: String(userdevice < 3 ? userdevice : userdevice + 1),
})

return { vdi, vhd }
} else {
// delta mode works only in vhd mode for now
const stream = vhd.stream()
await vhd.readBlockAllocationTable()
await vdi.$importContent(stream, { format: VDI_FORMAT_VHD })
}
return { vdi, vhd }
}

const importDiskChain = defer(_importDiskChain)

export const importDisksFromDatastore = async function importDisksFromDatastore({
esxi,
dataStoreToHandlers,
vm,
chainsByNodes,
sr,
vhds = [],
}) {
return await Promise.all(
Object.keys(chainsByNodes).map(async (node, userdevice) =>
Task.run({ properties: { name: `Cold import of disks ${node}` } }, async () => {
const chainByNode = chainsByNodes[node]
const { vdi, vhd: parentVhd } = vhds[userdevice] ?? {}
return importDiskChain({ esxi, dataStoreToHandlers, vm, chainByNode, userdevice, sr, parentVhd, vdi })
})
)
)
}
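The updated caller lives in the new mixin file, which is not shown in this excerpt. The following is only a hedged sketch of how the old two-pass flow (cold import, then transfer of the active disk after stopping the source VM) could be expressed with importDisksFromDatastore; esxi, dataStoreToHandlers, vm, sr, vmId, isRunning, stopSource and chainsByNodes are assumed to be in scope, as in the removed migrationfromEsxi method above.

// cold pass: a running VM's active disk cannot be read through the ESXi API,
// so only the snapshot part of each chain is imported here (assumed split, see above)
const coldChains = Object.fromEntries(
  Object.entries(chainsByNodes).map(([node, chain]) => [node, isRunning ? chain.slice(0, -1) : chain])
)
const vhds = await importDisksFromDatastore({ esxi, dataStoreToHandlers, vm, chainsByNodes: coldChains, sr })

if (isRunning && stopSource) {
  // delta pass: stop the source, then transfer only the last, previously active disk of each
  // chain, reusing the VDIs and parent VHDs produced by the cold pass
  await esxi.powerOff(vmId)
  const lastDisks = Object.fromEntries(Object.entries(chainsByNodes).map(([node, chain]) => [node, chain.slice(-1)]))
  await importDisksFromDatastore({ esxi, dataStoreToHandlers, vm, chainsByNodes: lastDisks, sr, vhds })
}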
