Skip to content
This repository was archived by the owner on Mar 6, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions collector/pulsar/examples/compose/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Pulsar metrics using the OpenTelemetry Collector

## Overview

Pulsar natively exposes a Prometheus endpoint and the OpenTelemetry Collector has a [Prometheus receiver][otel-prom-receiver] that can be used to scrape its Prometheus endpoint. This directory contains an example showing how to configure Pulsar and the Collector to send metrics to Lightstep Observability.

## Prerequisites

* Docker
* Docker Compose
* A Lightstep Observability [access token][ls-docs-access-token]

## How to run the example

* Export your Lightstep access token

```sh
export LS_ACCESS_TOKEN=<YOUR_TOKEN>
```

* Run the docker compose example

```sh
docker-compose up -d
```

### Explore Metrics in Lightstep

See the [Pulsar Telemetry Docs][pulsar-docs-telemetry] for comprehensive documentation on metrics emitted and the [dashboard documentation][ls-docs-dashboards] for more details.

## Configure the Collector

Below is a snippet showing how to configure the Prometheus Receiver to scrape the Prometheus endpoint exposed by the Pulsar Server.

```yaml
receivers:
prometheus:
config:
scrape_configs:
- job_name: 'pulsar'
scrape_interval: 10s
static_configs:
- targets: ['localhost:8080']
```


## Additional information

- [OpenTelemetry Collector Prometheus Receiver][otel-prom-receiver]
- [Pulsar Telemetry Reference][scylla-docs-telemetry]

[ls-docs-access-token]: https://docs.lightstep.com/docs/create-and-manage-access-tokens
[ls-docs-dashboards]: https://docs.lightstep.com/docs/create-and-manage-dashboards
[otel-prom-receiver]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/prometheusreceiver
[pulsar-docs-telemetry]: https://pulsar.apache.org/docs/next/reference-metrics/
11 changes: 11 additions & 0 deletions collector/pulsar/examples/compose/app/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM golang:1.17 AS builder

WORKDIR /app
COPY . .
RUN go get github.com/apache/pulsar-client-go/pulsar
RUN CGO_ENABLED=0 GOOS=linux go build -o pulsar_demo .

FROM alpine:latest
COPY --from=builder /app/pulsar_demo /pulsar_demo

ENTRYPOINT ["/pulsar_demo"]
43 changes: 43 additions & 0 deletions collector/pulsar/examples/compose/app/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
module github.com/lightstep/integrations/collector/pulsar/examples/compose/app

go 1.20

require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
github.com/DataDog/zstd v1.5.0 // indirect
github.com/apache/pulsar-client-go v0.11.0 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.4.0 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang-jwt/jwt v3.2.1+incompatible // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/linkedin/goavro/v2 v2.9.8 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.11.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/mod v0.5.1 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 // indirect
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)
524 changes: 524 additions & 0 deletions collector/pulsar/examples/compose/app/go.sum

Large diffs are not rendered by default.

87 changes: 87 additions & 0 deletions collector/pulsar/examples/compose/app/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package main

import (
"context"
"fmt"
"log"
"math/rand"
"os"
"time"

"github.com/apache/pulsar-client-go/pulsar"
)

func randomString(length int) string {
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
b := make([]byte, length)
for i := range b {
b[i] = charset[rand.Intn(len(charset))]
}
return string(b)
}

func main() {
rand.Seed(time.Now().UnixNano())

pulsarURL := os.Getenv("PULSAR_URL")
if pulsarURL == "" {
pulsarURL = "pulsar://localhost:6650"
}

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: pulsarURL,
})

if err != nil {
log.Fatal(err)
}

defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "otel-topic",
})

if err != nil {
log.Fatal(err)
}

defer producer.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "otel-topic",
SubscriptionName: "sub-1",
})

if err != nil {
log.Fatal(err)
}

defer consumer.Close()

go func() {
for {
randomMsg := randomString(10)
if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte(randomMsg),
}); err != nil {
log.Fatal(err)
}

time.Sleep(1 * time.Second)
}
}()

for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
}
cancel()

fmt.Printf("Received: %s\n", string(msg.Payload()))

consumer.Ack(msg)
}
}
26 changes: 26 additions & 0 deletions collector/pulsar/examples/compose/collector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
receivers:
prometheus:
config:
scrape_configs:
- job_name: 'pulsar'
scrape_interval: 10s
static_configs:
- targets: ['pulsar:8080']

processors:
batch:

exporters:
logging:
loglevel: debug
otlp:
endpoint: ingest.lightstep.com:443
headers:
- lightstep-access-token: ${LS_ACCESS_TOKEN}

service:
pipelines:
metrics:
receivers: [prometheus]
processors: [batch]
exporters: [logging,otlp]
46 changes: 46 additions & 0 deletions collector/pulsar/examples/compose/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
version: '3.7'
services:
pulsar:
container_name: pulsar
image: apachepulsar/pulsar:latest
environment:
- PULSAR_URL=pulsar://pulsar:6650
ports:
- "6650:6650"
- "8080:8080"
command: bin/pulsar standalone
healthcheck:
test: ["CMD", "netstat", "-an", "|", "grep", "6650"]
interval: 10s
timeout: 10s
retries: 5
networks:
- integrations

pulsar_demo_app:
container_name: pulsar_demo_app
build:
context: ./app
depends_on:
- pulsar
networks:
- integrations

otel-collector:
container_name: otel-collector
image: otel/opentelemetry-collector-contrib:0.81.0
hostname: otel-collector
restart: always
command: [ "--config=/conf/collector.yaml" ]
volumes:
- ./collector.yaml:/conf/collector.yaml:rw
environment:
LS_ACCESS_TOKEN: "${LS_ACCESS_TOKEN}"
depends_on:
- pulsar
networks:
- integrations

networks:
integrations:

Loading