Skip to content

Commit

Permalink
kvprober: add setting to bypass admission queue
Browse files Browse the repository at this point in the history
Currently kvprober is bypassing the kv admission queue unconditionally.
This is fine for dedicated clusters, since the user owns capacity
planning.  But it is a problem for serverless clusters because the SRE
team owns capacity scaling. When hardware resources are overloaded in
serverless clusters, the probes are expected to fail.

This commit adds a new cluster setting for kvprober that determines if
the admission control should be bypassed. The cluster setting works by
adjusting the admission source header in kvprober's transaction
requests:

- When set, it uses a txn sourced from OTHER which is going to bypass
  the admission queue.
- When not set, it uses a txn sourced from ROOT_KV which is not going to
  bypass admission queue.

Control over what the txn admission header is set to is provided through
kv.DB and kv.Txn.

Release note: None
  • Loading branch information
dasrirez committed Nov 23, 2021
1 parent 96fef3f commit 18b95cb
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 84 deletions.
18 changes: 18 additions & 0 deletions pkg/kv/db.go
Expand Up @@ -820,6 +820,24 @@ func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) err
nodeID, _ := db.ctx.NodeID.OptionalNodeID() // zero if not available
txn := NewTxn(ctx, db, nodeID)
txn.SetDebugName("unnamed")
return runTxn(ctx, txn, retryable)
}

// TxnRootKV is the same as Txn, but specifically represents a request
// originating within KV, and that is at the root of the tree of requests. For
// KV usage that should be subject to admission control. Do not use this for
// executing work originating in SQL. This distinction only causes this
// transaction to undergo admission control. See AdmissionHeader_Source for more
// details.
func (db *DB) TxnRootKV(ctx context.Context, retryable func(context.Context, *Txn) error) error {
nodeID, _ := db.ctx.NodeID.OptionalNodeID() // zero if not available
txn := NewTxnRootKV(ctx, db, nodeID)
txn.SetDebugName("unnamed")
return runTxn(ctx, txn, retryable)
}

// runTxn runs the given retryable transaction function using the given *Txn.
func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn) error) error {
err := txn.exec(ctx, func(ctx context.Context, txn *Txn) error {
return retryable(ctx, txn)
})
Expand Down
105 changes: 77 additions & 28 deletions pkg/kv/kvprober/kvprober.go
Expand Up @@ -137,6 +137,69 @@ type Metrics struct {
ProbePlanFailures *metric.Counter
}

// proberOps is an interface that the prober will use to run ops against some
// system. This interface exists so that ops can be mocked for tests.
type proberOps interface {
Read(key interface{}) func(context.Context, *kv.Txn) error
Write(key interface{}) func(context.Context, *kv.Txn) error
}

// proberTxn is an interface that the prober will use to run txns. This
// interface exists so that txn can be mocked for tests.
type proberTxn interface {
// Txn runs the given function with a transaction having the admission
// source in the header set to OTHER. Transaction work submitted from this
// source currently bypassess admission control.
Txn(context.Context, func(context.Context, *kv.Txn) error) error
// TxnRootKV runs the given function with a transaction having the admission
// source in the header set to ROOT KV. Transaction work submitted from this
// source should not bypass admission control.
TxnRootKV(context.Context, func(context.Context, *kv.Txn) error) error
}

// proberOpsImpl is used to probe the kv layer.
type proberOpsImpl struct {
}

// We attempt to commit a txn that reads some data at the key.
func (p *proberOpsImpl) Read(key interface{}) func(context.Context, *kv.Txn) error {
return func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.Get(ctx, key)
return err
}
}

// We attempt to commit a txn that puts some data at the key then deletes
// it. The test of the write code paths is good: We get a raft command that
// goes thru consensus and is written to the pebble log. Importantly, no
// *live* data is left at the key, which simplifies the kvprober, as then
// there is no need to clean up data at the key post range split / merge.
// Note that MVCC tombstones may be left by the probe, but this is okay, as
// GC will clean it up.
func (p *proberOpsImpl) Write(key interface{}) func(context.Context, *kv.Txn) error {
return func(ctx context.Context, txn *kv.Txn) error {
if err := txn.Put(ctx, key, putValue); err != nil {
return err
}
return txn.Del(ctx, key)
}
}

