-
Notifications
You must be signed in to change notification settings - Fork 35
/
client.go
64 lines (52 loc) · 1.56 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package msg
import (
"context"
"fmt"
"github.com/viant/endly"
"github.com/viant/scy/cred"
"time"
)
const (
ResourceVendorGoogleCloudPlatform = "gcp"
ResourceVendorAmazonWebService = "aws"
ResourceVendorKafka = "kafka"
)
type Client interface {
Push(ctx context.Context, dest *Resource, message *Message) (Result, error)
PullN(ctx context.Context, source *Resource, count int, nack bool) ([]*Message, error)
SetupResource(resource *ResourceSetup) (*Resource, error)
DeleteResource(resource *Resource) error
Close() error
}
// NewPubSubClient creates a new Client
func NewPubSubClient(context *endly.Context, dest *Resource, timeout time.Duration) (Client, error) {
credConfig := &cred.Generic{}
var err error
if dest.Credentials != "" {
credConfig, err = context.Secrets.GetCredentials(context.Background(), dest.Credentials)
}
if err != nil {
return nil, err
}
if len(dest.Brokers) > 0 {
dest.Vendor = ResourceVendorKafka
dest.Type = ResourceTypeTopic
}
if dest.Vendor == "" {
dest.Vendor = inferResourceTypeFromCredentialConfig(credConfig)
}
state := context.State()
if credConfig.ProjectID != "" {
state.SetValue("msg.projectID", credConfig.ProjectID)
}
dest = expandResource(context, dest)
switch dest.Vendor {
case ResourceVendorGoogleCloudPlatform:
return newCloudPubSub(credConfig, dest.URL, timeout)
case ResourceVendorAmazonWebService:
return newAwsSqsClient(credConfig, timeout)
case ResourceVendorKafka:
return newKafkaClient(timeout)
}
return nil, fmt.Errorf("unsupported vendor: '%v'", dest.Vendor)
}