-
Notifications
You must be signed in to change notification settings - Fork 1
/
engine.go
402 lines (356 loc) · 11.8 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
package engine
import (
"context"
"fmt"
"sync"
"github.com/nextlinux/tracee/pkg/logger"
"github.com/nextlinux/tracee/pkg/signatures/metrics"
"github.com/nextlinux/tracee/types/detect"
"github.com/nextlinux/tracee/types/protocol"
)
// Well-known values used in signature event selectors.
const (
	// ALL_EVENT_ORIGINS is the wildcard origin: a selector carrying it matches
	// an event regardless of the event's origin.
	ALL_EVENT_ORIGINS = "*"

	// EVENT_CONTAINER_ORIGIN is the origin value "container".
	EVENT_CONTAINER_ORIGIN = "container"

	// EVENT_HOST_ORIGIN is the origin value "host".
	EVENT_HOST_ORIGIN = "host"

	// ALL_EVENT_TYPES is the wildcard event name: a selector carrying it
	// matches an event regardless of the event's name.
	ALL_EVENT_TYPES = "*"
)
// Config defines the engine's configurable values.
type Config struct {
// Enables the signatures engine to run in the events pipeline
Enabled bool
// SignatureBufferSize is the capacity of the buffered event channel that is
// created for each loaded signature.
SignatureBufferSize uint
// Signatures are loaded into the engine when it is constructed by NewEngine.
Signatures []detect.Signature
// DataSources are registered with the engine when it is constructed by NewEngine.
DataSources []detect.DataSource
}
// Engine is a signatures-engine that can process events coming from a set of
// input sources against a set of loaded signatures, and report the
// signatures' findings.
type Engine struct {
// signatures maps every loaded signature to the channel its events are
// dispatched on.
signatures map[detect.Signature]chan protocol.Event
// signaturesIndex maps an event selector to the signatures that selected it.
signaturesIndex map[detect.SignatureEventSelector][]detect.Signature
// signaturesMutex guards signatures and signaturesIndex.
signaturesMutex sync.RWMutex
// inputs holds the event channels the engine consumes from.
inputs EventSources
// output receives every finding reported by a signature.
output chan detect.Finding
// waitGroup tracks the per-signature event handling goroutines.
waitGroup sync.WaitGroup
// config is the configuration the engine was created with.
config Config
// stats holds the engine's counters (events, detections, signatures).
stats metrics.Stats
// dataSources is keyed by namespace first and data source ID second.
dataSources map[string]map[string]detect.DataSource
// dataSourcesMutex guards dataSources.
dataSourcesMutex sync.RWMutex
}
// EventSources is a bundle of input sources used to configure the Engine.
type EventSources struct {
// Tracee is the channel the engine reads events from. NewEngine rejects a
// nil channel; when the channel is closed the engine signals source
// completion to its signatures and stops consuming.
Tracee chan protocol.Event
}
// Stats returns a pointer to the engine's own metrics counters (not a copy),
// so the caller observes subsequent updates.
func (engine *Engine) Stats() *metrics.Stats {
	stats := &engine.stats
	return stats
}
// NewEngine creates a new signatures-engine wired to the given event sources
// and findings output. Both channels are created by the consumer; an error is
// returned when either of them is nil.
//
// The data sources and signatures listed in config are registered and loaded
// as part of construction. A failure on an individual data source or
// signature is logged and does not abort construction.
func NewEngine(config Config, sources EventSources, output chan detect.Finding) (*Engine, error) {
	if sources.Tracee == nil || output == nil {
		return nil, fmt.Errorf("nil input received")
	}

	// The engine is not visible to any other goroutine yet, so its maps can be
	// set up without taking the mutexes that guard them.
	created := &Engine{
		inputs:          sources,
		output:          output,
		config:          config,
		signatures:      make(map[detect.Signature]chan protocol.Event),
		signaturesIndex: make(map[detect.SignatureEventSelector][]detect.Signature),
		dataSources:     map[string]map[string]detect.DataSource{},
	}

	for _, ds := range config.DataSources {
		if err := created.RegisterDataSource(ds); err != nil {
			logger.Errorw("Loading signatures data source: " + err.Error())
		}
	}

	for _, sig := range config.Signatures {
		if _, err := created.loadSignature(sig); err != nil {
			logger.Errorw("Loading signature: " + err.Error())
		}
	}

	return created, nil
}
// signatureStart is the per-signature event loop: it feeds every event read
// from c to signature.OnEvent until c is closed, logging (and otherwise
// ignoring) any error OnEvent returns. It registers itself on wg for the
// duration of the loop and is meant to run in its own goroutine.
//
// NOTE(review): wg.Add is called from inside the goroutine, so a concurrent
// wg.Wait may return before this handler has registered itself. Moving the
// Add in front of the `go` statement requires changing the callers as well.
func signatureStart(signature detect.Signature, c chan protocol.Event, wg *sync.WaitGroup) {
	wg.Add(1)
	for event := range c {
		err := signature.OnEvent(event)
		if err == nil {
			continue
		}
		// Metadata is only used to name the signature in the log line; its
		// error is deliberately ignored.
		meta, _ := signature.GetMetadata()
		logger.Errorw("Handling event by signature " + meta.Name + ": " + err.Error())
	}
	wg.Done()
}
// Start launches one handler goroutine per loaded signature and then consumes
// the input sources on the calling goroutine until they are exhausted or ctx
// is cancelled.
//
// On return all signatures are unloaded, which means the engine is not
// reusable. The input and output channels are created by the consumer and
// are therefore not closed here.
func (engine *Engine) Start(ctx context.Context) {
	defer engine.unloadAllSignatures()

	func() {
		engine.signaturesMutex.RLock()
		defer engine.signaturesMutex.RUnlock()
		for sig, events := range engine.signatures {
			go signatureStart(sig, events, &engine.waitGroup)
		}
	}()

	engine.consumeSources(ctx)
}
// unloadAllSignatures closes every loaded signature together with its event
// channel, empties the signatures map and resets the selector index. Calling
// it again afterwards is a no-op apart from re-creating the empty index.
func (engine *Engine) unloadAllSignatures() {
	engine.signaturesMutex.Lock()
	defer engine.signaturesMutex.Unlock()

	for loaded, events := range engine.signatures {
		loaded.Close()
		// Closing the channel ends the signature's handler loop.
		close(events)
		delete(engine.signatures, loaded)
	}

	engine.signaturesIndex = map[detect.SignatureEventSelector][]detect.Signature{}
}
// matchHandler is handed to signatures as their finding callback: it counts
// the detection and forwards the finding to the engine's output channel.
// The send blocks until the output channel can accept the finding.
func (engine *Engine) matchHandler(res detect.Finding) {
	// The counter's result is intentionally ignored; a metrics failure must
	// not prevent the finding from being reported.
	_ = engine.stats.Detections.Increment()

	engine.output <- res
}
// checkCompletion is called when an input source ends. If no input source is
// left (the Tracee input has been set to nil) it unloads all signatures,
// waits for their handler goroutines to finish and reports true; otherwise it
// reports false and does nothing.
func (engine *Engine) checkCompletion() bool {
	if engine.inputs.Tracee != nil {
		return false
	}

	engine.unloadAllSignatures()
	engine.waitGroup.Wait()
	return true
}
// processEvent counts the event and dispatches it to every signature whose
// selector matches it.
//
// The index is consulted with four selectors derived from the event, in this
// order: the exact (source, name, origin) selector, then the same selector
// with a wildcard origin, then with a wildcard name, and finally with both
// wildcards. A signature indexed under several matching selectors receives
// the event once per match.
func (engine *Engine) processEvent(event protocol.Event) {
	engine.signaturesMutex.RLock()
	defer engine.signaturesMutex.RUnlock()

	source := event.Headers.Selector.Source
	name := event.Headers.Selector.Name
	origin := event.Headers.Selector.Origin

	_ = engine.stats.Events.Increment()

	lookups := [...]detect.SignatureEventSelector{
		// full selector
		{Source: source, Name: name, Origin: origin},
		// any origin
		{Source: source, Name: name, Origin: ALL_EVENT_ORIGINS},
		// any event name
		{Source: source, Name: ALL_EVENT_TYPES, Origin: origin},
		// any event name and any origin
		{Source: source, Name: ALL_EVENT_TYPES, Origin: ALL_EVENT_ORIGINS},
	}

	for _, selector := range lookups {
		for _, sig := range engine.signaturesIndex[selector] {
			engine.dispatchEvent(sig, event)
		}
	}
}
// consumeSources reads events from the Tracee input and processes them until
// either the input channel is closed or ctx is cancelled.
//
// When the input channel is closed, every signature that selects the "tracee"
// source is sent a SignalSourceComplete signal, the input is marked as gone
// and, once checkCompletion confirms no inputs are left, the function returns.
//
// When ctx is cancelled, events that are already waiting in the input channel
// are drained and processed without blocking, then the function returns.
func (engine *Engine) consumeSources(ctx context.Context) {
consume:
	for {
		select {
		case event, ok := <-engine.inputs.Tracee:
			if !ok {
				engine.signaturesMutex.RLock()
				for sig := range engine.signatures {
					se, err := sig.GetSelectedEvents()
					if err != nil {
						logger.Errorw("Getting selected events: " + err.Error())
						continue
					}
					for _, sel := range se {
						if sel.Source == "tracee" {
							// Signal each interested signature once, no
							// matter how many tracee selectors it has.
							_ = sig.OnSignal(detect.SignalSourceComplete("tracee"))
							break
						}
					}
				}
				engine.signaturesMutex.RUnlock()
				engine.inputs.Tracee = nil
				if engine.checkCompletion() {
					return
				}
				continue
			}
			engine.processEvent(event)
		case <-ctx.Done():
			break consume
		}
	}

	// Drain and process all remaining events.
	for {
		select {
		case event, ok := <-engine.inputs.Tracee:
			if !ok {
				// A closed channel is always ready to receive and yields zero
				// values, so without this check the default branch would
				// never run and the loop would spin forever dispatching
				// empty events.
				return
			}
			engine.processEvent(event)
		default:
			return
		}
	}
}
// dispatchEvent sends event on the channel registered for signature s.
// It reads the signatures map without locking, so the caller must already
// hold signaturesMutex (processEvent holds the read lock). The send blocks
// while the signature's channel is full.
func (engine *Engine) dispatchEvent(s detect.Signature, event protocol.Event) {
	events := engine.signatures[s]
	events <- event
}
// LoadSignature stores the signature in the engine and starts the goroutine
// that feeds it events. It returns the signature ID, or the error reported
// while storing the signature (in which case no goroutine is started).
func (engine *Engine) LoadSignature(signature detect.Signature) (string, error) {
	id, err := engine.loadSignature(signature)
	if err != nil {
		return id, err
	}

	// Fetch the channel created by loadSignature under the read lock; the
	// handler goroutine itself does not need the lock.
	engine.signaturesMutex.RLock()
	events := engine.signatures[signature]
	engine.signaturesMutex.RUnlock()

	go signatureStart(signature, events, &engine.waitGroup)

	return id, nil
}
// loadSignature initializes the signature and stores it in the Engine data
// structures (the signatures map and the selector index). It does not start
// the signature's handler goroutine.
//
// It returns the signature ID, or an error when the signature's metadata or
// selected events cannot be read, when the signature is already loaded, or
// when its Init fails.
//
// Selectors with an empty Name or Origin are indexed under the corresponding
// wildcard; selectors with an empty Source are logged and not indexed.
func (engine *Engine) loadSignature(signature detect.Signature) (string, error) {
	metadata, err := signature.GetMetadata()
	if err != nil {
		return "", fmt.Errorf("error getting metadata: %w", err)
	}
	selectedEvents, err := signature.GetSelectedEvents()
	if err != nil {
		return "", fmt.Errorf("error getting selected events for signature %s: %w", metadata.Name, err)
	}

	// Early rejection, so an already loaded signature is not re-initialized.
	engine.signaturesMutex.RLock()
	alreadyLoaded := engine.signatures[signature] != nil
	engine.signaturesMutex.RUnlock()
	if alreadyLoaded {
		return "", fmt.Errorf("failed to store signature: signature \"%s\" already loaded", metadata.Name)
	}

	signatureCtx := detect.SignatureContext{
		Callback: engine.matchHandler,
		Logger:   logger.Current(),
		GetDataSource: func(namespace, id string) (detect.DataSource, bool) {
			return engine.GetDataSource(namespace, id)
		},
	}
	// Init runs signature-provided code, so it is called without holding the
	// engine lock.
	if err := signature.Init(signatureCtx); err != nil {
		// failed to initialize
		return "", fmt.Errorf("error initializing signature %s: %w", metadata.Name, err)
	}

	// Normalize the selectors before taking the write lock.
	selectors := make([]detect.SignatureEventSelector, 0, len(selectedEvents))
	for _, selectedEvent := range selectedEvents {
		if selectedEvent.Name == "" {
			selectedEvent.Name = ALL_EVENT_TYPES
		}
		if selectedEvent.Origin == "" {
			selectedEvent.Origin = ALL_EVENT_ORIGINS
		}
		if selectedEvent.Source == "" {
			logger.Errorw("Signature " + metadata.Name + " doesn't declare an input source")
			continue
		}
		selectors = append(selectors, selectedEvent)
	}

	c := make(chan protocol.Event, engine.config.SignatureBufferSize)

	// Re-check and insert in a single critical section: the lock was released
	// after the early check, so a concurrent load of the same signature may
	// have stored it in the meantime. Overwriting its channel would orphan the
	// handler goroutine reading from it and index the signature twice.
	engine.signaturesMutex.Lock()
	if engine.signatures[signature] != nil {
		engine.signaturesMutex.Unlock()
		return "", fmt.Errorf("failed to store signature: signature \"%s\" already loaded", metadata.Name)
	}
	engine.signatures[signature] = c
	for _, selector := range selectors {
		engine.signaturesIndex[selector] = append(engine.signaturesIndex[selector], signature)
	}
	engine.signaturesMutex.Unlock()

	_ = engine.stats.Signatures.Increment()
	return metadata.ID, nil
}
// UnloadSignature removes the signature with the given ID from the Engine
// data structures, closes it and closes its event channel, which stops its
// handling goroutine. It returns an error when no loaded signature has that
// ID.
//
// The signature is removed from every entry of the selector index rather than
// only from the selectors it currently reports: the index keys were
// normalized at load time (empty Name/Origin stored as wildcards), so looking
// up the raw selectors would leave wildcard entries behind, and a later event
// dispatch to the already removed signature would block forever.
func (engine *Engine) UnloadSignature(signatureId string) error {
	var signature detect.Signature

	engine.signaturesMutex.RLock()
	for sig := range engine.signatures {
		metadata, _ := sig.GetMetadata()
		if metadata.ID == signatureId {
			signature = sig
			break
		}
	}
	engine.signaturesMutex.RUnlock()

	if signature == nil {
		return fmt.Errorf("could not find signature with ID: %v", signatureId)
	}

	engine.signaturesMutex.Lock()
	defer engine.signaturesMutex.Unlock()

	// remove from engine.signatures map
	if c, ok := engine.signatures[signature]; ok {
		delete(engine.signatures, signature)
		// Deferred calls run in reverse order once the index is cleaned up and
		// before the lock is released: close the channel, close the
		// signature, then update the counter.
		defer func() {
			_ = engine.stats.Signatures.Decrement()
		}()
		defer signature.Close()
		defer close(c)
	}

	// remove from engine.signaturesIndex map
	for selector, indexed := range engine.signaturesIndex {
		// Filter in place; the write lock guarantees no reader is iterating.
		kept := indexed[:0]
		for _, sig := range indexed {
			metadata, _ := sig.GetMetadata()
			if metadata.ID != signatureId {
				kept = append(kept, sig)
			}
		}
		if len(kept) == len(indexed) {
			continue
		}
		// Drop the stale references left in the tail of the backing array.
		for i := len(kept); i < len(indexed); i++ {
			indexed[i] = nil
		}
		if len(kept) == 0 {
			// No signature selects this event anymore.
			delete(engine.signaturesIndex, selector)
		} else {
			engine.signaturesIndex[selector] = kept
		}
	}

	return nil
}
// GetSelectedEvents returns the event selectors that are relevant to the
// currently loaded signatures, i.e. the keys of the selector index. The
// returned slice is a fresh copy in no particular order; it is empty, not
// nil, when nothing is indexed.
func (engine *Engine) GetSelectedEvents() []detect.SignatureEventSelector {
	// The index is modified when signatures are loaded and unloaded, so it
	// must be read under the same mutex.
	engine.signaturesMutex.RLock()
	defer engine.signaturesMutex.RUnlock()

	res := make([]detect.SignatureEventSelector, 0, len(engine.signaturesIndex))
	for k := range engine.signaturesIndex {
		res = append(res, k)
	}
	return res
}
// RegisterDataSource adds dataSource to the engine under the namespace and ID
// it reports. It returns an error, leaving the existing entry in place, when
// a data source with the same ID is already registered in that namespace.
func (engine *Engine) RegisterDataSource(dataSource detect.DataSource) error {
	engine.dataSourcesMutex.Lock()
	defer engine.dataSourcesMutex.Unlock()

	namespace, id := dataSource.Namespace(), dataSource.ID()

	// Create the namespace bucket on first use.
	sources, found := engine.dataSources[namespace]
	if !found {
		sources = map[string]detect.DataSource{}
		engine.dataSources[namespace] = sources
	}

	if _, taken := sources[id]; taken {
		return fmt.Errorf("failed to register data source: data source with name \"%s\" already exists in namespace \"%s\"", id, namespace)
	}

	sources[id] = dataSource
	return nil
}
// GetDataSource looks up the data source registered under the given namespace
// and ID. The boolean result reports whether it was found; when it is false
// the returned data source is nil.
func (engine *Engine) GetDataSource(namespace string, id string) (detect.DataSource, bool) {
	engine.dataSourcesMutex.RLock()
	defer engine.dataSourcesMutex.RUnlock()

	// Indexing a missing namespace yields a nil map, and a lookup in a nil
	// map reports "not found", so a single expression covers both levels.
	source, found := engine.dataSources[namespace][id]
	return source, found
}