forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 9
/
cassandra.go
206 lines (173 loc) · 5.04 KB
/
cassandra.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package cassandra
import (
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"github.com/elastic/beats/packetbeat/publish"
gocql "github.com/elastic/beats/packetbeat/protos/cassandra/internal/gocql"
)
// cassandra application level protocol analyzer plugin
type cassandra struct {
ports protos.PortsConfig
parserConfig parserConfig
transConfig transactionConfig
pub transPub
}
// Application Layer tcp stream data to be stored on tcp connection context.
type connection struct {
streams [2]*stream
trans transactions
}
// Uni-directioal tcp stream state for parsing messages.
type stream struct {
parser parser
}
var (
debugf = logp.MakeDebug("cassandra")
)
func init() {
protos.Register("cassandra", New)
}
// New create and initializes a new cassandra protocol analyzer instance.
func New(
testMode bool,
results publish.Transactions,
cfg *common.Config,
) (protos.Plugin, error) {
p := &cassandra{}
config := defaultConfig
if !testMode {
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
}
if err := p.init(results, &config); err != nil {
return nil, err
}
return p, nil
}
func (cassandra *cassandra) init(results publish.Transactions, config *cassandraConfig) error {
if err := cassandra.setFromConfig(config); err != nil {
return err
}
cassandra.pub.results = results
return nil
}
func (cassandra *cassandra) setFromConfig(config *cassandraConfig) error {
// set module configuration
if err := cassandra.ports.Set(config.Ports); err != nil {
return err
}
// set parser configuration
parser := &cassandra.parserConfig
parser.maxBytes = tcp.TCPMaxDataInStream
// set parser's compressor, only `snappy` supported right now
if config.Compressor == gocql.Snappy {
parser.compressor = gocql.SnappyCompressor{}
} else {
parser.compressor = nil
}
// parsed ignored ops
if len(config.OPsIgnored) > 0 {
maps := map[gocql.FrameOp]bool{}
for _, op := range config.OPsIgnored {
maps[op] = true
}
parser.ignoredOps = maps
debugf("parsed config IgnoredOPs: %v ", parser.ignoredOps)
}
// set transaction correlator configuration
trans := &cassandra.transConfig
trans.transactionTimeout = config.TransactionTimeout
// set transaction publisher configuration
pub := &cassandra.pub
pub.sendRequest = config.SendRequest
pub.sendResponse = config.SendResponse
pub.sendRequestHeader = config.SendRequestHeader
pub.sendResponseHeader = config.SendResponseHeader
return nil
}
// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
func (cassandra *cassandra) ConnectionTimeout() time.Duration {
return cassandra.transConfig.transactionTimeout
}
// GetPorts returns the ports numbers packets shall be processed for.
func (cassandra *cassandra) GetPorts() []int {
return cassandra.ports.Ports
}
// Parse processes a TCP packet. Return nil if connection
// state shall be dropped (e.g. parser not in sync with tcp stream)
func (cassandra *cassandra) Parse(
pkt *protos.Packet,
tcptuple *common.TCPTuple, dir uint8,
private protos.ProtocolData,
) protos.ProtocolData {
defer logp.Recover("Parse cassandra exception")
conn := cassandra.ensureConnection(private)
st := conn.streams[dir]
if st == nil {
st = &stream{}
st.parser.init(&cassandra.parserConfig, func(msg *message) error {
return conn.trans.onMessage(tcptuple.IPPort(), dir, msg)
})
conn.streams[dir] = st
}
if err := st.parser.feed(pkt.Ts, pkt.Payload); err != nil {
debugf("%v, dropping TCP stream for error in direction %v.", err, dir)
cassandra.onDropConnection(conn)
return nil
}
return conn
}
// ReceivedFin handles TCP-FIN packet.
func (cassandra *cassandra) ReceivedFin(
tcptuple *common.TCPTuple, dir uint8,
private protos.ProtocolData,
) protos.ProtocolData {
return private
}
// GapInStream handles lost packets in tcp-stream.
func (cassandra *cassandra) GapInStream(tcptuple *common.TCPTuple, dir uint8,
nbytes int,
private protos.ProtocolData,
) (protos.ProtocolData, bool) {
conn := getConnection(private)
if conn != nil {
cassandra.onDropConnection(conn)
}
return nil, true
}
// onDropConnection processes and optionally sends incomplete
// transaction in case of connection being dropped due to error
func (cassandra *cassandra) onDropConnection(conn *connection) {
}
func (cassandra *cassandra) ensureConnection(private protos.ProtocolData) *connection {
conn := getConnection(private)
if conn == nil {
conn = &connection{}
conn.trans.init(&cassandra.transConfig, cassandra.pub.onTransaction)
}
return conn
}
func (conn *connection) dropStreams() {
conn.streams[0] = nil
conn.streams[1] = nil
}
func getConnection(private protos.ProtocolData) *connection {
if private == nil {
return nil
}
priv, ok := private.(*connection)
if !ok {
logp.Warn("cassandra connection type error")
return nil
}
if priv == nil {
logp.Warn("Unexpected: cassandra connection data not set")
return nil
}
return priv
}