forked from googleapis/google-cloud-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
iterator.go
155 lines (141 loc) · 4.57 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bigquery
import (
"fmt"
"reflect"
"golang.org/x/net/context"
"google.golang.org/api/iterator"
)
// A pageFetcher returns a page of rows, starting from the row specified by token.
type pageFetcher interface {
fetch(ctx context.Context, s service, token string) (*readDataResult, error)
setPaging(*pagingConf)
}
func newRowIterator(ctx context.Context, s service, pf pageFetcher) *RowIterator {
it := &RowIterator{
ctx: ctx,
service: s,
pf: pf,
}
it.pageInfo, it.nextFunc = iterator.NewPageInfo(
it.fetch,
func() int { return len(it.rows) },
func() interface{} { r := it.rows; it.rows = nil; return r })
return it
}
// A RowIterator provides access to the result of a BigQuery lookup.
type RowIterator struct {
ctx context.Context
service service
pf pageFetcher
pageInfo *iterator.PageInfo
nextFunc func() error
// StartIndex can be set before the first call to Next. If PageInfo().Token
// is also set, StartIndex is ignored.
StartIndex uint64
rows [][]Value
schema Schema // populated on first call to fetch
structLoader structLoader // used to populate a pointer to a struct
}
// Next loads the next row into dst. Its return value is iterator.Done if there
// are no more results. Once Next returns iterator.Done, all subsequent calls
// will return iterator.Done.
//
// dst may implement ValueLoader, or may be a *[]Value, *map[string]Value, or struct pointer.
//
// If dst is a *[]Value, it will be set to to new []Value whose i'th element
// will be populated with the i'th column of the row.
//
// If dst is a *map[string]Value, a new map will be created if dst is nil. Then
// for each schema column name, the map key of that name is set to the column's
// value.
//
// If dst is pointer to a struct, each column in the schema will be matched
// with an exported field of the struct that has the same name, ignoring case.
// Unmatched schema columns and struct fields will be ignored.
//
// Each BigQuery column type corresponds to a single Go type; a matching struct
// field must be of the correct type. The correspondences are:
//
// Schema Go type
// STRING string
// BOOL bool
// INTEGER int
// FLOAT float64
// TIMESTAMP time.Time
//
// A repeated field corresponds to a slice of the element type.
// RECORD types (nested schemas) are not yet supported.
// All calls to Next on the same iterator must use the same struct type.
func (it *RowIterator) Next(dst interface{}) error {
var vl ValueLoader
switch dst := dst.(type) {
case ValueLoader:
vl = dst
case *[]Value:
vl = (*valueList)(dst)
case *map[string]Value:
vl = (*valueMap)(dst)
default:
if !isStructPtr(dst) {
return fmt.Errorf("bigquery: cannot convert %T to ValueLoader (need pointer to []Value, map[string]Value, or struct)", dst)
}
}
if err := it.nextFunc(); err != nil {
return err
}
row := it.rows[0]
it.rows = it.rows[1:]
if vl == nil {
// This can only happen if dst is a pointer to a struct. We couldn't
// set vl above because we need the schema.
if err := it.structLoader.set(dst, it.schema); err != nil {
return err
}
vl = &it.structLoader
}
return vl.Load(row, it.schema)
}
func isStructPtr(x interface{}) bool {
t := reflect.TypeOf(x)
return t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct
}
// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
func (it *RowIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
func (it *RowIterator) fetch(pageSize int, pageToken string) (string, error) {
pc := &pagingConf{}
if pageSize > 0 {
pc.recordsPerRequest = int64(pageSize)
pc.setRecordsPerRequest = true
}
if pageToken == "" {
pc.startIndex = it.StartIndex
}
it.pf.setPaging(pc)
var res *readDataResult
var err error
for {
res, err = it.pf.fetch(it.ctx, it.service, pageToken)
if err != errIncompleteJob {
break
}
}
if err != nil {
return "", err
}
it.rows = append(it.rows, res.rows...)
it.schema = res.schema
return res.pageToken, nil
}