From 2574eae6fdbe603b74a3c27a57e3b545aec54314 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?z=C5=8Dng=20y=C7=94?= Date: Tue, 6 Mar 2018 10:40:44 +0800 Subject: [PATCH] [BREAKING] feat: migrating from generators to async/await (#36) feat: migrating from generators to async/await --- .travis.yml | 2 +- appveyor.yml | 2 +- lib/client.js | 121 ++++++++++++++-------------- lib/index.js | 3 +- lib/leader.js | 45 ++++++----- lib/server.js | 33 ++++---- lib/utils.js | 9 ++- package.json | 38 ++++----- test/async.test.js | 27 +++++++ test/client.test.js | 122 +++++++++++++++++------------ test/close.test.js | 26 +++--- test/cluster.test.js | 2 +- test/connection.test.js | 40 +++++----- test/event.test.js | 16 ++-- test/index.test.js | 110 +++++++++++++++++--------- test/register_error.test.js | 20 ++--- test/server.test.js | 18 ++--- test/supports/async_api_client.js | 22 ++++++ test/supports/async_data_client.js | 21 +++++ test/supports/cluster_server.js | 29 +++++-- test/utils.test.js | 10 +-- 21 files changed, 432 insertions(+), 284 deletions(-) create mode 100644 test/async.test.js create mode 100644 test/supports/async_api_client.js create mode 100644 test/supports/async_data_client.js diff --git a/.travis.yml b/.travis.yml index 47cc542..960bc57 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,8 @@ sudo: false language: node_js node_js: - - '6' - '8' + - '9' install: - npm i npminstall && npminstall script: diff --git a/appveyor.yml b/appveyor.yml index 3d15e52..d0aa47e 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,7 +1,7 @@ environment: matrix: - - nodejs_version: '6' - nodejs_version: '8' + - nodejs_version: '9' install: - ps: Install-Product node $env:nodejs_version diff --git a/lib/client.js b/lib/client.js index ff5f963..1387ed2 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1,7 +1,6 @@ 'use strict'; const debug = require('debug')('cluster-client'); -const co = require('co'); const is = require('is-type-of'); const Base = require('sdk-base'); const assert = require('assert'); @@ -46,9 +45,9 @@ class ClusterClient extends Base { this[logger].warn('[ClusterClient:%s] %s closed, and try to init it again', this.options.name, this[innerClient].isLeader ? 'leader' : 'follower'); this[isReady] = false; this.ready(false); - this[init](); + this[init]().catch(err => { this.ready(err); }); }; - this[init](); + this[init]().catch(err => { this.ready(err); }); } get isClusterClientLeader() { @@ -68,59 +67,55 @@ class ClusterClient extends Base { * * @return {void} */ - [init]() { - co(function* () { - const name = this.options.name; - const port = this.options.port; - let server; - if (this.options.isLeader === true) { - server = yield ClusterServer.create(name, port); - if (!server) { - throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`); - } - } else if (this.options.isLeader === false) { - // wait for leader active - yield ClusterServer.waitFor(port, this.options.maxWaitTime); - } else { - debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port); - server = yield ClusterServer.create(name, port); + async [init]() { + const name = this.options.name; + const port = this.options.port; + let server; + if (this.options.isLeader === true) { + server = await ClusterServer.create(name, port); + if (!server) { + throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`); } + } else if (this.options.isLeader === false) { + // wait for leader active + await ClusterServer.waitFor(port, this.options.maxWaitTime); + } else { + debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port); + server = await ClusterServer.create(name, port); + } - if (server) { - this[innerClient] = new Leader(Object.assign({ server }, this.options)); - debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port); - } else { - this[innerClient] = new Follower(this.options); - debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port); - } + if (server) { + this[innerClient] = new Leader(Object.assign({ server }, this.options)); + debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port); + } else { + this[innerClient] = new Follower(this.options); + debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port); + } - // events delegate - utils.delegateEvents(this[innerClient], this); + // events delegate + utils.delegateEvents(this[innerClient], this); - // re init when connection is close - this[innerClient].on('close', this[closeHandler]); + // re init when connection is close + this[innerClient].on('close', this[closeHandler]); - // wait leader/follower ready - yield this[innerClient].ready(); + // wait leader/follower ready + await this[innerClient].ready(); - // subscribe all - for (const registrations of this[subInfo].values()) { - for (const args of registrations) { - this[innerClient].subscribe(args[0], args[1]); - } - } - // publish all - for (const reg of this[pubInfo].values()) { - this[innerClient].publish(reg); + // subscribe all + for (const registrations of this[subInfo].values()) { + for (const args of registrations) { + this[innerClient].subscribe(args[0], args[1]); } + } + // publish all + for (const reg of this[pubInfo].values()) { + this[innerClient].publish(reg); + } - if (!this[isReady]) { - this[isReady] = true; - this.ready(true); - } - }.bind(this)).catch(err => { - this.ready(err); - }); + if (!this[isReady]) { + this[isReady] = true; + this.ready(true); + } } /** @@ -210,24 +205,22 @@ class ClusterClient extends Base { this[innerClient].invoke(method, args, callback); } - [close]() { - return co(function* () { - try { - // close after ready, in case of innerClient is initializing - yield this.ready(); - } catch (err) { - // ignore - } + async [close]() { + try { + // close after ready, in case of innerClient is initializing + await this.ready(); + } catch (err) { + // ignore + } - const client = this[innerClient]; - if (client) { - // prevent re-initializing - client.removeListener('close', this[closeHandler]); - if (client.close) { - yield utils.callFn(client.close.bind(client)); - } + const client = this[innerClient]; + if (client) { + // prevent re-initializing + client.removeListener('close', this[closeHandler]); + if (client.close) { + await utils.callFn(client.close.bind(client)); } - }.bind(this)); + } } } diff --git a/lib/index.js b/lib/index.js index 16c9408..e56b3e0 100644 --- a/lib/index.js +++ b/lib/index.js @@ -194,7 +194,8 @@ class ClientWrapper { for (const key of keys) { const descriptor = Reflect.getOwnPropertyDescriptor(proto, key); - if (descriptor.value && is.generatorFunction(descriptor.value)) { + if (descriptor.value && + (is.generatorFunction(descriptor.value) || is.asyncFunction(descriptor.value))) { this.delegate(key); } } diff --git a/lib/leader.js b/lib/leader.js index 81ffc59..e81bdf2 100644 --- a/lib/leader.js +++ b/lib/leader.js @@ -53,13 +53,14 @@ class Leader extends Base { }); } - this._closeHandler = this._handleClose.bind(this); this._handleConnection = this._handleConnection.bind(this); // subscribe its own channel this._server.on(`${this.options.name}_connection`, this._handleConnection); this._server.once('close', () => { this.emit('server_closed'); }); - this.on('server_closed', this._closeHandler); + this.on('server_closed', () => { + this._handleClose().catch(err => { this.emit('error', err); }); + }); // maxIdleTime is 3 times of heartbeatInterval const heartbeatInterval = this.options.heartbeatInterval; @@ -247,7 +248,7 @@ class Leader extends Base { const conn = new Connection({ socket, name: this.options.name, - logger: this.options.logger, + logger: this.logger, transcode: this.options.transcode, requestTimeout: this.options.requestTimeout, }); @@ -393,14 +394,14 @@ class Leader extends Base { }); } - * _handleClose() { + async _handleClose() { debug('[Leader:%s] leader server is closed', this.options.name); // close the real client if (this._realClient) { const originClose = this._findMethodName('close'); if (originClose) { // support common function, generatorFunction, and function returning a promise - yield utils.callFn(this._realClient[originClose].bind(this._realClient)); + await utils.callFn(this._realClient[originClose].bind(this._realClient)); } } clearInterval(this._heartbeatTimer); @@ -408,28 +409,26 @@ class Leader extends Base { this.emit('close'); } - close() { + async close() { this._closeByUser = true; - return co(function* () { - debug('[Leader:%s] try to close leader', this.options.name); - // 1. stop listening to server channel - this._server.removeListener(`${this.options.name}_connection`, this._handleConnection); - - // 2. close all mock connections - for (const conn of this._connections.values()) { - if (conn.isMock) { - conn.emit('close'); - } + debug('[Leader:%s] try to close leader', this.options.name); + // 1. stop listening to server channel + this._server.removeListener(`${this.options.name}_connection`, this._handleConnection); + + // 2. close all mock connections + for (const conn of this._connections.values()) { + if (conn.isMock) { + conn.emit('close'); } + } - // 3. close server - // CANNOT close server directly by server.close(), other cluster clients may be using it - this.removeAllListeners('server_closed'); - yield ClusterServer.close(this.options.name, this._server); + // 3. close server + // CANNOT close server directly by server.close(), other cluster clients may be using it + this.removeAllListeners('server_closed'); + await ClusterServer.close(this.options.name, this._server); - // 5. close real client - yield this._handleClose(); - }.bind(this)); + // 5. close real client + await this._handleClose(); } } diff --git a/lib/server.js b/lib/server.js index ac6a8e0..0b56865 100644 --- a/lib/server.js +++ b/lib/server.js @@ -3,6 +3,7 @@ const debug = require('debug')('cluster-client:server'); const net = require('net'); const Base = require('sdk-base'); +const sleep = require('mz-modules/sleep'); const Packet = require('./protocol/packet'); // share memory in current process @@ -19,10 +20,8 @@ if (global.typeSet) { global.typeSet = typeSet = new Set(); } -const sleep = timeout => cb => setTimeout(cb, timeout); - function claimServer(port) { - return cb => { + return new Promise((resolve, reject) => { const server = net.createServer(); server.listen({ port, @@ -33,26 +32,26 @@ function claimServer(port) { function onError(err) { debug('listen %s error: %s', port, err); - cb(err); + reject(err); } server.on('error', onError); server.on('listening', () => { server.removeListener('error', onError); debug('listen %s success', port); - cb(null, server); + resolve(server); }); - }; + }); } function tryToConnect(port) { - return cb => { + return new Promise(resolve => { const socket = net.connect(port, '127.0.0.1'); debug('try to connecting %s', port); let success = false; socket.on('connect', () => { success = true; - cb(null, true); + resolve(true); // disconnect socket.end(); debug('test connected %s success, end now', port); @@ -61,9 +60,9 @@ function tryToConnect(port) { debug('test connect %s error: %s, success: %s', port, err, success); // if success before, ignore it if (success) return; - cb(null, false); + resolve(false); }); - }; + }); } class ClusterServer extends Base { @@ -169,7 +168,7 @@ class ClusterServer extends Base { * @param {Number} port - the port * @return {ClusterServer} server */ - static* create(name, port) { + static async create(name, port) { const key = `${name}@${port}`; let instance = serverMap.get(port); if (instance && !instance.isClosed) { @@ -181,7 +180,7 @@ class ClusterServer extends Base { } // compete for the local port, if got => leader, otherwise follower try { - const server = yield claimServer(port); + const server = await claimServer(port); instance = new ClusterServer({ server, port }); typeSet.add(key); serverMap.set(port, instance); @@ -200,7 +199,7 @@ class ClusterServer extends Base { } } - static* close(name, server) { + static async close(name, server) { const port = server._port; // remove from typeSet, so other client can occupy @@ -217,7 +216,7 @@ class ClusterServer extends Base { // close server if no one is listening on this port any more if (!listening) { const server = serverMap.get(port); - if (server) yield server.close(); + if (server) await server.close(); } } @@ -228,18 +227,18 @@ class ClusterServer extends Base { * @param {Number} timeout - the max wait time * @return {void} */ - static* waitFor(port, timeout) { + static async waitFor(port, timeout) { const start = Date.now(); let connect = false; while (!connect) { - connect = yield tryToConnect(port); + connect = await tryToConnect(port); // if timeout, throw error if (Date.now() - start > timeout) { throw new Error(`[ClusterClient] leader does not be active in ${timeout}ms on port:${port}`); } if (!connect) { - yield sleep(3000); + await sleep(3000); } } } diff --git a/lib/utils.js b/lib/utils.js index 05d734a..5690d9c 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,5 +1,6 @@ 'use strict'; +const co = require('co'); const is = require('is-type-of'); const stringify = require('json-stringify-safe'); @@ -70,15 +71,17 @@ exports.formatKey = formatKey; * @param {Array} args - args as fn() paramaters * @return {*} data returned by fn */ -exports.callFn = function* (fn, args) { +exports.callFn = async function(fn, args) { args = args || []; if (!is.function(fn)) return; if (is.generatorFunction(fn)) { - return yield fn(...args); + return await co(function* () { + return yield fn(...args); + }); } const r = fn(...args); if (is.promise(r)) { - return yield r; + return await r; } return r; }; diff --git a/package.json b/package.json index 3766d03..3775f4a 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,7 @@ "test": "npm run lint && npm run test-local", "test-local": "egg-bin test", "pkgfiles": "egg-bin pkgfiles --check", - "ci": "egg-bin autod --check && npm run pkgfiles && npm run lint && npm run cov", + "ci": "npm run autod --check && npm run pkgfiles && npm run lint && npm run cov", "contributors": "contributors -f plain -o AUTHORS" }, "repository": { @@ -32,38 +32,40 @@ }, "homepage": "https://github.com/node-modules/cluster-client#readme", "dependencies": { - "byte": "^1.2.0", + "byte": "^1.4.0", "co": "^4.6.0", - "debug": "^3.0.1", - "egg-logger": "^1.6.0", + "debug": "^3.1.0", + "egg-logger": "^1.6.1", "is-type-of": "^1.2.0", "json-stringify-safe": "^5.0.1", - "long": "^3.2.0", - "sdk-base": "^3.3.0", - "serialize-json": "^1.0.1", - "tcp-base": "^3.0.0", - "utility": "^1.12.0" + "long": "^4.0.0", + "mz-modules": "^2.1.0", + "sdk-base": "^3.4.0", + "serialize-json": "^1.0.2", + "tcp-base": "^3.1.0", + "utility": "^1.13.1" }, "devDependencies": { "address": "^1.0.3", - "autod": "^2.9.0", + "autod": "^3.0.1", "await-event": "^2.1.0", "coffee": "^4.1.0", "contributors": "^0.5.1", - "egg-bin": "^4.3.3", + "egg-bin": "^4.3.7", "egg-ci": "^1.8.0", - "egg-mock": "^3.12.1", - "eslint": "^4.7.1", - "eslint-config-egg": "^5.1.1", + "egg-mock": "^3.14.1", + "eslint": "^4.18.2", + "eslint-config-egg": "^7.0.0", "mm": "^2.2.0", - "mz-modules": "^2.0.0", + "mz-modules": "^2.1.0", "pedding": "^1.1.0", - "spy": "^1.0.0" + "spy": "^1.0.0", + "webstorm-disable-index": "^1.2.0" }, "engines": { - "node": ">= 6.0.0" + "node": ">= 8.0.0" }, "ci": { - "version": "6, 8" + "version": "8, 9" } } diff --git a/test/async.test.js b/test/async.test.js new file mode 100644 index 0000000..2cf0bda --- /dev/null +++ b/test/async.test.js @@ -0,0 +1,27 @@ +'use strict'; + +const assert = require('assert'); +const APIClient = require('./supports/async_api_client'); + +describe('test/async.test.js', () => { + it('should support auto delegate async function', async function() { + const leader = new APIClient(); + const follower = new APIClient(); + + await Promise.all([ + leader.ready(), + follower.ready(), + ]); + + let ret = await follower.echo('hello'); + assert(ret === 'hello'); + + ret = await leader.echo('hello'); + assert(ret === 'hello'); + + await Promise.all([ + follower.close(), + leader.close(), + ]); + }); +}); diff --git a/test/client.test.js b/test/client.test.js index cf84017..15edfcd 100644 --- a/test/client.test.js +++ b/test/client.test.js @@ -59,7 +59,7 @@ describe('test/client.test.js', () => { } } - it('should work ok', function* () { + it('should work ok', async function() { const client_1 = new ClusterClient(); const client_2 = new ClusterClient(); @@ -76,29 +76,29 @@ describe('test/client.test.js', () => { }); client_1.subscribe({ key: 'foo' }, listener_1); - let ret = yield client_1.await('foo_received'); + let ret = await client_1.await('foo_received'); assert(is.array(ret) && ret.length === 0); client_2.subscribe({ key: 'foo' }, val => { client_2.emit('foo_received', val); }); client_2.subscribe({ key: 'foo' }, listener_2); - ret = yield client_2.await('foo_received'); + ret = await client_2.await('foo_received'); assert(is.array(ret) && ret.length === 0); client_2.subscribe({ key: 'foo' }, val => { client_2.emit('foo_received_again', val); }); - ret = yield client_2.await('foo_received_again'); + ret = await client_2.await('foo_received_again'); assert(is.array(ret) && ret.length === 0); // publish client_2.publish({ key: 'foo', value: 'bar' }); - let rs = yield [ + let rs = await Promise.all([ client_1.await('foo_received'), client_2.await('foo_received'), - ]; + ]); assert(is.array(rs[0]) && rs[0].length === 1); assert(rs[0][0] === 'bar'); assert(is.array(rs[1]) && rs[1].length === 1); @@ -107,10 +107,10 @@ describe('test/client.test.js', () => { // unPublish client_2.unPublish({ key: 'foo', value: 'bar' }); - rs = yield [ + rs = await Promise.all([ client_1.await('foo_received_1'), client_2.await('foo_received_2'), - ]; + ]); assert(is.array(rs[0]) && rs[0].length === 0); assert(is.array(rs[1]) && rs[1].length === 0); @@ -121,46 +121,38 @@ describe('test/client.test.js', () => { // publish again client_2.publish({ key: 'foo', value: 'bar_1' }); - yield [ - function* () { - yield new Promise((resolve, reject) => { - setTimeout(resolve, 3000); - client_1.once('foo_received_1', () => { reject(new Error('should not run here')); }); - }); - }, - function* () { - yield new Promise((resolve, reject) => { - setTimeout(resolve, 3000); - client_2.once('foo_received_2', () => { reject(new Error('should not run here')); }); - }); - }, - ]; + await Promise.all([ + new Promise((resolve, reject) => { + setTimeout(resolve, 3000); + client_1.once('foo_received_1', () => { reject(new Error('should not run here')); }); + }), + new Promise((resolve, reject) => { + setTimeout(resolve, 3000); + client_2.once('foo_received_2', () => { reject(new Error('should not run here')); }); + }), + ]); client_1.unSubscribe({ key: 'foo' }); client_2.unSubscribe({ key: 'foo' }); client_2.publish({ key: 'foo', value: 'bar_2' }); - yield [ - function* () { - yield new Promise((resolve, reject) => { - setTimeout(resolve, 3000); - client_1.once('foo_received', () => { reject(new Error('should not run here')); }); - }); - }, - function* () { - yield new Promise((resolve, reject) => { - setTimeout(resolve, 3000); - client_2.once('foo_received', () => { reject(new Error('should not run here')); }); - }); - }, - ]; + await Promise.all([ + new Promise((resolve, reject) => { + setTimeout(resolve, 3000); + client_1.once('foo_received', () => { reject(new Error('should not run here')); }); + }), + new Promise((resolve, reject) => { + setTimeout(resolve, 3000); + client_2.once('foo_received', () => { reject(new Error('should not run here')); }); + }), + ]); client_1.close(); client_2.close(); }); - it('should subscribe for second time', function* () { + it('should subscribe for second time', async function() { const client = new ClusterClient(); client.publish({ key: 'foo', value: 'bar' }); @@ -168,30 +160,37 @@ describe('test/client.test.js', () => { client.emit('foo_received_1', val); }); - let ret = yield client.await('foo_received_1'); + let ret = await client.await('foo_received_1'); assert.deepEqual(ret, [ 'bar' ]); client.subscribe({ key: 'foo' }, val => { client.emit('foo_received_2', val); }); - ret = yield client.await('foo_received_2'); + ret = await client.await('foo_received_2'); assert.deepEqual(ret, [ 'bar' ]); - yield client.close(); + await client.close(); }); class ErrorClient extends Base { constructor() { super({ initMethod: '_init' }); + + this.data = ''; } - * _init() { + async _init() { + await sleep(1000); throw new Error('mock error'); } send(data) { console.log('send', data); } + + async getData() { + return this.data; + } } class APIClient extends APIClientBase { @@ -212,20 +211,47 @@ describe('test/client.test.js', () => { }; } - * _init() { + async _init() { + await sleep(1000); throw new Error('mock error'); } send(data) { this._client.send(data); } + + async getData() { + return await this._client.getData(); + } } - it('invokeOneway + ready error', function* () { + it('should invoke with ready err', async function() { + const leader = new APIClient(); + try { + await leader.getData(); + assert(false); + } catch (err) { + assert(err && err.message === 'mock error'); + } + + const follower = new APIClient(); + + try { + await follower.getData(); + assert(false); + } catch (err) { + assert(err && err.message === 'mock error'); + } + + await follower.close(); + await follower.close(); + }); + + it('invokeOneway + ready error', async function() { const client = new APIClient(); client.send(123); try { - yield client.ready(); + await client.ready(); } catch (err) { assert(err.message === 'mock error'); } @@ -233,7 +259,7 @@ describe('test/client.test.js', () => { const client2 = new APIClient(); client2.send(321); try { - yield client2.ready(); + await client2.ready(); } catch (err) { assert(err.message === 'mock error'); } @@ -241,9 +267,9 @@ describe('test/client.test.js', () => { client.send(123); client2.send(321); - yield sleep(2000); + await sleep(2000); - yield client.close(); - yield client2.close(); + await client.close(); + await client2.close(); }); }); diff --git a/test/close.test.js b/test/close.test.js index 57b2e8a..2a38821 100644 --- a/test/close.test.js +++ b/test/close.test.js @@ -20,18 +20,18 @@ describe('test/close.test.js', () => { }); }); - it('should delegate close ok', function* () { + it('should delegate close ok', async function() { const leader = cluster(CloseClient, { port }) .delegate('destroy', 'close') .create(); - yield leader.ready(); + await leader.ready(); assert(fs.existsSync(path.join(__dirname, `supports/${process.version}.bin`))); - yield leader.destroy(); + await leader.destroy(); assert(!fs.existsSync(path.join(__dirname, `supports/${process.version}.bin`))); }); - it('should APIClient has default close', function* () { + it('should APIClient has default close', async function() { class APIClient extends cluster.APIClientBase { get DataClient() { return CloseClient; @@ -43,8 +43,8 @@ describe('test/close.test.js', () => { } let client = new APIClient(); - yield client.ready(); - yield client.close(); + await client.ready(); + await client.close(); class APIClient2 extends cluster.APIClientBase { get DataClient() { @@ -57,11 +57,11 @@ describe('test/close.test.js', () => { } client = new APIClient2(); - yield client.ready(); - yield client.close(); + await client.ready(); + await client.close(); }); - it('should handle error event after closed', function* () { + it('should handle error event after closed', async function() { class DataClient extends Base { constructor(options) { super(options); @@ -78,16 +78,16 @@ describe('test/close.test.js', () => { const leader = cluster(DataClient, { port }) .create(); - yield leader.ready(); - yield leader.close(); + await leader.ready(); + await leader.close(); try { - yield leader.await('error'); + await leader.await('error'); } catch (err) { assert(err.message === 'mock error'); } // close again should work - yield leader.close(); + await leader.close(); }); }); diff --git a/test/cluster.test.js b/test/cluster.test.js index ba87bd8..fff63ef 100644 --- a/test/cluster.test.js +++ b/test/cluster.test.js @@ -72,7 +72,7 @@ describe('test/cluster.test.js', () => { it('should work on cluster module', () => { return coffee.fork(path.join(__dirname, 'supports/cluster_server.js')) - // .debug() + // .debug(0) // make sure leader and follower exists .expect('stdout', /, leader: true/) .expect('stdout', /, leader: false/) diff --git a/test/connection.test.js b/test/connection.test.js index dc72038..a0156a6 100644 --- a/test/connection.test.js +++ b/test/connection.test.js @@ -41,35 +41,37 @@ describe('test/connection.test.js', () => { server.once('close', done); }); - it('should throw error if send timeout', function* () { + it('should throw error if send timeout', async function() { const socket = net.connect(port, '127.0.0.1'); - yield awaitEvent(socket, 'connect'); - yield sleep(100); + await awaitEvent(socket, 'connect'); + await sleep(100); assert(conns.has(socket.localPort)); const conn = conns.get(socket.localPort); try { - yield cb => { + await new Promise((resolve, reject) => { conn.send(new Request({ connObj: { foo: 'bar' }, timeout: 1000, - }), cb); - }; + }), err => { + if (err) { reject(err); } else { resolve(); } + }); + }); assert(false, 'no here'); } catch (err) { assert(err && err.name === 'ClusterConnectionResponseTimeoutError'); assert(err.message === `[ClusterClient] no response in 1000ms, remotePort#${socket.localPort}`); } socket.destroy(); - yield awaitEvent(socket, 'close'); - yield sleep(100); + await awaitEvent(socket, 'close'); + await sleep(100); assert(!conns.has(socket.localPort)); }); - it('should handle request ok', function* () { + it('should handle request ok', async function() { const socket = net.connect(port, '127.0.0.1'); - yield awaitEvent(socket, 'connect'); - yield sleep(100); + await awaitEvent(socket, 'connect'); + await sleep(100); assert(conns.has(socket.localPort)); const conn = conns.get(socket.localPort); @@ -79,31 +81,31 @@ describe('test/connection.test.js', () => { timeout: 1000, }).encode()); - const req = yield conn.await('request'); + const req = await conn.await('request'); assert(req && !req.isResponse); assert(req.timeout === 1000); assert.deepEqual(req.connObj, { foo: 'bar' }); assert(!req.data); - yield Promise.all([ + await Promise.all([ conn.close(), conn.close(), // close second time awaitEvent(socket, 'close'), ]); - yield sleep(100); + await sleep(100); assert(!conns.has(socket.localPort)); }); - it('should close connection if decode error', function* () { + it('should close connection if decode error', async function() { const socket = net.connect(port, '127.0.0.1'); - yield awaitEvent(socket, 'connect'); - yield sleep(100); + await awaitEvent(socket, 'connect'); + await sleep(100); assert(conns.has(socket.localPort)); socket.write(new Buffer('010000000000000000000001000003e80000000d000000007b22666f6f223a22626172227c', 'hex')); - yield awaitEvent(socket, 'close'); - yield sleep(100); + await awaitEvent(socket, 'close'); + await sleep(100); assert(!conns.has(socket.localPort)); }); }); diff --git a/test/event.test.js b/test/event.test.js index 6e58737..b02dd2c 100644 --- a/test/event.test.js +++ b/test/event.test.js @@ -32,23 +32,23 @@ describe('test/event.test.js', () => { } } - before(function* () { + before(async function() { const server = net.createServer(); - port = yield cb => { + port = await new Promise(resolve => { server.listen(0, () => { const address = server.address(); console.log('using port =>', address.port); server.close(); - cb(null, address.port); + resolve(address.port); }); - }; + }); client = new ClusterClient(); }); - after(function* () { - yield client.close(); + after(async function() { + await client.close(); }); - it('should ok', function* () { + it('should ok', async function() { mm(process, 'emitWarning', err => { client.emit('error', err); }); @@ -62,7 +62,7 @@ describe('test/event.test.js', () => { }); }; - yield Promise.race([ + await Promise.race([ client.await('error'), client.await('foo'), subscribe(), diff --git a/test/index.test.js b/test/index.test.js index 6f5c6d7..353ece0 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -11,12 +11,44 @@ const assert = require('assert'); const pedding = require('pedding'); const serverMap = global.serverMap; const symbols = require('../lib/symbol'); +const ClusterServer = require('../lib/server'); const NotifyClient = require('./supports/notify_client'); const RegistryClient = require('./supports/registry_client'); const portDelta = Number(process.versions.node.slice(0, 1)); describe('test/index.test.js', () => { + afterEach(mm.restore); + + it('should throw if port is occupied by other, while new Leader', async function() { + mm(ClusterServer, 'create', async () => null); + + const server = net.createServer(); + const port = await new Promise(resolve => { + server.listen(0, () => { + const address = server.address(); + // console.log('using port =>', port); + // server.close(); + resolve(address.port); + }); + }); + const leader = cluster(RegistryClient, { port, isLeader: true }) + .delegate('subscribe', 'subscribe') + .delegate('publish', 'publish') + .override('foo', 'bar') + .create(); + + try { + await leader.ready(); + assert(false); + } catch (err) { + assert(err.message === `create "RegistryClient" leader failed, the port:${port} is occupied by other`); + } + + await leader.close(); + server.close(); + }); + describe('RegistryClient', () => { const port = 8880 + portDelta; let leader; @@ -30,13 +62,13 @@ describe('test/index.test.js', () => { follower = cluster(RegistryClient, { port, isLeader: false }).create(); }); - afterEach(function* () { + afterEach(async function() { assert(serverMap.has(port) === true); - yield Promise.race([ + await Promise.race([ cluster.close(follower), follower.await('error'), ]); - yield Promise.race([ + await Promise.race([ cluster.close(leader), leader.await('error'), ]); @@ -107,7 +139,7 @@ describe('test/index.test.js', () => { }); }); - it('should should not close net.Server if other client is using same port', function* () { + it('should should not close net.Server if other client is using same port', async function() { class AnotherClient extends Base { constructor() { super(); @@ -115,18 +147,18 @@ describe('test/index.test.js', () => { } } const anotherleader = cluster(AnotherClient, { port, isLeader: true }).create(); - yield anotherleader.ready(); + await anotherleader.ready(); // assert has problem with global scope virable // assert(serverMap.has(port) === true); if (!serverMap.has(port)) throw new Error(); - yield cluster.close(anotherleader); + await cluster.close(anotherleader); // leader is using the same port, so anotherleader.close should not close the net.Server if (!serverMap.has(port)) throw new Error(); }); - it('should realClient.close be a generator function ok', function* () { + it('should realClient.close be a generator function ok', async function() { class RealClientWithGeneratorClose extends Base { constructor() { super(); @@ -138,8 +170,8 @@ describe('test/index.test.js', () => { } } const anotherleader = cluster(RealClientWithGeneratorClose, { port, isLeader: true }).create(); - yield anotherleader.ready(); - yield cluster.close(anotherleader); + await anotherleader.ready(); + await cluster.close(anotherleader); // make sure real client is closed; // assert has problem with global scope virable if (anotherleader[symbols.innerClient]._realClient.closed !== true) { @@ -147,7 +179,7 @@ describe('test/index.test.js', () => { } }); - it('should realClient.close be a normal function ok', function* () { + it('should realClient.close be a normal function ok', async function() { class RealClientWithNormalClose extends Base { constructor() { super(); @@ -158,8 +190,8 @@ describe('test/index.test.js', () => { } } const anotherleader = cluster(RealClientWithNormalClose, { port, isLeader: true }).create(); - yield anotherleader.ready(); - yield cluster.close(anotherleader); + await anotherleader.ready(); + await cluster.close(anotherleader); // make sure real client is closed; // assert has problem with global scope virable if (anotherleader[symbols.innerClient]._realClient.closed !== true) { @@ -167,7 +199,7 @@ describe('test/index.test.js', () => { } }); - it('should realClient.close be a function returning promise ok', function* () { + it('should realClient.close be a function returning promise ok', async function() { class RealClientWithCloseReturningPromise extends Base { constructor() { super(); @@ -178,8 +210,8 @@ describe('test/index.test.js', () => { } } const anotherleader = cluster(RealClientWithCloseReturningPromise, { port, isLeader: true }).create(); - yield anotherleader.ready(); - yield cluster.close(anotherleader); + await anotherleader.ready(); + await cluster.close(anotherleader); // make sure real client is closed; // assert has problem with global scope virable if (anotherleader[symbols.innerClient]._realClient.closed !== true) { @@ -303,7 +335,7 @@ describe('test/index.test.js', () => { } }); - it('should symbol function not delegated', function* () { + it('should symbol function not delegated', () => { assert(!leader[SYMBOL_FN]); assert(!follower[SYMBOL_FN]); }); @@ -483,15 +515,15 @@ describe('test/index.test.js', () => { let leader; let follower; let follower2; - before(function* () { + before(() => { leader = cluster(RegistryClient, { isLeader: true, port, isBroadcast: false }).create(4322, '224.5.6.9'); follower = cluster(RegistryClient, { isLeader: false, port, isBroadcast: false }).create(4322, '224.5.6.9'); follower2 = cluster(RegistryClient, { isLeader: false, port, isBroadcast: false }).create(4322, '224.5.6.9'); }); - after(function* () { - yield follower.close(); - yield follower2.close(); - yield leader.close(); + after(async function() { + await follower.close(); + await follower2.close(); + await leader.close(); }); @@ -541,47 +573,48 @@ describe('test/index.test.js', () => { let client_1; let client_2; let client_3; - before(function* () { + before(async function() { client_1 = cluster(RegistryClient, { port }).create(4323, '224.5.6.10'); client_2 = cluster(RegistryClient, { port }).create(4323, '224.5.6.10'); client_3 = cluster(RegistryClient, { port }).create(4323, '224.5.6.10'); - yield client_1.ready(); - yield client_2.ready(); - yield client_3.ready(); + await client_1.ready(); + await client_2.ready(); + await client_3.ready(); }); - after(() => { - cluster.close(client_1); - cluster.close(client_2); - cluster.close(client_3); + after(async () => { + await cluster.close(client_3); + await cluster.close(client_2); + await cluster.close(client_1); }); it('should re subscribe / publish ok', done => { done = pedding(done, 3); let trigger_1 = false; let trigger_2 = false; + let trigger_3 = false; client_1.subscribe({ dataId: 'com.alibaba.dubbo.demo.DemoService', }, val => { + if (trigger_1) return; + + trigger_1 = true; assert(val && val.length > 0); console.log('1', val.map(url => url.host)); assert(val.some(url => url.host === '30.20.78.299:20880')); - if (!trigger_1) { - trigger_1 = true; - done(); - } + done(); }); client_2.subscribe({ dataId: 'com.alibaba.dubbo.demo.DemoService', }, val => { + if (trigger_2) return; + + trigger_2 = true; assert(val && val.length > 0); console.log('2', val.map(url => url.host)); assert(val.some(url => url.host === '30.20.78.299:20880')); - if (!trigger_2) { - trigger_2 = true; - done(); - } + done(); }); client_2.publish({ @@ -610,6 +643,9 @@ describe('test/index.test.js', () => { client_3.subscribe({ dataId: 'com.alibaba.dubbo.demo.DemoService', }, val => { + if (trigger_3) return; + + trigger_3 = true; assert(val && val.length > 0); console.log('3', val.map(url => url.host)); if (val.length === 2) { diff --git a/test/register_error.test.js b/test/register_error.test.js index 18ef76f..4c55c6b 100644 --- a/test/register_error.test.js +++ b/test/register_error.test.js @@ -21,7 +21,7 @@ describe('test/register_error.test.js', () => { }); afterEach(mm.restore); - it('should register channel util success', function* () { + it('should register channel util success', async function() { const originDecode = Packet.decode; mm(Packet, 'decode', function(buf) { const ret = originDecode(buf); @@ -35,14 +35,14 @@ describe('test/register_error.test.js', () => { const leader = cluster(Client, { port }).create(); const follower = cluster(Client, { port }).create(); - yield leader.ready(); - yield follower.ready(); + await leader.ready(); + await follower.ready(); - yield follower.close(); - yield leader.close(); + await follower.close(); + await leader.close(); }); - it('should handle register_channel request in leader', function* () { + it('should handle register_channel request in leader', async function() { mm(Response.prototype, 'encode', function() { mm.restore(); return new Buffer('01010000000000000000000000000bb80000001f000000007b2274797065223a2272656769737465725f6368616e6e656c5f726573227d', 'hex'); @@ -51,11 +51,11 @@ describe('test/register_error.test.js', () => { const leader = cluster(Client, { port }).create(); const follower = cluster(Client, { port }).create(); - yield leader.ready(); - yield follower.ready(); + await leader.ready(); + await follower.ready(); - yield follower.close(); - yield leader.close(); + await follower.close(); + await leader.close(); // subscribe after close follower.subscribe({ foo: 'bar' }, val => { diff --git a/test/server.test.js b/test/server.test.js index c3bccf9..963497c 100644 --- a/test/server.test.js +++ b/test/server.test.js @@ -15,20 +15,20 @@ describe('test/server.test.js', () => { .end(done); }); - it('should return null create with same name', function* () { - const server1 = yield ClusterServer.create('same-name', 10001); + it('should return null create with same name', async function() { + const server1 = await ClusterServer.create('same-name', 10001); assert(server1); - const server2 = yield ClusterServer.create('same-name', 10001); + const server2 = await ClusterServer.create('same-name', 10001); assert(server2 === null); - yield server1.close(); + await server1.close(); }); - it('should create success if previous closed by ClusterServer.close', function* () { - const server1 = yield ClusterServer.create('previous-closed', 10002); + it('should create success if previous closed by ClusterServer.close', async function() { + const server1 = await ClusterServer.create('previous-closed', 10002); assert(server1); - yield ClusterServer.close('previous-closed', server1); - const server2 = yield ClusterServer.create('previous-closed', 10002); + await ClusterServer.close('previous-closed', server1); + const server2 = await ClusterServer.create('previous-closed', 10002); assert(server2); - yield ClusterServer.close('previous-closed', server1); + await ClusterServer.close('previous-closed', server1); }); }); diff --git a/test/supports/async_api_client.js b/test/supports/async_api_client.js new file mode 100644 index 0000000..6260989 --- /dev/null +++ b/test/supports/async_api_client.js @@ -0,0 +1,22 @@ +'use strict'; + +const APIClientBase = require('../../').APIClientBase; + +class APIClient extends APIClientBase { + get DataClient() { + return require('./async_data_client'); + } + + get clusterOptions() { + return { + name: 'api_client_test', + responseTimeout: 100, + }; + } + + async echo(str) { + return await this._client.echo(str); + } +} + +module.exports = APIClient; diff --git a/test/supports/async_data_client.js b/test/supports/async_data_client.js new file mode 100644 index 0000000..25814bc --- /dev/null +++ b/test/supports/async_data_client.js @@ -0,0 +1,21 @@ +'use strict'; + +const Base = require('sdk-base'); +const sleep = require('mz-modules/sleep'); + +class DataClient extends Base { + constructor() { + super({ initMethod: '_init' }); + } + + async _init() { + await sleep(5000); + } + + async echo(str) { + await sleep(10); + return str; + } +} + +module.exports = DataClient; diff --git a/test/supports/cluster_server.js b/test/supports/cluster_server.js index fd9f97f..c3e157d 100644 --- a/test/supports/cluster_server.js +++ b/test/supports/cluster_server.js @@ -3,9 +3,11 @@ const cluster = require('cluster'); const http = require('http'); const net = require('net'); -const numCPUs = require('os').cpus().length; +let numCPUs = require('os').cpus().length; const APIClientBase = require('../..').APIClientBase; +if (numCPUs <= 1) numCPUs = 2; + function startServer(port) { class TestClient extends APIClientBase { get DataClient() { @@ -49,6 +51,7 @@ function startServer(port) { if (cluster.isMaster) { console.log(`Master ${process.pid} is running`); + const workerSet = new Set(); // Fork workers. for (let i = 0; i < numCPUs; i++) { @@ -58,13 +61,23 @@ function startServer(port) { cluster.on('exit', (worker, code, signal) => { console.log(`worker ${worker.process.pid} died, code: ${code}, signal: ${signal}`); }); - setTimeout(() => { - process.exit(0); - }, 2000); + cluster.on('message', worker => { + workerSet.add(worker.id); + if (workerSet.size === numCPUs) { + process.exit(0); + } + }); + // setTimeout(() => { + // process.exit(0); + // }, 10000); } else { const client = new TestClient(); - client.ready(() => { - console.log(`Worker ${process.pid} client ready, leader: ${client.isClusterClientLeader}`); + client.ready(err => { + if (err) { + console.log(`Worker ${process.pid} client ready failed, leader: ${client.isClusterClientLeader}, errMsg: ${err.message}`); + } else { + console.log(`Worker ${process.pid} client ready, leader: ${client.isClusterClientLeader}`); + } }); let latestVal; client.subscribe({ key: 'foo' }, val => { @@ -76,6 +89,10 @@ function startServer(port) { client.publish({ key: 'foo', value: 'bar ' + Date() }); }, 200); + setTimeout(() => { + process.send(cluster.worker.id); + }, 5000); + // Workers can share any TCP connection // In this case it is an HTTP server http.createServer((req, res) => { diff --git a/test/utils.test.js b/test/utils.test.js index 6ce5eeb..bf39158 100644 --- a/test/utils.test.js +++ b/test/utils.test.js @@ -15,17 +15,17 @@ describe('test/utils.test.js', () => { assert(utils.nextId() === 1); }); - it('should callFn ok', function* () { - yield utils.callFn(null); - const ret = yield utils.callFn(function* (a, b) { + it('should callFn ok', async function() { + await utils.callFn(null); + const ret = await utils.callFn(function* (a, b) { return a + b; }, [ 1, 2 ]); assert(ret === 3); - yield utils.callFn(function(a, b) { + await utils.callFn(function(a, b) { return Promise.resolve(a + b); }, [ 1, 2 ]); assert(ret === 3); - yield utils.callFn(function(a, b) { + await utils.callFn(function(a, b) { return a + b; }, [ 1, 2 ]); assert(ret === 3);