Skip to content

Commit

Permalink
mcs: add balancer for keyspace group
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed Apr 6, 2023
1 parent 69ec7fc commit befd1b3
Show file tree
Hide file tree
Showing 21 changed files with 433 additions and 72 deletions.
58 changes: 58 additions & 0 deletions pkg/balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

// Policy is the policy of balancer.
type Policy int

const (
// PolicyRoundRobin is the round robin policy.
PolicyRoundRobin Policy = iota
// PolicyLeast is the policy to return the least used node.
// TODO: move indexed heap to pkg and use it.
PolicyLeast
)

func (p Policy) String() string {
switch p {
case PolicyRoundRobin:
return "round-robin"
default:
return "unknown"
}
}

// Balancer is the interface for balancer.
type Balancer[T uint32 | string] interface {
// Next returns next one.
Next() T
// Put puts one into balancer.
Put(T)
// Delete deletes one from balancer.
Delete(T)
// Reset resets balancer with nodes.
Reset([]T)
// GetAll returns all nodes.
GetAll() []T
}

func GenByPolicy[T uint32 | string](policy Policy) Balancer[T] {
switch policy {
case PolicyRoundRobin:
return NewRoundRobin[T]()
default:
return NewRoundRobin[T]()
}
}
81 changes: 81 additions & 0 deletions pkg/balancer/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func TestBalancer(t *testing.T) {
t.Parallel()
re := require.New(t)
balancers := []Balancer[uint32]{
NewRoundRobin[uint32](),
}
for _, balancer := range balancers {
re.Equal(uint32(0), balancer.Next())
// test put
exists := make(map[uint32]struct{})
for i := 0; i < 100; i++ {
num := rand.Uint32()
balancer.Put(num)
exists[num] = struct{}{}
re.Equal(len(balancer.GetAll()), len(exists))
t := balancer.Next()
re.Contains(exists, t)
}
// test delete
for num := range exists {
balancer.Delete(num)
delete(exists, num)
re.Equal(len(balancer.GetAll()), len(exists))
if len(exists) == 0 {
break
}
t := balancer.Next()
re.NotEqual(t, num)
re.Contains(exists, t)
}
re.Equal(uint32(0), balancer.Next())
}
}

func TestRoundRobin(t *testing.T) {
t.Parallel()
re := require.New(t)
balancer := NewRoundRobin[uint32]()
for i := 0; i < 100; i++ {
num := rand.Uint32()
balancer.Put(num)
}
statistics := make(map[uint32]int)
for i := 0; i < 1000; i++ {
statistics[balancer.Next()]++
}
min := 1000
max := 0
for _, v := range statistics {
if v < min {
min = v
}
if v > max {
max = v
}
}
re.LessOrEqual(max-min, 10)
}
91 changes: 91 additions & 0 deletions pkg/balancer/round_robin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

import (
"sync"
"sync/atomic"
)

// RoundRobin is a balancer that selects nodes in a round-robin fashion.
type RoundRobin[T uint32 | string] struct {
sync.RWMutex
nodes []T
exists map[T]struct{}
next uint32
}

// NewRoundRobin creates a balancer that selects nodes in a round-robin fashion.
func NewRoundRobin[T uint32 | string]() *RoundRobin[T] {
return &RoundRobin[T]{
nodes: make([]T, 0),
exists: make(map[T]struct{}),
}
}

// Next returns next address
func (r *RoundRobin[T]) Next() (t T) {
r.RLock()
defer r.RUnlock()
if len(r.nodes) == 0 {
return
}
next := atomic.AddUint32(&r.next, 1)
node := r.nodes[(int(next)-1)%len(r.nodes)]
return node
}

// Reset updates nodes
func (r *RoundRobin[T]) Reset(nodes []T) {
r.Lock()
defer r.Unlock()
r.nodes = nodes
r.exists = make(map[T]struct{})
for _, n := range nodes {
r.exists[n] = struct{}{}
}
}

// GetAll returns all nodes
func (r *RoundRobin[T]) GetAll() []T {
r.RLock()
defer r.RUnlock()
return r.nodes
}

// Put puts one into balancer.
func (r *RoundRobin[T]) Put(node T) {
r.Lock()
defer r.Unlock()
if _, ok := r.exists[node]; !ok {
r.nodes = append(r.nodes, node)
r.exists[node] = struct{}{}
}
}

// Delete deletes one from balancer.
func (r *RoundRobin[T]) Delete(node T) {
r.Lock()
defer r.Unlock()
if _, ok := r.exists[node]; ok {
for i, n := range r.nodes {
if n == node {
r.nodes = append(r.nodes[:i], r.nodes[i+1:]...)
delete(r.exists, node)
break
}
}
}
}
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store)
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.manager = NewKeyspaceManager(store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(suite.manager.Bootstrap())
}
Expand Down
Loading

0 comments on commit befd1b3

Please sign in to comment.