diff --git a/partition.go b/partition.go index 94201b66..3c2ea421 100644 --- a/partition.go +++ b/partition.go @@ -342,7 +342,7 @@ func (p *partition) markRecovered() (err error) { } func (p *partition) fetchStats() *PartitionStats { - timer := time.NewTimer(5 * time.Second) + timer := time.NewTimer(100 * time.Millisecond) defer timer.Stop() select { diff --git a/processor.go b/processor.go index e91e32cf..f166987f 100644 --- a/processor.go +++ b/processor.go @@ -202,8 +202,10 @@ func (g *Processor) isStateless() bool { /////////////////////////////////////////////////////////////////////////////// // Get returns a read-only copy of a value from the group table if the -// respective partition is owned by the processor instace. Get can be only -// used with stateful processors (ie, when group table is enabled). +// respective partition is owned by the processor instace. +// Get can be called by multiple goroutines concurrently. +// Get can be only used with stateful processors (ie, when group table is +// enabled) and after Recovered returns true. func (g *Processor) Get(key string) (interface{}, error) { if g.isStateless() { return nil, fmt.Errorf("can't get a value from stateless processor") @@ -740,21 +742,49 @@ func (g *Processor) Recovered() bool { } func (g *Processor) Stats() *ProcessorStats { - stats := newProcessorStats(len(g.partitions)) + var ( + m sync.Mutex + wg sync.WaitGroup + stats = newProcessorStats(len(g.partitions)) + ) + for i, p := range g.partitions { - stats.Group[i] = p.fetchStats() + wg.Add(1) + go func(pid int32, par *partition) { + s := par.fetchStats() + m.Lock() + stats.Group[pid] = s + m.Unlock() + wg.Done() + }(int32(i), p) } for i, p := range g.partitionViews { if _, ok := stats.Joined[i]; !ok { stats.Joined[i] = make(map[string]*PartitionStats) } for t, tp := range p { - stats.Joined[i][t] = tp.fetchStats() + wg.Add(1) + go func(pid int32, topic string, par *partition) { + s := par.fetchStats() + m.Lock() + stats.Joined[pid][topic] = s + m.Unlock() + wg.Done() + }(int32(i), t, tp) } } for t, v := range g.views { - stats.Lookup[t] = v.Stats() + wg.Add(1) + go func(topic string, vi *View) { + s := vi.Stats() + m.Lock() + stats.Lookup[t] = s + m.Unlock() + wg.Done() + }(t, v) } + + wg.Wait() return stats } diff --git a/view.go b/view.go index 4624d836..fa669faf 100644 --- a/view.go +++ b/view.go @@ -139,7 +139,7 @@ func (v *View) Start() error { } func (v *View) fail(err error) { - v.opts.log.Printf("failing view: %v", err) + v.opts.log.Printf("View: failing: %v", err) v.errors.Collect(err) go v.stop() } @@ -149,19 +149,24 @@ func (v *View) stop() { defer close(v.dead) // stop consumer if err := v.consumer.Close(); err != nil { - v.errors.Collect(fmt.Errorf("failed to close consumer on stopping the view: %v", err)) + err = fmt.Errorf("failed to close consumer on stopping the view: %v", err) + v.opts.log.Printf("error: %v", err) + v.errors.Collect(err) } <-v.done + v.opts.log.Printf("View: stopping partitions") var wg sync.WaitGroup - for _, par := range v.partitions { + for i, par := range v.partitions { wg.Add(1) - go func(p *partition) { + go func(pid int, p *partition) { p.stop() wg.Done() - }(par) + v.opts.log.Printf("View: partition %d stopped", pid) + }(i, par) } wg.Wait() + v.opts.log.Printf("View: shutdown complete") }) } @@ -169,7 +174,6 @@ func (v *View) stop() { func (v *View) Stop() { v.opts.log.Printf("View: stopping") v.stop() - v.opts.log.Printf("View: shutdown complete") } func (v *View) hash(key string) (int32, error) { @@ -206,6 +210,8 @@ func (v *View) Topic() string { } // Get returns the value for the key in the view, if exists. Nil if it doesn't. +// Get can be called by multiple goroutines concurrently. +// Get can only be called after Recovered returns true. func (v *View) Get(key string) (interface{}, error) { // find partition where key is located s, err := v.find(key) @@ -314,9 +320,22 @@ func (v *View) Recovered() bool { } func (v *View) Stats() *ViewStats { - stats := newViewStats() + var ( + m sync.Mutex + wg sync.WaitGroup + stats = newViewStats() + ) + + wg.Add(len(v.partitions)) for i, p := range v.partitions { - stats.Partitions[int32(i)] = p.fetchStats() + go func(pid int32, par *partition) { + s := par.fetchStats() + m.Lock() + stats.Partitions[pid] = s + m.Unlock() + wg.Done() + }(int32(i), p) } + wg.Wait() return stats }