Skip to content

Commit

Permalink
[pkg/stanza] Cache event publishers: log warn once per provider (open…
Browse files Browse the repository at this point in the history
…-telemetry#27658)

**Description:**

Cache the publisher event to:

1. Avoid logging the same error message every time one event from the
given source is logged.
2. Avoid opening and closing the event publisher for every single event.

**Link to tracking Issue:**

[Item 4 described on the
investigation](open-telemetry#21491 (comment))
for issue open-telemetry#21491.

**Testing:**

* Go tests for `pkg/stanza` and `receiver/windowseventlogreceiver` on
Windows box.
* Ran the contrib build locally to validate the change.
* Can't run the full make locally: misspell is failing on Windows
because the command line is too long.

**Documentation:**

Let me know if changing the severity of the log message requires a
changelog update.
  • Loading branch information
pjanotti authored and jmsnll committed Nov 12, 2023
1 parent 1657d50 commit 0d3b850
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 7 deletions.
27 changes: 27 additions & 0 deletions .chloggen/log-warning-when-publisher-not-available.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Log warning, instead of error, when Windows Event Log publisher metadata is not available and cache the successfully retrieved ones.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27658]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
17 changes: 13 additions & 4 deletions pkg/stanza/operator/input/windows/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type Input struct {
excludeProviders []string
pollInterval time.Duration
persister operator.Persister
publisherCache publisherCache
cancel context.CancelFunc
wg sync.WaitGroup
}
Expand Down Expand Up @@ -123,6 +124,8 @@ func (e *Input) Start(persister operator.Persister) error {
return fmt.Errorf("failed to open subscription: %w", err)
}

e.publisherCache = newPublisherCache()

e.wg.Add(1)
go e.readOnInterval(ctx)
return nil
Expand All @@ -141,6 +144,10 @@ func (e *Input) Stop() error {
return fmt.Errorf("failed to close bookmark: %w", err)
}

if err := e.publisherCache.evictAll(); err != nil {
return fmt.Errorf("failed to close publishers: %w", err)
}

return nil
}

Expand Down Expand Up @@ -231,13 +238,15 @@ func (e *Input) processEvent(ctx context.Context, event Event) {
}
}

