package bigquery
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/big"
"os"
"regexp"
"strings"
"time"
"cloud.google.com/go/bigquery"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/compress"
"github.com/apache/arrow/go/v14/parquet/pqarrow"
"github.com/c2h5oh/datasize"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/observability"
"go.uber.org/zap"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
// recommended size is 512MB - 1GB; the entire row group is buffered in memory before it is written to disk
const rowGroupBufferSize = int64(datasize.MB) * 512
const _jsonDownloadLimitBytes = 100 * int64(datasize.MB)
// Regex to parse a BigQuery "SELECT *" statement of the form: SELECT * FROM `project_id.dataset.table`
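// For illustration (hypothetical inputs, not taken from tests), it matches
// "SELECT * FROM `my-project.my_dataset.my_table`" and "select * from my_dataset.my_table",
// but not "SELECT col FROM my_dataset.my_table" or statements with a WHERE clause.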
var selectQueryRegex = regexp.MustCompile("(?i)^\\s*SELECT\\s+\\*\\s+FROM\\s+(`?[a-zA-Z0-9_.-]+`?)\\s*$")
// Query implements drivers.SQLStore
func (c *Connection) Query(ctx context.Context, props map[string]any) (drivers.RowIterator, error) {
return nil, drivers.ErrNotImplemented
}
// QueryAsFiles implements drivers.SQLStore
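// A minimal sketch of the props this expects, assuming the map keys mirror the parsed
// srcProps fields used below (the exact key names are defined by parseSourceProperties
// elsewhere and are assumptions here):
//
//	props := map[string]any{
//		"sql":        "SELECT * FROM `my-project.my_dataset.my_table`",
//		"project_id": "my-project",
//	}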
func (c *Connection) QueryAsFiles(ctx context.Context, props map[string]any, opt *drivers.QueryOption, p drivers.Progress) (drivers.FileIterator, error) {
srcProps, err := parseSourceProperties(props)
if err != nil {
return nil, err
}
opts, err := c.clientOption(ctx)
if err != nil {
return nil, err
}
var client *bigquery.Client
var it *bigquery.RowIterator
var fallbackToQueryExecution bool
match := selectQueryRegex.FindStringSubmatch(srcProps.SQL)
queryIsSelectAll := match != nil
if queryIsSelectAll {
// "SELECT * FROM `project_id.dataset.table`" statement so storage api might be used
// project_id and backticks are optional
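// For example (hypothetical references): "my_dataset.my_table" resolves the project from
// srcProps.ProjectID, while "my-project.my_dataset.my_table" carries its own project ID.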
fullTableName := match[1]
fullTableName = strings.Trim(fullTableName, "`")
var projectID, dataset, tableID string
parts := strings.Split(fullTableName, ".")
switch len(parts) {
case 2:
dataset, tableID = parts[0], parts[1]
projectID = srcProps.ProjectID
case 3:
projectID, dataset, tableID = parts[0], parts[1], parts[2]
default:
return nil, fmt.Errorf("invalid table format, `project_id.dataset.table` is expected")
}
client, err = createClient(ctx, srcProps.ProjectID, opts)
if err != nil {
return nil, err
}
if err = client.EnableStorageReadClient(ctx, opts...); err != nil {
client.Close()
return nil, err
}
table := client.DatasetInProject(projectID, dataset).Table(tableID)
// extract source metadata to ensure the source is a regular table or a snapshot
// as storage api doesn't support other types
metadata, err := table.Metadata(ctx)
if err != nil {
client.Close()
return nil, fmt.Errorf("source metadata cannot be extracted: %w", err)
}
if metadata.Type == bigquery.RegularTable || metadata.Type == bigquery.Snapshot {
it = table.Read(ctx)
} else {
c.logger.Debug("source is not a regular table or a snapshot, falling back to a query execution")
fallbackToQueryExecution = true
client.Close()
}
}
if !queryIsSelectAll || fallbackToQueryExecution {
// storage api cannot be used, switching to a query execution
now := time.Now()
client, err = createClient(ctx, srcProps.ProjectID, opts)
if err != nil {
return nil, err
}
if err := client.EnableStorageReadClient(ctx, opts...); err != nil {
client.Close()
return nil, err
}
q := client.Query(srcProps.SQL)
it, err = q.Read(ctx)
if err != nil && strings.Contains(err.Error(), "Response too large to return") {
// https://cloud.google.com/knowledge/kb/bigquery-response-too-large-to-return-consider-setting-allowlargeresults-to-true-in-your-job-configuration-000004266
client.Close()
return nil, fmt.Errorf("response too large, consider converting the source to a table and " +
"ingesting the entire table with 'select * from `project_id.dataset.tablename`'")
}
if err != nil && !strings.Contains(err.Error(), "Syntax error") {
// close the read storage API client
client.Close()
c.logger.Debug("query failed, retrying without storage api", zap.Error(err))
// query results are usually cached in a temporary table that the storage api can read,
// but there are some exceptions where results aren't cached,
// so we also retry without the storage api
client, err = createClient(ctx, srcProps.ProjectID, opts)
if err != nil {
return nil, err
}
q := client.Query(srcProps.SQL)
it, err = q.Read(ctx)
}
if err != nil {
client.Close()
return nil, err
}
c.logger.Debug("query took", zap.Duration("duration", time.Since(now)), observability.ZapCtx(ctx))
}
p.Target(int64(it.TotalRows), drivers.ProgressUnitRecord)
return &fileIterator{
client: client,
bqIter: it,
logger: c.logger,
limitInBytes: opt.TotalLimitInBytes,
progress: p,
totalRecords: int64(it.TotalRows),
ctx: ctx,
}, nil
}
func createClient(ctx context.Context, projectID string, opts []option.ClientOption) (*bigquery.Client, error) {
client, err := bigquery.NewClient(ctx, projectID, opts...)
if err != nil {
if strings.Contains(err.Error(), "unable to detect projectID") {
return nil, fmt.Errorf("projectID not detected in credentials. Please set `project_id` in source yaml")
}
return nil, fmt.Errorf("failed to create bigquery client: %w", err)
}
return client, nil
}
type fileIterator struct {
client *bigquery.Client
bqIter *bigquery.RowIterator
logger *zap.Logger
limitInBytes int64
progress drivers.Progress
totalRecords int64
tempFilePath string
downloaded bool
ctx context.Context // TODO :: refactor NextBatch to accept a context argument
}
// Close implements drivers.FileIterator.
func (f *fileIterator) Close() error {
return os.Remove(f.tempFilePath)
}
// Next implements drivers.FileIterator.
// TODO :: currently it downloads all records into a single file. Need to check whether ingesting a single file of tens of GBs or more is efficient.
func (f *fileIterator) Next() ([]string, error) {
if f.downloaded {
return nil, io.EOF
}
// The storage API is not available, so results can't be read as arrow records. Read results row by row and dump them into a json file.
if !f.bqIter.IsAccelerated() {
f.logger.Debug("downloading results in json file", observability.ZapCtx(f.ctx))
if err := f.downloadAsJSONFile(); err != nil {
return nil, err
}
return []string{f.tempFilePath}, nil
}
f.logger.Debug("downloading results in parquet file", observability.ZapCtx(f.ctx))
// create a temp file
fw, err := os.CreateTemp("", "temp*.parquet")
if err != nil {
return nil, err
}
defer fw.Close()
f.tempFilePath = fw.Name()
f.downloaded = true
rdr, err := f.AsArrowRecordReader()
if err != nil {
return nil, err
}
defer rdr.Release()
tf := time.Now()
defer func() {
f.logger.Debug("time taken to write arrow records in parquet file", zap.Duration("duration", time.Since(tf)), observability.ZapCtx(f.ctx))
}()
writer, err := pqarrow.NewFileWriter(rdr.Schema(), fw,
parquet.NewWriterProperties(
parquet.WithCompression(compress.Codecs.Snappy),
parquet.WithRootRepetition(parquet.Repetitions.Required),
// duckdb has issues reading statistics for string columns generated by this writer
// column statistics aren't very useful when the full file is ingested anyway, so disable them to save computation
parquet.WithStats(false),
),
pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema()))
if err != nil {
return nil, err
}
defer writer.Close()
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
// write arrow records to parquet file
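// Each iteration aborts on context cancellation, periodically (every 5s) compares the
// on-disk file size against the configured byte limit, and appends the current record to
// a buffered row group, starting a new row group once rowGroupBufferSize is reached.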
for rdr.Next() {
select {
case <-f.ctx.Done():
return nil, f.ctx.Err()
case <-ticker.C:
// periodically compare the on-disk file size against the configured limit
fileInfo, err := os.Stat(fw.Name())
if err == nil { // ignore stat errors
if fileInfo.Size() > f.limitInBytes {
return nil, drivers.ErrStorageLimitExceeded
}
}
default:
}
// write the current record on every iteration, including iterations where the ticker fired
rec := rdr.Record()
f.progress.Observe(rec.NumRows(), drivers.ProgressUnitRecord)
if writer.RowGroupTotalBytesWritten() >= rowGroupBufferSize {
writer.NewBufferedRowGroup()
}
if err := writer.WriteBuffered(rec); err != nil {
return nil, err
}
}
if rdr.Err() != nil {
return nil, fmt.Errorf("file write failed with error: %w", rdr.Err())
}
writer.Close()
fw.Close()
fileInfo, err := os.Stat(fw.Name())
if err != nil {
return nil, err
}
f.logger.Debug("size of file", zap.String("size", datasize.ByteSize(fileInfo.Size()).HumanReadable()), observability.ZapCtx(f.ctx))
return []string{fw.Name()}, nil
}
// Size implements drivers.FileIterator.
func (f *fileIterator) Size(unit drivers.ProgressUnit) (int64, bool) {
switch unit {
case drivers.ProgressUnitRecord:
return f.totalRecords, true
case drivers.ProgressUnitFile:
return 1, true
default:
return 0, false
}
}
func (f *fileIterator) Format() string {
return ""
}
func (f *fileIterator) downloadAsJSONFile() error {
tf := time.Now()
defer func() {
f.logger.Debug("time taken to write row in json file", zap.Duration("duration", time.Since(tf)), observability.ZapCtx(f.ctx))
}()
// create a temp file
fw, err := os.CreateTemp("", "temp*.ndjson")
if err != nil {
return err
}
defer fw.Close()
f.tempFilePath = fw.Name()
f.downloaded = true
init := false
rows := 0
enc := json.NewEncoder(fw)
enc.SetEscapeHTML(false)
bigNumericFields := make([]string, 0)
for {
row := make(map[string]bigquery.Value)
err := f.bqIter.Next(&row)
if err != nil {
if errors.Is(err, iterator.Done) {
if !init {
return fmt.Errorf("no results found for the query")
}
return nil
}
return err
}
// the schema and total rows are only available after the first call to Next
if !init {
init = true
f.progress.Target(int64(f.bqIter.TotalRows), drivers.ProgressUnitRecord)
for _, f := range f.bqIter.Schema {
if f.Type == bigquery.BigNumericFieldType {
bigNumericFields = append(bigNumericFields, f.Name)
}
}
}
// convert BIGNUMERIC (*big.Rat) values to a decimal representation, else they are marshalled as a rational "a/b" string
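// For example (hypothetical values): 1/2 converts exactly to the float64 0.5, while 1/3
// does not, so it is emitted as the 38-digit decimal string produced by FloatString(38).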
for _, f := range bigNumericFields {
r, ok := row[f].(*big.Rat)
if !ok {
continue
}
num, exact := r.Float64()
if exact {
row[f] = num
} else { // the number doesn't fit in a float64, so render it as a decimal string
row[f] = r.FloatString(38)
}
}
err = enc.Encode(row)
if err != nil {
return fmt.Errorf("conversion of row to json failed with error: %w", err)
}
// If we don't have storage API access, BigQuery may return massive JSON results. (But even with storage API access, it may return JSON for small results.)
// We want to avoid JSON for massive results. Currently, the only way to do so is to error at a limit.
rows++
if rows != 0 && rows%10000 == 0 { // Check file size every 10k rows
fileInfo, err := os.Stat(fw.Name())
if err != nil {
return fmt.Errorf("bigquery: failed to poll json file size: %w", err)
}
if fileInfo.Size() >= _jsonDownloadLimitBytes {
return fmt.Errorf("bigquery: json download exceeded limit of %d bytes (enable and provide access to the BigQuery Storage Read API to read larger results)", _jsonDownloadLimitBytes)
}
}
}
}
var _ drivers.FileIterator = &fileIterator{}
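// A minimal caller-side sketch (assumed usage, not part of this driver) of how the
// returned drivers.FileIterator is typically consumed:
//
//	it, err := conn.QueryAsFiles(ctx, props, &drivers.QueryOption{TotalLimitInBytes: limit}, progress)
//	if err != nil {
//		return err
//	}
//	defer it.Close()
//	for {
//		files, err := it.Next()
//		if errors.Is(err, io.EOF) {
//			break
//		}
//		if err != nil {
//			return err
//		}
//		_ = files // ingest the returned parquet/ndjson file paths into the target store
//	}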