Skip to content

Commit

Permalink
refactor by batch ops
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Nov 16, 2018
1 parent 02a48b5 commit 0948212
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 120 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
18.9.13
18.11.12
52 changes: 12 additions & 40 deletions store/etcdv3/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,14 @@ func (m *Mercury) AddContainer(ctx context.Context, container *types.Container)
if err != nil {
return err
}
data := string(bytes)
containerData := string(bytes)

// store container info
key := fmt.Sprintf(containerInfoKey, container.ID)
_, err = m.Create(ctx, key, data)
if err != nil {
return err
}

// store container finished so clean if err is not nil
defer func() {
if err != nil {
m.CleanContainerData(context.Background(), container.ID, appname, entrypoint, container.Nodename)
}
}()

// store deploy status
key = filepath.Join(containerDeployPrefix, appname, entrypoint, container.Nodename, container.ID)
_, err = m.Create(ctx, key, "")
if err != nil {
return err
data := map[string]string{
fmt.Sprintf(containerInfoKey, container.ID): containerData,
fmt.Sprintf(nodeContainersKey, container.Nodename, container.ID): containerData,
filepath.Join(containerDeployPrefix, appname, entrypoint, container.Nodename, container.ID): "",
}

// store node-container data
key = fmt.Sprintf(nodeContainersKey, container.Nodename, container.ID)
_, err = m.Create(ctx, key, data)
_, err = m.BatchCreate(ctx, data)
return err
}

Expand All @@ -79,23 +61,13 @@ func (m *Mercury) RemoveContainer(ctx context.Context, container *types.Containe

// CleanContainerData clean container data
func (m *Mercury) CleanContainerData(ctx context.Context, ID, appname, entrypoint, nodename string) error {
key := fmt.Sprintf(containerInfoKey, ID)
if _, err := m.Delete(ctx, key); err != nil {
return err
}

// remove deploy status by core
key = filepath.Join(containerDeployPrefix, appname, entrypoint, nodename, ID)
if _, err := m.Delete(ctx, key); err != nil {
return err
keys := []string{
fmt.Sprintf(containerInfoKey, ID),
filepath.Join(containerDeployPrefix, appname, entrypoint, nodename, ID),
fmt.Sprintf(nodeContainersKey, nodename, ID),
}

// remove node-containers data
key = fmt.Sprintf(nodeContainersKey, nodename, ID)
if _, err := m.Delete(ctx, key); err != nil {
return err
}
return nil
_, err := m.BatchDelete(ctx, keys)
return err
}

// GetContainer get a container
Expand Down
31 changes: 0 additions & 31 deletions store/etcdv3/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package etcdv3

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"strings"

Expand Down Expand Up @@ -45,32 +43,3 @@ func (m *Mercury) doGetDeployStatus(ctx context.Context, resp *clientv3.GetRespo

return setCount(nodesCount, nodesInfo), nil
}

// Obsolete, for old eru, container info not store by container meta, so we have to generate it
func (m *Mercury) doMakeDeployStatus(ctx context.Context, opts *types.DeployOptions, nodesInfo []types.NodeInfo) ([]types.NodeInfo, error) {
resp, err := m.Get(ctx, fmt.Sprintf(containerInfoKey, ""), clientv3.WithPrefix())
if err != nil {
return nodesInfo, err
}

prefix := fmt.Sprintf("%s_%s", opts.Name, opts.Entrypoint.Name)
container := &types.Container{}
nodesCount := map[string]int{}
for _, ev := range resp.Kvs {
if err := json.Unmarshal(ev.Value, &container); err != nil {
return nodesInfo, err
}
if container.Podname != opts.Podname {
continue
}
if !strings.HasPrefix(container.Name, prefix) {
continue
}
if _, ok := nodesCount[container.Nodename]; !ok {
nodesCount[container.Nodename] = 1
continue
}
nodesCount[container.Nodename]++
}
return setCount(nodesCount, nodesInfo), nil
}
60 changes: 50 additions & 10 deletions store/etcdv3/mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,52 @@ func (m *Mercury) Delete(ctx context.Context, key string, opts ...clientv3.OpOpt
return m.cliv3.Delete(ctx, m.parseKey(key), opts...)
}

// BatchDelete batch delete keys
func (m *Mercury) BatchDelete(ctx context.Context, keys []string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
txn := m.cliv3.Txn(ctx)
ops := []clientv3.Op{}
for _, key := range keys {
op := clientv3.OpDelete(m.parseKey(key), opts...)
ops = append(ops, op)
}
return txn.Then(ops...).Commit()
}

// Put save a key value
func (m *Mercury) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
return m.cliv3.Put(ctx, m.parseKey(key), val, opts...)
}

func (m *Mercury) batchPut(ctx context.Context, data map[string]string, limit map[string]map[int]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
txn := m.cliv3.Txn(ctx)
ops := []clientv3.Op{}
conds := []clientv3.Cmp{}
for key, val := range data {
key = m.parseKey(key)
op := clientv3.OpPut(key, val, opts...)
ops = append(ops, op)
if v, ok := limit[key]; ok {
for rev, condition := range v {
cond := clientv3.Compare(clientv3.Version(key), condition, rev)
conds = append(conds, cond)
}
}
}
return txn.If(conds...).Then(ops...).Commit()
}

// Create create a key if not exists
func (m *Mercury) Create(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
resp, err := m.revPut(ctx, key, val, "=", 0, opts...)
return m.BatchCreate(ctx, map[string]string{key: val}, opts...)
}

// BatchCreate create key values if not exists
func (m *Mercury) BatchCreate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
limit := map[string]map[int]string{}
for key := range data {
limit[key] = map[int]string{0: "="}
}
resp, err := m.batchPut(ctx, data, limit, opts...)
if err != nil {
return nil, err
}
Expand All @@ -100,12 +138,21 @@ func (m *Mercury) Create(ctx context.Context, key, val string, opts ...clientv3.

// Update update a key if exists
func (m *Mercury) Update(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
resp, err := m.revPut(ctx, key, val, "!=", 0, opts...)
return m.BatchUpdate(ctx, map[string]string{key: val}, opts...)
}

// BatchUpdate update keys if not exists
func (m *Mercury) BatchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
limit := map[string]map[int]string{}
for key := range data {
limit[key] = map[int]string{0: "!="}
}
resp, err := m.batchPut(ctx, data, limit, opts...)
if err != nil {
return nil, err
}
if !resp.Succeeded {
return nil, types.ErrKeyNotExists
return nil, types.ErrKeyExists
}
return resp, nil
}
Expand All @@ -116,13 +163,6 @@ func (m *Mercury) Watch(ctx context.Context, key string, opts ...clientv3.OpOpti
return m.cliv3.Watch(ctx, key, opts...)
}

func (m *Mercury) revPut(ctx context.Context, key, val, condition string, rev int, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
key = m.parseKey(key)
req := clientv3.OpPut(key, val, opts...)
cond := clientv3.Compare(clientv3.Version(key), condition, rev)
return m.cliv3.Txn(ctx).If(cond).Then(req).Commit()
}

func (m *Mercury) parseKey(key string) string {
key = filepath.Join(m.config.Etcd.Prefix, key)
log.Debugf("[parseKey] ops on %s", key)
Expand Down
58 changes: 20 additions & 38 deletions store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,26 @@ func (m *Mercury) AddNode(ctx context.Context, name, endpoint, podname, ca, cert
podname, name))
}

