forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
applayer.go
232 lines (196 loc) · 5.94 KB
/
applayer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
// The applayer module provides common definitions with common fields
// for use with application layer protocols among beats.
package applayer
import (
"errors"
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/streambuf"
)
// A Message its direction indicator
type NetDirection uint8
const (
// Message due to a reponse by server
NetReverseDirection NetDirection = 0
// Message was send by client
NetOriginalDirection NetDirection = 1
)
// Transport type indicator. One of TransportUdp or TransportTcp
type Transport uint8
const (
TransportUdp Transport = iota
TransportTcp
)
// String returns the transport type its textual representation.
func (t Transport) String() string {
switch t {
case TransportUdp:
return "udp"
case TransportTcp:
return "tcp"
default:
return "invalid"
}
}
// A Stream provides buffering data if stream based protocol is used.
// Use Init to initialize a stream with en empty buffer and buffering limit.
// A Stream its zero value is a valid unlimited stream buffer.
type Stream struct {
// Buf provides the buffering with parsing support
Buf streambuf.Buffer
// MaxDataInStream sets the maximum number of bytes held in buffer.
// If limit is reached append function will return an error.
MaxDataInStream int
}
// A Transaction defines common fields for all application layer protocols.
type Transaction struct {
// Type is the name of the application layer protocol transaction be represented.
Type string
// Transaction source and destination IPs and Ports.
Tuple common.IpPortTuple
// Transport layer type
Transport Transport
// Src describes the transaction source/initiator endpoint
Src common.Endpoint
// Dst describes the transaction destination endpoint
Dst common.Endpoint
// Ts sets the transaction its initial timestamp
Ts TransactionTimestamp
// ResponseTime is the transaction duration in milliseconds. Should be set
// to -1 if duration is unknown
ResponseTime int32
// Status of final transaction
Status string // see libbeat/common/statuses.go
// Notes holds a list of interesting events and errors encountered when
// processing the transaction
Notes []string
// BytesIn is the number of bytes returned by destination endpoint
BytesIn uint64
// BytesOut is the number of bytes send by source endpoint to destination endpoint
BytesOut uint64
}
// TransactionTimestamp defines a transaction its initial timestamps as unix
// timestamp in milliseconds and time.Time struct.
type TransactionTimestamp struct {
Millis int64
Ts time.Time
}
// Message defines common application layer message fields. Some of these fields
// are required to initialize a Transaction (see (*Transaction).InitWithMsg).
type Message struct {
Ts time.Time
Tuple common.IpPortTuple
Transport Transport
CmdlineTuple *common.CmdlineTuple
Direction NetDirection
IsRequest bool
Size uint64
Notes []string
}
// Error code if stream exceeds max allowed size on Append.
var ErrStreamTooLarge = errors.New("Stream data too large")
// Init initializes a stream with an empty buffer and max size. Calling Init
// twice will fully re-initialize the buffer, such that calling Init before putting
// the stream in some object pool, no memory will be leaked.
func (stream *Stream) Init(maxDataInStream int) {
stream.MaxDataInStream = maxDataInStream
stream.Buf = streambuf.Buffer{}
}
// Reset will remove all bytes already read from the buffer.
func (stream *Stream) Reset() {
stream.Buf.Reset()
}
// Append adds data to the Stream its buffer. If internal buffer is nil, data
// will be retained as is. Use Write if you don't intend to retain the buffer in
// the stream.
func (stream *Stream) Append(data []byte) error {
err := stream.Buf.Append(data)
if err != nil {
return err
}
if stream.MaxDataInStream > 0 && stream.Buf.Total() > stream.MaxDataInStream {
return ErrStreamTooLarge
}
return nil
}
// Write copies data to the Stream its buffer. The data slice will not be
// retained by the buffer.
func (stream *Stream) Write(data []byte) (int, error) {
n, err := stream.Buf.Write(data)
if err != nil {
return n, err
}
if stream.MaxDataInStream > 0 && stream.Buf.Total() > stream.MaxDataInStream {
return n, ErrStreamTooLarge
}
return n, nil
}
// Init initializes some common fields. ResponseTime, Status, BytesIn and
// BytesOut are initialized to zero and must be filled by application code.
func (t *Transaction) Init(
typ string,
tuple common.IpPortTuple,
transport Transport,
direction NetDirection,
time time.Time,
cmdline *common.CmdlineTuple,
notes []string,
) {
t.Type = typ
t.Transport = transport
t.Tuple = tuple
// transactions have microseconds resolution
t.Ts.Ts = time
t.Ts.Millis = int64(time.UnixNano() / 1000)
t.Src = common.Endpoint{
Ip: tuple.Src_ip.String(),
Port: tuple.Src_port,
Proc: string(cmdline.Src),
}
t.Dst = common.Endpoint{
Ip: tuple.Dst_ip.String(),
Port: tuple.Dst_port,
Proc: string(cmdline.Dst),
}
t.Notes = notes
if direction == NetReverseDirection {
t.Src, t.Dst = t.Dst, t.Src
}
}
// InitWithMsg initializes some common fields from a Message. ResponseTime,
// Status, BytesIn and BytesOut are initialized to zero and must be filled by
// application code.
func (t *Transaction) InitWithMsg(
typ string,
msg *Message,
) {
t.Init(
typ,
msg.Tuple,
msg.Transport,
msg.Direction,
msg.Ts,
msg.CmdlineTuple,
nil,
)
}
// Event fills common event fields.
func (t *Transaction) Event(event common.MapStr) error {
event["type"] = t.Type
event["@timestamp"] = common.Time(t.Ts.Ts)
event["responsetime"] = t.ResponseTime
event["src"] = &t.Src
event["dst"] = &t.Dst
event["transport"] = t.Transport.String()
event["bytes_out"] = t.BytesOut
event["bytes_in"] = t.BytesIn
event["status"] = t.Status
if len(t.Notes) > 0 {
event["notes"] = t.Notes
}
return nil
}
// AddNotes appends some notes to a message.
func (m *Message) AddNotes(n ...string) {
m.Notes = append(m.Notes, n...)
}