
Feature765 Portable Plugin

Jiyong Huang edited this page Sep 17, 2021 · 3 revisions

Feature 926: Portable Plugin

Requirement

Motivation

Currently we support source, sink and function plugins through the native Go plugin system. The Go plugin system has some limitations that make plugins hard to deploy. It imposes a very strict compile environment: the plugin must be built with the same Go version, dependency versions/paths, OS, source code version, etc. as the main program. This makes it very hard to migrate a plugin to a different running environment: users need to rebuild the plugin when upgrading eKuiper, or even when running it in another production environment. A built plugin is nearly impossible to migrate.

Thus, we would like to offer another, "portable" plugin solution that allows users to:

  1. Build once and run in any environment. The plugin only needs to be rebuilt when there are breaking changes in the eKuiper plugin interface, which should be very infrequent.
  2. Cross-language support. Plugins can be written in various programming languages.
  3. Unified API between native and portable plugins. This means both control and data information must be communicated between eKuiper and the plugin.
  4. Dynamic loading/unloading of plugins without restarting the eKuiper main process.

Survey

A plugin can run in-process or in a separate process. The native plugin is the official and nearly the only in-process solution. A portable plugin must run in a separate process. There are at least two types of separate-process solutions:

  1. RPC-based plugins, typically HashiCorp go-plugin.
  2. MQ-based plugins.

HashiCorp go-plugin seems promising and is widely used. However, RPC-based plugins are better at handling control messages than at data processing. eKuiper plugins all run in a streaming environment, which is data-centric and requires long-lived connections, so an MQ-based solution is more suitable.

Design

Take the source plugin as an example. When needed, the main program launches the plugin via os/exec, passing the context as an argument; the two then establish the data and control pipelines through MQ. The connection is retained until the main program sends a stop control signal. Meanwhile, the source plugin keeps feeding data into the MQ data pipeline, from which it finally flows into the rules.

[Figure: portable plugin architecture]

Each portable plugin registration runs as a standalone process. To avoid process overhead, each portable plugin registration can include multiple plugins, a.k.a. symbols, of all kinds. Basically, users bundle a set of plugins into a zip and register them in batch with the REST API. At runtime, that set of symbols runs in the same process. If some plugins need process separation, users can create separate plugin registrations for them.

Thus, a portable plugin has two levels: the plugin and its symbols. Each symbol can be a source, sink or function. So, by defining one portable plugin, users get a set of new sources, sinks and functions registered.
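Because a symbol name must be unique for a given plugin type, the main program needs an index from each (type, symbol) pair to the plugin that serves it. The following is a minimal sketch of such an index; Registry, SymbolKind and the all-or-nothing registration rule are assumptions for illustration, not eKuiper code.

```go
package main

import "fmt"

// SymbolKind is the kind of a symbol exported by a portable plugin.
type SymbolKind string

const (
	KindSource   SymbolKind = "source"
	KindSink     SymbolKind = "sink"
	KindFunction SymbolKind = "function"
)

// Registry maps each (kind, symbol name) pair to the plugin that provides it.
type Registry struct {
	owners map[SymbolKind]map[string]string
}

func NewRegistry() *Registry {
	return &Registry{owners: map[SymbolKind]map[string]string{
		KindSource: {}, KindSink: {}, KindFunction: {},
	}}
}

// Register adds all symbols of one plugin, or none of them if any symbol
// name is already taken for its kind.
func (r *Registry) Register(plugin string, symbols map[SymbolKind][]string) error {
	// First pass: validate everything before mutating the index.
	for kind, names := range symbols {
		table, ok := r.owners[kind]
		if !ok {
			return fmt.Errorf("unknown symbol kind %q", kind)
		}
		for _, n := range names {
			if owner, taken := table[n]; taken {
				return fmt.Errorf("%s %q already provided by plugin %q", kind, n, owner)
			}
		}
	}
	// Second pass: record ownership.
	for kind, names := range symbols {
		for _, n := range names {
			r.owners[kind][n] = plugin
		}
	}
	return nil
}

// Lookup returns the plugin that provides the given symbol.
func (r *Registry) Lookup(kind SymbolKind, name string) (string, bool) {
	p, ok := r.owners[kind][name]
	return p, ok
}

func main() {
	r := NewRegistry()
	if err := r.Register("car", map[SymbolKind][]string{
		KindSource:   {"json", "udp", "sync"},
		KindSink:     {"command"},
		KindFunction: {"link", "rank"},
	}); err != nil {
		panic(err)
	}
	err := r.Register("other", map[SymbolKind][]string{KindSource: {"json"}})
	fmt.Println(err) // prints source "json" already provided by plugin "car"
}
```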

For example, users can define a plugin named car that exports several source, sink and function symbols. The definition is presented as a JSON file as below:

{
  "name": "car",
  "version": "v1.0.0",
  "language": "go",
  "executable": "server",
  "sources": [
    "json","udp","sync"
  ],
  "sinks": [
    "command"
  ],
  "functions": [
    "link", "rank"
  ]
}
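A runtime would load this definition before starting the plugin. A minimal Go sketch, assuming a hypothetical PluginMeta struct whose tags mirror the JSON keys above; the required-field checks are illustrative choices, not documented eKuiper rules.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PluginMeta mirrors the plugin definition file (hypothetical type name).
type PluginMeta struct {
	Name       string   `json:"name"`
	Version    string   `json:"version"`
	Language   string   `json:"language"`
	Executable string   `json:"executable"`
	Sources    []string `json:"sources"`
	Sinks      []string `json:"sinks"`
	Functions  []string `json:"functions"`
}

// parsePluginMeta decodes a plugin definition and checks the fields the
// runtime cannot work without.
func parsePluginMeta(raw []byte) (*PluginMeta, error) {
	var m PluginMeta
	if err := json.Unmarshal(raw, &m); err != nil {
		return nil, err
	}
	if m.Name == "" || m.Language == "" || m.Executable == "" {
		return nil, fmt.Errorf("plugin definition must set name, language and executable")
	}
	if len(m.Sources)+len(m.Sinks)+len(m.Functions) == 0 {
		return nil, fmt.Errorf("plugin %q exports no symbols", m.Name)
	}
	return &m, nil
}

const carJSON = `{
  "name": "car",
  "version": "v1.0.0",
  "language": "go",
  "executable": "server",
  "sources": ["json", "udp", "sync"],
  "sinks": ["command"],
  "functions": ["link", "rank"]
}`

func main() {
	m, err := parsePluginMeta([]byte(carJSON))
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Name, m.Version, m.Executable, m.Sources) // prints car v1.0.0 server [json udp sync]
}
```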

DISCUSSION: How should executables for different OSes be handled? Should we recommend creating a separate plugin zip for each OS, like the native plugin?

Executable environment: let the users install it themselves, or provide an install script as well.

Usage

The steps to create a portable plugin are similar to those for the current native plugin.

  1. Develop the plugin with the SDK.
    1. Develop each plugin symbol by implementing the corresponding interfaces.
    2. Develop the main program to serve all the symbols as one plugin.
  2. Build or package the plugin, depending on the programming language.
  3. Register the plugin via eKuiper file/REST/CLI.

We will provide SDKs for all mainstream languages. Below, let's take Go and Python as examples.

Go

Taking Go as an example, development in Go is very similar to the native API. Below is a sample source plugin that feeds hardcoded JSON data to eKuiper. The plugin development is almost the same as for a native plugin: it implements the source interface. The only difference is the export section. A portable plugin runs as a separate process, so it needs a main function that calls the start function provided by the SDK.

In the portable version, the context may not provide exactly the same functionality, such as state storage. This needs more investigation. Below is an example source plugin implementation.

package main

import (
	"fmt"
	"time"

	"github.com/lf-edge/ekuiper-plugin-sdk/api"
)

var data = []map[string]interface{}{
	{
		"color": "red",
		"size":  3,
		"ts":    1541152486013,
	},
	{
		"color": "yellow",
		"size":  2,
		"ts":    1541152487013,
	},
	{
		"color": "blue",
		"size":  1,
		"ts":    1541152488013,
	},
}

// jsonSource feeds the hardcoded records above to eKuiper, one per second.
type jsonSource struct {
}

func (s *jsonSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error) {
	ctx.GetLogger().Infof("Start json source for rule %s", ctx.GetRuleId())
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	c := 0
	for {
		select {
		case <-ticker.C:
			// Send the next record, but give up if the rule is stopped meanwhile
			select {
			case consumer <- api.NewDefaultSourceTuple(data[c], nil):
				c = (c + 1) % len(data)
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			// The rule has stopped: leave the loop so the goroutine can exit
			return
		}
	}
}

func (s *jsonSource) Configure(dataSource string, config map[string]interface{}) error {
	fmt.Printf("received datasource %s, config %+v\n", dataSource, config)
	return nil
}

func (s *jsonSource) Close(ctx api.StreamContext) error {
	ctx.GetLogger().Infof("Closing json source")
	return nil
}

Next, the user can develop more source, function and sink plugins following the same pattern. After all plugins are developed, we need to create a main program that registers these plugins and provides the entry point to run the plugin. Below is an example:

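package main

import (
	"os"

	sdk "github.com/lf-edge/ekuiper-plugin-sdk"
	"github.com/lf-edge/ekuiper-plugin-sdk/api"
)

// main registers every symbol served by this plugin and hands the
// command-line arguments (which carry the context) to the SDK.
func main() {
	sdk.Start(os.Args, &sdk.PluginConfig{
		Sources: map[string]sdk.NewSourceFunc{
			"json": func() api.Source {
				return &jsonSource{}
			},
			// anotherSource stands for any other source symbol in the same plugin
			"anotherSource": func() api.Source {
				return &anotherSource{}
			},
		},
	})
}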

Build or package

This step is much simpler. How to package depends on the programming language; just make sure the packaged file is executable.

For a plugin written in Go, just use go build exactly as you would for a common program, and package the built executable.

Python

To use a Python plugin, the user must install a Python environment and the required pip packages on both the development and production systems.

  1. Install a Python environment and make sure the python command is available.

  2. Install the dependencies with pip, including ekuiper (the eKuiper Python plugin SDK) and pynng (the nng client).

    pip install pynng
    pip install ekuiper

A Python plugin needs to inherit the corresponding class (source, function or sink), implement a similar set of abstract methods, and call the inherited run function in its main function to start running. Below is an example.

import time

from ekuiper import Source


class PyJson(Source):
    """A sample source that emits a fixed record every 30 seconds."""

    def __init__(self):
        self.data = {"name": "pyjson", "value": 2021}

    def configure(self, datasource, conf):
        print("configuring with datasource {} and conf {}".format(datasource, conf))

    def open(self, ctx):
        print("opening")
        # Emit the same record 100 times; None means no metadata
        for i in range(100):
            ctx.emit(self.data, None)
            print("emit")
            time.sleep(30)

    def close(self, ctx):
        print("closing")

// TODO add the python main file

Build or package

As Python is an interpreted language, there is no need to build it. Just package the source code and specify the entry file in the management API.

POST http://{{host}}/plugins/sources
Content-Type: application/json

{"name":"pyjson","file":"http://yourhost/plugin/testzips/sources/pyjson.zip", "kind": 1, "language": "python","executable": "pyjson.py"}

Management

  1. By REST/CLI. Unlike native plugins, a portable plugin can only be created as a set of symbols together, so we will add a new endpoint to distinguish it from native plugins. Each plugin can have only one language, and its executable is the main program. All the symbol names of the plugin must be declared through the symbol parameters (sources, sinks and functions).

  • Create a plugin with many symbols

    POST http://{{host}}/plugins/portable
    Content-Type: application/json
    
    {"name":"labelImage", "file": "https://www.emqx.io/downloads/kuiper-plugins/v1.1.2/debian/functions/labelImage_amd64.zip"}
    
  • Get a list of portable plugins

    GET http://{{host}}/plugins/portable
    Content-Type: application/json
    
    ["car", "json"]
    
  • Get the info of a portable plugin

    GET http://{{host}}/plugins/portable/car
    Content-Type: application/json
    
    {
      "name": "car",
      "version": "v1.0.0",
      "language": "go",
      "executable": "server",
      "sources": [
        "json","udp","sync"
      ],
      "sinks": [
        "command"
      ],
      "functions": [
        "link", "rank"
      ]
    }
    

DISCUSSION

All the information about a portable plugin is actually provided by its JSON file. Could we do the same for native plugins, so that all the information is available from the file system without state DB support?

  • Get all available sources, sinks and functions from built-in, native plugin, service and portable plugins. For example, for sources:

    GET http://{{host}}/sources
    Content-Type: application/json

    {
      "mqtt": {
        "version": "3.0",
        "type": "builtin"
      },
      "zmq": {
        "version": "3.0",
        "type": "plugin-native"
      },
      "json": {
        "version": "v1.0.0",
        "type": "plugin-portable"
      },
      ...
    }
  2. By file: As with native plugins, putting the plugin files in the plugin folder loads them directly. Currently, plugins are loaded by the *.so suffix, which does not work for portable plugins, so we need to design:

    • What file format should be placed?
    • Version handling
    • Handling of deleted plugins

    Each portable plugin requires the following structure:

    • A top-level directory named after the plugin.
      • A JSON file, named after the plugin, inside the directory.
      • An executable file inside the directory.
      • All other dependencies.
      • Config files (yaml and json) inside 'etc/$pluginType' for each symbol in that plugin.

    Take the car plugin as an example. To load it automatically, users need to put it in this structure:

    etc
      sources
        json.yaml
        json.json
        udp.yaml
        udp.json
        sync.yaml
        sync.json
      sinks
        command.json
      functions
        link.json
        rank.json
    plugins
      portable
        car
          server
          car.json
    

    Note that a symbol name must be unique for a specific plugin type. Once the plugin directory is added to plugins/portable, the plugin will be loaded when eKuiper starts.

Implementation

The implementation will include two parts.

  1. SDK for various languages
    1. Define the interfaces
    2. Implement the runtime
      1. Parse the context from the command-line argument
      2. Establish the data pipeline as an MQ client
      3. Establish the control pipeline and handle control signals as an MQ client
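The control-signal handling in the SDK runtime boils down to decoding a command and applying it to the symbol instances the runtime manages. The sketch below leaves out the MQ transport entirely and invents a JSON message format (cmd, symbol, ruleId) purely for illustration; eKuiper's actual control protocol may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ControlMessage is a hypothetical command sent by eKuiper on the control channel.
type ControlMessage struct {
	Command string `json:"cmd"`    // "start" or "stop"
	Symbol  string `json:"symbol"` // symbol name, e.g. "json"
	RuleID  string `json:"ruleId"`
}

// Runtime tracks the running symbol instances, keyed by rule and symbol.
type Runtime struct {
	running map[string]bool
}

func NewRuntime() *Runtime {
	return &Runtime{running: map[string]bool{}}
}

// Handle decodes one control message, applies it and returns the reply that
// would be sent back on the control channel.
func (r *Runtime) Handle(raw []byte) string {
	var m ControlMessage
	if err := json.Unmarshal(raw, &m); err != nil {
		return "error: " + err.Error()
	}
	key := m.RuleID + "/" + m.Symbol
	switch m.Command {
	case "start":
		if r.running[key] {
			return "error: already running " + key
		}
		// A real runtime would build the symbol context and open its data pipeline here.
		r.running[key] = true
	case "stop":
		if !r.running[key] {
			return "error: not running " + key
		}
		// ...and close the symbol and its data pipeline here.
		delete(r.running, key)
	default:
		return fmt.Sprintf("error: unknown command %q", m.Command)
	}
	return "ok"
}

func main() {
	rt := NewRuntime()
	for _, msg := range []string{
		`{"cmd":"start","symbol":"json","ruleId":"rule1"}`,
		`{"cmd":"start","symbol":"json","ruleId":"rule1"}`,
		`{"cmd":"stop","symbol":"json","ruleId":"rule1"}`,
	} {
		fmt.Println(rt.Handle([]byte(msg))) // ok, error: already running rule1/json, ok
	}
}
```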

The MQ must be small and embeddable, so a brokerless MQ is a good option. We have two options: ZeroMQ and mangos. I have done some initial prototyping with mangos. Embedding it increases the footprint by less than 1MB, and it is written in pure Go. Additionally, it supports all OSes, including Windows, and has clients for all mainstream programming languages.

Runtime

The runtime consists of two parts:

  1. The eKuiper plugin management runtime
  2. The plugin runtime for each language

The eKuiper plugin management runtime needs to handle:

  1. Managing the registration of portable plugins via REST/CLI or files in the plugins folder.
  2. Fetching the correct portable plugin and symbol when creating a rule that refers to the plugin.
  3. Establishing the nanomsg connection to the plugin as the control channel.
  4. Controlling the plugin: start, stop, restart.
  5. Establishing the nanomsg connection to the symbols as the data channel.
  6. Controlling the symbol: run, stop, restart.
  7. Handling shared source instances.
  8. Portable plugin health checks and error handling.
  9. Symbol state management.
  10. Plugin configuration such as debug and sendTimeout: validate the capability; all supported arguments need to be defined.
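Item 7, handling shared source instances, is essentially reference counting: the first rule that uses a shared source starts it and the last rule to stop shuts it down. A minimal sketch with hypothetical names; the start and stop callbacks stand in for the control-channel messages, and they are invoked while the lock is held, so they must not call back into sharedSources.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedSources runs at most one instance per shared source symbol and
// counts the rules attached to it.
type sharedSources struct {
	mu    sync.Mutex
	refs  map[string]int
	start func(name string) // e.g. ask the plugin to run the symbol
	stop  func(name string) // e.g. ask the plugin to stop the symbol
}

func newSharedSources(start, stop func(string)) *sharedSources {
	return &sharedSources{refs: map[string]int{}, start: start, stop: stop}
}

// Attach is called when a rule using the source starts.
func (s *sharedSources) Attach(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.refs[name] == 0 {
		s.start(name)
	}
	s.refs[name]++
}

// Detach is called when a rule using the source stops.
func (s *sharedSources) Detach(name string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch n := s.refs[name]; n {
	case 0:
		return fmt.Errorf("source %q is not attached", name)
	case 1:
		delete(s.refs, name)
		s.stop(name)
	default:
		s.refs[name] = n - 1
	}
	return nil
}

func main() {
	s := newSharedSources(
		func(n string) { fmt.Println("start", n) },
		func(n string) { fmt.Println("stop", n) },
	)
	s.Attach("json")     // rule1: starts the instance
	s.Attach("json")     // rule2: reuses it
	_ = s.Detach("json") // rule1 stops: instance keeps running
	_ = s.Detach("json") // rule2 stops: instance is stopped
}
```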

The plugin runtime needs to handle:

  1. Establishing the nanomsg connection as the control channel to manage the symbols' lifecycle.
  2. Constructing the plugin configuration by parsing command-line arguments.
  3. Constructing the symbol context.
  4. Handling symbol state.
  5. Handling data communication between the symbols and eKuiper.
  6. Cleaning up symbols after a rule stops.
  7. Cleaning up the plugin if it is killed.
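Item 2 on the plugin side, constructing the configuration from command-line arguments, could look like the sketch below. It assumes a hypothetical JSON argument format (pluginName, controlUrl) plus the debug and sendTimeout options listed for the management runtime; PluginArgs, parseArgs and the 1000 ms default are placeholders, not SDK definitions.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// PluginArgs is a hypothetical shape of the configuration passed as the first
// command-line argument.
type PluginArgs struct {
	PluginName  string `json:"pluginName"`
	ControlURL  string `json:"controlUrl"`
	Debug       bool   `json:"debug"`
	SendTimeout int    `json:"sendTimeout"` // milliseconds
}

// parseArgs builds the plugin configuration from os.Args-style input.
// Options missing from the JSON keep the defaults set before decoding.
func parseArgs(args []string) (*PluginArgs, error) {
	if len(args) < 2 {
		return nil, errors.New("missing plugin configuration argument")
	}
	conf := &PluginArgs{SendTimeout: 1000} // placeholder default
	if err := json.Unmarshal([]byte(args[1]), conf); err != nil {
		return nil, fmt.Errorf("invalid plugin configuration: %w", err)
	}
	if conf.PluginName == "" || conf.ControlURL == "" {
		return nil, errors.New("pluginName and controlUrl are required")
	}
	return conf, nil
}

func main() {
	// In a real plugin this would be parseArgs(os.Args).
	conf, err := parseArgs([]string{"server", `{"pluginName":"car","controlUrl":"ipc:///tmp/plugin_car.ipc"}`})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *conf) // prints {PluginName:car ControlURL:ipc:///tmp/plugin_car.ipc Debug:false SendTimeout:1000}
}
```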

All of the above must be supported for sources, sinks and functions.