Skip to content

Commit

Permalink
fix(xen-api): various undici fixes (#7472)
Browse files Browse the repository at this point in the history
* fix(xen-api): various undici fixes

Fixes #7465

Introduced by bfb8d3b

- don't use single Client as it allows a single parallel request by default
- fix HTTP proxy support (for all methods but `putResource()`
- follow redirects
- `getResource` respect configured protocol
- properly throws on non 2xx responses
  • Loading branch information
julien-f committed Mar 15, 2024
1 parent c6451cf commit 6c16055
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 60 deletions.
91 changes: 42 additions & 49 deletions packages/xen-api/index.mjs
Expand Up @@ -5,15 +5,14 @@ import ms from 'ms'
import httpRequest from 'http-request-plus'
import map from 'lodash/map.js'
import noop from 'lodash/noop.js'
import { Client } from 'undici'
import { Agent, ProxyAgent, request } from 'undici'
import { coalesceCalls } from '@vates/coalesce-calls'
import { Collection } from 'xo-collection'
import { EventEmitter } from 'events'
import { Index } from 'xo-collection/index.js'
import { cancelable, defer, fromCallback, ignoreErrors, pDelay, pRetry, pTimeout } from 'promise-toolbox'
import { limitConcurrency } from 'limit-concurrency-decorator'
import { decorateClass } from '@vates/decorate-with'
import { ProxyAgent } from 'proxy-agent'

import debug from './_debug.mjs'
import getTaskResult from './_getTaskResult.mjs'
Expand Down Expand Up @@ -135,12 +134,30 @@ export class Xapi extends EventEmitter {
delete url.password
}

this._allowUnauthorized = opts.allowUnauthorized
const { httpProxy } = opts
const dispatcherOpts = {
maxRedirections: 3,
}
const tlsOpts = {
minVersion: 'TLSv1',
rejectUnauthorized: !opts.allowUnauthorized,
}
if (httpProxy !== undefined) {
this._httpAgent = new ProxyAgent({
getProxyForUrl: () => httpProxy,
rejectUnauthorized: !opts.allowUnauthorized,
const uri = new URL(httpProxy)
const token = 'Basic ' + Buffer.from(`${uri.username}:${uri.password}`).toString('base64')
this._undiciDispatcher = new ProxyAgent({
...dispatcherOpts,

proxyTls: tlsOpts,
requestTls: tlsOpts,
token,
uri,
})
} else {
this._undiciDispatcher = new Agent({
...dispatcherOpts,

connect: tlsOpts,
})
}
this._setUrl(url)
Expand Down Expand Up @@ -177,10 +194,6 @@ export class Xapi extends EventEmitter {
}
}

get httpAgent() {
return this._httpAgent
}

get readOnly() {
return this._readOnly
}
Expand Down Expand Up @@ -403,40 +416,30 @@ export class Xapi extends EventEmitter {
}

let url = new URL('http://localhost')
url.protocol = this._url.protocol
await this._setHostAddressInUrl(url, host)

const response = await this._addSyncStackTrace(
pRetry(
async () => {
const client = new Client(url, {
connect: {
rejectUnauthorized: !this._allowUnauthorized,
// Support XS <= 6.5 with Node => 12
minVersion: 'TLSv1',
},
return request(url, {
bodyTimeout: this._httpInactivityTimeout,
dispatcher: this._undiciDispatcher,
headersTimeout: this._httpInactivityTimeout,
maxRedirections: 0,
method: 'GET',
path: pathname,
query,
signal: $cancelToken,
}).then(response => {
const { statusCode } = response
if (((statusCode / 100) | 0) === 2) {
return response
}
const error = new Error(`unexpected ${response.statusCode}`)
Object.defineProperty(error, 'response', { value: response })
throw error
})

return client
.request({
method: 'GET',
path: pathname,
query,
maxRedirections: 0,
headersTimeout: this._httpInactivityTimeout,
bodyTimeout: this._httpInactivityTimeout,
agent: this.httpAgent,

signal: $cancelToken,
})
.then(response => {
const { statusCode } = response
if (((statusCode / 100) | 0) === 2) {
return response
}
const error = new Error(`${response.statusCode} ${response.statusMessage}`)
Object.defineProperty(error, 'response', { value: response })
throw error
})
},
{
when: error => error.response !== undefined && error.response.statusCode === 302,
Expand Down Expand Up @@ -506,8 +509,6 @@ export class Xapi extends EventEmitter {

const doRequest = (url, opts) =>
httpRequest(url, {
agent: this.httpAgent,

body,
headers,
method: 'PUT',
Expand Down Expand Up @@ -971,17 +972,9 @@ export class Xapi extends EventEmitter {
const { hostname } = url
url.hostnameRaw = hostname[0] === '[' ? hostname.slice(1, -1) : hostname

const client = new Client(url, {
connect: {
minVersion: 'TLSv1',
rejectUnauthorized: !this._allowUnauthorized,
},
})

this._humanId = `${this._auth.user ?? 'unknown'}@${url.hostname}`
this._transport = this._createTransport({
agent: this.httpAgent,
client,
dispatcher: this._undiciDispatcher,
url,
})
this._url = url
Expand Down
1 change: 0 additions & 1 deletion packages/xen-api/package.json
Expand Up @@ -46,7 +46,6 @@
"minimist": "^1.2.0",
"ms": "^2.1.1",
"promise-toolbox": "^0.21.0",
"proxy-agent": "^6.3.1",
"pw": "0.0.4",
"undici": "^6.2.1",
"xmlrpc-parser": "^1.0.3",
Expand Down
13 changes: 8 additions & 5 deletions packages/xen-api/transports/json-rpc.mjs
@@ -1,26 +1,29 @@
import { format, parse } from 'json-rpc-protocol'
import { request } from 'undici'

import XapiError from '../_XapiError.mjs'

import UnsupportedTransport from './_UnsupportedTransport.mjs'

// https://github.com/xenserver/xenadmin/blob/0df39a9d83cd82713f32d24704852a0fd57b8a64/XenModel/XenAPI/Session.cs#L403-L433
export default ({ agent, client, url }) => {
export default ({ dispatcher, url }) => {
url = new URL('./jsonrpc', Object.assign(new URL('http://localhost'), url))
const path = url.pathname + url.search

return async function (method, args) {
const res = await client.request({
const res = await request(url, {
dispatcher,
body: format.request(0, method, args),
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
},
method: 'POST',
path,
agent,
})

if (((res.statusCode / 100) | 0) !== 2) {
throw new Error('unexpect statusCode ' + res.statusCode)
}

// content-type is `text/xml` on old hosts where JSON-RPC is unsupported
if (res.headers['content-type'] !== 'application/json') {
throw new UnsupportedTransport()
Expand Down
13 changes: 8 additions & 5 deletions packages/xen-api/transports/xml-rpc.mjs
@@ -1,3 +1,4 @@
import { request } from 'undici'
import { XmlRpcMessage, XmlRpcResponse } from 'xmlrpc-parser'

import prepareXmlRpcParams from './_prepareXmlRpcParams.mjs'
Expand All @@ -20,24 +21,26 @@ const parseResult = result => {
return result.Value
}

export default ({ agent, client, url }) => {
export default ({ dispatcher, url }) => {
url = new URL('./xmlrpc', Object.assign(new URL('http://localhost'), url))
const path = url.pathname + url.search

return async function (method, args) {
const message = new XmlRpcMessage(method, prepareXmlRpcParams(args))

const res = await client.request({
const res = await request(url, {
dispatcher,
body: message.xml(),
headers: {
Accept: 'text/xml',
'Content-Type': 'text/xml',
},
method: 'POST',
path,
agent,
})

if (((res.statusCode / 100) | 0) !== 2) {
throw new Error('unexpect statusCode ' + res.statusCode)
}

if (res.headers['content-type'] !== 'text/xml' && res.headers['content-type'] !== 'application/xml') {
throw new UnsupportedTransport()
}
Expand Down

0 comments on commit 6c16055

Please sign in to comment.