Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(migrations): Add option migration for inputs.nats_consumer #14234

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions migrations/all/inputs.nats_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || (migrations && (inputs || inputs.nats_consumer))

package all

import _ "github.com/influxdata/telegraf/migrations/inputs_nats_consumer" // register migration
43 changes: 43 additions & 0 deletions migrations/inputs_nats_consumer/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package inputs_nats_consumer

import (
"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"

"github.com/influxdata/telegraf/migrations"
)

// Migration function
func migrate(tbl *ast.Table) ([]byte, string, error) {
// Decode the old data structure
var plugin map[string]interface{}
if err := toml.UnmarshalTable(tbl, &plugin); err != nil {
return nil, "", err
}

// Check for deprecated option(s) and migrate them
var applied bool
if _, found := plugin["metric_buffer"]; found {
applied = true

// Remove the ignored setting
delete(plugin, "metric_buffer")
}

// No options migrated so we can exit early
if !applied {
return nil, "", migrations.ErrNotApplicable
}

// Create the corresponding plugin configurations
cfg := migrations.CreateTOMLStruct("inputs", "nats_consumer")
cfg.Add("inputs", "nats_consumer", plugin)

output, err := toml.Marshal(cfg)
return output, "", err
}

// Register the migration function for the plugin type
func init() {
migrations.AddPluginOptionMigration("inputs.nats_consumer", migrate)
}
135 changes: 135 additions & 0 deletions migrations/inputs_nats_consumer/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package inputs_nats_consumer_test

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/config"
_ "github.com/influxdata/telegraf/migrations/inputs_nats_consumer" // register migration
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer" // register plugin
_ "github.com/influxdata/telegraf/plugins/parsers/all" // register parsers
)

func TestNoMigration(t *testing.T) {
defaultCfg := []byte(`
# Read metrics from NATS subject(s)
[[inputs.nats_consumer]]
## urls of NATS servers
servers = ["nats://localhost:4222"]

## subject(s) to consume
## If you use jetstream you need to set the subjects
## in jetstream_subjects
subjects = ["telegraf"]

## jetstream subjects
## jetstream is a streaming technology inside of nats.
## With jetstream the nats-server persists messages and
## a consumer can consume historical messages. This is
## useful when telegraf needs to restart it don't miss a
## message. You need to configure the nats-server.
## https://docs.nats.io/nats-concepts/jetstream.
jetstream_subjects = ["js_telegraf"]

## name a queue group
queue_group = "telegraf_consumers"

## Optional credentials
# username = ""
# password = ""

## Optional NATS 2.0 and NATS NGS compatible user credentials
# credentials = "/etc/telegraf/nats.creds"

## Use Transport Layer Security
# secure = false

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
# pending_bytes_limit = 67108864

## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
## is not lost. This option sets the maximum messages to read from the
## broker that have not been written by an output.
##
## This value needs to be picked with awareness of the agent's
## metric_batch_size value as well. Setting max undelivered messages too high
## can result in a constant stream of data batches to the output. While
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`)

// Migrate and check that nothing changed
output, n, err := config.ApplyMigrations(defaultCfg)
require.NoError(t, err)
require.NotEmpty(t, output)
require.Zero(t, n)
require.Equal(t, string(defaultCfg), string(output))
}

func TestCases(t *testing.T) {
// Get all directories in testdata
folders, err := os.ReadDir("testcases")
require.NoError(t, err)

for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}

t.Run(f.Name(), func(t *testing.T) {
testcasePath := filepath.Join("testcases", f.Name())
inputFile := filepath.Join(testcasePath, "telegraf.conf")
expectedFile := filepath.Join(testcasePath, "expected.conf")

// Read the expected output
expected := config.NewConfig()
require.NoError(t, expected.LoadConfig(expectedFile))
require.NotEmpty(t, expected.Inputs)

// Read the input data
input, remote, err := config.LoadConfigFile(inputFile)
require.NoError(t, err)
require.False(t, remote)
require.NotEmpty(t, input)

// Migrate
output, n, err := config.ApplyMigrations(input)
require.NoError(t, err)
require.NotEmpty(t, output)
require.GreaterOrEqual(t, n, uint64(1))
actual := config.NewConfig()
require.NoError(t, actual.LoadConfigData(output))

// Test the output
require.Len(t, actual.Inputs, len(expected.Inputs))
actualIDs := make([]string, 0, len(expected.Inputs))
expectedIDs := make([]string, 0, len(expected.Inputs))
for i := range actual.Inputs {
actualIDs = append(actualIDs, actual.Inputs[i].ID())
expectedIDs = append(expectedIDs, expected.Inputs[i].ID())
}
require.ElementsMatch(t, expectedIDs, actualIDs, string(output))
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[[inputs.nats_consumer]]
servers = ["nats://localhost:4222"]
subjects = ["telegraf"]
jetstream_subjects = ["js_telegraf"]
queue_group = "telegraf_consumers"
data_format = "influx"
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Read metrics from NATS subject(s)
[[inputs.nats_consumer]]
## urls of NATS servers
servers = ["nats://localhost:4222"]

## subject(s) to consume
subjects = ["telegraf"]

## jetstream subjects
jetstream_subjects = ["js_telegraf"]

## name a queue group
queue_group = "telegraf_consumers"

## Input data format
data_format = "influx"

## Number of metrics to buffer
metric_buffer = 1024
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[[inputs.nats_consumer]]
servers = ["nats://localhost:4222"]
subjects = ["telegraf"]
jetstream_subjects = ["js_telegraf"]
queue_group = "telegraf_consumers"
data_format = "xpath_json"
xpath_native_types = true

[[inputs.nats_consumer.xpath]]
metric_name = "/name"
timestamp = "/timestamp"
timestamp_format = "unix_ms"
field_selection = "/fields/*"
tag_selection = "/tags/*"
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Read metrics from NATS subject(s)
[[inputs.nats_consumer]]
## urls of NATS servers
servers = ["nats://localhost:4222"]

## subject(s) to consume
subjects = ["telegraf"]

## jetstream subjects
jetstream_subjects = ["js_telegraf"]

## name a queue group
queue_group = "telegraf_consumers"

## Number of metrics to buffer
metric_buffer = 1024

data_format = "xpath_json"
xpath_native_types = true

# Configuration matching the first (ENERGY) message
[[inputs.nats_consumer.xpath]]
metric_name = "/name"
timestamp = "/timestamp"
timestamp_format = "unix_ms"
field_selection = "/fields/*"
tag_selection = "/tags/*"