publisher := NewPublisher()
if err := publisher.Open(simpleEvent.Provider.Name); err != nil {
e.Errorf("Failed to open publisher: %s: writing log entry to pipeline without metadata", err)
publisher, openPublisherErr := e.publisherCache.get(simpleEvent.Provider.Name)
if openPublisherErr != nil {
e.Warnf("Failed to open the %q event source, respective log entries can't be formatted: %s", simpleEvent.Provider.Name, openPublisherErr)
}

if !publisher.Valid() {
e.sendEvent(ctx, simpleEvent)
return
}
defer publisher.Close()

formattedEvent, err := event.RenderFormatted(e.buffer, publisher)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/stanza/operator/input/windows/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (p *Publisher) Open(provider string) error {
return nil
}

func (p *Publisher) Valid() bool {
return p.handle != 0
}

// Close will close the publisher handle.
func (p *Publisher) Close() error {
if p.handle == 0 {
Expand Down
23 changes: 20 additions & 3 deletions pkg/stanza/operator/input/windows/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestPublisherOpenPreexisting(t *testing.T) {
err := publisher.Open("")
require.Error(t, err)
require.Contains(t, err.Error(), "publisher handle is already open")
require.True(t, publisher.Valid())
}

func TestPublisherOpenInvalidUTF8(t *testing.T) {
Expand All @@ -25,44 +26,60 @@ func TestPublisherOpenInvalidUTF8(t *testing.T) {
err := publisher.Open(invalidUTF8)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to convert the provider name \"\\x00\" to utf16: invalid argument")
require.False(t, publisher.Valid())
}

func TestPublisherOpenSyscallFailure(t *testing.T) {
publisher := NewPublisher()
provider := "provider"
openPublisherMetadataProc = SimpleMockProc(0, 0, ErrorNotSupported)
defer mockWithDeferredRestore(&openPublisherMetadataProc, SimpleMockProc(0, 0, ErrorNotSupported))()
err := publisher.Open(provider)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to open the metadata for the \"provider\" provider: The request is not supported.")
require.False(t, publisher.Valid())
}

func TestPublisherOpenSuccess(t *testing.T) {
publisher := NewPublisher()
provider := "provider"
openPublisherMetadataProc = SimpleMockProc(5, 0, ErrorSuccess)
defer mockWithDeferredRestore(&openPublisherMetadataProc, SimpleMockProc(5, 0, ErrorSuccess))()
err := publisher.Open(provider)
require.NoError(t, err)
require.Equal(t, uintptr(5), publisher.handle)
require.True(t, publisher.Valid())
}

func TestPublisherCloseWhenAlreadyClosed(t *testing.T) {
publisher := NewPublisher()
err := publisher.Close()
require.NoError(t, err)
require.False(t, publisher.Valid())
}

func TestPublisherCloseSyscallFailure(t *testing.T) {
publisher := Publisher{handle: 5}
closeProc = SimpleMockProc(0, 0, ErrorNotSupported)
defer mockWithDeferredRestore(&closeProc, SimpleMockProc(0, 0, ErrorNotSupported))()
err := publisher.Close()
require.Error(t, err)
require.Contains(t, err.Error(), "failed to close publisher")
require.True(t, publisher.Valid())
}

func TestPublisherCloseSuccess(t *testing.T) {
publisher := Publisher{handle: 5}
originalCloseProc := closeProc
closeProc = SimpleMockProc(1, 0, ErrorSuccess)
defer func() { closeProc = originalCloseProc }()
err := publisher.Close()
require.NoError(t, err)
require.Equal(t, uintptr(0), publisher.handle)
require.False(t, publisher.Valid())
}

func mockWithDeferredRestore(call *SyscallProc, mockCall SyscallProc) func() {
original := *call
*call = mockCall
return func() {
*call = original
}
}
50 changes: 50 additions & 0 deletions pkg/stanza/operator/input/windows/publishercache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package windows // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/windows"

import (
"errors"
)

type publisherCache struct {
cache map[string]Publisher
}

func newPublisherCache() publisherCache {
return publisherCache{
cache: make(map[string]Publisher),
}
}

func (c *publisherCache) get(provider string) (publisher Publisher, openPublisherErr error) {
publisher, ok := c.cache[provider]
if ok {
return publisher, nil
}

publisher = NewPublisher()
err := publisher.Open(provider)

// Always store the publisher even if there was an error opening it.
c.cache[provider] = publisher

return publisher, err
}

func (c *publisherCache) evictAll() error {
var errs error
for _, publisher := range c.cache {
if publisher.Valid() {
if err := publisher.Close(); err != nil {
errs = errors.Join(errs, err)
}
}
}

c.cache = make(map[string]Publisher)
return errs
}
68 changes: 68 additions & 0 deletions pkg/stanza/operator/input/windows/publishercache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package windows

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGetValidPublisher(t *testing.T) {
publisherCache := newPublisherCache()
defer publisherCache.evictAll()

// Provider "Application" exists in all Windows versions.
publisher, openPublisherErr := publisherCache.get("Application")
require.NoError(t, openPublisherErr)
require.True(t, publisher.Valid())

// Get the same publisher again.
publisher, openPublisherErr = publisherCache.get("Application")
require.NoError(t, openPublisherErr)
require.True(t, publisher.Valid())
}

func TestGetInvalidPublisher(t *testing.T) {
publisherCache := newPublisherCache()
defer publisherCache.evictAll()

// Provider "InvalidProvider" does not exist in any Windows version.
publisher, openPublisherErr := publisherCache.get("InvalidProvider")
require.Error(t, openPublisherErr, "%v", publisherCache)
require.False(t, publisher.Valid())

// Get "InvalidProvider" publisher again.
publisher, openPublisherErr = publisherCache.get("InvalidProvider")
require.NoError(t, openPublisherErr) // It is cached, no error opening it.
require.False(t, publisher.Valid())
}

func TestValidAndInvalidPublishers(t *testing.T) {
publisherCache := newPublisherCache()
defer publisherCache.evictAll()

// Provider "Application" exists in all Windows versions.
publisher, openPublisherErr := publisherCache.get("Application")
require.NoError(t, openPublisherErr)
require.True(t, publisher.Valid())

// Provider "InvalidProvider" does not exist in any Windows version.
publisher, openPublisherErr = publisherCache.get("InvalidProvider")
require.Error(t, openPublisherErr, "%v", publisherCache)
require.False(t, publisher.Valid())

// Get the existing publisher again.
publisher, openPublisherErr = publisherCache.get("Application")
require.NoError(t, openPublisherErr)
require.True(t, publisher.Valid())

// Get "InvalidProvider" publisher again.
publisher, openPublisherErr = publisherCache.get("InvalidProvider")
require.NoError(t, openPublisherErr) // It is cached, no error opening it.
require.False(t, publisher.Valid())
}

0 comments on commit 0d3b850

Please sign in to comment.