forked from distribution/distribution
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bigquery.go
368 lines (326 loc) · 9.6 KB
/
bigquery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"container/list"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math"
"math/rand"
"net/http"
"os"
"strconv"
"strings"
"time"
bigquery "google.golang.org/api/bigquery/v2"
storage "google.golang.org/api/storage/v1"
)
// Size unit and polling tuning used by the BigQuery demo.
const (
	// GB is 1<<30 bytes; the summary line divides byte counts by it.
	GB = 1 << 30

	// Backoff between job polls in monitor. MaxBackoff and BaseBackoff are
	// milliseconds (monitor sleeps time.Duration(backoff) * time.Millisecond).
	MaxBackoff          = 30 * 1000
	BaseBackoff         = 250
	BackoffGrowthFactor = 1.8
	BackoffGrowthDamper = 0.25
)

// Literal values compared against, or sent to, the BigQuery API.
const (
	// JobStatusDone is the job state at which monitor stops polling a job.
	JobStatusDone = "DONE"
	// DatasetAlreadyExists is searched for in dataset insert error messages.
	DatasetAlreadyExists = "Already Exists: Dataset"
	// TableWriteEmptyDisposition is the write disposition used for load jobs.
	TableWriteEmptyDisposition = "WRITE_EMPTY"
)
// init hands bqMain to registerDemo under the name "bigquery". The OAuth scope
// passed along is the space-separated list of the BigQuery scope, the
// read-only Cloud Storage scope (the source objects are only read) and the
// userinfo.profile scope.
func init() {
	scopes := []string{
		bigquery.BigqueryScope,
		storage.DevstorageReadOnlyScope,
		"https://www.googleapis.com/auth/userinfo.profile",
	}
	registerDemo("bigquery", strings.Join(scopes, " "), bqMain)
}
// bqMain demonstrates loading objects from Google Cloud Storage into
// BigQuery. Objects are specified by their bucket and a name prefix. Each
// object is loaded into a new table identified by the object name minus any
// .csv/.csv.gz extension. All tables are added to the specified dataset (one
// is created if necessary). Tables are not overwritten: an attempt to load an
// object into a dataset that already contains its table emits an error
// message indicating the table already exists. A schema file must be
// provided, and it is applied to every object/table.
//
// Example usage:
//
//	go-api-demo -clientid="my-clientid" -secret="my-secret" bigquery myProject
//	    myDataBucket datafile2013070 DataFiles2013
//	    ./datafile_schema.json 100
//
// This loads all objects (e.g. all data files from July 2013) from
// gs://myDataBucket into a (possibly new) BigQuery dataset named DataFiles2013,
// using the schema file provided and allowing up to 100 bad records. Assuming
// each object is named like datafileYYYYMMDD.csv.gz and all of July's files are
// stored in the bucket, 9 tables are created named like datafile201307DD,
// where DD ranges from 01 to 09, inclusive. Every object matching the prefix
// is loaded, even when the bucket listing spans several result pages.
//
// When the program completes, it emits a results line similar to:
//
//	9 files loaded in 3m58s (18m2.708s). Size: 7.18GB Rows: 7130725
//
// The first duration is the total elapsed time from the start of the first
// job to the end of the last job (effectively wall-clock time). The duration
// in parentheses is the aggregate time taken to load all tables.
func bqMain(client *http.Client, argv []string) {
	if len(argv) != 6 {
		// The demo is registered as "bigquery" in init, so that is the name
		// users type on the command line.
		fmt.Fprintln(os.Stderr,
			"Usage: bigquery project_id bucket prefix dataset schema max_bad_records")
		return
	}
	var (
		project    = argv[0]
		bucket     = argv[1]
		objPrefix  = argv[2]
		datasetID  = argv[3]
		schemaFile = argv[4]
	)
	badRecords, err := strconv.ParseInt(argv[5], 10, 64)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	// Seeds the jitter applied to the polling backoff in monitor.
	rand.Seed(time.Now().UnixNano())

	service, err := storage.New(client)
	if err != nil {
		log.Fatalf("Unable to create Storage service: %v", err)
	}

	// Collect every object in the bucket matching the prefix. One List call
	// returns a single page of results, so keep following NextPageToken until
	// the listing is exhausted; otherwise large buckets are silently truncated.
	var objects []*storage.Object
	call := service.Objects.List(bucket)
	call.Prefix(objPrefix)
	for {
		page, err := call.Do()
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
		objects = append(objects, page.Items...)
		if page.NextPageToken == "" {
			break
		}
		call.PageToken(page.NextPageToken)
	}

	// Create the wrapper and insert the (new) dataset; an already existing
	// dataset is accepted (insert(true)).
	dataset, err := newBQDataset(client, project, datasetID)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	if err = dataset.insert(true); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	objectSource := &tableSource{
		maxBadRecords: badRecords,
		disposition:   TableWriteEmptyDisposition,
	}
	// Load the schema from disk; it is shared by every table.
	schema, err := ioutil.ReadFile(schemaFile)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	if err = json.Unmarshal(schema, &objectSource.schema); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Assumes all objects have .csv, .csv.gz (or no) extension.
	tableIDFromObject := func(name string) string {
		return strings.TrimSuffix(strings.TrimSuffix(name, ".gz"), ".csv")
	}

	// A jobset is a way to group a collection of jobs together for monitoring.
	// For this example, we just use the name of the bucket and object prefix.
	jobset := fmt.Sprintf("%s:%s", bucket, objPrefix)

	fmt.Fprintf(os.Stderr, "\nLoading %d objects.\n", len(objects))
	// Load each object into a table of the same name (minus any extension).
	// A successful load call queues the job for monitoring; a failed one is
	// reported and skipped.
	for _, o := range objects {
		objectSource.id = tableIDFromObject(o.Name)
		objectSource.uri = fmt.Sprintf("gs://%s/%s", o.Bucket, o.Name)
		if err := dataset.load(jobset, objectSource); err != nil {
			fmt.Fprintln(os.Stderr, err)
		}
	}
	dataset.monitor(jobset)
}
// bqDataset wraps the BigQuery service and one dataset of a project, and
// keeps the queues of load jobs started against that dataset.
type bqDataset struct {
	// project is the project ID used for dataset inserts and job calls.
	project string
	// id is the dataset ID.
	id string
	bq *bigquery.Service
	// dataset is the request body sent by insert.
	dataset *bigquery.Dataset
	// jobsets maps a jobset name to a queue of *bigquery.Job values; load
	// appends to it and monitor drains it.
	jobsets map[string]*list.List
}
// newBQDataset creates a BigQuery service on top of client and returns a
// wrapper for dataset dsId in project dsProj with an empty set of job queues.
// It only builds the local description of the dataset; call insert to create
// it in BigQuery.
//
// It returns an error if the BigQuery service cannot be created. The function
// previously called log.Fatalf here even though it returns an error; the
// failure is now handed back so the caller decides how to react.
func newBQDataset(client *http.Client, dsProj string, dsId string) (*bqDataset,
	error) {
	service, err := bigquery.New(client)
	if err != nil {
		return nil, fmt.Errorf("unable to create BigQuery service: %v", err)
	}
	return &bqDataset{
		project: dsProj,
		id:      dsId,
		bq:      service,
		dataset: &bigquery.Dataset{
			DatasetReference: &bigquery.DatasetReference{
				DatasetId: dsId,
				ProjectId: dsProj,
			},
		},
		jobsets: make(map[string]*list.List),
	}, nil
}
// insert asks BigQuery to create ds.dataset in ds.project. When existsOK is
// true, an error whose message contains DatasetAlreadyExists is treated as
// success so an existing dataset can be reused; every other error is
// returned unchanged.
func (ds *bqDataset) insert(existsOK bool) error {
	_, err := ds.bq.Datasets.Insert(ds.project, ds.dataset).Do()
	switch {
	case err == nil:
		return nil
	case existsOK && strings.Contains(err.Error(), DatasetAlreadyExists):
		// The dataset is already there and the caller accepts that.
		return nil
	default:
		return err
	}
}
// tableSource describes one object to load and the table it is loaded into.
// bqMain reuses a single value, changing id and uri for each object.
type tableSource struct {
	// id is the destination table ID within the dataset.
	id string
	// uri is the source object URI (bqMain builds gs://bucket/object).
	uri string
	// schema is applied to the destination table.
	schema bigquery.TableSchema
	// maxBadRecords is passed as the load job's MaxBadRecords.
	maxBadRecords int64
	// disposition is passed as the load job's WriteDisposition.
	disposition string
}
// load inserts a BigQuery load job that reads source.uri into table source.id
// of this dataset, using source's schema, bad-record limit and write
// disposition. On success the job returned by the insert call is appended to
// the queue for jobset (created on first use) so monitor can poll it.
// On failure nothing is queued, and the error is returned annotated with the
// source URI and destination table, so the caller can tell which object
// failed.
func (ds *bqDataset) load(jobset string, source *tableSource) error {
	datasetID := ds.dataset.DatasetReference.DatasetId
	job := &bigquery.Job{
		Configuration: &bigquery.JobConfiguration{
			Load: &bigquery.JobConfigurationLoad{
				DestinationTable: &bigquery.TableReference{
					DatasetId: datasetID,
					ProjectId: ds.project,
					TableId:   source.id,
				},
				MaxBadRecords:    source.maxBadRecords,
				Schema:           &source.schema,
				SourceUris:       []string{source.uri},
				WriteDisposition: source.disposition,
			},
		},
	}
	inserted, err := ds.bq.Jobs.Insert(ds.project, job).Do()
	if err != nil {
		return fmt.Errorf("loading %s into %s.%s: %v",
			source.uri, datasetID, source.id, err)
	}
	queue, ok := ds.jobsets[jobset]
	if !ok {
		queue = list.New()
		ds.jobsets[jobset] = queue
	}
	queue.PushBack(inserted)
	return nil
}
// getJob fetches the job with the given ID from ds.project and returns the
// result of the Jobs.Get call as-is.
func (ds *bqDataset) getJob(id string) (*bigquery.Job, error) {
	req := ds.bq.Jobs.Get(ds.project, id)
	return req.Do()
}
// monitor polls every job queued under jobset until each one has reached
// JobStatusDone, then prints a summary (jobStats via %#v) to stderr. A finished
// job carrying an ErrorResult has its message printed instead of being
// counted. An unknown jobset is a no-op.
//
// Jobs are polled round-robin. The first pending job seen is remembered as
// the "head". Each time polling wraps around to the head, the delay between
// polls is multiplied by BackoffGrowthFactor, reduced by a random jitter of up
// to BackoffGrowthDamper, and capped at MaxBackoff milliseconds. As soon as
// the head job leaves the queue, the delay drops back to BaseBackoff and a new
// head is chosen. This matters because the remaining loads run in BigQuery in
// parallel.
//
// A failed Get is printed and the job is re-queued, so an error that never
// clears is retried indefinitely.
func (ds *bqDataset) monitor(jobset string) {
	jobq, ok := ds.jobsets[jobset]
	if !ok {
		return
	}
	var backoff float64 = BaseBackoff
	pause := func(grow bool) {
		if grow {
			backoff *= BackoffGrowthFactor
			backoff -= backoff * rand.Float64() * BackoffGrowthDamper
			backoff = math.Min(backoff, MaxBackoff)
			// +1 counts the job that was just taken off the queue.
			fmt.Fprintf(os.Stderr, "[%s] Checking remaining %d jobs...\n", jobset,
				1+jobq.Len())
		}
		time.Sleep(time.Duration(backoff) * time.Millisecond)
	}

	var stats jobStats
	// head is the ID of the job used to detect a full pass over the queue.
	head := ""
	for jobq.Len() > 0 {
		jel := jobq.Front()
		job := jel.Value.(*bigquery.Job)
		jobq.Remove(jel)
		jid := job.JobReference.JobId

		// Pick a new head if needed; meeting the head again means the whole
		// queue has been polled once more, so the delay should grow.
		loop := false
		if head == "" {
			head = jid
		} else if jid == head {
			loop = true
		}
		pause(loop)

		updated, err := ds.getJob(jid)
		if err != nil || updated == nil {
			// Treat the failure as transient and keep polling with the job
			// data we already have.
			if err != nil {
				fmt.Fprintln(os.Stderr, err)
			}
			jobq.PushBack(job)
			continue
		}
		job = updated
		if job.Status == nil || job.Status.State != JobStatusDone {
			jobq.PushBack(job)
			continue
		}

		if res := job.Status.ErrorResult; res != nil {
			fmt.Fprintln(os.Stderr, res.Message)
		} else if job.Statistics != nil && job.Statistics.Load != nil {
			addBQLoadStats(&stats, job.Statistics)
		}

		// The job has left the queue. If it was the head, forget it and
		// restart from the base delay. Previously this only happened when the
		// head was met a second time, so a head that finished on its first
		// poll left a stale ID behind and the backoff could never grow again.
		if jid == head {
			head = ""
			backoff = BaseBackoff
		}
	}
	fmt.Fprintf(os.Stderr, "%#v\n", stats)
}

