Skip to content

Commit

Permalink
feat: add tips for tcp-base (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 committed Aug 30, 2023
1 parent e8b3bc6 commit f3068e0
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 41 deletions.
10 changes: 8 additions & 2 deletions lib/follower.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class Follower extends Base {
id: req.id,
oneway: true,
data: req.encode(),
tips: `subscribe key: ${key}`,
});
this._subInfo.add(key);
} else if (this._subData.has(key)) {
Expand Down Expand Up @@ -189,6 +190,7 @@ class Follower extends Base {
id: req.id,
oneway: true,
data: req.encode(),
tips: `unSubscribe key ${key}`,
});
}
}
Expand Down Expand Up @@ -235,14 +237,16 @@ class Follower extends Base {
id: req.id,
oneway,
data: req.encode(),
tips: `invoke method:${method}, oneway:${oneway}, argLength:${argLength}`,
}, callback);
}

_registerChannel() {
const channelName = this.options.name;
const req = new Request({
connObj: {
type: 'register_channel',
channelName: this.options.name,
channelName,
},
timeout: this.options.responseTimeout,
});
Expand All @@ -251,11 +255,12 @@ class Follower extends Base {
id: req.id,
oneway: false,
data: req.encode(),
tips: `register_channel channelName:${channelName}`,
}, (err, data) => {
if (err) {
// if socket alive, do retry
if (this._socket) {
err.message = `register to channel: ${this.options.name} failed, will retry after 3s, ${err.message}`;
err.message = `register to channel: ${channelName} failed, will retry after 3s, ${err.message}`;
this.logger.warn(err);
// if exception, retry after 3s
setTimeout(() => this._registerChannel(), 3000);
Expand Down Expand Up @@ -303,6 +308,7 @@ class Follower extends Base {
id: req.id,
oneway: true,
data: res.encode(),
tips: `subscribe_result_res key:${connObj.key}`,
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"long": "^4.0.0",
"sdk-base": "^3.5.1",
"serialize-json": "^1.0.3",
"tcp-base": "^3.1.0",
"tcp-base": "^3.2.0",
"utility": "^1.15.0"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion test/cluster.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('test/cluster.test.js', () => {
if (err) {
commit(err);
}
console.log(meta);
console.log(meta.stdout);
console.log('publish finish');
});

Expand Down
17 changes: 4 additions & 13 deletions test/index.test.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
'use strict';

const spy = require('spy');
const net = require('net');
const URL = require('url');
const mm = require('egg-mock');
const cluster = require('../');
const is = require('is-type-of');
Expand Down Expand Up @@ -470,16 +467,10 @@ describe('test/index.test.js', () => {
const port = 5550 + portDelta;
const transcode = {
encode(urls) {
if (Array.isArray(urls)) {
return Buffer.from(JSON.stringify(urls.map(url => url.href)));
}
return Buffer.from(JSON.stringify(urls));
},
decode(buf) {
const arr = JSON.parse(buf);
if (Array.isArray(arr)) {
return arr.map(url => URL.parse(url, true));
}
return arr;
},
};
Expand All @@ -492,10 +483,10 @@ describe('test/index.test.js', () => {
leader.subscribe({
dataId: 'com.alibaba.dubbo.demo.DemoService',
}, val => {
console.log('leader', val.map(item => item.host));
console.log('leader', val);
assert(val && val.length > 0);
if (val.length === 2) {
assert(val.every(url => url instanceof URL.Url));
// assert(val.every(url => url instanceof URL.Url));
assert(val.some(url => url.host === '30.20.78.299:20880'));
assert(val.some(url => url.host === '30.20.78.300:20880'));
done();
Expand All @@ -505,10 +496,10 @@ describe('test/index.test.js', () => {
follower.subscribe({
dataId: 'com.alibaba.dubbo.demo.DemoService',
}, val => {
console.log('follower', val.map(item => item.host));
console.log('follower', val, val.map(item => item.host));
assert(val && val.length > 0);
if (val.length === 2) {
assert(val.every(url => url instanceof URL.Url));
// assert(val.every(url => url instanceof URL.Url));
assert(val.some(url => url.host === '30.20.78.299:20880'));
assert(val.some(url => url.host === '30.20.78.300:20880'));
done();
Expand Down
44 changes: 20 additions & 24 deletions test/supports/registry_client.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
'use strict';

const dgram = require('dgram');
const { format: URLFormat } = require('url');
const Base = require('sdk-base');
const URL = require('url');

const pid = process.pid;
const localIp = require('address').ip();
Expand All @@ -25,21 +23,20 @@ class RegistryClient extends Base {
this._socket.on('error', err => this.emit('error', err));
this._socket.on('message', buf => {
const msg = buf.toString();
// console.log(`socket got: ${msg} from ${rinfo.address}:${rinfo.port}`);

if (msg.startsWith('register ')) {
const url = msg.substring(9);
const parsed = URL.parse(url, true);
const key = parsed.query.interface;
const parsed = new URL(url);
const key = parsed.searchParams.get('interface');
if (this._subscribed.has(key)) {
const subData = this._subscribed.get(key);
const category = parsed.query.category || 'providers';
const category = parsed.searchParams.get('category') || 'providers';
// const enabled = parsed.query.enabled || true;

if (subData.urlObj.query.category.split(',').indexOf(category) >= 0) {
subData.value = subData.value || new Map();
if (!subData.value.has(parsed.host)) {
subData.value.set(parsed.host, parsed);
subData.value.set(parsed.host, { host: parsed.host });
this.emit(key, Array.from(subData.value.values()));
}
}
Expand All @@ -48,17 +45,17 @@ class RegistryClient extends Base {
// TODO:
} else if (msg.startsWith('subscribe ')) {
const consumerUrl = msg.substring(10);
const parsed = URL.parse(consumerUrl, true);
const key = parsed.query.interface;
const parsed = new URL(consumerUrl);
const key = parsed.searchParams.get('interface');

if (this._registered.has(key)) {
const urls = this._registered.get(key);

for (const url of urls) {
const obj = URL.parse(url, true);
const category = obj.query.category || 'providers';
const obj = new URL(url);
const category = obj.searchParams.get('category') || 'providers';
// const enabled = obj.query.enabled || true;
if (parsed.query.category.split(',').indexOf(category) >= 0) {
if (parsed.searchParams.get('category').split(',').indexOf(category) >= 0) {
this._broadcast(`register ${url}`);
}
}
Expand Down Expand Up @@ -137,7 +134,7 @@ class RegistryClient extends Base {
},
pathname: `/${key}`,
};
this._broadcast(`register ${URL.format(urlObj)}`);
this._broadcast(`register ${URLFormat(urlObj)}`);

urlObj.query = {
application: 'demo-consumer',
Expand All @@ -151,7 +148,7 @@ class RegistryClient extends Base {
side: 'consumer',
timestamp: Date.now(),
};
this._broadcast(`subscribe ${URL.format(urlObj)}`);
this._broadcast(`subscribe ${URLFormat(urlObj)}`);

this._subscribed.set(key, {
urlObj,
Expand All @@ -172,22 +169,21 @@ class RegistryClient extends Base {
publish(reg) {
// register dubbo://30.20.78.300:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=81281&side=provider&timestamp=1481613276143
this._broadcast(`register ${reg.publishData}`);
const parsed = URL.parse(reg.publishData, true);
const key = parsed.query.interface;
const urlObject = new URL(reg.publishData);
const key = urlObject.searchParams.get('interface');

if (this._registered.has(key)) {
this._registered.get(key).push(reg.publishData);
} else {
this._registered.set(key, [ reg.publishData ]);
}

parsed.protocol = 'provider:';
parsed.search = null;
parsed.query = Object.assign({}, parsed.query, {
category: 'configurators',
check: false,
});
const providerUrl = URL.format(parsed);
urlObject.protocol = 'provider:';
urlObject.search = null;
urlObject.searchParams.set('category', 'configurators');
urlObject.searchParams.set('check', 'fase');
const providerUrl = urlObject.toString();
console.log(providerUrl);
this._broadcast(`subscribe ${providerUrl}`);
}

Expand Down

0 comments on commit f3068e0

Please sign in to comment.