Skip to content
This repository has been archived by the owner on Sep 5, 2022. It is now read-only.

Commit

Permalink
Interceptors signature was changed
Browse files Browse the repository at this point in the history
  • Loading branch information
litichevskiydv committed Dec 1, 2019
1 parent 9b7a42f commit c95b250
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 141 deletions.
41 changes: 18 additions & 23 deletions src/grpcHostBuilder.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const { Server, ServerCredentials } = require("grpc");

const { createLogger } = require("./logging/defaultLoggersFactory");
const ExceptionsHandler = require("./interceptors/exceptionsHandler");
const ContextsInitializer = require("./interceptors/contextsInitializer");
const exceptionsHandler = require("./implementationsWrappers/exceptionsHandler");
const rxToServerWritableStream = require("./implementationsWrappers/rxToServerWritableStream");

module.exports = class GrpcHostBuilder {
/**
Expand All @@ -13,15 +14,15 @@ module.exports = class GrpcHostBuilder {
this._interceptorsDefinitions = [];
this._servicesDefinitions = [];
this._methodsImplementationsWrappers = new Map()
.set("unary", require("./implementationsWrappers/unaryCall"))
.set("client_stream", require("./implementationsWrappers/ingoingStreamingCall"))
.set("server_stream", require("./implementationsWrappers/outgoingStreamingCall"))
.set("bidi", require("./implementationsWrappers/bidirectionalStreamingCall"));
.set("unary", require("./implementationsWrappers/callsFinalizers/unaryCall"))
.set("client_stream", require("./implementationsWrappers/callsFinalizers/ingoingStreamingCall"))
.set("server_stream", require("./implementationsWrappers/callsFinalizers/outgoingStreamingCall"))
.set("bidi", require("./implementationsWrappers/callsFinalizers/bidirectionalStreamingCall"));

this._server = new Server(options);
this._serverContext = { createLogger };

this.addInterceptor(ExceptionsHandler).addInterceptor(ContextsInitializer);
this.addInterceptor(ContextsInitializer);
}

/**
Expand All @@ -40,20 +41,13 @@ module.exports = class GrpcHostBuilder {
*/
addInterceptor(interceptor, ...interceptorArguments) {
if (interceptor.prototype && typeof interceptor.prototype.invoke === "function")
return this.addInterceptor(
async (call, methodDefinition, callback, next) =>
await new interceptor(this._serverContext, ...interceptorArguments).invoke(
call,
methodDefinition,
callback,
next
)
return this.addInterceptor(async (call, methodDefinition, next) =>
new interceptor(this._serverContext, ...interceptorArguments).invoke(call, methodDefinition, next)
);

this._interceptorsDefinitions.push({
index: this._index++,
interceptor: (call, methodDefinition, callback, next) =>
interceptor(call, methodDefinition, callback, next, ...interceptorArguments)
interceptor: (call, methodDefinition, next) => interceptor(call, methodDefinition, next, ...interceptorArguments)
});
return this;
}
Expand Down Expand Up @@ -89,19 +83,21 @@ module.exports = class GrpcHostBuilder {
if (methodImplementation === undefined) throw new Error(`Method ${methodDefinition.path} is not implemented`);
methodImplementation = methodImplementation.bind(serviceImplementation);

const methodType = GrpcHostBuilder._getMethodType(methodDefinition);
let serviceCallHandler = this._methodsImplementationsWrappers.get(methodType)(methodImplementation);
let serviceCallHandler = methodImplementation;
if (methodDefinition.responseStream) serviceCallHandler = rxToServerWritableStream(serviceCallHandler);

for (let i = this._interceptorsDefinitions.length - 1; i > -1; i--) {
const interceptorDefinition = this._interceptorsDefinitions[i];
if (interceptorDefinition.index > serviceIndex) continue;

const next = serviceCallHandler;
serviceCallHandler = async (call, callback) =>
await interceptorDefinition.interceptor(call, methodDefinition, callback, next);
serviceCallHandler = async call => interceptorDefinition.interceptor(call, methodDefinition, next);
}

return serviceCallHandler;
const methodType = GrpcHostBuilder._getMethodType(methodDefinition);
serviceCallHandler = this._methodsImplementationsWrappers.get(methodType)(serviceCallHandler);

return exceptionsHandler(methodDefinition, serviceCallHandler, this._serverContext.createLogger());
}

_addServices() {
Expand Down Expand Up @@ -136,10 +132,9 @@ module.exports = class GrpcHostBuilder {
* @callback interceptorFunction
* @param {*} call Server call.
* @param {*} methodDefinition Metadata for method implementation.
* @param {*} callback gRPC server callback.
* @param {*} next Next layers executor.
* @param {...any} arguments Additional interceptor arguments that were passed during registration.
* @returns {void}
* @returns {Promise<any>}
*/

/**
Expand Down
21 changes: 0 additions & 21 deletions src/implementationsWrappers/bidirectionalStreamingCall.js

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const { streamToRx } = require("rxjs-stream");

module.exports = function(handler) {
return async call => {
call.source = streamToRx(call);
await handler(call);
call.end();
};
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const { streamToRx } = require("rxjs-stream");

module.exports = function(handler) {
return async (call, callback) => {
call.source = streamToRx(call);
callback(null, await handler(call));
};
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module.exports = function(handler) {
return async call => {
await handler(call);
call.end();
};
};
3 changes: 3 additions & 0 deletions src/implementationsWrappers/callsFinalizers/unaryCall.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module.exports = function(handler) {
return async (call, callback) => callback(null, await handler(call));
};
33 changes: 33 additions & 0 deletions src/implementationsWrappers/exceptionsHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const grpc = require("grpc");
const GRPCError = require("grpc-error");

/**
* @param {Error} error
* @returns {GRPCError}
*/
const createGrpcError = error => {
const stackTrace = error.stack.replace(/\r?\n|\r/g, " ");
return /^[ -~]*$/.test(stackTrace)
? new GRPCError(error, grpc.status.INTERNAL, { stackTrace })
: new GRPCError(error, grpc.status.INTERNAL);
};

module.exports = function(methodDefinition, handler, logger) {
return async (call, callback) => {
try {
await handler(call, callback);
} catch (error) {
let grpcError = error;
if (error instanceof GRPCError === false && error.constructor.toString() !== GRPCError.toString()) {
logger.error("Unhandled exception has occurred in method {methodName}", {
error,
methodName: methodDefinition.path
});
grpcError = createGrpcError(error);
}

if (callback) callback(grpcError);
else call.emit("error", grpcError);
}
};
};
19 changes: 0 additions & 19 deletions src/implementationsWrappers/ingoingStreamingCall.js

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const { fromEvent } = require("rxjs");
const { takeUntil, catchError } = require("rxjs/operators");

module.exports = function(methodImplementation) {
module.exports = function(handler) {
return async call => {
const result = await methodImplementation(call);
const result = await handler(call);
await result
.pipe(
takeUntil(fromEvent(call, "cancelled")),
Expand All @@ -12,7 +12,5 @@ module.exports = function(methodImplementation) {
})
)
.forEach(message => call.write(message));

call.end();
};
};
3 changes: 0 additions & 3 deletions src/implementationsWrappers/unaryCall.js

This file was deleted.

24 changes: 6 additions & 18 deletions src/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@ export class GrpcHostBuilder {
/**
* @param call Server call.
* @param methodDefinition Metadata for method implementation.
* @param callback gRPC server callback.
* @param next Next layers executor.
* @param arguments Interceptor additional arguments.
*/
interceptor: (
call: ServiceCall,
methodDefinition: MethodDefinition<any, any>,
callback: sendUnaryData<any> | null,
next: handleServiceCall<any, any>,
...arguments: any[]
) => Promise<void>,
) => Promise<any>,
...interceptorArguments: any[]
): GrpcHostBuilder;
/**
Expand Down Expand Up @@ -88,24 +86,16 @@ type ServiceCall =
| ServerReadableStream<any>
| ServerWriteableStream<any>
| ServerDuplexStream<any, any>;
type sendUnaryData<ResponseType> = (
error: ServiceError | null,
value: ResponseType | null,
trailer?: Metadata,
flags?: number
) => void;

type handleServiceCall<RequestType, ResponseType> =
| handleUnaryCall<RequestType, ResponseType>
| handleClientStreamingCall<RequestType, ResponseType>
| handleServerStreamingCall<RequestType, ResponseType>
| handleBidiStreamingCall<RequestType, ResponseType>;
type handleUnaryCall<RequestType, ResponseType> = (call: ServerUnaryCall<RequestType>, callback: sendUnaryData<ResponseType>) => Promise<void>; // prettier-ignore
type handleClientStreamingCall<RequestType, ResponseType> = (call: ServerReadableStream<RequestType>, callback: sendUnaryData<ResponseType>) => Promise<void>; // prettier-ignore
type handleUnaryCall<RequestType, ResponseType> = (call: ServerUnaryCall<RequestType>) => Promise<ResponseType>;
type handleClientStreamingCall<RequestType, ResponseType> = (call: ServerReadableStream<RequestType>) => Promise<ResponseType>; // prettier-ignore
type handleServerStreamingCall<RequestType, ResponseType> = (call: ServerWriteableStream<RequestType>) => Promise<void>;
type handleBidiStreamingCall<RequestType, ResponseType> = (
call: ServerDuplexStream<RequestType, ResponseType>
) => Promise<void>;
type handleBidiStreamingCall<RequestType, ResponseType> = (call: ServerDuplexStream<RequestType, ResponseType>) => Promise<void>; // prettier-ignore

/**
* Used for calls that are streaming from the client side.
Expand Down Expand Up @@ -211,7 +201,7 @@ type serviceMethodImplementation<RequestType, ResponseType> =
type serviceUnaryMethodImplementation<RequestType, ResponseType> = (
call: ServerUnaryCall<RequestType>
) => Promise<ResponseType>;
type serviceClientStreamingMethodImplementation<RequestType, ResponseType> = (call: ServerIngoingStreamingCall<RequestType>) => Promise<Observable<ResponseType> | ResponseType>; // prettier-ignore
type serviceClientStreamingMethodImplementation<RequestType, ResponseType> = (call: ServerIngoingStreamingCall<RequestType>) => Promise<ResponseType>; // prettier-ignore
type serviceServerStreamingMethodImplementation<RequestType, ResponseType> = (call: ServerOutgoingStreamingCall<RequestType>) => Promise<Observable<ResponseType>>; // prettier-ignore
type serviceBidiStreamingMethodImplementation<RequestType, ResponseType> = (call: ServerBidiStreamingCall<RequestType>) => Promise<Observable<ResponseType>>; // prettier-ignore
type UntypedServiceImplementation = { [name: string]: serviceMethodImplementation<any, any> };
Expand All @@ -221,15 +211,13 @@ interface IInterceptor {
* Interceptor implementation.
* @param call Server call.
* @param methodDefinition Metadata for method implementation.
* @param callback gRPC server callback.
* @param next Next layers executor.
*/
invoke(
call: ServiceCall,
methodDefinition: MethodDefinition<any, any>,
callback: sendUnaryData<any> | null,
next: handleServiceCall<any, any>
): Promise<void>;
): Promise<any>;
}

declare namespace Logging {
Expand Down
4 changes: 2 additions & 2 deletions src/interceptors/contextsInitializer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const processingContext = require("processing-context");

module.exports = async function(call, methodDefinition, callback, next) {
module.exports = async function(call, methodDefinition, next) {
processingContext.create();
await next(call, callback);
return next(call);
};
33 changes: 0 additions & 33 deletions src/interceptors/exceptionsHandler.js

This file was deleted.

Loading

0 comments on commit c95b250

Please sign in to comment.