Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zookeeper lock #111

Merged
merged 15 commits into from
Jul 13, 2021
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import (
"mosn.io/layotto/components/lock"
lock_etcd "mosn.io/layotto/components/lock/etcd"
lock_redis "mosn.io/layotto/components/lock/redis"
lock_zookeeper "mosn.io/layotto/components/lock/zookeeper"
runtime_lock "mosn.io/layotto/pkg/runtime/lock"

// Actuator
Expand Down Expand Up @@ -245,6 +246,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_lock.NewFactory("redis", func() lock.LockStore {
return lock_redis.NewStandaloneRedisLock(log.DefaultLogger)
}),
runtime_lock.NewFactory("zookeeper", func() lock.LockStore {
return lock_zookeeper.NewZookeeperLock(log.DefaultLogger)
}),
runtime_lock.NewFactory("etcd", func() lock.LockStore {
return lock_etcd.NewEtcdLock(log.DefaultLogger)
}),
Expand Down
2 changes: 2 additions & 0 deletions components/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require (
github.com/alicebob/miniredis/v2 v2.13.3
github.com/apache/dubbo-go-hessian2 v1.7.0
github.com/go-redis/redis/v8 v8.8.0
github.com/go-zookeeper/zk v1.0.2
github.com/golang/mock v1.4.4
github.com/google/uuid v1.1.2
github.com/stretchr/testify v1.7.0
github.com/valyala/fasthttp v1.26.0
Expand Down
2 changes: 2 additions & 0 deletions components/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GO
github.com/go-redis/redis/v8 v8.8.0 h1:fDZP58UN/1RD3DjtTXP/fFZ04TFohSYhjZDkcDe2dnw=
github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
Expand Down
194 changes: 194 additions & 0 deletions components/lock/zookeeper/zookeeper_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package zookeeper

import (
"errors"
"fmt"
"github.com/go-zookeeper/zk"
"mosn.io/layotto/components/lock"
"mosn.io/pkg/log"
"mosn.io/pkg/utils"
"strconv"
"strings"
"time"
)

const (
host = "zookeeperHosts"
password = "zookeeperPassword"
sessionTimeout = "sessionTimeout"
logInfo = "logInfo"
defaultSessionTimeout = 5 * time.Second
)

type ConnectionFactory interface {
NewConnection(expire time.Duration, meta metadata) (ZKConnection, error)
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
}

type ConnectionFactoryImpl struct {
}

func (c *ConnectionFactoryImpl) NewConnection(expire time.Duration, meta metadata) (ZKConnection, error) {
conn, _, err := zk.Connect(meta.hosts, expire, zk.WithLogInfo(meta.logInfo))
if err != nil {
return nil, err
}
return conn, nil
}

type ZKConnection interface {
Get(path string) ([]byte, *zk.Stat, error)
Delete(path string, version int32) error
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Close()
}

type ZookeeperLock struct {
//trylock reestablish connection every time
factory ConnectionFactory
//unlock reuse this conneciton
unlockConn ZKConnection
metadata metadata
logger log.ErrorLogger
}

func NewZookeeperLock(logger log.ErrorLogger) *ZookeeperLock {
lock := &ZookeeperLock{
logger: logger,
}
return lock
}

func (p *ZookeeperLock) Init(metadata lock.Metadata) error {

m, err := parseZookeeperMetadata(metadata)
if err != nil {
return err
}

p.metadata = m
p.factory = &ConnectionFactoryImpl{}

//init unlock connection
zkConn, err := p.factory.NewConnection(p.metadata.sessionTimeout, p.metadata)
if err != nil {
return err
}
p.unlockConn = zkConn
return nil
}

func (p *ZookeeperLock) Features() []lock.Feature {
return nil
}
func (p *ZookeeperLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {

conn, err := p.factory.NewConnection(time.Duration(req.Expire)*time.Second, p.metadata)
if err != nil {
return &lock.TryLockResponse{}, err
}
//1.create zk ephemeral node
_, err = conn.Create("/"+req.ResourceId, []byte(req.LockOwner), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))

//2.1 create node fail ,indicates lock fail
if err != nil {
defer conn.Close()
//the node exists,lock fail
if err == zk.ErrNodeExists {
return &lock.TryLockResponse{
Success: false,
}, nil
}
//other err
return nil, err
}

//2.2 create node success, asyn to make sure zkclient alive for need time
utils.GoWithRecover(func() {
//can also
//time.Sleep(time.Second * time.Duration(req.Expire))
timeAfterTrigger := time.After(time.Second * time.Duration(req.Expire))
<-timeAfterTrigger
// make sure close connecion
conn.Close()
}, nil)

return &lock.TryLockResponse{
Success: true,
}, nil

}
func (p *ZookeeperLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {

conn := p.unlockConn

path := "/" + req.ResourceId
owner, state, err := conn.Get(path)

if err != nil {
//node does not exist, indicates this lock has expired
if err == zk.ErrNoNode {
return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil
}
//other err
return nil, err
}
//node exist ,but owner not this, indicates this lock has occupied or wrong unlock
if string(owner) != req.LockOwner {
return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
}
err = conn.Delete(path, state.Version)
//owner is this, but delete fail
if err != nil {
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
// delete no node , indicates this lock has expired
if err == zk.ErrNoNode {
return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil
// delete version error , indicates this lock has occupied by others
} else if err == zk.ErrBadVersion {
return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
//other error
} else {
return nil, err
}
}
//delete success, unlock success
return &lock.UnlockResponse{Status: lock.SUCCESS}, nil
}

type metadata struct {
hosts []string
password string
sessionTimeout time.Duration
logInfo bool
}

func parseZookeeperMetadata(meta lock.Metadata) (metadata, error) {
m := metadata{}
if val, ok := meta.Properties[host]; ok && val != "" {
split := strings.Split(val, ";")
m.hosts = append(m.hosts, split...)
} else {
return m, errors.New("zookeeper store error: missing host address")
}

if val, ok := meta.Properties[password]; ok && val != "" {
m.password = val
}

m.sessionTimeout = defaultSessionTimeout
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
if val, ok := meta.Properties[sessionTimeout]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("zookeeper store error: can't parse sessionTimeout field: %s", err)
}
m.sessionTimeout = time.Duration(parsedVal) * time.Second
}

if val, ok := meta.Properties[logInfo]; ok && val != "" {
b, err := strconv.ParseBool(val)
if err != nil {
return metadata{}, err
}
m.logInfo = b
}
return m, nil
}
131 changes: 131 additions & 0 deletions components/lock/zookeeper/zookeeper_lock_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading