Skip to content

Commit

Permalink
Add LOAD_BALANCING_POLICY_SLOW_AVOIDANCE funtionality
Browse files Browse the repository at this point in the history
The java driver has the feature to automatically avoid slow replicas
by doing simple heuristics. This is one of the key feature to have a
stable latency.

This commit adds additional field in tokenAwareHostPolicy to control
if the feature is enabled and what is the maximum in flight threshold.

If feature is enabled driver sorts the replicas to first try those
with less than specified maximum in flight connections.

Fixes: #154
  • Loading branch information
sylwiaszunejko committed Apr 17, 2024
1 parent 3c32c6c commit 1290781
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
5 changes: 5 additions & 0 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@ func (h *HostInfo) IsUp() bool {
return h != nil && h.State() == NodeUp
}

func (h *HostInfo) IsBusy(s *Session) bool {
pool, ok := s.pool.getPool(h)
return ok && h != nil && pool.Size() >= MAX_IN_FLIGHT_THRESHOLD
}

func (h *HostInfo) HostnameAndPort() string {
h.mu.Lock()
defer h.mu.Unlock()
Expand Down
29 changes: 29 additions & 0 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,16 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) {
}
}

// AvoidSlowReplicas enabled avoiding slow replicas
//
// TokenAwareHostPolicy normally does not check how busy replica is, with avoidSlowReplicas enabled it avoids replicas
// if they have equal or more than MAX_IN_FLIGHT_THRESHOLD connections in flight
func AvoidSlowReplicas() func(policy *tokenAwareHostPolicy) {
return func(t *tokenAwareHostPolicy) {
t.avoidSlowReplicas = true
}
}

// NonLocalReplicasFallback enables fallback to replicas that are not considered local.
//
// TokenAwareHostPolicy used with DCAwareHostPolicy fallback first selects replicas by partition key in local DC, then
Expand Down Expand Up @@ -424,6 +434,8 @@ type clusterMeta struct {
tokenRing *tokenRing
}

const MAX_IN_FLIGHT_THRESHOLD int = 10

type tokenAwareHostPolicy struct {
fallback HostSelectionPolicy
getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
Expand All @@ -443,6 +455,8 @@ type tokenAwareHostPolicy struct {

// Experimental, this interface and use may change
tablets cowTabletList

avoidSlowReplicas bool
}

func (t *tokenAwareHostPolicy) Init(s *Session) {
Expand Down Expand Up @@ -687,6 +701,21 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
}
}

if s := qry.GetSession(); s != nil && t.avoidSlowReplicas {
healthyReplicas := make([]*HostInfo, 0, len(replicas))
unhealthyReplicas := make([]*HostInfo, 0, len(replicas))

for _, h := range replicas {
if h.IsBusy(s) {
unhealthyReplicas = append(unhealthyReplicas, h)
} else {
healthyReplicas = append(healthyReplicas, h)
}
}

replicas = append(healthyReplicas, unhealthyReplicas...)
}

var (
fallbackIter NextHost
i, j, k int
Expand Down

0 comments on commit 1290781

Please sign in to comment.