-
Notifications
You must be signed in to change notification settings - Fork 3
/
config.go
131 lines (124 loc) · 4.28 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package bigquery
import (
"context"
"strconv"
"strings"
"time"
"github.com/zeebo/errs/v2"
"google.golang.org/api/option"
"storj.io/eventkit"
"storj.io/eventkit/destination"
)
// CreateDestination builds an eventkit destination from a layered
// configuration string.
//
// The configuration is a list of layers separated by "|". Every layer has the
// form type:param=value,param=value. Layers are processed left to right and
// each parallel/batch layer wraps the destination described by the layers
// before it, so the first layer has to be a bigquery one. Whitespace around a
// layer is ignored and empty layers are skipped.
//
// Supported layers and their parameters:
//
//	bigquery (alias bq): appName, project, dataset, credentialsPath
//	parallel:            workers (integer)
//	batch:               queueSize, batchSize (integers), flushInterval (duration such as 10s)
//
// Example configurations:
//
//	bigquery:appName=...,project=...,dataset=...
//	bigquery:appName=...,project=...,dataset=...|batch:queueSize=111,batchSize=111,flushInterval=10s
//	bigquery:appName=...,project=...,dataset=...|parallel:workers=10|batch:queueSize=111,batchSize=111,flushInterval=10s
//	bigquery:appName=...,project=...,dataset=...,credentialsPath=/path/to/my/service-account.json|parallel:workers=10|batch:queueSize=111
//
// Plain host:port addresses (for example 127.0.0.1:1234) are not handled by
// this function and are rejected as an unknown layer type.
//
// An error is returned when a layer is malformed, names an unknown type or
// parameter, carries a value that cannot be parsed, wraps a destination that
// was never defined, or when the configuration contains no layer at all.
// Errors produced while constructing the underlying destinations are returned
// as well.
func CreateDestination(ctx context.Context, config string) (eventkit.Destination, error) {
	layers := strings.Split(config, "|")
	// lastLayer builds the destination described by the layers parsed so far.
	// It stays nil until the first destination-producing layer is seen.
	var lastLayer func() (eventkit.Destination, error)
	for _, layer := range layers {
		layer = strings.TrimSpace(layer)
		if layer == "" {
			continue
		}
		typeName, params, found := strings.Cut(layer, ":")
		if !found {
			return nil, errs.Errorf("eventkit destination parameters should be defined in the form type:param=value,...")
		}
		switch typeName {
		case "bigquery", "bq":
			var appName, project, dataset, credentialsPath string
			for _, param := range strings.Split(params, ",") {
				key, value, found := strings.Cut(param, "=")
				if !found {
					return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format")
				}
				switch key {
				case "appName":
					appName = value
				case "project":
					project = value
				case "dataset":
					dataset = value
				case "credentialsPath":
					credentialsPath = value
				default:
					return nil, errs.Errorf("Unknown parameter for bigquery destination %s. Please use appName/project/dataset/credentialsPath", key)
				}
			}
			// The BigQuery client is only created when the closure is invoked.
			lastLayer = func() (eventkit.Destination, error) {
				var options []option.ClientOption
				if credentialsPath != "" {
					options = append(options, option.WithCredentialsFile(credentialsPath))
				}
				return NewBigQueryDestination(ctx, appName, project, dataset, options...)
			}
		case "parallel":
			var workers int
			for _, param := range strings.Split(params, ",") {
				key, value, found := strings.Cut(param, "=")
				if !found {
					return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format")
				}
				switch key {
				case "workers":
					var err error
					workers, err = strconv.Atoi(value)
					if err != nil {
						return nil, errs.Errorf("workers parameter of parallel destination should be a number and not %s", value)
					}
				default:
					return nil, errs.Errorf("Unknown parameter for parallel destination %s. Please use workers", key)
				}
			}
			// Without a preceding layer there is no producer to hand over to
			// the parallel destination; passing a nil function would only fail
			// later, far away from the configuration mistake.
			if lastLayer == nil {
				return nil, errs.Errorf("parallel destination should be defined after another destination (like bigquery:...)")
			}
			// Keep the previous producer in its own variable so the new
			// closure does not end up referring to itself.
			ll := lastLayer
			lastLayer = func() (eventkit.Destination, error) {
				return destination.NewParallel(ll, workers), nil
			}
		case "batch":
			var queueSize, batchSize int
			var flushInterval time.Duration
			var err error
			for _, param := range strings.Split(params, ",") {
				key, value, found := strings.Cut(param, "=")
				if !found {
					return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format")
				}
				switch key {
				case "queueSize":
					queueSize, err = strconv.Atoi(value)
					if err != nil {
						return nil, errs.Errorf("queueSize parameter of batch destination should be a number and not %s", value)
					}
				case "batchSize":
					batchSize, err = strconv.Atoi(value)
					if err != nil {
						return nil, errs.Errorf("batchSize parameter of batch destination should be a number and not %s", value)
					}
				case "flushInterval":
					flushInterval, err = time.ParseDuration(value)
					if err != nil {
						return nil, errs.Errorf("flushInterval parameter of batch destination should be a duration and not %s", value)
					}
				default:
					return nil, errs.Errorf("Unknown parameter for batch destination %s. Please use queueSize/batchSize/flushInterval", key)
				}
			}
			// Calling a nil lastLayer would panic, so reject a batch layer
			// that has nothing to wrap.
			if lastLayer == nil {
				return nil, errs.Errorf("batch destination should be defined after another destination (like bigquery:...)")
			}
			// Unlike the other layers, the wrapped destination is built right
			// away because the batch queue needs a ready destination.
			ekDest, err := lastLayer()
			if err != nil {
				return nil, err
			}
			lastLayer = func() (eventkit.Destination, error) {
				return destination.NewBatchQueue(ekDest, queueSize, batchSize, flushInterval), nil
			}
		default:
			// Report unknown layer types instead of silently ignoring them,
			// otherwise a typo quietly drops a part of the pipeline.
			return nil, errs.Errorf("Unknown eventkit destination type %s. Please use bigquery/parallel/batch", typeName)
		}
	}
	if lastLayer == nil {
		return nil, errs.Errorf("No eventkit destination is defined")
	}
	return lastLayer()
}