Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add RedisTimeSeries plugin #11054

Merged
merged 24 commits into from Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ba8803c
add support for RedisTimeSeries plugin
gkorland May 1, 2022
21ff22a
fix README and example in code
gkorland May 1, 2022
27c2e74
verify redis address is set
gkorland May 1, 2022
e2317db
add logger
gkorland May 1, 2022
9feb1d6
simplify write code
gkorland May 1, 2022
ed46aba
fix lint errors
gkorland May 1, 2022
03c2676
Update plugins/outputs/redistimeseries/README.md
gkorland May 9, 2022
1f50c47
Update plugins/outputs/redistimeseries/redistimeseries.go
gkorland May 9, 2022
2503242
Update plugins/outputs/redistimeseries/redistimeseries.go
gkorland May 9, 2022
ff61ece
Update plugins/outputs/redistimeseries/redistimeseries.go
gkorland May 9, 2022
557a6ca
Update plugins/outputs/redistimeseries/redistimeseries.go
gkorland May 9, 2022
3f3aa27
Update plugins/outputs/redistimeseries/redistimeseries.go
gkorland May 9, 2022
61d6c45
ftm code
gkorland May 9, 2022
1452de8
readme updates
chayim May 15, 2022
cffe9ad
PR comments
chayim May 22, 2022
97de1e2
Update plugins/outputs/redistimeseries/README.md
gkorland May 24, 2022
eaebdd5
Merge branch 'master' into redistimeseries-plugin
gkorland May 25, 2022
0d5c540
Merge branch 'master' into redistimeseries-plugin
chayim May 29, 2022
90653df
go mod tidy
chayim Jun 7, 2022
4fde7a0
Merge branch 'master' of github.com:influxdata/telegraf into redistim…
gkorland Jun 19, 2022
2f98527
Merge branch 'master' into redistimeseries-plugin
chayim Jun 28, 2022
e12d4d8
lint fix, mod fix
chayim Jun 28, 2022
3f57e14
Update plugins/outputs/redistimeseries/redistimeseries.go
chayim Jun 30, 2022
ed0f52d
make check fix
chayim Jul 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 21 additions & 4 deletions etc/telegraf.conf
Expand Up @@ -1631,6 +1631,23 @@
# ## Export metric collection time.
# # export_timestamp = false

# # Configuration for the RedisTimeSeries server to send metrics to
MyaLongmire marked this conversation as resolved.
Show resolved Hide resolved
# [[outputs.redistimeseries]]
# ## The address of the RedisTimeSeries server.
# address = "127.0.0.1:6379"
# ## password to login Redis
# password = ""
#
# ## username (optional)
# username = ""
# # redis database number (optional, must be an integer)
# database = 0
#
# ## optional TLS configurations
# tls_ca = "/etc/telegraf/ca.pem
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
# insecure_skip_verify = false

# # Configuration for the Riemann server to send metrics to
# [[outputs.riemann]]
Expand Down Expand Up @@ -6070,7 +6087,7 @@
# # timeout = "5ms"


# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# [[inputs.opensmtpd]]
# ## If running as a restricted user you can prepend sudo for additional access:
# #use_sudo = false
Expand Down Expand Up @@ -8188,20 +8205,20 @@
# ## This value is propagated to pqos tool. Interval format is defined by pqos itself.
# ## If not provided or provided 0, will be set to 10 = 10x100ms = 1s.
# # sampling_interval = "10"
#
#
# ## Optionally specify the path to pqos executable.
# ## If not provided, auto discovery will be performed.
# # pqos_path = "/usr/local/bin/pqos"
#
# ## Optionally specify if IPC and LLC_Misses metrics shouldn't be propagated.
# ## If not provided, default value is false.
# # shortened_metrics = false
#
#
# ## Specify the list of groups of CPU core(s) to be provided as pqos input.
# ## Mandatory if processes aren't set and forbidden if processes are specified.
# ## e.g. ["0-3", "4,5,6"] or ["1-3,4"]
# # cores = ["0-3"]
#
#
# ## Specify the list of processes for which Metrics will be collected.
# ## Mandatory if cores aren't set and forbidden if cores are specified.
# ## e.g. ["qemu", "pmd"]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -58,7 +58,7 @@ require (
github.com/go-ldap/ldap/v3 v3.4.1
github.com/go-logfmt/logfmt v0.5.0
github.com/go-ping/ping v0.0.0-20210201095549-52eed920f98c
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v7 v7.4.0
github.com/go-sql-driver/mysql v1.6.0
github.com/gobwas/glob v0.2.3
github.com/gofrs/uuid v4.2.0+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -955,6 +955,8 @@ github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3yg
github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4=
github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v8 v8.0.0-beta.6/go.mod h1:g79Vpae8JMzg5qjk8BiwU9tK+HmU3iDVyS4UAJLFycI=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
Expand Down
3 changes: 2 additions & 1 deletion plugins/inputs/redis/README.md
Expand Up @@ -26,7 +26,8 @@
# # Can be "string", "integer", or "float"
# type = "string"

## specify server password
## specify server connection parameters
# username = "myuser"
# password = "s#cr@t%"

