/
simulation_data_source.go
90 lines (81 loc) · 2.21 KB
/
simulation_data_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package timestream
import (
"github.com/timescale/tsbs/pkg/data"
"github.com/timescale/tsbs/pkg/data/serialize"
"github.com/timescale/tsbs/pkg/data/usecases/common"
"log"
"strconv"
"time"
)
// simulatorDataSource serves points pulled directly from a common.Simulator.
// Headers must be called before NextItem: NextItem aborts the process when
// the cached headers are still nil.
type simulatorDataSource struct {
// _headers caches the value returned by simulator.Headers(); it is filled in
// lazily by the Headers method.
_headers *common.GeneratedDataHeaders
// simulator is the generator queried through Next, Finished and Headers.
simulator common.Simulator
// useCurrentTs, when true, makes prepareTimestamp use time.Now() instead of
// the timestamp carried by the simulated point.
useCurrentTs bool
}
// NextItem asks the simulator for points until one is flagged for writing,
// then converts it into a deserializedPoint wrapped in a data.LoadedPoint.
//
// A zero-value data.LoadedPoint is returned when the simulator is finished or
// no writable point was produced. The headers must have been read through
// Headers beforehand; otherwise the process is terminated via log.Fatal.
func (s *simulatorDataSource) NextItem() data.LoadedPoint {
	if s._headers == nil {
		log.Fatal("headers not read before starting to read points")
		// Not reached: log.Fatal exits the process.
		return data.LoadedPoint{}
	}

	point := data.NewPoint()
	produced := false
	// Keep pulling until the simulator yields a writable point or runs dry;
	// a rejected point is reset so it can be reused for the next attempt.
	for !produced && !s.simulator.Finished() {
		if produced = s.simulator.Next(point); !produced {
			point.Reset()
		}
	}

	// NOTE(review): Finished is re-checked here, so a point produced by the
	// very call that finishes the simulator is discarded — confirm intended.
	if s.simulator.Finished() || !produced {
		return data.LoadedPoint{}
	}

	row := &deserializedPoint{
		timeUnixNano: s.prepareTimestamp(point.Timestamp()),
		table:        string(point.MeasurementName()),
		tags:         tagsToStringArr(point.TagValues()),
		tagKeys:      tagKeysToStringArr(point.TagKeys()),
		fields:       fieldsToStringArr(point.FieldValues()),
	}
	return data.NewLoadedPoint(row)
}
// prepareTimestamp renders a timestamp as a base-10 string of nanoseconds
// since the Unix epoch. When useCurrentTs is set the current wall-clock time
// is used and pointTs is ignored; otherwise pointTs is dereferenced.
func (s *simulatorDataSource) prepareTimestamp(pointTs *time.Time) string {
	if s.useCurrentTs {
		return strconv.FormatInt(time.Now().UnixNano(), 10)
	}
	return strconv.FormatInt(pointTs.UnixNano(), 10)
}
// Headers returns the simulator's generated data headers, fetching them from
// the simulator on the first call and reusing the cached value afterwards.
func (s *simulatorDataSource) Headers() *common.GeneratedDataHeaders {
	if s._headers == nil {
		s._headers = s.simulator.Headers()
	}
	return s._headers
}
// tagsToStringArr formats every tag value with serialize.FastFormatAppend and
// returns the results as strings, in the same order as tagValues.
func tagsToStringArr(tagValues []interface{}) []string {
	formatted := make([]string, 0, len(tagValues))
	for _, value := range tagValues {
		// A nil buffer is passed so each value is formatted independently.
		formatted = append(formatted, string(serialize.FastFormatAppend(value, nil)))
	}
	return formatted
}
// tagKeysToStringArr converts each tag key from a byte slice to a string,
// preserving order. The result is always a non-nil slice with one entry per
// input key.
func tagKeysToStringArr(tagKeys [][]byte) []string {
	keys := make([]string, 0, len(tagKeys))
	for _, raw := range tagKeys {
		keys = append(keys, string(raw))
	}
	return keys
}
// fieldsToStringArr formats every field value with serialize.FastFormatAppend
// and returns pointers to the resulting strings, in the same order as
// fieldValues. Each element points at its own string.
func fieldsToStringArr(fieldValues []interface{}) []*string {
	out := make([]*string, 0, len(fieldValues))
	for _, value := range fieldValues {
		// Declared inside the loop so every pointer refers to a distinct string.
		formatted := string(serialize.FastFormatAppend(value, nil))
		out = append(out, &formatted)
	}
	return out
}