Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial draft of hostfs distributed locking #1685

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ internal/linter/*.so
*.test
**/__debug_bin
*.out
*.pprof
.vscode
.idea
*.swp
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ require (
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/ulikunitz/xz v0.5.10 // indirect
github.com/vimeo/galaxycache v0.0.0-20210323154928-b7e5d71c067a // indirect
github.com/viney-shih/go-lock v1.1.2 // indirect
github.com/wayneashleyberry/terminal-dimensions v1.1.0 // indirect
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2344,6 +2344,8 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/viney-shih/go-lock v1.1.2 h1:3TdGTiHZCPqBdTvFbQZQN/TRZzKF3KWw2rFEyKz3YqA=
github.com/viney-shih/go-lock v1.1.2/go.mod h1:Yijm78Ljteb3kRiJrbLAxVntkUukGu5uzSxq/xV7OO8=
github.com/vultr/govultr/v2 v2.17.2 h1:gej/rwr91Puc/tgh+j33p/BLR16UrIPnSr+AIwYWZQs=
github.com/vultr/govultr/v2 v2.17.2/go.mod h1:ZFOKGWmgjytfyjeyAdhQlSWwTjh2ig+X49cAp50dzXI=
github.com/wayneashleyberry/terminal-dimensions v1.1.0 h1:EB7cIzBdsOzAgmhTUtTTQXBByuPheP/Zv1zL2BRPY6g=
Expand Down
9 changes: 8 additions & 1 deletion pkg/storage/etcd/etcd_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func TestEtcd(t *testing.T) {

var store = future.New[*etcd.EtcdStore]()

var lmF = future.New[*etcd.EtcdLockManager]()

var _ = BeforeSuite(func() {
testruntime.IfIntegration(func() {
env := test.Environment{}
Expand All @@ -35,7 +37,11 @@ var _ = BeforeSuite(func() {
)
Expect(err).NotTo(HaveOccurred())
store.Set(client)

l, err := etcd.NewEtcdLockManager(context.Background(), env.EtcdConfig(),
etcd.WithPrefix("test-lock"),
)
Expect(err).NotTo(HaveOccurred())
lmF.Set(l)
DeferCleanup(env.Stop)
})
})
Expand All @@ -45,3 +51,4 @@ var _ = Describe("Cluster Store", Ordered, Label("integration", "slow"), Cluster
var _ = Describe("RBAC Store", Ordered, Label("integration", "slow"), RBACStoreTestSuite(store))
var _ = Describe("Keyring Store", Ordered, Label("integration", "slow"), KeyringStoreTestSuite(store))
var _ = Describe("KV Store", Ordered, Label("integration", "slow"), KeyValueStoreTestSuite(store))
var _ = Describe("Lock Manager", Ordered, Label("integration", "slow"), LockManagerTestSuite(lmF))
95 changes: 95 additions & 0 deletions pkg/storage/etcd/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package etcd

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/rancher/opni/pkg/storage"
"github.com/rancher/opni/pkg/storage/lock"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

type EtcdLock struct {
client *clientv3.Client
mutex *concurrency.Mutex
session *concurrency.Session

acquired uint32
key string

startLock lock.LockPrimitive
startUnlock lock.LockPrimitive
options *lock.LockOptions
}

func NewEtcdLock(c *clientv3.Client, key string, options *lock.LockOptions) *EtcdLock {
lockValiditySeconds := int(options.LockValidity.Seconds())
if lockValiditySeconds == 0 {
lockValiditySeconds = 1
}
s, err := concurrency.NewSession(c, concurrency.WithTTL(lockValiditySeconds))
if err != nil {
panic(err)
}

m := concurrency.NewMutex(s, key)
return &EtcdLock{
client: c,
session: s,
mutex: m,
options: options,
key: key,
}
}

var _ storage.Lock = (*EtcdLock)(nil)

func (e *EtcdLock) Lock() error {
return e.startLock.Do(func() error {
ctxca, ca := context.WithCancel(e.client.Ctx())
signalAcquired := make(chan struct{})
defer close(signalAcquired)
if !e.options.Keepalive {
defer e.session.Orphan()
}
var lockErr error
var mu sync.Mutex
go func() {
select {
case <-e.options.AcquireContext.Done():
mu.Lock()
lockErr = errors.Join(lockErr, lock.ErrAcquireLockCancelled)
mu.Unlock()
ca()
case <-time.After(e.options.AcquireTimeout):
mu.Lock()
lockErr = errors.Join(lockErr, lock.ErrAcquireLockTimeout)
mu.Unlock()
ca()
}
}()
err := e.mutex.Lock(ctxca)
mu.Lock()
err = errors.Join(lockErr, err)
mu.Unlock()
if err != nil {
e.mutex.Unlock(e.client.Ctx())
return err
}
atomic.StoreUint32(&e.acquired, 1)
return nil
})
}

func (e *EtcdLock) Unlock() error {
return e.startUnlock.Do(func() error {
if atomic.LoadUint32(&e.acquired) == 0 {
return lock.ErrLockNotAcquired
}
return e.mutex.Unlock(e.client.Ctx())
})
}
61 changes: 61 additions & 0 deletions pkg/storage/etcd/lock_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package etcd

import (
"context"
"crypto/tls"
"fmt"

"github.com/rancher/opni/pkg/config/v1beta1"
"github.com/rancher/opni/pkg/logger"
"github.com/rancher/opni/pkg/storage"
"github.com/rancher/opni/pkg/storage/lock"
"github.com/rancher/opni/pkg/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type EtcdLockManager struct {
lg *zap.SugaredLogger
options EtcdStoreOptions
client *clientv3.Client
}

func NewEtcdLockManager(ctx context.Context, conf *v1beta1.EtcdStorageSpec, opts ...EtcdStoreOption) (*EtcdLockManager, error) {
options := EtcdStoreOptions{}
options.apply(opts...)
lg := logger.New(logger.WithLogLevel(zap.WarnLevel)).Named("etcd-locker")
var tlsConfig *tls.Config
if conf.Certs != nil {
var err error
tlsConfig, err = util.LoadClientMTLSConfig(conf.Certs)
if err != nil {
return nil, fmt.Errorf("failed to load client TLS config: %w", err)
}
}
clientConfig := clientv3.Config{
Endpoints: conf.Endpoints,
TLS: tlsConfig,
Context: ctx,
Logger: lg.Desugar(),
}
etcdClient, err := clientv3.New(clientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
lg.With(
"endpoints", clientConfig.Endpoints,
).Info("connecting to etcd")
return &EtcdLockManager{
options: options,
lg: lg,
client: etcdClient,
}, nil
}

var _ storage.LockManager = (*EtcdLockManager)(nil)

func (e *EtcdLockManager) Locker(key string, opts ...lock.LockOption) storage.Lock {
options := lock.DefaultLockOptions(e.client.Ctx())
options.Apply(opts...)
return NewEtcdLock(e.client, key, options)
}
36 changes: 36 additions & 0 deletions pkg/storage/inmemory/inmemory_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package inmemory_test

import (
"context"
"fmt"
"testing"

"github.com/google/uuid"
"github.com/rancher/opni/pkg/storage/inmemory"
. "github.com/rancher/opni/pkg/test/conformance/storage"
_ "github.com/rancher/opni/pkg/test/setup"
"github.com/rancher/opni/pkg/util/future"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var lmF = future.New[*inmemory.InMemoryLockManager]()

func TestInmemory(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Inmemory Suite")
}

type testBroker struct{}

func (t testBroker) LockManager(path string) *inmemory.InMemoryLockManager {
return inmemory.NewLockManager(context.TODO(), path)
}

func init() {
t := testBroker{}
lmF.Set(t.LockManager(fmt.Sprintf("/tmp/lock/opni-test-%s", uuid.New().String())))
}

var _ = Describe("In-memory Lock Manager", Ordered, Label("integration"), LockManagerTestSuite(lmF))
Loading