-
Notifications
You must be signed in to change notification settings - Fork 51
/
external-data.go
138 lines (117 loc) · 3.34 KB
/
external-data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package clickhouse
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"mime/multipart"
"net/url"
"os"
"path"
"strings"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"go.uber.org/zap"
)
// ExternalTable describes a single temporary table that is passed to ClickHouse together with a
// query (the ClickHouse external data feature).
type ExternalTable struct {
// Name is the table name. It is used as the multipart form field and file name, and as the
// prefix of the `<Name>_format` and `<Name>_structure` query parameters.
Name string
// Columns describe the table structure; they are joined by "," into `<Name>_structure`.
Columns []Column
// Format is the ClickHouse input/output format of Data. When it is empty, the
// `<Name>_format` query parameter is not set.
Format string
// Data is the table content, written unchanged into the request body.
Data []byte
}
// Column describes one column of a temporary table: its name and its ClickHouse data type.
type Column struct {
	// Name is the column name
	Name string
	// Type is the ClickHouse data type of the column
	Type string
}

// String renders the column as "<Name> <Type>", the form used in a table structure definition.
func (c *Column) String() string {
	const separator = " "
	name, dataType := c.Name, c.Type
	return name + separator + dataType
}
// ExternalData is a type to use the ClickHouse external data feature. It can be used to pass
// multiple temporary tables for a single query.
type ExternalData struct {
// Tables are the temporary tables that are sent with the query
Tables []ExternalTable
// debug holds the settings for dumping the tables to disk; nil means that nothing is dumped
debug *extDataDebug
}
// extDataDebug keeps the settings used to dump external tables to files for debugging
type extDataDebug struct {
// dir is the directory where the dump files are written
dir string
// perm is the file permission used for the written dump files
perm os.FileMode
}
// NewExternalData wraps the given `tables` into a new `*ExternalData`. The debug settings are
// left unset.
func NewExternalData(tables ...ExternalTable) *ExternalData {
	data := new(ExternalData)
	data.Tables = tables
	return data
}
// SetDebug sets the directory and the file permission for an external table data dump.
//
// The dump is enabled only if both `debugDir` and `perm` are set. If `debugDir` is empty or
// `perm` is zero, any previously configured debug settings are removed.
func (e *ExternalData) SetDebug(debugDir string, perm os.FileMode) {
	if debugDir == "" || perm == 0 {
		e.debug = nil
		// Stop here: falling through would re-enable debugging with unusable settings
		return
	}
	e.debug = &extDataDebug{dir: debugDir, perm: perm}
}
// buildBody returns the multipart form body, the content type header and an error.
//
// Every table from e.Tables is written as a separate form file named after the table. For each
// table the `<name>_structure` (and, when the format is set, `<name>_format`) parameter is added
// to the query of `u`, so `u` is modified in place. On failure the returned body is nil and the
// header is empty.
func (e *ExternalData) buildBody(ctx context.Context, u *url.URL) (*bytes.Buffer, string, error) {
	var contentType string
	buf := new(bytes.Buffer)
	form := multipart.NewWriter(buf)

	for _, table := range e.Tables {
		// Each table is sent in its own form part
		filePart, err := form.CreateFormFile(table.Name, table.Name)
		if err != nil {
			return nil, contentType, err
		}
		if _, err = filePart.Write(table.Data); err != nil {
			return nil, contentType, err
		}

		// Describe the table via the name_format and name_structure query parameters
		params := u.Query()
		if table.Format != "" {
			params.Set(table.Name+"_format", table.Format)
		}
		definitions := make([]string, len(table.Columns))
		for i := range table.Columns {
			definitions[i] = table.Columns[i].String()
		}
		params.Set(table.Name+"_structure", strings.Join(definitions, ","))
		u.RawQuery = params.Encode()
	}

	if err := form.Close(); err != nil {
		return nil, contentType, err
	}
	contentType = form.FormDataContentType()

	// The dump works on its own copy of the URL and runs in a separate goroutine, so the
	// debugging process does not block the execution
	urlCopy := *u
	go e.debugDump(ctx, urlCopy)

	return buf, contentType, nil
}
// debugDump writes the data of every external table into the debug directory and logs a curl
// command that references the written files and the URL `u`.
//
// Nothing is done when the debug settings are not set or when scope.Debug reports false for
// "External-Data". If any table cannot be written, a warning is logged and no command is produced.
func (e *ExternalData) debugDump(ctx context.Context, u url.URL) {
	if e.debug == nil {
		// Do not dump if the settings are not set
		return
	}
	if !scope.Debug(ctx, "External-Data") {
		return
	}

	requestID := scope.RequestID(ctx)
	logger := scope.Logger(ctx)

	var cmd strings.Builder
	cmd.WriteString("curl ")
	for _, table := range e.Tables {
		dumpName := fmt.Sprintf("ext-%v:%v.%v", table.Name, requestID, table.Format)
		dumpPath := path.Join(e.debug.dir, dumpName)
		if err := ioutil.WriteFile(dumpPath, table.Data, e.debug.perm); err != nil {
			logger.Warn("external-data", zap.Error(err))
			// The debug command couldn't be built w/o all external tables
			return
		}
		fmt.Fprintf(&cmd, "-F '%v=@%v;' ", table.Name, dumpPath)
	}

	// Use a different query_id to not interfere with the original query
	params := u.Query()
	params["query_id"] = []string{fmt.Sprintf("%v:debug", requestID)}
	u.RawQuery = params.Encode()

	cmd.WriteString("'" + u.Redacted() + "'")
	logger.Info("external-data", zap.String("debug command", cmd.String()))
}