Skip to content

Commit

Permalink
[exporter/splunkhec] - Add heartbeat check while startup (#24423)
Browse files Browse the repository at this point in the history
**Description:** Add heartbeat check while startup. This is different
than the `healtcheck_startup`, as the latter doesn't take token or index
into account.

**Link to tracking Issue:**
[<24411>](#24411)

**Testing:** Added relevant test cases

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Curtis Robert <92119472+crobert-1@users.noreply.github.com>
  • Loading branch information
VihasMakwana and crobert-1 committed Aug 1, 2023
1 parent 5d410bb commit a3556a4
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 2 deletions.
20 changes: 20 additions & 0 deletions .chloggen/heartbeat-while-startup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add heartbeat check while startup and new config param, heartbeat/startup (defaults to false). This is different than the healtcheck_startup, as the latter doesn't take token or index into account.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24411]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions exporter/splunkhecexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ The following configuration options can also be configured:
- `otel_to_hec_fields/severity_number` (default = `otel.log.severity.number`): Specifies the name of the field to map the severity number field of log events.
- `otel_to_hec_fields/name` (default = `"otel.log.name`): Specifies the name of the field to map the name field of log events.
- `heartbeat/interval` (no default): Specifies the interval of sending hec heartbeat to the destination. If not specified, heartbeat is not enabled.
- `heartbeat/startup` (default: false): Check heartbeat at start up time. This action enforces a synchronous heartbeat action during the collector start up sequence. The collector will fail to start if the heartbeat returns an error.
- `telemetry/enabled` (default: false): Specifies whether to enable telemetry inside splunk hec exporter.
- `telemetry/override_metrics_names` (default: empty map): Specifies the metrics name to overrides in splunk hec exporter.
- `telemetry/extra_attributes` (default: empty map): Specifies the extra metrics attributes in splunk hec exporter.
Expand Down
9 changes: 8 additions & 1 deletion exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type client struct {
buildInfo component.BuildInfo
heartbeater *heartbeater
bufferPool bufferPool
exporterName string
}

var jsonStreamPool = sync.Pool{
Expand All @@ -67,6 +68,7 @@ func newClient(set exporter.CreateSettings, cfg *Config, maxContentLength uint)
telemetrySettings: set.TelemetrySettings,
buildInfo: set.BuildInfo,
bufferPool: newBufferPool(maxContentLength, !cfg.DisableCompression),
exporterName: set.ID.String(),
}
}

Expand Down Expand Up @@ -624,12 +626,17 @@ func (c *client) start(_ context.Context, host component.Host) (err error) {
healthCheckURL, _ := c.config.getURL()
healthCheckURL.Path = c.config.HealthPath
if err := checkHecHealth(httpClient, healthCheckURL); err != nil {
return fmt.Errorf("health check failed: %w", err)
return fmt.Errorf("%s: health check failed: %w", c.exporterName, err)
}
}
url, _ := c.config.getURL()
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, c.buildInfo)}
c.heartbeater = newHeartbeater(c.config, c.buildInfo, getPushLogFn(c))
if c.config.Heartbeat.Startup {
if err := c.heartbeater.sendHeartbeat(c.config, c.buildInfo, getPushLogFn(c)); err != nil {
return fmt.Errorf("%s: heartbeat on startup failed: %w", c.exporterName, err)
}
}
return nil
}

Expand Down
106 changes: 106 additions & 0 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,112 @@ func TestInvalidURL(t *testing.T) {
assert.EqualError(t, err, "Post \"ftp://example.com:134/services/collector\": unsupported protocol scheme \"ftp\"")
}

func TestHeartbeatStartupFailed(t *testing.T) {
rr := make(chan receivedRequest)
capture := CapturingData{receivedRequest: rr, statusCode: 403}
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}
s := &http.Server{
Handler: &capture,
ReadHeaderTimeout: 20 * time.Second,
}
defer s.Close()
go func() {
if e := s.Serve(listener); e != http.ErrServerClosed {
require.NoError(t, e)
}
}()
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
// Disable QueueSettings to ensure that we execute the request when calling ConsumeTraces
// otherwise we will not see the error.
cfg.QueueSettings.Enabled = false
// Disable retries to not wait too much time for the return error.
cfg.RetrySettings.Enabled = false
cfg.DisableCompression = true
cfg.Token = "1234-1234"
cfg.Heartbeat.Startup = true

