Skip to content

Commit

Permalink
Add support for gRPC
Browse files Browse the repository at this point in the history
Implements a gRPC server for the riff MessageFunction service.

- Dynamically loads function.proto to create the service rather than
  using statically generated code. In tests, load time is twice as fast
  using dynamic loading and offers plain JS type support.
- Looks for `Content-Type` and `Accept` headers (case sensitive) to
  support content negotiation supporting:
  - `text/plain`
  - `application/octet-stream`
  - `application/json`
- Maintains compatibility with functions invocable via HTTP

The `error` message header indicates when an invocation failed.
Errors are either client or server errors. Client errors can often
be resovled by using a different Content-Type or Accept header value.

The following error codes are used:
- `error-server-function-invocation`:
  The function being invoked threw an error
- `error-client-content-type-unsupported`:
  The message's Content-Type is not supported
- `error-client-accept-type-unsupported`:
  The server is unable to satisfy the client's acceptable types
- `error-client-unmarshall`:
  An error occured while unmarshall the input message's payload. For
  example, invalid JSON.
- `error-client-marshall`:
  An error occured while marshalling the output message's payload. For
  example, JSON stringifying cyclical object.

While streaming can be added in the future, all invocations are assumed
to be request-reply for now.

Issue: projectriff#12
  • Loading branch information
scothis committed Feb 1, 2018
1 parent cf28e05 commit 8a7f841
Show file tree
Hide file tree
Showing 9 changed files with 1,557 additions and 99 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ FROM node:8.9.4-alpine
WORKDIR /usr/app
COPY . .
RUN npm install --production
EXPOSE 8080
EXPOSE 8080 10382
CMD ["node", "server.js"]
31 changes: 31 additions & 0 deletions lib/function.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";

package function;

message Message {
message HeaderValue {
repeated string values = 1;
}

bytes payload = 1;
map<string, HeaderValue> headers = 2;
}

service MessageFunction {
rpc Call(stream Message) returns (stream Message) {}
}
131 changes: 131 additions & 0 deletions lib/grpc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
const logger = require('util').debuglog('riff');

const grpc = require('grpc');
const path = require('path');

const mediaTypeNegotiator = require('negotiator/lib/mediaType');
const SUPPORTED_MEDIA_TYPES = ['text/plain', 'application/octet-stream', 'application/json']; // In preference order
const mediaTypeMarshallers = {
'application/octet-stream': {
unmarshall: buffer => buffer,
marshall: any => Buffer.from(any)
},
'text/plain': {
unmarshall: buffer => '' + buffer,
marshall: string => Buffer.from('' + string)
},
'application/json': {
unmarshall: buffer => JSON.parse('' + buffer),
marshall: object => Buffer.from(JSON.stringify(object))
}
};

makeServer.proto = grpc.load(path.resolve(__dirname, 'function.proto'));

function makeServer(fn) {
const server = new grpc.Server();

server.addService(makeServer.proto.function.MessageFunction.service, {
call(call) {
call.on('data', async message => {
const { headers, payload } = message;

// TODO case insensitive headers
const contentType = headerValue(headers, 'Content-Type') || 'text/plain';
const accept = headerValue(headers, 'Accept') || 'text/plain';
const correlationId = headerValue(headers, 'correlationId');

try {
const accepted = mediaTypeNegotiator(accept, SUPPORTED_MEDIA_TYPES)[0];

// check MIME type validity before invoking function
const { unmarshall } = mediaTypeMarshallers[contentType] || {};
const { marshall } = mediaTypeMarshallers[accepted] || {};

if (!unmarshall) throw new RiffError('error-client-content-type-unsupported');
if (!marshall) throw new RiffError('error-client-accept-type-unsupported');

// convert payloads and invoke function
const unmarshalledInput = attempt(unmarshall, payload, 'error-client-unmarshall');
const output = await fn(unmarshalledInput);
const marshalledOutput = attempt(marshall, output, 'error-client-marshall');

logger('Result:', marshalledOutput);

// send response
call.write({
headers: addCorrelationIdHeader({
'Content-Type': createHeader(accepted)
}, correlationId),
payload: marshalledOutput
});
} catch (err) {
logger('Error:', err);

if (err instanceof RiffError) {
call.write({
headers: addCorrelationIdHeader({
error: createHeader(err.type)
}, correlationId),
payload: Buffer.from(err.cause ? err.cause.stack || ('' + err.cause) : '')
});
} else {
call.write({
headers: addCorrelationIdHeader({
error: createHeader('error-server-function-invocation')
}, correlationId),
payload: Buffer.from('' + (err && err.stack || err))
});
}
}
});
call.on('end', () => {
call.end();
});
}
});

return server;
}

// helpers

function headerValue(headers, header) {
return headers[header] && headers[header].values[0];
}

function createHeader(value) {
return { values: [value] };
}

function addCorrelationIdHeader(headers, correlationId) {
if (correlationId != null) {
headers.correlationId = createHeader(correlationId);
}
return headers;
}

function attempt(fn, arg, type) {
try {
return fn(arg);
} catch (e) {
throw new RiffError(type, e);
}
}

// Error type for non-function errors
class RiffError extends Error {
constructor(type, cause) {
super();

// Maintains proper stack trace for where our error was thrown (only available on V8)
if (Error.captureStackTrace) {
Error.captureStackTrace(this, RiffError);
}

this.type = type;
this.cause = cause;
}
}

module.exports = makeServer;
File renamed without changes.
Loading

0 comments on commit 8a7f841

Please sign in to comment.