forked from influxdata/influxdb-comparisons
/
serializer_timescale.go
148 lines (130 loc) · 3.96 KB
/
serializer_timescale.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package common
import (
"encoding/binary"
"fmt"
"github.com/influxdata/influxdb-comparisons/timescale_serializaition"
"io"
"log"
"reflect"
)
// SerializerTimescaleSql serializes points as textual Timescale SQL INSERT
// statements. It carries no state.
type SerializerTimescaleSql struct{}

// NewSerializerTimescaleSql returns a new, ready-to-use SerializerTimescaleSql.
func NewSerializerTimescaleSql() *SerializerTimescaleSql {
	return new(SerializerTimescaleSql)
}
// SerializerTimescaleBin serializes points as length-prefixed binary FlatPoint
// messages. It carries no state.
type SerializerTimescaleBin struct{}

// NewSerializerTimescaleBin returns a new, ready-to-use SerializerTimescaleBin.
func NewSerializerTimescaleBin() *SerializerTimescaleBin {
	return new(SerializerTimescaleBin)
}
// SerializePoint renders p as one Timescale SQL INSERT statement and hands it
// to w in a single Write call.
//
// The emitted line has the form:
// INSERT INTO <tablename> (time,<tag_name list>,<field_name list>) VALUES (<timestamp in nanoseconds>,'<tag value>',...,<field value>,...);
//
// Tag values are wrapped in single quotes exactly as stored (no escaping is
// applied); field values are formatted by fastFormatAppend. The error
// reported by w.Write, if any, is returned.
func (s *SerializerTimescaleSql) SerializePoint(w io.Writer, p *Point) (err error) {
	nanos := p.Timestamp.UTC().UnixNano()
	line := make([]byte, 0, 4096)

	// Statement head: target table followed by the column list.
	line = append(line, "INSERT INTO "...)
	line = append(line, p.MeasurementName...)
	line = append(line, " (time"...)
	for _, key := range p.TagKeys {
		line = append(line, ',')
		line = append(line, key...)
	}
	for _, key := range p.FieldKeys {
		line = append(line, ',')
		line = append(line, key...)
	}

	// Value list: timestamp, quoted tag values, then formatted field values.
	line = append(line, ") VALUES ("...)
	line = append(line, fmt.Sprintf("%d", nanos)...)
	for _, tag := range p.TagValues {
		line = append(line, ',', '\'')
		line = append(line, tag...)
		line = append(line, '\'')
	}
	for _, field := range p.FieldValues {
		line = append(line, ',')
		line = fastFormatAppend(field, line, true)
	}
	line = append(line, ");\n"...)

	_, err = w.Write(line)
	return err
}
// SerializeSize records the given point and value counts on w by delegating
// to serializeSizeInText, and returns whatever error that helper reports.
func (s *SerializerTimescaleSql) SerializeSize(w io.Writer, points, values int64) error {
	return serializeSizeInText(w, points, values)
}
// SerializePoint writes p to w as a length-prefixed binary record.
//
// The point is first flattened into a timescale_serialization.FlatPoint:
//   - Columns holds the tag keys, then the field keys, then "time".
//   - Values holds the tag values (as STRING), then the field values
//     (int/int64 as INTEGER, float64 as FLOAT, string as STRING), then the
//     timestamp in nanoseconds (as INTEGER).
//
// The FlatPoint is marshaled, and the record written to w is the marshaled
// length as a little-endian uint64 followed by the marshaled bytes.
//
// Any error from writing to w is returned. A marshaling failure terminates
// the process via log.Fatal, and a field value of an unsupported type causes
// a panic naming that type.
func (s *SerializerTimescaleBin) SerializePoint(w io.Writer, p *Point) (err error) {
	var f timescale_serialization.FlatPoint
	f.MeasurementName = string(p.MeasurementName)

	// Column names: tags first, then fields, with "time" last.
	f.Columns = make([]string, 0, len(p.TagKeys)+len(p.FieldKeys)+1)
	for _, key := range p.TagKeys {
		f.Columns = append(f.Columns, string(key))
	}
	for _, key := range p.FieldKeys {
		f.Columns = append(f.Columns, string(key))
	}
	f.Columns = append(f.Columns, "time")

	// Values follow the same order as the columns.
	f.Values = make([]*timescale_serialization.FlatPoint_FlatPointValue, 0, len(p.TagValues)+len(p.FieldValues)+1)
	for _, tag := range p.TagValues {
		f.Values = append(f.Values, &timescale_serialization.FlatPoint_FlatPointValue{
			Type:      timescale_serialization.FlatPoint_STRING,
			StringVal: string(tag),
		})
	}
	for _, raw := range p.FieldValues {
		v := &timescale_serialization.FlatPoint_FlatPointValue{}
		switch fv := raw.(type) {
		case int64:
			v.Type = timescale_serialization.FlatPoint_INTEGER
			v.IntVal = fv
		case int:
			v.Type = timescale_serialization.FlatPoint_INTEGER
			v.IntVal = int64(fv)
		case float64:
			v.Type = timescale_serialization.FlatPoint_FLOAT
			v.DoubleVal = fv
		case string:
			v.Type = timescale_serialization.FlatPoint_STRING
			v.StringVal = fv
		default:
			// Report the type of the offending field value, not of the
			// FlatPointValue wrapper.
			panic(fmt.Sprintf("logic error in timescale serialization, %s", reflect.TypeOf(raw)))
		}
		f.Values = append(f.Values, v)
	}
	f.Values = append(f.Values, &timescale_serialization.FlatPoint_FlatPointValue{
		Type:   timescale_serialization.FlatPoint_INTEGER,
		IntVal: p.Timestamp.UnixNano(),
	})

	out, err := f.Marshal()
	if err != nil {
		log.Fatal(err)
	}

	// Length prefix first, so a reader knows how many bytes to consume.
	if err = binary.Write(w, binary.LittleEndian, uint64(len(out))); err != nil {
		return err
	}
	_, err = w.Write(out)
	return err
}
// SerializeSize is a no-op for the binary serializer: nothing is written to w,
// the points and values counts are ignored, and nil is always returned.
//
// NOTE(review): a call to serializeSizeInText(w, points, values) was
// previously disabled here; confirm that emitting no size record is intended.
func (s *SerializerTimescaleBin) SerializeSize(w io.Writer, points, values int64) error {
	return nil
}