Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow configuring flush interval using time.Duration #129

Merged
merged 1 commit into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions senders/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package senders

import (
"fmt"
"github.com/wavefronthq/wavefront-sdk-go/event"
"github.com/wavefronthq/wavefront-sdk-go/histogram"
"github.com/wavefronthq/wavefront-sdk-go/internal"
eventInternal "github.com/wavefronthq/wavefront-sdk-go/internal/event"
histogramInternal "github.com/wavefronthq/wavefront-sdk-go/internal/histogram"
"github.com/wavefronthq/wavefront-sdk-go/internal/metric"
"github.com/wavefronthq/wavefront-sdk-go/internal/span"
"time"

"github.com/wavefronthq/wavefront-sdk-go/event"
"github.com/wavefronthq/wavefront-sdk-go/histogram"
"github.com/wavefronthq/wavefront-sdk-go/internal"
)

// Sender Interface for sending metrics, distributions and spans to Wavefront
Expand Down Expand Up @@ -58,16 +56,14 @@ type wavefrontSender struct {
}

func newLineHandler(reporter internal.Reporter, cfg *configuration, format, prefix string, registry *internal.MetricRegistry) *internal.LineHandler {
flushInterval := time.Second * time.Duration(cfg.FlushIntervalSeconds)

opts := []internal.LineHandlerOption{internal.SetHandlerPrefix(prefix), internal.SetRegistry(registry)}
batchSize := cfg.BatchSize
if format == internal.EventFormat {
batchSize = 1
opts = append(opts, internal.SetLockOnThrottledError(true))
}

return internal.NewLineHandler(reporter, format, flushInterval, batchSize, cfg.MaxBufferSize, opts...)
return internal.NewLineHandler(reporter, format, cfg.FlushInterval, batchSize, cfg.MaxBufferSize, opts...)
}

func (sender *wavefrontSender) Start() {
Expand Down
35 changes: 21 additions & 14 deletions senders/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
const (
defaultTracesPort = 30001
defaultMetricsPort = 2878
defaultBatchSize = 10000
defaultBufferSize = 50000
defaultFlushInterval = 1
defaultBatchSize = 10_000
defaultBufferSize = 50_000
defaultFlushInterval = 1 * time.Second
defaultTimeout = 10 * time.Second
)

Expand Down Expand Up @@ -48,9 +48,9 @@ type configuration struct {

// interval (in seconds) at which to flush data to Wavefront. defaults to 1 Second.
// together with batch size controls the max theoretical throughput of the sender.
FlushIntervalSeconds int
SDKMetricsTags map[string]string
Path string
FlushInterval time.Duration
SDKMetricsTags map[string]string
Path string

Timeout time.Duration

Expand Down Expand Up @@ -86,13 +86,13 @@ func NewSender(wfURL string, setters ...Option) (Sender, error) {
// CreateConfig is for internal use only.
func CreateConfig(wfURL string, setters ...Option) (*configuration, error) {
cfg := &configuration{
MetricsPort: defaultMetricsPort,
TracesPort: defaultTracesPort,
BatchSize: defaultBatchSize,
MaxBufferSize: defaultBufferSize,
FlushIntervalSeconds: defaultFlushInterval,
SDKMetricsTags: map[string]string{},
Timeout: defaultTimeout,
MetricsPort: defaultMetricsPort,
TracesPort: defaultTracesPort,
BatchSize: defaultBatchSize,
MaxBufferSize: defaultBufferSize,
FlushInterval: defaultFlushInterval,
SDKMetricsTags: map[string]string{},
Timeout: defaultTimeout,
}

u, err := url.Parse(wfURL)
Expand Down Expand Up @@ -221,7 +221,14 @@ func MaxBufferSize(n int) Option {
// FlushIntervalSeconds set the interval (in seconds) at which to flush data to Wavefront. Defaults to 1 Second.
func FlushIntervalSeconds(n int) Option {
return func(cfg *configuration) {
cfg.FlushIntervalSeconds = n
cfg.FlushInterval = time.Second * time.Duration(n)
}
}

// FlushInterval set the interval at which to flush data to Wavefront. Defaults to 1 Second.
func FlushInterval(interval time.Duration) Option {
return func(cfg *configuration) {
cfg.FlushInterval = interval
}
}

Expand Down
11 changes: 9 additions & 2 deletions senders/client_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestDefaults(t *testing.T) {
require.NoError(t, err)

assert.Equal(t, 10000, cfg.BatchSize)
assert.Equal(t, 1, cfg.FlushIntervalSeconds)
assert.Equal(t, 1*time.Second, cfg.FlushInterval)
assert.Equal(t, 50000, cfg.MaxBufferSize)
assert.Equal(t, 2878, cfg.MetricsPort)
assert.Equal(t, 30001, cfg.TracesPort)
Expand All @@ -116,7 +116,14 @@ func TestFlushIntervalSeconds(t *testing.T) {
cfg, err := senders.CreateConfig("https://localhost", senders.FlushIntervalSeconds(123))
require.NoError(t, err)

assert.Equal(t, 123, cfg.FlushIntervalSeconds)
assert.Equal(t, 123*time.Second, cfg.FlushInterval)
}

func TestFlushInterval(t *testing.T) {
cfg, err := senders.CreateConfig("https://localhost", senders.FlushInterval(1*time.Hour))
require.NoError(t, err)

assert.Equal(t, 1*time.Hour, cfg.FlushInterval)
}

func TestMaxBufferSize(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion senders/example_newsender_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package senders_test
import (
"crypto/tls"
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
"time"
)

func ExampleNewSender_options() {
// NewSender accepts optional arguments. Use these if you need to set non-default ports for your Wavefront Proxy, tune batching parameters, or set tags for internal SDK metrics.
sender, err := wavefront.NewSender(
"http://localhost",
wavefront.BatchSize(20000), // Send batches of 20,000.
wavefront.FlushIntervalSeconds(5), // Flush every 5 seconds.
wavefront.FlushInterval(5*time.Second), // Flush every 5 seconds.
wavefront.MetricsPort(4321), // Use port 4321 for metrics.
wavefront.TracesPort(40001), // Use port 40001 for traces.
wavefront.Timeout(15), // Set an HTTP timeout in seconds (default is 10s)
Expand Down