/
config.go
88 lines (77 loc) · 3.26 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package googlecloudpubsubexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudpubsubexporter"
import (
"fmt"
"regexp"
"time"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)
var topicMatcher = regexp.MustCompile(`^projects/[a-z][a-z0-9\-]*/topics/`)
type Config struct {
// Timeout for all API calls. If not set, defaults to 12 seconds.
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
// Google Cloud Project ID where the Pubsub client will connect to
ProjectID string `mapstructure:"project"`
// User agent that will be used by the Pubsub client to connect to the service
UserAgent string `mapstructure:"user_agent"`
// Override of the Pubsub Endpoint, leave empty for the default endpoint
Endpoint string `mapstructure:"endpoint"`
// Only has effect if Endpoint is not ""
Insecure bool `mapstructure:"insecure"`
// The fully qualified resource name of the Pubsub topic
Topic string `mapstructure:"topic"`
// Compression of the payload (only gzip or is supported, no compression is the default)
Compression string `mapstructure:"compression"`
// Watermark defines the watermark (the ce-time attribute on the message) behavior
Watermark WatermarkConfig `mapstructure:"watermark"`
}
// WatermarkConfig customizes the behavior of the watermark
type WatermarkConfig struct {
// Behavior of the watermark. Currently, only of the message (none, earliest and current, current being the default)
// will set the timestamp on pubsub based on timestamps of the events inside the message
Behavior string `mapstructure:"behavior"`
// Indication on how much the timestamp can drift from the current time, the timestamp will be capped to the allowed
// maximum. A duration of 0 is the same as maximum duration
AllowedDrift time.Duration `mapstructure:"allowed_drift"`
}
func (config *Config) Validate() error {
if !topicMatcher.MatchString(config.Topic) {
return fmt.Errorf("topic '%s' is not a valid format, use 'projects/<project_id>/topics/<name>'", config.Topic)
}
_, err := config.parseCompression()
if err != nil {
return err
}
return config.Watermark.validate()
}
func (config *WatermarkConfig) validate() error {
if config.AllowedDrift == 0 {
config.AllowedDrift = 1<<63 - 1
}
_, err := config.parseWatermarkBehavior()
return err
}
func (config *Config) parseCompression() (compression, error) {
switch config.Compression {
case "gzip":
return gZip, nil
case "":
return uncompressed, nil
}
return uncompressed, fmt.Errorf("compression %v is not supported. supported compression formats include [gzip]", config.Compression)
}
func (config *WatermarkConfig) parseWatermarkBehavior() (WatermarkBehavior, error) {
switch config.Behavior {
case "earliest":
return earliest, nil
case "current":
return current, nil
case "":
return current, nil
}
return current, fmt.Errorf("behavior %v is not supported. supported compression formats include [current,earliest]", config.Behavior)
}