Skip to content

Commit

Permalink
Unify time parsing in json/csv parsers (#5382)
Browse files Browse the repository at this point in the history
  • Loading branch information
glinton authored and danielnelson committed Feb 6, 2019
1 parent 7887e15 commit 10ac030
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 88 deletions.
56 changes: 54 additions & 2 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ import (
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"log"
"math"
"math/big"
"os"
"os/exec"
"regexp"
"runtime"
"strconv"
"strings"
"syscall"
"time"
"unicode"

"fmt"
"github.com/alecthomas/units"
"runtime"
)

const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
Expand Down Expand Up @@ -330,3 +332,53 @@ func CompressWithGzip(data io.Reader) (io.Reader, error) {

return pipeReader, err
}

// ParseTimestamp parses a timestamp value as a unix epoch of various precision.
//
// format = "unix": epoch is assumed to be in seconds and can come as number or string. Can have a decimal part.
// format = "unix_ms": epoch is assumed to be in milliseconds and can come as number or string. Cannot have a decimal part.
// format = "unix_us": epoch is assumed to be in microseconds and can come as number or string. Cannot have a decimal part.
// format = "unix_ns": epoch is assumed to be in nanoseconds and can come as number or string. Cannot have a decimal part.
func ParseTimestamp(timestamp interface{}, format string) (time.Time, error) {
timeInt, timeFractional := int64(0), int64(0)
timeEpochStr, ok := timestamp.(string)
var err error

if !ok {
timeEpochFloat, ok := timestamp.(float64)
if !ok {
return time.Time{}, fmt.Errorf("time: %v could not be converted to string nor float64", timestamp)
}
intPart, frac := math.Modf(timeEpochFloat)
timeInt, timeFractional = int64(intPart), int64(frac*1e9)
} else {
splitted := regexp.MustCompile("[.,]").Split(timeEpochStr, 2)
timeInt, err = strconv.ParseInt(splitted[0], 10, 64)
if err != nil {
return time.Parse(format, timeEpochStr)
}

if len(splitted) == 2 {
if len(splitted[1]) > 9 {
splitted[1] = splitted[1][:9] //truncates decimal part to nanoseconds precision
}
nanosecStr := splitted[1] + strings.Repeat("0", 9-len(splitted[1])) //adds 0's to the right to obtain a valid number of nanoseconds

timeFractional, err = strconv.ParseInt(nanosecStr, 10, 64)
if err != nil {
return time.Time{}, err
}
}
}
if strings.EqualFold(format, "unix") {
return time.Unix(timeInt, timeFractional).UTC(), nil
} else if strings.EqualFold(format, "unix_ms") {
return time.Unix(timeInt/1000, (timeInt%1000)*1e6).UTC(), nil
} else if strings.EqualFold(format, "unix_us") {
return time.Unix(0, timeInt*1e3).UTC(), nil
} else if strings.EqualFold(format, "unix_ns") {
return time.Unix(0, timeInt).UTC(), nil
} else {
return time.Time{}, errors.New("Invalid unix format")
}
}
17 changes: 2 additions & 15 deletions plugins/parsers/csv/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
)

Expand Down Expand Up @@ -239,22 +240,8 @@ func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface
case "":
err = fmt.Errorf("timestamp format must be specified")
return
case "unix":
var unixTime int64
unixTime, err = strconv.ParseInt(tStr, 10, 64)
if err != nil {
return
}
metricTime = time.Unix(unixTime, 0)
case "unix_ms":
var unixTime int64
unixTime, err = strconv.ParseInt(tStr, 10, 64)
if err != nil {
return
}
metricTime = time.Unix(unixTime/1000, (unixTime%1000)*1e6)
default:
metricTime, err = time.Parse(timestampFormat, tStr)
metricTime, err = internal.ParseTimestamp(tStr, timestampFormat)
if err != nil {
return
}
Expand Down
77 changes: 6 additions & 71 deletions plugins/parsers/json/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ import (
"encoding/json"
"fmt"
"log"
"math"
"regexp"
"strconv"
"strings"
"time"

"github.com/tidwall/gjson"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/pkg/errors"
"github.com/tidwall/gjson"
)

var (
Expand Down Expand Up @@ -50,55 +49,6 @@ func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}

// format = "unix": epoch is assumed to be in seconds and can come as number or string. Can have a decimal part.
// format = "unix_ms": epoch is assumed to be in milliseconds and can come as number or string. Cannot have a decimal part.
// format = "unix_us": epoch is assumed to be in microseconds and can come as number or string. Cannot have a decimal part.
// format = "unix_ns": epoch is assumed to be in nanoseconds and can come as number or string. Cannot have a decimal part.
func parseUnixTimestamp(jsonValue interface{}, format string) (time.Time, error) {
timeInt, timeFractional := int64(0), int64(0)
timeEpochStr, ok := jsonValue.(string)
var err error

if !ok {
timeEpochFloat, ok := jsonValue.(float64)
if !ok {
err := fmt.Errorf("time: %v could not be converted to string nor float64", jsonValue)
return time.Time{}, err
}
intPart, frac := math.Modf(timeEpochFloat)
timeInt, timeFractional = int64(intPart), int64(frac*1e9)
} else {
splitted := regexp.MustCompile("[.,]").Split(timeEpochStr, 2)
timeInt, err = strconv.ParseInt(splitted[0], 10, 64)
if err != nil {
return time.Time{}, err
}

if len(splitted) == 2 {
if len(splitted[1]) > 9 {
splitted[1] = splitted[1][:9] //truncates decimal part to nanoseconds precision
}
nanosecStr := splitted[1] + strings.Repeat("0", 9-len(splitted[1])) //adds 0's to the right to obtain a valid number of nanoseconds

timeFractional, err = strconv.ParseInt(nanosecStr, 10, 64)
if err != nil {
return time.Time{}, err
}
}
}
if strings.EqualFold(format, "unix") {
return time.Unix(timeInt, timeFractional).UTC(), nil
} else if strings.EqualFold(format, "unix_ms") {
return time.Unix(timeInt/1000, (timeInt%1000)*1e6).UTC(), nil
} else if strings.EqualFold(format, "unix_us") {
return time.Unix(0, timeInt*1e3).UTC(), nil
} else if strings.EqualFold(format, "unix_ns") {
return time.Unix(0, timeInt).UTC(), nil
} else {
return time.Time{}, errors.New("Invalid unix format")
}
}

func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) {
tags := make(map[string]string)
for k, v := range p.DefaultTags {
Expand Down Expand Up @@ -132,24 +82,9 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
return nil, err
}

if strings.EqualFold(p.JSONTimeFormat, "unix") ||
strings.EqualFold(p.JSONTimeFormat, "unix_ms") ||
strings.EqualFold(p.JSONTimeFormat, "unix_us") ||
strings.EqualFold(p.JSONTimeFormat, "unix_ns") {
nTime, err = parseUnixTimestamp(f.Fields[p.JSONTimeKey], p.JSONTimeFormat)
if err != nil {
return nil, err
}
} else {
timeStr, ok := f.Fields[p.JSONTimeKey].(string)
if !ok {
err := fmt.Errorf("time: %v could not be converted to string", f.Fields[p.JSONTimeKey])
return nil, err
}
nTime, err = time.Parse(p.JSONTimeFormat, timeStr)
if err != nil {
return nil, err
}
nTime, err = internal.ParseTimestamp(f.Fields[p.JSONTimeKey], p.JSONTimeFormat)
if err != nil {
return nil, err
}

delete(f.Fields, p.JSONTimeKey)
Expand Down

0 comments on commit 10ac030

Please sign in to comment.