Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to batch metrics as a processor. #1060

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions internal/data/testdata/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,3 +563,13 @@ func generateOtlpMetricDescriptor(name string, ty pdata.MetricType) *otlpmetrics
Labels: nil,
}
}

// GenerateMetricDataManyMetricsSameResource returns a MetricData containing a
// single resource and instrumentation library with metricsCount int-counter
// metrics, for use in batching/benchmark tests.
func GenerateMetricDataManyMetricsSameResource(metricsCount int) data.MetricData {
	md := GenerateMetricDataOneEmptyInstrumentationLibrary()
	rs0ilm0 := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0)
	rs0ilm0.Metrics().Resize(metricsCount)
	for i := 0; i < metricsCount; i++ {
		initCounterIntMetric(rs0ilm0.Metrics().At(i))
	}
	return md
}
6 changes: 6 additions & 0 deletions internal/data/testdata/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,9 @@ func TestToFromOtlpMetricsWithNils(t *testing.T) {
assert.False(t, ilss.Metrics().At(0).IsNil())
assert.True(t, ilss.Metrics().At(1).IsNil())
}

// TestGenerateMetricDataManyMetricsSameResource verifies that all generated
// metrics are attached to a single resource.
func TestGenerateMetricDataManyMetricsSameResource(t *testing.T) {
	const metricsCount = 100
	metricData := GenerateMetricDataManyMetricsSameResource(metricsCount)
	assert.EqualValues(t, 1, metricData.ResourceMetrics().Len())
	assert.EqualValues(t, metricsCount, metricData.MetricCount())
}
11 changes: 6 additions & 5 deletions processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,10 @@ examples on using the processor.

## <a name="batch"></a>Batch Processor

The batch processor accepts spans and places them into batches. Batching helps
better compress the data and reduce the number of outgoing connections required
to transmit the data. This processor supports both size and time based batching.
The batch processor accepts spans or metrics and places them into batches.
Batching helps better compress the data and reduce the number of outgoing
connections required to transmit the data. This processor supports both
size-based and time-based batching.

It is highly recommended to configure the batch processor on every collector.
The batch processor should be defined in the pipeline after the memory_limiter
Expand All @@ -262,8 +263,8 @@ any data drops such as sampling.
Please refer to [config.go](batchprocessor/config.go) for the config spec.

The following configuration options can be modified:
- `send_batch_size` (default = 8192): Number of spans after which a batch will
be sent.
- `send_batch_size` (default = 8192): Number of spans or metrics after which a
batch will be sent.
- `timeout` (default = 200ms): Time duration after which a batch will be sent
regardless of size.

Expand Down
236 changes: 190 additions & 46 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,45 +25,88 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/processor"
)

// batch_processor is a component that accepts spans, places them into batches and sends downstream.
// batch_processor is a component that accepts spans and metrics, places them
// into batches and sends downstream.
//
// batch_processor implements consumer.TraceConsumer
// batch_processor implements consumer.TraceConsumer and consumer.MetricsConsumer
//
// Batches are sent out with any of the following conditions:
// - batch size reaches cfg.SendBatchSize
// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out.
type batchProcessor struct {
sender consumer.TraceConsumer
name string
logger *zap.Logger

sendBatchSize uint32
timeout time.Duration

batch *batch
timer *time.Timer
newItem chan pdata.Traces
done chan struct{}
timer *time.Timer
done chan struct{}
}

