Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Routing balancer all #42

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions xds/internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ import (
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/edsbalancer" // Register the EDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer
_ "google.golang.org/grpc/xds/internal/balancer/xdsrouting" // Register the xds_routing balancer
)
221 changes: 221 additions & 0 deletions xds/internal/balancer/xdsrouting/balancerstateaggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package xdsrouting

import (
"fmt"
"sync"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal"
)

type routingSubBalancerState struct {
state balancer.State
// stateToAggregate is the connectivity state used only for state
// aggregation. It could be different from state.ConnectivityState. For
// example when a sub-balancer transitions from TransientFailure to
// connecting, state.ConnectivityState is Connecting, but stateToAggregate
// is still TransientFailure.
stateToAggregate connectivity.State
}

func (s *routingSubBalancerState) String() string {
return fmt.Sprintf("picker:%p,state:%v,stateToAggregate:%v", s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
}

type routingBalancerStateAggregator struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger

mu sync.Mutex
// routes, one for each matcher.
routes []routingPickerRoute
// If started is false, no updates should be sent to the parent cc. A closed
// sub-balancer could still send pickers to this aggregator. This makes sure
// that no updates will be forwarded to parent when the whole balancer group
// and states aggregator is closed.
started bool
// All balancer IDs exist as keys in this map, even if balancer group is not
// started.
//
// If an ID is not in map, it's either removed or never added.
idToPickerState map[internal.LocalityID]*routingSubBalancerState
}

func newRoutingBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *routingBalancerStateAggregator {
return &routingBalancerStateAggregator{
cc: cc,
logger: logger,
idToPickerState: make(map[internal.LocalityID]*routingSubBalancerState),
}
}

// Start starts the aggregator. It can be called after Close to restart the
// aggretator.
func (rbsa *routingBalancerStateAggregator) start() {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
rbsa.started = true
}

// Close closes the aggregator. When the aggregator is closed, it won't call
// parent ClientConn to upate balancer state.
func (rbsa *routingBalancerStateAggregator) close() {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
rbsa.started = false
rbsa.clearStates()
}

// add adds a sub-balancer state with weight. It adds a place holder, and waits
// for the real sub-balancer to update state.
//
// This is called when there's a new action.
func (rbsa *routingBalancerStateAggregator) add(id internal.LocalityID) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
rbsa.idToPickerState[id] = &routingSubBalancerState{
// Start everything in CONNECTING, so if one of the sub-balancers
// reports TransientFailure, the RPCs will still wait for the other
// sub-balancers.
state: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
},
stateToAggregate: connectivity.Connecting,
}
}

// remove removes the sub-balancer state. Future updates from this sub-balancer,
// if any, will be ignored.
//
// This is called when an action is removed.
func (rbsa *routingBalancerStateAggregator) remove(id internal.LocalityID) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
if _, ok := rbsa.idToPickerState[id]; !ok {
return
}
// Remove id and picker from picker map. This also results in future updates
// for this ID to be ignored.
delete(rbsa.idToPickerState, id)
}

// updateRoutes updates the routes. Note that it doesn't trigger an update to
// the parent ClientConn. The caller should decide when it's necessary, and call
// buildAndUpdate.
func (rbsa *routingBalancerStateAggregator) updateRoutes(newRoutes []routingPickerRoute) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
rbsa.routes = newRoutes
}

// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//
// It calls parent ClientConn's UpdateState with the new aggregated state.
func (rbsa *routingBalancerStateAggregator) UpdateState(id internal.LocalityID, state balancer.State) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
pickerSt, ok := rbsa.idToPickerState[id]
if !ok {
// All state starts with an entry in pickStateMap. If ID is not in map,
// it's either removed, or never existed.
return
}
if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
// If old state is TransientFailure, and new state is Connecting, don't
// update the state, to prevent the aggregated state from being always
// CONNECTING. Otherwise, stateToAggregate is the same as
// state.ConnectivityState.
pickerSt.stateToAggregate = state.ConnectivityState
}
pickerSt.state = state

if !rbsa.started {
return
}
rbsa.cc.UpdateState(rbsa.build())
}

// clearState Reset everything to init state (Connecting) but keep the entry in
// map (to keep the weight).
//
// Caller must hold rbsa.mu.
func (rbsa *routingBalancerStateAggregator) clearStates() {
for _, pState := range rbsa.idToPickerState {
pState.state = balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}
pState.stateToAggregate = connectivity.Connecting
}
}

// buildAndUpdate combines the sub-state from each sub-balancer into one state,
// and update it to parent ClientConn.
func (rbsa *routingBalancerStateAggregator) buildAndUpdate() {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
if !rbsa.started {
return
}
rbsa.cc.UpdateState(rbsa.build())
}

// build combines sub-states into one. The picker will do routing pick.
//
// Caller must hold rbsa.mu.
func (rbsa *routingBalancerStateAggregator) build() balancer.State {
m := rbsa.idToPickerState
var readyN, connectingN int
for _, ps := range m {
switch ps.stateToAggregate {
case connectivity.Ready:
readyN++
case connectivity.Connecting:
connectingN++
}
}
var aggregatedState connectivity.State
switch {
case readyN > 0:
aggregatedState = connectivity.Ready
case connectingN > 0:
aggregatedState = connectivity.Connecting
default:
aggregatedState = connectivity.TransientFailure
}

// The picker's return error might not be consistent with the
// aggregatedState. Because for routing, we want to always build picker with
// all sub-pickers (not even ready sub-pickers), so even if the overall
// state is Ready, pick for certain RPCs can behave like Connecting or
// TransientFailure.
rbsa.logger.Infof("Child pickers with routes: %s, actions: %+v", rbsa.routes, rbsa.idToPickerState)
picker := newRoutingPickerGroup(rbsa.routes, rbsa.idToPickerState)
return balancer.State{
ConnectivityState: aggregatedState,
Picker: picker,
}
}
20 changes: 20 additions & 0 deletions xds/internal/balancer/xdsrouting/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package xdsrouting implements the routing balancer for xds.
package xdsrouting
34 changes: 34 additions & 0 deletions xds/internal/balancer/xdsrouting/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package xdsrouting

import (
"fmt"

"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
)

const prefix = "[xds-routing-lb %p] "

var logger = grpclog.Component("xds")

func prefixLogger(p *routingBalancer) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}
Loading