Permalink
Browse files

More optimization

  • Loading branch information...
grisha committed Mar 1, 2017
1 parent 544c822 commit 90924e4afa4ac8bef61caf46c3439794983660ec
Showing with 61 additions and 18 deletions.
  1. +16 −3 receiver/director.go
  2. +28 −6 receiver/dscache.go
  3. +1 −1 receiver/dsfinder.go
  4. +1 −1 receiver/elastic_ch.go
  5. +7 −1 receiver/flusher.go
  6. +5 −5 receiver/receiver.go
  7. +2 −0 receiver/vcache.go
  8. +1 −1 serde/postgres_common.go
@@ -72,8 +72,9 @@ var directorProcessDataPoint = func(cds *cachedDs, dsf dsFlusherBlocking) int {
log.Printf("directorProcessDataPoint [%v] error: %v", cds.Ident(), err)
}

if cds.PointCount() > 0 {
if cds.PointCount() > 0 && cds.lastFlush.Before(time.Now().Add(-cds.Step())) {
dsf.flushDs(cds.DbDataSourcer, false)
cds.lastFlush = time.Now()
}
return cnt
}
@@ -118,11 +119,11 @@ var directorProcessIncomingDP = func(dp *incomingDP, dsc *dsCache, loaderCh chan
return
}

