Skip to content

Commit

Permalink
txthrottler: add metrics for topoWatcher and healthCheckStreamer (vit…
Browse files Browse the repository at this point in the history
…essio#13153)

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Apr 17, 2024
1 parent 39e5ade commit 7a380b8
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 40 deletions.
98 changes: 58 additions & 40 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// These vars store the functions used to create the topo server, healthcheck,
Expand Down Expand Up @@ -63,6 +63,10 @@ func resetTxThrottlerFactories() {
}
}

func init() {
resetTxThrottlerFactories()
}

// TxThrottler defines the interface for the transaction throttler.
type TxThrottler interface {
InitDBConfig(target *querypb.Target)
Expand All @@ -71,10 +75,6 @@ type TxThrottler interface {
Throttle(priority int) (result bool)
}

func init() {
resetTxThrottlerFactories()
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Expand Down Expand Up @@ -142,9 +142,12 @@ type txThrottler struct {
topoServer *topo.Server

// stats
throttlerRunning *stats.Gauge
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
throttlerRunning *stats.Gauge
topoWatchers *stats.GaugesWithSingleLabel
healthChecksReadTotal *stats.CountersWithMultiLabels
healthChecksRecordedTotal *stats.CountersWithMultiLabels
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
}

// txThrottlerConfig holds the parameters that need to be
Expand All @@ -161,12 +164,13 @@ type txThrottlerConfig struct {
healthCheckCells []string

// tabletTypes stores the tablet types for throttling
tabletTypes *topoproto.TabletTypeListFlag
tabletTypes map[topodatapb.TabletType]bool
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
config *txThrottlerConfig
config *txThrottlerConfig
txThrottler *txThrottler

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
Expand All @@ -175,7 +179,7 @@ type txThrottlerState struct {
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
topologyWatchers []TopologyWatcherInterface
topologyWatchers map[string]TopologyWatcherInterface
}

// NewTxThrottler tries to construct a txThrottler from the
Expand All @@ -191,9 +195,14 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
// is immutable.
healthCheckCells := env.Config().TxThrottlerHealthCheckCells

tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes))
for _, tabletType := range *env.Config().TxThrottlerTabletTypes {
tabletTypes[tabletType] = true
}

throttlerConfig = &txThrottlerConfig{
enabled: true,
tabletTypes: env.Config().TxThrottlerTabletTypes,
tabletTypes: tabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
healthCheckCells: healthCheckCells,
}
Expand All @@ -202,9 +211,14 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
}

return &txThrottler{
config: throttlerConfig,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
config: throttlerConfig,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read",
[]string{"cell", "DbType"}),
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded",
[]string{"cell", "DbType"}),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
}
Expand All @@ -225,7 +239,7 @@ func (t *txThrottler) Open() (err error) {
}
log.Info("txThrottler: opening")
t.throttlerRunning.Set(1)
t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target)
t.state, err = newTxThrottlerState(t, t.config, t.target)
return err
}

Expand Down Expand Up @@ -269,7 +283,7 @@ func (t *txThrottler) Throttle(priority int) (result bool) {
return result
}

func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
Expand All @@ -286,27 +300,28 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
t.Close()
return nil, err
}
result := &txThrottlerState{
config: config,
throttler: t,
state := &txThrottlerState{
config: config,
throttler: t,
txThrottler: txThrottler,
}
createTxThrottlerHealthCheck(topoServer, config, result, target.Cell)
createTxThrottlerHealthCheck(txThrottler.topoServer, config, state, target.Cell)

result.topologyWatchers = make(
[]TopologyWatcherInterface, 0, len(config.healthCheckCells))
state.topologyWatchers = make(
map[string]TopologyWatcherInterface, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
result.topologyWatchers = append(
result.topologyWatchers,
topologyWatcherFactory(
topoServer,
result.healthCheck,
cell,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency))
state.topologyWatchers[cell] = topologyWatcherFactory(
txThrottler.topoServer,
state.healthCheck,
cell,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency,
)
txThrottler.topoWatchers.Add(cell, 1)
}
return result, nil
return state, nil
}

func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) {
Expand Down Expand Up @@ -341,8 +356,9 @@ func (ts *txThrottlerState) deallocateResources() {
// We don't really need to nil out the fields here
// as deallocateResources is not expected to be called
// more than once, but it doesn't hurt to do so.
for _, watcher := range ts.topologyWatchers {
for cell, watcher := range ts.topologyWatchers {
watcher.Stop()
ts.txThrottler.topoWatchers.Reset(cell)
}
ts.topologyWatchers = nil

Expand All @@ -361,12 +377,14 @@ func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {
return
}

tabletType := tabletStats.Target.TabletType
metricLabels := []string{tabletStats.Target.Cell, tabletType.String()}
ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1)

// Monitor tablets for replication lag if they have a tablet
// type specified by the --tx_throttler_tablet_types flag.
for _, expectedTabletType := range *ts.config.tabletTypes {
if tabletStats.Target.TabletType == expectedTabletType {
ts.throttler.RecordReplicationLag(time.Now(), tabletStats)
return
}
if ts.config.tabletTypes[tabletType] {
ts.throttler.RecordReplicationLag(time.Now(), tabletStats)
ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1)
}
}
11 changes: 11 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func TestEnabledThrottler(t *testing.T) {
call1.Return(0 * time.Second)
tabletStats := &discovery.TabletHealth{
Target: &querypb.Target{
Cell: "cell1",
TabletType: topodatapb.TabletType_REPLICA,
},
}
Expand Down Expand Up @@ -119,24 +120,33 @@ func TestEnabledThrottler(t *testing.T) {
throttlerImpl, _ := throttler.(*txThrottler)
assert.NotNil(t, throttlerImpl)
throttler.InitDBConfig(&querypb.Target{
Cell: "cell1",
Keyspace: "keyspace",
Shard: "shard",
})

assert.Nil(t, throttlerImpl.Open())
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts())

assert.False(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get())
assert.Zero(t, throttlerImpl.requestsThrottled.Get())

throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts())
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts())
rdonlyTabletStats := &discovery.TabletHealth{
Target: &querypb.Target{
Cell: "cell2",
TabletType: topodatapb.TabletType_RDONLY,
},
}
// This call should not be forwarded to the go/vt/throttlerImpl.Throttler object.
throttlerImpl.state.StatsUpdate(rdonlyTabletStats)
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts())
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts())

// The second throttle call should reject.
assert.True(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get())
Expand All @@ -148,6 +158,7 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())
throttlerImpl.Close()
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts())
}

func TestNewTxThrottler(t *testing.T) {
Expand Down

0 comments on commit 7a380b8

Please sign in to comment.