## Optional TLS Config
Expand Down
7 changes: 3 additions & 4 deletions plugins/inputs/redis/redis.go
Expand Up @@ -14,8 +14,7 @@ import (
"sync"
"time"

"github.com/go-redis/redis"

"github.com/go-redis/redis/v7"
MyaLongmire marked this conversation as resolved.
Show resolved Hide resolved
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -173,11 +172,11 @@ func (r *RedisClient) Do(returnType string, args ...interface{}) (interface{}, e
case "integer":
return rawVal.Int64()
case "string":
return rawVal.String()
return rawVal.Text()
case "float":
return rawVal.Float64()
default:
return rawVal.String()
return rawVal.Text()
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/redis_sentinel/redis_sentinel.go
Expand Up @@ -11,7 +11,7 @@ import (
"strings"
"sync"

"github.com/go-redis/redis"
"github.com/go-redis/redis/v7"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/tls"
Expand Down
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Expand Up @@ -41,6 +41,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/opentelemetry"
_ "github.com/influxdata/telegraf/plugins/outputs/opentsdb"
_ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client"
_ "github.com/influxdata/telegraf/plugins/outputs/redistimeseries"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
_ "github.com/influxdata/telegraf/plugins/outputs/sensu"
Expand Down
22 changes: 22 additions & 0 deletions plugins/outputs/redistimeseries/README.md
@@ -0,0 +1,22 @@
# RedisTimeSeries Producer Output Plugin

The RedisTimeSeries output plugin writes metrics to the RedisTimeSeries server.

```toml
[[outputs.redistimeseries]]
## The address of the RedisTimeSeries server.
address = "127.0.0.1:6379"
## password to login Redis
gkorland marked this conversation as resolved.
Show resolved Hide resolved
password = ""

## username (optional)
# username = ""
# redis database number (optional, must be an integer)
# database = 0

## optional TLS configurations
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
# insecure_skip_verify = false
```
90 changes: 90 additions & 0 deletions plugins/outputs/redistimeseries/redistimeseries.go
@@ -0,0 +1,90 @@
package redistimeseries

import (
"fmt"

"github.com/go-redis/redis/v7"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
)

const sampleConfig = `
## The address of the RedisTimeSeries server.
address = "127.0.0.1:6379"

## Redis ACL credentials
# username = ""
# password = ""
# database = 0
srebhan marked this conversation as resolved.
Show resolved Hide resolved

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
# insecure_skip_verify = false
`

type RedisTimeSeries struct {
Address string `toml:"address"`
Username string `toml:"username"`
Password string `toml:"password"`
Database int `toml:"database"`
Log telegraf.Logger `toml:"-"`
tls.ClientConfig
client *redis.Client
}

func (r *RedisTimeSeries) Connect() error {
if r.Address == "" {
return fmt.Errorf("Redis address must be specified")
chayim marked this conversation as resolved.
Show resolved Hide resolved
}
r.client = redis.NewClient(&redis.Options{
Addr: r.Address,
Password: r.Password,
Username: r.Username,
DB: r.Database,
})
return r.client.Ping().Err()
}

func (r *RedisTimeSeries) Close() error {
return r.client.Close()
}

func (r *RedisTimeSeries) Description() string {
return "Plugin for sending metrics to RedisTimeSeries"
}

func (r *RedisTimeSeries) SampleConfig() string {
return sampleConfig
}
func (r *RedisTimeSeries) Write(metrics []telegraf.Metric) error {
for _, m := range metrics {
now := m.Time().UnixNano() / 1000000 // in milliseconds
name := m.Name()

var tags []interface{}
for k, v := range m.Tags() {
tags = append(tags, k, v)
}

for fieldName, value := range m.Fields() {
key := name + "_" + fieldName

addSlice := []interface{}{"TS.ADD", key, now, value}
addSlice = append(addSlice, tags...)

if err := r.client.Do(addSlice...).Err(); err != nil {
return fmt.Errorf("adding sample failed: %v", err)
}
}
}
return nil
}

func init() {
outputs.Add("redistimeseries", func() telegraf.Output {
return &RedisTimeSeries{}
})
}
26 changes: 26 additions & 0 deletions plugins/outputs/redistimeseries/redistimeseries_test.go
@@ -0,0 +1,26 @@
package redistimeseries
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is zero testing in this PR that would get routinely run, I'd like to see an integration test.

There is an example in the redis input using testcontainers that you can copy and paste from.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@powersj Maybe I misunderstand the testing (and likely)! But doesn't the TestConnectAndWrite in this file, with the MockMetrics cover this use case? I'd like to make sure I'm not missing something - and I'm pretty sure I am.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point is that you have only a single test and this test isn't run by default (see if testing.Short() { ...Skip...}) as it requires a local instance of redis running. @powersj added some infrastructure to enable integration tests using testcontainers (aka docker images) to verify functionality of plugins.

So the request is to make use of this infrastructure (see the plugin linked above) to at least enable integration tests for your plugin...


import (
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"testing"
)

func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

address := testutil.GetLocalHost() + ":6379"
redis := &RedisTimeSeries{
Address: address,
}

// Verify that we can connect to the RedisTimeSeries server
err := redis.Connect()
require.NoError(t, err)

// Verify that we can successfully write data to the RedisTimeSeries server
err = redis.Write(testutil.MockMetrics())
require.NoError(t, err)
}
Comment on lines +9 to +26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To allow integration tests you want something like this

Suggested change
func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
address := testutil.GetLocalHost() + ":6379"
redis := &RedisTimeSeries{
Address: address,
}
// Verify that we can connect to the RedisTimeSeries server
err := redis.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to the RedisTimeSeries server
err = redis.Write(testutil.MockMetrics())
require.NoError(t, err)
}
func TestConnectAndWriteIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
servicePort := "6379"
container := testutil.Container{
Image: "redis:alpine",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
}
require.NoError(t, container.Start(), "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
redis := &RedisTimeSeries{
Address: fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]),
}
// Verify that we can connect to the RedisTimeSeries server
require.NoError(t, redis.Connect())
// Verify that we can successfully write data to the RedisTimeSeries server
require.NoError(t, redis.Write(testutil.MockMetrics()))
}