-
Notifications
You must be signed in to change notification settings - Fork 360
/
mongodb.go
88 lines (73 loc) · 1.91 KB
/
mongodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package host
import (
"runtime"
"time"
"github.com/activecm/rita/resources"
"github.com/activecm/rita/util"
"github.com/globalsign/mgo"
"github.com/vbauerster/mpb"
"github.com/vbauerster/mpb/decor"
)
// repo is the Repository implementation returned by NewMongoRepository. Its
// methods reach the database, configuration and logger through res.
type repo struct {
// res is the shared resources bundle supplied to NewMongoRepository.
res *resources.Resources
}
// NewMongoRepository returns a host Repository that performs its work
// through the given resources bundle. The pointer is stored as-is; no copy
// of the resources is made.
func NewMongoRepository(res *resources.Resources) Repository {
	hostRepo := new(repo)
	hostRepo.res = res
	return hostRepo
}
// CreateIndexes ensures the host collection of the currently selected
// database has the indexes this package wants: a unique index on "ip" and
// plain indexes on "local" and "ipv4_binary".
//
// It stops at, and returns unchanged, the first error reported by
// EnsureIndex; nil means every index was ensured.
func (r *repo) CreateIndexes() error {
	// Operate on a copy of the shared session and release it on return.
	ssn := r.res.DB.Session.Copy()
	defer ssn.Close()

	hostTable := r.res.Config.T.Structure.HostTable
	hosts := ssn.DB(r.res.DB.GetSelectedDB()).C(hostTable)

	// Indexes the host collection is expected to carry.
	wanted := []mgo.Index{
		{Key: []string{"ip"}, Unique: true},
		{Key: []string{"local"}},
		{Key: []string{"ipv4_binary"}},
	}

	for i := range wanted {
		if err := hosts.EnsureIndex(wanted[i]); err != nil {
			return err
		}
	}
	return nil
}
// Upsert feeds every host entry in hostMap to the analyzer workers, whose
// output is handed on to the writer workers for the host table, and renders
// a progress bar while the entries are being queued.
//
// The map keys are not used here; only the *IP values are passed on.
// An empty or nil hostMap is a no-op.
func (r *repo) Upsert(hostMap map[string]*IP) {
	// Nothing to analyze. Returning early also avoids a hang: the progress
	// bar below would be created with a total of 0, and mpb does not mark a
	// bar with a non-positive total as complete via increments, so p.Wait()
	// would never return and the workers would never be closed.
	if len(hostMap) == 0 {
		return
	}

	// Create the workers. The analyzer is given the writer's collect and
	// close functions, so analyzed results flow to the writer and closing
	// the analyzer cascades to the writer.
	writerWorker := newWriter(r.res.Config.T.Structure.HostTable, r.res.DB, r.res.Config, r.res.Log)
	analyzerWorker := newAnalyzer(
		r.res.Config.S.Bro.CurrentChunk,
		r.res.DB,
		r.res.Config,
		writerWorker.collect,
		writerWorker.close,
	)

	// Kick off the threaded goroutines: one analyzer/writer pair per two
	// CPUs, but always at least one pair. The count is computed once rather
	// than on every loop iteration.
	workers := util.Max(1, runtime.NumCPU()/2)
	for i := 0; i < workers; i++ {
		analyzerWorker.start()
		writerWorker.start()
	}

	// progress bar for troubleshooting
	p := mpb.New(mpb.WithWidth(20))
	bar := p.AddBar(int64(len(hostMap)),
		mpb.PrependDecorators(
			decor.Name("\t[-] Host Analysis:", decor.WC{W: 30, C: decor.DidentRight}),
			decor.CountersNoUnit(" %d / %d ", decor.WCSyncWidth),
		),
		mpb.AppendDecorators(decor.Percentage()),
	)

	// Hand each entry to the analyzer; the time spent in collect is
	// reported to the bar together with the increment.
	for _, entry := range hostMap {
		start := time.Now()
		analyzerWorker.collect(entry)
		bar.IncrBy(1, time.Since(start))
	}

	// Wait for the bar to finish rendering before shutting down.
	p.Wait()

	// start the closing cascade (this will also close the other channels)
	analyzerWorker.close()
}