Skip to content

Commit

Permalink
Fix locking operations
Browse files Browse the repository at this point in the history
Signed-off-by: Alexey Roytman <roytman@il.ibm.com>
  • Loading branch information
roytman committed Oct 21, 2021
1 parent a777277 commit fd7c74f
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 131 deletions.
1 change: 1 addition & 0 deletions pkg/libovsdb/notation.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Operation struct {
UUID *UUID `json:"uuid,omitempty"`
Comment *string `json:"comment,omitempty"`
Durable *bool `json:"durable,omitempty"`
Lock *string `json:"lock,omitempty"`
}

// String, serialize Transact
Expand Down
46 changes: 0 additions & 46 deletions pkg/ovsdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
)

type Databaser interface {
GetLock(ctx context.Context, id string) (Locker, error)
CreateMonitor(dbName string, handler *Handler, log logr.Logger) *dbMonitor
AddSchema(schemaFile string) error
GetKeyData(key common.Key, keysOnly bool) (*clientv3.GetResponse, error)
Expand Down Expand Up @@ -57,35 +56,6 @@ type DatabaseEtcd struct {
dbName string
}

type Locker interface {
tryLock() error
lock() error
unlock() error
cancel()
}

type lock struct {
mutex *concurrency.Mutex
myCancel context.CancelFunc
ctx context.Context
}

func (l *lock) tryLock() error {
return l.mutex.TryLock(l.ctx)
}

func (l *lock) lock() error {
return l.mutex.Lock(l.ctx)
}

func (l *lock) unlock() error {
return l.mutex.Unlock(l.ctx)
}

func (l *lock) cancel() {
l.myCancel()
}

var EtcdClientTimeout = time.Second

func NewEtcdClient(ctx context.Context, endpoints []string, keepAliveTime, keepAliveTimeout time.Duration) (*clientv3.Client, error) {
Expand Down Expand Up @@ -117,18 +87,6 @@ func NewDatabaseEtcd(cli *clientv3.Client, model string, log logr.Logger) (Datab
schemas: libovsdb.Schemas{}, strSchemas: map[string]map[string]interface{}{}, serverID: uuid.NewString()}, nil
}

func (con *DatabaseEtcd) GetLock(ctx context.Context, id string) (Locker, error) {
ctxt, cancel := context.WithCancel(ctx)
session, err := concurrency.NewSession(con.cli, concurrency.WithContext(ctxt))
if err != nil {
cancel()
return nil, err
}
key := common.NewLockKey(id)
mutex := concurrency.NewMutex(session, key.String())
return &lock{mutex: mutex, myCancel: cancel, ctx: ctxt}, nil
}

func (con *DatabaseEtcd) AddSchema(schemaFile string) error {
data, err := common.ReadFile(schemaFile)
if err != nil {
Expand Down Expand Up @@ -399,10 +357,6 @@ func NewDatabaseMock() (Databaser, error) {
return &DatabaseMock{ServerID: uuid.NewString()}, nil
}

func (con *DatabaseMock) GetLock(ctx context.Context, id string) (Locker, error) {
return &LockerMock{}, nil
}

func (con *DatabaseMock) StartLeaderElection() {
return
}
Expand Down
16 changes: 1 addition & 15 deletions pkg/ovsdb/database_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package ovsdb

import (
"context"
"fmt"
"github.com/ibm/ovsdb-etcd/pkg/libovsdb"
"path"
"testing"
"time"
Expand All @@ -12,21 +10,9 @@ import (
etcdClient "go.etcd.io/etcd/client/v3"

"github.com/ibm/ovsdb-etcd/pkg/common"
"github.com/ibm/ovsdb-etcd/pkg/libovsdb"
)

func TestMockLock(t *testing.T) {
expectedResponse := &LockerMock{}
var expectedError error
mock := DatabaseMock{
Response: expectedResponse,
Error: expectedError,
}
context.Background()
actualResponse, actualError := mock.GetLock(context.Background(), "id")
assert.Equal(t, expectedResponse, actualResponse)
assert.Equal(t, expectedError, actualError)
}

func TestMockAddSchema(t *testing.T) {
var expectedError error
mock := DatabaseMock{
Expand Down
122 changes: 60 additions & 62 deletions pkg/ovsdb/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/ibm/ovsdb-etcd/pkg/ovsjson"
)

const locked = "locked"

type JrpcServer interface {
Wait() error
Stop()
Expand All @@ -45,7 +47,7 @@ type Handler struct {
// json-value string to handler monitor related data
handlerMonitorData map[string]handlerMonitorData

databaseLocks sync.Map
dbLocks databaseLocks
}

func (ch *Handler) Transact(ctx context.Context, params []interface{}) (interface{}, error) {
Expand Down Expand Up @@ -76,7 +78,7 @@ func (ch *Handler) Transact(ctx context.Context, params []interface{}) (interfac
log.V(1).Info("Database cache is not created", "dbName", ovsReq.DBName)
return nil, err
}
txn, err := NewTransaction(ctx, ch.etcdClient, ovsReq, dbCache, schema, log)
txn, err := NewTransaction(ctx, ch.etcdClient, ovsReq, dbCache, schema, &ch.dbLocks, log)
if err != nil {
return nil, errors.New(ErrInternalError)
}
Expand Down Expand Up @@ -137,49 +139,23 @@ func (ch *Handler) Lock(ctx context.Context, param interface{}) (interface{}, er
ch.log.Error(err, "Lock, param parsing", "param", param)
return nil, err
}
var myLock Locker
locI, ok := ch.databaseLocks.Load(id)
if ok {
myLock, ok = locI.(Locker)
if !ok {
err = fmt.Errorf("cannot transform to Logger, %T", locI)
ch.log.Error(err, "")
return nil, err
}
} else {
myLock, err = ch.db.GetLock(ch.handlerContext, id)
if err != nil {
ch.log.Error(err, "lock failed", "lockID", id)
return nil, err
}
ch.log.V(6).Info("new etcd lock created", "lockID", id)
// validate that no other locks
otherLock, ok := ch.databaseLocks.Load(id)
if !ok {
ch.databaseLocks.Store(id, myLock)
} else {
ch.log.V(6).Info("there is another lock in the local map", "lockID", id)
// What should we do ?
myLock.cancel()
myLock, ok = otherLock.(Locker)
if !ok {
err = fmt.Errorf("cannot transform to Logger, %T", otherLock)
ch.log.Error(err, "")
return nil, err
}
}
myLock, err := ch.createGetLocker(id)
if err != nil {
return nil, err
}
err = myLock.tryLock()
ch.log.V(5).Info("===> tryLock return", "err", fmt.Sprintf("%v", err))
if err == nil {
ch.log.V(5).Info("lock request returned - \"locked: true\"", "lockID", id)
return map[string]bool{"locked": true}, nil
return map[string]bool{locked: true}, nil
} else if err != concurrency.ErrLocked {
ch.log.Error(err, "lock failed", "lockID", id)
// TOD is it correct?
// TODO is it correct?
return nil, err
}
go func() {
err = myLock.lock()
ch.log.V(6).Info("myLock.lock returned", "err", fmt.Sprintf("%v", err))
if err == nil {
// Send notification
ch.log.V(5).Info("lock succeeded", "lockID", id)
Expand All @@ -192,7 +168,7 @@ func (ch *Handler) Lock(ctx context.Context, param interface{}) (interface{}, er
}
}()
ch.log.V(5).Info("lock request returned - \"locked: false\"", "lockID", id)
return map[string]bool{"locked": false}, nil
return map[string]bool{locked: false}, nil
}

func (ch *Handler) Unlock(ctx context.Context, param interface{}) (interface{}, error) {
Expand All @@ -201,18 +177,10 @@ func (ch *Handler) Unlock(ctx context.Context, param interface{}) (interface{},
if err != nil {
return ovsjson.EmptyStruct{}, err
}
iLock, ok := ch.databaseLocks.LoadAndDelete(id)
if !ok {
ch.log.V(4).Info("unlock: can't find lock", "lockID", id)
return ovsjson.EmptyStruct{}, nil
}
myLock, ok := iLock.(Locker)
if !ok {
err = fmt.Errorf("cannot transform to Logger, %T", iLock)
ch.log.Error(err, "")
err = ch.dbLocks.unlock(id)
if err != nil {
return nil, err
}
myLock.cancel()
return ovsjson.EmptyStruct{}, nil
}

Expand Down Expand Up @@ -327,7 +295,7 @@ func NewHandler(ctx context.Context, db Databaser, cli *clientv3.Client, log log
return &Handler{
handlerContext: ctx,
db: db,
databaseLocks: sync.Map{},
dbLocks: databaseLocks{},
handlerMonitorData: map[string]handlerMonitorData{},
etcdClient: cli,
monitors: map[string]*dbMonitor{},
Expand All @@ -336,22 +304,9 @@ func NewHandler(ctx context.Context, db Databaser, cli *clientv3.Client, log log
}

func (ch *Handler) Cleanup() {
ch.log.V(5).Info("CLEAN UP do something")
ch.log.V(5).Info("CLEAN UP")
ch.closed = true
ch.databaseLocks.Range(func(key, value interface{}) bool {
mLock, ok := value.(Locker)
if !ok {
err := fmt.Errorf("cannot transform to Logger, value type %T, key %s", value, key)
ch.log.Error(err, "")
}
if err := mLock.unlock(); err != nil {
ch.log.Error(err, "cannot unlock")
// TODO: should we do something else?
}
ch.databaseLocks.Delete(key)
return true
})

ch.dbLocks.cleanup(ch.log)
for _, monitor := range ch.monitors {
monitor.cancelDbMonitor()
}
Expand Down Expand Up @@ -583,6 +538,49 @@ func (ch *Handler) GetClientAddress() string {
return ""
}

func (ch *Handler) newLocker(ctx context.Context, id string) (*locker, error) {
ctxt, cancel := context.WithCancel(ctx)
session, err := concurrency.NewSession(ch.etcdClient, concurrency.WithContext(ctxt), concurrency.WithTTL(5))
if err != nil {
cancel()
return nil, err
}
key := common.NewLockKey(id)
mutex := concurrency.NewMutex(session, key.String())
return &locker{mutex: mutex, myCancel: cancel, ctx: ctxt}, nil
}

func (ch *Handler) createGetLocker(id string) (*locker, error) {
myLock, err := ch.dbLocks.getLocker(id)
if err != nil {
ch.log.Error(err, "getLocker returned")
return nil, err
}
if myLock == nil {
myLock, err = ch.newLocker(ch.handlerContext, id)
if err != nil {
ch.log.Error(err, "lock failed", "lockID", id)
return nil, err
}
ch.log.V(6).Info("new etcd locker created", "lockID", id)
// validate that no other locks
otherLock, ok := ch.dbLocks.LoadOrStore(id, myLock)
if ok {
// there is another locker with the same id from the same client, very unusual
ch.log.V(5).Info("Duplicated locker", "id", id)
myLock.cancel()
l, ok := otherLock.(*locker)
if !ok {
err = fmt.Errorf("cannot transform to locker, %T", otherLock)
ch.log.Error(err, "")
return nil, err
}
myLock = l
}
}
return myLock, nil
}

func parseCondMonitorParameters(params []interface{}) (*ovsjson.CondMonitorParameters, error) {
l := len(params)
if l < 2 || l > 4 {
Expand Down
Loading

0 comments on commit fd7c74f

Please sign in to comment.