diff --git a/go.mod b/go.mod index 944bccf8ec6..61731852fd2 100644 --- a/go.mod +++ b/go.mod @@ -121,6 +121,7 @@ require ( github.com/hashicorp/go-version v1.6.0 github.com/planetscale/log v0.0.0-20221118170849-fb599bc35c50 github.com/slok/noglog v0.2.0 + go.uber.org/mock v0.4.0 go.uber.org/zap v1.23.0 golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 golang.org/x/sync v0.3.0 diff --git a/go.sum b/go.sum index 04b512a01e9..494e9dfb049 100644 --- a/go.sum +++ b/go.sum @@ -783,6 +783,8 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 5cd22bacf55..64944fa69be 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -345,8 +345,10 @@ Usage of vttablet: --twopc_enable if the flag is on, 2pc is enabled. Other 2pc flags must be supplied. --tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9") --tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100) + --tx-throttler-dry-run If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests. --tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells --tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica) + --tx-throttler-topo-refresh-interval duration The rate that the transaction throttler will refresh the topology to find cells. (default 5m0s) --tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9") --tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler. --unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s) diff --git a/go/vt/discovery/replicationlag.go b/go/vt/discovery/replicationlag.go index 71ab78fd15b..c084b580e40 100644 --- a/go/vt/discovery/replicationlag.go +++ b/go/vt/discovery/replicationlag.go @@ -75,13 +75,13 @@ func SetMinNumTablets(numTablets int) { minNumTablets = numTablets } -// IsReplicationLagHigh verifies that the given LegacytabletHealth refers to a tablet with high +// IsReplicationLagHigh verifies that the given TabletHealth refers to a tablet with high // replication lag, i.e. higher than the configured discovery_low_replication_lag flag. func IsReplicationLagHigh(tabletHealth *TabletHealth) bool { return float64(tabletHealth.Stats.ReplicationLagSeconds) > lowReplicationLag.Seconds() } -// IsReplicationLagVeryHigh verifies that the given LegacytabletHealth refers to a tablet with very high +// IsReplicationLagVeryHigh verifies that the given TabletHealth refers to a tablet with very high // replication lag, i.e. higher than the configured discovery_high_replication_lag_minimum_serving flag. func IsReplicationLagVeryHigh(tabletHealth *TabletHealth) bool { return float64(tabletHealth.Stats.ReplicationLagSeconds) > highReplicationLagMinServing.Seconds() @@ -117,7 +117,7 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal return filterStatsByLag(tabletHealthList) } res := filterStatsByLagWithLegacyAlgorithm(tabletHealthList) - // run the filter again if exactly one tablet is removed, + // Run the filter again if exactly one tablet is removed, // and we have spare tablets. if len(res) > minNumTablets && len(res) == len(tabletHealthList)-1 { res = filterStatsByLagWithLegacyAlgorithm(res) @@ -128,12 +128,12 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth { list := make([]tabletLagSnapshot, 0, len(tabletHealthList)) - // filter non-serving tablets and those with very high replication lag + // Filter out non-serving tablets and those with very high replication lag. for _, ts := range tabletHealthList { if !ts.Serving || ts.LastError != nil || ts.Stats == nil || IsReplicationLagVeryHigh(ts) { continue } - // Pull the current replication lag for a stable sort later. + // Save the current replication lag for a stable sort later. list = append(list, tabletLagSnapshot{ ts: ts, replag: ts.Stats.ReplicationLagSeconds}) @@ -142,7 +142,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth { // Sort by replication lag. sort.Sort(tabletLagSnapshotList(list)) - // Pick those with low replication lag, but at least minNumTablets tablets regardless. + // Pick tablets with low replication lag, but at least minNumTablets tablets regardless. res := make([]*TabletHealth, 0, len(list)) for i := 0; i < len(list); i++ { if !IsReplicationLagHigh(list[i].ts) || i < minNumTablets { @@ -154,7 +154,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth { func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*TabletHealth { list := make([]*TabletHealth, 0, len(tabletHealthList)) - // filter non-serving tablets + // Filter out non-serving tablets. for _, ts := range tabletHealthList { if !ts.Serving || ts.LastError != nil || ts.Stats == nil { continue @@ -164,7 +164,7 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta if len(list) <= 1 { return list } - // if all have low replication lag (<=30s), return all tablets. + // If all tablets have low replication lag (<=30s), return all of them. allLowLag := true for _, ts := range list { if IsReplicationLagHigh(ts) { @@ -175,12 +175,12 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta if allLowLag { return list } - // filter those affecting "mean" lag significantly - // calculate mean for all tablets + // We want to filter out tablets that are affecting "mean" lag significantly. + // We first calculate the mean across all tablets. res := make([]*TabletHealth, 0, len(list)) m, _ := mean(list, -1) for i, ts := range list { - // calculate mean by excluding ith tablet + // Now we calculate the mean by excluding ith tablet mi, _ := mean(list, i) if float64(mi) > float64(m)*0.7 { res = append(res, ts) @@ -189,9 +189,11 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta if len(res) >= minNumTablets { return res } - // return at least minNumTablets tablets to avoid over loading, - // if there is enough tablets with replication lag < highReplicationLagMinServing. - // Pull the current replication lag for a stable sort. + + // We want to return at least minNumTablets tablets to avoid overloading, + // as long as there are enough tablets with replication lag < highReplicationLagMinServing. + + // Save the current replication lag for a stable sort. snapshots := make([]tabletLagSnapshot, 0, len(list)) for _, ts := range list { if !IsReplicationLagVeryHigh(ts) { diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 5e5c54ab57a..78d5f696f6f 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -54,15 +54,15 @@ var ( "Operation", topologyWatcherOpListTablets, topologyWatcherOpGetTablet) ) -// tabletInfo is used internally by the TopologyWatcher class +// tabletInfo is used internally by the TopologyWatcher struct. type tabletInfo struct { alias string tablet *topodata.Tablet } -// TopologyWatcher polls tablet from a configurable set of tablets -// periodically. When tablets are added / removed, it calls -// the LegacyTabletRecorder AddTablet / RemoveTablet interface appropriately. +// TopologyWatcher polls the topology periodically for changes to +// the set of tablets. When tablets are added / removed / modified, +// it calls the AddTablet / RemoveTablet interface appropriately. type TopologyWatcher struct { // set at construction time topoServer *topo.Server @@ -80,20 +80,21 @@ type TopologyWatcher struct { // mu protects all variables below mu sync.Mutex - // tablets contains a map of alias -> tabletInfo for all known tablets + // tablets contains a map of alias -> tabletInfo for all known tablets. tablets map[string]*tabletInfo - // topoChecksum stores a crc32 of the tablets map and is exported as a metric + // topoChecksum stores a crc32 of the tablets map and is exported as a metric. topoChecksum uint32 - // lastRefresh records the timestamp of the last topo refresh + // lastRefresh records the timestamp of the last refresh of the topology. lastRefresh time.Time - // firstLoadDone is true when first load of the topology data is done. + // firstLoadDone is true when the initial load of the topology data is complete. firstLoadDone bool - // firstLoadChan is closed when the initial loading of topology data is done. + // firstLoadChan is closed when the initial load of topology data is complete. firstLoadChan chan struct{} } // NewTopologyWatcher returns a TopologyWatcher that monitors all -// the tablets in a cell, and starts refreshing. +// the tablets that it is configured to watch, and reloads them periodically if needed. +// As of now there is only one implementation: watch all tablets in a cell. func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, @@ -115,14 +116,14 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC } // NewCellTabletsWatcher returns a TopologyWatcher that monitors all -// the tablets in a cell, and starts refreshing. +// the tablets in a cell, and reloads them as needed. func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) { return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell) }) } -// Start starts the topology watcher +// Start starts the topology watcher. func (tw *TopologyWatcher) Start() { tw.wg.Add(1) go func(t *TopologyWatcher) { @@ -140,7 +141,7 @@ func (tw *TopologyWatcher) Start() { }(tw) } -// Stop stops the watcher. It does not clean up the tablets added to LegacyTabletRecorder. +// Stop stops the watcher. It does not clean up the tablets added to HealthCheck. func (tw *TopologyWatcher) Stop() { tw.cancelFunc() // wait for watch goroutine to finish. @@ -151,7 +152,7 @@ func (tw *TopologyWatcher) loadTablets() { var wg sync.WaitGroup newTablets := make(map[string]*tabletInfo) - // first get the list of relevant tabletAliases + // First get the list of relevant tabletAliases. tabletAliases, err := tw.getTablets(tw) topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1) if err != nil { @@ -166,7 +167,7 @@ func (tw *TopologyWatcher) loadTablets() { } // Accumulate a list of all known alias strings to use later - // when sorting + // when sorting. tabletAliasStrs := make([]string, 0, len(tabletAliases)) tw.mu.Lock() @@ -175,7 +176,7 @@ func (tw *TopologyWatcher) loadTablets() { tabletAliasStrs = append(tabletAliasStrs, aliasStr) if !tw.refreshKnownTablets { - // we already have a tabletInfo for this and the flag tells us to not refresh + // We already have a tabletInfo for this and the flag tells us to not refresh. if val, ok := tw.tablets[aliasStr]; ok { newTablets[aliasStr] = val continue @@ -188,7 +189,7 @@ func (tw *TopologyWatcher) loadTablets() { tw.sem <- 1 // Wait for active queue to drain. tablet, err := tw.topoServer.GetTablet(tw.ctx, alias) topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1) - <-tw.sem // Done; enable next request to run + <-tw.sem // Done; enable next request to run. if err != nil { topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1) select { @@ -218,7 +219,7 @@ func (tw *TopologyWatcher) loadTablets() { continue } - // trust the alias from topo and add it if it doesn't exist + // Trust the alias from topo and add it if it doesn't exist. if val, ok := tw.tablets[alias]; ok { // check if the host and port have changed. If yes, replace tablet. oldKey := TabletToMapKey(val.tablet) @@ -230,7 +231,7 @@ func (tw *TopologyWatcher) loadTablets() { topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1) } } else { - // This is a new tablet record, let's add it to the healthcheck + // This is a new tablet record, let's add it to the HealthCheck. tw.healthcheck.AddTablet(newVal.tablet) topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) } @@ -252,8 +253,8 @@ func (tw *TopologyWatcher) loadTablets() { close(tw.firstLoadChan) } - // iterate through the tablets in a stable order and compute a - // checksum of the tablet map + // Iterate through the tablets in a stable order and compute a + // checksum of the tablet map. sort.Strings(tabletAliasStrs) var buf bytes.Buffer for _, alias := range tabletAliasStrs { @@ -269,7 +270,7 @@ func (tw *TopologyWatcher) loadTablets() { } -// RefreshLag returns the time since the last refresh +// RefreshLag returns the time since the last refresh. func (tw *TopologyWatcher) RefreshLag() time.Duration { tw.mu.Lock() defer tw.mu.Unlock() @@ -277,7 +278,7 @@ func (tw *TopologyWatcher) RefreshLag() time.Duration { return time.Since(tw.lastRefresh) } -// TopoChecksum returns the checksum of the current state of the topo +// TopoChecksum returns the checksum of the current state of the topo. func (tw *TopologyWatcher) TopoChecksum() uint32 { tw.mu.Lock() defer tw.mu.Unlock() @@ -286,7 +287,7 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 { } // TabletFilter is an interface that can be given to a TopologyWatcher -// to be applied as an additional filter on the list of tablets returned by its getTablets function +// to be applied as an additional filter on the list of tablets returned by its getTablets function. type TabletFilter interface { // IsIncluded returns whether tablet is included in this filter IsIncluded(tablet *topodata.Tablet) bool @@ -300,18 +301,18 @@ type FilterByShard struct { } // filterShard describes a filter for a given shard or keyrange inside -// a keyspace +// a keyspace. type filterShard struct { keyspace string shard string keyRange *topodata.KeyRange // only set if shard is also a KeyRange } -// NewFilterByShard creates a new FilterByShard on top of an existing -// LegacyTabletRecorder. Each filter is a keyspace|shard entry, where shard +// NewFilterByShard creates a new FilterByShard for use by a +// TopologyWatcher. Each filter is a keyspace|shard entry, where shard // can either be a shard name, or a keyrange. All tablets that match -// at least one keyspace|shard tuple will be forwarded to the -// underlying LegacyTabletRecorder. +// at least one keyspace|shard tuple will be forwarded by the +// TopologyWatcher to its consumer. func NewFilterByShard(filters []string) (*FilterByShard, error) { m := make(map[string][]*filterShard) for _, filter := range filters { @@ -348,8 +349,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) { }, nil } -// IsIncluded returns true iff the tablet's keyspace and shard should be -// forwarded to the underlying LegacyTabletRecorder. +// IsIncluded returns true iff the tablet's keyspace and shard match what we have. func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool { canonical, kr, err := topo.ValidateShardName(tablet.Shard) if err != nil { @@ -370,15 +370,14 @@ func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool { return false } -// FilterByKeyspace is a filter that filters tablets by -// keyspace +// FilterByKeyspace is a filter that filters tablets by keyspace. type FilterByKeyspace struct { keyspaces map[string]bool } // NewFilterByKeyspace creates a new FilterByKeyspace. // Each filter is a keyspace entry. All tablets that match -// a keyspace will be forwarded to the underlying LegacyTabletRecorder. +// a keyspace will be forwarded to the TopologyWatcher's consumer. func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace { m := make(map[string]bool) for _, keyspace := range selectedKeyspaces { @@ -390,8 +389,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace { } } -// IsIncluded returns true if the tablet's keyspace should be -// forwarded to the underlying LegacyTabletRecorder. +// IsIncluded returns true if the tablet's keyspace matches what we have. func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool { _, exist := fbk.keyspaces[tablet.Keyspace] return exist diff --git a/go/vt/proto/replicationdata/replicationdata.pb.go b/go/vt/proto/replicationdata/replicationdata.pb.go index 7fc537b9cca..6f500dda898 100644 --- a/go/vt/proto/replicationdata/replicationdata.pb.go +++ b/go/vt/proto/replicationdata/replicationdata.pb.go @@ -92,6 +92,8 @@ type Status struct { unknownFields protoimpl.UnknownFields Position string `protobuf:"bytes,1,opt,name=position,proto3" json:"position,omitempty"` + IoThreadRunning bool `protobuf:"varint,2,opt,name=io_thread_running,json=ioThreadRunning,proto3" json:"io_thread_running,omitempty"` + SqlThreadRunning bool `protobuf:"varint,3,opt,name=sql_thread_running,json=sqlThreadRunning,proto3" json:"sql_thread_running,omitempty"` ReplicationLagSeconds uint32 `protobuf:"varint,4,opt,name=replication_lag_seconds,json=replicationLagSeconds,proto3" json:"replication_lag_seconds,omitempty"` SourceHost string `protobuf:"bytes,5,opt,name=source_host,json=sourceHost,proto3" json:"source_host,omitempty"` SourcePort int32 `protobuf:"varint,6,opt,name=source_port,json=sourcePort,proto3" json:"source_port,omitempty"` @@ -155,6 +157,20 @@ func (x *Status) GetPosition() string { return "" } +func (x *Status) GetIoThreadRunning() bool { + if x != nil { + return x.IoThreadRunning + } + return false +} + +func (x *Status) GetSqlThreadRunning() bool { + if x != nil { + return x.SqlThreadRunning + } + return false +} + func (x *Status) GetReplicationLagSeconds() uint32 { if x != nil { return x.ReplicationLagSeconds @@ -620,145 +636,150 @@ var File_replicationdata_proto protoreflect.FileDescriptor var file_replicationdata_proto_rawDesc = []byte{ 0x0a, 0x15, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x22, 0x96, 0x07, 0x0a, 0x06, 0x53, 0x74, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x22, 0xe4, 0x07, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, - 0x36, 0x0a, 0x17, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, - 0x61, 0x67, 0x5f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, - 0x52, 0x15, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x61, 0x67, - 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x5f, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x48, 0x6f, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x73, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, 0x6e, - 0x6e, 0x65, 0x63, 0x74, 0x5f, 0x72, 0x65, 0x74, 0x72, 0x79, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, - 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x12, 0x2c, - 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x70, 0x6f, 0x73, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x6c, 0x61, - 0x79, 0x4c, 0x6f, 0x67, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, - 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x09, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0c, 0x66, 0x69, 0x6c, 0x65, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x12, 0x5b, 0x0a, 0x2b, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x73, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x5f, 0x65, 0x71, 0x75, - 0x69, 0x76, 0x61, 0x6c, 0x65, 0x6e, 0x74, 0x5f, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x26, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x4c, 0x6f, 0x67, - 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x42, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x45, 0x71, 0x75, 0x69, - 0x76, 0x61, 0x6c, 0x65, 0x6e, 0x74, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x28, - 0x0a, 0x10, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, - 0x69, 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x5f, 0x75, 0x75, 0x69, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x75, 0x69, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x69, 0x6f, 0x5f, - 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x69, 0x6f, 0x53, - 0x74, 0x61, 0x74, 0x65, 0x12, 0x22, 0x0a, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x69, 0x6f, 0x5f, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6c, 0x61, 0x73, - 0x74, 0x49, 0x6f, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x71, 0x6c, 0x5f, - 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x73, 0x71, 0x6c, - 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x71, - 0x6c, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x10, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6c, - 0x61, 0x73, 0x74, 0x53, 0x71, 0x6c, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x35, 0x0a, 0x17, 0x72, - 0x65, 0x6c, 0x61, 0x79, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x70, 0x6f, - 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x11, 0x20, 0x01, 0x28, 0x09, 0x52, 0x14, 0x72, 0x65, - 0x6c, 0x61, 0x79, 0x4c, 0x6f, 0x67, 0x46, 0x69, 0x6c, 0x65, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x75, 0x73, 0x65, - 0x72, 0x18, 0x12, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, - 0x73, 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x71, 0x6c, 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, - 0x18, 0x13, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x71, 0x6c, 0x44, 0x65, 0x6c, 0x61, 0x79, - 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x75, 0x74, 0x6f, 0x5f, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x18, 0x14, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x61, 0x75, 0x74, 0x6f, 0x50, 0x6f, 0x73, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x73, 0x69, 0x6e, 0x67, 0x5f, 0x67, - 0x74, 0x69, 0x64, 0x18, 0x15, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x75, 0x73, 0x69, 0x6e, 0x67, - 0x47, 0x74, 0x69, 0x64, 0x12, 0x36, 0x0a, 0x17, 0x68, 0x61, 0x73, 0x5f, 0x72, 0x65, 0x70, 0x6c, - 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x73, 0x18, - 0x16, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, 0x68, 0x61, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x73, 0x12, 0x1f, 0x0a, 0x0b, - 0x73, 0x73, 0x6c, 0x5f, 0x61, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x64, 0x18, 0x17, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x0a, 0x73, 0x73, 0x6c, 0x41, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x64, 0x12, 0x36, 0x0a, - 0x17, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x61, 0x67, - 0x5f, 0x75, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x18, 0x18, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, - 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x61, 0x67, 0x55, 0x6e, - 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x4a, 0x04, 0x08, 0x03, 0x10, - 0x04, 0x22, 0x77, 0x0a, 0x15, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2f, 0x0a, 0x06, 0x62, 0x65, - 0x66, 0x6f, 0x72, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x72, 0x65, 0x70, - 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x06, 0x62, 0x65, 0x66, 0x6f, 0x72, 0x65, 0x12, 0x2d, 0x0a, 0x05, 0x61, - 0x66, 0x74, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x72, 0x65, 0x70, - 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x05, 0x61, 0x66, 0x74, 0x65, 0x72, 0x22, 0x50, 0x0a, 0x0d, 0x50, 0x72, - 0x69, 0x6d, 0x61, 0x72, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, - 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, - 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x66, 0x69, 0x6c, 0x65, 0x5f, - 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, - 0x66, 0x69, 0x6c, 0x65, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0xc3, 0x07, 0x0a, - 0x0a, 0x46, 0x75, 0x6c, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, - 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x5f, 0x75, 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x55, 0x75, 0x69, 0x64, 0x12, 0x46, 0x0a, 0x12, 0x72, 0x65, 0x70, - 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x11, - 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x12, 0x45, 0x0a, 0x0e, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x70, 0x6c, - 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x50, 0x72, 0x69, 0x6d, - 0x61, 0x72, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0d, 0x70, 0x72, 0x69, 0x6d, 0x61, - 0x72, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x67, 0x74, 0x69, 0x64, - 0x5f, 0x70, 0x75, 0x72, 0x67, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x67, - 0x74, 0x69, 0x64, 0x50, 0x75, 0x72, 0x67, 0x65, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x0f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x63, - 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x76, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x1b, 0x0a, 0x09, - 0x72, 0x65, 0x61, 0x64, 0x5f, 0x6f, 0x6e, 0x6c, 0x79, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x08, 0x72, 0x65, 0x61, 0x64, 0x4f, 0x6e, 0x6c, 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x74, 0x69, - 0x64, 0x5f, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x74, - 0x69, 0x64, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, - 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x62, - 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x12, 0x28, 0x0a, 0x10, 0x62, - 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x5f, 0x72, 0x6f, 0x77, 0x5f, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x18, - 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x52, 0x6f, 0x77, - 0x49, 0x6d, 0x61, 0x67, 0x65, 0x12, 0x26, 0x0a, 0x0f, 0x6c, 0x6f, 0x67, 0x5f, 0x62, 0x69, 0x6e, - 0x5f, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, - 0x6c, 0x6f, 0x67, 0x42, 0x69, 0x6e, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x12, 0x2e, 0x0a, - 0x13, 0x6c, 0x6f, 0x67, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x75, 0x70, 0x64, - 0x61, 0x74, 0x65, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x6c, 0x6f, 0x67, 0x52, - 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x12, 0x39, 0x0a, - 0x19, 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x70, 0x72, 0x69, 0x6d, 0x61, - 0x72, 0x79, 0x5f, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x16, 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x50, 0x72, 0x69, 0x6d, 0x61, 0x72, - 0x79, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x12, 0x39, 0x0a, 0x19, 0x73, 0x65, 0x6d, 0x69, - 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x65, 0x6e, - 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x08, 0x52, 0x16, 0x73, 0x65, 0x6d, - 0x69, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x45, 0x6e, 0x61, 0x62, - 0x6c, 0x65, 0x64, 0x12, 0x37, 0x0a, 0x18, 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, - 0x5f, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, - 0x10, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x50, - 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x37, 0x0a, 0x18, - 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x11, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, - 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x19, 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, - 0x6e, 0x63, 0x5f, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, - 0x74, 0x73, 0x18, 0x12, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x16, 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, - 0x6e, 0x63, 0x50, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, - 0x12, 0x39, 0x0a, 0x19, 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x70, 0x72, - 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x13, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x16, 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x50, 0x72, 0x69, - 0x6d, 0x61, 0x72, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x45, 0x0a, 0x20, 0x73, - 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x77, 0x61, 0x69, 0x74, 0x5f, 0x66, 0x6f, - 0x72, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, - 0x14, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x1b, 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x57, - 0x61, 0x69, 0x74, 0x46, 0x6f, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x43, 0x6f, 0x75, - 0x6e, 0x74, 0x2a, 0x3b, 0x0a, 0x13, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x49, 0x4f, 0x41, - 0x4e, 0x44, 0x53, 0x51, 0x4c, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, - 0x0c, 0x49, 0x4f, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x4f, 0x4e, 0x4c, 0x59, 0x10, 0x01, 0x42, - 0x2e, 0x5a, 0x2c, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, - 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2a, 0x0a, 0x11, 0x69, 0x6f, 0x5f, 0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x5f, 0x72, 0x75, 0x6e, + 0x6e, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x69, 0x6f, 0x54, 0x68, + 0x72, 0x65, 0x61, 0x64, 0x52, 0x75, 0x6e, 0x6e, 0x69, 0x6e, 0x67, 0x12, 0x2c, 0x0a, 0x12, 0x73, + 0x71, 0x6c, 0x5f, 0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x5f, 0x72, 0x75, 0x6e, 0x6e, 0x69, 0x6e, + 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x73, 0x71, 0x6c, 0x54, 0x68, 0x72, 0x65, + 0x61, 0x64, 0x52, 0x75, 0x6e, 0x6e, 0x69, 0x6e, 0x67, 0x12, 0x36, 0x0a, 0x17, 0x72, 0x65, 0x70, + 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x61, 0x67, 0x5f, 0x73, 0x65, 0x63, + 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x72, 0x65, 0x70, 0x6c, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x61, 0x67, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, + 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x68, 0x6f, 0x73, 0x74, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x48, 0x6f, + 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x6f, 0x72, + 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, + 0x6f, 0x72, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x5f, 0x72, + 0x65, 0x74, 0x72, 0x79, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x6e, + 0x65, 0x63, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x12, 0x2c, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, + 0x79, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x08, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x4c, 0x6f, 0x67, 0x50, 0x6f, + 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x70, + 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x66, + 0x69, 0x6c, 0x65, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x5b, 0x0a, 0x2b, 0x72, + 0x65, 0x6c, 0x61, 0x79, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, + 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x5f, 0x65, 0x71, 0x75, 0x69, 0x76, 0x61, 0x6c, 0x65, 0x6e, + 0x74, 0x5f, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x26, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x4c, 0x6f, 0x67, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x42, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x45, 0x71, 0x75, 0x69, 0x76, 0x61, 0x6c, 0x65, 0x6e, 0x74, + 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x0b, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x0e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x75, 0x75, 0x69, + 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, + 0x75, 0x69, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x69, 0x6f, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, + 0x0d, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x69, 0x6f, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x22, + 0x0a, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x69, 0x6f, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, + 0x0e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x49, 0x6f, 0x45, 0x72, 0x72, + 0x6f, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x71, 0x6c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, + 0x0f, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x73, 0x71, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, + 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x71, 0x6c, 0x5f, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x18, 0x10, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x53, 0x71, 0x6c, + 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x35, 0x0a, 0x17, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x5f, 0x6c, + 0x6f, 0x67, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x18, 0x11, 0x20, 0x01, 0x28, 0x09, 0x52, 0x14, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x4c, 0x6f, 0x67, + 0x46, 0x69, 0x6c, 0x65, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x75, 0x73, 0x65, 0x72, 0x18, 0x12, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x73, 0x65, 0x72, 0x12, 0x1b, 0x0a, + 0x09, 0x73, 0x71, 0x6c, 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x13, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x08, 0x73, 0x71, 0x6c, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x75, + 0x74, 0x6f, 0x5f, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x14, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x0c, 0x61, 0x75, 0x74, 0x6f, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, + 0x1d, 0x0a, 0x0a, 0x75, 0x73, 0x69, 0x6e, 0x67, 0x5f, 0x67, 0x74, 0x69, 0x64, 0x18, 0x15, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x09, 0x75, 0x73, 0x69, 0x6e, 0x67, 0x47, 0x74, 0x69, 0x64, 0x12, 0x36, + 0x0a, 0x17, 0x68, 0x61, 0x73, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x73, 0x18, 0x16, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x15, 0x68, 0x61, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, + 0x69, 0x6c, 0x74, 0x65, 0x72, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x73, 0x6c, 0x5f, 0x61, 0x6c, + 0x6c, 0x6f, 0x77, 0x65, 0x64, 0x18, 0x17, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x73, 0x73, 0x6c, + 0x41, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x64, 0x12, 0x36, 0x0a, 0x17, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x61, 0x67, 0x5f, 0x75, 0x6e, 0x6b, 0x6e, 0x6f, + 0x77, 0x6e, 0x18, 0x18, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x61, 0x67, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x22, + 0x77, 0x0a, 0x15, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2f, 0x0a, 0x06, 0x62, 0x65, 0x66, 0x6f, + 0x72, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x06, 0x62, 0x65, 0x66, 0x6f, 0x72, 0x65, 0x12, 0x2d, 0x0a, 0x05, 0x61, 0x66, 0x74, + 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x05, 0x61, 0x66, 0x74, 0x65, 0x72, 0x22, 0x50, 0x0a, 0x0d, 0x50, 0x72, 0x69, 0x6d, + 0x61, 0x72, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x70, 0x6f, + 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x66, 0x69, + 0x6c, 0x65, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0xc3, 0x07, 0x0a, 0x0a, 0x46, + 0x75, 0x6c, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x5f, 0x75, 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x55, 0x75, 0x69, 0x64, 0x12, 0x46, 0x0a, 0x12, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x11, 0x72, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, + 0x45, 0x0a, 0x0e, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x50, 0x72, 0x69, 0x6d, 0x61, 0x72, + 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0d, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x67, 0x74, 0x69, 0x64, 0x5f, 0x70, + 0x75, 0x72, 0x67, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x67, 0x74, 0x69, + 0x64, 0x50, 0x75, 0x72, 0x67, 0x65, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x12, 0x27, 0x0a, 0x0f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6d, + 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x76, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x65, + 0x61, 0x64, 0x5f, 0x6f, 0x6e, 0x6c, 0x79, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x72, + 0x65, 0x61, 0x64, 0x4f, 0x6e, 0x6c, 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x74, 0x69, 0x64, 0x5f, + 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x74, 0x69, 0x64, + 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x5f, 0x66, + 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x62, 0x69, 0x6e, + 0x6c, 0x6f, 0x67, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x12, 0x28, 0x0a, 0x10, 0x62, 0x69, 0x6e, + 0x6c, 0x6f, 0x67, 0x5f, 0x72, 0x6f, 0x77, 0x5f, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x18, 0x0b, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x52, 0x6f, 0x77, 0x49, 0x6d, + 0x61, 0x67, 0x65, 0x12, 0x26, 0x0a, 0x0f, 0x6c, 0x6f, 0x67, 0x5f, 0x62, 0x69, 0x6e, 0x5f, 0x65, + 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x6c, 0x6f, + 0x67, 0x42, 0x69, 0x6e, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x12, 0x2e, 0x0a, 0x13, 0x6c, + 0x6f, 0x67, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x6c, 0x6f, 0x67, 0x52, 0x65, 0x70, + 0x6c, 0x69, 0x63, 0x61, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x12, 0x39, 0x0a, 0x19, 0x73, + 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, + 0x5f, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x08, 0x52, 0x16, + 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x50, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x45, + 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x12, 0x39, 0x0a, 0x19, 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, + 0x79, 0x6e, 0x63, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x65, 0x6e, 0x61, 0x62, + 0x6c, 0x65, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x08, 0x52, 0x16, 0x73, 0x65, 0x6d, 0x69, 0x53, + 0x79, 0x6e, 0x63, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, + 0x64, 0x12, 0x37, 0x0a, 0x18, 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x70, + 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x10, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x15, 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x50, 0x72, 0x69, + 0x6d, 0x61, 0x72, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x37, 0x0a, 0x18, 0x73, 0x65, + 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, + 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x11, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, 0x73, 0x65, + 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x19, 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, + 0x5f, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, + 0x18, 0x12, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x16, 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, + 0x50, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x39, + 0x0a, 0x19, 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x70, 0x72, 0x69, 0x6d, + 0x61, 0x72, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x13, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x16, 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x50, 0x72, 0x69, 0x6d, 0x61, + 0x72, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x45, 0x0a, 0x20, 0x73, 0x65, 0x6d, + 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x77, 0x61, 0x69, 0x74, 0x5f, 0x66, 0x6f, 0x72, 0x5f, + 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x14, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x1b, 0x73, 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x57, 0x61, 0x69, + 0x74, 0x46, 0x6f, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x43, 0x6f, 0x75, 0x6e, 0x74, + 0x2a, 0x3b, 0x0a, 0x13, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x49, 0x4f, 0x41, 0x4e, 0x44, + 0x53, 0x51, 0x4c, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c, 0x49, + 0x4f, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x4f, 0x4e, 0x4c, 0x59, 0x10, 0x01, 0x42, 0x2e, 0x5a, + 0x2c, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, + 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x72, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go b/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go index 9a7b297a4fa..e742713ec0a 100644 --- a/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go +++ b/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go @@ -214,6 +214,26 @@ func (m *Status) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i-- dAtA[i] = 0x20 } + if m.SqlThreadRunning { + i-- + if m.SqlThreadRunning { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x18 + } + if m.IoThreadRunning { + i-- + if m.IoThreadRunning { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x10 + } if len(m.Position) > 0 { i -= len(m.Position) copy(dAtA[i:], m.Position) @@ -547,6 +567,12 @@ func (m *Status) SizeVT() (n int) { if l > 0 { n += 1 + l + sov(uint64(l)) } + if m.IoThreadRunning { + n += 2 + } + if m.SqlThreadRunning { + n += 2 + } if m.ReplicationLagSeconds != 0 { n += 1 + sov(uint64(m.ReplicationLagSeconds)) } @@ -813,6 +839,46 @@ func (m *Status) UnmarshalVT(dAtA []byte) error { } m.Position = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field IoThreadRunning", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.IoThreadRunning = bool(v != 0) + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SqlThreadRunning", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.SqlThreadRunning = bool(v != 0) case 4: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field ReplicationLagSeconds", wireType) diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 83a1c52225e..68905db1ad5 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/topodata" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" ) @@ -224,6 +225,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration { return t.threadThrottlers[threadID].throttle(t.nowFunc()) } +// MaxLag returns the max of all the last replication lag values seen across all tablets of +// the provided type, excluding ignored tablets. +func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { + cache := t.maxReplicationLagModule.lagCacheByType(tabletType) + + var maxLag uint32 + cacheEntries := cache.entries + + for key := range cacheEntries { + if cache.isIgnored(key) { + continue + } + + lag := cache.latest(key).Stats.ReplicationLagSeconds + if lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // ThreadFinished marks threadID as finished and redistributes the thread's // rate allotment across the other threads. // After ThreadFinished() is called, Throttle() must not be called anymore. diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 2fc4b289828..8e730475c43 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -205,7 +205,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt } qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT - if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) { + if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) { return nil, errTxThrottled } @@ -220,7 +220,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt } func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) { - if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) { + if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) { return nil, errTxThrottled } conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting) diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 4131a97b039..934d933ecac 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1587,6 +1587,6 @@ func (m mockTxThrottler) Open() (err error) { func (m mockTxThrottler) Close() { } -func (m mockTxThrottler) Throttle(priority int) (result bool) { +func (m mockTxThrottler) Throttle(priority int, workload string) (result bool) { return m.throttle } diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index ee42fd513bc..fc622538e58 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -167,6 +167,8 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { flagutil.DualFormatStringListVar(fs, ¤tConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.") fs.IntVar(¤tConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information") fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly.") + fs.BoolVar(¤tConfig.TxThrottlerDryRun, "tx-throttler-dry-run", defaultConfig.TxThrottlerDryRun, "If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests.") + fs.DurationVar(¤tConfig.TxThrottlerTopoRefreshInterval, "tx-throttler-topo-refresh-interval", time.Minute*5, "The rate that the transaction throttler will refresh the topology to find cells.") fs.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.") fs.BoolVar(&enableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", false, "If true, hot row protection is not enforced but logs if transactions would have been queued.") @@ -335,11 +337,13 @@ type TabletConfig struct { TwoPCCoordinatorAddress string `json:"-"` TwoPCAbandonAge Seconds `json:"-"` - EnableTxThrottler bool `json:"-"` - TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"` - TxThrottlerHealthCheckCells []string `json:"-"` - TxThrottlerDefaultPriority int `json:"-"` - TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"` + EnableTxThrottler bool `json:"-"` + TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"` + TxThrottlerHealthCheckCells []string `json:"-"` + TxThrottlerDefaultPriority int `json:"-"` + TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"` + TxThrottlerTopoRefreshInterval time.Duration `json:"-"` + TxThrottlerDryRun bool `json:"-"` EnableLagThrottler bool `json:"-"` @@ -534,9 +538,6 @@ func (c *TabletConfig) verifyTxThrottlerConfig() error { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err) } - if len(c.TxThrottlerHealthCheckCells) == 0 { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "empty healthCheckCells given: %+v", c.TxThrottlerHealthCheckCells) - } if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v) } @@ -619,11 +620,13 @@ var defaultConfig = TabletConfig{ DeprecatedCacheResultFields: true, SignalWhenSchemaChange: true, - EnableTxThrottler: false, - TxThrottlerConfig: defaultTxThrottlerConfig(), - TxThrottlerHealthCheckCells: []string{}, - TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle - TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}, + EnableTxThrottler: false, + TxThrottlerConfig: defaultTxThrottlerConfig(), + TxThrottlerHealthCheckCells: []string{}, + TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle + TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}, + TxThrottlerDryRun: false, + TxThrottlerTopoRefreshInterval: time.Minute * 5, EnableLagThrottler: false, // Feature flag; to switch to 'true' at some stage in the future diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index 79c9091d077..1eae5218d2a 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -376,13 +376,6 @@ func TestVerifyTxThrottlerConfig(t *testing.T) { EnableTxThrottler: true, TxThrottlerConfig: &TxThrottlerConfigFlag{invalidMaxReplicationLagModuleConfig}, }, - { - // enabled without cells defined - Name: "enabled without cells", - ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION, - EnableTxThrottler: true, - TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, - }, { // enabled with good config (default/replica tablet type) Name: "enabled", diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index efb5fd5add4..cc3160fe1a9 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -488,7 +488,7 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { startTime := time.Now() - if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) { + if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options), options.GetWorkloadName()) { return errTxThrottled } var connSetting *pools.Setting diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_healthcheck_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_healthcheck_test.go index 17f21f7690b..84254003496 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_healthcheck_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_healthcheck_test.go @@ -8,7 +8,7 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" + gomock "go.uber.org/mock/gomock" discovery "vitess.io/vitess/go/vt/discovery" query "vitess.io/vitess/go/vt/proto/query" diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index 53b827d591a..aeb75d258a3 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -8,10 +8,11 @@ import ( reflect "reflect" time "time" - gomock "github.com/golang/mock/gomock" + gomock "go.uber.org/mock/gomock" discovery "vitess.io/vitess/go/vt/discovery" throttlerdata "vitess.io/vitess/go/vt/proto/throttlerdata" + topodata "vitess.io/vitess/go/vt/proto/topodata" ) // MockThrottlerInterface is a mock of ThrottlerInterface interface. @@ -63,6 +64,20 @@ func (mr *MockThrottlerInterfaceMockRecorder) GetConfiguration() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).GetConfiguration)) } +// MaxLag mocks base method. +func (m *MockThrottlerInterface) MaxLag(tabletType topodata.TabletType) uint32 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MaxLag", tabletType) + ret0, _ := ret[0].(uint32) + return ret0 +} + +// MaxLag indicates an expected call of LastMaxLagNotIgnoredForTabletType. +func (mr *MockThrottlerInterfaceMockRecorder) MaxLag(tabletType interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxLag), tabletType) +} + // MaxRate mocks base method. func (m *MockThrottlerInterface) MaxRate() int64 { m.ctrl.T.Helper() diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go deleted file mode 100644 index 5afb16d3473..00000000000 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go +++ /dev/null @@ -1,58 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler (interfaces: TopologyWatcherInterface) - -// Package txthrottler is a generated GoMock package. -package txthrottler - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" -) - -// MockTopologyWatcherInterface is a mock of TopologyWatcherInterface interface. -type MockTopologyWatcherInterface struct { - ctrl *gomock.Controller - recorder *MockTopologyWatcherInterfaceMockRecorder -} - -// MockTopologyWatcherInterfaceMockRecorder is the mock recorder for MockTopologyWatcherInterface. -type MockTopologyWatcherInterfaceMockRecorder struct { - mock *MockTopologyWatcherInterface -} - -// NewMockTopologyWatcherInterface creates a new mock instance. -func NewMockTopologyWatcherInterface(ctrl *gomock.Controller) *MockTopologyWatcherInterface { - mock := &MockTopologyWatcherInterface{ctrl: ctrl} - mock.recorder = &MockTopologyWatcherInterfaceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockTopologyWatcherInterface) EXPECT() *MockTopologyWatcherInterfaceMockRecorder { - return m.recorder -} - -// Start mocks base method. -func (m *MockTopologyWatcherInterface) Start() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Start") -} - -// Start indicates an expected call of Start. -func (mr *MockTopologyWatcherInterfaceMockRecorder) Start() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockTopologyWatcherInterface)(nil).Start)) -} - -// Stop mocks base method. -func (m *MockTopologyWatcherInterface) Stop() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Stop") -} - -// Stop indicates an expected call of Stop. -func (mr *MockTopologyWatcherInterfaceMockRecorder) Stop() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockTopologyWatcherInterface)(nil).Stop)) -} diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 30e2ec19c56..8147fcae4bc 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -19,8 +19,10 @@ package txthrottler import ( "context" "math/rand" + "reflect" "strings" "sync" + "sync/atomic" "time" "google.golang.org/protobuf/proto" @@ -31,48 +33,43 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // These vars store the functions used to create the topo server, healthcheck, -// topology watchers and go/vt/throttler. These are provided here so that they can be overridden +// and go/vt/throttler. These are provided here so that they can be overridden // in tests to generate mocks. type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck -type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) var ( - healthCheckFactory healthCheckFactoryFunc - topologyWatcherFactory topologyWatcherFactoryFunc - throttlerFactory throttlerFactoryFunc + healthCheckFactory healthCheckFactoryFunc + throttlerFactory throttlerFactoryFunc ) func resetTxThrottlerFactories() { healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency) - } throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) } } +func init() { + resetTxThrottlerFactories() +} + // TxThrottler defines the interface for the transaction throttler. type TxThrottler interface { InitDBConfig(target *querypb.Target) Open() (err error) Close() - Throttle(priority int) (result bool) -} - -func init() { - resetTxThrottlerFactories() + Throttle(priority int, workload string) (result bool) } // ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler @@ -87,6 +84,7 @@ type ThrottlerInterface interface { GetConfiguration() *throttlerdatapb.Configuration UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error ResetConfiguration() + MaxLag(tabletType topodatapb.TabletType) uint32 } // TopologyWatcherInterface defines the public interface that is implemented by @@ -101,6 +99,17 @@ type TopologyWatcherInterface interface { // go/vt/throttler.GlobalManager. const TxThrottlerName = "TransactionThrottler" +// fetchKnownCells gathers a list of known cells from the topology. On error, +// the cell of the local tablet will be used and an error is logged. +func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) []string { + cells, err := topoServer.GetKnownCells(ctx) + if err != nil { + log.Errorf("txThrottler: falling back to local cell due to error fetching cells from topology: %+v", err) + cells = []string{target.Cell} + } + return cells +} + // txThrottler implements TxThrottle for throttling transactions based on replication lag. // It's a thin wrapper around the throttler found in vitess/go/vt/throttler. // It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler. @@ -129,44 +138,33 @@ const TxThrottlerName = "TransactionThrottler" // be executing a method. The only exception is the 'Throttle' method where multiple goroutines are // allowed to execute it concurrently. type txThrottler struct { - // config stores the transaction throttler's configuration. - // It is populated in NewTxThrottler and is not modified - // since. - config *txThrottlerConfig + config *tabletenv.TabletConfig // state holds an open transaction throttler state. It is nil // if the TransactionThrottler is closed. - state *txThrottlerState + state txThrottlerState target *querypb.Target topoServer *topo.Server // stats - throttlerRunning *stats.Gauge - requestsTotal *stats.Counter - requestsThrottled *stats.Counter + throttlerRunning *stats.Gauge + healthChecksReadTotal *stats.CountersWithMultiLabels + healthChecksRecordedTotal *stats.CountersWithMultiLabels + requestsTotal *stats.CountersWithSingleLabel + requestsThrottled *stats.CountersWithSingleLabel } -// txThrottlerConfig holds the parameters that need to be -// passed when constructing a TxThrottler object. -type txThrottlerConfig struct { - // enabled is true if the transaction throttler is enabled. All methods - // of a disabled transaction throttler do nothing and Throttle() always - // returns false. - enabled bool - - throttlerConfig *throttlerdatapb.Configuration - // healthCheckCells stores the cell names in which running vttablets will be monitored for - // replication lag. - healthCheckCells []string - - // tabletTypes stores the tablet types for throttling - tabletTypes *topoproto.TabletTypeListFlag +type txThrottlerState interface { + deallocateResources() + StatsUpdate(tabletStats *discovery.TabletHealth) + throttle() bool } -// txThrottlerState holds the state of an open TxThrottler object. -type txThrottlerState struct { - config *txThrottlerConfig +// txThrottlerStateImpl holds the state of an open TxThrottler object. +type txThrottlerStateImpl struct { + config *tabletenv.TabletConfig + txThrottler *txThrottler // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. @@ -175,38 +173,44 @@ type txThrottlerState struct { stopHealthCheck context.CancelFunc healthCheck discovery.HealthCheck - topologyWatchers []TopologyWatcherInterface + healthCheckChan chan *discovery.TabletHealth + healthCheckCells []string + cellsFromTopo bool + + // tabletTypes stores the tablet types for throttling + tabletTypes map[topodatapb.TabletType]bool + + maxLag int64 + done chan bool + waitForTermination sync.WaitGroup } -// NewTxThrottler tries to construct a txThrottler from the -// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if -// any error occurs. -// This function calls tryCreateTxThrottler that does the actual creation work -// and returns an error if one occurred. +// NewTxThrottler tries to construct a txThrottler from the relevant +// fields in the tabletenv.Env and topo.Server objects. func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { - throttlerConfig := &txThrottlerConfig{enabled: false} - - if env.Config().EnableTxThrottler { - // Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells - // is immutable. - healthCheckCells := env.Config().TxThrottlerHealthCheckCells - - throttlerConfig = &txThrottlerConfig{ - enabled: true, - tabletTypes: env.Config().TxThrottlerTabletTypes, - throttlerConfig: env.Config().TxThrottlerConfig.Get(), - healthCheckCells: healthCheckCells, + config := env.Config() + if config.EnableTxThrottler { + if len(config.TxThrottlerHealthCheckCells) == 0 { + defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, cellsFromTopo: true, topoRefreshInterval: %s, throttlerConfig: %q", + config.TxThrottlerTabletTypes, config.TxThrottlerTopoRefreshInterval, config.TxThrottlerConfig.Get(), + ) + } else { + defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, healthCheckCells: %+v, throttlerConfig: %q", + config.TxThrottlerTabletTypes, config.TxThrottlerHealthCheckCells, config.TxThrottlerConfig.Get(), + ) } - - defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig) } return &txThrottler{ - config: throttlerConfig, - topoServer: topoServer, - throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), - requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"), - requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"), + config: config, + topoServer: topoServer, + throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"), + healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read", + []string{"cell", "DbType"}), + healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded", + []string{"cell", "DbType"}), + requestsTotal: env.Exporter().NewCountersWithSingleLabel(TxThrottlerName+"Requests", "transaction throttler requests", "workload"), + requestsThrottled: env.Exporter().NewCountersWithSingleLabel(TxThrottlerName+"Throttled", "transaction throttler requests throttled", "workload"), } } @@ -217,7 +221,7 @@ func (t *txThrottler) InitDBConfig(target *querypb.Target) { // Open opens the transaction throttler. It must be called prior to 'Throttle'. func (t *txThrottler) Open() (err error) { - if !t.config.enabled { + if !t.config.EnableTxThrottler { return nil } if t.state != nil { @@ -225,7 +229,7 @@ func (t *txThrottler) Open() (err error) { } log.Info("txThrottler: opening") t.throttlerRunning.Set(1) - t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target) + t.state, err = newTxThrottlerState(t, t.config, t.target) return err } @@ -233,7 +237,7 @@ func (t *txThrottler) Open() (err error) { // It should be called after the throttler is no longer needed. // It's ok to call this method on a closed throttler--in which case the method does nothing. func (t *txThrottler) Close() { - if !t.config.enabled { + if !t.config.EnableTxThrottler { return } if t.state == nil { @@ -249,8 +253,8 @@ func (t *txThrottler) Close() { // It returns true if the transaction should not proceed (the caller // should back off). Throttle requires that Open() was previously called // successfully. -func (t *txThrottler) Throttle(priority int) (result bool) { - if !t.config.enabled { +func (t *txThrottler) Throttle(priority int, workload string) (result bool) { + if !t.config.EnableTxThrottler { return false } if t.state == nil { @@ -259,18 +263,18 @@ func (t *txThrottler) Throttle(priority int) (result bool) { // Throttle according to both what the throttler state says and the priority. Workloads with lower priority value // are less likely to be throttled. - result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority + result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle() - t.requestsTotal.Add(1) + t.requestsTotal.Add(workload, 1) if result { - t.requestsThrottled.Add(1) + t.requestsThrottled.Add(workload, 1) } - return result + return result && !t.config.TxThrottlerDryRun } -func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) { - maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig} +func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfig, target *querypb.Target) (txThrottlerState, error) { + maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.TxThrottlerConfig.Get()} t, err := throttlerFactory( TxThrottlerName, @@ -282,91 +286,155 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar if err != nil { return nil, err } - if err := t.UpdateConfiguration(config.throttlerConfig, true /* copyZeroValues */); err != nil { + if err := t.UpdateConfiguration(config.TxThrottlerConfig.Get(), true /* copyZeroValues */); err != nil { t.Close() return nil, err } - result := &txThrottlerState{ - config: config, - throttler: t, + + tabletTypes := make(map[topodatapb.TabletType]bool, len(*config.TxThrottlerTabletTypes)) + for _, tabletType := range *config.TxThrottlerTabletTypes { + tabletTypes[tabletType] = true } - createTxThrottlerHealthCheck(topoServer, config, result, target.Cell) - - result.topologyWatchers = make( - []TopologyWatcherInterface, 0, len(config.healthCheckCells)) - for _, cell := range config.healthCheckCells { - result.topologyWatchers = append( - result.topologyWatchers, - topologyWatcherFactory( - topoServer, - result.healthCheck, - cell, - target.Keyspace, - target.Shard, - discovery.DefaultTopologyWatcherRefreshInterval, - discovery.DefaultTopoReadConcurrency)) + + state := &txThrottlerStateImpl{ + config: config, + healthCheckCells: config.TxThrottlerHealthCheckCells, + tabletTypes: tabletTypes, + throttler: t, + txThrottler: txThrottler, + done: make(chan bool, 1), + } + + // get cells from topo if none defined in tabletenv config + if len(state.healthCheckCells) == 0 { + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + state.cellsFromTopo = true } - return result, nil -} -func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) { ctx, cancel := context.WithCancel(context.Background()) - result.stopHealthCheck = cancel - result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells) - ch := result.healthCheck.Subscribe() - go func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case th := <-ch: - result.StatsUpdate(th) - } + state.stopHealthCheck = cancel + state.initHealthCheckStream(txThrottler.topoServer, target) + go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) + state.waitForTermination.Add(1) + go state.updateMaxLag() + + return state, nil +} + +func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { + ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) + ts.healthCheckChan = ts.healthCheck.Subscribe() + +} + +func (ts *txThrottlerStateImpl) closeHealthCheckStream() { + if ts.healthCheck == nil { + return + } + ts.stopHealthCheck() + ts.healthCheck.Close() +} + +func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { + fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + + knownCells := fetchKnownCells(fetchCtx, topoServer, target) + if !reflect.DeepEqual(knownCells, ts.healthCheckCells) { + log.Info("txThrottler: restarting healthcheck stream due to topology cells update") + ts.healthCheckCells = knownCells + ts.closeHealthCheckStream() + ts.initHealthCheckStream(topoServer, target) + } +} + +func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { + var cellsUpdateTicks <-chan time.Time + if ts.cellsFromTopo { + ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) + cellsUpdateTicks = ticker.C + defer ticker.Stop() + } + for { + select { + case <-ctx.Done(): + return + case <-cellsUpdateTicks: + ts.updateHealthCheckCells(ctx, topoServer, target) + case th := <-ts.healthCheckChan: + ts.StatsUpdate(th) } - }(ctx) + } } -func (ts *txThrottlerState) throttle() bool { +func (ts *txThrottlerStateImpl) throttle() bool { if ts.throttler == nil { - log.Error("throttle called after deallocateResources was called") + log.Error("txThrottler: throttle called after deallocateResources was called") return false } // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - return ts.throttler.Throttle(0 /* threadId */) > 0 + + maxLag := atomic.LoadInt64(&ts.maxLag) + + return maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec && + ts.throttler.Throttle(0 /* threadId */) > 0 } -func (ts *txThrottlerState) deallocateResources() { - // We don't really need to nil out the fields here - // as deallocateResources is not expected to be called - // more than once, but it doesn't hurt to do so. - for _, watcher := range ts.topologyWatchers { - watcher.Stop() +func (ts *txThrottlerStateImpl) updateMaxLag() { + defer ts.waitForTermination.Done() + // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value + ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) + defer ticker.Stop() +outerloop: + for { + select { + case <-ticker.C: + var maxLag uint32 + + for tabletType := range ts.tabletTypes { + maxLagPerTabletType := ts.throttler.MaxLag(tabletType) + if maxLagPerTabletType > maxLag { + maxLag = maxLagPerTabletType + } + } + atomic.StoreInt64(&ts.maxLag, int64(maxLag)) + case <-ts.done: + break outerloop + } } - ts.topologyWatchers = nil +} - ts.healthCheck.Close() +func (ts *txThrottlerStateImpl) deallocateResources() { + // Close healthcheck and topo watchers + ts.closeHealthCheckStream() ts.healthCheck = nil - // After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not + ts.done <- true + ts.waitForTermination.Wait() + // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not // to be executing, so we can safely close the throttler. ts.throttler.Close() ts.throttler = nil } // StatsUpdate updates the health of a tablet with the given healthcheck. -func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { - if ts.config.tabletTypes == nil { +func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) { + if len(ts.tabletTypes) == 0 { return } + tabletType := tabletStats.Target.TabletType + metricLabels := []string{tabletStats.Target.Cell, tabletType.String()} + ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1) + // Monitor tablets for replication lag if they have a tablet - // type specified by the --tx_throttler_tablet_types flag. - for _, expectedTabletType := range *ts.config.tabletTypes { - if tabletStats.Target.TabletType == expectedTabletType { - ts.throttler.RecordReplicationLag(time.Now(), tabletStats) - return - } + // type specified by the --tx-throttler-tablet-types flag. + if ts.tabletTypes[tabletType] { + ts.throttler.RecordReplicationLag(time.Now(), tabletStats) + ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1) } } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index ffb88bf21f6..de50f32378d 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -19,14 +19,15 @@ package txthrottler // Commands to generate the mocks for this test. //go:generate mockgen -destination mock_healthcheck_test.go -package txthrottler -mock_names "HealthCheck=MockHealthCheck" vitess.io/vitess/go/vt/discovery HealthCheck //go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler ThrottlerInterface -//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface import ( + "context" + "sync/atomic" "testing" "time" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/throttler" @@ -49,7 +50,7 @@ func TestDisabledThrottler(t *testing.T) { Shard: "shard", }) assert.Nil(t, throttler.Open()) - assert.False(t, throttler.Throttle(0)) + assert.False(t, throttler.Throttle(0, "some-workload")) throttlerImpl, _ := throttler.(*txThrottler) assert.Zero(t, throttlerImpl.throttlerRunning.Get()) throttler.Close() @@ -71,47 +72,54 @@ func TestEnabledThrottler(t *testing.T) { return mockHealthCheck } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - assert.Equal(t, ts, topoServer) - assert.Contains(t, []string{"cell1", "cell2"}, cell) - assert.Equal(t, "keyspace", keyspace) - assert.Equal(t, "shard", shard) - result := NewMockTopologyWatcherInterface(mockCtrl) - result.EXPECT().Stop() - return result - } - mockThrottler := NewMockThrottlerInterface(mockCtrl) throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { assert.Equal(t, 1, threadCount) return mockThrottler, nil } - call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) - call1 := mockThrottler.EXPECT().Throttle(0) - call1.Return(0 * time.Second) + var calls []*gomock.Call + + call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) + calls = append(calls, call) + + // 1 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(0 * time.Second) + calls = append(calls, call) + tabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ + Cell: "cell1", TabletType: topodatapb.TabletType_REPLICA, }, } - call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) - call3 := mockThrottler.EXPECT().Throttle(0) - call3.Return(1 * time.Second) - call4 := mockThrottler.EXPECT().Throttle(0) - call4.Return(1 * time.Second) - calllast := mockThrottler.EXPECT().Close() + call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) + calls = append(calls, call) + + // 2 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(1 * time.Second) + calls = append(calls, call) + + // 3 + // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first + // whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle() + + // 4 + // Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first + // whether there is lag or not, so no call to the underlying mockThrottler is issued. + + call = mockThrottler.EXPECT().Close() + calls = append(calls, call) - call1.After(call0) - call2.After(call1) - call3.After(call2) - call4.After(call3) - calllast.After(call4) + for i := 1; i < len(calls); i++ { + calls[i].After(calls[i-1]) + } config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = true - config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"} config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA} env := tabletenv.NewEnv(config, t.Name()) @@ -119,60 +127,121 @@ func TestEnabledThrottler(t *testing.T) { throttlerImpl, _ := throttler.(*txThrottler) assert.NotNil(t, throttlerImpl) throttler.InitDBConfig(&querypb.Target{ + Cell: "cell1", Keyspace: "keyspace", Shard: "shard", }) + assert.Nil(t, throttlerImpl.Open()) + throttlerStateImpl, ok := throttlerImpl.state.(*txThrottlerStateImpl) + assert.True(t, ok) + assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) - assert.False(t, throttlerImpl.Throttle(100)) - assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get()) - assert.Zero(t, throttlerImpl.requestsThrottled.Get()) + // Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a + // way that will interfere with how we manipulate that value in our tests to evaluate different cases: + throttlerStateImpl.done <- true + + // 1 should not throttle due to return value of underlying Throttle(), despite high lag + atomic.StoreInt64(&throttlerStateImpl.maxLag, 20) + assert.False(t, throttlerImpl.Throttle(100, "some-workload")) + assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some-workload"]) throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ + Cell: "cell2", TabletType: topodatapb.TabletType_RDONLY, }, } // This call should not be forwarded to the go/vt/throttlerImpl.Throttler object. throttlerImpl.state.StatsUpdate(rdonlyTabletStats) - // The second throttle call should reject. - assert.True(t, throttlerImpl.Throttle(100)) - assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get()) - assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) - - // This call should not throttle due to priority. Check that's the case and counters agree. - assert.False(t, throttlerImpl.Throttle(0)) - assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Get()) - assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) - throttlerImpl.Close() + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) + + // 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100 + assert.True(t, throttlerImpl.Throttle(100, "some-workload")) + assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + // 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0 + assert.False(t, throttlerImpl.Throttle(0, "some-workload")) + assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + // 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag + atomic.StoreInt64(&throttlerStateImpl.maxLag, 1) + assert.False(t, throttler.Throttle(100, "some-workload")) + assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + throttler.Close() assert.Zero(t, throttlerImpl.throttlerRunning.Get()) } -func TestNewTxThrottler(t *testing.T) { +func TestFetchKnownCells(t *testing.T) { + { + ts := memorytopo.NewServer("cell1", "cell2") + cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"}) + assert.Equal(t, []string{"cell1", "cell2"}, cells) + } + { + ts := memorytopo.NewServer() + cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"}) + assert.Equal(t, []string{"cell1"}, cells) + } +} + +func TestDryRunThrottler(t *testing.T) { config := tabletenv.NewDefaultConfig() env := tabletenv.NewEnv(config, t.Name()) - { - // disabled - config.EnableTxThrottler = false - throttler := NewTxThrottler(env, nil) - throttlerImpl, _ := throttler.(*txThrottler) - assert.NotNil(t, throttlerImpl) - assert.NotNil(t, throttlerImpl.config) - assert.False(t, throttlerImpl.config.enabled) + testCases := []struct { + Name string + txThrottlerStateShouldThrottle bool + throttlerDryRun bool + expectedResult bool + }{ + {Name: "Real run throttles when txThrottlerStateImpl says it should", txThrottlerStateShouldThrottle: true, throttlerDryRun: false, expectedResult: true}, + {Name: "Real run does not throttle when txThrottlerStateImpl says it should not", txThrottlerStateShouldThrottle: false, throttlerDryRun: false, expectedResult: false}, + {Name: "Dry run does not throttle when txThrottlerStateImpl says it should", txThrottlerStateShouldThrottle: true, throttlerDryRun: true, expectedResult: false}, + {Name: "Dry run does not throttle when txThrottlerStateImpl says it should not", txThrottlerStateShouldThrottle: false, throttlerDryRun: true, expectedResult: false}, } - { - // enabled - config.EnableTxThrottler = true - config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"} - config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA} - throttler := NewTxThrottler(env, nil) - throttlerImpl, _ := throttler.(*txThrottler) - assert.NotNil(t, throttlerImpl) - assert.NotNil(t, throttlerImpl.config) - assert.True(t, throttlerImpl.config.enabled) - assert.Equal(t, []string{"cell1", "cell2"}, throttlerImpl.config.healthCheckCells) + + for _, aTestCase := range testCases { + theTestCase := aTestCase + + t.Run(theTestCase.Name, func(t *testing.T) { + aTxThrottler := &txThrottler{ + config: &tabletenv.TabletConfig{ + EnableTxThrottler: true, + TxThrottlerDryRun: theTestCase.throttlerDryRun, + }, + state: &mockTxThrottlerState{shouldThrottle: theTestCase.txThrottlerStateShouldThrottle}, + throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), + requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"), + requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"), + } + + assert.Equal(t, theTestCase.expectedResult, aTxThrottler.Throttle(100, "some-workload")) + }) } } + +type mockTxThrottlerState struct { + shouldThrottle bool +} + +func (t *mockTxThrottlerState) deallocateResources() { + +} +func (t *mockTxThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { + +} + +func (t *mockTxThrottlerState) throttle() bool { + return t.shouldThrottle +} diff --git a/proto/replicationdata.proto b/proto/replicationdata.proto index 536ea2c4d13..750351afbb2 100644 --- a/proto/replicationdata.proto +++ b/proto/replicationdata.proto @@ -25,9 +25,8 @@ package replicationdata; // flavor-specific command and parsed into a Position and fields. message Status { string position = 1; - // 2 used to be io_thread_running. Instead io_state is used. - // 3 used to be sql_thread_running. Instead sql_state is used. - reserved 2, 3; + bool io_thread_running = 2; + bool sql_thread_running = 3; uint32 replication_lag_seconds = 4; string source_host = 5; int32 source_port = 6; diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 4ebcf2fb977..3f1eb62f669 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -31369,6 +31369,12 @@ export namespace replicationdata { /** Status position */ position?: (string|null); + /** Status io_thread_running */ + io_thread_running?: (boolean|null); + + /** Status sql_thread_running */ + sql_thread_running?: (boolean|null); + /** Status replication_lag_seconds */ replication_lag_seconds?: (number|null); @@ -31445,6 +31451,12 @@ export namespace replicationdata { /** Status position. */ public position: string; + /** Status io_thread_running. */ + public io_thread_running: boolean; + + /** Status sql_thread_running. */ + public sql_thread_running: boolean; + /** Status replication_lag_seconds. */ public replication_lag_seconds: number; diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index 8bb4ceb2c83..eac24db610c 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -74075,6 +74075,8 @@ $root.replicationdata = (function() { * @memberof replicationdata * @interface IStatus * @property {string|null} [position] Status position + * @property {boolean|null} [io_thread_running] Status io_thread_running + * @property {boolean|null} [sql_thread_running] Status sql_thread_running * @property {number|null} [replication_lag_seconds] Status replication_lag_seconds * @property {string|null} [source_host] Status source_host * @property {number|null} [source_port] Status source_port @@ -74121,6 +74123,22 @@ $root.replicationdata = (function() { */ Status.prototype.position = ""; + /** + * Status io_thread_running. + * @member {boolean} io_thread_running + * @memberof replicationdata.Status + * @instance + */ + Status.prototype.io_thread_running = false; + + /** + * Status sql_thread_running. + * @member {boolean} sql_thread_running + * @memberof replicationdata.Status + * @instance + */ + Status.prototype.sql_thread_running = false; + /** * Status replication_lag_seconds. * @member {number} replication_lag_seconds @@ -74315,6 +74333,10 @@ $root.replicationdata = (function() { writer = $Writer.create(); if (message.position != null && Object.hasOwnProperty.call(message, "position")) writer.uint32(/* id 1, wireType 2 =*/10).string(message.position); + if (message.io_thread_running != null && Object.hasOwnProperty.call(message, "io_thread_running")) + writer.uint32(/* id 2, wireType 0 =*/16).bool(message.io_thread_running); + if (message.sql_thread_running != null && Object.hasOwnProperty.call(message, "sql_thread_running")) + writer.uint32(/* id 3, wireType 0 =*/24).bool(message.sql_thread_running); if (message.replication_lag_seconds != null && Object.hasOwnProperty.call(message, "replication_lag_seconds")) writer.uint32(/* id 4, wireType 0 =*/32).uint32(message.replication_lag_seconds); if (message.source_host != null && Object.hasOwnProperty.call(message, "source_host")) @@ -74394,6 +74416,12 @@ $root.replicationdata = (function() { case 1: message.position = reader.string(); break; + case 2: + message.io_thread_running = reader.bool(); + break; + case 3: + message.sql_thread_running = reader.bool(); + break; case 4: message.replication_lag_seconds = reader.uint32(); break; @@ -74495,6 +74523,12 @@ $root.replicationdata = (function() { if (message.position != null && message.hasOwnProperty("position")) if (!$util.isString(message.position)) return "position: string expected"; + if (message.io_thread_running != null && message.hasOwnProperty("io_thread_running")) + if (typeof message.io_thread_running !== "boolean") + return "io_thread_running: boolean expected"; + if (message.sql_thread_running != null && message.hasOwnProperty("sql_thread_running")) + if (typeof message.sql_thread_running !== "boolean") + return "sql_thread_running: boolean expected"; if (message.replication_lag_seconds != null && message.hasOwnProperty("replication_lag_seconds")) if (!$util.isInteger(message.replication_lag_seconds)) return "replication_lag_seconds: integer expected"; @@ -74575,6 +74609,10 @@ $root.replicationdata = (function() { var message = new $root.replicationdata.Status(); if (object.position != null) message.position = String(object.position); + if (object.io_thread_running != null) + message.io_thread_running = Boolean(object.io_thread_running); + if (object.sql_thread_running != null) + message.sql_thread_running = Boolean(object.sql_thread_running); if (object.replication_lag_seconds != null) message.replication_lag_seconds = object.replication_lag_seconds >>> 0; if (object.source_host != null) @@ -74635,6 +74673,8 @@ $root.replicationdata = (function() { var object = {}; if (options.defaults) { object.position = ""; + object.io_thread_running = false; + object.sql_thread_running = false; object.replication_lag_seconds = 0; object.source_host = ""; object.source_port = 0; @@ -74659,6 +74699,10 @@ $root.replicationdata = (function() { } if (message.position != null && message.hasOwnProperty("position")) object.position = message.position; + if (message.io_thread_running != null && message.hasOwnProperty("io_thread_running")) + object.io_thread_running = message.io_thread_running; + if (message.sql_thread_running != null && message.hasOwnProperty("sql_thread_running")) + object.sql_thread_running = message.sql_thread_running; if (message.replication_lag_seconds != null && message.hasOwnProperty("replication_lag_seconds")) object.replication_lag_seconds = message.replication_lag_seconds; if (message.source_host != null && message.hasOwnProperty("source_host"))