Skip to content

Commit

Permalink
[chore] [receiver/filelog] Add test to verify receiver consume contract
Browse files Browse the repository at this point in the history
Verify that `retry_on_failure.enabled` config option makes the receiver correctly re-send all the logs temporarily rejected by the next consumer
  • Loading branch information
dmitryax committed Jun 2, 2023
1 parent 9878b9f commit 0f33fd2
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions receiver/filelogreceiver/filelog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/file"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/json"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
)

Expand Down Expand Up @@ -313,3 +315,65 @@ func rotationTestConfig(tempDir string) *FileLogConfig {
}(),
}
}

// TestConsumeContract tests the contract between the filelog receiver and the next consumer with enabled retry.
func TestConsumeContract(t *testing.T) {
tmpDir := t.TempDir()
filePattern := "test-*.log"
flg := &fileLogGenerator{t: t, tmpDir: tmpDir, filePattern: filePattern}

cfg := createDefaultConfig()
cfg.RetryOnFailure.Enabled = true
cfg.RetryOnFailure.InitialInterval = 1 * time.Millisecond
cfg.RetryOnFailure.MaxInterval = 10 * time.Millisecond
cfg.InputConfig.Include = []string{filepath.Join(tmpDir, filePattern)}
cfg.InputConfig.StartAt = "beginning"
jsonParser := json.NewConfig()
tsField := entry.NewAttributeField("ts")
jsonParser.TimeParser = &helper.TimeParser{
ParseFrom: &tsField,
Layout: time.RFC3339,
LayoutType: "gotime",
}
jsonParser.ParseTo = entry.RootableField{Field: entry.NewAttributeField()}
logField := entry.NewAttributeField("log")
jsonParser.BodyField = &logField
cfg.Operators = []operator.Config{{Builder: jsonParser}}

receivertest.CheckConsumeContract(receivertest.CheckConsumeContractParams{
T: t,
Factory: NewFactory(),
DataType: component.DataTypeLogs,
Config: cfg,
Generator: flg,
GenerateCount: 10000,
})
}

// A generator that can send data to exampleReceiver.
type fileLogGenerator struct {
t *testing.T
tmpDir string
filePattern string
tmpFile *os.File
sequenceNum int64
}

func (g *fileLogGenerator) Start() {
tmpFile, err := os.CreateTemp(g.tmpDir, g.filePattern)
require.NoError(g.t, err)
g.tmpFile = tmpFile
}

func (g *fileLogGenerator) Stop() {
g.tmpFile.Close()
os.Remove(g.tmpFile.Name())
}

func (g *fileLogGenerator) Generate() []receivertest.UniqueIDAttrVal {
id := receivertest.UniqueIDAttrVal(fmt.Sprintf("%d", atomic.AddInt64(&g.sequenceNum, 1)))
logLine := fmt.Sprintf(`{"ts": "%s", "log": "log-%s", "%s": "%s"}`, time.Now().Format(time.RFC3339), id,
receivertest.UniqueIDAttrName, id)
g.tmpFile.WriteString(logLine + "\n")
return []receivertest.UniqueIDAttrVal{id}
}

0 comments on commit 0f33fd2

Please sign in to comment.