-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient.go
220 lines (192 loc) · 5.12 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
//go:build windows || darwin || linux || solaris || netbsd || openbsd || freebsd
// +build windows darwin linux solaris netbsd openbsd freebsd
package usp_bigquery
import (
"context"
"errors"
"fmt"
"github.com/refractionPOINT/go-uspclient"
"github.com/refractionPOINT/go-uspclient/protocol"
"strings"
"sync"
"time"
"cloud.google.com/go/bigquery"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
// BigQueryAdapter runs the configured SQL query against BigQuery and ships
// each resulting row through a USP client. Instances are created by
// NewBigQueryAdapter and released with Close.
type BigQueryAdapter struct {
// conf is the configuration the adapter was created with.
conf BigQueryConfig
// client is the BigQuery client used to run the query; closed by Close.
client *bigquery.Client
// dataset is the handle for conf.DatasetName obtained from client.
dataset *bigquery.Dataset
// table is the handle for conf.TableName inside dataset.
table *bigquery.Table
// NOTE(review): isStop is not referenced by the visible code — presumably a
// leftover stop flag; confirm before relying on it.
isStop uint32
// wg tracks the polling goroutine so Close can wait for it to exit.
wg sync.WaitGroup
// uspClient ships the rows as USP data messages.
uspClient *uspclient.Client
// ctx is the adapter-wide context handed to every query run.
ctx context.Context
// cancel cancels ctx; Close calls it to stop the polling goroutine.
cancel context.CancelFunc
}
// Validate checks that every required setting is present.
//
// The settings are examined in a fixed order (project id, BigQuery project,
// dataset, table, SQL query) and the error names the first one that is
// empty. It returns nil when all of them are set.
func (bq *BigQueryConfig) Validate() error {
	switch {
	case bq.ProjectId == "":
		return errors.New("missing project_id")
	case bq.BigQueryProject == "":
		// Usually identical to the project id, but it can differ when the
		// dataset lives in another project, such as a public dataset.
		return errors.New("missing bigquery project name")
	case bq.DatasetName == "":
		return errors.New("missing dataset_name")
	case bq.TableName == "":
		return errors.New("missing table_name")
	case bq.SqlQuery == "":
		return errors.New("missing sql query")
	}
	return nil
}
// NewBigQueryAdapter builds a BigQuery adapter from conf and starts its
// polling goroutine.
//
// The goroutine runs the configured query and ships the rows. It then either
// stops (conf.IsOneTimeLoad) or waits conf.QueryInterval before running the
// query again. It also stops on the first failed run, which is reported
// through conf.ClientOptions.OnError, and when Close cancels the adapter.
//
// The BigQuery client option is selected from conf.ServiceAccountCreds:
//   - "": no credential option is passed to the client
//   - "-": the client is created without authentication
//   - a value starting with "{": used as inline credentials JSON
//   - anything else: used as the path of a credentials file
//
// It returns the adapter, a channel that is closed once the polling
// goroutine has exited, and an error when the interval cannot be parsed or a
// client cannot be created. On error nothing is left open or running.
func NewBigQueryAdapter(conf BigQueryConfig) (*BigQueryAdapter, chan struct{}, error) {
	// Check the interval before acquiring anything, so a bad value cannot
	// leave clients behind.
	queryInterval, err := time.ParseDuration(conf.QueryInterval)
	if err != nil {
		return nil, nil, fmt.Errorf("invalid query interval: %w", err)
	}

	var opts []option.ClientOption
	creds := conf.ServiceAccountCreds
	switch {
	case creds == "":
		// No explicit credential option.
	case creds == "-":
		opts = append(opts, option.WithoutAuthentication())
	case strings.HasPrefix(creds, "{"):
		opts = append(opts, option.WithCredentialsJSON([]byte(creds)))
	default:
		opts = append(opts, option.WithCredentialsFile(creds))
	}

	client, err := bigquery.NewClient(context.Background(), conf.ProjectId, opts...)
	if err != nil {
		return nil, nil, err
	}

	uspClient, err := uspclient.NewClient(conf.ClientOptions)
	if err != nil {
		// Best-effort cleanup: the creation error is the one worth returning.
		_ = client.Close()
		return nil, nil, err
	}

	// Adapter-wide cancellable context; Close cancels it to stop polling.
	ctx, cancel := context.WithCancel(context.Background())
	bq := &BigQueryAdapter{
		conf:      conf,
		client:    client,
		uspClient: uspClient,
		ctx:       ctx,
		cancel:    cancel,
	}
	bq.dataset = bq.client.Dataset(bq.conf.DatasetName)
	bq.table = bq.dataset.Table(bq.conf.TableName)

	chStopped := make(chan struct{})
	bq.wg.Add(1)
	go func() {
		defer bq.wg.Done()
		defer close(chStopped)

		for {
			// The error is local to the goroutine; it must not alias the
			// constructor's own err variable.
			if err := bq.lookupAndSend(bq.ctx); err != nil {
				// A run interrupted by Close is a normal shutdown, not a
				// failure worth reporting.
				if bq.ctx.Err() == nil {
					bq.conf.ClientOptions.OnError(fmt.Errorf("bigquery error: %w", err))
				}
				return
			}
			if bq.conf.IsOneTimeLoad {
				return
			}

			select {
			case <-time.After(queryInterval):
			case <-bq.ctx.Done():
				return
			}
		}
	}()

	return bq, chStopped, nil
}
// lookupAndSend runs the configured SQL query once and ships every returned
// row as a JSON data message through the USP client.
//
// Rows are read by one goroutine and shipped by another, connected by a
// buffered channel. Column names are taken, by position, from the schema of
// the configured table (conf.BigQueryProject / DatasetName / TableName).
// NOTE(review): this presumes the query returns that table's columns in
// order; a column beyond the table schema is keyed "column_<index>".
//
// It returns nil once all rows were shipped, the first read or ship error
// otherwise, or ctx.Err() when ctx is cancelled first.
func (bq *BigQueryAdapter) lookupAndSend(ctx context.Context) error {
	// Cancelled on return so that whichever goroutine is still running after
	// a failure stops instead of lingering until the adapter is closed.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	it, err := bq.client.Query(bq.conf.SqlQuery).Read(ctx)
	if err != nil {
		return err
	}

	// The table metadata is only used for its column names.
	tableRef := bq.client.DatasetInProject(bq.conf.BigQueryProject, bq.conf.DatasetName).Table(bq.conf.TableName)
	meta, err := tableRef.Metadata(ctx)
	if err != nil {
		return err
	}
	schema := meta.Schema

	rowsChan := make(chan []bigquery.Value, 5000)
	// One slot per goroutine: reporting an error never blocks, even when
	// this function has already returned.
	errChan := make(chan error, 2)

	// Reader: pulls rows from the iterator and queues them for shipping.
	go func() {
		defer close(rowsChan)
		for {
			var row []bigquery.Value
			// Goroutine-local error: sharing one variable with the shipper
			// would be a data race.
			nextErr := it.Next(&row)
			if errors.Is(nextErr, iterator.Done) {
				return
			}
			if nextErr != nil {
				errChan <- nextErr
				return
			}
			select {
			case rowsChan <- row:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Shipper: converts each queued row to a JSON object and ships it.
	doneChan := make(chan struct{})
	go func() {
		defer close(doneChan)
		for row := range rowsChan {
			// Stop promptly on cancellation instead of draining the queue.
			select {
			case <-ctx.Done():
				return
			default:
			}

			rowMap := make(map[string]interface{}, len(row))
			for i, col := range row {
				if i < len(schema) {
					rowMap[schema[i].Name] = col
					continue
				}
				// More columns than the table schema: keep the value under
				// a positional key rather than indexing out of range.
				rowMap[fmt.Sprintf("column_%d", i)] = col
			}
			msg := &protocol.DataMessage{
				JsonPayload: rowMap,
				TimestampMs: uint64(time.Now().UnixMilli()),
			}

			shipErr := bq.uspClient.Ship(msg, 10*time.Second)
			if errors.Is(shipErr, uspclient.ErrorBufferFull) {
				// The buffer is full: warn, then retry with a long timeout.
				bq.conf.ClientOptions.OnWarning("stream falling behind")
				shipErr = bq.uspClient.Ship(msg, 1*time.Hour)
			}
			if shipErr != nil {
				bq.conf.ClientOptions.OnError(fmt.Errorf("ship(): %v", shipErr))
				errChan <- shipErr
				return
			}
		}
	}()

	select {
	case <-doneChan:
		// Both goroutines queue their error before the shipper finishes, so
		// a pending error here means the run did not complete.
		select {
		case runErr := <-errChan:
			return runErr
		default:
			return nil
		}
	case runErr := <-errChan:
		return runErr
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Close stops the adapter and releases its resources.
//
// It cancels the adapter context, waits for the polling goroutine to exit,
// closes the BigQuery client, then drains the USP client (for up to one
// minute) and closes it. Every step runs even when an earlier one fails.
//
// It returns the first non-nil error in this order: drain, USP client close,
// BigQuery client close.
func (bq *BigQueryAdapter) Close() error {
	bq.conf.ClientOptions.DebugLog("closing")

	bq.cancel() // stop the polling goroutine
	bq.wg.Wait()

	// Previously discarded; kept so a failed close is not silently lost.
	errClient := bq.client.Close()

	errDrain := bq.uspClient.Drain(1 * time.Minute)
	_, errClose := bq.uspClient.Close()

	if errDrain != nil {
		return errDrain
	}
	if errClose != nil {
		return errClose
	}
	return errClient
}