This repository has been archived by the owner on Oct 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
128 lines (109 loc) · 3.51 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package bigquery
import (
"context"
_ "embed"
"errors"
"fmt"
"sync"
"cloud.google.com/go/bigquery"
"github.com/gofrs/uuid"
"go.uber.org/zap"
"google.golang.org/api/option"
"google.golang.org/protobuf/types/known/structpb"
"github.com/instill-ai/component/pkg/base"
connectorPB "github.com/instill-ai/protogen-go/vdp/connector/v1alpha"
)
// Task names understood by Execute.
const (
// taskInsert selects the insert task: each input is written to the
// configured BigQuery table. Execute also treats an empty task name as insert.
taskInsert = "TASK_INSERT"
)
// definitionsJSON holds the contents of config/definitions.json, embedded at
// build time. Init passes it to LoadConnectorDefinitions.
//
//go:embed config/definitions.json
var definitionsJSON []byte
// tasksJSON holds the contents of config/tasks.json, embedded at build time.
// Init passes it to LoadConnectorDefinitions.
//
//go:embed config/tasks.json
var tasksJSON []byte
// once ensures the body of Init that builds the connector runs a single time.
var once sync.Once
// connector is the package-wide instance created by Init and returned from it.
var connector base.IConnector
// Connector is the BigQuery connector. It embeds base.Connector and adds the
// CreateExecution and Test methods defined in this file.
type Connector struct {
base.Connector
}
// Execution is a single configured run of the BigQuery connector. It embeds
// base.Execution; Execute reads its Config and Task fields.
type Execution struct {
base.Execution
}
// Init returns the package-wide BigQuery connector, building it on the first
// call.
//
// The first call creates a Connector that logs through logger and loads the
// embedded definition and task JSON into it; a load failure is reported with
// logger.Fatal. Later calls skip the setup and return the same instance, so
// their logger argument is not used.
func Init(logger *zap.Logger) base.IConnector {
	setup := func() {
		component := base.Component{Logger: logger}
		connector = &Connector{
			Connector: base.Connector{Component: component},
		}
		if err := connector.LoadConnectorDefinitions(definitionsJSON, tasksJSON); err != nil {
			logger.Fatal(err.Error())
		}
	}
	once.Do(setup)
	return connector
}
// CreateExecution builds an Execution for the given definition UID, task and
// config, with its embedded base.Execution populated by
// base.CreateExecutionHelper. The returned error is always nil.
func (c *Connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
	execution := new(Execution)
	execution.Execution = base.CreateExecutionHelper(execution, c, defUID, task, config, logger)
	return execution, nil
}
// NewClient creates a BigQuery client for projectID, passing jsonKey as the
// credentials JSON. The client is created with a background context; the
// caller is responsible for closing it.
func NewClient(jsonKey, projectID string) (*bigquery.Client, error) {
	ctx := context.Background()
	credentials := option.WithCredentialsJSON([]byte(jsonKey))
	return bigquery.NewClient(ctx, projectID, credentials)
}
// getJSONKey returns the string value of the "json_key" field of config.
func getJSONKey(config *structpb.Struct) string {
	fields := config.GetFields()
	return fields["json_key"].GetStringValue()
}
// getProjectID returns the string value of the "project_id" field of config.
func getProjectID(config *structpb.Struct) string {
	fields := config.GetFields()
	return fields["project_id"].GetStringValue()
}
// getDatasetID returns the string value of the "dataset_id" field of config.
func getDatasetID(config *structpb.Struct) string {
	fields := config.GetFields()
	return fields["dataset_id"].GetStringValue()
}
// getTableName returns the string value of the "table_name" field of config.
func getTableName(config *structpb.Struct) string {
	fields := config.GetFields()
	return fields["table_name"].GetStringValue()
}
// Execute runs the execution's task once per element of inputs and returns one
// output struct per input, in the same order.
//
// Only the insert task is supported (taskInsert, or an empty task name). Each
// input is converted with getDataSaver against the schema of the table named
// by the "dataset_id" and "table_name" config fields, then written with
// insertDataToBigQuery. Every successful insert contributes an output of
// {"status": "success"}.
//
// The first failure stops processing and returns a nil slice with the error;
// inputs inserted before the failure are not rolled back. Any other task name
// returns the error "unsupported task type". A client-creation failure is
// wrapped so callers can inspect the cause with errors.Is / errors.As.
func (e *Execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, error) {
	ctx := context.Background()

	projectID := getProjectID(e.Config)
	client, err := NewClient(getJSONKey(e.Config), projectID)
	if err != nil {
		return nil, fmt.Errorf("error creating BigQuery client: %w", err)
	}
	if client == nil {
		// Defensive: keep the original nil-client guard, but with a message
		// that does not print a nil error.
		return nil, errors.New("error creating BigQuery client: no client returned")
	}
	defer client.Close()

	// The destination table is the same for every input, so resolve it once.
	datasetID := getDatasetID(e.Config)
	tableName := getTableName(e.Config)
	tableRef := client.Dataset(datasetID).Table(tableName)

	// Table metadata requires a round trip to BigQuery. It is fetched on the
	// first insert only and reused for the remaining inputs.
	var metaData *bigquery.TableMetadata

	outputs := make([]*structpb.Struct, 0, len(inputs))
	for _, input := range inputs {
		switch e.Task {
		case taskInsert, "":
			if metaData == nil {
				fetched, err := tableRef.Metadata(ctx)
				if err != nil {
					return nil, err
				}
				metaData = fetched
			}
			valueSaver, err := getDataSaver(input, metaData.Schema)
			if err != nil {
				return nil, err
			}
			if err := insertDataToBigQuery(projectID, datasetID, tableName, valueSaver, client); err != nil {
				return nil, err
			}
			outputs = append(outputs, &structpb.Struct{
				Fields: map[string]*structpb.Value{
					"status": {Kind: &structpb.Value_StringValue{StringValue: "success"}},
				},
			})
		default:
			return nil, errors.New("unsupported task type")
		}
	}
	return outputs, nil
}
// Test reports the connection state for the given connector config.
//
// It creates a BigQuery client from the "json_key" and "project_id" config
// fields and compares the client's project with the configured project ID:
//   - STATE_ERROR, with a wrapped error, when the client cannot be created;
//   - STATE_CONNECTED when the project IDs match;
//   - STATE_DISCONNECTED with "project ID does not match" otherwise.
//
// defUid and logger are accepted for interface compatibility and are not used.
//
// NOTE(review): no query is issued here, so this may not detect credentials
// that are well-formed but rejected by BigQuery — confirm whether a live
// request is wanted.
func (c *Connector) Test(defUid uuid.UUID, config *structpb.Struct, logger *zap.Logger) (connectorPB.ConnectorResource_State, error) {
	projectID := getProjectID(config)

	client, err := NewClient(getJSONKey(config), projectID)
	if err != nil {
		return connectorPB.ConnectorResource_STATE_ERROR, fmt.Errorf("error creating BigQuery client: %w", err)
	}
	if client == nil {
		// Defensive: keep the original nil-client guard, but with a message
		// that does not print a nil error.
		return connectorPB.ConnectorResource_STATE_ERROR, errors.New("error creating BigQuery client: no client returned")
	}
	defer client.Close()

	if client.Project() != projectID {
		return connectorPB.ConnectorResource_STATE_DISCONNECTED, errors.New("project ID does not match")
	}
	return connectorPB.ConnectorResource_STATE_CONNECTED, nil
}