/
config.go
122 lines (115 loc) · 3.84 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package bigquery
import (
"context"
"strconv"
"strings"
"time"
"github.com/zeebo/errs/v2"
"storj.io/eventkit"
)
// CreateDestination creates eventkit destination based on complex configuration.
// Example configurations:
//
// 127.0.0.1:1234
// bigquery:app=...,project=...,dataset=...
// bigquery:app=...,project=...,dataset=...|batch:queueSize=111,flashSize=111,flushInterval=111
// bigquery:app=...,project=...,dataset=...|parallel:runners=10|batch:queueSize=111,flashSize=111,flushInterval=111
func CreateDestination(ctx context.Context, config string) (eventkit.Destination, error) {
layers := strings.Split(config, "|")
var lastLayer func() (eventkit.Destination, error)
for _, layer := range layers {
layer = strings.TrimSpace(layer)
if layer == "" {
continue
}
typeName, params, found := strings.Cut(layer, ":")
if !found {
return nil, errs.Errorf("eventkit destination parameters should be defined in the form type:param=value,...")
}
switch typeName {
case "bigquery", "bq":
var appName, project, dataset string
for _, param := range strings.Split(params, ",") {
key, value, found := strings.Cut(param, "=")
if !found {
return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format")
}
switch key {
case "appName":
appName = value
case "project":
project = value
case "dataset":
dataset = value
default:
return nil, errs.Errorf("Unknown parameter for bigquery destination %s. Please use appName/project/dataset", key)
}
}
lastLayer = func() (eventkit.Destination, error) {
return NewBigQueryDestination(ctx, appName, project, dataset)
}
case "parallel":
var workers int
for _, param := range strings.Split(params, ",") {
key, value, found := strings.Cut(param, "=")
if !found {
return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format")
}
switch key {
case "workers":
var err error
workers, err = strconv.Atoi(value)
if err != nil {
return nil, errs.Errorf("workers parameter of parallel destination should be a number and not %s", value)
}
default:
return nil, errs.Errorf("Unknown parameter for parallel destination %s. Please use appName/project/dataset", value)
}
}
ll := lastLayer
lastLayer = func() (eventkit.Destination, error) {
return NewParallel(ll, workers), nil
}
case "batch":
var queueSize, batchSize int
var flushInterval time.Duration
var err error
for _, param := range strings.Split(params, ",") {
key, value, found := strings.Cut(param, "=")
if !found {
return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format")
}
switch key {
case "queueSize":
queueSize, err = strconv.Atoi(value)
if err != nil {
return nil, errs.Errorf("queueSize parameter of batch destination should be a number and not %s", value)
}
case "batchSize":
batchSize, err = strconv.Atoi(value)
if err != nil {
return nil, errs.Errorf("batchSize parameter of batch destination should be a number and not %s", value)
}
case "flushInterval":
flushInterval, err = time.ParseDuration(value)
if err != nil {
return nil, errs.Errorf("flushInterval parameter of batch destination should be a duration and not %s", value)
}
default:
return nil, errs.Errorf("Unknown parameter for batch destination %s. Please use queueSize/batchSize/flushInterval", key)
}
}
destination, err := lastLayer()
if err != nil {
return nil, err
}
lastLayer = func() (eventkit.Destination, error) {
return NewBatchQueue(destination, queueSize, batchSize, flushInterval), nil
}
}
}
if lastLayer == nil {
return nil, errs.Errorf("No evenkit destinatino is defined")
}
return lastLayer()
}