forked from timescale/tsbs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
serializer.go
148 lines (135 loc) · 3.78 KB
/
serializer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package akumuli
import (
"encoding/binary"
"errors"
"fmt"
"github.com/timescale/tsbs/pkg/data"
"github.com/timescale/tsbs/pkg/data/serialize"
"io"
)
const (
placeholderText = "AAAAFFEE"
)
// Serializer writes a series of Point elements into RESP encoded
// buffer.
type Serializer struct {
book map[string]uint32
bookClosed bool
deferred []byte
index uint32
}
// NewAkumuliSerializer initializes AkumuliSerializer instance.
func NewAkumuliSerializer() *Serializer {
s := &Serializer{}
s.book = make(map[string]uint32)
s.deferred = make([]byte, 0, 4096)
s.bookClosed = false
return s
}
// Serialize writes Point data to the given writer, conforming to the
// AKUMULI RESP protocol. Serializer adds extra data to guide data loader.
// This function writes output that contains binary and text data in RESP format.
func (s *Serializer) Serialize(p *data.Point, w io.Writer) (err error) {
deferPoint := false
buf := make([]byte, 0, 1024)
// Add cue
const HeaderLength = 8
buf = append(buf, placeholderText...)
buf = append(buf, "+"...)
// Series name
fieldKeys := p.FieldKeys()
measurementName := p.MeasurementName()
for i := 0; i < len(fieldKeys); i++ {
buf = append(buf, measurementName...)
buf = append(buf, '.')
buf = append(buf, fieldKeys[i]...)
if i+1 < len(fieldKeys) {
buf = append(buf, '|')
} else {
buf = append(buf, ' ')
}
}
tagKeys := p.TagKeys()
tagValues := p.TagValues()
for i := 0; i < len(tagKeys); i++ {
buf = append(buf, ' ')
buf = append(buf, tagKeys[i]...)
buf = append(buf, '=')
buf = append(buf, tagValues[i].(string)...)
}
series := string(buf[HeaderLength:])
if !s.bookClosed {
// Save point for later
if id, ok := s.book[series]; ok {
s.bookClosed = true
_, err = w.Write(s.deferred)
if err != nil {
return err
}
buf = buf[:HeaderLength]
buf = append(buf, fmt.Sprintf(":%d", id)...)
binary.LittleEndian.PutUint32(buf[:4], id)
} else {
// Shortcut
s.index++
tmp := make([]byte, 0, 1024)
tmp = append(tmp, placeholderText...)
tmp = append(tmp, "*2\n"...)
tmp = append(tmp, buf[HeaderLength:]...)
tmp = append(tmp, '\n')
tmp = append(tmp, fmt.Sprintf(":%d\n", s.index)...)
s.book[series] = s.index
// Update cue
binary.LittleEndian.PutUint16(tmp[4:6], uint16(len(tmp)))
binary.LittleEndian.PutUint16(tmp[6:HeaderLength], uint16(0))
binary.LittleEndian.PutUint32(tmp[:4], s.index)
binary.LittleEndian.PutUint32(buf[:4], s.index)
_, err = w.Write(tmp)
if err != nil {
return err
}
deferPoint = true
buf = buf[:HeaderLength]
buf = append(buf, fmt.Sprintf(":%d", s.index)...)
}
} else {
// Replace the series name with the value from the book
if id, ok := s.book[series]; ok {
buf = buf[:HeaderLength]
buf = append(buf, fmt.Sprintf(":%d", id)...)
binary.LittleEndian.PutUint16(buf[4:6], uint16(len(buf)))
binary.LittleEndian.PutUint16(buf[6:HeaderLength], uint16(0))
binary.LittleEndian.PutUint32(buf[:4], id)
} else {
return errors.New("unexpected series name")
}
}
buf = append(buf, '\n')
// Timestamp
buf = append(buf, ':')
buf = serialize.FastFormatAppend(p.Timestamp().UTC().UnixNano(), buf)
buf = append(buf, '\n')
// Values
fieldValues := p.FieldValues()
buf = append(buf, fmt.Sprintf("*%d\n", len(fieldValues))...)
for i := 0; i < len(fieldValues); i++ {
v := fieldValues[i]
switch v.(type) {
case int, int64:
buf = append(buf, ':')
case float64:
buf = append(buf, '+')
}
buf = serialize.FastFormatAppend(v, buf)
buf = append(buf, '\n')
}
// Update cue
binary.LittleEndian.PutUint16(buf[4:6], uint16(len(buf)))
binary.LittleEndian.PutUint16(buf[6:HeaderLength], uint16(len(fieldValues)))
if deferPoint {
s.deferred = append(s.deferred, buf...)
return nil
}
_, err = w.Write(buf)
return err
}