Skip to content

Commit

Permalink
add ability for a measurement to target a specific influxdb database
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Feb 10, 2019
1 parent c1ca001 commit 68a529c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -83,6 +83,8 @@ fields = ["sales", "price"]
retention = "RP1" retention = "RP1"
# override the measurement name which defaults to the name of the MongoDB collection # override the measurement name which defaults to the name of the MongoDB collection
measure = "sales" measure = "sales"
# override the influx database name which default to the name of the MongoDB database
database = "salesdb"


[[measurement]] [[measurement]]
namespace = "db.col" namespace = "db.col"
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -203,6 +203,8 @@ github.com/rwynn/gtm v0.0.0-20190131014030-4d96eedfb073 h1:OgMl+/XpaQzW5c38u59HA
github.com/rwynn/gtm v0.0.0-20190131014030-4d96eedfb073/go.mod h1:LYXeTMjbA7l9k9oEM+NUBuu0BgvNrD5nQuo8seLsar0= github.com/rwynn/gtm v0.0.0-20190131014030-4d96eedfb073/go.mod h1:LYXeTMjbA7l9k9oEM+NUBuu0BgvNrD5nQuo8seLsar0=
github.com/rwynn/mgo v0.0.0-20190130173337-9b1257fb3190 h1:trPv91MYUdV6w+TaBJsudBfJOBRozGvneuDZVmEc/QI= github.com/rwynn/mgo v0.0.0-20190130173337-9b1257fb3190 h1:trPv91MYUdV6w+TaBJsudBfJOBRozGvneuDZVmEc/QI=
github.com/rwynn/mgo v0.0.0-20190130173337-9b1257fb3190/go.mod h1:OQBK0ebL25cW31topLSUPIWIrSesue7+zTa/haAXccQ= github.com/rwynn/mgo v0.0.0-20190130173337-9b1257fb3190/go.mod h1:OQBK0ebL25cW31topLSUPIWIrSesue7+zTa/haAXccQ=
github.com/rwynn/mgo v0.0.0-20190203195949-55e8fd85e7e2 h1:+QMMf2dZav3oviMURjgWnNyppc14490oMi43Hd/PffA=
github.com/rwynn/mgo v0.0.0-20190203195949-55e8fd85e7e2/go.mod h1:OQBK0ebL25cW31topLSUPIWIrSesue7+zTa/haAXccQ=
github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
Expand Down
32 changes: 17 additions & 15 deletions mongofluxd.go
Expand Up @@ -29,7 +29,7 @@ var errorLog *log.Logger = log.New(os.Stdout, "ERROR ", log.Flags())


const ( const (
Name = "mongofluxd" Name = "mongofluxd"
Version = "0.7.1" Version = "0.8.0"
mongoUrlDefault = "localhost" mongoUrlDefault = "localhost"
influxUrlDefault = "http://localhost:8086" influxUrlDefault = "http://localhost:8086"
influxClientsDefault = 10 influxClientsDefault = 10
Expand All @@ -56,6 +56,7 @@ type measureSettings struct {
Retention string Retention string
Precision string Precision string
Measure string Measure string
Database string
Symbol string Symbol string
Tags []string Tags []string
Fields []string Fields []string
Expand Down Expand Up @@ -105,6 +106,7 @@ type InfluxMeasure struct {
retention string retention string
precision string precision string
measure string measure string
database string
tags map[string]bool tags map[string]bool
fields map[string]bool fields map[string]bool
plug func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error) plug func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error)
Expand Down Expand Up @@ -164,6 +166,7 @@ func (ctx *InfluxCtx) setupMeasurements() error {
retention: ms.Retention, retention: ms.Retention,
precision: ms.Precision, precision: ms.Precision,
measure: ms.Measure, measure: ms.Measure,
database: ms.Database,
plug: ms.plug, plug: ms.plug,
tags: make(map[string]bool), tags: make(map[string]bool),
fields: make(map[string]bool), fields: make(map[string]bool),
Expand All @@ -174,6 +177,12 @@ func (ctx *InfluxCtx) setupMeasurements() error {
return err return err
} }
} }
if im.database == "" {
im.database = strings.SplitN(im.ns, ".", 2)[0]
}
if im.measure == "" {
im.measure = strings.SplitN(im.ns, ".", 2)[1]
}
if im.precision == "" { if im.precision == "" {
im.precision = "s" im.precision = "s"
} }
Expand Down Expand Up @@ -218,18 +227,19 @@ func (ctx *InfluxCtx) createDatabase(db string) error {
} }


func (ctx *InfluxCtx) setupDatabase(op *gtm.Op) error { func (ctx *InfluxCtx) setupDatabase(op *gtm.Op) error {
db, ns := op.GetDatabase(), op.Namespace ns := op.Namespace
if _, found := ctx.m[ns]; found == false { if _, found := ctx.m[ns]; found == false {
measure := ctx.measures[ns]
bp, err := client.NewBatchPoints(client.BatchPointsConfig{ bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: db, Database: measure.database,
RetentionPolicy: ctx.measures[ns].retention, RetentionPolicy: measure.retention,
Precision: ctx.measures[ns].precision, Precision: measure.precision,
}) })
if err != nil { if err != nil {
return err return err
} }
ctx.m[ns] = bp ctx.m[ns] = bp
if err := ctx.createDatabase(db); err != nil { if err := ctx.createDatabase(measure.database); err != nil {
return err return err
} }
} }
Expand Down Expand Up @@ -313,14 +323,6 @@ func (m *InfluxDataMap) unsupportedType(op *gtm.Op, k string, v interface{}, kin
errorLog.Printf("Unsupported type %T for %s %s in namespace %s\n", v, kind, k, op.Namespace) errorLog.Printf("Unsupported type %T for %s %s in namespace %s\n", v, kind, k, op.Namespace)
} }


func (m *InfluxDataMap) loadName() {
if m.measure.measure != "" {
m.name = m.measure.measure
} else {
m.name = m.op.GetCollection()
}
}

func (m *InfluxDataMap) loadKV(k string, v interface{}) { func (m *InfluxDataMap) loadKV(k string, v interface{}) {
if m.measure.tags[k] { if m.measure.tags[k] {
if m.istagtype(v) { if m.istagtype(v) {
Expand Down Expand Up @@ -419,8 +421,8 @@ func (ctx *InfluxCtx) addPoint(op *gtm.Op) error {
mapper := &InfluxDataMap{ mapper := &InfluxDataMap{
op: op, op: op,
measure: measure, measure: measure,
name: measure.measure,
} }
mapper.loadName()
if measure.plug != nil { if measure.plug != nil {
points, err := measure.plug(&mongofluxdplug.MongoDocument{ points, err := measure.plug(&mongofluxdplug.MongoDocument{
Data: op.Data, Data: op.Data,
Expand Down

0 comments on commit 68a529c

Please sign in to comment.