// addBQLoadStats folds one successfully completed load job into s. Counters
// are summed, and elapsed grows by the job's EndTime-StartTime. start keeps
// the earliest job start and finish keeps the latest job end. stat.Load must
// be non-nil.
func addBQLoadStats(s *jobStats, stat *bigquery.JobStatistics) {
	load := stat.Load
	s.files++
	s.bytesIn += load.InputFileBytes
	s.bytesOut += load.OutputBytes
	s.rows += load.OutputRows
	s.elapsed += time.Duration(stat.EndTime-stat.StartTime) * time.Millisecond
	if start := bqMillisToTime(stat.StartTime); s.start.IsZero() || start.Before(s.start) {
		s.start = start
	}
	if end := bqMillisToTime(stat.EndTime); s.finish.IsZero() || end.After(s.finish) {
		s.finish = end
	}
}

// bqMillisToTime converts a timestamp in milliseconds since the Unix epoch to
// a time.Time, keeping the sub-second part. time.Unix(ms/1000, 0) would
// truncate it, making the wall-clock span coarser than the per-job elapsed
// times.
func bqMillisToTime(ms int64) time.Time {
	return time.Unix(ms/1000, (ms%1000)*int64(time.Millisecond))
}
// jobStats accumulates figures over the load jobs that completed without an
// error result; monitor prints it with %#v when all jobs are done.
type jobStats struct {
	// Number of files (sources) loaded.
	files int64
	// Bytes read from source (possibly compressed).
	bytesIn int64
	// Bytes loaded into BigQuery (uncompressed).
	bytesOut int64
	// Rows loaded into BigQuery.
	rows int64
	// Sum over jobs of the time taken to load a source into its table.
	elapsed time.Duration
	// Earliest start time among the counted jobs.
	start time.Time
	// Latest end time among the counted jobs.
	finish time.Time
}

// GoString renders the summary line. It shows the file count, then the
// wall-clock span from start to finish, then the summed load time in
// parentheses. It ends with bytesOut expressed in GB to two decimals and the
// row count.
func (s jobStats) GoString() string {
	const layout = "\n%d files loaded in %v (%v). Size: %.2fGB Rows: %d\n"
	wallClock := s.finish.Sub(s.start)
	sizeGB := float64(s.bytesOut) / GB
	return fmt.Sprintf(layout, s.files, wallClock, s.elapsed, sizeGB, s.rows)
}