/
collector.go
112 lines (98 loc) · 3.51 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package beacon
import (
"sync"
"github.com/activecm/rita/config"
"github.com/activecm/rita/database"
"github.com/activecm/rita/datatypes/data"
"github.com/activecm/rita/datatypes/structure"
"github.com/globalsign/mgo/bson"
)
// collector gathers Conn records for a source host and groups them by
// destination, producing one beaconAnalysisInput per (source, destination)
// pair that meets the connection threshold.
type collector struct {
	// db provides the MongoDB session and the name of the selected database.
	db *database.DB
	// conf supplies the names of the collections that are queried.
	conf *config.Config
	// connectionThreshold is the minimum number of connections a
	// (source, destination) pair needs in order to be handed to
	// collectedCallback.
	connectionThreshold int
	// collectedCallback is invoked once for every collected set of connections.
	collectedCallback func(*beaconAnalysisInput)
	// closedCallback is invoked by close() once no further calls to
	// collectedCallback will be made.
	closedCallback func()
	// collectChannel carries the source IP addresses queued for collection.
	collectChannel chan string
	// collectWg tracks the running collection goroutines so close() can wait
	// for them.
	collectWg sync.WaitGroup
}
// newCollector builds a collector that turns queued source hosts into
// beaconAnalysisInput objects. Each input groups the source, one destination
// it talked to, and the per-connection details (timestamps, data sizes)
// recorded for that pair.
//
// connectionThreshold is the minimum number of connections a pair must have.
// collectedCallback receives every input produced; closedCallback is run by
// close() after all collection goroutines have finished.
//
// The returned collector does no work until start() has been called at least
// once; its queue is unbuffered.
func newCollector(db *database.DB, conf *config.Config, connectionThreshold int,
	collectedCallback func(*beaconAnalysisInput), closedCallback func()) *collector {
	c := new(collector)
	c.db = db
	c.conf = conf
	c.connectionThreshold = connectionThreshold
	c.collectedCallback = collectedCallback
	c.closedCallback = closedCallback
	c.collectChannel = make(chan string)
	return c
}
// collect hands srcHost to the collection goroutines.
//
// The send goes through collectChannel, so when that channel has no free
// buffer space this call blocks until a goroutine launched by start() is
// ready to receive. It must not be called after close(), because sending on
// a closed channel panics.
func (c *collector) collect(srcHost string) {
	queue := c.collectChannel
	queue <- srcHost
}
// close shuts the collector down in three ordered steps: it closes the host
// queue so the collection goroutines stop once the queue is drained, waits
// for every one of them to finish, and then invokes closedCallback to signal
// that collectedCallback will not be called again.
//
// It must be called at most once; closing an already closed channel panics.
func (c *collector) close() {
	queue := c.collectChannel
	close(queue)

	c.collectWg.Wait()

	notifyClosed := c.closedCallback
	notifyClosed()
}
// start kicks off a new collection goroutine. It may be called several times
// to run multiple workers; they all receive hosts from the same queue.
//
// Each worker copies the database session and then, for every host received
// from collect():
//  1. looks up the unique connections whose source is that host,
//  2. skips pairs with fewer than connectionThreshold connections,
//  3. loads the individual connection records for the remaining pairs,
//     dropping TCP connections that never got established,
//  4. passes pairs that still meet the threshold to collectedCallback.
//
// The worker exits once close() has closed the queue and it has been drained.
func (c *collector) start() {
	c.collectWg.Add(1)
	go func() {
		// Deferred calls run last-in-first-out: the session is closed first
		// and only then is the wait group released, so close() cannot return
		// (and fire closedCallback) while this worker still holds a session.
		defer c.collectWg.Done()

		session := c.db.Session.Copy()
		defer session.Close()

		for host := range c.collectChannel {
			//grab all destinations related with this host
			var uconn structure.UniqueConnection
			destIter := session.DB(c.db.GetSelectedDB()).
				C(c.conf.T.Structure.UniqueConnTable).
				Find(bson.M{"src": host}).Iter()

			for destIter.Next(&uconn) {
				//skip the connection pair if they are under the threshold
				if uconn.ConnectionCount < c.connectionThreshold {
					continue
				}

				//create our new input
				newInput := &beaconAnalysisInput{
					uconnID: uconn.ID,
					src:     uconn.Src,
					dst:     uconn.Dst,
				}

				//Grab connection data
				var conn data.Conn
				connIter := session.DB(c.db.GetSelectedDB()).
					C(c.conf.T.Structure.ConnTable).
					Find(bson.M{"id_orig_h": uconn.Src, "id_resp_h": uconn.Dst}).
					Iter()

				for connIter.Next(&conn) {
					//filter out unestablished connections
					//We expect at least SYN ACK SYN-ACK [FIN ACK FIN ACK/ RST]
					if conn.Proto == "tcp" && conn.OriginPackets+conn.ResponsePackets <= 3 {
						continue
					}

					newInput.ts = append(newInput.ts, conn.Ts)
					newInput.origIPBytes = append(newInput.origIPBytes, conn.OriginIPBytes)
				}

				// Close releases the cursor and reports any error hit while
				// iterating. A failed query leaves newInput with a truncated
				// series, so skip the pair rather than analyze partial data.
				if err := connIter.Close(); err != nil {
					continue
				}

				//filtering may have reduced the amount of connections
				//check again if we should skip this unique connection
				if len(newInput.ts) < c.connectionThreshold {
					continue
				}

				c.collectedCallback(newInput)
			}

			// Release the cursor. The collector has no way to report an
			// error, and every input already delivered for this host was
			// built from fully read data, so the error is dropped on purpose.
			_ = destIter.Close()
		}
	}()
}