-
Notifications
You must be signed in to change notification settings - Fork 0
/
document_per_loader.go
99 lines (84 loc) · 2.48 KB
/
document_per_loader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main
import (
"log"
"sync"
"github.com/globalsign/mgo"
"github.com/timescale/tsbs/cmd/tsbs_generate_data/serialize"
"github.com/timescale/tsbs/load"
)
// naiveBenchmark allows you to run a benchmark using the naive, one document per
// event Mongo approach
type naiveBenchmark struct {
mongoBenchmark
}
func newNaiveBenchmark(l *load.BenchmarkRunner) *naiveBenchmark {
return &naiveBenchmark{mongoBenchmark{l, &dbCreator{}}}
}
func (b *naiveBenchmark) GetProcessor() load.Processor {
return &naiveProcessor{dbc: b.dbc}
}
func (b *naiveBenchmark) GetPointIndexer(_ uint) load.PointIndexer {
return &load.ConstantIndexer{}
}
type singlePoint struct {
Measurement string `bson:"measurement"`
Timestamp int64 `bson:"timestamp_ns"`
Fields map[string]interface{} `bson:"fields"`
Tags map[string]string `bson:"tags"`
}
var spPool = &sync.Pool{New: func() interface{} { return &singlePoint{} }}
type naiveProcessor struct {
dbc *dbCreator
collection *mgo.Collection
pvs []interface{}
}
func (p *naiveProcessor) Init(workerNUm int, doLoad bool) {
if doLoad {
sess := p.dbc.session.Copy()
db := sess.DB(loader.DatabaseName())
p.collection = db.C(collectionName)
}
p.pvs = []interface{}{}
}
// ProcessBatch creates a new document for each incoming event for a simpler
// approach to storing the data. This is _NOT_ the default since the aggregation method
// is recommended by Mongo and other blogs
func (p *naiveProcessor) ProcessBatch(b load.Batch, doLoad bool) (uint64, uint64) {
batch := b.(*batch).arr
if cap(p.pvs) < len(batch) {
p.pvs = make([]interface{}, len(batch))
}
p.pvs = p.pvs[:len(batch)]
var metricCnt uint64
for i, event := range batch {
x := spPool.Get().(*singlePoint)
x.Measurement = string(event.MeasurementName())
x.Timestamp = event.Timestamp()
x.Fields = map[string]interface{}{}
x.Tags = map[string]string{}
f := &serialize.MongoReading{}
for j := 0; j < event.FieldsLength(); j++ {
event.Fields(f, j)
x.Fields[string(f.Key())] = f.Value()
}
t := &serialize.MongoTag{}
for j := 0; j < event.TagsLength(); j++ {
event.Tags(t, j)
x.Tags[string(t.Key())] = string(t.Value())
}
p.pvs[i] = x
metricCnt += uint64(event.FieldsLength())
}
if doLoad {
bulk := p.collection.Bulk()
bulk.Insert(p.pvs...)
_, err := bulk.Run()
if err != nil {
log.Fatalf("Bulk insert docs err: %s\n", err.Error())
}
}
for _, p := range p.pvs {
spPool.Put(p)
}
return metricCnt, 0
}