// 如果有tls的证书需要保存就保存一下
if ca != "" && cert != "" && key != "" {
if _, err = m.Put(ctx, fmt.Sprintf(nodeCaKey, name), ca); err != nil {
return nil, err
}

if _, err = m.Put(ctx, fmt.Sprintf(nodeCertKey, name), cert); err != nil {
return nil, err
}

if _, err = m.Put(ctx, fmt.Sprintf(nodeKeyKey, name), key); err != nil {
return nil, err
}
}

// 尝试加载docker的客户端
engine, err := m.makeDockerClient(ctx, podname, name, endpoint, true)
if err != nil {
m.deleteNode(ctx, podname, name, endpoint)
return nil, err
}

// 判断这货是不是活着的
info, err := engine.Info(ctx)
if err != nil {
m.deleteNode(ctx, podname, name, endpoint)
return nil, err
}

data := map[string]string{}
// 如果有tls的证书需要保存就保存一下
if ca != "" && cert != "" && key != "" {
data[fmt.Sprintf(nodeCaKey, name)] = ca
data[fmt.Sprintf(nodeCertKey, name)] = cert
data[fmt.Sprintf(nodeKeyKey, name)] = key
}

ncpu := cpu
memcap := memory
if cpu == 0 {
Expand Down Expand Up @@ -96,21 +87,14 @@ func (m *Mercury) AddNode(ctx context.Context, name, endpoint, podname, ca, cert

bytes, err := json.Marshal(node)
if err != nil {
m.deleteNode(ctx, podname, name, endpoint)
return nil, err
}

nodeKey := fmt.Sprintf(nodeInfoKey, podname, name)
podKey := fmt.Sprintf(nodePodKey, name)
_, err = m.Create(ctx, nodeKey, string(bytes))
if err != nil {
m.deleteNode(ctx, podname, name, endpoint)
return nil, err
}
data[fmt.Sprintf(nodeInfoKey, podname, name)] = string(bytes)
data[fmt.Sprintf(nodePodKey, name)] = podname

_, err = m.Create(ctx, podKey, podname)
_, err = m.BatchCreate(ctx, data)
if err != nil {
m.deleteNode(ctx, podname, name, endpoint)
return nil, err
}

Expand All @@ -131,17 +115,15 @@ func (m *Mercury) DeleteNode(ctx context.Context, node *types.Node) {
// 所以需要删除这些留存的证书
// 至于结果是不是成功就无所谓了
func (m *Mercury) deleteNode(ctx context.Context, podname, nodename, endpoint string) {
nodeInfo := fmt.Sprintf(nodeInfoKey, podname, nodename)
nodePod := fmt.Sprintf(nodePodKey, nodename)
ca := fmt.Sprintf(nodeCaKey, nodename)
cert := fmt.Sprintf(nodeCertKey, nodename)
key := fmt.Sprintf(nodeKeyKey, nodename)

m.Delete(ctx, nodeInfo)
m.Delete(ctx, nodePod)
m.Delete(ctx, ca)
m.Delete(ctx, cert)
m.Delete(ctx, key)
keys := []string{
fmt.Sprintf(nodeInfoKey, podname, nodename),
fmt.Sprintf(nodePodKey, nodename),
fmt.Sprintf(nodeCaKey, nodename),
fmt.Sprintf(nodeCertKey, nodename),
fmt.Sprintf(nodeKeyKey, nodename),
}

m.BatchDelete(ctx, keys)

if strings.HasPrefix(endpoint, nodeTCPPrefixKey) {
host, err := types.GetEndpointHost(endpoint)
Expand Down

0 comments on commit 0948212

Please sign in to comment.