-
Notifications
You must be signed in to change notification settings - Fork 51
/
clickhouse.go
133 lines (109 loc) · 2.86 KB
/
clickhouse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package clickhouse
import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/uber-go/zap"
"github.com/lomik/graphite-clickhouse/helper/log"
)
var ErrUvarintRead = errors.New("ReadUvarint: Malformed array")
var ErrUvarintOverflow = errors.New("ReadUvarint: varint overflows a 64-bit integer")
var ErrClickHouseResponse = errors.New("Malformed response from clickhouse")
func formatSQL(q string) string {
s := strings.Split(q, "\n")
for i := 0; i < len(s); i++ {
s[i] = strings.TrimSpace(s[i])
}
return strings.Join(s, " ")
}
func Escape(s string) string {
s = strings.Replace(s, `\`, `\\`, -1)
s = strings.Replace(s, `'`, `\'`, -1)
return s
}
func Query(ctx context.Context, dsn string, query string, timeout time.Duration) ([]byte, error) {
return Post(ctx, dsn, query, nil, timeout)
}
func Post(ctx context.Context, dsn string, query string, postBody io.Reader, timeout time.Duration) ([]byte, error) {
return do(ctx, dsn, query, postBody, false, timeout)
}
func PostGzip(ctx context.Context, dsn string, query string, postBody io.Reader, timeout time.Duration) ([]byte, error) {
return do(ctx, dsn, query, postBody, true, timeout)
}
func do(ctx context.Context, dsn string, query string, postBody io.Reader, gzip bool, timeout time.Duration) (body []byte, err error) {
start := time.Now()
logger := log.FromContext(ctx)
queryForLogger := query
if len(queryForLogger) > 500 {
queryForLogger = queryForLogger[:495] + "<...>"
}
logger = logger.With(zap.String("query", formatSQL(queryForLogger)))
defer func() {
d := time.Since(start)
log := logger.With(
zap.Duration("runtime_ns", d),
zap.String("runtime", d.String()),
)
// fmt.Println(time.Since(start), formatSQL(queryForLogger))
if err != nil {
log.Error("query", zap.Error(err))
} else {
log.Info("query")
}
}()
p, err := url.Parse(dsn)
if err != nil {
return
}
if postBody != nil {
q := p.Query()
q.Set("query", query)
p.RawQuery = q.Encode()
} else {
postBody = strings.NewReader(query)
}
url := p.String()
req, err := http.NewRequest("POST", url, postBody)
if err != nil {
return
}
if gzip {
req.Header.Add("Content-Encoding", "gzip")
}
client := &http.Client{Timeout: timeout}
resp, err := client.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
body, _ = ioutil.ReadAll(resp.Body)
if resp.StatusCode != 200 {
err = fmt.Errorf("clickhouse response status %d: %s", resp.StatusCode, string(body))
return
}
return
}
func ReadUvarint(array []byte) (uint64, int, error) {
var x uint64
var s uint
l := len(array) - 1
for i := 0; ; i++ {
if i > l {
return x, i + 1, ErrUvarintRead
}
if array[i] < 0x80 {
if i > 9 || i == 9 && array[i] > 1 {
return x, i + 1, ErrUvarintOverflow
}
return x | uint64(array[i])<<s, i + 1, nil
}
x |= uint64(array[i]&0x7f) << s
s += 7
}
}