
otel-kafka-poc

Recently at my "day job", I have been exploring OpenTelemetry as part of our observability pipeline. I have studied the OpenTelemetry Collector in some detail and have been impressed with its capabilities and ease of setup, though I would be remiss if I didn't mention that the project is still very much in its infancy.

OpenTelemetry Collector

The OpenTelemetry Collector offers a vendor-agnostic implementation of how to receive, process and export telemetry data. It removes the need to run, operate, and maintain multiple agents/collectors. (OpenTelemetry Architecture docs)

Without diving too deep into the architecture, the collector is composed of three main components:

  • Receivers: collect telemetry data from a source, e.g. by scraping it at regular intervals
  • Processors: transform the data as needed
  • Exporters: send the data to a destination
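These components are wired together into pipelines in the collector's configuration. As a minimal sketch (the `otlp` receiver and `debug` exporter here are standard core components, not part of this POC):

```yaml
receivers:
  otlp:
    protocols:
      grpc:

exporters:
  debug:

service:
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [debug]
```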

Building a custom OpenTelemetry Collector

For this POC, I wanted to build a custom collector that scrapes virtual memory metrics and exports them to Kafka. Let's see how we can leverage the OpenTelemetry Collector to build this pipeline.

Step 0: Install the prerequisites
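The repo doesn't spell these out, but based on the tools used below, you'll need a Go toolchain plus the OpenTelemetry Collector builder and mdatagen. One way to install the latter two (module paths have moved between versions, so treat these as an assumption and pin versions matching your collector):

```shell
# Install the OpenTelemetry Collector builder (compiles a custom collector binary)
go install go.opentelemetry.io/collector/cmd/builder@latest

# Install mdatagen (generates component boilerplate from metadata.yaml)
go install go.opentelemetry.io/collector/cmd/mdatagen@latest
```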

Step 1: Define the receiver

The first component we need to build is the receiver. We are calling it vmreceiver for obvious reasons. Any otel (OpenTelemetry) component is defined using a metadata.yaml file. This file defines the metrics (or logs or traces) that the component will collect and their dimensions (attributes).

Let's create a metadata.yaml file for our receiver at receivers/vmreceiver/:

type: vmstats

status:
  class: receiver
  stability:
    alpha: [metrics]
  distributions: [contrib]

attributes:
  hostname:
    description: Hostname of the machine
    type: string
  ...

metrics:
  swapped:
    enabled: true
    description: Amount of virtual memory used
    unit: bytes
    gauge:
      value_type: int
    attributes:
      - hostname
  free:
    enabled: true
    description: Amount of idle memory
    unit: bytes
    gauge:
      value_type: int
    attributes:
      - hostname
  ...

The entire schema is defined in mdatagen's documentation (metadata-schema.yaml in the collector repo).

Now that we have defined the receiver metadata, we can generate the boilerplate code for the receiver using the otel tool mdatagen:

$ cd receivers/vmreceiver
$ mdatagen metadata.yaml

Now we are ready to write our receiver code. We will use vmstat to scrape the virtual memory metrics. Let's start by defining the config, which contains the tuning parameters for the receiver:

// config.go

type Config struct {
	Delay int `mapstructure:",omitempty"` // Delay is the delay between consecutive `vmstat` calls.
	Count int `mapstructure:",omitempty"` // Count is the number of `vmstat` calls to make.

	metadata.MetricsBuilderConfig           `mapstructure:",squash"` // MetricsBuilderConfig to enable/disable specific metrics (default: all enabled)
	scraperhelper.ScraperControllerSettings `mapstructure:",squash"` // ScraperControllerSettings to configure scraping interval (default: scrape every second)
}

The last two fields are embedded structs that we can use to configure the metrics and the scraping interval.

Let's now write a simple vmstat wrapper that will scrape the metrics.

// stat.go

type vmStat struct {
    Swapped         int64
    Free            int64
}

type vmStatReader struct {
    delay int
    count int
    
    logger *zap.Logger
}

func (r *vmStatReader) Read() (*vmStat, error) {
    cmd := exec.Command("vmstat", fmt.Sprintf("%d", r.delay), fmt.Sprintf("%d", r.count))
    out, err := cmd.Output()
    if err != nil {
        r.logger.Error("failed to execute vmstat", zap.Error(err))
        return nil, err
    }
    return r.parse(out)
}

func (r *vmStatReader) parse(out []byte) (*vmStat, error) {
	// parse the output of vmstat
}
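The parse step is left as a stub above. Here is a minimal, self-contained sketch of one way to fill it in, assuming the default procps vmstat layout where swpd is the third column and free is the fourth (the helper name and error messages are mine):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// vmStat mirrors the two fields this receiver records.
type vmStat struct {
	Swapped int64
	Free    int64
}

// parseVmStat extracts swpd and free from vmstat output: two header
// lines followed by one data row per sample; the last row holds the
// most recent sample.
func parseVmStat(out []byte) (*vmStat, error) {
	lines := strings.Split(strings.TrimSpace(string(out)), "\n")
	if len(lines) < 3 {
		return nil, fmt.Errorf("unexpected vmstat output: %q", out)
	}
	fields := strings.Fields(lines[len(lines)-1])
	if len(fields) < 4 {
		return nil, fmt.Errorf("unexpected vmstat row: %q", lines[len(lines)-1])
	}
	swapped, err := strconv.ParseInt(fields[2], 10, 64) // swpd column
	if err != nil {
		return nil, err
	}
	free, err := strconv.ParseInt(fields[3], 10, 64) // free column
	if err != nil {
		return nil, err
	}
	return &vmStat{Swapped: swapped, Free: free}, nil
}

func main() {
	sample := "procs -----------memory----------\n r  b   swpd   free   buff  cache\n 1  0    512 234524 123456 789012"
	stat, err := parseVmStat([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Printf("swapped=%d free=%d\n", stat.Swapped, stat.Free) // prints "swapped=512 free=234524"
}
```

Note that vmstat reports KiB by default; a production receiver would convert units (or pass `vmstat -S` flags) to match the bytes unit declared in metadata.yaml.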

Simple enough. Now let's plug our vmStatReader into the receiver.

// scraper.go

type scraper struct {
	logger         *zap.Logger              // Logger to log events
	metricsBuilder *metadata.MetricsBuilder // MetricsBuilder to build metrics
	reader         *vmStatReader            // vmStatReader to read vmstat output
}

func newScraper(cfg *Config, metricsBuilder *metadata.MetricsBuilder, logger *zap.Logger) *scraper {
	return &scraper{
		logger:         logger,
		metricsBuilder: metricsBuilder,
		reader:         newVmStatReader(cfg, logger),
	}
}

func (s *scraper) scrape(_ context.Context) (pmetric.Metrics, error) {
	vmStat, err := s.reader.Read()
	if err != nil {
		return pmetric.Metrics{}, err
	}
	attr := newAttributeReader(s.logger).getAttributes()
	s.recordVmStats(vmStat, attr)
	return s.metricsBuilder.Emit(), nil
}

func (s *scraper) recordVmStats(stat *vmStat, attr *attributes) {
	now := pcommon.NewTimestampFromTime(time.Now())

	s.metricsBuilder.RecordSwappedDataPoint(now, stat.Swapped, attr.host, attr.os, attr.arch, "memory")
	s.metricsBuilder.RecordFreeDataPoint(now, stat.Free, attr.host, attr.os, attr.arch, "memory")
}

Finally, we can define a Factory that will be the entry point for the receiver.

// factory.go

func NewFactory() receiver.Factory {
	return receiver.NewFactory(
		metadata.Type,
		createDefaultConfig,
		receiver.WithMetrics(CreateVmStatReceiver, component.StabilityLevelDevelopment),
	)
}

func CreateVmStatReceiver(
	_ context.Context,
	settings receiver.CreateSettings,
	cfg component.Config,
	consumer consumer.Metrics,
) (receiver.Metrics, error) {
	logger := settings.Logger
	config, ok := cfg.(*Config)
	if !ok {
		em := "failed to cast to type Config"
		logger.Error(em)
		return nil, errors.New(em) // errors.New avoids the go vet warning for a non-constant fmt.Errorf format string
	}

	mb := metadata.NewMetricsBuilder(config.MetricsBuilderConfig, settings)

	ns := newScraper(config, mb, logger)
	scraper, err := scraperhelper.NewScraper(metadata.Type, ns.scrape)
	if err != nil {
		logger.Error("failed to create scraper", zap.Error(err))
		return nil, err
	}

	return scraperhelper.NewScraperControllerReceiver(
		&config.ScraperControllerSettings,
		settings,
		consumer,
		scraperhelper.AddScraper(scraper),
	)
}
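The factory above also references createDefaultConfig, which mdatagen does not generate for us. A plausible sketch, with the caveat that the default values are my assumptions and the scraperhelper API has changed across collector versions:

```go
// createDefaultConfig returns the receiver's default configuration.
// Delay/Count defaults are illustrative; the generated metadata
// package provides DefaultMetricsBuilderConfig().
func createDefaultConfig() component.Config {
	return &Config{
		Delay:                     1, // one second between vmstat samples
		Count:                     2, // take two samples per scrape
		MetricsBuilderConfig:      metadata.DefaultMetricsBuilderConfig(),
		ScraperControllerSettings: scraperhelper.NewDefaultScraperControllerSettings(metadata.Type),
	}
}
```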

Well, that's about it. Our receiver is ready to start scraping metrics.

Step 2: Define the configs

We need two configuration files for an OpenTelemetry Collector (obviously, the names are arbitrary):

  • builder-config.yaml: defines the components of the collector
  • otelcol.yaml: defines the configuration of each of the components

# builder-config.yaml
receivers:
  - gomod: github.com/mmynk/otel-kafka-poc/receivers/vmreceiver v0.0.1
    import: github.com/mmynk/otel-kafka-poc/receivers/vmreceiver
    name: 'vmreceiver'
    path: './receivers/vmreceiver'

# otelcol.yaml

receivers:
  vmstats:
    collection_interval: 10s
    delay: 2
    count: 2

Before we write our own exporter, we can actually deploy our collector by using a pre-built exporter from the OpenTelemetry Collector Contrib repo. Let's add the Prometheus exporter to our pipeline.

# builder-config.yaml
exporters:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.92.0

# otelcol.yaml
exporters:
  prometheus:
    endpoint: 0.0.0.0:8889

service:
  pipelines:
    metrics:
      receivers: [vmstats]
      exporters: [prometheus]

Step 3: Deploy the collector

Let's fire up the collector and see if it works.

$ builder --config builder-config.yaml
...
2024-01-22T01:02:53.018Z	INFO	builder/main.go:121	Compiled	{"binary": "./otelcol-dev/otelcol"}

This will generate a binary otelcol-dev/otelcol in the current directory.

If everything went well, we should be able to run the binary.

$ ./otelcol-dev/otelcol --config otelcol.yaml
...
2024-01-22T01:04:20.061Z	info	service@v0.92.0/telemetry.go:159	Serving metrics	{"address": ":8888", "level": "Basic"}
...
2024-01-22T01:04:20.062Z	info	service@v0.92.0/service.go:177	Everything is ready. Begin running and processing data.

We should now be able to see our scraped metrics at http://localhost:8889/metrics, the endpoint we configured for the Prometheus exporter (the address :8888 in the log above serves the collector's own internal telemetry).
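For example, with the collector running (the exact metric names depend on the definitions in metadata.yaml):

```shell
# Scraped vmstat metrics, exposed by the Prometheus exporter
curl -s http://localhost:8889/metrics

# The collector's own internal telemetry
curl -s http://localhost:8888/metrics
```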

Yay! We have successfully built a custom collector.

Step 4: Build a custom exporter

TODO

About

A POC for collecting, publishing, receiving and processing metrics using OpenTelemetry with Kafka.
