-
Notifications
You must be signed in to change notification settings - Fork 0
/
mongo.go
110 lines (96 loc) · 2.78 KB
/
mongo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package serialize
import (
"encoding/binary"
"fmt"
"io"
"sync"
flatbuffers "github.com/google/flatbuffers/go"
)
var fbBuilderPool = &sync.Pool{
New: func() interface{} {
return flatbuffers.NewBuilder(0)
},
}
// MongoSerializer writes a Point in a serialized form for MongoDB
type MongoSerializer struct{}
// Serialize writes Point data to the given Writer, using basic gob encoding
func (s *MongoSerializer) Serialize(p *Point, w io.Writer) (err error) {
b := fbBuilderPool.Get().(*flatbuffers.Builder)
timestampNanos := p.timestamp.UTC().UnixNano()
fieldsMap := make(map[string]interface{})
for i, val := range p.fieldKeys {
fieldsMap[string(val)] = p.fieldValues[i]
}
tagsMap := make(map[string]string)
for i, val := range p.tagKeys {
tagsMap[string(val)] = string(p.tagValues[i])
}
tags := []flatbuffers.UOffsetT{}
// In order to keep the ordering the same on deserialization, we need
// to go in reverse order since we are prepending rather than appending.
for i := len(p.tagKeys); i > 0; i-- {
k := string(p.tagKeys[i-1])
key := b.CreateString(k)
val := b.CreateString(tagsMap[k])
MongoTagStart(b)
MongoTagAddKey(b, key)
MongoTagAddValue(b, val)
tags = append(tags, MongoTagEnd(b))
}
MongoPointStartTagsVector(b, len(tags))
for _, t := range tags {
b.PrependUOffsetT(t)
}
tagsArr := b.EndVector(len(tags))
fields := []flatbuffers.UOffsetT{}
// In order to keep the ordering the same on deserialization, we need
// to go in reverse order since we are prepending rather than appending.
for i := len(p.fieldKeys); i > 0; i-- {
k := string(p.fieldKeys[i-1])
key := b.CreateString(k)
MongoReadingStart(b)
MongoReadingAddKey(b, key)
v := fieldsMap[k]
switch val := v.(type) {
case float64:
MongoReadingAddValue(b, val)
case int:
MongoReadingAddValue(b, float64(val))
case int64:
MongoReadingAddValue(b, float64(val))
default:
panic(fmt.Sprintf("cannot covert %T to float64", val))
}
fields = append(fields, MongoReadingEnd(b))
}
MongoPointStartFieldsVector(b, len(fields))
for _, f := range fields {
b.PrependUOffsetT(f)
}
fieldsArr := b.EndVector(len(fields))
measurement := b.CreateString(string(p.measurementName))
MongoPointStart(b)
MongoPointAddMeasurementName(b, measurement)
MongoPointAddTimestamp(b, timestampNanos)
MongoPointAddTags(b, tagsArr)
MongoPointAddFields(b, fieldsArr)
point := MongoPointEnd(b)
b.Finish(point)
buf := b.FinishedBytes()
// Write the metadata for the flatbuffer object:
lenBuf := make([]byte, 8)
binary.LittleEndian.PutUint64(lenBuf, uint64(len(buf)))
_, err = w.Write(lenBuf)
if err != nil {
return err
}
// Write the flatbuffer object:
_, err = w.Write(buf)
if err != nil {
return err
}
// Give the flatbuffers builder back to a pool:
b.Reset()
fbBuilderPool.Put(b)
return nil
}