Skip to content

Commit

Permalink
Changing AddValues to AddFields and temp disabling adding w time
Browse files Browse the repository at this point in the history
Currently adding with time is broken, because InfluxDB does not support
using precision for timestamp truncation both with and without
timestamps. This will be re-enabled once we fix InfluxDB to use the
precision argument for truncation in all cases, and a "unit" argument
in the line-protocol for adding points with non-nanosecond stamps

Fixes #175
  • Loading branch information
sparrc committed Sep 16, 2015
1 parent 46cd9ff commit 733ba07
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 14 deletions.
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Plugin interface {

type Accumulator interface {
Add(measurement string, value interface{}, tags map[string]string)
AddValuesWithTime(measurement string,
AddFieldsWithTime(measurement string,
values map[string]interface{},
tags map[string]string,
timestamp time.Time)
Expand All @@ -63,7 +63,7 @@ The `Add` function takes 3 arguments:
about the metric. For instance, the `net` plugin adds a tag named `"interface"`
set to the name of the network interface, like `"eth0"`.

The `AddValuesWithTime` allows multiple values for a point to be passed. The values
The `AddFieldsWithTime` allows multiple values for a point to be passed. The values
used are the same type profile as **value** above. The **timestamp** argument
allows a point to be registered as having occurred at an arbitrary time.

Expand Down
59 changes: 53 additions & 6 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,60 @@ func (bp *BatchPoints) Add(
})
}

// AddValuesWithTime adds a measurement with a provided timestamp
func (bp *BatchPoints) AddValuesWithTime(
// AddFieldsWithTime adds a measurement with a provided timestamp
func (bp *BatchPoints) AddFieldsWithTime(
measurement string,
values map[string]interface{},
fields map[string]interface{},
tags map[string]string,
timestamp time.Time,
) {
// TODO this function should add the fields with the timestamp, but that will
// need to wait for the InfluxDB point precision/unit to be fixed
bp.AddFields(measurement, fields, tags)
// bp.mu.Lock()
// defer bp.mu.Unlock()

// measurement = bp.Prefix + measurement

// if bp.Config != nil {
// if !bp.Config.ShouldPass(measurement, tags) {
// return
// }
// }

// if bp.Debug {
// var tg []string

// for k, v := range tags {
// tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
// }

// var vals []string

// for k, v := range fields {
// vals = append(vals, fmt.Sprintf("%s=%v", k, v))
// }

// sort.Strings(tg)
// sort.Strings(vals)

// fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
// }

// bp.Points = append(bp.Points, client.Point{
// Measurement: measurement,
// Tags: tags,
// Fields: fields,
// Time: timestamp,
// })
}

// AddFields will eventually replace the Add function, once we move to having a
// single plugin as a single measurement with multiple fields
func (bp *BatchPoints) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
) {
bp.mu.Lock()
defer bp.mu.Unlock()
Expand All @@ -89,7 +137,7 @@ func (bp *BatchPoints) AddValuesWithTime(

var vals []string

for k, v := range values {
for k, v := range fields {
vals = append(vals, fmt.Sprintf("%s=%v", k, v))
}

Expand All @@ -102,7 +150,6 @@ func (bp *BatchPoints) AddValuesWithTime(
bp.Points = append(bp.Points, client.Point{
Measurement: measurement,
Tags: tags,
Fields: values,
Time: timestamp,
Fields: fields,
})
}
2 changes: 1 addition & 1 deletion plugins/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte
}

for _, point := range points {
acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
acc.AddFieldsWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
}
case <-timeout:
return nil
Expand Down
2 changes: 1 addition & 1 deletion plugins/mongodb/mongodb_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, s
}

func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
acc.AddValuesWithTime(
acc.AddFieldsWithTime(
key,
map[string]interface{}{
"value": val,
Expand Down
4 changes: 2 additions & 2 deletions plugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ type Accumulator interface {

// Create a point with a set of values, decorating it with tags
// NOTE: tags and values are expected to be owned by the caller, don't mutate
// them after passing to AddValuesWithTime.
AddValuesWithTime(
// them after passing to AddFieldsWithTime.
AddFieldsWithTime(
measurement string,
values map[string]interface{},
tags map[string]string,
Expand Down
4 changes: 2 additions & 2 deletions testutil/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string
)
}

// AddValuesWithTime adds a measurement point with a specified timestamp.
func (a *Accumulator) AddValuesWithTime(
// AddFieldsWithTime adds a measurement point with a specified timestamp.
func (a *Accumulator) AddFieldsWithTime(
measurement string,
values map[string]interface{},
tags map[string]string,
Expand Down

0 comments on commit 733ba07

Please sign in to comment.