diff --git a/protocol/test-package.proto b/protocol/test-package.proto new file mode 100644 index 0000000..9e8c903 --- /dev/null +++ b/protocol/test-package.proto @@ -0,0 +1,12 @@ +syntax = "proto2"; + +package testNamespaceWithSameMethods; + +service TestService { + rpc Upper (TextMessage) returns (TextMessage) {} + rpc lower (TextMessage) returns (TextMessage) {} +} + +message TextMessage { + required string text = 1; +} diff --git a/protocol/test.proto b/protocol/test.proto index 38e8fa1..5e56b66 100644 --- a/protocol/test.proto +++ b/protocol/test.proto @@ -1,4 +1,4 @@ -syntax = "proto3"; +syntax = "proto2"; message Empty {} diff --git a/src/client.ts b/src/client.ts index f9ab701..399c8c4 100644 --- a/src/client.ts +++ b/src/client.ts @@ -38,7 +38,7 @@ import * as protobuf from 'protobufjs/minimal' import {VError} from 'verror' import * as WebSocket from 'ws' import * as RPC from '../protocol/rpc' -import {waitForEvent} from './utils' +import {getFullName, lookupServices, waitForEvent} from './utils' export let WS = WebSocket @@ -108,7 +108,7 @@ export interface IClientEvents { * ---------- * Can be used in both node.js and the browser. Also see {@link IClientOptions}. */ -export class Client extends EventEmitter implements IClientEvents { +export class Client extends EventEmitter implements IClientEvents { /** * Client options, *readonly*. @@ -116,9 +116,11 @@ export class Client extends EventEmitter impleme public readonly options: IClientOptions /** - * The protobuf service instance which holds all the rpc methods defined in your protocol. + * Protobuf rpc service instances. */ - public readonly service: T + public readonly services: {[name: string]: any} = {} + + public readonly root: protobuf.Root private active: boolean = false private address: string @@ -133,15 +135,22 @@ export class Client extends EventEmitter impleme /** * @param address The address to the {@link Server}, eg `ws://example.com:8042`. - * @param service The protocol buffer service class to use, an instance of this + * @param root The protocol buffer service class to use, an instance of this * will be available as {@link Client.service}. */ - constructor(address: string, service: {create(rpcImpl: protobuf.RPCImpl): T}, options: IClientOptions = {}) { + constructor(address: string, root: protobuf.Root, options: IClientOptions = {}) { super() this.address = address this.options = options - this.service = service.create(this.rpcImpl) + this.root = root + + const services = lookupServices(this.root) + services.forEach((serviceName) => { + const service = this.root.lookupService(serviceName) + const rpcService: protobuf.rpc.Service = service.create(this.rpcImpl) + this.services[serviceName] = rpcService + }) this.eventTypes = options.eventTypes || {} this.backoff = options.backoff || defaultBackoff @@ -153,6 +162,10 @@ export class Client extends EventEmitter impleme } } + public service(serviceName: string): T { + return this.services[serviceName] + } + /** * Return `true` if the client is connected, otherwise `false`. */ @@ -234,13 +247,13 @@ export class Client extends EventEmitter impleme this.flushMessageBuffer().catch(this.errorHandler) } - private rpcImpl: protobuf.RPCImpl = (method, requestData, callback) => { + private rpcImpl: protobuf.RPCImpl = (method: any, requestData, callback) => { const seq = this.nextSeq this.nextSeq = (this.nextSeq + 1) & 0xffff const message: RPC.IMessage = { request: { - method: method.name, + method: getFullName(method), payload: requestData, seq, }, diff --git a/src/server.ts b/src/server.ts index f0a4dc2..06b9245 100644 --- a/src/server.ts +++ b/src/server.ts @@ -38,7 +38,7 @@ import * as protobuf from 'protobufjs/minimal' import {VError} from 'verror' import * as WebSocket from 'ws' import * as RPC from './../protocol/rpc' -import {waitForEvent} from './utils' +import {getFullName} from './utils' /** * RPC Server options @@ -84,9 +84,9 @@ export class Server extends EventEmitter implements IServerEvents { public readonly options: IServerOptions /** - * The protobuf Service instance, internal. + * The protobuf Root instance, internal. */ - public readonly service: protobuf.Service + public readonly root: protobuf.Root /** * The underlying uWebSocket server, internal. @@ -97,13 +97,13 @@ export class Server extends EventEmitter implements IServerEvents { private pingInterval: number /** - * @param service The protocol buffer service class to serve. + * @param root The protocol buffer root which contains the services to serve. * @param options Options, see {@link IServerOptions}. */ - constructor(service: protobuf.Service, options: IServerOptions = {}) { + constructor(root: protobuf.Root, options: IServerOptions = {}) { super() - this.service = service + this.root = root this.options = options options.clientTracking = false @@ -126,17 +126,19 @@ export class Server extends EventEmitter implements IServerEvents { * Implement a RPC method defined in the protobuf service. */ public implement(method: protobuf.Method|string, handler: Handler) { - if (typeof method === 'string') { - const methodName = method[0].toUpperCase() + method.substring(1) - method = this.service.methods[methodName] - if (!method) { - throw new Error('Invalid method') - } - } else if (this.service.methodsArray.indexOf(method) === -1) { + if (typeof method !== 'string') { + method = getFullName(method) + } + + const result: any = this.root.lookup(method) + if (!result || !(result instanceof protobuf.Method)) { throw new Error('Invalid method') } + method = result + const fullMethodName = getFullName(method) + method.resolve() - this.handlers[method.name] = handler + this.handlers[fullMethodName] = handler } /** @@ -238,14 +240,15 @@ export class Connection extends EventEmitter { } private async requestHandler(request: RPC.Request): Promise { - const methodName = request.method[0].toUpperCase() + request.method.substring(1) + const fullMethodName = request.method - const method = this.server.service.methods[methodName] - if (!method) { + const result = this.server.root.lookup(fullMethodName) + if (!result || !(result instanceof protobuf.Method)) { throw new Error('Invalid method') } + const method: protobuf.Method = result - const impl = this.server.handlers[methodName] + const impl = this.server.handlers[fullMethodName] if (!impl) { throw new Error('Not implemented') } diff --git a/src/utils.ts b/src/utils.ts index 4a89c90..7af30a0 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -34,6 +34,7 @@ */ import {EventEmitter} from 'events' +import {Namespace, ReflectionObject, Service} from 'protobufjs' /** * Return a promise that will resove when a specific event is emitted. @@ -43,3 +44,46 @@ export function waitForEvent(emitter: EventEmitter, eventName: string|symbol) emitter.once(eventName, resolve) }) } + +/** + * Resolve full name of protobuf objects. + * This helps to distinguish services or methods with the same name but in different packages/namespaces. + * + * Example returns: + * 'packageName.serviceName.methodName' + * 'differentPackageName.serviceName.methodName' + */ +export function getFullName(obj: ReflectionObject, names: string[] = []): string { + if (obj.name) { + names.unshift(obj.name) + } + + if (obj.parent) { + return getFullName(obj.parent, names) + } + + return names.join('.') +} + +/** + * Get all protobuf.Service in a protobuf.ReflectionObject. + * returns with an array of fully namespaced services. + * + * Example return: + * ['packageName.serviceName.methodName', 'differentPackageName.serviceName.methodName'] + */ +export function lookupServices(obj: ReflectionObject): string[] { + const services: string[] = [] + + if (obj instanceof Service) { + services.push(getFullName(obj)) + } + + if (obj instanceof Namespace) { + obj.nestedArray.forEach((nestedObject: ReflectionObject) => { + services.push(...lookupServices(nestedObject)) + }) + } + + return services +} diff --git a/test/index.ts b/test/index.ts index 39bafe2..e510211 100644 --- a/test/index.ts +++ b/test/index.ts @@ -8,30 +8,53 @@ import * as path from 'path' import * as crypto from 'crypto' import {Server, Client} from './../src' import * as wsrpc_client from './../src/client' -import {waitForEvent} from './../src/utils' +import {waitForEvent, getFullName, lookupServices} from './../src/utils' import {TestService, TextMessage} from './../protocol/test' +import { testNamespaceWithSameMethods } from './../protocol/test-package' import * as rpcproto from './../protocol/rpc' import * as WebSocket from 'ws' const testPort = 1234 const testAddr = `ws://localhost:${ testPort }` const testProtoPath = path.join(__dirname, './../protocol/test.proto') -const testProto = protobuf.loadSync(testProtoPath) +const testPackageProtoPath = path.join(__dirname, './../protocol/test-package.proto') +const testProto = protobuf.loadSync([testProtoPath, testPackageProtoPath]) const serverService = testProto.lookupService('TestService') +const packagedTestService = testProto.lookupService('testNamespaceWithSameMethods.TestService') const serverOpts = { port: testPort, pingInterval: 0.05, } +describe('utils', () => { + it('getFullName works for services', function() { + assert.equal(getFullName(serverService), 'TestService') + assert.equal(getFullName(packagedTestService), 'testNamespaceWithSameMethods.TestService') + }) + + it('getFullName works for methods', function() { + const upperMethod = serverService.methods['Upper'] + const otherUpperMethod = packagedTestService.methods['Upper'] + + assert.equal(getFullName(upperMethod), 'TestService.Upper') + assert.equal(getFullName(otherUpperMethod), 'testNamespaceWithSameMethods.TestService.Upper') + }) + + it('lookupServices works', function() { + const services = lookupServices(testProto) + assert.deepEqual(services, ['TestService', 'testNamespaceWithSameMethods.TestService']) + }) +}) + describe('rpc', () => { let planError = false let unplannedError = false - let server = new Server(serverService, serverOpts) + let server = new Server(testProto, serverOpts) - server.implement('echo', async (request: TextMessage) => { + server.implement('Echo', async (request: TextMessage) => { if (request.text === 'throw-string') { throw 'You should always trow an error object' } @@ -50,6 +73,14 @@ describe('rpc', () => { }) }) + server.implement(packagedTestService.methods['Upper'], async (request: TextMessage) => { + return { text: 'Upper: ' + request.text.toUpperCase() } + }) + + server.implement('testNamespaceWithSameMethods.TestService.lower', async (request: TextMessage) => { + return { text: 'lower: ' + request.text.toLowerCase() } + }) + server.on('error', (error: Error) => { if (planError) { return @@ -59,7 +90,7 @@ describe('rpc', () => { console.warn('unplanned server error', error.message) }) - const client = new Client(testAddr, TestService, { + const client = new Client(testAddr, testProto, { sendTimeout: 100, eventTypes: { 'text': TextMessage @@ -74,6 +105,7 @@ describe('rpc', () => { unplannedError = true console.warn('unplanned client error', error.message) }) + after(async () => await client.disconnect()) it('should throw when implementing invalid method', function() { @@ -87,20 +119,42 @@ describe('rpc', () => { }) it('should run echo rpc method', async function() { - const response = await client.service.echo({text: 'hello world'}) + // @ts-ignore + const response = await client.service('TestService').echo({text: 'hello world'}) assert.equal(response.text, 'hello world') }) it('should run upper rpc method', async function() { this.slow(150) - const response = await client.service.upper({text: 'hello world'}) + + // @ts-ignore + const response = await client.service('TestService').upper({text: 'hello world'}) assert.equal(response.text, 'HELLO WORLD') }) + it('should run upper rpc method in namespaced service', async function() { + // @ts-ignore + const response = await client.service('testNamespaceWithSameMethods.TestService').upper({text: 'hello world'}) + assert.equal(response.text, 'Upper: HELLO WORLD') + }) + + it('should run lower rpc method in namespaced service', async function() { + // @ts-ignore + const response = await client.service('testNamespaceWithSameMethods.TestService').lower({text: 'Hello World'}) + assert.equal(response.text, 'lower: hello world') + }) + + it('should run without @ts-ignore if the type is specified...', async function() { + const service: testNamespaceWithSameMethods.TestService = client.service('testNamespaceWithSameMethods.TestService') + const response = await service.lower({text: 'Hello World'}) + assert.equal(response.text, 'lower: hello world') + }) + it('should handle thrown errors in implementation handler', async function() { planError = true try { - await client.service.echo({text: 'throw'}) + // @ts-ignore + await client.service('TestService').echo({text: 'throw'}) assert(false, 'should not be reached') } catch (error) { assert.equal(error.name, 'RPCError') @@ -110,7 +164,8 @@ describe('rpc', () => { it('should handle thrown strings in implementation handler', async function() { try { - await client.service.echo({text: 'throw-string'}) + // @ts-ignore + await client.service('TestService').echo({text: 'throw-string'}) assert(false, 'should not be reached') } catch (error) { assert.equal(error.name, 'RPCError') @@ -120,7 +175,8 @@ describe('rpc', () => { it('should handle unimplemented methods', async function() { try { - await client.service.notImplemented({}) + // @ts-ignore + await client.service('TestService').notImplemented({}) assert(false, 'should throw') } catch (error) { assert.equal(error.name, 'RPCError') @@ -161,7 +217,6 @@ describe('rpc', () => { }) }) - it('should handle garbled data from client', function(done) { planError = true const c = client as any @@ -218,7 +273,8 @@ describe('rpc', () => { it('should timeout messages', async function() { this.slow(300) - const response = client.service.echo({text: 'foo'}) + // @ts-ignore + const response = client.service('TestService').echo({text: 'foo'}) await client.disconnect() try { await response @@ -231,7 +287,8 @@ describe('rpc', () => { it('should reconnect', async function() { planError = false await client.connect() - const response = await client.service.echo({text: 'baz'}) + // @ts-ignore + const response = await client.service('TestService').echo({text: 'baz'}) assert(response.text, 'baz') }) @@ -244,8 +301,10 @@ describe('rpc', () => { server.connections[0].close() await waitForEvent(client, 'close') - const buzz = client.service.echo({text: 'fizz'}) - const fizz = client.service.echo({text: 'buzz'}) + // @ts-ignore + const buzz = client.service('TestService').echo({text: 'fizz'}) + // @ts-ignore + const fizz = client.service('TestService').echo({text: 'buzz'}) const response = await Promise.all([buzz, fizz]) assert.deepEqual(response.map((msg) => msg.text), ['fizz', 'buzz']) }) @@ -258,14 +317,15 @@ describe('rpc', () => { // force a connection failure to simulate server being down for a bit await client.connect() planError = false - server = new Server(serverService, serverOpts) + server = new Server(testProto, serverOpts) await waitForEvent(client, 'open') }) it('should handle failed writes', async function() { ( client).socket.send = () => { throw new Error('boom') } try { - await client.service.echo({text: 'boom'}) + // @ts-ignore + await client.service('TestService').echo({text: 'boom'}) assert(false, 'should not be reached') } catch (error) { assert.equal(error.message, 'boom') @@ -286,16 +346,16 @@ describe('rpc browser client', function() { // simulated browser test using the ws module let server: Server - let client: Client + let client: Client before(async function() { (wsrpc_client).WS = WebSocket process.title = 'browser' - server = new Server(serverService, serverOpts) - server.implement('echo', async (request: TextMessage) => { + server = new Server(testProto, serverOpts) + server.implement('TestService.Echo', async (request: TextMessage) => { return {text: request.text} }) - client = new Client(testAddr, TestService) + client = new Client(testAddr, testProto) }) after(async function() { @@ -304,7 +364,8 @@ describe('rpc browser client', function() { }) it('should work', async function() { - const response = await client.service.echo({text: 'foo'}) + // @ts-ignore + const response = await client.service('TestService').echo({text: 'foo'}) assert.equal(response.text, 'foo') }) })