Skip to content

Commit

Permalink
integrate tabletbalancer into vtgate
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Demmer <mdemmer@slack-corp.com>
Co-authored-by: Venkatraju V <venkatraju@slack-corp.com>
  • Loading branch information
demmer and venkatraju committed Jul 8, 2024
1 parent 73b16ba commit 4fd193b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 8 deletions.
4 changes: 3 additions & 1 deletion go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ Flags:
--allow-kill-statement Allows the execution of kill statement
--allowed_tablet_types strings Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types.
--alsologtostderr log to standard error as well as files
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--balancer_enabled Whether to enable the tablet balancer to evenly spread query load
--balancer_keyspaces strings When in balanced mode, a comma-separated list of keyspaces for which to use the balancer (optional)
--balancer_vtgate_cells strings When in balanced mode, a comma-separated list of cells that contain vtgates (required) --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--buffer_drain_concurrency int Maximum number of requests retried simultaneously. More concurrency will increase the load on the PRIMARY vttablet when draining the buffer. (default 1)
--buffer_keyspace_shards string If not empty, limit buffering to these entries (comma separated). Entry format: keyspace or keyspace/shard. Requires --enable_buffer=true.
--buffer_max_failover_duration duration Stop buffering completely if a failover takes longer than this duration. (default 20s)
Expand Down
66 changes: 59 additions & 7 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"context"
"fmt"
"math/rand/v2"
"net/http"
"runtime/debug"
"slices"
"sort"
"sync"
"sync/atomic"
Expand All @@ -37,6 +39,7 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/balancer"
"vitess.io/vitess/go/vt/vtgate/buffer"
"vitess.io/vitess/go/vt/vttablet/queryservice"

Expand All @@ -54,6 +57,11 @@ var (
// retryCount is the number of times a query will be retried on error
retryCount = 2

// configuration flags for the tablet balancer
balancerEnabled bool
balancerVtgateCells []string
balancerKeyspaces []string

logCollations = logutil.NewThrottledLogger("CollationInconsistent", 1*time.Minute)
)

Expand All @@ -62,6 +70,9 @@ func init() {
fs.StringVar(&CellsToWatch, "cells_to_watch", "", "comma-separated list of cells for watching tablets")
fs.DurationVar(&initialTabletTimeout, "gateway_initial_tablet_timeout", 30*time.Second, "At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type")
fs.IntVar(&retryCount, "retry-count", 2, "retry count")
fs.BoolVar(&balancerEnabled, "balancer_enabled", false, "Whether to enable the tablet balancer to evenly spread query load")
fs.StringSliceVar(&balancerVtgateCells, "balancer_vtgate_cells", []string{}, "When in balanced mode, a comma-separated list of cells that contain vtgates (required)")
fs.StringSliceVar(&balancerKeyspaces, "balancer_keyspaces", []string{}, "When in balanced mode, a comma-separated list of keyspaces for which to use the balancer (optional)")
})
}

Expand All @@ -84,6 +95,9 @@ type TabletGateway struct {

// buffer, if enabled, buffers requests during a detected PRIMARY failover.
buffer *buffer.Buffer

// balancer used for routing to tablets
balancer balancer.TabletBalancer
}

func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck {
Expand Down Expand Up @@ -112,6 +126,9 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop
statusAggregators: make(map[string]*TabletStatusAggregator),
}
gw.setupBuffering(ctx)
if balancerEnabled {
gw.setupBalancer(ctx)
}
gw.QueryService = queryservice.Wrap(nil, gw.withRetry)
return gw
}
Expand Down Expand Up @@ -145,6 +162,13 @@ func (gw *TabletGateway) setupBuffering(ctx context.Context) {
}(bufferCtx, ksChan, gw.buffer)
}

func (gw *TabletGateway) setupBalancer(ctx context.Context) {
if len(balancerVtgateCells) == 0 {
log.Exitf("balancer_vtgate_cells is required for balanced mode")
}
gw.balancer = balancer.NewTabletBalancer(gw.localCell, balancerVtgateCells)
}

// QueryServiceByAlias satisfies the Gateway interface
func (gw *TabletGateway) QueryServiceByAlias(ctx context.Context, alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) {
qs, err := gw.hc.TabletConnection(ctx, alias, target)
Expand Down Expand Up @@ -220,6 +244,15 @@ func (gw *TabletGateway) CacheStatus() TabletCacheStatusList {
return res
}

func (gw *TabletGateway) DebugBalancerHandler(w http.ResponseWriter, r *http.Request) {
if balancerEnabled {
gw.balancer.DebugHandler(w, r)
} else {
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte("not enabled"))
}
}

// withRetry gets available connections and executes the action. If there are retryable errors,
// it retries retryCount times before failing. It does not retry if the connection is in
// the middle of a transaction. While returning the error check if it maybe a result of
Expand Down Expand Up @@ -306,16 +339,35 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
break
}

gw.shuffleTablets(gw.localCell, tablets)

var th *discovery.TabletHealth
// skip tablets we tried before
for _, t := range tablets {
if _, ok := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)]; !ok {
th = t
break

useBalancer := balancerEnabled
if balancerEnabled && len(balancerKeyspaces) > 0 {
useBalancer = slices.Contains(balancerKeyspaces, target.Keyspace)
}
if useBalancer {
// filter out the tablets that we've tried before (if any), then pick the best one
if len(invalidTablets) > 0 {
tablets = slices.DeleteFunc(tablets, func(t *discovery.TabletHealth) bool {
_, isInvalid := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)]
return isInvalid
})
}

th = gw.balancer.Pick(target, tablets)

} else {
gw.shuffleTablets(gw.localCell, tablets)

// skip tablets we tried before
for _, t := range tablets {
if _, ok := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)]; !ok {
th = t
break
}
}
}

if th == nil {
// do not override error from last attempt.
if err == nil {
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ func Init(
})
vtgateInst.registerDebugHealthHandler()
vtgateInst.registerDebugEnvHandler()
vtgateInst.registerDebugBalancerHandler()

initAPI(gw.hc)
return vtgateInst
Expand Down Expand Up @@ -437,6 +438,12 @@ func (vtg *VTGate) registerDebugHealthHandler() {
})
}

func (vtg *VTGate) registerDebugBalancerHandler() {
http.HandleFunc("/debug/balancer", func(w http.ResponseWriter, r *http.Request) {
vtg.Gateway().DebugBalancerHandler(w, r)
})
}

// IsHealthy returns nil if server is healthy.
// Otherwise, it returns an error indicating the reason.
func (vtg *VTGate) IsHealthy() error {
Expand Down

0 comments on commit 4fd193b

Please sign in to comment.