Skip to content

Commit

Permalink
[DNM] stop: disable task tracking in release builds, optimize task cr…
Browse files Browse the repository at this point in the history
…eation

This commit optimizes the Stopper for task creation by ripping out the
existing heavyweight task tracking in production builds. I realized that
my biggest concern with most of the proposals (cockroachdb#52843 and cockroachdb#51566) being
floated to address cockroachdb#51544 was that they bought more into the inefficient
tracking in the Stopper, not that they were doing anything inherently
wrong themselves.

Before this change, creating a task acquired an exclusive mutex and then
wrote to a hashmap. At high levels of concurrency, this would have
become a performance chokepoint. After this change, the cost of
launching a Task is three atomic increments – one to acquire a read
lock, one to register with a WaitGroup, and one to release the read
lock. When no one is draining the Stopper, these are all wait-free
operations, which means that task creation becomes wait-free.

With a change like this, I would feel much more comfortable pushing on
Stopper tasks to solve cockroachdb#51544.
  • Loading branch information
nvanbenschoten committed Aug 17, 2020
1 parent 8324ed3 commit 626fa13
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 76 deletions.
104 changes: 28 additions & 76 deletions pkg/util/stop/stopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"fmt"
"net/http"
"runtime/debug"
"sort"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -84,9 +82,9 @@ func HandleDebug(w http.ResponseWriter, r *http.Request) {
defer trackedStoppers.Unlock()
for _, ss := range trackedStoppers.stoppers {
s := ss.s
s.mu.Lock()
fmt.Fprintf(w, "%p: %d tasks\n%s", s, s.mu.numTasks, s.runningTasksLocked())
s.mu.Unlock()
s.mu.RLock()
fmt.Fprintf(w, "%p: %d tasks\n%s", s, s.mu.tasks.NumTasks(), s.mu.tasks.RunningTasks())
s.mu.RUnlock()
}
}

Expand Down Expand Up @@ -136,17 +134,15 @@ type Stopper struct {
onPanic func(interface{}) // called with recover() on panic on any goroutine
stop sync.WaitGroup // Incremented for outstanding workers
mu struct {
syncutil.Mutex
quiesce *sync.Cond // Conditional variable to wait for outstanding tasks
quiescing bool // true when Stop() has been called
numTasks int // number of outstanding tasks
tasks TaskMap
closers []Closer
idAlloc int
qCancels map[int]func()
sCancels map[int]func()

stopCalled bool // turns all but first call to Stop into noop
syncutil.RWMutex
quiescing bool // true when Quiesce() or Stop() has been called
stopping bool // true when Stop() has been called
tasks taskRegistry

closers []Closer
idAlloc int
qCancels map[int]func()
sCancels map[int]func()
}
}

Expand Down Expand Up @@ -178,15 +174,14 @@ func NewStopper(options ...Option) *Stopper {
stopped: make(chan struct{}),
}

s.mu.tasks = TaskMap{}
s.mu.tasks = makeTaskRegistry()
s.mu.qCancels = map[int]func(){}
s.mu.sCancels = map[int]func(){}

for _, opt := range options {
opt.apply(s)
}

s.mu.quiesce = sync.NewCond(&s.mu)
register(s)
return s
}
Expand Down Expand Up @@ -409,73 +404,33 @@ func (s *Stopper) RunLimitedAsyncTask(
}

func (s *Stopper) runPrelude(taskName string) bool {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()
if s.mu.quiescing {
return false
}
s.mu.numTasks++
s.mu.tasks[taskName]++
s.mu.tasks.Register(taskName)
return true
}

func (s *Stopper) runPostlude(taskName string) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.numTasks--
s.mu.tasks[taskName]--
s.mu.quiesce.Broadcast()
s.mu.tasks.Unregister(taskName)
}

// NumTasks returns the number of active tasks.
func (s *Stopper) NumTasks() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.numTasks
}

// A TaskMap is returned by RunningTasks().
type TaskMap map[string]int

// String implements fmt.Stringer and returns a sorted multi-line listing of
// the TaskMap.
func (tm TaskMap) String() string {
var lines []string
for location, num := range tm {
lines = append(lines, fmt.Sprintf("%-6d %s", num, location))
}
sort.Sort(sort.Reverse(sort.StringSlice(lines)))
return strings.Join(lines, "\n")
}

