Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse the Redis optional replication section redis_replication measuremet #5921

Merged
merged 1 commit into from Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions plugins/inputs/redis/README.md
Expand Up @@ -126,6 +126,16 @@ Additionally the plugin also calculates the hit/miss ratio (keyspace\_hitrate) a
- usec(int, mircoseconds)
- usec_per_call(float, microseconds)

- redis_replication
- tags:
- replication_role
- replica_ip
- replica_port
- state (either "online", "wait_bgsave", or "send_bulk")

- fields:
- lag(int, number)
- offset(int, number)

### Tags:

Expand Down
53 changes: 53 additions & 0 deletions plugins/inputs/redis/redis.go
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"log"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -48,6 +49,8 @@ func (r *RedisClient) BaseTags() map[string]string {
return tags
}

var replicationSlaveMetricPrefix = regexp.MustCompile(`^slave\d+`)

var sampleConfig = `
## specify servers via a url matching:
## [protocol://][:password]@address[:port]
Expand Down Expand Up @@ -253,6 +256,12 @@ func gatherInfoOutput(
gatherCommandstateLine(name, kline, acc, tags)
continue
}
if section == "Replication" && replicationSlaveMetricPrefix.MatchString(name) {
kline := strings.TrimSpace(parts[1])
gatherReplicationLine(name, kline, acc, tags)
continue
}

metric = name
}

Expand Down Expand Up @@ -374,6 +383,50 @@ func gatherCommandstateLine(
acc.AddFields("redis_cmdstat", fields, tags)
}

// Parse the special Replication line
// Example:
// slave0:ip=127.0.0.1,port=7379,state=online,offset=4556468,lag=0
// This line will only be visible when a node has a replica attached.
func gatherReplicationLine(
name string,
line string,
acc telegraf.Accumulator,
global_tags map[string]string,
) {
fields := make(map[string]interface{})
tags := make(map[string]string)
for k, v := range global_tags {
tags[k] = v
}

tags["replica_id"] = strings.TrimLeft(name, "slave")
tags["replication_role"] = "slave"

parts := strings.Split(line, ",")
for _, part := range parts {
kv := strings.Split(part, "=")
if len(kv) != 2 {
continue
}

switch kv[0] {
case "ip":
tags["replica_ip"] = kv[1]
case "port":
tags["replica_port"] = kv[1]
case "state":
tags[kv[0]] = kv[1]
default:
ival, err := strconv.ParseInt(kv[1], 10, 64)
if err == nil {
fields[kv[0]] = ival
}
}
}

acc.AddFields("redis_replication", fields, tags)
}

func init() {
inputs.Add("redis", func() telegraf.Input {
return &Redis{}
Expand Down
36 changes: 34 additions & 2 deletions plugins/inputs/redis/redis_test.go
Expand Up @@ -82,7 +82,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
"pubsub_channels": int64(0),
"pubsub_patterns": int64(0),
"latest_fork_usec": int64(0),
"connected_slaves": int64(0),
"connected_slaves": int64(2),
"master_repl_offset": int64(0),
"repl_backlog_active": int64(0),
"repl_backlog_size": int64(1048576),
Expand Down Expand Up @@ -134,6 +134,36 @@ func TestRedis_ParseMetrics(t *testing.T) {
"usec_per_call": float64(990.0),
}
acc.AssertContainsTaggedFields(t, "redis_cmdstat", cmdstatCommandFields, cmdstatCommandTags)

replicationTags := map[string]string{
"host": "redis.net",
"replication_role": "slave",
"replica_id": "0",
"replica_ip": "127.0.0.1",
"replica_port": "7379",
"state": "online",
}
replicationFields := map[string]interface{}{
"lag": int64(0),
"offset": int64(4556468),
}

acc.AssertContainsTaggedFields(t, "redis_replication", replicationFields, replicationTags)

replicationTags = map[string]string{
"host": "redis.net",
"replication_role": "slave",
"replica_id": "1",
"replica_ip": "127.0.0.1",
"replica_port": "8379",
"state": "send_bulk",
}
replicationFields = map[string]interface{}{
"lag": int64(1),
"offset": int64(0),
}

acc.AssertContainsTaggedFields(t, "redis_replication", replicationFields, replicationTags)
}

const testOutput = `# Server
Expand Down Expand Up @@ -209,7 +239,9 @@ latest_fork_usec:0

# Replication
role:master
connected_slaves:0
connected_slaves:2
slave0:ip=127.0.0.1,port=7379,state=online,offset=4556468,lag=0
slave1:ip=127.0.0.1,port=8379,state=send_bulk,offset=0,lag=1
master_replid:8c4d7b768b26826825ceb20ff4a2c7c54616350b
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
Expand Down