Skip to content

Commit

Permalink
Merge branch 'unstable' into f/1125-redis
Browse files Browse the repository at this point in the history
  • Loading branch information
jihuayu committed Dec 2, 2023
2 parents 4ef6562 + 2be546c commit ea2d5ff
Show file tree
Hide file tree
Showing 5 changed files with 356 additions and 7 deletions.
8 changes: 1 addition & 7 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,7 @@
#
name: CI Actions # don't edit while the badge was depend on this

on:
push:
branches:
- unstable
pull_request:
branches:
- unstable
on: [push, pull_request]

jobs:
lint:
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/go-playground/validator/v10 v10.9.0
github.com/go-redis/redis/v8 v8.11.5
github.com/go-resty/resty/v2 v2.7.0
github.com/go-zookeeper/zk v1.0.3
github.com/google/uuid v1.3.0
github.com/jedib0t/go-pretty/v6 v6.4.6
github.com/prometheus/client_golang v1.11.1
Expand Down Expand Up @@ -51,6 +52,7 @@ require (
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414
github.com/ugorji/go/codec v1.1.7 // indirect
go.etcd.io/etcd/api/v3 v3.5.4 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
Expand Down
246 changes: 246 additions & 0 deletions storage/persistence/zookeeper/zookeeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package zookeeper

import (
"context"
"errors"
"strings"
"sync"
"time"

"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/storage/persistence"
"github.com/go-zookeeper/zk"
"go.uber.org/atomic"
)

const (
sessionTTL = 6
defaultDailTimeout = 5 * time.Second
)

const defaultElectPath = "/kvrocks/controller/leader"

type Config struct {
Addrs []string `yaml:"addrs"`
Scheme string `yaml:"scheme"`
AuthId string `yaml:"auth_id"`
ElectPath string `yaml:"elect_path"`
}

type Zookeeper struct {
conn *zk.Conn
acl []zk.ACL // We will set this ACL for the node we have created.
leaderMu sync.RWMutex
leaderID string
myID string
electPath string
isReady atomic.Bool
quitCh chan struct{}
leaderChangeCh chan bool
}

func New(id string, cfg *Config) (*Zookeeper, error) {
if len(id) == 0 {
return nil, errors.New("id must NOT be a empty string")
}
conn, _, err := zk.Connect(cfg.Addrs, defaultDailTimeout)
if err != nil {
return nil, err
}

electPath := defaultElectPath
if cfg.ElectPath != "" {
electPath = cfg.ElectPath
}
acl := zk.WorldACL(zk.PermAll)
if cfg.Scheme != "" && cfg.AuthId != "" {
conn.AddAuth(cfg.Scheme, []byte(cfg.AuthId))
acl = []zk.ACL{{Perms: zk.PermAll, Scheme: cfg.Scheme, ID: cfg.AuthId}}
}
e := &Zookeeper{
myID: id,
acl: acl,
electPath: electPath,
conn: conn,
quitCh: make(chan struct{}),
leaderChangeCh: make(chan bool),
}
e.isReady.Store(false)
go e.observeLeaderEvent(context.Background())
return e, nil
}

func (e *Zookeeper) ID() string {
return e.myID
}

func (e *Zookeeper) Leader() string {
e.leaderMu.RLock()
defer e.leaderMu.RUnlock()
return e.leaderID
}

func (e *Zookeeper) LeaderChange() <-chan bool {
return e.leaderChangeCh
}

func (e *Zookeeper) IsReady(ctx context.Context) bool {
for {
select {
case <-e.quitCh:
return false
case <-time.After(100 * time.Millisecond):
if e.isReady.Load() {
return true
}
case <-ctx.Done():
return e.isReady.Load()
}
}
}

func (e *Zookeeper) Get(ctx context.Context, key string) ([]byte, error) {
data, _, err := e.conn.Get(key)
if err != nil {
if err == zk.ErrNoNode {
return nil, nil // Key does not exist
}
return nil, err
}

return data, nil
}

func (e *Zookeeper) Exists(ctx context.Context, key string) (bool, error) {
exists, _, err := e.conn.Exists(key)
if err != nil {
return false, err
}
return exists, nil
}

// If the key exists, it will be set; if not, it will be created.
func (e *Zookeeper) Set(ctx context.Context, key string, value []byte) error {
exist, _ := e.Exists(ctx, key)
if exist {
_, err := e.conn.Set(key, value, -1)
return err
}

return e.Create(ctx, key, value, 0)
}

func (e *Zookeeper) Create(ctx context.Context, key string, value []byte, flags int32) error {
lastSlashIndex := strings.LastIndex(key, "/")
if lastSlashIndex > 0 {
substring := key[:lastSlashIndex]
// If the parent node does not exist, create the parent node recursively.
exist, _ := e.Exists(ctx, substring)
if !exist {
e.Create(ctx, substring, []byte{}, 0)
}
}
_, err := e.conn.Create(key, value, flags, e.acl)
return err
}

func (e *Zookeeper) Delete(ctx context.Context, key string) error {
err := e.conn.Delete(key, -1)
if err == zk.ErrNoNode {
return nil // Key does not exist
}
return err
}

func (e *Zookeeper) List(ctx context.Context, prefix string) ([]persistence.Entry, error) {
children, _, err := e.conn.Children(prefix)
if err != nil {
return nil, err
}

entries := make([]persistence.Entry, 0)
for _, child := range children {
key := prefix + "/" + child
data, _, err := e.conn.Get(key)
if err != nil {
return nil, err
}

entry := persistence.Entry{
Key: key,
Value: data,
}
entries = append(entries, entry)
}

return entries, nil
}

func (e *Zookeeper) SetleaderID(newLeaderID string) {
if newLeaderID != "" && newLeaderID != e.leaderID {
e.leaderMu.Lock()
e.leaderID = newLeaderID
e.leaderMu.Unlock()
e.leaderChangeCh <- true
}
}

func (e *Zookeeper) observeLeaderEvent(ctx context.Context) {
reset:
select {
case <-e.quitCh:
return
default:
}

e.Create(ctx, e.electPath, []byte(e.myID), zk.FlagEphemeral)
data, _, ch, err := e.conn.GetW(e.electPath)
if err != nil {
goto reset
}
e.SetleaderID(string(data))
e.isReady.Store(true)
for {
select {
case resp := <-ch:
if resp.Type == zk.EventNodeDeleted {
e.Create(ctx, e.electPath, []byte(e.myID), zk.FlagEphemeral)
}
data, _, ch, err = e.conn.GetW(e.electPath)
if err != nil {
goto reset
}
e.SetleaderID(string(data))
case <-e.quitCh:
logger.Get().Info(e.myID + "Exit the leader election loop")
return
}

}

}

func (e *Zookeeper) Close() error {
close(e.quitCh)
e.conn.Close()
return nil
}
105 changes: 105 additions & 0 deletions storage/persistence/zookeeper/zookeeper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package zookeeper

import (
"context"
"testing"
"time"

"github.com/apache/kvrocks-controller/util"

"github.com/stretchr/testify/require"
)

const addr = "127.0.0.1:2181"

func TestBasicOperations(t *testing.T) {
id := util.RandString(40)
testElectPath := "/" + util.RandString(8) + "/" + util.RandString(8)
persist, err := New(id, &Config{
ElectPath: testElectPath,
Addrs: []string{addr},
})
require.NoError(t, err)
defer persist.Close()

ctx := context.Background()
keys := []string{"/a/b/c0", "/a/b/c1", "/a/b/c2"}
value := []byte("v")
for _, key := range keys {
require.NoError(t, persist.Set(ctx, key, value))
gotValue, err := persist.Get(ctx, key)
require.NoError(t, err)
require.Equal(t, value, gotValue)
}
entries, err := persist.List(ctx, "/a/b")
require.NoError(t, err)
require.Equal(t, len(keys), len(entries))
for _, key := range keys {
require.NoError(t, persist.Delete(ctx, key))
}
}

func TestElect(t *testing.T) {
endpoints := []string{addr}

testElectPath := "/" + util.RandString(8) + "/" + util.RandString(8)
id0 := util.RandString(40)
node0, err := New(id0, &Config{
ElectPath: testElectPath,
Addrs: endpoints,
})
require.NoError(t, err)
require.Eventuallyf(t, func() bool {
return node0.Leader() == node0.myID
}, 10*time.Second, 100*time.Millisecond, "node0 should be the leader")

id1 := util.RandString(40)
node1, err := New(id1, &Config{
ElectPath: testElectPath,
Addrs: endpoints,
})
require.NoError(t, err)
require.Eventuallyf(t, func() bool {
return node1.Leader() == node0.myID
}, 10*time.Second, 100*time.Millisecond, "node1's leader should be the node0")

shutdown := make(chan struct{})
go func() {
for {
select {
case <-node0.LeaderChange():
// do nothing
case <-node1.LeaderChange():
// do nothing
case <-shutdown:
return
}
}
}()

require.NoError(t, node0.Close())

require.Eventuallyf(t, func() bool {
return node1.Leader() == node1.myID
}, 15*time.Second, 100*time.Millisecond, "node1 should be the leader")
close(shutdown)
}

0 comments on commit ea2d5ff

Please sign in to comment.