forked from googleapis/google-cloud-go
/
external.go
400 lines (351 loc) · 12.9 KB
/
external.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bigquery
import (
"encoding/base64"
"unicode/utf8"
bq "google.golang.org/api/bigquery/v2"
)
// DataFormat describes the format of BigQuery table data.
type DataFormat string
// Constants describing the format of BigQuery table data.
const (
CSV DataFormat = "CSV"
Avro DataFormat = "AVRO"
JSON DataFormat = "NEWLINE_DELIMITED_JSON"
DatastoreBackup DataFormat = "DATASTORE_BACKUP"
GoogleSheets DataFormat = "GOOGLE_SHEETS"
Bigtable DataFormat = "BIGTABLE"
Parquet DataFormat = "PARQUET"
ORC DataFormat = "ORC"
)
// ExternalData is a table which is stored outside of BigQuery. It is implemented by
// *ExternalDataConfig.
// GCSReference also implements it, for backwards compatibility.
type ExternalData interface {
toBQ() bq.ExternalDataConfiguration
}
// ExternalDataConfig describes data external to BigQuery that can be used
// in queries and to create external tables.
type ExternalDataConfig struct {
// The format of the data. Required.
SourceFormat DataFormat
// The fully-qualified URIs that point to your
// data in Google Cloud. Required.
//
// For Google Cloud Storage URIs, each URI can contain one '*' wildcard character
// and it must come after the 'bucket' name. Size limits related to load jobs
// apply to external data sources.
//
// For Google Cloud Bigtable URIs, exactly one URI can be specified and it has be
// a fully specified and valid HTTPS URL for a Google Cloud Bigtable table.
//
// For Google Cloud Datastore backups, exactly one URI can be specified. Also,
// the '*' wildcard character is not allowed.
SourceURIs []string
// The schema of the data. Required for CSV and JSON; disallowed for the
// other formats.
Schema Schema
// Try to detect schema and format options automatically.
// Any option specified explicitly will be honored.
AutoDetect bool
// The compression type of the data.
Compression Compression
// IgnoreUnknownValues causes values not matching the schema to be
// tolerated. Unknown values are ignored. For CSV this ignores extra values
// at the end of a line. For JSON this ignores named values that do not
// match any column name. If this field is not set, records containing
// unknown values are treated as bad records. The MaxBadRecords field can
// be used to customize how bad records are handled.
IgnoreUnknownValues bool
// MaxBadRecords is the maximum number of bad records that will be ignored
// when reading data.
MaxBadRecords int64
// Additional options for CSV, GoogleSheets and Bigtable formats.
Options ExternalDataConfigOptions
}
func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
q := bq.ExternalDataConfiguration{
SourceFormat: string(e.SourceFormat),
SourceUris: e.SourceURIs,
Autodetect: e.AutoDetect,
Compression: string(e.Compression),
IgnoreUnknownValues: e.IgnoreUnknownValues,
MaxBadRecords: e.MaxBadRecords,
}
if e.Schema != nil {
q.Schema = e.Schema.toBQ()
}
if e.Options != nil {
e.Options.populateExternalDataConfig(&q)
}
return q
}
func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfig, error) {
e := &ExternalDataConfig{
SourceFormat: DataFormat(q.SourceFormat),
SourceURIs: q.SourceUris,
AutoDetect: q.Autodetect,
Compression: Compression(q.Compression),
IgnoreUnknownValues: q.IgnoreUnknownValues,
MaxBadRecords: q.MaxBadRecords,
Schema: bqToSchema(q.Schema),
}
switch {
case q.CsvOptions != nil:
e.Options = bqToCSVOptions(q.CsvOptions)
case q.GoogleSheetsOptions != nil:
e.Options = bqToGoogleSheetsOptions(q.GoogleSheetsOptions)
case q.BigtableOptions != nil:
var err error
e.Options, err = bqToBigtableOptions(q.BigtableOptions)
if err != nil {
return nil, err
}
}
return e, nil
}
// ExternalDataConfigOptions are additional options for external data configurations.
// This interface is implemented by CSVOptions, GoogleSheetsOptions and BigtableOptions.
type ExternalDataConfigOptions interface {
populateExternalDataConfig(*bq.ExternalDataConfiguration)
}
// CSVOptions are additional options for CSV external data sources.
type CSVOptions struct {
// AllowJaggedRows causes missing trailing optional columns to be tolerated
// when reading CSV data. Missing values are treated as nulls.
AllowJaggedRows bool
// AllowQuotedNewlines sets whether quoted data sections containing
// newlines are allowed when reading CSV data.
AllowQuotedNewlines bool
// Encoding is the character encoding of data to be read.
Encoding Encoding
// FieldDelimiter is the separator for fields in a CSV file, used when
// reading or exporting data. The default is ",".
FieldDelimiter string
// Quote is the value used to quote data sections in a CSV file. The
// default quotation character is the double quote ("), which is used if
// both Quote and ForceZeroQuote are unset.
// To specify that no character should be interpreted as a quotation
// character, set ForceZeroQuote to true.
// Only used when reading data.
Quote string
ForceZeroQuote bool
// The number of rows at the top of a CSV file that BigQuery will skip when
// reading data.
SkipLeadingRows int64
}
func (o *CSVOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
c.CsvOptions = &bq.CsvOptions{
AllowJaggedRows: o.AllowJaggedRows,
AllowQuotedNewlines: o.AllowQuotedNewlines,
Encoding: string(o.Encoding),
FieldDelimiter: o.FieldDelimiter,
Quote: o.quote(),
SkipLeadingRows: o.SkipLeadingRows,
}
}
// quote returns the CSV quote character, or nil if unset.
func (o *CSVOptions) quote() *string {
if o.ForceZeroQuote {
quote := ""
return "e
}
if o.Quote == "" {
return nil
}
return &o.Quote
}
func (o *CSVOptions) setQuote(ps *string) {
if ps != nil {
o.Quote = *ps
if o.Quote == "" {
o.ForceZeroQuote = true
}
}
}
func bqToCSVOptions(q *bq.CsvOptions) *CSVOptions {
o := &CSVOptions{
AllowJaggedRows: q.AllowJaggedRows,
AllowQuotedNewlines: q.AllowQuotedNewlines,
Encoding: Encoding(q.Encoding),
FieldDelimiter: q.FieldDelimiter,
SkipLeadingRows: q.SkipLeadingRows,
}
o.setQuote(q.Quote)
return o
}
// GoogleSheetsOptions are additional options for GoogleSheets external data sources.
type GoogleSheetsOptions struct {
// The number of rows at the top of a sheet that BigQuery will skip when
// reading data.
SkipLeadingRows int64
}
func (o *GoogleSheetsOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
c.GoogleSheetsOptions = &bq.GoogleSheetsOptions{
SkipLeadingRows: o.SkipLeadingRows,
}
}
func bqToGoogleSheetsOptions(q *bq.GoogleSheetsOptions) *GoogleSheetsOptions {
return &GoogleSheetsOptions{
SkipLeadingRows: q.SkipLeadingRows,
}
}
// BigtableOptions are additional options for Bigtable external data sources.
type BigtableOptions struct {
// A list of column families to expose in the table schema along with their
// types. If omitted, all column families are present in the table schema and
// their values are read as BYTES.
ColumnFamilies []*BigtableColumnFamily
// If true, then the column families that are not specified in columnFamilies
// list are not exposed in the table schema. Otherwise, they are read with BYTES
// type values. The default is false.
IgnoreUnspecifiedColumnFamilies bool
// If true, then the rowkey column families will be read and converted to string.
// Otherwise they are read with BYTES type values and users need to manually cast
// them with CAST if necessary. The default is false.
ReadRowkeyAsString bool
}
func (o *BigtableOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
q := &bq.BigtableOptions{
IgnoreUnspecifiedColumnFamilies: o.IgnoreUnspecifiedColumnFamilies,
ReadRowkeyAsString: o.ReadRowkeyAsString,
}
for _, f := range o.ColumnFamilies {
q.ColumnFamilies = append(q.ColumnFamilies, f.toBQ())
}
c.BigtableOptions = q
}
func bqToBigtableOptions(q *bq.BigtableOptions) (*BigtableOptions, error) {
b := &BigtableOptions{
IgnoreUnspecifiedColumnFamilies: q.IgnoreUnspecifiedColumnFamilies,
ReadRowkeyAsString: q.ReadRowkeyAsString,
}
for _, f := range q.ColumnFamilies {
f2, err := bqToBigtableColumnFamily(f)
if err != nil {
return nil, err
}
b.ColumnFamilies = append(b.ColumnFamilies, f2)
}
return b, nil
}
// BigtableColumnFamily describes how BigQuery should access a Bigtable column family.
type BigtableColumnFamily struct {
// Identifier of the column family.
FamilyID string
// Lists of columns that should be exposed as individual fields as opposed to a
// list of (column name, value) pairs. All columns whose qualifier matches a
// qualifier in this list can be accessed as .. Other columns can be accessed as
// a list through .Column field.
Columns []*BigtableColumn
// The encoding of the values when the type is not STRING. Acceptable encoding values are:
// - TEXT - indicates values are alphanumeric text strings.
// - BINARY - indicates values are encoded using HBase Bytes.toBytes family of functions.
// This can be overridden for a specific column by listing that column in 'columns' and
// specifying an encoding for it.
Encoding string
// If true, only the latest version of values are exposed for all columns in this
// column family. This can be overridden for a specific column by listing that
// column in 'columns' and specifying a different setting for that column.
OnlyReadLatest bool
// The type to convert the value in cells of this
// column family. The values are expected to be encoded using HBase
// Bytes.toBytes function when using the BINARY encoding value.
// Following BigQuery types are allowed (case-sensitive):
// BYTES STRING INTEGER FLOAT BOOLEAN.
// The default type is BYTES. This can be overridden for a specific column by
// listing that column in 'columns' and specifying a type for it.
Type string
}
func (b *BigtableColumnFamily) toBQ() *bq.BigtableColumnFamily {
q := &bq.BigtableColumnFamily{
FamilyId: b.FamilyID,
Encoding: b.Encoding,
OnlyReadLatest: b.OnlyReadLatest,
Type: b.Type,
}
for _, col := range b.Columns {
q.Columns = append(q.Columns, col.toBQ())
}
return q
}
func bqToBigtableColumnFamily(q *bq.BigtableColumnFamily) (*BigtableColumnFamily, error) {
b := &BigtableColumnFamily{
FamilyID: q.FamilyId,
Encoding: q.Encoding,
OnlyReadLatest: q.OnlyReadLatest,
Type: q.Type,
}
for _, col := range q.Columns {
c, err := bqToBigtableColumn(col)
if err != nil {
return nil, err
}
b.Columns = append(b.Columns, c)
}
return b, nil
}
// BigtableColumn describes how BigQuery should access a Bigtable column.
type BigtableColumn struct {
// Qualifier of the column. Columns in the parent column family that have this
// exact qualifier are exposed as . field. The column field name is the
// same as the column qualifier.
Qualifier string
// If the qualifier is not a valid BigQuery field identifier i.e. does not match
// [a-zA-Z][a-zA-Z0-9_]*, a valid identifier must be provided as the column field
// name and is used as field name in queries.
FieldName string
// If true, only the latest version of values are exposed for this column.
// See BigtableColumnFamily.OnlyReadLatest.
OnlyReadLatest bool
// The encoding of the values when the type is not STRING.
// See BigtableColumnFamily.Encoding
Encoding string
// The type to convert the value in cells of this column.
// See BigtableColumnFamily.Type
Type string
}
func (b *BigtableColumn) toBQ() *bq.BigtableColumn {
q := &bq.BigtableColumn{
FieldName: b.FieldName,
OnlyReadLatest: b.OnlyReadLatest,
Encoding: b.Encoding,
Type: b.Type,
}
if utf8.ValidString(b.Qualifier) {
q.QualifierString = b.Qualifier
} else {
q.QualifierEncoded = base64.RawStdEncoding.EncodeToString([]byte(b.Qualifier))
}
return q
}
func bqToBigtableColumn(q *bq.BigtableColumn) (*BigtableColumn, error) {
b := &BigtableColumn{
FieldName: q.FieldName,
OnlyReadLatest: q.OnlyReadLatest,
Encoding: q.Encoding,
Type: q.Type,
}
if q.QualifierString != "" {
b.Qualifier = q.QualifierString
} else {
bytes, err := base64.RawStdEncoding.DecodeString(q.QualifierEncoded)
if err != nil {
return nil, err
}
b.Qualifier = string(bytes)
}
return b, nil
}