Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --metric-tags option #10

Merged
merged 1 commit into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,16 @@ func TestPublishConsume(t *testing.T) {
args := []string{tc.publish + "-" + tc.consume, "-C", "1", "-D", "1", "-t", topic, "-T", topic}
rootCmd.SetArgs(args)
fmt.Println("Running test: omq", strings.Join(args, " "))
publishedBefore := testutil.ToFloat64(metrics.MessagesPublished.WithLabelValues(publishProtoLabel))
consumedBefore := testutil.ToFloat64(metrics.MessagesConsumed.WithLabelValues(consumeProtoLabel))

err := rootCmd.Execute()

assert.Nil(t, err)

assert.Eventually(t, func() bool {
return testutil.ToFloat64(metrics.MessagesPublished.WithLabelValues(publishProtoLabel)) == publishedBefore+1
return assert.Equal(t, 1.0, testutil.ToFloat64(metrics.MessagesPublished.WithLabelValues(publishProtoLabel)))

}, 2*time.Second, 100*time.Millisecond)
assert.Eventually(t, func() bool {
return testutil.ToFloat64(metrics.MessagesConsumed.WithLabelValues(consumeProtoLabel)) == consumedBefore+1
return assert.Equal(t, 1.0, testutil.ToFloat64(metrics.MessagesConsumed.WithLabelValues(consumeProtoLabel)))
}, 2*time.Second, 100*time.Millisecond)
})
}
Expand Down
21 changes: 21 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var (
versionCmd = &cobra.Command{}
)

var metricTags []string

func Execute() {
rootCmd := RootCmd()
err := rootCmd.Execute()
Expand Down Expand Up @@ -159,6 +161,24 @@ func RootCmd() *cobra.Command {
os.Exit(1)
}
}

// split metric tags into key-value pairs
cfg.MetricTags = make(map[string]string)
for _, tag := range metricTags {
parts := strings.Split(tag, "=")
if len(parts) != 2 {
_, _ = fmt.Fprintf(os.Stderr, "ERROR: invalid metric tags: %s, use label=value format\n", tag)
os.Exit(1)
}
cfg.MetricTags[parts[0]] = parts[1]
}
if metricTags != nil {
metrics.RegisterMetrics(cfg.MetricTags)
metricTags = nil
}
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {
metrics.UnregisterMetrics()
},
}
rootCmd.PersistentFlags().StringVarP(&cfg.PublisherUri, "publisher-uri", "", "", "URI for publishing")
Expand Down Expand Up @@ -189,6 +209,7 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValueSet, "stream-filter-value-set", "", "Stream filter value for publisher")
rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count")
rootCmd.PersistentFlags().DurationVarP(&cfg.ConsumerLatency, "consumer-latency", "L", 0*time.Second, "consumer latency (time to accept message)")
rootCmd.PersistentFlags().StringSliceVar(&metricTags, "metric-tags", []string{}, "Prometheus label-value pairs, eg. l1=v1,l2=v2")

rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_stomp)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Config struct {
Amqp AmqpOptions
MqttPublisher MqttOptions
MqttConsumer MqttOptions
MetricTags map[string]string
}

func NewConfig() Config {
Expand Down
63 changes: 34 additions & 29 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,44 @@ var lock = &sync.Mutex{}
var metricsServer *MetricsServer

var (
MessagesPublished *prometheus.CounterVec
MessagesConsumed *prometheus.CounterVec
PublishingLatency *prometheus.SummaryVec
EndToEndLatency *prometheus.SummaryVec
)

func RegisterMetrics(globalLabels prometheus.Labels) {
fmt.Printf("negistering metrics: %v", globalLabels)
MessagesPublished = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "omq_messages_published_total",
Help: "The total number of published messages"},
[]string{
"protocol",
},
)
Name: "omq_messages_published_total",
Help: "The total number of published messages",
ConstLabels: globalLabels,
}, []string{"protocol"})
MessagesConsumed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "omq_messages_consumed_total",
Help: "The total number of consumed messages"},
[]string{
"protocol",
},
)
Name: "omq_messages_consumed_total",
Help: "The total number of consumed messages",
ConstLabels: globalLabels,
}, []string{"protocol"})
PublishingLatency = promauto.NewSummaryVec(prometheus.SummaryOpts{
Name: "omq_publishing_latency_seconds",
Help: "Time from sending a message to receiving a confirmation",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
},
[]string{
"protocol",
},
)
Name: "omq_publishing_latency_seconds",
Help: "Time from sending a message to receiving a confirmation",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
ConstLabels: globalLabels,
}, []string{"protocol"})
EndToEndLatency = promauto.NewSummaryVec(prometheus.SummaryOpts{
Name: "omq_end_to_end_latency_seconds",
Help: "Time from sending a message to receiving the message",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
},
[]string{
"protocol",
},
)
)
Name: "omq_end_to_end_latency_seconds",
Help: "Time from sending a message to receiving the message",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
ConstLabels: globalLabels,
}, []string{"protocol"})
}

func UnregisterMetrics() {
prometheus.DefaultRegisterer.Unregister(MessagesPublished)
prometheus.DefaultRegisterer.Unregister(MessagesConsumed)
prometheus.DefaultRegisterer.Unregister(PublishingLatency)
prometheus.DefaultRegisterer.Unregister(EndToEndLatency)
}

func GetMetricsServer() *MetricsServer {
lock.Lock()
Expand Down
Loading