Browse files

add plugin support

  • Loading branch information...
rwynn committed Mar 15, 2017
1 parent a9cf704 commit 0ac6e83073a6c9e618ad7e6cf3dd01b7849bb437
Showing with 351 additions and 105 deletions.
  1. +123 −6
  2. +199 −99 mongofluxd.go
  3. +29 −0 mongofluxdplug/plug.go
@@ -1,6 +1,7 @@
# mongofluxd
Real time sync from MongoDB into InfluxDB

### Installation

Download the latest [release]( or install with go get
@@ -83,16 +84,16 @@ measure = "sales"

Load 100K documents of time series data into MongoDB.

// sleep for 2ms to ensure t is 2ms apart
for (var i=0; i<100000; ++i) { sleep(2); var t = new Date(); db.test.insert({c: 1, d: 5.5, t: t}); }
// sleep for 1ms to ensure t is 1ms apart
for (var i=0; i<100000; ++i) { sleep(1); var t = new Date(); db.test.insert({c: 1, d: 5.5, t: t}); }

Run monfluxd with direct reads on test.test (config contents above)

time ./mongofluxd -f flux.toml

real 0m2.167s
user 0m2.028s
sys 0m0.444s
real 0m1.982s
user 0m1.936s
sys 0m0.396s

Verify it all got into InfluxDB

@@ -108,5 +109,121 @@ Verify it all got into InfluxDB
On a VirtualBox VM with 4 virtual cores and 4096 mb of memory, syncing 100K documents from MongoDB to InfluxDB
took only 2.167 seconds for a throughput of 46,146 points per second.
took only 1.936 seconds for a throughput of 50,454 points per second.
### Advanced
mongofluxd supports golang 1.8 plugins for advanced use cases. For example, you have a one-to-many relationship
between MongoDB documents and InfluxDB points. mongofluxd supports consuming 1 go plugin .so. This .so may expose
many public functions. mongofluxd supports mapping one plugin symbol (a function) with each measurement.
The mapping function must be of the form:
func (*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error)
The following example plugin maps a single MongoDB document to multiple Points in InfluxDB:
package main
import (
// Plugin to map a single MongoDB document to multiple InfluxDB points
// e.g.
// db.testplug.insert({ts: new Date(), pts: [{o: 0, d: 1.5}, {o: 2, d: 3.2}]})
// where ts is the base time, o is the second offset for each point, and d is field data for each point
func MyPointMapper(input *mongofluxdplug.MongoDocument) (output []*mongofluxdplug.InfluxPoint, err error) {
doc := input.Data
// reference base time
var t time.Time
switch doc["ts"].(type) {
case time.Time:
t = doc["ts"].(time.Time)
return nil, fmt.Errorf("expected ts field with type %T but got %T", t, doc["ts"])
// reference list of points with time offset and data
var pts []interface{}
switch doc["pts"].(type) {
case []interface{}:
pts = doc["pts"].([]interface{})
return nil, fmt.Errorf("expected pts field with type %T but got %T", pts, doc["pts"])
// for each pt in this single document, add an InfluxPoint to the output
for _, p := range pts {
// assert type of each point p
var pt map[string]interface{}
switch ptt := p.(type) {
case map[string]interface{}:
pt = ptt
case gtm.OpLogEntry:
pt = map[string]interface{}(ptt)
return nil, fmt.Errorf("expected point of type %T but got %T", pt, p)
// read offset and point data
var offset, pointData float64
switch pt["o"].(type) {
case float64:
offset = pt["o"].(float64)
return nil, fmt.Errorf("expected offset of type %T but got %T", offset, pt["o"])
switch pt["d"].(type) {
case float64:
pointData = pt["d"].(float64)
return nil, fmt.Errorf("expected point data of type %T but got %T", pointData, pt["d"])
// create a new InfluxPoint
point := &mongofluxdplug.InfluxPoint{
Tags: make(map[string]string),
Fields: make(map[string]interface{}),
// set time, fields, and tags on the Point
point.Timestamp = t.Add(time.Duration(int64(offset)) * time.Second)
point.Fields["d"] = pointData
// append the Point to the output
output = append(output, point)
return output, nil
The plugin can be built for go 1.8 and above using the go build command
go build -buildmode=plugin -o myplugin.go
The public plugin function, or symbol, can then be assigned to a measurement in the config file
plugin-path = "/path/to/"
namespace = "test.testplug"
# for this measurement use a go plugin to map a single MongoDB document to multiple InfluxDB points
# in this case the function name to use is MyPointMapper
# the time, fields, and tags will be generated by the plugin
symbol = "MyPointMapper"
precision = "ms"
When a MongoDB document is inserted into the `test.testplug` namespace, the `MyPointMapper` function will
be invoked to determine a slice of Points to write to InfluxDB.
Oops, something went wrong.

0 comments on commit 0ac6e83

Please sign in to comment.