Skip to content

Commit

Permalink
chore(xen-api): http-request-plus → undici (except putResource) (#7177)
Browse files Browse the repository at this point in the history
  • Loading branch information
b-Nollet committed Mar 12, 2024
1 parent 51f95b3 commit bfb8d3b
Show file tree
Hide file tree
Showing 19 changed files with 153 additions and 112 deletions.
2 changes: 1 addition & 1 deletion @xen-orchestra/backups/_runners/_PoolMetadataBackup.mjs
Expand Up @@ -31,7 +31,7 @@ export class PoolMetadataBackup {
const poolDir = `${DIR_XO_POOL_METADATA_BACKUPS}/${schedule.id}/${pool.$id}`
const dir = `${poolDir}/${formatFilenameDate(timestamp)}`

const stream = await this._exportPoolMetadata()
const stream = (await this._exportPoolMetadata()).body
const fileName = `${dir}/data`

const metadata = JSON.stringify(
Expand Down
10 changes: 6 additions & 4 deletions @xen-orchestra/backups/_runners/_vmRunners/FullXapi.mjs
Expand Up @@ -30,10 +30,12 @@ export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
const vm = this._vm
const exportedVm = this._exportedVm
const stream = this._throttleStream(
await this._xapi.VM_export(exportedVm.$ref, {
compress: Boolean(compression) && (compression === 'native' ? 'gzip' : 'zstd'),
useSnapshot: false,
})
(
await this._xapi.VM_export(exportedVm.$ref, {
compress: Boolean(compression) && (compression === 'native' ? 'gzip' : 'zstd'),
useSnapshot: false,
})
).body
)

const vdis = await exportedVm.$getDisks()
Expand Down
10 changes: 6 additions & 4 deletions @xen-orchestra/xapi/vdi.mjs
Expand Up @@ -108,10 +108,12 @@ class Vdi {
} else {
// raw export without nbd or vhd exports needs a resource stream
const vdiName = await this.getField('VDI', ref, 'name_label')
stream = await this.getResource(cancelToken, '/export_raw_vdi/', {
query,
task: await this.task_create(`Exporting content of VDI ${vdiName}`),
})
stream = (
await this.getResource(cancelToken, '/export_raw_vdi/', {
query,
task: await this.task_create(`Exporting content of VDI ${vdiName}`),
})
).body
if (nbdClient !== undefined && format === VDI_FORMAT_VHD) {
const taskRef = await this.task_create(`Exporting content of VDI ${vdiName} using NBD`)
stream = await createNbdVhdStream(nbdClient, stream)
Expand Down
7 changes: 4 additions & 3 deletions @xen-orchestra/xapi/vm.mjs
Expand Up @@ -9,6 +9,7 @@ import { asyncMap } from '@xen-orchestra/async-map'
import { createLogger } from '@xen-orchestra/log'
import { decorateClass } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { finished } from 'node:stream'
import { incorrectState, forbiddenOperation } from 'xo-common/api-errors.js'
import { JsonRpcError } from 'json-rpc-protocol'
import { Ref } from 'xen-api'
Expand Down Expand Up @@ -502,7 +503,7 @@ class Vm {
exportedVmRef = vmRef
}
try {
const stream = await this.getResource(cancelToken, '/export/', {
const response = await this.getResource(cancelToken, '/export/', {
query: {
ref: exportedVmRef,
use_compression: compress === 'zstd' ? 'zstd' : compress === true || compress === 'gzip' ? 'true' : 'false',
Expand All @@ -511,10 +512,10 @@ class Vm {
})

if (useSnapshot) {
stream.once('end', destroySnapshot).once('error', destroySnapshot)
finished(response.body, destroySnapshot)
}

return stream
return response
} catch (error) {
// augment the error with as much relevant info as possible
const [poolMaster, exportedVm] = await Promise.all([
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.unreleased.md
Expand Up @@ -36,6 +36,9 @@
<!--packages-start-->

- @vates/otp minor
- @xen-orchestra/backups patch
- @xen-orchestra/xapi patch
- xen-api major
- xo-server minor
- xo-web patch

Expand Down
6 changes: 3 additions & 3 deletions packages/xen-api/examples/export-vdi.mjs
Expand Up @@ -44,14 +44,14 @@ defer(async ($defer, rawArgs) => {
const vdi = await resolveRecord(xapi, 'VDI', args[1])

// https://xapi-project.github.io/xen-api/snapshots.html#downloading-a-disk-or-snapshot
const exportStream = await xapi.getResource(token, '/export_raw_vdi/', {
const response = await xapi.getResource(token, '/export_raw_vdi/', {
query: {
format: raw ? 'raw' : 'vhd',
vdi: vdi.$ref,
},
})

console.warn('Export task:', exportStream.headers['task-id'])
console.warn('Export task:', response.headers['task-id'])

const top = createTop()
const progressStream = createProgress()
Expand All @@ -63,5 +63,5 @@ defer(async ($defer, rawArgs) => {
}, 1e3)
)

await pipeline(exportStream, progressStream, throttle(bps), createOutputStream(args[2]))
await pipeline(response.body, progressStream, throttle(bps), createOutputStream(args[2]))
})(process.argv.slice(2)).catch(console.error.bind(console, 'error'))
6 changes: 3 additions & 3 deletions packages/xen-api/examples/export-vm.mjs
Expand Up @@ -37,17 +37,17 @@ defer(async ($defer, rawArgs) => {
process.on('SIGINT', cancel)

// https://xapi-project.github.io/xen-api/importexport.html
const exportStream = await xapi.getResource(token, '/export/', {
const response = await xapi.getResource(token, '/export/', {
query: {
ref: (await resolveRecord(xapi, 'VM', args[1])).$ref,
use_compression: zstd ? 'zstd' : gzip ? 'true' : 'false',
},
})

console.warn('Export task:', exportStream.headers['task-id'])
console.warn('Export task:', response.headers['task-id'])

await pipeline(
exportStream,
response.body,
createProgress({ time: 1e3 }, p => console.warn(formatProgress(p))),
createOutputStream(args[2])
)
Expand Down
68 changes: 45 additions & 23 deletions packages/xen-api/index.mjs
Expand Up @@ -5,6 +5,7 @@ import ms from 'ms'
import httpRequest from 'http-request-plus'
import map from 'lodash/map.js'
import noop from 'lodash/noop.js'
import { Client } from 'undici'
import { coalesceCalls } from '@vates/coalesce-calls'
import { Collection } from 'xo-collection'
import { EventEmitter } from 'events'
Expand Down Expand Up @@ -402,37 +403,54 @@ export class Xapi extends EventEmitter {
}

let url = new URL('http://localhost')
url.protocol = this._url.protocol
url.pathname = pathname
url.search = new URLSearchParams(query)
await this._setHostAddressInUrl(url, host)

const response = await this._addSyncStackTrace(
pRetry(
async () =>
httpRequest(url, {
rejectUnauthorized: !this._allowUnauthorized,

// this is an inactivity timeout (unclear in Node doc)
timeout: this._httpInactivityTimeout,

maxRedirects: 0,

// Support XS <= 6.5 with Node => 12
minVersion: 'TLSv1',
agent: this.httpAgent,
async () => {
const client = new Client(url, {
connect: {
rejectUnauthorized: !this._allowUnauthorized,
// Support XS <= 6.5 with Node => 12
minVersion: 'TLSv1',
},
})

signal: $cancelToken,
}),
return client
.request({
method: 'GET',
path: pathname,
query,
maxRedirections: 0,
headersTimeout: this._httpInactivityTimeout,
bodyTimeout: this._httpInactivityTimeout,
agent: this.httpAgent,

signal: $cancelToken,
})
.then(response => {
const { statusCode } = response
if (((statusCode / 100) | 0) === 2) {
return response
}
const error = new Error(`${response.statusCode} ${response.statusMessage}`)
Object.defineProperty(error, 'response', { value: response })
throw error
})
},
{
when: error => error.response !== undefined && error.response.statusCode === 302,
onRetry: async error => {
const response = error.response
if (response === undefined) {
if (response === undefined || response.body === undefined) {
throw error
}
response.destroy()
response.body.on('error', noop)
response.body.destroy()
url = await this._replaceHostAddressInUrl(new URL(response.headers.location, url))
query = Object.fromEntries(url.searchParams.entries())
pathname = url.pathname
url.pathname = url.search = ''
},
}
)
Expand Down Expand Up @@ -953,14 +971,18 @@ export class Xapi extends EventEmitter {
const { hostname } = url
url.hostnameRaw = hostname[0] === '[' ? hostname.slice(1, -1) : hostname

this._humanId = `${this._auth.user ?? 'unknown'}@${url.hostname}`
this._transport = this._createTransport({
secureOptions: {
const client = new Client(url, {
connect: {
minVersion: 'TLSv1',
rejectUnauthorized: !this._allowUnauthorized,
},
url,
})

this._humanId = `${this._auth.user ?? 'unknown'}@${url.hostname}`
this._transport = this._createTransport({
agent: this.httpAgent,
client,
url,
})
this._url = url
}
Expand Down
5 changes: 3 additions & 2 deletions packages/xen-api/package.json
Expand Up @@ -28,7 +28,7 @@
"xen-api": "./cli.mjs"
},
"engines": {
"node": ">=14"
"node": ">=18"
},
"dependencies": {
"@vates/coalesce-calls": "^0.1.0",
Expand All @@ -48,7 +48,8 @@
"promise-toolbox": "^0.21.0",
"proxy-agent": "^6.3.1",
"pw": "0.0.4",
"xmlrpc": "^1.3.2",
"undici": "^6.2.1",
"xmlrpc-parser": "^1.0.3",
"xo-collection": "^0.5.0"
},
"devDependencies": {
Expand Down
10 changes: 5 additions & 5 deletions packages/xen-api/transports/json-rpc.mjs
@@ -1,23 +1,23 @@
import httpRequestPlus from 'http-request-plus'
import { format, parse } from 'json-rpc-protocol'

import XapiError from '../_XapiError.mjs'

import UnsupportedTransport from './_UnsupportedTransport.mjs'

// https://github.com/xenserver/xenadmin/blob/0df39a9d83cd82713f32d24704852a0fd57b8a64/XenModel/XenAPI/Session.cs#L403-L433
export default ({ secureOptions, url, agent }) => {
export default ({ agent, client, url }) => {
url = new URL('./jsonrpc', Object.assign(new URL('http://localhost'), url))
const path = url.pathname + url.search

return async function (method, args) {
const res = await httpRequestPlus(url, {
...secureOptions,
const res = await client.request({
body: format.request(0, method, args),
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
},
method: 'POST',
path,
agent,
})

Expand All @@ -26,7 +26,7 @@ export default ({ secureOptions, url, agent }) => {
throw new UnsupportedTransport()
}

const response = parse(await res.text())
const response = parse(await res.body.text())

if (response.type === 'response') {
return response.result
Expand Down
55 changes: 30 additions & 25 deletions packages/xen-api/transports/xml-rpc.mjs
@@ -1,18 +1,8 @@
import xmlrpc from 'xmlrpc'
import { promisify } from 'promise-toolbox'

import XapiError from '../_XapiError.mjs'
import { XmlRpcMessage, XmlRpcResponse } from 'xmlrpc-parser'

import prepareXmlRpcParams from './_prepareXmlRpcParams.mjs'

const logError = error => {
if (error.res) {
console.error('XML-RPC Error: %s (response status %s)', error.message, error.res.statusCode)
console.error('%s', error.body)
}

throw error
}
import XapiError from '../_XapiError.mjs'
import UnsupportedTransport from './_UnsupportedTransport.mjs'

const parseResult = result => {
const status = result.Status
Expand All @@ -30,16 +20,31 @@ const parseResult = result => {
return result.Value
}

export default ({ secureOptions, url: { hostnameRaw, pathname, port, protocol }, agent }) => {
const secure = protocol === 'https:'
const client = (secure ? xmlrpc.createSecureClient : xmlrpc.createClient)({
...(secure ? secureOptions : undefined),
agent,
host: hostnameRaw,
pathname,
port,
})
const call = promisify(client.methodCall, client)

return (method, args) => call(method, prepareXmlRpcParams(args)).then(parseResult, logError)
export default ({ agent, client, url }) => {
url = new URL('./xmlrpc', Object.assign(new URL('http://localhost'), url))
const path = url.pathname + url.search

return async function (method, args) {
const message = new XmlRpcMessage(method, prepareXmlRpcParams(args))

const res = await client.request({
body: message.xml(),
headers: {
Accept: 'text/xml',
'Content-Type': 'text/xml',
},
method: 'POST',
path,
agent,
})

if (res.headers['content-type'] !== 'text/xml' && res.headers['content-type'] !== 'application/xml') {
throw new UnsupportedTransport()
}

const xml = await res.body.text()
const response = await new XmlRpcResponse().parse(xml)

return parseResult(response.params[0])
}
}
2 changes: 1 addition & 1 deletion packages/xo-server-perf-alert/src/index.js
Expand Up @@ -693,7 +693,7 @@ ${entriesWithMissingStats.map(({ listItem }) => listItem).join('\n')}`
payload.vm_uuid = xapiObject.uuid
}
// JSON is not well formed, can't use the default node parser
return JSON5.parse(await (await xapi.getResource('/rrd_updates', payload)).text())
return JSON5.parse(await (await xapi.getResource('/rrd_updates', payload)).body.text())
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/xo-server/src/api/test.mjs
Expand Up @@ -60,7 +60,7 @@ export async function copyVm({ vm, sr }) {
{
// eslint-disable-next-line no-console
console.log('export full VM...')
const input = await srcXapi.VM_export(vm._xapiRef)
const input = (await srcXapi.VM_export(vm._xapiRef)).body
// eslint-disable-next-line no-console
console.log('import full VM...')
await tgtXapi.VM_destroy(await tgtXapi.VM_import(input, sr._xapiRef))
Expand Down
2 changes: 1 addition & 1 deletion packages/xo-server/src/api/vm.mjs
Expand Up @@ -1212,7 +1212,7 @@ revert.resolve = {
async function handleExport(req, res, { xapi, vmRef, compress, format = 'xva' }) {
// @todo : should we put back the handleExportFAIL_ON_QUEUE ?
const stream =
format === 'ova' ? await xapi.exportVmOva(vmRef) : await xapi.VM_export(FAIL_ON_QUEUE, vmRef, { compress })
format === 'ova' ? await xapi.exportVmOva(vmRef) : (await xapi.VM_export(FAIL_ON_QUEUE, vmRef, { compress })).body

res.on('close', () => stream.destroy())
// Remove the filename as it is already part of the URL.
Expand Down
2 changes: 1 addition & 1 deletion packages/xo-server/src/xapi-stats.mjs
Expand Up @@ -255,7 +255,7 @@ export default class XapiStats {
start: timestamp,
},
})
.then(response => response.text())
.then(response => response.body.text())
.then(data => {
try {
// starting from XAPI 23.31, the response is valid JSON
Expand Down

0 comments on commit bfb8d3b

Please sign in to comment.