speed up stats collection #69

Merged (1 commit) on Dec 18, 2017
partition.go (1 addition, 1 deletion)

@@ -342,7 +342,7 @@ func (p *partition) markRecovered() (err error) {
 }

 func (p *partition) fetchStats() *PartitionStats {
-	timer := time.NewTimer(5 * time.Second)
+	timer := time.NewTimer(100 * time.Millisecond)
 	defer timer.Stop()

 	select {
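
Why the new value matters: fetchStats evidently races a stats request against this timer, so a busy partition can delay the caller by at most the timeout, and the old 5 s ceiling made a single stuck partition very expensive for anyone polling stats. A minimal runnable sketch of that select-with-deadline pattern follows; the request/response channel fields and the nil-on-timeout fallback are assumptions for illustration, not goka's exact internals.

// Hedged sketch: a deadline-bounded stats request, loosely modeled on
// the fetchStats hunk above. The channel names are hypothetical.
package main

import (
	"fmt"
	"time"
)

type PartitionStats struct{ Offset int64 }

type partition struct {
	requestStats  chan struct{}        // hypothetical: run loop receives requests here
	responseStats chan *PartitionStats // hypothetical: run loop answers here
}

func (p *partition) fetchStats() *PartitionStats {
	timer := time.NewTimer(100 * time.Millisecond)
	defer timer.Stop()

	// If the run loop cannot accept or answer the request before the
	// timer fires, give up instead of blocking the caller.
	select {
	case p.requestStats <- struct{}{}:
	case <-timer.C:
		return nil
	}
	select {
	case s := <-p.responseStats:
		return s
	case <-timer.C:
		return nil
	}
}

func main() {
	p := &partition{
		requestStats:  make(chan struct{}),
		responseStats: make(chan *PartitionStats),
	}
	go func() { // fake run loop: serve exactly one request
		<-p.requestStats
		p.responseStats <- &PartitionStats{Offset: 42}
	}()
	fmt.Println(p.fetchStats()) // &{42}
	fmt.Println(p.fetchStats()) // <nil> after ~100ms: nobody is serving
}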
processor.go (36 additions, 6 deletions)

@@ -202,8 +202,10 @@ func (g *Processor) isStateless() bool {
 ///////////////////////////////////////////////////////////////////////////////

 // Get returns a read-only copy of a value from the group table if the
-// respective partition is owned by the processor instace. Get can be only
-// used with stateful processors (ie, when group table is enabled).
+// respective partition is owned by the processor instace.
+// Get can be called by multiple goroutines concurrently.
+// Get can be only used with stateful processors (ie, when group table is
+// enabled) and after Recovered returns true.
 func (g *Processor) Get(key string) (interface{}, error) {
 	if g.isStateless() {
 		return nil, fmt.Errorf("can't get a value from stateless processor")

@@ -740,21 +742,49 @@ func (g *Processor) Recovered() bool {
 }

 func (g *Processor) Stats() *ProcessorStats {
-	stats := newProcessorStats(len(g.partitions))
+	var (
+		m     sync.Mutex
+		wg    sync.WaitGroup
+		stats = newProcessorStats(len(g.partitions))
+	)

 	for i, p := range g.partitions {
-		stats.Group[i] = p.fetchStats()
+		wg.Add(1)
+		go func(pid int32, par *partition) {
+			s := par.fetchStats()
+			m.Lock()
+			stats.Group[pid] = s
+			m.Unlock()
+			wg.Done()
+		}(int32(i), p)
 	}
 	for i, p := range g.partitionViews {
 		if _, ok := stats.Joined[i]; !ok {
 			stats.Joined[i] = make(map[string]*PartitionStats)
 		}
 		for t, tp := range p {
-			stats.Joined[i][t] = tp.fetchStats()
+			wg.Add(1)
+			go func(pid int32, topic string, par *partition) {
+				s := par.fetchStats()
+				m.Lock()
+				stats.Joined[pid][topic] = s
+				m.Unlock()
+				wg.Done()
+			}(int32(i), t, tp)
 		}
 	}
 	for t, v := range g.views {
-		stats.Lookup[t] = v.Stats()
+		wg.Add(1)
+		go func(topic string, vi *View) {
+			s := vi.Stats()
+			m.Lock()
+			stats.Lookup[topic] = s
+			m.Unlock()
+			wg.Done()
+		}(t, v)
 	}

+	wg.Wait()
 	return stats
 }

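
This hunk is the heart of the PR: Stats() used to fetch each partition's stats one after another, so the worst case grew linearly with partition count (with the former 5 s timeout, ten slow partitions could block for close to a minute); launching one goroutine per partition caps the whole collection at roughly a single fetch timeout. A self-contained sketch of the same fan-out/fan-in shape, with an invented stat source standing in for goka's partitions:

// Standalone sketch of the fan-out pattern this PR applies to Stats().
// The source type and its fixed 100ms fetch are invented for illustration.
package main

import (
	"fmt"
	"sync"
	"time"
)

type source struct{ id int32 }

func (s *source) fetchStats() string {
	time.Sleep(100 * time.Millisecond) // pretend every fetch hits its deadline
	return fmt.Sprintf("stats-%d", s.id)
}

func collect(sources []*source) map[int32]string {
	var (
		m     sync.Mutex
		wg    sync.WaitGroup
		stats = make(map[int32]string, len(sources))
	)
	for _, s := range sources {
		wg.Add(1)
		go func(s *source) {
			r := s.fetchStats() // slow part runs outside the lock, so fetches overlap
			m.Lock()
			stats[s.id] = r // concurrent map writes must be serialized
			m.Unlock()
			wg.Done()
		}(s)
	}
	wg.Wait() // every goroutine has finished; stats is safe to return
	return stats
}

func main() {
	start := time.Now()
	stats := collect([]*source{{0}, {1}, {2}, {3}})
	fmt.Println(stats, "in", time.Since(start)) // ~100ms total, not ~400ms
}

Fetching outside the mutex is what buys the speedup; the lock only guards the brief map insert.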
view.go (27 additions, 8 deletions)

@@ -139,7 +139,7 @@ func (v *View) Start() error {
 }

 func (v *View) fail(err error) {
-	v.opts.log.Printf("failing view: %v", err)
+	v.opts.log.Printf("View: failing: %v", err)
 	v.errors.Collect(err)
 	go v.stop()
 }

@@ -149,27 +149,31 @@ func (v *View) stop() {
 	defer close(v.dead)
 	// stop consumer
 	if err := v.consumer.Close(); err != nil {
-		v.errors.Collect(fmt.Errorf("failed to close consumer on stopping the view: %v", err))
+		err = fmt.Errorf("failed to close consumer on stopping the view: %v", err)
+		v.opts.log.Printf("error: %v", err)
+		v.errors.Collect(err)
 	}
 	<-v.done
+	v.opts.log.Printf("View: stopping partitions")

 	var wg sync.WaitGroup
-	for _, par := range v.partitions {
+	for i, par := range v.partitions {
 		wg.Add(1)
-		go func(p *partition) {
+		go func(pid int, p *partition) {
 			p.stop()
 			wg.Done()
-		}(par)
+			v.opts.log.Printf("View: partition %d stopped", pid)
+		}(i, par)
 	}
 	wg.Wait()
+	v.opts.log.Printf("View: shutdown complete")
 	})
 }

 // Stop stops the view, frees any resources + connections to kafka
 func (v *View) Stop() {
+	v.opts.log.Printf("View: stopping")
 	v.stop()
-	v.opts.log.Printf("View: shutdown complete")
 }

 func (v *View) hash(key string) (int32, error) {
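
Two details in this hunk are easy to miss: the trailing "})" shows the body runs inside a sync.Once-style guard, so fail() and Stop() can both trigger shutdown without running it twice, and the partitions now stop in parallel, with the completion log deferred until wg.Wait() returns. A sketch of that once-guarded parallel shutdown under those assumptions (the stopOnce field name is invented; goka's actual guard may differ):

// Sketch of a once-guarded, parallel shutdown; the stopOnce field and
// log lines are illustrative stand-ins for goka's internals.
package main

import (
	"fmt"
	"sync"
)

type partition struct{ id int }

func (p *partition) stop() { /* close storage, flush, etc. */ }

type view struct {
	stopOnce   sync.Once // hypothetical: whatever guard produces the "})" above
	partitions []*partition
}

func (v *view) stop() {
	v.stopOnce.Do(func() {
		var wg sync.WaitGroup
		for i, par := range v.partitions {
			wg.Add(1)
			go func(pid int, p *partition) {
				p.stop()
				wg.Done()
				fmt.Printf("View: partition %d stopped\n", pid)
			}(i, par)
		}
		wg.Wait() // only report completion once every partition has stopped
		fmt.Println("View: shutdown complete")
	})
}

func main() {
	v := &view{partitions: []*partition{{0}, {1}, {2}}}
	go v.stop() // e.g. triggered by fail()
	v.stop()    // e.g. triggered by Stop(); the body still runs exactly once
}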

@@ -206,6 +210,8 @@ func (v *View) Topic() string {
 }

 // Get returns the value for the key in the view, if exists. Nil if it doesn't.
+// Get can be called by multiple goroutines concurrently.
+// Get can only be called after Recovered returns true.
 func (v *View) Get(key string) (interface{}, error) {
 	// find partition where key is located
 	s, err := v.find(key)
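
These added comment lines document a contract rather than change behavior: Get is safe for concurrent readers, but only once the view has caught up with its topic. A hedged usage sketch against this era's API (Start, Recovered, and Get all appear in this diff; the broker address, group name, codec choice, and polling interval are placeholders):

// Hedged usage sketch of the documented View.Get contract.
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	view, err := goka.NewView([]string{"localhost:9092"},
		goka.GroupTable(goka.Group("example-group")), new(codec.String))
	if err != nil {
		panic(err)
	}
	go view.Start() // Start blocks in this API version; run it in the background

	for !view.Recovered() { // Get is only valid after recovery finishes
		time.Sleep(100 * time.Millisecond)
	}

	var wg sync.WaitGroup
	for _, key := range []string{"a", "b", "c"} {
		wg.Add(1)
		go func(k string) { // Get is documented as safe for concurrent use
			defer wg.Done()
			val, err := view.Get(k)
			fmt.Println(k, val, err)
		}(key)
	}
	wg.Wait()
}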

@@ -314,9 +320,22 @@ func (v *View) Recovered() bool {
 }

 func (v *View) Stats() *ViewStats {
-	stats := newViewStats()
+	var (
+		m     sync.Mutex
+		wg    sync.WaitGroup
+		stats = newViewStats()
+	)

+	wg.Add(len(v.partitions))
 	for i, p := range v.partitions {
-		stats.Partitions[int32(i)] = p.fetchStats()
+		go func(pid int32, par *partition) {
+			s := par.fetchStats()
+			m.Lock()
+			stats.Partitions[pid] = s
+			m.Unlock()
+			wg.Done()
+		}(int32(i), p)
 	}
+	wg.Wait()
 	return stats
 }
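
View.Stats() uses the same fan-out as the processor but sizes the WaitGroup once up front with wg.Add(len(v.partitions)), which is equivalent whenever the loop unconditionally launches exactly one goroutine per element. One general Go footnote, not goka code: the mutex is required above because stats.Partitions is a map; goroutines writing to distinct indices of a pre-sized slice need no lock at all, as in this standalone example.

// Upfront WaitGroup sizing, plus lock-free writes to distinct slice slots.
package main

import (
	"fmt"
	"sync"
)

func main() {
	items := []string{"p0", "p1", "p2"}
	results := make([]string, len(items))

	var wg sync.WaitGroup
	wg.Add(len(items)) // one Done expected per goroutine launched below
	for i, it := range items {
		go func(i int, it string) {
			defer wg.Done()
			results[i] = it + ":ok" // distinct indices: no mutex needed for a slice
		}(i, it)
	}
	wg.Wait()
	fmt.Println(results)
}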