Node Function Invoker

Purpose

The node function invoker provides a host for functions consisting of a single NodeJS module. It adheres to the riff streaming protocol and invokes functions accordingly.

Supported functions

Non-streaming functions

Non-streaming functions, more specifically "request-reply" functions, such as:

module.exports = (x) => x ** 2;

will be automatically promoted to streaming functions via the equivalent of the map operator.
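
To make the promotion more concrete, here is a minimal sketch of what "the equivalent of the map operator" could look like with plain Node streams. `promoteToStreaming`, `collect` and `squares` are hypothetical names introduced for illustration and are not part of the invoker; the sketch also assumes a Node version that provides `Readable.from` and async iteration over streams.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper (not the invoker's actual code): wraps a request-reply
// function in an object-mode Transform, i.e. a "map" over the input stream.
const promoteToStreaming = (fn) =>
    new Transform({
        objectMode: true,
        transform(chunk, _encoding, callback) {
            // Promise.resolve() lets sync, async and Promise-returning functions share one path.
            Promise.resolve()
                .then(() => fn(chunk))
                .then((result) => callback(null, result), (err) => callback(err));
        },
    });

const square = (x) => x ** 2;

// Collects every value emitted by a readable stream into an array.
const collect = async (readable) => {
    const results = [];
    for await (const value of readable) {
        results.push(value);
    }
    return results;
};

// Feeds 1, 2, 3 through the promoted function and gathers the outputs.
const squares = () => collect(Readable.from([1, 2, 3]).pipe(promoteToStreaming(square)));
```

Calling `squares()` resolves to one output per input, in input order, which is the behaviour a request-reply function gets without declaring anything.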

Request-reply functions can also be asynchronous:

module.exports = async (x) => x ** 2;

or return a Promise:

module.exports = (x) => Promise.resolve(x ** 2);

Finally, note that the interaction model can be explicitly advertised, although this is not necessary:

module.exports = (x) => x ** 2;
module.exports.$interactionModel = 'request-reply';

Streaming functions

Streaming functions must comply with the following signature:

module.exports = (inputStreams, outputStreams) => {
    const firstInputStream = inputStreams.$order[0];
    const firstOutputStream = outputStreams.$order[0];
    const secondOutputStream = outputStreams.$order[1];
    // do something
};
module.exports.$interactionModel = 'node-streams';

Please note that streaming functions must always declare the corresponding interaction model.

Parameters can also be looked up by name:

module.exports = (inputStreams, outputStreams) => {
    const { numbers, letters } = inputStreams;
    const { repetitions } = outputStreams;
    // do something
};
module.exports.$interactionModel = 'node-streams';

Input streams are Readable streams.

Output streams are Writable streams.

The function must end the output streams when it is done emitting data or when an error occurs (if the output streams are piped from input streams, then this is automatically managed by this invoker).
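
As an illustration of the piped case, the following sketch squares every number from one named input stream into one named output stream. The stream names `numbers` and `squares`, and the `runLocally` harness that stands in for the invoker with `PassThrough` streams, are assumptions made for this example only.

```javascript
const { PassThrough, Transform } = require('stream');

const streamingSquare = (inputStreams, outputStreams) => {
    const { numbers } = inputStreams;   // hypothetical input name
    const { squares } = outputStreams;  // hypothetical output name
    const squarer = new Transform({
        objectMode: true,
        transform(x, _encoding, callback) {
            callback(null, x ** 2);
        },
    });
    // By default pipe() ends the destination when the source ends,
    // so no explicit squares.end() call is needed on the normal path.
    numbers.pipe(squarer).pipe(squares);
};
streamingSquare.$interactionModel = 'node-streams';
module.exports = streamingSquare;

// Hypothetical local harness: plays the role of the invoker with in-memory streams.
const runLocally = (values) =>
    new Promise((resolve, reject) => {
        const numbers = new PassThrough({ objectMode: true });
        const squares = new PassThrough({ objectMode: true });
        const results = [];
        squares.on('data', (value) => results.push(value));
        squares.on('end', () => resolve(results));
        squares.on('error', reject);
        streamingSquare({ numbers, $order: [numbers] }, { squares, $order: [squares] });
        values.forEach((value) => numbers.write(value));
        numbers.end();
    });
```

A function that writes to its output streams by hand, rather than piping, would have to call `end()` on each of them itself.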

Lifecycle

Functions that communicate with external services, like a database, can use the $init and $destroy lifecycle hooks on the function. These methods are called once per process.

The $init method is guaranteed to finish before the main function is invoked for the first time.

The $destroy method is guaranteed to be invoked after all invocations of the main function have finished, before the process shuts down.

let client;

// function
module.exports = async ({key, amount}) => {
    return await client.incrby(key, amount);
};

// setup
module.exports.$init = async () => {
    const Redis = require('redis-promise');
    client = new Redis();
    await client.connect();
};

// cleanup
module.exports.$destroy = async () => {
    await client.quit();
};

The lifecycle methods are optional, and should only be implemented when needed. The hooks may be either traditional or async functions. Lifecycle functions have up to 10 seconds to complete their work, or the function invoker will abort.
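
The ordering and timeout rules above can be approximated as follows. This is only a sketch of the described behaviour, not the invoker's implementation: `runHook` and `lifecycleDemo` are hypothetical helpers, and an in-memory `Map` replaces the Redis client so the example runs without external services.

```javascript
// Hypothetical helper: runs an optional hook and rejects if it exceeds the time budget.
const runHook = (hook, timeoutMs = 10000) => {
    if (typeof hook !== 'function') {
        return Promise.resolve(); // hooks are optional
    }
    let timer;
    const timeout = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error(`hook timed out after ${timeoutMs}ms`)), timeoutMs);
    });
    // Promise.resolve().then(...) accepts both traditional and async hooks.
    return Promise.race([Promise.resolve().then(() => hook()), timeout])
        .finally(() => clearTimeout(timer));
};

let counters;

// function
const increment = async ({ key, amount }) => {
    counters.set(key, (counters.get(key) || 0) + amount);
    return counters.get(key);
};

// setup (traditional function)
increment.$init = () => {
    counters = new Map();
};

// cleanup (async function)
increment.$destroy = async () => {
    counters.clear();
};

// $init completes before the first invocation; $destroy runs after the last one.
const lifecycleDemo = async () => {
    await runHook(increment.$init);
    const first = await increment({ key: 'hits', amount: 2 });
    const second = await increment({ key: 'hits', amount: 3 });
    await runHook(increment.$destroy);
    return [first, second];
};
```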

Argument transformers

Sometimes, the content-type information is not enough to extract the payload the user function is supposed to interact with.

Argument transformers are custom functions that take a Message (as defined by @projectriff/message) and return whatever the function needs.

The Message payload is the result of the first content-type-based conversion pass. For instance, if the input content-type is application/json and its payload is '{"key": "value"}', the payload of the Message exposed to the transformer will be the corresponding object representation (i.e. {"key": "value"}).

Argument transformers are declared this way:

module.exports.$argumentTransformers = [
    // transformer for first input
    (message) => {
        return message.payload;
    },
    // transformer for second input
    (message) => {
        return message.headers.getValue('x-some-header');
    },
    // ...
];

If $argumentTransformers is not declared, the default transformer assigned to each input extracts the Message payload.
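
The sketch below shows how declared transformers and the default one might be applied to incoming messages. `fakeMessage` is a hypothetical stand-in that models only `payload` and `headers.getValue`, the two members used above; it is not the @projectriff/message API. `transformArguments` is likewise an illustrative helper, and it assumes that a declared list contains one transformer per input.

```javascript
// Hypothetical stand-in for a Message: only `payload` and `headers.getValue` are modelled.
const fakeMessage = (payload, headers = {}) => ({
    payload,
    headers: { getValue: (name) => headers[name] },
});

// Default behaviour described above: extract the Message payload.
const defaultTransformer = (message) => message.payload;

// Hypothetical helper: applies the i-th transformer to the i-th input message.
const transformArguments = (fn, messages) => {
    const transformers = fn.$argumentTransformers || messages.map(() => defaultTransformer);
    return messages.map((message, index) => transformers[index](message));
};

const withTransformers = (first, second) => `${first.key}:${second}`;
withTransformers.$argumentTransformers = [
    (message) => message.payload,
    (message) => message.headers.getValue('x-some-header'),
];

const messages = [
    fakeMessage({ key: 'value' }),
    fakeMessage('unused payload', { 'x-some-header': 'abc' }),
];

// The first input yields its payload, the second yields the header value.
const transformed = transformArguments(withTransformers, messages);
```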

Supported protocols

This invoker supports only streaming, and complies with the riff streaming protocol. However, it is possible to send HTTP requests and receive HTTP responses if you combine this invoker with the streaming HTTP adapter available here.

Development

Prereqs

  • Node version required: 10+.
  • Make sure to install the EditorConfig plugin in your editing environment.

Build

  • Install dependencies by running npm ci.
  • Generate the Protobuf client and server with npm run generate-proto.
  • Run the tests with npm test.

Full streaming setup

  1. Set up Kafka on your K8s cluster (kubectl apply the file kafka-broker.yaml included in the streaming processor project).
  2. Set up Liiklus (kubectl apply the file liiklus.yaml included in the streaming processor project).
  3. Set up the Kafka Gateway by following these instructions.

End-to-end local run

  • Run Liiklus producers and consumers with this project.
  • Run this invoker:
 $ FUNCTION_URI="$(pwd)/samples/streaming-repeater" npm start
  • Run the processor with the appropriate parameters.
  • Start sending data via the Liiklus producers.

Invoker local debug run

Execute the following and enjoy some logs:

 $ FUNCTION_URI="$(pwd)/samples/streaming-repeater" NODE_DEBUG='riff' npm start
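
NODE_DEBUG is the environment variable that Node's built-in `util.debuglog` consults. Whether the invoker logs through `util.debuglog` internally is an assumption here, but a function can use the same mechanism to emit its own messages under the same flag, as in this minimal sketch:

```javascript
const util = require('util');

// Logger for the 'riff' section: writes to stderr only when NODE_DEBUG lists 'riff'.
const debug = util.debuglog('riff');

const square = (x) => {
    debug('received input %d', x);
    return x ** 2;
};

module.exports = square;
```

Without NODE_DEBUG set, the `debug` calls are no-ops, so they can stay in place outside of debug runs.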