cds := dsc.getByIdentOrCreateEmpty(dp.Ident)
cds := dsc.getByIdentOrCreateEmpty(dp.cachedIdent)
if cds == nil {
stats.dropped++
if debug {
log.Printf("director: No spec matched ident: %#v, ignoring data point", dp.Ident)
log.Printf("director: No spec matched ident: %#v, ignoring data point", dp.cachedIdent.String())
}
return
}
@@ -273,6 +274,18 @@ var director = func(wc wController, dpCh chan interface{}, nWorkers int, clstr c
// this came from the loader, we do not need to look it up
directorProcessOrForward(dsc, cds, workerCh, clstr, snd, &stats)
} else {
// wait for worker and loader channels to empty
log.Printf("director: channel closed, waiting for loader and workers to empty...")
for {
w, l := len(workerCh), len(loaderCh)
if w == 0 && l == 0 {
break
log.Printf(" - worker: %d loader: %d", w, l)
time.Sleep(1 * time.Millisecond)
w, l = len(workerCh), len(loaderCh)
}
}

// signal to exit
log.Printf("director: closing worker channels, waiting for workers to finish....")
close(workerCh)
@@ -18,6 +18,7 @@ package receiver
import (
"fmt"
"sync"
"time"

"github.com/tgres/tgres/cluster"
"github.com/tgres/tgres/rrd"
@@ -47,7 +48,7 @@ func newDsCache(db serde.Fetcher, finder MatchingDSSpecFinder, dsf dsFlusherBloc
}

// getByName rlocks and gets a DS pointer.
func (d *dsCache) getByIdent(ident serde.Ident) *cachedDs {
func (d *dsCache) getByIdent(ident *cachedIdent) *cachedDs {
d.RLock()
defer d.RUnlock()
return d.byIdent[ident.String()]
@@ -95,12 +96,12 @@ func (d *dsCache) preLoad() error {
}

// get or create and empty cached ds
func (d *dsCache) getByIdentOrCreateEmpty(ident serde.Ident) *cachedDs {
func (d *dsCache) getByIdentOrCreateEmpty(ident *cachedIdent) *cachedDs {
result := d.getByIdent(ident)
if result == nil {
if spec := d.finder.FindMatchingDSSpec(ident); spec != nil {
if spec := d.finder.FindMatchingDSSpec(ident.Ident); spec != nil {
// return a cachedDs with nil DataSourcer
dbds := serde.NewDbDataSource(0, ident, nil)
dbds := serde.NewDbDataSource(0, ident.Ident, nil)
result = &cachedDs{DbDataSourcer: dbds, spec: spec, mu: &sync.Mutex{}}
d.insert(result)
}
@@ -147,6 +148,7 @@ type cachedDs struct {
incoming []*incomingDP
spec *rrd.DSSpec // for when DS needs to be created
sentToLoader bool
lastFlush time.Time
mu *sync.Mutex
}

@@ -165,11 +167,31 @@ func (cds *cachedDs) processIncoming() (int, error) {
err = cds.ProcessDataPoint(dp.Value, dp.TimeStamp)
}
count := len(cds.incoming)
cds.incoming = nil
//cds.incoming = cds.incoming[:0] // leave the backing array in place to avoid extra memory allocations
if count < 32 {
// leave the backing array in place to avoid extra memory allocations
cds.incoming = cds.incoming[:0]
} else {
cds.incoming = nil
}
return count, err
}

type cachedIdent struct {
serde.Ident
s string
}

func (ci *cachedIdent) String() string {
if ci.s == "" {
ci.s = ci.Ident.String()
}
return ci.s
}

func newCachedIdent(ident serde.Ident) *cachedIdent {
return &cachedIdent{Ident: ident, s: ident.String()}
}

// distDs keeps a pointer to the dsCache so that it can delete itself
// from it, as well as access the Flusher to persist during Relinquish
type distDs struct {
@@ -61,7 +61,7 @@ type SimpleDSFinder struct {
}

func (s *SimpleDSFinder) FindMatchingDSSpec(ident serde.Ident) *rrd.DSSpec {
if name := ident["name"]; name == "" {
if ident["name"] == "" {
return nil
}
return s.DSSpec
@@ -43,7 +43,7 @@ func (q *fifoQueue) size() int {
// much as we can at a time for performance.
func elasticCh(cin <-chan interface{}, cout chan<- interface{}, queue *fifoQueue) {

const maxReceive = 256
const maxReceive = 1024
var (
in <-chan interface{}
out chan<- interface{}
@@ -18,6 +18,7 @@ package receiver
import (
"fmt"
"log"
"math/rand"
"sync"
"time"

@@ -111,7 +112,12 @@ func (f *dsFlusher) flushDs(ds serde.DbDataSourcer, block bool) {
// stuff to another cache.
f.verticalFlush(ds)
ds.ClearRRAs(false)
f.horizontalFlush(ds, block)
if block {
f.horizontalFlush(ds, block)
} else if rand.Intn(3) == 0 {
// flush a third of DSs, just in case
f.horizontalFlush(ds, false)
}
}
return
}
@@ -105,10 +105,10 @@ type Receiver struct {
// data point representation has no notion of duration and therefore
// must rely on some kind of a separately stored "last update" time.
type incomingDP struct {
Ident serde.Ident
TimeStamp time.Time
Value float64
Hops int
cachedIdent *cachedIdent
TimeStamp time.Time
Value float64
Hops int
}

// Create a Receiver. The first argument is a SerDe, the second is a
@@ -181,7 +181,7 @@ func (r *Receiver) SetCluster(c clusterer) {
// paced metrics (QueueSum/QueueGauge) for non-rate data.
func (r *Receiver) QueueDataPoint(ident serde.Ident, ts time.Time, v float64) {
if !r.stopped {
r.dpCh <- &incomingDP{Ident: ident, TimeStamp: ts, Value: v}
r.dpCh <- &incomingDP{cachedIdent: newCachedIdent(ident), TimeStamp: ts, Value: v}
}
}

@@ -125,10 +125,12 @@ func (bc *verticalCache) stats() *vcStats {

st.segments = len(bc.m)
for _, segment := range bc.m {
segment.Lock()
st.rows += len(segment.rows)
for _, row := range segment.rows {
st.points += len(row)
}
segment.Unlock()
}

return &st
@@ -76,7 +76,7 @@ type pgSearchResult struct {
rows *sql.Rows
err error
id int64
ident map[string]string
ident Ident
}

func (sr *pgSearchResult) Next() bool {

0 comments on commit 90924e4

Please sign in to comment.