connector.go
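
// Package bigquerysql provides the BigQuery sink for tidb2dw: it replicates a
// TiDB table schema into BigQuery, loads snapshot and increment files from
// GCS, and merges change data into the target table.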
package bigquerysql

import (
    "context"
    "database/sql"
    "fmt"
    "net/url"
    "strings"

    "cloud.google.com/go/bigquery"
    "github.com/pingcap-inc/tidb2dw/pkg/tidbsql"
    "github.com/pingcap-inc/tidb2dw/pkg/utils"
    "github.com/pingcap/errors"
    "github.com/pingcap/log"
    "github.com/pingcap/tiflow/pkg/sink/cloudstorage"
    "go.uber.org/zap"
)
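
// BigQueryConnector wraps a bigquery.Client together with the target dataset,
// table, and increment-table IDs, the GCS storage URL that data files are
// loaded from, and the cached column definitions of the target table.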
type BigQueryConnector struct {
    bqClient         *bigquery.Client
    ctx              context.Context
    datasetID        string
    tableID          string
    incrementTableID string
    storageURL       string
    columns          []cloudstorage.TableCol
}
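
// NewBigQueryConnector creates a BigQuery client from bqConfig and derives the
// storage URL (scheme://host/path) from storageURI. The column cache is left
// nil until InitSchema is called.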
func NewBigQueryConnector(bqConfig *BigQueryConfig, incrementTableID, datasetID, tableID string, storageURI *url.URL) (*BigQueryConnector, error) {
    bqClient, err := bqConfig.NewClient()
    if err != nil {
        return nil, errors.Trace(err)
    }

    storageURL := fmt.Sprintf("%s://%s%s", storageURI.Scheme, storageURI.Host, storageURI.Path)

    return &BigQueryConnector{
        bqClient:         bqClient,
        ctx:              context.Background(),
        datasetID:        datasetID,
        tableID:          tableID,
        incrementTableID: incrementTableID,
        storageURL:       storageURL,
        columns:          nil,
    }, nil
}
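
// InitSchema caches the upstream column definitions. It is a no-op if the
// columns have already been initialized, and it rejects an empty column list.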
func (bc *BigQueryConnector) InitSchema(columns []cloudstorage.TableCol) error {
    if len(bc.columns) != 0 {
        return nil
    }
    if len(columns) == 0 {
        return errors.New("columns in schema are empty")
    }
    bc.columns = columns
    log.Info("table columns initialized", zap.Any("Columns", columns))
    return nil
}
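
// ExecDDL rewrites an upstream TiDB DDL into zero or more BigQuery DDLs by
// diffing the cached columns against tableDef, executes them, and then updates
// the column cache. It requires InitSchema (or a prior DDL) to have run first.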
func (bc *BigQueryConnector) ExecDDL(tableDef cloudstorage.TableDefinition) error {
    if len(bc.columns) == 0 {
        return errors.New("columns not initialized; executing a DDL before all DMLs is not supported yet")
    }
    ddls, err := GenDDLViaColumnsDiff(bc.datasetID, bc.tableID, bc.columns, tableDef)
    if err != nil {
        return errors.Trace(err)
    }
    if len(ddls) == 0 {
        log.Info("No need to execute this DDL in BigQuery", zap.String("ddl", tableDef.Query))
        return nil
    }
    // One upstream DDL may be rewritten into multiple BigQuery DDLs.
    for _, ddl := range ddls {
        if err = runQuery(bc.ctx, bc.bqClient, ddl); err != nil {
            log.Error("Failed to execute DDL", zap.String("received", tableDef.Query), zap.String("rewritten", strings.Join(ddls, "\n")))
            return errors.Annotate(err, fmt.Sprintf("failed to execute %s", ddl))
        }
    }
    // Keep the cached columns in sync with the new table definition.
    bc.columns = tableDef.Columns
    log.Info("Successfully executed DDL", zap.String("received", tableDef.Query), zap.String("rewritten", strings.Join(ddls, "\n")))
    return nil
}

// CopyTableSchema copies the table schema from TiDB to BigQuery.
// If the table already exists, it is deleted first.
func (bc *BigQueryConnector) CopyTableSchema(sourceDatabase string, sourceTable string, sourceTiDBConn *sql.DB) error {
    tableColumns, err := tidbsql.GetTiDBTableColumn(sourceTiDBConn, sourceDatabase, sourceTable)
    if err != nil {
        return errors.Trace(err)
    }
    pKColumns, err := tidbsql.GetTiDBTablePKColumns(sourceTiDBConn, sourceDatabase, sourceTable)
    if err != nil {
        return errors.Trace(err)
    }
    createTableSQL, err := GenCreateSchema(tableColumns, pKColumns, bc.datasetID, bc.tableID)
    if err != nil {
        return errors.Trace(err)
    }
    if err = runQuery(bc.ctx, bc.bqClient, createTableSQL); err != nil {
        return errors.Annotate(err, "failed to create table")
    }
    return nil
}
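
// LoadSnapshot loads a snapshot data file from GCS into the target table.
// Note that targetTable is currently unused; data is always loaded into
// bc.tableID.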
func (bc *BigQueryConnector) LoadSnapshot(targetTable, filePath string) error {
    // FIXME: if the source table is empty, BigQuery will fail to load the file (file not found).
    absolutePath := fmt.Sprintf("%s/%s", bc.storageURL, filePath)
    err := loadGCSFileToBigQuery(bc.ctx, bc.bqClient, bc.datasetID, bc.tableID, absolutePath)
    if err != nil {
        return errors.Trace(err)
    }
    return nil
}
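
// LoadIncrement applies one increment (changelog) file: it creates a
// temporary increment table, loads the file into it, merges it into the
// target table, and finally drops the increment table.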
func (bc *BigQueryConnector) LoadIncrement(tableDef cloudstorage.TableDefinition, filePath string) error {
    absolutePath := fmt.Sprintf("%s/%s", bc.storageURL, filePath)

    // Create a temporary increment table matching the changelog layout.
    tableColumns := utils.GenIncrementTableColumns(tableDef.Columns)
    createTableSQL, err := GenCreateSchema(tableColumns, []string{}, bc.datasetID, bc.incrementTableID)
    if err != nil {
        return errors.Trace(err)
    }
    if err = runQuery(bc.ctx, bc.bqClient, createTableSQL); err != nil {
        return errors.Annotate(err, "failed to create increment table")
    }

    // Load the data file from GCS into the increment table.
    err = loadGCSFileToBigQuery(bc.ctx, bc.bqClient, bc.datasetID, bc.incrementTableID, absolutePath)
    if err != nil {
        return errors.Trace(err)
    }

    // Merge the increment table into the target table, then drop it.
    mergeSQL := GenMergeInto(tableDef, bc.datasetID, bc.tableID, bc.incrementTableID)
    if err = runQuery(bc.ctx, bc.bqClient, mergeSQL); err != nil {
        return errors.Annotate(err, "failed to merge increment table")
    }
    err = deleteTable(bc.ctx, bc.bqClient, bc.datasetID, bc.incrementTableID)
    if err != nil {
        return errors.Trace(err)
    }

    log.Info("Successfully merged file", zap.String("file", filePath))
    return nil
}
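
// Close releases the underlying BigQuery client.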
func (bc *BigQueryConnector) Close() {
    bc.bqClient.Close()
}
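
// A minimal usage sketch (assumed, not part of this file: the config value,
// bucket URI, and table names below are hypothetical; BigQueryConfig is
// defined elsewhere in this package):
//
//	cfg := &BigQueryConfig{ /* project, credentials, ... */ }
//	uri, _ := url.Parse("gs://example-bucket/prefix")
//	conn, err := NewBigQueryConnector(cfg, "tbl_increment", "dataset", "tbl", uri)
//	if err != nil {
//		// handle error
//	}
//	defer conn.Close()
//	// Then: conn.CopyTableSchema(...), conn.InitSchema(...), conn.LoadSnapshot(...),
//	// and conn.LoadIncrement(...) for each changelog file.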