forked from bigpigeon/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
harvester.go
155 lines (132 loc) · 3.12 KB
/
harvester.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package redis
import (
"fmt"
"time"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"strings"
"github.com/elastic/beats/filebeat/harvester"
rd "github.com/garyburd/redigo/redis"
"github.com/satori/go.uuid"
)
// Harvester contains all redis harvester data
type Harvester struct {
	// id is the unique identifier of this harvester; it is returned by ID.
	id uuid.UUID

	// done is closed by Stop. Run polls it to abort before and during a harvest.
	done chan struct{}

	// conn is the redis connection the slowlog is read from. Run closes it on return.
	conn rd.Conn

	// forwarder receives the events built by Run. NewHarvester leaves it nil.
	// NOTE(review): presumably assigned by the code that creates the harvester
	// before Run is called — confirm against the caller.
	forwarder *harvester.Forwarder
}
// log contains all data related to one slowlog entry
//
// The data is in the following format:
// 1) (integer) 13
// 2) (integer) 1309448128
// 3) (integer) 30
// 4) 1) "slowlog"
//    2) "get"
//    3) "100"
//
type log struct {
	// id is scanned from the first element of the entry.
	id int64

	// timestamp is scanned from the second element; Run interprets it as
	// seconds since the unix epoch.
	timestamp int64

	// duration is scanned from the third element; Run reports it as duration.us.
	duration int

	// cmd is the first string of the fourth element, if present.
	cmd string

	// key is the second string of the fourth element, if present.
	key string

	// args holds the strings that follow cmd and key; nil when there are none.
	args []string
}
// NewHarvester builds a harvester that reads from the given redis connection.
// The harvester gets a freshly generated ID and an open done channel; the
// forwarder is left unset.
func NewHarvester(conn rd.Conn) *Harvester {
	h := new(Harvester)
	h.id = uuid.NewV4()
	h.done = make(chan struct{})
	h.conn = conn
	return h
}
// Run performs one harvest of the redis slowlog: it fetches the current
// slowlog entries, resets the slowlog on the server and forwards one event
// per entry through h.forwarder. The connection is closed when Run returns.
//
// Run returns nil without contacting redis when the harvester has already
// been stopped, and nil as well when it is stopped while entries are being
// forwarded (the remaining entries are then lost). An error is returned when
// sending the commands or receiving their replies fails. Entries that cannot
// be decoded are logged and skipped.
func (h *Harvester) Run() error {
	defer h.conn.Close()

	select {
	case <-h.done:
		return nil
	default:
	}

	// Write SLOWLOG GET and SLOWLOG RESET both to the buffer so they are
	// executed together.
	if err := h.conn.Send("SLOWLOG", "GET"); err != nil {
		return fmt.Errorf("error sending slowlog get command: %s", err)
	}
	if err := h.conn.Send("SLOWLOG", "RESET"); err != nil {
		return fmt.Errorf("error sending slowlog reset command: %s", err)
	}

	// Flush the buffer to execute both commands.
	if err := h.conn.Flush(); err != nil {
		return fmt.Errorf("error flushing slowlog commands: %s", err)
	}

	// The first reply is the one from SLOWLOG GET.
	logs, err := rd.Values(h.conn.Receive())
	if err != nil {
		return fmt.Errorf("error receiving slowlog data: %s", err)
	}

	// The second reply is the one from SLOWLOG RESET; only its error matters.
	if _, err = h.conn.Receive(); err != nil {
		return fmt.Errorf("error receiving reset data: %s", err)
	}

	for _, item := range logs {
		// Stopping here means some of the slowlog events are lost!
		select {
		case <-h.done:
			return nil
		default:
		}

		values, err := rd.Values(item, nil)
		if err != nil {
			logp.Err("Error loading slowlog values: %s", err)
			continue
		}

		// Skip entries that do not have the expected layout instead of
		// forwarding an event filled with zero values.
		var entry log
		var args []string
		if _, err := rd.Scan(values, &entry.id, &entry.timestamp, &entry.duration, &args); err != nil {
			logp.Err("Error scanning slowlog entry: %s", err)
			continue
		}

		// Split the raw arguments into cmd, key and the remaining args.
		if len(args) > 0 {
			entry.cmd = args[0]
		}
		if len(args) > 1 {
			entry.key = args[1]
		}
		// This could contain confidential data, processors should be used to drop it if needed
		if len(args) > 2 {
			entry.args = args[2:]
		}

		slowlog := common.MapStr{
			"id":  entry.id,
			"cmd": entry.cmd,
			"key": entry.key,
			"duration": common.MapStr{
				"us": entry.duration,
			},
		}
		// Only report args when the command had more than cmd and key.
		if entry.args != nil {
			slowlog["args"] = entry.args
		}

		data := util.NewData()
		data.Event = common.MapStr{
			"@timestamp": common.Time(time.Unix(entry.timestamp, 0).UTC()),
			"message":    strings.Join(args, " "),
			"redis": common.MapStr{
				"slowlog": slowlog,
			},
			"beat": common.MapStr{
				"read_timestamp": common.Time(time.Now()),
			},
			"prospector": common.MapStr{
				"type": "redis",
			},
		}

		h.forwarder.Send(data)
	}

	return nil
}
// Stop stops the harvester by closing its done channel, which Run polls to
// abort. Calling Stop again after it has returned is a no-op instead of a
// "close of closed channel" panic. Concurrent calls to Stop are not
// synchronized with each other.
func (h *Harvester) Stop() {
	select {
	case <-h.done:
		// Already stopped.
	default:
		close(h.done)
	}
}
// ID returns the unique harvester ID, which NewHarvester generates when the
// harvester is created.
func (h *Harvester) ID() uuid.UUID {
	return h.id
}