forked from timescale/tsbs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
serializer.go
87 lines (73 loc) · 2.57 KB
/
serializer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package siridb
import (
"encoding/binary"
"fmt"
"github.com/timescale/tsbs/pkg/data"
"io"
"log"
"strconv"
qpack "github.com/transceptor-technology/go-qpack"
)
// Serializer writes a Point in a serialized form for SiriDB
type Serializer struct{}
// Serialize writes Point data to the given writer.
//
// The format is is serialized in such a way that a known number of bytes
// acting as a header indicate the number of bytes of content that will follow.
//
// The first 8 bytes are reserved for the main header that is filled at the end
// of the script. 4 bytes of this are for number of metrics and 4 bytes for
// the length of the measuremnt name and tags.
//
// There are also sub headers of 8 bytes reserved for every data point that are
// filled in later. 4 bytes for the length of the field key and 4 bytes for the
// length of the packed data (timestamp and value).
//
// The output looks like this:
// <number of metrics> <length of name and tags> <name and tags> <length of field key_1> <length of timestamp_1 and field value_1> <field key_1> <packed timestamp_1 and value_1> <length of field key_2> <length of timestamp_1 and field value_2> <field key_2> <packed timestamp_1 and value_2>... etc.
func (s *Serializer) Serialize(p *data.Point, w io.Writer) error {
line := make([]byte, 8, 1024)
line = append(line, p.MeasurementName()...)
line = append(line, '|')
tagKeys := p.TagKeys()
tagValues := p.TagValues()
for i, v := range tagValues {
if i != 0 {
line = append(line, ',')
}
switch t := v.(type) {
case string:
line = append(line, tagKeys[i]...)
line = append(line, '=')
line = append(line, []byte(t)...)
default:
panic("Non string tags not supported")
}
}
lenName := len(line) - 8
var err error
metricCount := 0
fieldValues := p.FieldValues()
fieldKeys := p.FieldKeys()
for i, value := range fieldValues {
indexLenData := len(line) + 4
key := make([]byte, 9, 64)
key[8] = '|'
key = append(key, fieldKeys[i]...)
binary.LittleEndian.PutUint32(key[0:], uint32(len(key)-8))
line = append(line, key...)
preQpack := len(line)
ts, _ := strconv.ParseInt(fmt.Sprintf("%d", p.Timestamp().UTC().UnixNano()), 10, 64)
err := qpack.PackTo(&line, []interface{}{ts, value}) // packs a byte array in the right format for SiriDB
if err != nil {
log.Fatal(err)
}
postQpack := len(line)
binary.LittleEndian.PutUint32(line[indexLenData:], uint32(postQpack-preQpack))
metricCount++
}
binary.LittleEndian.PutUint32(line[0:], uint32(metricCount))
binary.LittleEndian.PutUint32(line[4:], uint32(lenName))
_, err = w.Write(line)
return err
}