var _ consumer.TraceConsumer = (*batchProcessor)(nil)
type batchTraceProcessor struct {
batchProcessor

// newBatchProcessor creates a new batch processor that batch traces by size or with timeout
func newBatchProcessor(params component.ProcessorCreateParams, sender consumer.TraceConsumer, cfg *Config) *batchProcessor {
p := &batchProcessor{
name: cfg.Name(),
sender: sender,
logger: params.Logger,
traceConsumer consumer.TraceConsumer

sendBatchSize: cfg.SendBatchSize,
timeout: cfg.Timeout,
batchTraces *batchTraces
newTraceItem chan pdata.Traces
}

// batchMetricProcessor buffers incoming metrics and flushes them downstream in
// batches; it embeds batchProcessor for the shared size/timeout settings.
type batchMetricProcessor struct {
	batchProcessor

	// metricsConsumer is the next consumer in the pipeline receiving each flushed batch.
	metricsConsumer consumer.MetricsConsumer

	// batchMetrics accumulates metrics until a size or timeout trigger fires.
	batchMetrics *batchMetrics
	// newMetricItem carries incoming metrics to the processing goroutine.
	newMetricItem chan pdata.Metrics
}

batch: newBatch(),
newItem: make(chan pdata.Traces, 1),
done: make(chan struct{}),
var _ consumer.TraceConsumer = (*batchTraceProcessor)(nil)
var _ consumer.MetricsConsumer = (*batchMetricProcessor)(nil)

// newBatchTracesProcessor creates a new batch processor that batches traces by
// size or with timeout, and starts its processing goroutine.
func newBatchTracesProcessor(params component.ProcessorCreateParams, trace consumer.TraceConsumer, cfg *Config) *batchTraceProcessor {
	// Shared settings live in the embedded batchProcessor.
	base := batchProcessor{
		name:   cfg.Name(),
		logger: params.Logger,

		sendBatchSize: cfg.SendBatchSize,
		timeout:       cfg.Timeout,
		done:          make(chan struct{}),
	}

	bp := &batchTraceProcessor{
		batchProcessor: base,
		traceConsumer:  trace,

		batchTraces:  newBatchTraces(),
		newTraceItem: make(chan pdata.Traces, 1),
	}

	go bp.startProcessingCycle()

	return bp
}

// newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout
func newBatchMetricsProcessor(params component.ProcessorCreateParams, metrics consumer.MetricsConsumer, cfg *Config) *batchMetricProcessor {
p := &batchMetricProcessor{
batchProcessor: batchProcessor{
name: cfg.Name(),
logger: params.Logger,

sendBatchSize: cfg.SendBatchSize,
timeout: cfg.Timeout,
done: make(chan struct{}),
},
metricsConsumer: metrics,

batchMetrics: newBatchMetrics(),
newMetricItem: make(chan pdata.Metrics, 1),
}

go p.startProcessingCycle()
Expand All @@ -72,46 +115,46 @@ func newBatchProcessor(params component.ProcessorCreateParams, sender consumer.T
}

// ConsumeTraces implements batcher as a SpanProcessor and takes the provided spans and adds them to
// batches
func (bp *batchProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
bp.newItem <- td
// batches.
func (bp *batchTraceProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
bp.newTraceItem <- td
return nil
}

func (bp *batchProcessor) GetCapabilities() component.ProcessorCapabilities {
func (bp *batchTraceProcessor) GetCapabilities() component.ProcessorCapabilities {
return component.ProcessorCapabilities{MutatesConsumedData: true}
}

// Start is invoked during service startup.
func (bp *batchProcessor) Start(ctx context.Context, host component.Host) error {
func (bp *batchTraceProcessor) Start(ctx context.Context, host component.Host) error {
return nil
}

// Shutdown is invoked during service shutdown.
func (bp *batchProcessor) Shutdown(context.Context) error {
func (bp *batchTraceProcessor) Shutdown(context.Context) error {
close(bp.done)
return nil
}

func (bp *batchProcessor) startProcessingCycle() {
func (bp *batchTraceProcessor) startProcessingCycle() {
bp.timer = time.NewTimer(bp.timeout)
for {
select {
case td := <-bp.newItem:
bp.batch.add(td)
case td := <-bp.newTraceItem:
bp.batchTraces.add(td)

if bp.batch.getSpanCount() >= bp.sendBatchSize {
if bp.batchTraces.getSpanCount() >= bp.sendBatchSize {
bp.timer.Stop()
bp.sendItems(statBatchSizeTriggerSend)
bp.resetTimer()
}
case <-bp.timer.C:
if bp.batch.hasData() {
if bp.batchTraces.hasData() {
bp.sendItems(statTimeoutTriggerSend)
}
bp.resetTimer()
case <-bp.done:
if bp.batch.hasData() {
if bp.batchTraces.hasData() {
// TODO: Set a timeout on sendTraces or
// make it cancellable using the context that Shutdown gets as a parameter
bp.sendItems(statTimeoutTriggerSend)
Expand All @@ -121,32 +164,91 @@ func (bp *batchProcessor) startProcessingCycle() {
}
}

func (bp *batchProcessor) resetTimer() {
func (bp *batchTraceProcessor) resetTimer() {
bp.timer.Reset(bp.timeout)
}

func (bp *batchProcessor) sendItems(measure *stats.Int64Measure) {
func (bp *batchTraceProcessor) sendItems(measure *stats.Int64Measure) {
// Add that it came form the trace pipeline?
statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)}
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))

_ = bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData())
bp.batchTraces.reset()
}

// ConsumeMetrics implements consumer.MetricsConsumer and hands the provided
// metrics to the processing goroutine, which batches them.
func (bp *batchMetricProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
	// Conversion to the internal representation happens later, in
	// batchMetrics.add, on the processing goroutine.
	bp.newMetricItem <- md
	return nil
}

func (bp *batchMetricProcessor) GetCapabilities() component.ProcessorCapabilities {
return component.ProcessorCapabilities{MutatesConsumedData: true}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add an issue to discuss if this is needed. Not sure if we "mutate" data.

}

// Start is invoked during service startup.
func (bp *batchMetricProcessor) Start(ctx context.Context, host component.Host) error {
	// No startup work: the processing goroutine is launched by the constructor.
	return nil
}

// Shutdown is invoked during service shutdown.
func (bp *batchMetricProcessor) Shutdown(context.Context) error {
	// Closing done signals startProcessingCycle to flush any pending batch and exit.
	close(bp.done)
	return nil
}

// startProcessingCycle runs in its own goroutine and owns bp.batchMetrics: it
// accumulates incoming metrics and flushes a batch downstream when either the
// metric-count threshold is reached or the timeout elapses.
func (bp *batchMetricProcessor) startProcessingCycle() {
	bp.timer = time.NewTimer(bp.timeout)
	for {
		select {
		case md := <-bp.newMetricItem:
			bp.batchMetrics.add(md)
			if bp.batchMetrics.getItemCount() >= bp.sendBatchSize {
				// Size trigger: stop the timer while sending so it cannot
				// fire concurrently, then restart the timeout window.
				bp.timer.Stop()
				bp.sendItems(statBatchSizeTriggerSend)
				bp.resetTimer()
			}
		case <-bp.timer.C:
			// Timeout trigger: flush whatever has accumulated, if anything.
			if bp.batchMetrics.hasData() {
				bp.sendItems(statTimeoutTriggerSend)
			}
			bp.resetTimer()
		case <-bp.done:
			// Shutdown: flush the final partial batch before exiting.
			if bp.batchMetrics.hasData() {
				bp.sendItems(statTimeoutTriggerSend)
			}
			return
		}
	}
}

// resetTimer restarts the timeout countdown for the next batch window.
func (bp *batchMetricProcessor) resetTimer() {
	bp.timer.Reset(bp.timeout)
}
// sendItems records the batch-send trigger in the processor's stats, pushes the
// accumulated metrics to the downstream consumer, and resets the batch.
func (bp *batchMetricProcessor) sendItems(measure *stats.Int64Measure) {
	// Add that it came from the metrics pipeline
	statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)}
	_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))

	_ = bp.metricsConsumer.ConsumeMetrics(context.Background(), bp.batchMetrics.getData())
	bp.batchMetrics.reset()
}

type batch struct {
type batchTraces struct {
traceData pdata.Traces
resourceSpansCount uint32
spanCount uint32
}

func newBatch() *batch {
b := &batch{}
func newBatchTraces() *batchTraces {
b := &batchTraces{}
b.reset()
return b
}

// add updates current batch by adding new TraceData object
func (b *batch) add(td pdata.Traces) {
// add updates current batchTraces by adding new TraceData object
func (b *batchTraces) add(td pdata.Traces) {
newResourceSpansCount := td.ResourceSpans().Len()
if newResourceSpansCount == 0 {
return
Expand All @@ -157,25 +259,67 @@ func (b *batch) add(td pdata.Traces) {
td.ResourceSpans().MoveAndAppendTo(b.traceData.ResourceSpans())
}

func (b *batch) getTraceData() pdata.Traces {
func (b *batchTraces) getTraceData() pdata.Traces {
return b.traceData
}

func (b *batch) getSpanCount() uint32 {
func (b *batchTraces) getSpanCount() uint32 {
return b.spanCount
}

func (b *batch) hasData() bool {
func (b *batchTraces) hasData() bool {
return b.traceData.ResourceSpans().Len() > 0
}

// resets the current batch structure with zero values
func (b *batch) reset() {

// resets the current batchTraces structure with zero values
func (b *batchTraces) reset() {
// TODO: Use b.resourceSpansCount to preset capacity of b.traceData.ResourceSpans
// once internal data API provides such functionality
b.traceData = pdata.NewTraces()

b.spanCount = 0
b.resourceSpansCount = 0
}

// batchMetrics accumulates metric data from multiple adds into a single
// MetricData, tracking resource and metric counts.
type batchMetrics struct {
	metricData data.MetricData
	// resourceCount is the number of ResourceMetrics accumulated so far.
	resourceCount uint32
	// itemCount is the number of metrics accumulated so far; compared against
	// the configured send-batch size.
	itemCount uint32
}

// newBatchMetrics returns an empty batchMetrics ready to accumulate metrics.
func newBatchMetrics() *batchMetrics {
	bm := new(batchMetrics)
	bm.reset()
	return bm
}

// getData returns the accumulated batch converted to the pdata.Metrics
// representation expected by the downstream consumer.
func (bm *batchMetrics) getData() pdata.Metrics {
	return pdatautil.MetricsFromInternalMetrics(bm.metricData)
}

// getItemCount returns the number of metrics accumulated in the current batch.
func (bm *batchMetrics) getItemCount() uint32 {
	return bm.itemCount
}

// hasData reports whether the current batch contains any resource metrics.
func (bm *batchMetrics) hasData() bool {
	return bm.metricData.ResourceMetrics().Len() > 0
}

// reset replaces the current batchMetrics contents with zero/empty values so
// accumulation can start over after a flush.
func (bm *batchMetrics) reset() {
	bm.metricData = data.NewMetricData()
	bm.itemCount = 0
	bm.resourceCount = 0
}

// add folds the resource metrics from pm into the current batch and updates
// the resource and metric counters. Empty inputs are ignored.
func (bm *batchMetrics) add(pm pdata.Metrics) {
	md := pdatautil.MetricsToInternalMetrics(pm)

	newResourceCount := md.ResourceMetrics().Len()
	if newResourceCount == 0 {
		// Nothing to batch; leave counters and accumulated data untouched.
		return
	}
	bm.resourceCount += uint32(newResourceCount)
	bm.itemCount += uint32(md.MetricCount())
	// MoveAndAppendTo transfers the resource metrics into the batch, leaving
	// the source empty.
	md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
}