-
Notifications
You must be signed in to change notification settings - Fork 53
/
query.go
85 lines (66 loc) · 1.86 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package clickhouse
import (
"bufio"
"io"
"github.com/scratchdata/scratchdata/pkg/util"
)
func (s *ClickhouseServer) QueryNDJson(query string, writer io.Writer) error {
sanitized := util.TrimQuery(query)
sql := "SELECT * FROM (" + sanitized + ") FORMAT " + "JSONEachRow"
resp, err := s.httpQuery(sql)
if err != nil {
return err
}
defer resp.Close()
_, err = io.Copy(writer, resp)
return err
}
func (s *ClickhouseServer) QueryJSON(query string, writer io.Writer) error {
sanitized := util.TrimQuery(query)
sql := "SELECT * FROM (" + sanitized + ") FORMAT " + "JSONEachRow"
resp, err := s.httpQuery(sql)
if err != nil {
return err
}
defer resp.Close()
writer.Write([]byte("["))
// Treat the output as a linked list of text fragments.
// Each fragment could be a partial JSON line
var nextIsPrefix = true
var nextErr error = nil
var nextLine []byte
reader := bufio.NewReader(resp)
line, isPrefix, err := reader.ReadLine()
for {
// If we're at the end of our input, break
if err == io.EOF {
break
} else if err != nil {
return err
}
// Output the data
writer.Write(line)
// Check to see whether we are at the last row by looking for EOF
nextLine, nextIsPrefix, nextErr = reader.ReadLine()
// If the next row is not an EOF, then output a comma. This is to avoid a
// trailing comma in our JSON
if !isPrefix && nextErr != io.EOF {
writer.Write([]byte(","))
}
// Equivalent of "currentPointer = currentPointer.next"
line, isPrefix, err = nextLine, nextIsPrefix, nextErr
}
writer.Write([]byte("]"))
return nil
}
func (s *ClickhouseServer) QueryCSV(query string, writer io.Writer) error {
sanitized := util.TrimQuery(query)
sql := "SELECT * FROM (" + sanitized + ") FORMAT " + "CSVWithNames"
resp, err := s.httpQuery(sql)
if err != nil {
return err
}
defer resp.Close()
_, err = io.Copy(writer, resp)
return err
}