big_query_writer.go

package processors

import (
	bigquery "github.com/dailyburn/bigquery/client"

	"github.com/dailyburn/ratchet/data"
	"github.com/dailyburn/ratchet/logger"
	"github.com/dailyburn/ratchet/util"
)

// BigQueryWriter is used to write data to Google's BigQuery. If the table you
// want to write to already exists, use NewBigQueryWriter; otherwise use
// NewBigQueryWriterForNewTable and the desired table structure will be created
// when the client is first initialized.
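//
// A minimal usage sketch for an existing table; the literal values below are
// illustrative placeholders, not defaults:
//
//	config := &BigQueryConfig{
//		JsonPemPath: "/path/to/service_account.json",
//		ProjectID:   "my-project",
//		DatasetID:   "my_dataset",
//	}
//	writer := NewBigQueryWriter(config, "events")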
type BigQueryWriter struct {
	client            *bigquery.Client
	config            *BigQueryConfig
	tableName         string
	fieldsForNewTable map[string]string
	ConcurrencyLevel  int // See ConcurrentDataProcessor
}

// NewBigQueryWriter instantiates a BigQueryWriter that writes to an existing
// table.
func NewBigQueryWriter(config *BigQueryConfig, tableName string) *BigQueryWriter {
	w := BigQueryWriter{config: config, tableName: tableName}
	return &w
}

// NewBigQueryWriterForNewTable instantiates a BigQueryWriter that will create
// the target table (if it does not already exist) before writing results.
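//
// The fields map pairs column names with BigQuery column types. A sketch,
// assuming a simple two-column schema (the names and types are illustrative):
//
//	fields := map[string]string{"id": "INTEGER", "name": "STRING"}
//	w := NewBigQueryWriterForNewTable(config, "new_table", fields)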
func NewBigQueryWriterForNewTable(config *BigQueryConfig, tableName string, fields map[string]string) *BigQueryWriter {
	// This writer will attempt to create a new table with the provided fields
	// if one does not already exist.
	w := BigQueryWriter{config: config, tableName: tableName, fieldsForNewTable: fields}
	return &w
}

// ProcessData defers to WriteBatch after decoding the incoming JSON payload
// into a slice of row maps.
func (w *BigQueryWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) {
	queuedRows, err := data.ObjectsFromJSON(d)
	util.KillPipelineIfErr(err, killChan)

	logger.Info("BigQueryWriter: Writing -", len(queuedRows))
	err = w.WriteBatch(queuedRows)
	util.KillPipelineIfErr(err, killChan)
	logger.Info("BigQueryWriter: Write complete")
}

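// For example, a payload like the following would be written as two rows (a
// sketch; any flat JSON objects matching the table's schema work):
//
//	rows := data.JSON(`[{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]`)
//	writer.ProcessData(rows, outputChan, killChan)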
// WriteBatch inserts the supplied rows into BigQuery.
func (w *BigQueryWriter) WriteBatch(queuedRows []map[string]interface{}) (err error) {
	err = w.bqClient().InsertRows(w.config.ProjectID, w.config.DatasetID, w.tableName, queuedRows)
	return err
}

// Finish - see interface for documentation. BigQueryWriter is a terminal
// stage, so no data is sent on outputChan.
func (w *BigQueryWriter) Finish(outputChan chan data.JSON, killChan chan error) {
}

// String returns a human-readable name for the processor.
func (w *BigQueryWriter) String() string {
	return "BigQueryWriter"
}

// Concurrency delegates to ConcurrentDataProcessor.
func (w *BigQueryWriter) Concurrency() int {
	return w.ConcurrencyLevel
}

// bqClient lazily initializes the BigQuery client on first use, creating the
// target table if fieldsForNewTable was provided.
func (w *BigQueryWriter) bqClient() *bigquery.Client {
	if w.client == nil {
		w.client = bigquery.New(w.config.JsonPemPath)
		w.client.PrintDebug = true
		if w.fieldsForNewTable != nil {
			err := w.client.InsertNewTableIfDoesNotExist(w.config.ProjectID, w.config.DatasetID, w.tableName, w.fieldsForNewTable)
			if err != nil {
				// Only thrown if table existence could not be verified or if
				// the table could not be created.
				panic(err)
			}
		}
	}
	return w.client
}
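
// A BigQueryWriter typically sits at the end of a ratchet pipeline. A minimal
// sketch, assuming an upstream DataProcessor named reader defined elsewhere
// (reader is illustrative, not part of this file):
//
//	writer := NewBigQueryWriter(config, "events")
//	pipeline := ratchet.NewPipeline(reader, writer)
//	err := <-pipeline.Run()
//	if err != nil {
//		// handle pipeline failure
//	}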