/
serializer_mongodb.go
143 lines (124 loc) · 4.94 KB
/
serializer_mongodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package common
import (
"encoding/binary"
"fmt"
"github.com/google/flatbuffers/go"
"github.com/taosdata/timeseriesdatabase-comparisons/mongo_serialization"
"io"
"reflect"
)
// SerializerMongo serializes points into the mongo_serialization
// FlatBuffers format. It carries no state of its own.
type SerializerMongo struct{}
// NewSerializerMongo returns a pointer to a freshly allocated, zero-valued
// SerializerMongo.
func NewSerializerMongo() *SerializerMongo {
	return new(SerializerMongo)
}
// SerializePoint writes Point data to the given writer, conforming to the
// mongo_serialization FlatBuffers format.
//
// Two writes are issued to w: first a length prefix holding the size of the
// finished FlatBuffers object as a little-endian uint64, then the object
// itself. The object is a mongo_serialization Item carrying the point's
// timestamp in nanoseconds, its measurement name, its tags and its fields.
//
// An error is returned if the point has fewer tag values than tag keys, or
// fewer field values than field keys, or if either write to w fails. A field
// value whose dynamic type is not one of int, int32, int64, float32, float64,
// string or []byte causes a panic.
func (s *SerializerMongo) SerializePoint(w io.Writer, p *Point) (err error) {
	// Reject points whose parallel key/value slices would be indexed out of
	// range below; a returned error is friendlier than a runtime panic.
	if len(p.TagValues) < len(p.TagKeys) {
		return fmt.Errorf("mongo serialization: %d tag keys but only %d tag values", len(p.TagKeys), len(p.TagValues))
	}
	if len(p.FieldValues) < len(p.FieldKeys) {
		return fmt.Errorf("mongo serialization: %d field keys but only %d field values", len(p.FieldKeys), len(p.FieldValues))
	}

	// Scratch buffer for the length prefix (presumably 8 bytes, going by the
	// pool's name). Deferring the Put returns it on every exit path,
	// including write errors.
	lenBuf := bufPool8.Get().([]byte)
	defer bufPool8.Put(lenBuf)

	// Fetch a flatbuffers builder from a pool. It is reset and handed back on
	// every exit path so that a failed write neither loses the builder nor
	// leaves stale data in it.
	builder := fbBuilderPool.Get().(*flatbuffers.Builder)
	defer func() {
		builder.Reset()
		fbBuilderPool.Put(builder)
	}()

	// The timestamp is shared by every value in this Point.
	timestampNanos := p.Timestamp.UTC().UnixNano()

	// Write the tag tables. Each table must be finished before the vector
	// that references them is started.
	tagOffsets := make([]flatbuffers.UOffsetT, 0, len(p.TagKeys))
	for i, key := range p.TagKeys {
		keyData := builder.CreateByteVector(key)
		valData := builder.CreateByteVector(p.TagValues[i])
		mongo_serialization.TagStart(builder)
		mongo_serialization.TagAddKey(builder, keyData)
		mongo_serialization.TagAddVal(builder, valData)
		tagOffsets = append(tagOffsets, mongo_serialization.TagEnd(builder))
	}
	mongo_serialization.ItemStartTagsVector(builder, len(tagOffsets))
	for _, tagOffset := range tagOffsets {
		builder.PrependUOffsetT(tagOffset)
	}
	tagsVecOffset := builder.EndVector(len(tagOffsets))

	// Write the field tables, again ahead of the vector that references them.
	fieldOffsets := make([]flatbuffers.UOffsetT, 0, len(p.FieldKeys))
	for i, key := range p.FieldKeys {
		keyData := builder.CreateByteVector(key)
		genericValue := p.FieldValues[i]

		// FlatBuffers does not allow a string or vector to be created while a
		// table is under construction, so string-like values are written
		// before FieldStart and only their offset is stored in the table.
		var stringOffset flatbuffers.UOffsetT
		switch sv := genericValue.(type) {
		case string:
			stringOffset = builder.CreateString(sv)
		case []byte:
			stringOffset = builder.CreateByteVector(sv)
		}

		mongo_serialization.FieldStart(builder)
		mongo_serialization.FieldAddKey(builder, keyData)

		// (We can't switch on sets of numeric types (e.g. int, int64) because
		// that does not make v concrete.)
		switch v := genericValue.(type) {
		case int:
			// NOTE(review): an int outside the int32 range is silently
			// truncated here; confirm callers never produce such values.
			mongo_serialization.FieldAddValueType(builder, mongo_serialization.ValueTypeInt)
			mongo_serialization.FieldAddIntValue(builder, int32(v))
		case int32:
			mongo_serialization.FieldAddValueType(builder, mongo_serialization.ValueTypeInt)
			mongo_serialization.FieldAddIntValue(builder, v)
		case int64:
			mongo_serialization.FieldAddValueType(builder, mongo_serialization.ValueTypeLong)
			mongo_serialization.FieldAddLongValue(builder, v)
		case float32:
			mongo_serialization.FieldAddValueType(builder, mongo_serialization.ValueTypeFloat)
			mongo_serialization.FieldAddFloatValue(builder, v)
		case float64:
			mongo_serialization.FieldAddValueType(builder, mongo_serialization.ValueTypeDouble)
			mongo_serialization.FieldAddDoubleValue(builder, v)
		case string, []byte:
			mongo_serialization.FieldAddValueType(builder, mongo_serialization.ValueTypeString)
			mongo_serialization.FieldAddStringValue(builder, stringOffset)
		default:
			panic(fmt.Sprintf("logic error in mongo serialization, %s", reflect.TypeOf(v)))
		}

		fieldOffsets = append(fieldOffsets, mongo_serialization.FieldEnd(builder))
	}
	mongo_serialization.ItemStartFieldsVector(builder, len(fieldOffsets))
	for _, fieldOffset := range fieldOffsets {
		builder.PrependUOffsetT(fieldOffset)
	}
	fieldsVecOffset := builder.EndVector(len(fieldOffsets))

	// Build the root Item table representing this point.
	measurementNameOffset := builder.CreateByteVector(p.MeasurementName)
	mongo_serialization.ItemStart(builder)
	mongo_serialization.ItemAddTimestampNanos(builder, timestampNanos)
	mongo_serialization.ItemAddMeasurementName(builder, measurementNameOffset)
	mongo_serialization.ItemAddTags(builder, tagsVecOffset)
	mongo_serialization.ItemAddFields(builder, fieldsVecOffset)
	rootTable := mongo_serialization.ItemEnd(builder)
	builder.Finish(rootTable)

	// buf aliases the builder's internal storage, so both writes must finish
	// before the deferred Reset runs.
	buf := builder.FinishedBytes()

	// Write the length prefix, then the flatbuffer object itself.
	binary.LittleEndian.PutUint64(lenBuf, uint64(len(buf)))
	if _, err = w.Write(lenBuf); err != nil {
		return err
	}
	if _, err = w.Write(buf); err != nil {
		return err
	}
	return nil
}
// SerializeSize is a no-op for the Mongo format: nothing is written to w and
// the points and values counts are ignored. It always returns nil.
//
// A text-based size summary (serializeSizeInText) was previously wired in
// here and is currently disabled.
func (s *SerializerMongo) SerializeSize(w io.Writer, points, values int64) error {
	return nil
}