params := exportertest.NewNopCreateSettings()
exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg)
assert.NoError(t, err)
// The exporter's name is "" while generating default params
assert.EqualError(t, exporter.Start(context.Background(), componenttest.NewNopHost()), ": heartbeat on startup failed: HTTP 403 \"Forbidden\"")
}

func TestHeartbeatStartupPass_Disabled(t *testing.T) {
rr := make(chan receivedRequest)
capture := CapturingData{receivedRequest: rr, statusCode: 403}
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}
s := &http.Server{
Handler: &capture,
ReadHeaderTimeout: 20 * time.Second,
}
defer s.Close()
go func() {
if e := s.Serve(listener); e != http.ErrServerClosed {
require.NoError(t, e)
}
}()
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
// Disable QueueSettings to ensure that we execute the request when calling ConsumeTraces
// otherwise we will not see the error.
cfg.QueueSettings.Enabled = false
// Disable retries to not wait too much time for the return error.
cfg.RetrySettings.Enabled = false
cfg.DisableCompression = true
cfg.Token = "1234-1234"
cfg.Heartbeat.Startup = false

params := exportertest.NewNopCreateSettings()
exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg)
assert.NoError(t, err)
assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost()))
}

func TestHeartbeatStartupPass(t *testing.T) {
rr := make(chan receivedRequest)
capture := CapturingData{receivedRequest: rr, statusCode: 200}
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}
s := &http.Server{
Handler: &capture,
ReadHeaderTimeout: 20 * time.Second,
}
defer s.Close()
go func() {
if e := s.Serve(listener); e != http.ErrServerClosed {
require.NoError(t, e)
}
}()
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
// Disable QueueSettings to ensure that we execute the request when calling ConsumeTraces
// otherwise we will not see the error.
cfg.QueueSettings.Enabled = false
// Disable retries to not wait too much time for the return error.
cfg.RetrySettings.Enabled = false
cfg.DisableCompression = true
cfg.Token = "1234-1234"
cfg.Heartbeat.Startup = true

params := exportertest.NewNopCreateSettings()
exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg)
assert.NoError(t, err)
assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost()))
}

type badJSON struct {
Foo float64 `json:"foo"`
}
Expand Down
3 changes: 3 additions & 0 deletions exporter/splunkhecexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type HecHeartbeat struct {
// heartbeat is not enabled.
// A heartbeat is an event sent to _internal index with metadata for the current collector/host.
Interval time.Duration `mapstructure:"interval"`

// Startup is used to send heartbeat events on exporter's startup.
Startup bool `mapstructure:"startup"`
}

// HecTelemetry defines the telemetry configuration for the exporter
Expand Down
6 changes: 5 additions & 1 deletion exporter/splunkhecexporter/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func newHeartbeater(config *Config, buildInfo component.BuildInfo, pushLogFn fun
case <-hbter.hbDoneChan:
return
case <-ticker.C:
err := pushLogFn(context.Background(), generateHeartbeatLog(config.HecToOtelAttrs, buildInfo))
err := hbter.sendHeartbeat(config, buildInfo, pushLogFn)
if config.Telemetry.Enabled {
observe(heartbeatsSent, heartbeatsFailed, tagMutators, err)
}
Expand All @@ -111,6 +111,10 @@ func (h *heartbeater) shutdown() {
close(h.hbDoneChan)
}

func (h *heartbeater) sendHeartbeat(config *Config, buildInfo component.BuildInfo, pushLogFn func(ctx context.Context, ld plog.Logs) error) error {
return pushLogFn(context.Background(), generateHeartbeatLog(config.HecToOtelAttrs, buildInfo))
}

// there is only use case for open census metrics recording for now. Extend to use open telemetry in the future.
func observe(heartbeatsSent *stats.Int64Measure, heartbeatsFailed *stats.Int64Measure, tagMutators []tag.Mutator, err error) {
var counter *stats.Int64Measure
Expand Down

0 comments on commit a3556a4

Please sign in to comment.