-
Notifications
You must be signed in to change notification settings - Fork 152
/
result.go
95 lines (82 loc) · 2.35 KB
/
result.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package line
import (
"bufio"
"io"
"strings"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/values"
)
// ResultDecoder decodes raw input strings from a reader into a flux.Result.
// It uses a separator to split the input into tokens and generate table rows.
// Tokens are kept as they are and put into a table with schema `_time`, `_value`.
// The `_value` column contains tokens.
// The `_time` column contains the timestamps for when each `_value` has been read.
// Strings in `_value` are obtained from the io.Reader passed to the Decode function.
// ResultDecoder outputs one table once the reader reaches EOF.
type ResultDecoder struct {
reader *bufio.Reader
config *ResultDecoderConfig
}
// NewResultDecoder creates a new result decoder from config.
func NewResultDecoder(config *ResultDecoderConfig) *ResultDecoder {
return &ResultDecoder{config: config}
}
// TimeProvider gives the current time.
type TimeProvider interface {
CurrentTime() values.Time
}
// ResultDecoderConfig is the configuration for a result decoder.
type ResultDecoderConfig struct {
Separator byte
TimeProvider TimeProvider
}
func (rd *ResultDecoder) Do(f func(flux.Table) error) error {
timeCol := flux.ColMeta{Label: "_time", Type: flux.TTime}
valueCol := flux.ColMeta{Label: "_value", Type: flux.TString}
key := execute.NewGroupKey(nil, nil)
builder := execute.NewColListTableBuilder(key, &memory.ResourceAllocator{})
timeIdx, err := builder.AddCol(timeCol)
if err != nil {
return err
}
valueIdx, err := builder.AddCol(valueCol)
if err != nil {
return err
}
var eof bool
for !eof {
s, err := rd.reader.ReadString(rd.config.Separator)
if err != nil && err == io.EOF {
break
} else if err != nil {
return err
}
v := strings.Trim(s, string(rd.config.Separator))
ts := rd.config.TimeProvider.CurrentTime()
err = builder.AppendTime(timeIdx, ts)
if err != nil {
return err
}
err = builder.AppendString(valueIdx, v)
if err != nil {
return err
}
}
tbl, err := builder.Table()
if err != nil {
return err
}
return f(tbl)
}
func (*ResultDecoder) Name() string {
return "_result"
}
func (rd *ResultDecoder) Tables() flux.TableIterator {
return rd
}
func (rd *ResultDecoder) Decode(r io.Reader) (flux.Result, error) {
rd.reader = bufio.NewReader(r)
return rd, nil
}