diff --git a/plugins/inputs/redis/README.md b/plugins/inputs/redis/README.md index 35e8b3528e07e..797ceb16750a9 100644 --- a/plugins/inputs/redis/README.md +++ b/plugins/inputs/redis/README.md @@ -43,6 +43,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. # username = "" # password = "" + ## Optional Node Discovery for Redis Cluster + ## Enabling this feature triggers the execution of a `cluster nodes` command + ## at each metric gathering interval. This command automatically detects + ## all cluster nodes and retrieves metrics from each of them. + # node_discovery = true + ## Optional TLS Config ## Check tls/config.go ClientConfig for more options # tls_enable = true diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index b19cdd8a611a3..b2a5b4467a087 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -32,10 +32,11 @@ type RedisCommand struct { } type Redis struct { - Commands []*RedisCommand `toml:"commands"` - Servers []string `toml:"servers"` - Username string `toml:"username"` - Password string `toml:"password"` + Commands []*RedisCommand `toml:"commands"` + Servers []string `toml:"servers"` + Username string `toml:"username"` + Password string `toml:"password"` + NodeDiscovery bool `toml:"node_discovery"` tls.ClientConfig @@ -57,6 +58,12 @@ type RedisClient struct { tags map[string]string } +type redisClusterNode struct { + nodeID string + host string + port string +} + // RedisFieldTypes defines the types expected for each of the fields redis reports on type RedisFieldTypes struct { ActiveDefragHits int64 `json:"active_defrag_hits"` @@ -222,7 +229,7 @@ func (r *Redis) Init() error { } func (r *Redis) connect() error { - if r.connected { + if r.connected && !r.NodeDiscovery { return nil } @@ -281,18 +288,56 @@ func (r *Redis) connect() error { }, ) - tags := map[string]string{} - if u.Scheme == "unix" { - tags["socket"] = u.Path + if r.NodeDiscovery { + nodes, err := discoverNodes(&RedisClient{client, make(map[string]string)}) + if err != nil { + return err + } + if nodes == nil { + return fmt.Errorf("unable to discover nodes from %s", address) + } + for _, node := range nodes { + nodeAddress := address + if node.host != "" { + nodeAddress = node.host + ":" + node.port + } + + discoveredClient := redis.NewClient( + &redis.Options{ + Addr: nodeAddress, + Username: username, + Password: password, + Network: u.Scheme, + PoolSize: 1, + TLSConfig: tlsConfig, + }, + ) + + tags := map[string]string{ + "server": u.Hostname(), + "port": node.port, + "nodeID": node.nodeID, + } + + r.clients = append(r.clients, &RedisClient{ + client: discoveredClient, + tags: tags, + }) + } } else { - tags["server"] = u.Hostname() - tags["port"] = u.Port() - } + tags := map[string]string{} + if u.Scheme == "unix" { + tags["socket"] = u.Path + } else { + tags["server"] = u.Hostname() + tags["port"] = u.Port() + } - r.clients = append(r.clients, &RedisClient{ - client: client, - tags: tags, - }) + r.clients = append(r.clients, &RedisClient{ + client: client, + tags: tags, + }) + } } r.connected = true @@ -302,7 +347,7 @@ func (r *Redis) connect() error { // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). func (r *Redis) Gather(acc telegraf.Accumulator) error { - if !r.connected { + if !r.connected || r.NodeDiscovery { err := r.connect() if err != nil { return err @@ -344,6 +389,20 @@ func (r *Redis) gatherCommandValues(client Client, acc telegraf.Accumulator) err return nil } +func discoverNodes(client Client) ([]redisClusterNode, error) { + val, err := client.Do("string", "cluster", "nodes") + + if err != nil { + return nil, err + } + + str, ok := val.(string) + if !ok { + return nil, fmt.Errorf("could not discover nodes: %w", err) + } + return parseClusterNodes(str) +} + func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error { info, err := client.Info().Result() if err != nil { @@ -354,6 +413,49 @@ func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error { return gatherInfoOutput(rdr, acc, client.BaseTags()) } +// Parse the list of nodes from a `cluster nodes` command +// This response looks like: +// +// d1861060fe6a534d42d8a19aeb36600e18785e04 127.0.0.1:6379 myself - 0 1318428930 1 connected 0-1364 +// 3886e65cc906bfd9b1f7e7bde468726a052d1dae 127.0.0.1:6380 master - 1318428930 1318428931 2 connected 1365-2729 +// d289c575dcbc4bdd2931585fd4339089e461a27d 127.0.0.1:6381 master - 1318428931 1318428931 3 connected 2730-4095 +// +// Per Redis docs: +// (https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/) +// +// In the above listing the different fields are in order: +// node id, address:port, flags, last ping sent, last pong received, +// configuration epoch, link state, slots. +func parseClusterNodes(nodesResponse string) ([]redisClusterNode, error) { + lines := strings.Split(nodesResponse, "\n") + nodes := make([]redisClusterNode, 0, len(lines)) + + for _, line := range lines { + fields := strings.Fields(line) + + if len(fields) == 0 { + continue + } + if len(fields) < 8 { + return nil, fmt.Errorf("unexpected cluster node: \"%s\"", line) + } + + endpointParts := strings.FieldsFunc(fields[1], func(r rune) bool { + return strings.ContainsRune(":@", r) + }) + if string(fields[1][0]) == ":" { + endpointParts = append([]string{""}, endpointParts...) + } + nodes = append(nodes, redisClusterNode{ + nodeID: fields[0], + host: endpointParts[0], + port: endpointParts[1], + }) + } + + return nodes, nil +} + // gatherInfoOutput gathers func gatherInfoOutput( rdr io.Reader, diff --git a/plugins/inputs/redis/redis_test.go b/plugins/inputs/redis/redis_test.go index e5e588fd76e23..01eafd3554dad 100644 --- a/plugins/inputs/redis/redis_test.go +++ b/plugins/inputs/redis/redis_test.go @@ -3,6 +3,7 @@ package redis import ( "bufio" "fmt" + "reflect" "strings" "testing" "time" @@ -62,6 +63,84 @@ func TestRedisConnectIntegration(t *testing.T) { require.NoError(t, err) } +func TestRedisConnectWithNodeDiscoveryIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + servicePort := "6379" + container := testutil.Container{ + Image: "redis:alpine", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForListeningPort(nat.Port(servicePort)), + Cmd: []string{ + "redis-server", + "--port", servicePort, + "--appendonly", "yes", + "--cluster-enabled", "yes", + "--cluster-config-file", "nodes.conf", + "--cluster-node-timeout", "5000", + }, + } + err := container.Start() + require.NoError(t, err, "failed to start container") + defer container.Terminate() + + addr := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) + + r := &Redis{ + Log: testutil.Logger{}, + Servers: []string{addr}, + NodeDiscovery: true, + } + + var acc testutil.Accumulator + + err = acc.GatherError(r.Gather) + require.NoError(t, err) +} + +func TestParseClusterNodes(t *testing.T) { + var tests = map[string]struct { + clusterNodesResponse string + expectedNodes []redisClusterNode + }{ + "SingleContainerCluster": { + "5998443a50112d5a7fa619c0b044451df052974e :6379@16379 myself,master - 0 0 0 connected\n", + []redisClusterNode{ + {nodeID: "5998443a50112d5a7fa619c0b044451df052974e", host: "", port: "6379"}, + }, + }, + "ClusterNodesResponseA": { + `d1861060fe6a534d42d8a19aeb36600e18785e04 127.0.0.1:6379 myself - 0 1318428930 1 connected 0-1364 + 3886e65cc906bfd9b1f7e7bde468726a052d1dae 127.0.0.1:6380 master - 1318428930 1318428931 2 connected 1365-2729 + d289c575dcbc4bdd2931585fd4339089e461a27d 127.0.0.1:6381 master - 1318428931 1318428931 3 connected 2730-4095`, + []redisClusterNode{ + {nodeID: "d1861060fe6a534d42d8a19aeb36600e18785e04", host: "127.0.0.1", port: "6379"}, + {nodeID: "3886e65cc906bfd9b1f7e7bde468726a052d1dae", host: "127.0.0.1", port: "6380"}, + {nodeID: "d289c575dcbc4bdd2931585fd4339089e461a27d", host: "127.0.0.1", port: "6381"}, + }, + }, + "ClusterNodesResponseB": { + `4ce37a099986f2d0465955e2e66937d6893aa0e1 10.64.82.45:11006@16379 myself,master - 0 1713739012000 5 connected 5462-10922 + d6eb119a1f050982cc901ae663e7448867e49f7c 10.64.82.46:11005@16379 master - 0 1713739011916 4 connected 10923-16383 + 3a386fb6930d8f6c1a6536082071eb2f32590d31 10.64.82.46:11007@16379 master - 0 1713739012922 6 connected 0-5461`, + []redisClusterNode{ + {nodeID: "4ce37a099986f2d0465955e2e66937d6893aa0e1", host: "10.64.82.45", port: "11006"}, + {nodeID: "d6eb119a1f050982cc901ae663e7448867e49f7c", host: "10.64.82.46", port: "11005"}, + {nodeID: "3a386fb6930d8f6c1a6536082071eb2f32590d31", host: "10.64.82.46", port: "11007"}, + }, + }, + } + + for tname, tt := range tests { + testResponse, _ := parseClusterNodes(tt.clusterNodesResponse) + if !reflect.DeepEqual(testResponse, tt.expectedNodes) { + t.Error(tname, "fail! Got:\n", testResponse, "\nExpected:\n", tt.expectedNodes) + } + } +} + func TestRedis_Commands(t *testing.T) { const redisListKey = "test-list-length" var acc testutil.Accumulator diff --git a/plugins/inputs/redis/sample.conf b/plugins/inputs/redis/sample.conf index 00571cbd7cb37..4767a091d5c98 100644 --- a/plugins/inputs/redis/sample.conf +++ b/plugins/inputs/redis/sample.conf @@ -27,6 +27,12 @@ # username = "" # password = "" + ## Optional Node Discovery for Redis Cluster + ## Enabling this feature triggers the execution of a `cluster nodes` command + ## at each metric gathering interval. This command automatically detects + ## all cluster nodes and retrieves metrics from each of them. + # node_discovery = true + ## Optional TLS Config ## Check tls/config.go ClientConfig for more options # tls_enable = true