Skip to content

Commit

Permalink
Merge pull request #120 from bonitoo-io/mongo-new-schema
Browse files Browse the repository at this point in the history
output values/sec rate at the end
  • Loading branch information
alespour committed Dec 4, 2018
2 parents 4b73e50 + 690f3d5 commit 43d7716
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions cmd/bulk_load_mongo/main.go
Expand Up @@ -13,6 +13,7 @@ import (
"log"
"os"
"sync"
"sync/atomic"
"time"

"github.com/google/flatbuffers/go"
Expand Down Expand Up @@ -50,6 +51,7 @@ var (
workersGroup sync.WaitGroup
reportTags [][2]string
reportHostname string
valuesRead int64
)

// Magic database constants
Expand Down Expand Up @@ -170,8 +172,9 @@ func main() {
took := end.Sub(start)
itemRate := float64(itemsRead) / float64(took.Seconds())
bytesRate := float64(bytesRead) / float64(took.Seconds())
valuesRate := float64(valuesRead) / float64(took.Seconds())

fmt.Printf("loaded %d values in %fsec with %d workers (mean values rate %f values/sec, %.2fMB/sec from stdin)\n", itemsRead, took.Seconds(), workers, itemRate, bytesRate/(1<<20))
fmt.Printf("loaded %d items in %fsec with %d workers (mean point rate %f/sec, mean value rate %f/s, %.2fMB/sec from stdin)\n", itemsRead, took.Seconds(), workers, itemRate, valuesRate, bytesRate/(1<<20))

if reportHost != "" {
//append db specific tags to custom tags
Expand Down Expand Up @@ -281,6 +284,8 @@ func scan(session *mgo.Session, itemsPerBatch int) (int64, int64) {
// them to the target server. Note that mgo forcibly incurs serialization
// overhead (it always encodes to BSON).
func processBatches(session *mgo.Session) {
var workerValuesRead int64

db := session.DB(dbName)

type Tag struct {
Expand Down Expand Up @@ -374,7 +379,7 @@ func processBatches(session *mgo.Session) {
}
}
pvs[i] = x

workerValuesRead += int64(fieldLength)
}
bulk.Insert(pvs...)

Expand Down Expand Up @@ -402,6 +407,7 @@ func processBatches(session *mgo.Session) {
batch.ClearReferences()
batchPool.Put(batch)
}
atomic.AddInt64(&valuesRead, workerValuesRead)
workersGroup.Done()
}

Expand Down

0 comments on commit 43d7716

Please sign in to comment.