Skip to content

Commit

Permalink
Add Etcd support for Import Tasks.
Browse files Browse the repository at this point in the history
issue: milvus-io#15604
Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
  • Loading branch information
soothing-rain committed Mar 24, 2022
1 parent 1c4b949 commit 5b0440d
Show file tree
Hide file tree
Showing 14 changed files with 1,033 additions and 320 deletions.
11 changes: 11 additions & 0 deletions internal/kv/etcd/embed_etcd_kv.go
Expand Up @@ -333,13 +333,24 @@ func (kv *EmbedEtcdKV) SaveBytes(key string, value []byte) error {

// SaveWithLease is a function to put value in etcd with etcd lease options.
func (kv *EmbedEtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
log.Debug("Embedded Etcd saving with lease", zap.String("etcd_key", key))
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Put(ctx, key, value, clientv3.WithLease(id))
return err
}

// SaveWithIgnoreLease updates the key without changing its current lease.
func (kv *EmbedEtcdKV) SaveWithIgnoreLease(key, value string) error {
log.Debug("Embedded Etcd saving with ignore lease", zap.String("etcd_key", key))
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Put(ctx, key, value, clientv3.WithIgnoreLease())
return err
}

// SaveBytesWithLease is a function to put value in etcd with etcd lease options.
func (kv *EmbedEtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error {
key = path.Join(kv.rootPath, key)
Expand Down
45 changes: 29 additions & 16 deletions internal/kv/etcd/embed_etcd_kv_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"os"
"testing"
"time"

"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
Expand Down Expand Up @@ -898,6 +899,10 @@ func TestEmbedEtcd(te *testing.T) {
}

for k, v := range tests {
// SaveWithIgnoreLease must be used when the key already exists.
err = metaKv.SaveWithIgnoreLease(k, v)
assert.Error(t, err)

err = metaKv.SaveWithLease(k, v, leaseID)
assert.NoError(t, err)

Expand All @@ -907,33 +912,41 @@ func TestEmbedEtcd(te *testing.T) {

})

te.Run("Etcd Lease Bytes", func(t *testing.T) {
rootPath := "/etcd/test/root/lease_bytes"
_metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV)
te.Run("Etcd Lease Ignore", func(t *testing.T) {
rootPath := "/etcd/test/root/lease_ignore"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
assert.Nil(t, err)

defer metaKv.Close()
defer metaKv.RemoveWithPrefix("")

leaseID, err := metaKv.Grant(10)
assert.NoError(t, err)

metaKv.KeepAlive(leaseID)

tests := map[string][]byte{
"a/b": []byte("v1"),
"a/b/c": []byte("v2"),
"x": []byte("v3"),
tests := map[string]string{
"a/b": "v1",
"a/b/c": "v2",
"x": "v3",
}

for k, v := range tests {
err = metaKv.SaveBytesWithLease(k, v, leaseID)
leaseID, err := metaKv.Grant(1)
assert.NoError(t, err)

err = metaKv.SaveWithLease(k, v, leaseID)
assert.NoError(t, err)

err = metaKv.SaveBytesWithLease(k, v, clientv3.LeaseID(999))
err = metaKv.SaveWithIgnoreLease(k, "updated_"+v)
assert.NoError(t, err)

// Record should be updated correctly.
value, err := metaKv.Load(k)
assert.NoError(t, err)
assert.Equal(t, "updated_"+v, value)

// Let the lease expire. 3 seconds should be pretty safe.
time.Sleep(3 * time.Second)

// Updated record should still expire with lease.
_, err = metaKv.Load(k)
assert.Error(t, err)
}

})
}
13 changes: 13 additions & 0 deletions internal/kv/etcd/etcd_kv.go
Expand Up @@ -317,6 +317,7 @@ func (kv *EtcdKV) SaveBytes(key string, value []byte) error {

// SaveWithLease is a function to put value in etcd with etcd lease options.
func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
log.Debug("Etcd saving with lease", zap.String("etcd_key", key))
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
Expand All @@ -326,6 +327,18 @@ func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
return err
}

// SaveWithIgnoreLease updates the key without changing its current lease. Must be used when key already exists.
func (kv *EtcdKV) SaveWithIgnoreLease(key, value string) error {
log.Debug("Etcd saving with ignore lease", zap.String("etcd_key", key))
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Put(ctx, key, value, clientv3.WithIgnoreLease())
CheckElapseAndWarn(start, "Slow etcd operation save with lease")
return err
}

// SaveBytesWithLease is a function to put value in etcd with etcd lease options.
func (kv *EtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error {
start := time.Now()
Expand Down
41 changes: 40 additions & 1 deletion internal/kv/etcd/etcd_kv_test.go
Expand Up @@ -816,13 +816,53 @@ func TestEtcdKV_Load(te *testing.T) {
}

for k, v := range tests {
// SaveWithIgnoreLease must be used when the key already exists.
err = etcdKV.SaveWithIgnoreLease(k, v)
assert.Error(t, err)

err = etcdKV.SaveWithLease(k, v, leaseID)
assert.NoError(t, err)

err = etcdKV.SaveWithLease(k, v, clientv3.LeaseID(999))
assert.Error(t, err)
}
})

te.Run("Etcd Lease Ignore", func(t *testing.T) {
rootPath := "/etcd/test/root/lease_ignore"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)

defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")

tests := map[string]string{
"a/b": "v1",
"a/b/c": "v2",
"x": "v3",
}

for k, v := range tests {
leaseID, err := etcdKV.Grant(1)
assert.NoError(t, err)

err = etcdKV.SaveWithLease(k, v, leaseID)
assert.NoError(t, err)

err = etcdKV.SaveWithIgnoreLease(k, "updated_"+v)
assert.NoError(t, err)

// Record should be updated correctly.
value, err := etcdKV.Load(k)
assert.NoError(t, err)
assert.Equal(t, "updated_"+v, value)

// Let the lease expire. 3 seconds should be pretty safe.
time.Sleep(3 * time.Second)

// Updated record should still expire with lease.
_, err = etcdKV.Load(k)
assert.Error(t, err)
}
})

te.Run("Etcd Lease Bytes", func(t *testing.T) {
Expand Down Expand Up @@ -850,7 +890,6 @@ func TestEtcdKV_Load(te *testing.T) {
err = etcdKV.SaveBytesWithLease(k, v, clientv3.LeaseID(999))
assert.Error(t, err)
}

})
}

Expand Down
1 change: 1 addition & 0 deletions internal/kv/kv.go
Expand Up @@ -76,6 +76,7 @@ type MetaKv interface {
WatchWithPrefix(key string) clientv3.WatchChan
WatchWithRevision(key string, revision int64) clientv3.WatchChan
SaveWithLease(key, value string, id clientv3.LeaseID) error
SaveWithIgnoreLease(key, value string) error
Grant(ttl int64) (id clientv3.LeaseID, err error)
KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error)
CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error
Expand Down
152 changes: 152 additions & 0 deletions internal/kv/mock_kv.go
@@ -0,0 +1,152 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"strings"

"github.com/milvus-io/milvus/internal/log"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type mockBaseKV struct {
InMemKv map[string]string
}

func (m *mockBaseKV) Load(key string) (string, error) {
if val, ok := m.InMemKv[key]; ok {
return val, nil
}
return "", nil
}

func (m *mockBaseKV) MultiLoad(keys []string) ([]string, error) {
panic("not implemented") // TODO: Implement
}

func (m *mockBaseKV) LoadWithPrefix(key string) ([]string, []string, error) {
panic("not implemented") // TODO: Implement
}

func (m *mockBaseKV) Save(key string, value string) error {
panic("not implemented") // TODO: Implement
}

func (m *mockBaseKV) MultiSave(kvs map[string]string) error {
panic("not implemented") // TODO: Implement
}

func (m *mockBaseKV) Remove(key string) error {
panic("not implemented") // TODO: Implement
}

func (m *mockBaseKV) MultiRemove(keys []string) error {
panic("not implemented") // TODO: Implement
}

func (m *mockBaseKV) RemoveWithPrefix(key string) error {
panic("not implemented") // TODO: Implement
}

func (m *mockBaseKV) Close() {
panic("not implemented") // TODO: Implement
}

type mockTxnKV struct {
mockBaseKV
}

func (m *mockTxnKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
panic("not implemented") // TODO: Implement
}

func (m *mockTxnKV) MultiRemoveWithPrefix(keys []string) error {
panic("not implemented") // TODO: Implement
}

func (m *mockTxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
panic("not implemented") // TODO: Implement
}

type MockMetaKV struct {
mockTxnKV
}

func (m *MockMetaKV) GetPath(key string) string {
panic("not implemented") // TODO: Implement
}

func (m *MockMetaKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
keys := make([]string, 0, len(m.InMemKv))
values := make([]string, 0, len(m.InMemKv))
for k, v := range m.InMemKv {
if strings.HasPrefix(k, prefix) {
keys = append(keys, k)
values = append(values, v)
}
}
return keys, values, nil
}

func (m *MockMetaKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) {
panic("not implemented") // TODO: Implement
}

func (m *MockMetaKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
panic("not implemented") // TODO: Implement
}

func (m *MockMetaKV) Watch(key string) clientv3.WatchChan {
panic("not implemented") // TODO: Implement
}

func (m *MockMetaKV) WatchWithPrefix(key string) clientv3.WatchChan {
panic("not implemented") // TODO: Implement
}

func (m *MockMetaKV) WatchWithRevision(key string, revision int64) clientv3.WatchChan {
panic("not implemented") // TODO: Implement
}

func (m *MockMetaKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
m.InMemKv[key] = value
log.Debug("Doing SaveWithLease", zap.String("key", key))
return nil
}

func (m *MockMetaKV) SaveWithIgnoreLease(key, value string) error {
m.InMemKv[key] = value
log.Debug("Doing SaveWithIgnoreLease", zap.String("key", key))
return nil
}

func (m *MockMetaKV) Grant(ttl int64) (id clientv3.LeaseID, err error) {
return 1, nil
}

func (m *MockMetaKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
panic("not implemented") // TODO: Implement
}

func (m *MockMetaKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error {
panic("not implemented") // TODO: Implement
}

func (m *MockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error {
panic("not implemented") // TODO: Implement
}

0 comments on commit 5b0440d

Please sign in to comment.