This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

intel/rsp-sw-toolkit-im-suite-goplumber


DISCONTINUATION OF PROJECT.

This project will no longer be maintained by Intel.

This project has been identified as having known security escapes.

Intel has ceased development and contributions including, but not limited to, maintenance, bug fixes, new releases, or updates, to this project.

Intel no longer accepts patches to this project.

Goplumber

Go plumb your data.

Goplumber is an ETL framework based around Go's templates. Its main purpose is to facilitate declarative descriptions of scheduled tasks that involve downloading some data, verifying and/or reshaping it, and sending it to a receiver.

Operation

While it's possible to create Pipelines entirely via code, the point of goplumber is declarative configs. Currently, that's accomplished via JSON files. You create a Plumber, give it Clients, and ask it to build Pipelines out of config data. You can then execute the Pipeline directly, or easily schedule it to run on a regular interval.

Pipeline

A Pipeline is a blueprint for a series of Tasks, i.e. descriptions of work.

As a quick example, if you want to download a file and publish it on an MQTT topic, you can create a pipeline like this:

{
  "name": "example",
  "trigger": {"interval": {"minutes": 10}},
  "tasks": {
    "download": {
      "type": "http",
      "raw": { "method": "GET", "url": "http://example.com" }
    },
    "send": {
      "type": "mqtt",
      "raw": {"name": "some/topic"},
      "links": { "value": { "from": "download" } }
    }
  }
}

Client

A Client handles Task execution. For example, an MQTT client describes which broker to connect to:

{
  "endpoint": "mosquitto-server:1883",
  "clientID": "goplumber"
}

Plumber

The Plumber creates Pipelines. When you create a Plumber, you give it the Clients it should use, then you give it a Pipeline config and ask it to create a Pipeline. The returned Pipeline can then be executed, scheduled to run on an interval, or be used as a Client for another Pipeline, effectively creating new Task types.

The goplumber library doesn't currently create Plumbers from config; that's instead left to the calling code. This package includes a couple of commands, and one of them reads a JSON file and uses it to build a Plumber and start running Pipelines:

goplumber -config /config/plumber.json

{
  "configDir": "/config",
  "mqttConfigFile": "mqttConf.json",
  "pipelineNames": [ "mypipeline.json" ],
  "customTasks": [ ]
}

If plumber.json contains this, the above command creates a Plumber that has a single MQTT client and a single Pipeline, both loaded from /config. The included Makefile runs this example.
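As a rough illustration of what calling code might do with this file, the sketch below decodes the JSON shown above and resolves the MQTT and pipeline file names against configDir. The PlumberConfig struct and parsePlumberConfig function are hypothetical names chosen for this example, not goplumber's actual types, and the element type of customTasks is an assumption since the example list is empty.

```go
package main

import (
	"encoding/json"
	"fmt"
	"path"
)

// exampleConfig is the plumber.json content shown above.
const exampleConfig = `{
  "configDir": "/config",
  "mqttConfigFile": "mqttConf.json",
  "pipelineNames": [ "mypipeline.json" ],
  "customTasks": [ ]
}`

// PlumberConfig mirrors the JSON keys above. It is a hypothetical
// illustration, not goplumber's own type.
type PlumberConfig struct {
	ConfigDir      string            `json:"configDir"`
	MQTTConfigFile string            `json:"mqttConfigFile"`
	PipelineNames  []string          `json:"pipelineNames"`
	CustomTasks    []json.RawMessage `json:"customTasks"` // element type is an assumption
}

// parsePlumberConfig decodes the config and returns the files to load:
// the MQTT client config first, then each pipeline, all under ConfigDir.
func parsePlumberConfig(data []byte) (PlumberConfig, []string, error) {
	var conf PlumberConfig
	if err := json.Unmarshal(data, &conf); err != nil {
		return conf, nil, err
	}
	paths := []string{path.Join(conf.ConfigDir, conf.MQTTConfigFile)}
	for _, name := range conf.PipelineNames {
		paths = append(paths, path.Join(conf.ConfigDir, name))
	}
	return conf, paths, nil
}

func main() {
	conf, paths, err := parsePlumberConfig([]byte(exampleConfig))
	if err != nil {
		fmt.Println("bad config:", err)
		return
	}
	fmt.Println(len(conf.PipelineNames), "pipeline(s), files to load:", paths)
}
```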

Task Execution

Task input is specified in the configuration as either raw values or links. A task's name can be used to link its output to other tasks' inputs. The params valid for a task depend on its type, as the type determines which client the plumber will use to satisfy the task.

goplumber builds a dependency graph of the tasks from the config. When the pipeline is run, it executes tasks in dependency order. It merges a task's raw and links parameters into a map which is then passed to the client.
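To make "dependency order" concrete, here is a minimal sketch of a depth-first topological sort over task links. It is not goplumber's implementation: the Task type and executionOrder function are hypothetical, and each link is reduced to the name of the task it reads from.

```go
package main

import (
	"fmt"
	"sort"
)

// Task is a simplified, hypothetical stand-in for a task config.
// Links maps a parameter name to the task whose output supplies it.
type Task struct {
	Links map[string]string
}

// executionOrder returns task names such that every task appears after
// all of the tasks it links from. It fails on unknown tasks and cycles.
func executionOrder(tasks map[string]Task) ([]string, error) {
	const (
		unvisited = iota
		visiting
		done
	)
	state := make(map[string]int, len(tasks))
	order := make([]string, 0, len(tasks))

	var visit func(name string) error
	visit = func(name string) error {
		switch state[name] {
		case done:
			return nil
		case visiting:
			return fmt.Errorf("dependency cycle involving task %q", name)
		}
		task, ok := tasks[name]
		if !ok {
			return fmt.Errorf("link references unknown task %q", name)
		}
		state[name] = visiting
		deps := make([]string, 0, len(task.Links))
		for _, from := range task.Links {
			deps = append(deps, from)
		}
		sort.Strings(deps) // sorted only to make the result deterministic
		for _, dep := range deps {
			if err := visit(dep); err != nil {
				return err
			}
		}
		state[name] = done
		order = append(order, name)
		return nil
	}

	names := make([]string, 0, len(tasks))
	for name := range tasks {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		if err := visit(name); err != nil {
			return nil, err
		}
	}
	return order, nil
}

func main() {
	// Same shape as the earlier pipeline example: "send" links from "download".
	tasks := map[string]Task{
		"send":     {Links: map[string]string{"value": "download"}},
		"download": {},
	}
	fmt.Println(executionOrder(tasks))
}
```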

Most tasks (see info about template tasks below) allow all parameters to be either raw or links. A key should not appear in both the raw and links section. All links must reference the name of a valid task in the pipeline.
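The two rules above, together with the merge of raw and links into a single input map, can be sketched as follows. This is an assumption-laden illustration rather than goplumber's code: Task, buildInput, and the outputs map of already-finished task results are all hypothetical, and raw values are simplified to byte slices.

```go
package main

import "fmt"

// Task is a simplified, hypothetical stand-in for a task config.
type Task struct {
	Raw   map[string][]byte // parameter name -> literal value
	Links map[string]string // parameter name -> task supplying the value
}

// buildInput merges a task's raw values with the outputs of its linked
// tasks. It rejects keys present in both sections and links that name
// a task missing from the pipeline.
func buildInput(name string, tasks map[string]Task, outputs map[string][]byte) (map[string][]byte, error) {
	task, ok := tasks[name]
	if !ok {
		return nil, fmt.Errorf("unknown task %q", name)
	}
	input := make(map[string][]byte, len(task.Raw)+len(task.Links))
	for param, value := range task.Raw {
		input[param] = value
	}
	for param, from := range task.Links {
		if _, dup := task.Raw[param]; dup {
			return nil, fmt.Errorf("task %q: %q appears in both raw and links", name, param)
		}
		if _, known := tasks[from]; !known {
			return nil, fmt.Errorf("task %q: link %q references unknown task %q", name, param, from)
		}
		input[param] = outputs[from]
	}
	return input, nil
}

func main() {
	tasks := map[string]Task{
		"download": {Raw: map[string][]byte{"method": []byte("GET")}},
		"send": {
			Raw:   map[string][]byte{"name": []byte("some/topic")},
			Links: map[string]string{"value": "download"},
		},
	}
	// Pretend "download" already ran and produced this payload.
	outputs := map[string][]byte{"download": []byte("payload")}

	input, err := buildInput("send", tasks, outputs)
	if err != nil {
		fmt.Println("invalid task:", err)
		return
	}
	fmt.Printf("name=%s value=%s\n", input["name"], input["value"])
}
```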

While a pipeline is executing, it stores some metadata about each task, such as when the task started or stopped and whether it produced an error. This data is stored as the task's status, which may be accessed in links via "using". The value of the link will be the corresponding piece of metadata.

Template Tasks

Since template tasks are the main transform in goplumber, they have some special behavior. In Go, templates are defined within a namespace of templates, within which templates may call each other by name. When Go parses template data, it creates a map of template names in the namespace to their definitions; if it encounters a new template definition with the same name as one already in the namespace, the new definition overwrites the old one. This allows a user to create reusable, composable template definitions.
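The overwrite behavior can be seen with Go's standard text/template package alone. In this small sketch, the template names "greeting" and "page" and the render helper are made up for illustration. Each source is parsed into the same namespace by a separate Parse call, so a later definition replaces an earlier one with the same name.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// baseTemplates defines "page", which calls "greeting" by name.
const baseTemplates = `{{define "greeting"}}hello{{end}}{{define "page"}}{{template "greeting"}}, world{{end}}`

// overrideTemplates redefines only "greeting".
const overrideTemplates = `{{define "greeting"}}goodbye{{end}}`

// render parses each source into one namespace, in order, and then
// executes the template named "page".
func render(sources ...string) (string, error) {
	ns := template.New("namespace")
	for _, src := range sources {
		if _, err := ns.Parse(src); err != nil {
			return "", err
		}
	}
	var buf bytes.Buffer
	if err := ns.ExecuteTemplate(&buf, "page", nil); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	fmt.Println(render(baseTemplates))                    // hello, world <nil>
	fmt.Println(render(baseTemplates, overrideTemplates)) // goodbye, world <nil>
}
```

Because "page" looks up "greeting" by name at execution time, loading the override after the base changes the output without touching "page" itself.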

A template task's namespace is formed by loading all of the names given in its namespaces list. Each namespace defines one or more templates, loaded in order. The template that's executed is the one given by template.

The input map for the template consists of raw.initialData merged with the data stored in links. As for all tasks, the input is map[string][]byte, and the output is []byte. This allows, for instance, the output of a template to be the raw data for an http task, but it leads to a notable gotcha: if you wish to use the output of a template in a JSON context, the template must produce valid JSON data. Additionally, if the template wishes to use a linked JSON object, it must first convert the JSON data to extract its properties.
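The sketch below illustrates the gotcha with plain text/template. The str and json functions here are simplified stand-ins written for this example, following the one-line descriptions in the Template Functions list; they are not goplumber's implementations. The template text and the shape of the linked JSON, an object with an "id" property, are likewise assumptions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"text/template"
)

// funcs holds stand-ins for two of the documented template functions.
var funcs = template.FuncMap{
	// str converts a []byte to a string.
	"str": func(b []byte) string { return string(b) },
	// json unmarshals a []byte into a map so properties can be read.
	"json": func(b []byte) (map[string]interface{}, error) {
		var m map[string]interface{}
		err := json.Unmarshal(b, &m)
		return m, err
	},
}

// buildURL executes a hypothetical URL-building template against a
// map[string][]byte input and returns the []byte result.
func buildURL(input map[string][]byte) ([]byte, error) {
	// Without str, .baseURL would print as a list of byte values.
	// Without json, .someParam has no properties to select.
	const src = `{{str .baseURL}}?id={{(json .someParam).id}}`
	tmpl, err := template.New("myURLBuilderTemplate").Funcs(funcs).Parse(src)
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, input); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := buildURL(map[string][]byte{
		"baseURL":   []byte("http://example.com/endpoint"), // from initialData
		"someParam": []byte(`{"id": "abc"}`),               // assumed output of anotherTask
	})
	if err != nil {
		fmt.Println("template failed:", err)
		return
	}
	fmt.Println(string(out))
}
```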

Example:

{
  "myTemplateTask": {
    "type": "template",
    "raw": {
      "template": "myURLBuilderTemplate",
      "namespaces": ["urlTemplates"],
      "initialData": {
        "baseURL": "http://example.com/endpoint"
      }
    },
    "links": {
      "someParam": {"from": "anotherTask"}
    }
  }
}

Template Functions

The following template functions are available (see code docs for more details):

  • timestamp: current Unix time in ms as an int64.
  • formatTime: convert int64 Unix ms timestamp to string.
  • formatCurrentTime: like above, but uses the current time.
  • outboundIP: returns a net.IP that the service uses for outbound requests.
  • int: converts a number, string, byte, or byte array to an int.
  • add: returns the sum of multiple float64s.
  • str: converts a []byte to a Go string.
  • bytes: converts a Go string to a []byte.
  • json: unmarshals []byte (or json.RawMessage) into a Go map[string]interface{}.
  • dec64: decodes a base-64 encoded string into a []byte.
  • enc64: encodes a []byte to a base-64 encoded string.
  • join: same as strings.Join.
  • split: same as strings.Split.
  • splitN: same as strings.SplitN.
  • strIdx: same as strings.Index.
  • trimSpace: same as strings.TrimSpace.
  • err: immediately forces a template to stop execution and return an error.
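For readers unfamiliar with how a template function can abort execution: in Go's text/template, a function may return an error as its second result, and a non-nil error stops execution and is returned from Execute. The following is a minimal sketch of an err-like function built on that mechanism. It is an assumption about the general approach, not goplumber's actual implementation, and the template text and render helper are hypothetical.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"text/template"
)

// funcs defines a stand-in for the documented err function: it always
// returns a non-nil error, which makes text/template stop executing.
var funcs = template.FuncMap{
	"err": func(msg string) (string, error) {
		return "", errors.New(msg)
	},
}

// render executes a hypothetical template that requires a non-empty token.
func render(data map[string]string) (string, error) {
	const src = `{{if not .token}}{{err "token is required"}}{{end}}token={{.token}}`
	tmpl, err := template.New("example").Funcs(funcs).Parse(src)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, data); err != nil {
		return "", err // any partial output is discarded
	}
	return buf.String(), nil
}

func main() {
	fmt.Println(render(map[string]string{"token": "abc"}))
	_, err := render(map[string]string{"token": ""})
	fmt.Println("empty token failed:", err != nil)
}
```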
