Skip to content

Commit

Permalink
Add dry-run/monitoring-only mode for TxThrottler (vitessio#13604)
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>
Signed-off-by: Eduardo J. Ortega U. <5791035+ejortegau@users.noreply.github.com>
  • Loading branch information
ejortegau authored and timvaillancourt committed May 16, 2024
1 parent bd9da81 commit e1ac65f
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 14 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ Usage of vttablet:
--twopc_enable if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.
--tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9")
--tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100)
--tx-throttler-dry-run If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests.
--tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells
--tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)
--tx-throttler-topo-refresh-interval duration The rate that the transaction throttler will refresh the topology to find cells. (default 5m0s)
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
flagutil.DualFormatStringListVar(fs, &currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
fs.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information")
fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly.")
fs.BoolVar(&currentConfig.TxThrottlerDryRun, "tx-throttler-dry-run", defaultConfig.TxThrottlerDryRun, "If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests.")
fs.DurationVar(&currentConfig.TxThrottlerTopoRefreshInterval, "tx-throttler-topo-refresh-interval", time.Minute*5, "The rate that the transaction throttler will refresh the topology to find cells.")

fs.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.")
Expand Down Expand Up @@ -342,6 +343,7 @@ type TabletConfig struct {
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"`
TxThrottlerTopoRefreshInterval time.Duration `json:"-"`
TxThrottlerDryRun bool `json:"-"`

EnableLagThrottler bool `json:"-"`

Expand Down Expand Up @@ -623,6 +625,7 @@ var defaultConfig = TabletConfig{
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA},
TxThrottlerDryRun: false,
TxThrottlerTopoRefreshInterval: time.Minute * 5,

EnableLagThrottler: false, // Feature flag; to switch to 'true' at some stage in the future
Expand Down
39 changes: 25 additions & 14 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ type txThrottler struct {

// state holds an open transaction throttler state. It is nil
// if the TransactionThrottler is closed.
state *txThrottlerState
state txThrottlerState

target *querypb.Target
topoServer *topo.Server
Expand All @@ -170,6 +170,10 @@ type txThrottlerConfig struct {
// returns false.
enabled bool

// if dryRun is true, the txThrottler will run only on monitoring mode, meaning that it will increase counters for
// total and actually throttled requests, but it will not actually return that a transaction should be throttled.
dryRun bool

throttlerConfig *throttlerdatapb.Configuration
// healthCheckCells stores the cell names in which running vttablets will be monitored for
// replication lag.
Expand All @@ -182,8 +186,14 @@ type txThrottlerConfig struct {
topoRefreshInterval time.Duration
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
type txThrottlerState interface {
deallocateResources()
StatsUpdate(tabletStats *discovery.TabletHealth)
throttle() bool
}

// txThrottlerStateImpl holds the state of an open TxThrottler object.
type txThrottlerStateImpl struct {
config *txThrottlerConfig
txThrottler *txThrottler

Expand Down Expand Up @@ -221,6 +231,7 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
throttlerConfig = &txThrottlerConfig{
enabled: true,
healthCheckCells: healthCheckCells,
dryRun: env.Config().TxThrottlerDryRun,
tabletTypes: tabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
topoRefreshInterval: env.Config().TxThrottlerTopoRefreshInterval,
Expand Down Expand Up @@ -299,10 +310,10 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
t.requestsThrottled.Add(workload, 1)
}

return result
return result && !t.config.dryRun
}

func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
Expand All @@ -319,7 +330,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
t.Close()
return nil, err
}
state := &txThrottlerState{
state := &txThrottlerStateImpl{
config: config,
healthCheckCells: config.healthCheckCells,
throttler: t,
Expand All @@ -342,7 +353,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
return state, nil
}

func (ts *txThrottlerState) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) {
func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) {
ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells)
ts.healthCheckChan = ts.healthCheck.Subscribe()

Expand All @@ -362,7 +373,7 @@ func (ts *txThrottlerState) initHealthCheckStream(topoServer *topo.Server, targe
}
}

func (ts *txThrottlerState) closeHealthCheckStream() {
func (ts *txThrottlerStateImpl) closeHealthCheckStream() {
if ts.healthCheck == nil {
return
}
Expand All @@ -375,7 +386,7 @@ func (ts *txThrottlerState) closeHealthCheckStream() {
ts.healthCheck.Close()
}

func (ts *txThrottlerState) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()

Expand All @@ -388,7 +399,7 @@ func (ts *txThrottlerState) updateHealthCheckCells(ctx context.Context, topoServ
}
}

func (ts *txThrottlerState) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
var cellsUpdateTicks <-chan time.Time
if ts.cellsFromTopo {
ticker := time.NewTicker(ts.config.topoRefreshInterval)
Expand All @@ -407,7 +418,7 @@ func (ts *txThrottlerState) healthChecksProcessor(ctx context.Context, topoServe
}
}

func (ts *txThrottlerState) throttle() bool {
func (ts *txThrottlerStateImpl) throttle() bool {
if ts.throttler == nil {
log.Error("throttle called after deallocateResources was called")
return false
Expand All @@ -418,19 +429,19 @@ func (ts *txThrottlerState) throttle() bool {
return ts.throttler.Throttle(0 /* threadId */) > 0
}

func (ts *txThrottlerState) deallocateResources() {
func (ts *txThrottlerStateImpl) deallocateResources() {
// Close healthcheck and topo watchers
ts.closeHealthCheckStream()
ts.healthCheck = nil

// After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
ts.throttler = nil
}

// StatsUpdate updates the health of a tablet with the given healthcheck.
func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {
func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) {
if ts.config.tabletTypes == nil {
return
}
Expand Down
51 changes: 51 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,54 @@ func TestNewTxThrottler(t *testing.T) {
assert.Equal(t, []string{"cell1", "cell2"}, throttlerImpl.config.healthCheckCells)
}
}

func TestDryRunThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())

testCases := []struct {
Name string
txThrottlerStateShouldThrottle bool
throttlerDryRun bool
expectedResult bool
}{
{Name: "Real run throttles when txThrottlerStateImpl says it should", txThrottlerStateShouldThrottle: true, throttlerDryRun: false, expectedResult: true},
{Name: "Real run does not throttle when txThrottlerStateImpl says it should not", txThrottlerStateShouldThrottle: false, throttlerDryRun: false, expectedResult: false},
{Name: "Dry run does not throttle when txThrottlerStateImpl says it should", txThrottlerStateShouldThrottle: true, throttlerDryRun: true, expectedResult: false},
{Name: "Dry run does not throttle when txThrottlerStateImpl says it should not", txThrottlerStateShouldThrottle: false, throttlerDryRun: true, expectedResult: false},
}

for _, aTestCase := range testCases {
theTestCase := aTestCase

t.Run(theTestCase.Name, func(t *testing.T) {
aTxThrottler := &txThrottler{
config: &txThrottlerConfig{
enabled: true,
dryRun: theTestCase.throttlerDryRun,
},
state: &mockTxThrottlerState{shouldThrottle: theTestCase.txThrottlerStateShouldThrottle},
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"),
}

assert.Equal(t, theTestCase.expectedResult, aTxThrottler.Throttle(100, "some-workload"))
})
}
}

type mockTxThrottlerState struct {
shouldThrottle bool
}

func (t *mockTxThrottlerState) deallocateResources() {

}
func (t *mockTxThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {

}

func (t *mockTxThrottlerState) throttle() bool {
return t.shouldThrottle
}

0 comments on commit e1ac65f

Please sign in to comment.