Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Fixed data race on session stream queries
* Renamed `internal/router` package to `internal/balancer` for unambiguous understanding of package mission
* Implemented detection of local data-center with measuring tcp dial RTT
* Added `trace.Driver.OnBalancer{Init,Close,ChooseEndpoint,Update}` events
* Marked the driver cluster events as deprecated
Expand Down
36 changes: 18 additions & 18 deletions balancers/balancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,31 @@ package balancers
import (
"strings"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
routerconfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/router/config"
)

// Deprecated: RoundRobin is RandomChoice now
func RoundRobin() *routerconfig.Config {
return &routerconfig.Config{}
func RoundRobin() *balancerConfig.Config {
return &balancerConfig.Config{}
}

func RandomChoice() *routerconfig.Config {
return &routerconfig.Config{}
func RandomChoice() *balancerConfig.Config {
return &balancerConfig.Config{}
}

func SingleConn() *routerconfig.Config {
return &routerconfig.Config{
func SingleConn() *balancerConfig.Config {
return &balancerConfig.Config{
SingleConn: true,
}
}

// PreferLocalDC creates balancer which use endpoints only in location such as initial endpoint location
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
// PreferLocalDC balancer try to autodetect local DC from client side.
func PreferLocalDC(balancer *routerconfig.Config) *routerconfig.Config {
balancer.IsPreferConn = func(routerInfo routerconfig.Info, c conn.Conn) bool {
return c.Endpoint().Location() == routerInfo.SelfLocation
func PreferLocalDC(balancer *balancerConfig.Config) *balancerConfig.Config {
balancer.IsPreferConn = func(info balancerConfig.Info, c conn.Conn) bool {
return c.Endpoint().Location() == info.SelfLocation
}
balancer.DetectlocalDC = true
return balancer
Expand All @@ -36,22 +36,22 @@ func PreferLocalDC(balancer *routerconfig.Config) *routerconfig.Config {
// PreferLocalDCWithFallBack creates balancer which use endpoints only in location such as initial endpoint location
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferLocalDCWithFallBack(balancer *routerconfig.Config) *routerconfig.Config {
func PreferLocalDCWithFallBack(balancer *balancerConfig.Config) *balancerConfig.Config {
balancer = PreferLocalDC(balancer)
balancer.AllowFalback = true
return balancer
}

// PreferLocations creates balancer which use endpoints only in selected locations (such as "ABC", "DEF", etc.)
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
func PreferLocations(balancer *routerconfig.Config, locations ...string) *routerconfig.Config {
func PreferLocations(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
if len(locations) == 0 {
panic("empty list of locations")
}
for i := range locations {
locations[i] = strings.ToUpper(locations[i])
}
balancer.IsPreferConn = func(_ routerconfig.Info, c conn.Conn) bool {
balancer.IsPreferConn = func(_ balancerConfig.Info, c conn.Conn) bool {
location := strings.ToUpper(c.Endpoint().Location())
for _, l := range locations {
if location == l {
Expand All @@ -66,7 +66,7 @@ func PreferLocations(balancer *routerconfig.Config, locations ...string) *router
// PreferLocationsWithFallback creates balancer which use endpoints only in selected locations
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferLocationsWithFallback(balancer *routerconfig.Config, locations ...string) *routerconfig.Config {
func PreferLocationsWithFallback(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
balancer = PreferLocations(balancer, locations...)
balancer.AllowFalback = true
return balancer
Expand All @@ -84,8 +84,8 @@ type Endpoint interface {

// Prefer creates balancer which use endpoints by filter
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
func Prefer(balancer *routerconfig.Config, filter func(endpoint Endpoint) bool) *routerconfig.Config {
balancer.IsPreferConn = func(_ routerconfig.Info, c conn.Conn) bool {
func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer.IsPreferConn = func(_ balancerConfig.Info, c conn.Conn) bool {
return filter(c.Endpoint())
}
return balancer
Expand All @@ -94,13 +94,13 @@ func Prefer(balancer *routerconfig.Config, filter func(endpoint Endpoint) bool)
// PreferWithFallback creates balancer which use endpoints by filter
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferWithFallback(balancer *routerconfig.Config, filter func(endpoint Endpoint) bool) *routerconfig.Config {
func PreferWithFallback(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer = Prefer(balancer, filter)
balancer.AllowFalback = true
return balancer
}

// Default balancer used by default
func Default() *routerconfig.Config {
func Default() *balancerConfig.Config {
return RandomChoice()
}
14 changes: 7 additions & 7 deletions balancers/balancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (

"github.com/stretchr/testify/require"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
routerconfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/router/config"
)

func TestPreferLocalDC(t *testing.T) {
Expand All @@ -18,7 +18,7 @@ func TestPreferLocalDC(t *testing.T) {
}
rr := PreferLocalDC(Default())
require.False(t, rr.AllowFalback)
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(routerconfig.Info{SelfLocation: "2"}, rr, conns))
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
}

func TestPreferLocalDCWithFallBack(t *testing.T) {
Expand All @@ -29,7 +29,7 @@ func TestPreferLocalDCWithFallBack(t *testing.T) {
}
rr := PreferLocalDCWithFallBack(Default())
require.True(t, rr.AllowFalback)
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(routerconfig.Info{SelfLocation: "2"}, rr, conns))
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
}

func TestPreferLocations(t *testing.T) {
Expand All @@ -41,7 +41,7 @@ func TestPreferLocations(t *testing.T) {

rr := PreferLocations(Default(), "zero", "two")
require.False(t, rr.AllowFalback)
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(routerconfig.Info{}, rr, conns))
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
}

func TestPreferLocationsWithFallback(t *testing.T) {
Expand All @@ -53,12 +53,12 @@ func TestPreferLocationsWithFallback(t *testing.T) {

rr := PreferLocationsWithFallback(Default(), "zero", "two")
require.True(t, rr.AllowFalback)
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(routerconfig.Info{}, rr, conns))
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
}

func applyPreferFilter(info routerconfig.Info, b *routerconfig.Config, conns []conn.Conn) []conn.Conn {
func applyPreferFilter(info balancerConfig.Info, b *balancerConfig.Config, conns []conn.Conn) []conn.Conn {
if b.IsPreferConn == nil {
b.IsPreferConn = func(routerInfo routerconfig.Info, c conn.Conn) bool { return true }
b.IsPreferConn = func(info balancerConfig.Info, c conn.Conn) bool { return true }
}
res := make([]conn.Conn, 0, len(conns))
for _, c := range conns {
Expand Down
14 changes: 7 additions & 7 deletions balancers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"encoding/json"
"fmt"

routerconfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/router/config"
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
)

Expand All @@ -31,13 +31,13 @@ type balancersConfig struct {
}

type fromConfigOptionsHolder struct {
fallbackBalancer *routerconfig.Config
fallbackBalancer *balancerConfig.Config
errorHandler func(error)
}

type fromConfigOption func(h *fromConfigOptionsHolder)

func WithParseErrorFallbackBalancer(b *routerconfig.Config) fromConfigOption {
func WithParseErrorFallbackBalancer(b *balancerConfig.Config) fromConfigOption {
return func(h *fromConfigOptionsHolder) {
h.fallbackBalancer = b
}
Expand All @@ -49,9 +49,9 @@ func WithParseErrorHandler(errorHandler func(error)) fromConfigOption {
}
}

func CreateFromConfig(config string) (*routerconfig.Config, error) {
func CreateFromConfig(config string) (*balancerConfig.Config, error) {
var (
b *routerconfig.Config
b *balancerConfig.Config
err error
c balancersConfig
)
Expand Down Expand Up @@ -90,12 +90,12 @@ func CreateFromConfig(config string) (*routerconfig.Config, error) {
}
}

func FromConfig(config string, opts ...fromConfigOption) *routerconfig.Config {
func FromConfig(config string, opts ...fromConfigOption) *balancerConfig.Config {
var (
h = fromConfigOptionsHolder{
fallbackBalancer: Default(),
}
b *routerconfig.Config
b *balancerConfig.Config
err error
)
for _, o := range opts {
Expand Down
30 changes: 15 additions & 15 deletions balancers/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/stretchr/testify/require"

routerconfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/router/config"
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
)
Expand All @@ -14,45 +14,45 @@ func TestFromConfig(t *testing.T) {
for _, test := range []struct {
name string
config string
res routerconfig.Config
res balancerConfig.Config
fail bool
}{
{
name: "empty",
config: ``,
res: routerconfig.Config{},
res: balancerConfig.Config{},
fail: true,
},
{
name: "single",
config: `{
"type": "single"
}`,
res: routerconfig.Config{SingleConn: true},
res: balancerConfig.Config{SingleConn: true},
},
{
name: "round_robin",
config: `{
"type": "round_robin"
}`,
res: routerconfig.Config{},
res: balancerConfig.Config{},
},
{
name: "random_choice",
config: `{
"type": "random_choice"
}`,
res: routerconfig.Config{},
res: balancerConfig.Config{},
},
{
name: "prefer_local_dc",
config: `{
"type": "random_choice",
"prefer": "local_dc"
}`,
res: routerconfig.Config{
res: balancerConfig.Config{
DetectlocalDC: true,
IsPreferConn: func(routerInfo routerconfig.Info, c conn.Conn) bool {
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
// some non nil func
return false
},
Expand All @@ -73,10 +73,10 @@ func TestFromConfig(t *testing.T) {
"prefer": "local_dc",
"fallback": true
}`,
res: routerconfig.Config{
res: balancerConfig.Config{
AllowFalback: true,
DetectlocalDC: true,
IsPreferConn: func(routerInfo routerconfig.Info, c conn.Conn) bool {
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
// some non nil func
return false
},
Expand All @@ -89,8 +89,8 @@ func TestFromConfig(t *testing.T) {
"prefer": "locations",
"locations": ["AAA", "BBB", "CCC"]
}`,
res: routerconfig.Config{
IsPreferConn: func(routerInfo routerconfig.Info, c conn.Conn) bool {
res: balancerConfig.Config{
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
// some non nil func
return false
},
Expand All @@ -104,9 +104,9 @@ func TestFromConfig(t *testing.T) {
"locations": ["AAA", "BBB", "CCC"],
"fallback": true
}`,
res: routerconfig.Config{
res: balancerConfig.Config{
AllowFalback: true,
IsPreferConn: func(routerInfo routerconfig.Info, c conn.Conn) bool {
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
// some non nil func
return false
},
Expand All @@ -116,7 +116,7 @@ func TestFromConfig(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
var (
actErr error
fallback = &routerconfig.Config{}
fallback = &balancerConfig.Config{}
)
b := FromConfig(
test.config,
Expand Down
40 changes: 20 additions & 20 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"google.golang.org/grpc"
grpcCodes "google.golang.org/grpc/codes"

routerconfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/router/config"
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"

"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
Expand All @@ -25,19 +25,19 @@ import (
type Config struct {
config.Common

trace trace.Driver
dialTimeout time.Duration
connectionTTL time.Duration
routerConfig *routerconfig.Config
secure bool
endpoint string
database string
requestsType string
userAgent string
grpcOptions []grpc.DialOption
credentials credentials.Credentials
tlsConfig *tls.Config
meta meta.Meta
trace trace.Driver
dialTimeout time.Duration
connectionTTL time.Duration
balancerConfig *balancerConfig.Config
secure bool
endpoint string
database string
requestsType string
userAgent string
grpcOptions []grpc.DialOption
credentials credentials.Credentials
tlsConfig *tls.Config
meta meta.Meta

excludeGRPCCodesForPessimization []grpcCodes.Code
}
Expand Down Expand Up @@ -105,8 +105,8 @@ func (c Config) Trace() trace.Driver {

// Balancer is an optional configuration related to selected balancer.
// That is, some balancing methods allow to be configured.
func (c Config) Balancer() *routerconfig.Config {
return c.routerConfig
func (c Config) Balancer() *balancerConfig.Config {
return c.balancerConfig
}

// RequestsType set an additional type hint to all requests.
Expand Down Expand Up @@ -229,9 +229,9 @@ func WithDialTimeout(timeout time.Duration) Option {
}
}

func WithBalancer(balancer *routerconfig.Config) Option {
func WithBalancer(balancer *balancerConfig.Config) Option {
return func(c *Config) {
c.routerConfig = balancer
c.balancerConfig = balancer
}
}

Expand Down Expand Up @@ -312,8 +312,8 @@ func defaultConfig() (c Config) {
credentials: credentials.NewAnonymousCredentials(
credentials.WithSourceInfo("default"),
),
routerConfig: balancers.Default(),
tlsConfig: defaultTLSConfig(),
balancerConfig: balancers.Default(),
tlsConfig: defaultTLSConfig(),
grpcOptions: []grpc.DialOption{
grpc.WithContextDialer(
func(ctx context.Context, address string) (net.Conn, error) {
Expand Down
Loading