Skip to content

Commit

Permalink
speed up stats collection
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens committed Dec 13, 2017
1 parent 676cf31 commit 955b749
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 15 deletions.
2 changes: 1 addition & 1 deletion partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (p *partition) markRecovered() (err error) {
}

func (p *partition) fetchStats() *PartitionStats {
timer := time.NewTimer(5 * time.Second)
timer := time.NewTimer(100 * time.Millisecond)
defer timer.Stop()

select {
Expand Down
42 changes: 36 additions & 6 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ func (g *Processor) isStateless() bool {
///////////////////////////////////////////////////////////////////////////////

// Get returns a read-only copy of a value from the group table if the
// respective partition is owned by the processor instance. Get can only be
// used with stateful processors (i.e., when group table is enabled).
// respective partition is owned by the processor instance.
// Get can be called by multiple goroutines concurrently.
// Get can only be used with stateful processors (i.e., when group table is
// enabled) and after Recovered returns true.
func (g *Processor) Get(key string) (interface{}, error) {
if g.isStateless() {
return nil, fmt.Errorf("can't get a value from stateless processor")
Expand Down Expand Up @@ -740,21 +742,49 @@ func (g *Processor) Recovered() bool {
}

// Stats collects the statistics of every group-table partition, every joined
// partition view and every lookup view of the processor and returns them in a
// single ProcessorStats value.
//
// Each individual fetch runs in its own goroutine so that slow partitions are
// waited for in parallel instead of one after another. All writes to the
// result maps are serialized through a single mutex, and Stats only returns
// after every goroutine has finished.
func (g *Processor) Stats() *ProcessorStats {
	var (
		m     sync.Mutex
		wg    sync.WaitGroup
		stats = newProcessorStats(len(g.partitions))
	)

	for i, p := range g.partitions {
		wg.Add(1)
		go func(pid int32, par *partition) {
			defer wg.Done()
			s := par.fetchStats()
			m.Lock()
			stats.Group[pid] = s
			m.Unlock()
		}(int32(i), p)
	}

	for i, p := range g.partitionViews {
		// The outer Joined map is read by goroutines started in earlier
		// iterations, so creating a new entry must also happen under the lock.
		m.Lock()
		if _, ok := stats.Joined[i]; !ok {
			stats.Joined[i] = make(map[string]*PartitionStats)
		}
		m.Unlock()

		for t, tp := range p {
			wg.Add(1)
			go func(pid int32, topic string, par *partition) {
				defer wg.Done()
				s := par.fetchStats()
				m.Lock()
				stats.Joined[pid][topic] = s
				m.Unlock()
			}(int32(i), t, tp)
		}
	}

	for t, v := range g.views {
		wg.Add(1)
		go func(topic string, vi *View) {
			defer wg.Done()
			s := vi.Stats()
			m.Lock()
			// Use the goroutine's own parameter, not the loop variable, so the
			// result is stored under the topic it was fetched for.
			stats.Lookup[topic] = s
			m.Unlock()
		}(t, v)
	}

	wg.Wait()
	return stats
}

Expand Down
35 changes: 27 additions & 8 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (v *View) Start() error {
}

// fail logs err, records it in the view's error collection and triggers the
// shutdown of the view in a separate goroutine, so fail itself does not wait
// for the shutdown to complete.
func (v *View) fail(err error) {
	v.opts.log.Printf("View: failing: %v", err)
	v.errors.Collect(err)
	go v.stop()
}
Expand All @@ -149,27 +149,31 @@ func (v *View) stop() {
defer close(v.dead)
// stop consumer
if err := v.consumer.Close(); err != nil {
v.errors.Collect(fmt.Errorf("failed to close consumer on stopping the view: %v", err))
err = fmt.Errorf("failed to close consumer on stopping the view: %v", err)
v.opts.log.Printf("error: %v", err)
v.errors.Collect(err)
}
<-v.done
v.opts.log.Printf("View: stopping partitions")

var wg sync.WaitGroup
for _, par := range v.partitions {
for i, par := range v.partitions {
wg.Add(1)
go func(p *partition) {
go func(pid int, p *partition) {
p.stop()
wg.Done()
}(par)
v.opts.log.Printf("View: partition %d stopped", pid)
}(i, par)
}
wg.Wait()
v.opts.log.Printf("View: shutdown complete")
})
}

// Stop stops the view and frees its resources and connections to Kafka.
// It logs that the view is stopping and then delegates to stop, which
// reports the completed shutdown itself.
func (v *View) Stop() {
	v.opts.log.Printf("View: stopping")
	v.stop()
}

func (v *View) hash(key string) (int32, error) {
Expand Down Expand Up @@ -206,6 +210,8 @@ func (v *View) Topic() string {
}

// Get returns the value for the key in the view, if exists. Nil if it doesn't.
// Get can be called by multiple goroutines concurrently.
// Get can only be called after Recovered returns true.
func (v *View) Get(key string) (interface{}, error) {
// find partition where key is located
s, err := v.find(key)
Expand Down Expand Up @@ -314,9 +320,22 @@ func (v *View) Recovered() bool {
}

// Stats collects the statistics of all partitions of the view and returns
// them keyed by partition index.
//
// The statistics of each partition are fetched in a separate goroutine so the
// fetches overlap instead of running sequentially. Writes to the result map
// are guarded by a mutex, and Stats returns only after all fetches finished.
func (v *View) Stats() *ViewStats {
	var (
		m     sync.Mutex
		wg    sync.WaitGroup
		stats = newViewStats()
	)

	wg.Add(len(v.partitions))
	for i, p := range v.partitions {
		go func(pid int32, par *partition) {
			defer wg.Done()
			s := par.fetchStats()
			m.Lock()
			stats.Partitions[pid] = s
			m.Unlock()
		}(int32(i), p)
	}

	wg.Wait()
	return stats
}

0 comments on commit 955b749

Please sign in to comment.