// proberTxnImpl is used to run transactions.
type proberTxnImpl struct {
db *kv.DB
}

func (p *proberTxnImpl) Txn(ctx context.Context, f func(context.Context, *kv.Txn) error) error {
return p.db.Txn(ctx, f)
}

func (p *proberTxnImpl) TxnRootKV(
ctx context.Context, f func(context.Context, *kv.Txn) error,
) error {
return p.db.TxnRootKV(ctx, f)
}

// NewProber creates a Prober from Opts.
func NewProber(opts Opts) *Prober {
return &Prober{
Expand Down Expand Up @@ -209,14 +272,10 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {

// Doesn't return an error. Instead increments error type specific metrics.
func (p *Prober) readProbe(ctx context.Context, db *kv.DB, pl planner) {
p.readProbeImpl(ctx, db, pl)
p.readProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
}

type dbGetter interface {
Get(ctx context.Context, key interface{}) (kv.KeyValue, error)
}

func (p *Prober) readProbeImpl(ctx context.Context, db dbGetter, pl planner) {
func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner) {
if !readEnabled.Get(&p.settings.SV) {
return
}
Expand Down Expand Up @@ -249,8 +308,11 @@ func (p *Prober) readProbeImpl(ctx context.Context, db dbGetter, pl planner) {
// There is no data at the key, but that is okay. Even tho there is no data
// at the key, the prober still executes a read operation on the range.
// TODO(josh): Trace the probes.
_, err = db.Get(ctx, step.Key)
return err
f := ops.Read(step.Key)
if bypassAdmissionControl.Get(&p.settings.SV) {
return txns.Txn(ctx, f)
}
return txns.TxnRootKV(ctx, f)
})
if err != nil {
// TODO(josh): Write structured events with log.Structured.
Expand All @@ -268,14 +330,10 @@ func (p *Prober) readProbeImpl(ctx context.Context, db dbGetter, pl planner) {

// Doesn't return an error. Instead increments error type specific metrics.
func (p *Prober) writeProbe(ctx context.Context, db *kv.DB, pl planner) {
p.writeProbeImpl(ctx, db, pl)
}

type dbTxner interface {
Txn(ctx context.Context, f func(ctx context.Context, txn *kv.Txn) error) error
p.writeProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
}

func (p *Prober) writeProbeImpl(ctx context.Context, db dbTxner, pl planner) {
func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner) {
if !writeEnabled.Get(&p.settings.SV) {
return
}
Expand All @@ -297,20 +355,11 @@ func (p *Prober) writeProbeImpl(ctx context.Context, db dbTxner, pl planner) {
// perspective of the user.
timeout := writeTimeout.Get(&p.settings.SV)
err = contextutil.RunWithTimeout(ctx, "write probe", timeout, func(ctx context.Context) error {
// We attempt to commit a txn that puts some data at the key then
// deletes it. The test of the write code paths is good: We get
// a raft command that goes thru consensus and is written to the
// pebble log. Importantly, no *live* data is left at the key,
// which simplifies the kvprober, as then there is no need to clean
// up data at the key post range split / merge. Note that MVCC
// tombstones may be left by the probe, but this is okay, as GC will
// clean it up.
return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.Put(ctx, step.Key, putValue); err != nil {
return err
}
return txn.Del(ctx, step.Key)
})
f := ops.Write(step.Key)
if bypassAdmissionControl.Get(&p.settings.SV) {
return txns.Txn(ctx, f)
}
return txns.TxnRootKV(ctx, f)
})
if err != nil {
log.Health.Errorf(ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v", step.Key, step.RangeID, err)
Expand Down

0 comments on commit 18b95cb

Please sign in to comment.