// RunningTasks returns a map containing the count of running tasks keyed by
// call site.
func (s *Stopper) RunningTasks() TaskMap {
s.mu.Lock()
defer s.mu.Unlock()
return s.runningTasksLocked()
}

func (s *Stopper) runningTasksLocked() TaskMap {
m := TaskMap{}
for k := range s.mu.tasks {
if s.mu.tasks[k] == 0 {
continue
}
m[k] = s.mu.tasks[k]
}
return m
return s.mu.tasks.NumTasks()
}

// Stop signals all live workers to stop and then waits for each to
// confirm it has stopped.
func (s *Stopper) Stop(ctx context.Context) {
s.mu.Lock()
stopCalled := s.mu.stopCalled
s.mu.stopCalled = true
stopping := s.mu.stopping
s.mu.stopping = true
s.mu.Unlock()

if stopCalled {
if stopping {
return
}

Expand Down Expand Up @@ -564,14 +519,11 @@ func (s *Stopper) Quiesce(ctx context.Context) {
s.mu.quiescing = true
close(s.quiescer)
}
for s.mu.numTasks > 0 {
t := time.AfterFunc(5*time.Second, func() {
// If we're waiting for 5+s without a task terminating, log the ones
// that remain.
log.Infof(ctx, "quiescing; tasks left:\n%s", s.RunningTasks())
})
// Unlock s.mu, wait for the signal, and lock s.mu.
s.mu.quiesce.Wait()
t.Stop()
}
t := time.AfterFunc(5*time.Second, func() {
// If we're waiting for 5+s without a task terminating, log the ones
// that remain.
log.Infof(ctx, "quiescing; tasks left:\n%s", s.mu.tasks.RunningTasks())
})
s.mu.tasks.Quiesce()
t.Stop()
}
31 changes: 31 additions & 0 deletions pkg/util/stop/task_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package stop

import (
"fmt"
"sort"
"strings"
)

// A TaskMap is returned by taskRegistry.RunningTasks().
type TaskMap map[string]int

// String implements fmt.Stringer and returns a sorted multi-line listing of
// the TaskMap.
func (tm TaskMap) String() string {
var lines []string
for location, num := range tm {
lines = append(lines, fmt.Sprintf("%-6d %s", num, location))
}
sort.Sort(sort.Reverse(sort.StringSlice(lines)))
return strings.Join(lines, "\n")
}
74 changes: 74 additions & 0 deletions pkg/util/stop/task_registry_debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// +build race

package stop

import (
"sync"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// Drain and Register are never called concurrently.
type taskRegistry struct {
mu syncutil.Mutex
wg sync.WaitGroup
numTasks int
tasks TaskMap
}

func makeTaskRegistry() taskRegistry {
return taskRegistry{
tasks: TaskMap{},
}
}

func (r *taskRegistry) Register(taskName string) {
r.mu.Lock()
defer r.mu.Unlock()
r.wg.Add(1)
r.numTasks++
r.tasks[taskName]++
}

func (r *taskRegistry) Unregister(taskName string) {
r.mu.Lock()
defer r.mu.Unlock()
r.wg.Done()
r.numTasks--
r.tasks[taskName]--
}

func (r *taskRegistry) Quiesce() {
r.wg.Wait()
}

func (r *taskRegistry) NumTasks() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.numTasks
}

// RunningTasks returns a map containing the count of running tasks keyed by
// call site.
func (r *taskRegistry) RunningTasks() TaskMap {
r.mu.Lock()
defer r.mu.Unlock()
m := TaskMap{}
for k := range r.tasks {
if r.tasks[k] == 0 {
continue
}
m[k] = r.tasks[k]
}
return m
}
44 changes: 44 additions & 0 deletions pkg/util/stop/task_registry_release.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// +build !race

package stop

import "sync"

// Drain and Register are never called concurrently.
type taskRegistry struct {
wg sync.WaitGroup
}

func makeTaskRegistry() taskRegistry {
return taskRegistry{}
}

func (r *taskRegistry) Register(taskName string) {
r.wg.Add(1)
}

func (r *taskRegistry) Unregister(taskName string) {
r.wg.Done()
}

func (r *taskRegistry) Quiesce() {
r.wg.Wait()
}

func (r *taskRegistry) NumTasks() int {
return -1
}

func (r *taskRegistry) RunningTasks() TaskMap {
return TaskMap{"tracking disabled": 0}
}

0 comments on commit 626fa13

Please sign in to comment.