Skip to content

Commit

Permalink
Merge 47039a4 into 0a41fdc
Browse files Browse the repository at this point in the history
  • Loading branch information
tbence94 committed Oct 28, 2019
2 parents 0a41fdc + 47039a4 commit 7da6ba9
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 50 deletions.
12 changes: 12 additions & 0 deletions protocol/test-package.proto
@@ -0,0 +1,12 @@
syntax = "proto2";

package testNamespaceWithSameMethods;

service TestService {
rpc Upper (TextMessage) returns (TextMessage) {}
rpc lower (TextMessage) returns (TextMessage) {}
}

message TextMessage {
required string text = 1;
}
2 changes: 1 addition & 1 deletion protocol/test.proto
@@ -1,4 +1,4 @@
syntax = "proto3";
syntax = "proto2";

message Empty {}

Expand Down
31 changes: 22 additions & 9 deletions src/client.ts
Expand Up @@ -38,7 +38,7 @@ import * as protobuf from 'protobufjs/minimal'
import {VError} from 'verror'
import * as WebSocket from 'ws'
import * as RPC from '../protocol/rpc'
import {waitForEvent} from './utils'
import {getFullName, lookupServices, waitForEvent} from './utils'

export let WS = WebSocket

Expand Down Expand Up @@ -108,17 +108,19 @@ export interface IClientEvents {
* ----------
* Can be used in both node.js and the browser. Also see {@link IClientOptions}.
*/
export class Client<T extends protobuf.rpc.Service> extends EventEmitter implements IClientEvents {
export class Client extends EventEmitter implements IClientEvents {

/**
* Client options, *readonly*.
*/
public readonly options: IClientOptions

/**
* The protobuf service instance which holds all the rpc methods defined in your protocol.
* Protobuf rpc service instances.
*/
public readonly service: T
public readonly services: {[name: string]: any} = {}

public readonly root: protobuf.Root

private active: boolean = false
private address: string
Expand All @@ -133,15 +135,22 @@ export class Client<T extends protobuf.rpc.Service> extends EventEmitter impleme

/**
* @param address The address to the {@link Server}, eg `ws://example.com:8042`.
* @param service The protocol buffer service class to use, an instance of this
* @param root The protocol buffer service class to use, an instance of this
* will be available as {@link Client.service}.
*/
constructor(address: string, service: {create(rpcImpl: protobuf.RPCImpl): T}, options: IClientOptions = {}) {
constructor(address: string, root: protobuf.Root, options: IClientOptions = {}) {
super()

this.address = address
this.options = options
this.service = service.create(this.rpcImpl)
this.root = root

const services = lookupServices(this.root)
services.forEach((serviceName) => {
const service = this.root.lookupService(serviceName)
const rpcService: protobuf.rpc.Service = service.create(this.rpcImpl)
this.services[serviceName] = rpcService
})

this.eventTypes = options.eventTypes || {}
this.backoff = options.backoff || defaultBackoff
Expand All @@ -153,6 +162,10 @@ export class Client<T extends protobuf.rpc.Service> extends EventEmitter impleme
}
}

public service<T extends protobuf.rpc.Service>(serviceName: string): T {
return this.services[serviceName]
}

/**
* Return `true` if the client is connected, otherwise `false`.
*/
Expand Down Expand Up @@ -234,13 +247,13 @@ export class Client<T extends protobuf.rpc.Service> extends EventEmitter impleme
this.flushMessageBuffer().catch(this.errorHandler)
}

private rpcImpl: protobuf.RPCImpl = (method, requestData, callback) => {
private rpcImpl: protobuf.RPCImpl = (method: any, requestData, callback) => {
const seq = this.nextSeq
this.nextSeq = (this.nextSeq + 1) & 0xffff

const message: RPC.IMessage = {
request: {
method: method.name,
method: getFullName(method),
payload: requestData,
seq,
},
Expand Down
39 changes: 21 additions & 18 deletions src/server.ts
Expand Up @@ -38,7 +38,7 @@ import * as protobuf from 'protobufjs/minimal'
import {VError} from 'verror'
import * as WebSocket from 'ws'
import * as RPC from './../protocol/rpc'
import {waitForEvent} from './utils'
import {getFullName} from './utils'

/**
* RPC Server options
Expand Down Expand Up @@ -84,9 +84,9 @@ export class Server extends EventEmitter implements IServerEvents {
public readonly options: IServerOptions

/**
* The protobuf Service instance, internal.
* The protobuf Root instance, internal.
*/
public readonly service: protobuf.Service
public readonly root: protobuf.Root

/**
* The underlying uWebSocket server, internal.
Expand All @@ -97,13 +97,13 @@ export class Server extends EventEmitter implements IServerEvents {
private pingInterval: number

/**
* @param service The protocol buffer service class to serve.
* @param root The protocol buffer root which contains the services to serve.
* @param options Options, see {@link IServerOptions}.
*/
constructor(service: protobuf.Service, options: IServerOptions = {}) {
constructor(root: protobuf.Root, options: IServerOptions = {}) {
super()

this.service = service
this.root = root
this.options = options

options.clientTracking = false
Expand All @@ -126,17 +126,19 @@ export class Server extends EventEmitter implements IServerEvents {
* Implement a RPC method defined in the protobuf service.
*/
public implement(method: protobuf.Method|string, handler: Handler) {
if (typeof method === 'string') {
const methodName = method[0].toUpperCase() + method.substring(1)
method = this.service.methods[methodName]
if (!method) {
throw new Error('Invalid method')
}
} else if (this.service.methodsArray.indexOf(method) === -1) {
if (typeof method !== 'string') {
method = getFullName(method)
}

const result: any = this.root.lookup(method)
if (!result || !(result instanceof protobuf.Method)) {
throw new Error('Invalid method')
}
method = result
const fullMethodName = getFullName(method)

method.resolve()
this.handlers[method.name] = handler
this.handlers[fullMethodName] = handler
}

/**
Expand Down Expand Up @@ -238,14 +240,15 @@ export class Connection extends EventEmitter {
}

private async requestHandler(request: RPC.Request): Promise<RPC.Response> {
const methodName = request.method[0].toUpperCase() + request.method.substring(1)
const fullMethodName = request.method

const method = this.server.service.methods[methodName]
if (!method) {
const result = this.server.root.lookup(fullMethodName)
if (!result || !(result instanceof protobuf.Method)) {
throw new Error('Invalid method')
}
const method: protobuf.Method = result

const impl = this.server.handlers[methodName]
const impl = this.server.handlers[fullMethodName]
if (!impl) {
throw new Error('Not implemented')
}
Expand Down
44 changes: 44 additions & 0 deletions src/utils.ts
Expand Up @@ -34,6 +34,7 @@
*/

import {EventEmitter} from 'events'
import {Namespace, ReflectionObject, Service} from 'protobufjs'

/**
* Return a promise that will resove when a specific event is emitted.
Expand All @@ -43,3 +44,46 @@ export function waitForEvent<T>(emitter: EventEmitter, eventName: string|symbol)
emitter.once(eventName, resolve)
})
}

/**
* Resolve full name of protobuf objects.
* This helps to distinguish services or methods with the same name but in different packages/namespaces.
*
* Example returns:
* 'packageName.serviceName.methodName'
* 'differentPackageName.serviceName.methodName'
*/
export function getFullName(obj: ReflectionObject, names: string[] = []): string {
if (obj.name) {
names.unshift(obj.name)
}

if (obj.parent) {
return getFullName(obj.parent, names)
}

return names.join('.')
}

/**
* Get all protobuf.Service in a protobuf.ReflectionObject.
* returns with an array of fully namespaced services.
*
* Example return:
* ['packageName.serviceName.methodName', 'differentPackageName.serviceName.methodName']
*/
export function lookupServices(obj: ReflectionObject): string[] {
const services: string[] = []

if (obj instanceof Service) {
services.push(getFullName(obj))
}

if (obj instanceof Namespace) {
obj.nestedArray.forEach((nestedObject: ReflectionObject) => {
services.push(...lookupServices(nestedObject))
})
}

return services
}

0 comments on commit 7da6ba9

Please sign in to comment.