Skip to content

Commit

Permalink
shared store resource pool #10010 (#10029)
Browse files Browse the repository at this point in the history
* shared store resource pool #10010

* shared store resource pool #10010

* shared store resource pool #10010

* shared store resource pool #10010

* shared store resource pool #10010

* shared store resource pool #10010

* shared store resource pool #10010

* shared store resource pool #10010
  • Loading branch information
pedrorfernandes committed Apr 5, 2023
1 parent 1cb216c commit d87519f
Show file tree
Hide file tree
Showing 13 changed files with 411 additions and 69 deletions.
5 changes: 4 additions & 1 deletion e2e/interop/shared-store.e2e.js
@@ -1,7 +1,10 @@
const assert = require('node:assert')
const { getValue, setValue } = require('../../packages/wdio-shared-store-service')
const { getValue, setValue, setResourcePool, getValueFromPool, addValueToPool } = require('../../packages/wdio-shared-store-service')

console.log('Test Shared Store exports')
assert.equal(typeof getValue, 'function')
assert.equal(typeof setValue, 'function')
assert.equal(typeof setResourcePool, 'function')
assert.equal(typeof getValueFromPool, 'function')
assert.equal(typeof addValueToPool, 'function')
console.log('Test Shared Store exports Test Passed!')
27 changes: 27 additions & 0 deletions packages/wdio-shared-store-service/README.md
Expand Up @@ -66,6 +66,33 @@ IMPORTANT! Every spec file should be atomic and isolated from others' specs.
The idea of the service is to deal with very specific environment setup issues.
Please avoid sharing test execution data!
### Resource Pools
If the worker threads are competing for resources that must be assigned for each worker, you can use Resource Pool API:
```js
// wdio.conf.js
import { setResourcePool, getValueFromPool, addValueToPool } from '@wdio/shared-store-service'

export const config = {
maxInstances: 2,
// ...
onPrepare: async function (config, capabilities) {
await setResourcePool('availableUrls', ['url01.com', 'url02.com'])
},
// ...
beforeSession: async (conf) => {
conf.baseUrl = await getValueFromPool('availableUrls');
},
// ...
afterSession: async (conf) => {
// worker returns the used resource for next workers to use
await addValueToPool('availableUrls', conf.baseUrl);
}
```
This example ensures that both workers never use the same `baseUrl`. A unique url is only assigned to one worker until it's released by it.
## Configuration
Add `shared-store` to the services list and the `sharedStore` object will be accessible to you on the [`browser` scope](https://webdriver.io/docs/api/browser) in your test.
Expand Down
Expand Up @@ -13,6 +13,8 @@ type PolkaRequest = {
path: string;
method: string;
body: Record<string, unknown>;
params: Record<string, unknown>;
query: Record<string, unknown>;
};

type PolkaResponse = {
Expand All @@ -23,7 +25,11 @@ interface PolkaInstance {
use: (use: any, cb: NextFn) => PolkaInstance;
post: (
path: string,
cb: (req: PolkaRequest, res: PolkaResponse) => void
cb: (req: PolkaRequest, res: PolkaResponse, next: Function) => void
) => PolkaInstance;
get: (
path: string,
cb: (req: PolkaRequest, res: PolkaResponse, next: Function) => void
) => PolkaInstance;
listen: Function;
server: Partial<import("http").Server> & { address(): { post: string } };
Expand Down
15 changes: 15 additions & 0 deletions packages/wdio-shared-store-service/src/cjs/index.ts
Expand Up @@ -9,3 +9,18 @@ exports.getValue = async function (key: string) {
const m = await esmModule
return m.getValue(key)
}

exports.setResourcePool = async function (key: string, value: never) {
const m = await esmModule
return m.setResourcePool(key, value)
}

exports.getValueFromPool = async function (key: string, options: never) {
const m = await esmModule
return m.getValueFromPool(key, options)
}

exports.addValueToPool = async function (key: string, value: never) {
const m = await esmModule
return m.addValueToPool(key, value)
}
92 changes: 54 additions & 38 deletions packages/wdio-shared-store-service/src/client.ts
@@ -1,24 +1,32 @@
import type { Response } from 'got'
import type { RequestError } from 'got'
import got from 'got'
import logger from '@wdio/logger'

import type { JsonCompatible, JsonPrimitive, JsonObject, JsonArray } from '@wdio/types'
import type { GetValueOptions } from 'src'

const log = logger('@wdio/shared-store-service')
let baseUrlResolve: Parameters<ConstructorParameters<typeof Promise>[0]>[0]
const baseUrlPromise = new Promise<string>((resolve) => {
baseUrlResolve = resolve
})

const WAIT_INTERVAL = 100
const pendingValues = new Map<string, any>()
let waitTimeout: NodeJS.Timer

let baseUrl: string | undefined
export const setPort = (port: number) => { baseUrl = `http://localhost:${port}` }
let isBaseUrlReady = false
export const setPort = (port: number) => {
/**
* if someone calls `setValue` in `onPrepare` we don't have a base url
* set as the launcher is called after user hooks. In this case we need
* to wait until it is set and flush all messages.
*/
baseUrlResolve(`http://localhost:${port}`)
isBaseUrlReady = true
}

/**
* make a request to the server to get a value from the store
* @param {string} key
* @returns {*}
*/
export const getValue = async (key: string): Promise<string | number | boolean | JsonObject | JsonArray | null | undefined> => {
const baseUrl = await baseUrlPromise
const res = await got.post(`${baseUrl}/get`, { json: { key }, responseType: 'json' }).catch(errHandler)
return res?.body ? (res.body as JsonObject).value : undefined
}
Expand All @@ -29,40 +37,48 @@ export const getValue = async (key: string): Promise<string | number | boolean |
* @param {*} value `store[key]` value (plain object)
*/
export const setValue = async (key: string, value: JsonCompatible | JsonPrimitive) => {
/**
* if someone calls `setValue` in `onPrepare` we don't have a base url
* set as the launcher is called after user hooks. In this case we need
* to wait until it is set and flush all messages.
*/
if (baseUrl) {
const setPromise = baseUrlPromise.then((baseUrl) => {
return got.post(`${baseUrl}/set`, { json: { key, value } }).catch(errHandler)
}
})

log.info('Shared store server not yet started, collecting value')
pendingValues.set(key, value)
return isBaseUrlReady ? setPromise : Promise.resolve()
}

if (waitTimeout) {
return
}
/**
*
* @param {string} key
* @param {*} value
*/
export const setResourcePool = async (key: string, value: JsonArray) => {
const setPromise = baseUrlPromise.then((baseUrl) => {
return got.post(`${baseUrl}/pool/set`, { json: { key, value } }).catch(errHandler)
})

log.info('Check shared store server to start')
waitTimeout = setInterval(async () => {
if (!baseUrl) {
return
}
return isBaseUrlReady ? setPromise : Promise.resolve()
}

log.info(`Shared store server started, flushing ${pendingValues.size} values`)
clearInterval(waitTimeout)
await Promise.all([...pendingValues.entries()].map(async ([key, value]) => {
await got.post(`${baseUrl}/set`, { json: { key, value } }).catch(errHandler)
pendingValues.delete(key)
})).then(
() => log.info('All pending values were successfully stored'),
(err) => log.error(`Failed to store all values: ${err.stack}`)
)
}, WAIT_INTERVAL)
/**
*
* @param {string} key
* @param {*} value
*/
export const getValueFromPool = async (key: string, options: GetValueOptions) => {
const baseUrl = await baseUrlPromise
const res = await got.get(`${baseUrl}/pool/get/${key}${options?.timeout ? `?timeout=${options.timeout}` : '' }`, { responseType: 'json' }).catch(errHandler)
return res?.body ? (res.body as JsonObject).value : undefined
}

/**
*
* @param {string} key
* @param {*} value
*/
export const addValueToPool = async (key: string, value: JsonPrimitive | JsonCompatible) => {
const baseUrl = await baseUrlPromise
const res = await got.post(`${baseUrl}/pool/add`, { json: { key, value }, responseType: 'json' }).catch(errHandler)
return res?.body ? (res.body as JsonObject).value : undefined
}

const errHandler = (err: Response<Error>) => {
log.warn(err.statusCode, err.statusMessage, err.url, err.body)
const errHandler = (err: RequestError) => {
throw new Error(`${err.response?.body || 'Shared store server threw an error'}`)
}
8 changes: 6 additions & 2 deletions packages/wdio-shared-store-service/src/index.ts
@@ -1,16 +1,20 @@
import type { JsonPrimitive, JsonCompatible } from '@wdio/types'
import type { JsonPrimitive, JsonCompatible, JsonArray } from '@wdio/types'

import SharedStoreLauncher from './launcher.js'
import SharedStoreService from './service.js'

export { getValue, setValue } from './client.js'
export { getValue, setValue, setResourcePool, getValueFromPool, addValueToPool } from './client.js'
export default SharedStoreService
export const launcher = SharedStoreLauncher
export type GetValueOptions = { timeout: Number } | undefined;

export interface BrowserExtension {
sharedStore: {
get: (key: string) => JsonPrimitive | JsonCompatible;
set: (key: string, value: JsonPrimitive | JsonCompatible) => void;
setResourcePool: (key: string, value: JsonArray) => void;
getValueFromPool: (key: string, options: GetValueOptions) => JsonPrimitive | JsonCompatible;
addValueToPool: (key: string, value: JsonPrimitive | JsonCompatible) => void;
}
}

Expand Down
59 changes: 59 additions & 0 deletions packages/wdio-shared-store-service/src/server.ts
Expand Up @@ -5,10 +5,12 @@ import { json } from '@polka/parse'
import type { JsonCompatible, JsonPrimitive, JsonObject } from '@wdio/types'

const store: JsonObject = {}
const resourcePoolStore: Map<string, any[]> = new Map()
/**
* @private
*/
export const __store = store
export const __resourcePoolStore = resourcePoolStore

const validateBody: NextFn = (req, res, next) => {
if (!req.path.endsWith('/get') && !req.path.endsWith('/set')) {
Expand All @@ -20,6 +22,9 @@ const validateBody: NextFn = (req, res, next) => {
next()
}

const MAX_TIMEOUT = 15000
const DEFAULT_TIMEOUT = 1000

export const startServer = () => new Promise<{ port: number, app: PolkaInstance }>((resolve, reject) => {
const app = polka()
/**
Expand All @@ -46,6 +51,60 @@ export const startServer = () => new Promise<{ port: number, app: PolkaInstance
store[key] = req.body.value as JsonCompatible | JsonPrimitive
return res.end()
})
.post('/pool/set', (req, res, next) => {
const key = req.body.key as string
const value = req.body.value as JsonCompatible | JsonPrimitive

if (!Array.isArray(value)) {
return next('Resource pool must be an array of values')
}

resourcePoolStore.set(key, value)
return res.end()
})
.get('/pool/get/:key', async (req, res, next) => {
const key = req.params.key as string

if (!resourcePoolStore.has(key)) {
return next(`'${key}' resource pool does not exist. Set it first using 'setResourcePool'`)
}

let pool = resourcePoolStore.get(key) || []

if (pool.length > 0) {
return res.end(JSON.stringify({ value: pool.shift() }))
}

const timeout = Math.min(parseInt(req.query.timeout as string) || DEFAULT_TIMEOUT, MAX_TIMEOUT)

try {
const result = await new Promise((resolve, reject) => {
setTimeout(function secondAttempt() {
pool = resourcePoolStore.get(key) || []
if (pool.length > 0) {
resolve({ value: pool.shift() })
}

reject(`'${key}' resource pool is empty. Set values to it first using 'setResourcePool' or 'addValueToPool'`)
}, timeout)
})
res.end(JSON.stringify(result))
} catch (err) {
return next(err)
}
})
.post('/pool/add', (req, res, next) => {
const key = req.body.key as string
const value = req.body.value as JsonCompatible | JsonPrimitive
const pool = resourcePoolStore.get(key)

if (!pool) {
return next(`'${key}' resource pool does not exist. Set it first using 'setResourcePool'`)
}

pool.push(value)
return res.end()
})

/**
* run server on a random port, `0` stands for random port
Expand Down
23 changes: 21 additions & 2 deletions packages/wdio-shared-store-service/src/service.ts
@@ -1,7 +1,8 @@
import type { JsonCompatible, JsonPrimitive, Services } from '@wdio/types'
import type { JsonCompatible, JsonPrimitive, Services, JsonArray } from '@wdio/types'

import { getValue, setValue, setPort } from './client.js'
import { getValue, setValue, setPort, setResourcePool, getValueFromPool, addValueToPool } from './client.js'
import type { SharedStoreServiceCapabilities } from './types.js'
import type { GetValueOptions } from './index.js'

export default class SharedStoreService implements Services.ServiceInstance {
private _browser?: WebdriverIO.Browser
Expand All @@ -25,6 +26,24 @@ export default class SharedStoreService implements Services.ServiceInstance {
key: string,
value: JsonCompatible | JsonPrimitive
) => this._browser?.call(() => setValue(key, value))
},
setResourcePool: {
value: (
key: string,
value: JsonArray
) => this._browser?.call(() => setResourcePool(key, value))
},
getValueFromPool: {
value: (
key: string,
options: GetValueOptions
) => this._browser?.call(() => getValueFromPool(key, options))
},
addValueToPool: {
value: (
key: string,
value: JsonCompatible | JsonPrimitive
) => this._browser?.call(() => addValueToPool(key, value))
}
})

Expand Down

0 comments on commit d87519f

Please sign in to comment.