This repository has been archived by the owner on Oct 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
109 lines (91 loc) · 2.86 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package googlecloudstorage
import (
"context"
_ "embed"
"fmt"
"sync"
"cloud.google.com/go/storage"
"github.com/gofrs/uuid"
"go.uber.org/zap"
"google.golang.org/api/option"
"google.golang.org/protobuf/types/known/structpb"
"github.com/instill-ai/component/pkg/base"
connectorPB "github.com/instill-ai/protogen-go/vdp/connector/v1alpha"
)
// taskUpload is the task name that Execute handles by uploading the input
// data to a Google Cloud Storage bucket.
const taskUpload = "TASK_UPLOAD"
//go:embed config/definitions.json
var definitionsJSON []byte
//go:embed config/tasks.json
var tasksJSON []byte
var once sync.Once
var connector base.IConnector
// Connector is the Google Cloud Storage connector. It embeds base.Connector
// and is the concrete type that Init stores in the package-level singleton.
type Connector struct {
base.Connector
}
// Execution is a single configured run of this connector. It embeds
// base.Execution and is the concrete type returned by CreateExecution.
type Execution struct {
base.Execution
}
// Init returns the package-level connector, building it on the first call.
//
// The first call creates a Connector that logs through logger and loads the
// embedded definitions and tasks JSON into it. If loading fails, the error is
// reported through logger.Fatal. Later calls return the same instance and
// their logger argument is not used.
func Init(logger *zap.Logger) base.IConnector {
	once.Do(func() {
		c := &Connector{
			Connector: base.Connector{
				Component: base.Component{Logger: logger},
			},
		}
		// Publish the instance before loading, as the load is performed on
		// the shared singleton.
		connector = c
		if err := connector.LoadConnectorDefinitions(definitionsJSON, tasksJSON); err != nil {
			logger.Fatal(err.Error())
		}
	})
	return connector
}
// CreateExecution builds an Execution for the given definition UID, task and
// configuration. The embedded base.Execution is populated by
// base.CreateExecutionHelper. The returned error is always nil.
func (c *Connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
	exec := new(Execution)
	exec.Execution = base.CreateExecutionHelper(exec, c, defUID, task, config, logger)
	return exec, nil
}
// NewClient creates a Cloud Storage client that authenticates with the
// credentials JSON given in jsonKey. The client is created with a background
// context; the caller is responsible for closing it.
func NewClient(jsonKey string) (*storage.Client, error) {
	ctx := context.Background()
	credentials := option.WithCredentialsJSON([]byte(jsonKey))
	return storage.NewClient(ctx, credentials)
}
// getBucketName returns the string stored under the "bucket_name" key of
// config, or "" when the key is absent or does not hold a string.
func getBucketName(config *structpb.Struct) string {
	bucketField := config.GetFields()["bucket_name"]
	return bucketField.GetStringValue()
}
// getJSONKey returns the string stored under the "json_key" key of config,
// or "" when the key is absent or does not hold a string.
func getJSONKey(config *structpb.Struct) string {
	keyField := config.GetFields()["json_key"]
	return keyField.GetStringValue()
}
// Execute runs the execution's task once for every element of inputs and
// returns one output per input, in the same order.
//
// A GCS client is created from the "json_key" configuration field and closed
// before Execute returns. For the upload task (taskUpload, or an empty task
// name) each input's "data" field is uploaded to the configured bucket under
// the name given by its "object_name" field, and the matching output carries
// a "status" field set to "success".
//
// Execute returns an error, and no outputs, when the client cannot be
// created, when an upload fails, or when the task is not supported. Uploads
// that completed before the failure are not rolled back.
func (e *Execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, error) {
	client, err := NewClient(getJSONKey(e.Config))
	if err != nil {
		return nil, fmt.Errorf("error creating GCS client: %w", err)
	}
	if client == nil {
		// Reported separately so the message never renders a nil error as "<nil>".
		return nil, fmt.Errorf("error creating GCS client: client is nil")
	}
	defer client.Close()

	// The bucket comes from the execution config, so it is the same for every input.
	bucketName := getBucketName(e.Config)

	outputs := make([]*structpb.Struct, 0, len(inputs))
	for _, input := range inputs {
		switch e.Task {
		case taskUpload, "":
			fields := input.GetFields()
			objectName := fields["object_name"].GetStringValue()
			data := fields["data"].GetStringValue()
			if err := uploadToGCS(client, bucketName, objectName, data); err != nil {
				return nil, err
			}
			outputs = append(outputs, &structpb.Struct{
				Fields: map[string]*structpb.Value{
					"status": {Kind: &structpb.Value_StringValue{StringValue: "success"}},
				},
			})
		default:
			// Fail loudly instead of emitting a nil output for a task this
			// connector does not implement.
			return nil, fmt.Errorf("unsupported task: %s", e.Task)
		}
	}
	return outputs, nil
}
// Test reports the connector state by attempting to build a GCS client from
// the "json_key" field of config.
//
// It returns STATE_ERROR with an error when client creation fails,
// STATE_DISCONNECTED with an error when no client is returned, and
// STATE_CONNECTED otherwise. The client is closed before returning. The
// defUid and logger arguments are not used.
func (c *Connector) Test(defUid uuid.UUID, config *structpb.Struct, logger *zap.Logger) (connectorPB.ConnectorResource_State, error) {
	gcsClient, err := NewClient(getJSONKey(config))
	switch {
	case err != nil:
		return connectorPB.ConnectorResource_STATE_ERROR, fmt.Errorf("error creating GCS client: %v", err)
	case gcsClient == nil:
		return connectorPB.ConnectorResource_STATE_DISCONNECTED, fmt.Errorf("GCS client is nil")
	}
	defer gcsClient.Close()

	return connectorPB.ConnectorResource_STATE_CONNECTED, nil
}