From 52e683416b7a81ea05b3e24236eaf9cdf766d22c Mon Sep 17 00:00:00 2001 From: Mikhail Podtserkovskiy Date: Tue, 17 Apr 2018 16:17:06 +0300 Subject: [PATCH 1/8] QA-7710: refactoring --- handlers/registerNode.go | 4 +++- pool/node.go | 3 +++ pool/pool.go | 4 ++-- pool/pool_test.go | 18 +++++++-------- pool/strategy/kubernetes/factory.go | 2 +- pool/strategy/kubernetes/provider.go | 22 +++++++++++++------ pool/strategy/kubernetes/strategy.go | 22 ++++++++++--------- pool/strategy/kubernetes/strategy_test.go | 12 +++++----- pool/strategyList_test.go | 4 ++-- .../mysql/20180417152820-add_unique_key.sql | 6 +++++ storage/mysql/storage.go | 9 +++++--- storage/tests/comon_test.go | 4 ++++ 12 files changed, 69 insertions(+), 41 deletions(-) create mode 100644 storage/migrations/mysql/20180417152820-add_unique_key.sql diff --git a/handlers/registerNode.go b/handlers/registerNode.go index 6c60ce6..c889aeb 100644 --- a/handlers/registerNode.go +++ b/handlers/registerNode.go @@ -50,9 +50,11 @@ func (h *RegisterNode) ServeHTTP(rw http.ResponseWriter, r *http.Request) { for i, value := range capabilitiesList { poolCapabilitiesList[i] = capabilities.Capabilities(value) } + hostPort := register.Configuration.Host + ":" + strconv.Itoa(register.Configuration.Port) err = h.Pool.Add( + hostPort, pool.NodeTypePersistent, - register.Configuration.Host+":"+strconv.Itoa(register.Configuration.Port), + hostPort, poolCapabilitiesList, ) if err != nil { diff --git a/pool/node.go b/pool/node.go index d3f4db5..2a6cb2a 100644 --- a/pool/node.go +++ b/pool/node.go @@ -18,6 +18,7 @@ const ( ) type Node struct { + Key string Type NodeType Address string Status NodeStatus @@ -32,6 +33,7 @@ func (n *Node) String() string { } func NewNode( + key string, t NodeType, address string, status NodeStatus, @@ -41,6 +43,7 @@ func NewNode( capabilitiesList []capabilities.Capabilities, ) *Node { return &Node{ + key, t, address, status, diff --git a/pool/pool.go b/pool/pool.go index a6f26d3..a567464 100644 --- 
a/pool/pool.go +++ b/pool/pool.go @@ -68,12 +68,12 @@ func (p *Pool) ReserveAvailableNode(caps capabilities.Capabilities) (*Node, erro return &node, err } -func (p *Pool) Add(t NodeType, address string, capabilitiesList []capabilities.Capabilities) error { +func (p *Pool) Add(key string, t NodeType, address string, capabilitiesList []capabilities.Capabilities) error { if len(capabilitiesList) == 0 { return errors.New("[Pool/Add] Capabilities must contains more one element") } ts := time.Now().Unix() - return p.storage.Add(*NewNode(t, address, NodeStatusAvailable, "", ts, ts, capabilitiesList), 0) + return p.storage.Add(*NewNode(key, t, address, NodeStatusAvailable, "", ts, ts, capabilitiesList), 0) } func (p *Pool) RegisterSession(node *Node, sessionID string) error { diff --git a/pool/pool_test.go b/pool/pool_test.go index 34d880c..8a5aedc 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -47,7 +47,7 @@ func TestPool_Add_Positive(t *testing.T) { p := NewPool(s, new(StrategyListMock)) eAddress := "127.0.0.1" eNodeType := NodeTypePersistent - err := p.Add(eNodeType, eAddress, []capabilities.Capabilities{{"browserName": "ololo"}}) + err := p.Add("1234", eNodeType, eAddress, []capabilities.Capabilities{{"browserName": "ololo"}}) a.Nil(err) } @@ -59,7 +59,7 @@ func TestPool_Add_Negative(t *testing.T) { p := NewPool(s, new(StrategyListMock)) eAddress := "127.0.0.1" eNodeType := NodeTypePersistent - err := p.Add(eNodeType, eAddress, []capabilities.Capabilities{}) + err := p.Add("1234", eNodeType, eAddress, []capabilities.Capabilities{}) a.Error(err) } @@ -225,7 +225,7 @@ func TestPool_fixNodeStatus_Positive_BusyExpired(t *testing.T) { slm := new(StrategyListMock) slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil) p := NewPool(new(StorageMock), slm) - node := NewNode(NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{}) + node := NewNode("123", NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, 
[]capabilities.Capabilities{}) isFixed, err := p.fixNodeStatus(node) a.True(isFixed) a.Nil(err) @@ -236,7 +236,7 @@ func TestPool_fixNodeStatus_Positive_ReservedExpired(t *testing.T) { slm := new(StrategyListMock) slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil) p := NewPool(new(StorageMock), slm) - node := NewNode(NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{}) + node := NewNode("123", NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{}) isFixed, err := p.fixNodeStatus(node) a.True(isFixed) a.Nil(err) @@ -247,7 +247,7 @@ func TestPool_fixNodeStatus_Positive_BusyNotNotExpired(t *testing.T) { slm := new(StrategyListMock) slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil) p := NewPool(new(StorageMock), slm) - node := NewNode(NodeTypePersistent, "", NodeStatusBusy, "", time.Now().Unix(), 0, []capabilities.Capabilities{}) + node := NewNode("123", NodeTypePersistent, "", NodeStatusBusy, "", time.Now().Unix(), 0, []capabilities.Capabilities{}) isFixed, err := p.fixNodeStatus(node) a.False(isFixed) a.Nil(err) @@ -258,7 +258,7 @@ func TestPool_fixNodeStatus_Positive_ReservedNotNotExpired(t *testing.T) { slm := new(StrategyListMock) slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil) p := NewPool(new(StorageMock), slm) - node := NewNode(NodeTypePersistent, "", NodeStatusReserved, "", time.Now().Unix(), 0, []capabilities.Capabilities{}) + node := NewNode("123", NodeTypePersistent, "", NodeStatusReserved, "", time.Now().Unix(), 0, []capabilities.Capabilities{}) isFixed, err := p.fixNodeStatus(node) a.False(isFixed) a.Nil(err) @@ -269,7 +269,7 @@ func TestPool_fixNodeStatus_Positive_AvailableExpired(t *testing.T) { slm := new(StrategyListMock) slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil) p := NewPool(new(StorageMock), slm) - node := NewNode(NodeTypePersistent, "", NodeStatusAvailable, "", 0, 0, []capabilities.Capabilities{}) + 
node := NewNode("123", NodeTypePersistent, "", NodeStatusAvailable, "", 0, 0, []capabilities.Capabilities{}) isFixed, err := p.fixNodeStatus(node) a.False(isFixed) a.Nil(err) @@ -281,7 +281,7 @@ func TestPool_fixNodeStatus_NegativeBusy(t *testing.T) { slm := new(StrategyListMock) slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(eError) p := NewPool(new(StorageMock), slm) - node := NewNode(NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{}) + node := NewNode("123", NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{}) isFixed, err := p.fixNodeStatus(node) a.False(isFixed) a.Error(err) @@ -293,7 +293,7 @@ func TestPool_fixNodeStatus_NegativeReserved(t *testing.T) { slm := new(StrategyListMock) slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(eError) p := NewPool(new(StorageMock), slm) - node := NewNode(NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{}) + node := NewNode("123", NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{}) isFixed, err := p.fixNodeStatus(node) a.False(isFixed) a.Error(err) diff --git a/pool/strategy/kubernetes/factory.go b/pool/strategy/kubernetes/factory.go index 778c77b..19d728a 100644 --- a/pool/strategy/kubernetes/factory.go +++ b/pool/strategy/kubernetes/factory.go @@ -42,7 +42,7 @@ func (f *StrategyFactory) Create( return nil, errors.New("create k8s clientset, " + err.Error()) } - provider := &kubernetesProvider{ + provider := &kubDnsProvider{ clientset: clientset, namespace: "default", //todo: брать из конфига !!! 
clientFactory: clientFactory, diff --git a/pool/strategy/kubernetes/provider.go b/pool/strategy/kubernetes/provider.go index 5807cc6..d5170df 100644 --- a/pool/strategy/kubernetes/provider.go +++ b/pool/strategy/kubernetes/provider.go @@ -9,20 +9,22 @@ import ( "net" "strconv" "time" + "strings" ) type kubernetesProviderInterface interface { Create(podName string, nodeParams nodeParams) error + // idempotent operation Destroy(podName string) error } -type kubernetesProvider struct { +type kubDnsProvider struct { clientset *kubernetes.Clientset namespace string clientFactory jsonwire.ClientFactoryInterface } -func (p *kubernetesProvider) Create(podName string, nodeParams nodeParams) error { +func (p *kubDnsProvider) Create(podName string, nodeParams nodeParams) error { pod := &apiV1.Pod{} pod.ObjectMeta.Name = podName pod.ObjectMeta.Labels = map[string]string{"name": podName} @@ -78,16 +80,22 @@ Loop: return nil } -func (p *kubernetesProvider) Destroy(podName string) error { +//Destroy - destroy all pod data (idempotent operation) +func (p *kubDnsProvider) Destroy(podName string) error { err := p.clientset.CoreV1Client.Pods(p.namespace).Delete(podName, &apiV1.DeleteOptions{}) - if err != nil { + switch { + case err != nil && strings.Contains(err.Error(), "not found"): + // pod already deleted + case err != nil: err = errors.New("send command pod/delete to k8s, " + err.Error()) return err } err = p.clientset.CoreV1Client.Services(p.namespace).Delete(podName, &apiV1.DeleteOptions{}) - if err != nil { - err = errors.New("send command service/delete to k8s, " + err.Error()) - return err + switch { + case err != nil && strings.Contains(err.Error(), "not found"): + // service already deleted + case err != nil: + return errors.New("send command service/delete to k8s, " + err.Error()) } return nil } diff --git a/pool/strategy/kubernetes/strategy.go b/pool/strategy/kubernetes/strategy.go index d0a721f..ab28428 100644 --- a/pool/strategy/kubernetes/strategy.go +++ 
b/pool/strategy/kubernetes/strategy.go @@ -8,6 +8,7 @@ import ( "github.com/satori/go.uuid" "net" "time" + "fmt" ) type Strategy struct { @@ -25,14 +26,17 @@ func (s *Strategy) Reserve(desiredCaps capabilities.Capabilities) (pool.Node, er podName := "wd-node-" + uuid.NewV4().String() ts := time.Now().Unix() address := net.JoinHostPort(podName, nodeConfig.Params.Port) - node := pool.NewNode(pool.NodeTypeKubernetes, address, pool.NodeStatusReserved, "", ts, ts, []capabilities.Capabilities{}) + node := pool.NewNode(podName, pool.NodeTypeKubernetes, address, pool.NodeStatusReserved, "", ts, ts, []capabilities.Capabilities{}) err := s.storage.Add(*node, s.config.Limit) if err != nil { return pool.Node{}, errors.New("add node to storage, " + err.Error()) } err = s.provider.Create(podName, nodeConfig.Params) if err != nil { - _ = s.provider.Destroy(podName) // на случай если что то успело создасться + go func(podName string) { + time.Sleep(time.Minute * 2) + _ = s.provider.Destroy(podName) // на случай если что то криво создалось + }(podName) return pool.Node{}, errors.New("create node by provider, " + err.Error()) } return *node, nil @@ -43,11 +47,10 @@ func (s *Strategy) CleanUp(node pool.Node) error { if node.Type != pool.NodeTypeKubernetes { return strategy.ErrNotApplicable } - hostName, _, err := net.SplitHostPort(node.Address) - if err != nil { - return errors.New("get hostname from node.Address, " + err.Error()) + if node.Key == "" { + return fmt.Errorf("empty node key") } - err = s.provider.Destroy(hostName) + err := s.provider.Destroy(node.Key) if err != nil { return errors.New("destroy node by provider, " + err.Error()) } @@ -62,11 +65,10 @@ func (s *Strategy) FixNodeStatus(node pool.Node) error { if node.Type != pool.NodeTypeKubernetes { return strategy.ErrNotApplicable } - hostName, _, err := net.SplitHostPort(node.Address) - if err != nil { - return errors.New("get hostname from node.Address, " + err.Error()) + if node.Key == "" { + return fmt.Errorf("empty 
node key") } - err = s.provider.Destroy(hostName) + err := s.provider.Destroy(node.Key) if err != nil { return errors.New("destroy node by provider, " + err.Error()) } diff --git a/pool/strategy/kubernetes/strategy_test.go b/pool/strategy/kubernetes/strategy_test.go index a061613..2ee2877 100644 --- a/pool/strategy/kubernetes/strategy_test.go +++ b/pool/strategy/kubernetes/strategy_test.go @@ -74,7 +74,7 @@ func TestStrategy_CleanUp_Positive(t *testing.T) { sm := new(pool.StorageMock) sm.On("Remove", mock.AnythingOfType("pool.Node")).Return(nil) s := Strategy{storage: sm, provider: pm} - node := pool.Node{Type: pool.NodeTypeKubernetes, Address: "host:port"} + node := pool.Node{Key: "valid_key", Type: pool.NodeTypeKubernetes} err := s.CleanUp(node) assert.Nil(t, err) } @@ -86,9 +86,9 @@ func TestStrategy_CleanUp_Negative_NodeType(t *testing.T) { assert.Error(t, err, strategy.ErrNotApplicable) } -func TestStrategy_CleanUp_Negative_InvalidNodeAddress(t *testing.T) { +func TestStrategy_CleanUp_Negative_EmptyNodeKey(t *testing.T) { s := Strategy{} - node := pool.Node{Type: pool.NodeTypeKubernetes, Address: "invalid node address"} + node := pool.Node{Key: "", Type: pool.NodeTypeKubernetes} // empty node key err := s.CleanUp(node) assert.NotNil(t, err) } @@ -119,7 +119,7 @@ func TestStrategy_FixNodeStatus_Positive(t *testing.T) { sm := new(pool.StorageMock) sm.On("Remove", mock.AnythingOfType("pool.Node")).Return(nil) s := Strategy{storage: sm, provider: pm} - node := pool.Node{Type: pool.NodeTypeKubernetes, Address: "host:port"} + node := pool.Node{ Key: "valid_key", Type: pool.NodeTypeKubernetes} err := s.FixNodeStatus(node) assert.Nil(t, err) } @@ -131,9 +131,9 @@ func TestStrategy_FixNodeStatus_Negative_NodeType(t *testing.T) { assert.Error(t, err, strategy.ErrNotApplicable) } -func TestStrategy_FixNodeStatus_Negative_InvalidNodeAddress(t *testing.T) { +func TestStrategy_FixNodeStatus_Negative_EmptyNodeKey(t *testing.T) { s := Strategy{} - node := pool.Node{Type: 
pool.NodeTypeKubernetes, Address: "invalid node address"} + node := pool.Node{Key: "", Type: pool.NodeTypeKubernetes} err := s.FixNodeStatus(node) assert.NotNil(t, err) } diff --git a/pool/strategyList_test.go b/pool/strategyList_test.go index ed5f500..2b49153 100644 --- a/pool/strategyList_test.go +++ b/pool/strategyList_test.go @@ -20,7 +20,7 @@ func TestStrategyList_Reserve_PositiveDirectOrder(t *testing.T) { s1 := new(StrategyListMock) s1.On("Reserve", mock.AnythingOfType("capabilities.Capabilities")).Return(Node{}, strategy.ErrNotFound) s2 := new(StrategyListMock) - expectedNode := *NewNode(NodeTypePersistent, "111", NodeStatusBusy, "", time.Now().Unix(), 0, []capabilities.Capabilities{}) + expectedNode := *NewNode("123", NodeTypePersistent, "111", NodeStatusBusy, "", time.Now().Unix(), 0, []capabilities.Capabilities{}) s2.On("Reserve", mock.AnythingOfType("capabilities.Capabilities")).Return(expectedNode, nil) sl := NewStrategyList([]StrategyInterface{s1, s2}) @@ -31,7 +31,7 @@ func TestStrategyList_Reserve_PositiveDirectOrder(t *testing.T) { func TestStrategyList_Reserve_Positive_ReverseOrder(t *testing.T) { s1 := new(StrategyListMock) - expectedNode := *NewNode(NodeTypePersistent, "111", NodeStatusBusy, "", time.Now().Unix(), 0, []capabilities.Capabilities{}) + expectedNode := *NewNode("123", NodeTypePersistent, "111", NodeStatusBusy, "", time.Now().Unix(), 0, []capabilities.Capabilities{}) s1.On("Reserve", mock.AnythingOfType("capabilities.Capabilities")).Return(expectedNode, nil) s2 := new(StrategyListMock) s2.On("Reserve", mock.AnythingOfType("capabilities.Capabilities")).Return(Node{}, strategy.ErrNotFound) diff --git a/storage/migrations/mysql/20180417152820-add_unique_key.sql b/storage/migrations/mysql/20180417152820-add_unique_key.sql new file mode 100644 index 0000000..9c366b0 --- /dev/null +++ b/storage/migrations/mysql/20180417152820-add_unique_key.sql @@ -0,0 +1,6 @@ + +-- +migrate Up +ALTER TABLE `node` ADD COLUMN `key` varchar(255) NOT NULL 
AFTER `id`; + +-- +migrate Down +SIGNAL SQLSTATE '45000' SET message_text = 'Impossible down this migration'; \ No newline at end of file diff --git a/storage/mysql/storage.go b/storage/mysql/storage.go index c965e52..b442b7b 100644 --- a/storage/mysql/storage.go +++ b/storage/mysql/storage.go @@ -13,6 +13,7 @@ import ( type MysqlNodeModel struct { ID string `db:"id"` + Key string `db:"key"` Type string `db:"type"` Address string `db:"address"` Status string `db:"status"` @@ -45,13 +46,14 @@ func (s *MysqlStorage) Add(node pool.Node, limit int) error { } // black magic, but it works result, err := tx.NamedExec( - "INSERT INTO node (type, address, status, sessionId, updated, registred) "+ - "SELECT :type, :address, :status, :sessionId, :updated, :registred "+ + "INSERT INTO node (`key`, type, address, status, sessionId, updated, registred) "+ + "SELECT :key, :type, :address, :status, :sessionId, :updated, :registred "+ "FROM DUAL "+ "WHERE 0 = :limit OR EXISTS (SELECT TRUE FROM node WHERE type = :type HAVING count(*) < :limit)"+ "ON DUPLICATE KEY UPDATE "+ - "type = :type, status = :status, sessionId = :sessionId, updated = :updated, registred = :registred", + "`key` = :key, type = :type, status = :status, sessionId = :sessionId, updated = :updated, registred = :registred", map[string]interface{}{ + "key": node.Key, "type": string(node.Type), "address": node.Address, "sessionId": node.SessionID, @@ -333,6 +335,7 @@ func (s *MysqlStorage) Remove(node pool.Node) error { func mapper(model *MysqlNodeModel) *pool.Node { node := pool.NewNode( + model.Key, pool.NodeType(model.Type), model.Address, pool.NodeStatus(model.Status), diff --git a/storage/tests/comon_test.go b/storage/tests/comon_test.go index 490266c..60432bb 100644 --- a/storage/tests/comon_test.go +++ b/storage/tests/comon_test.go @@ -22,6 +22,7 @@ func testStorage_Add(t *testing.T, p PrepareInterface) { storage, deferFunc := p.CreateStorage() defer deferFunc() expectedNode := pool.Node{ + Key: "111", Address: 
"address1", CapabilitiesList: []capabilities.Capabilities{ {"trololo": "lolo"}, @@ -38,6 +39,7 @@ func testStorage_Add(t *testing.T, p PrepareInterface) { t.Fatal("Error get all nodes list, " + err.Error()) } assert.Len(t, nodeList, 1, "Added more than one node") + assert.Equal(t, expectedNode.Key, nodeList[0].Key) assert.Equal(t, expectedNode.Type, nodeList[0].Type) assert.Equal(t, expectedNode.Address, nodeList[0].Address) assert.Equal(t, expectedNode.Status, nodeList[0].Status) @@ -97,6 +99,7 @@ func testStorage_GetAll(t *testing.T, p PrepareInterface) { expectedNodeList := make([]pool.Node, 0) for _, addr := range []string{"addr1", "addr2"} { node := pool.Node{ + Key: addr, Address: addr, CapabilitiesList: []capabilities.Capabilities{{"trololo": "lolo"}}, } @@ -113,6 +116,7 @@ func testStorage_GetAll(t *testing.T, p PrepareInterface) { isNodeMatch := false for _, node := range nodeList { if node.Address == expectedNode.Address { + assert.Equal(t, expectedNode.Key, node.Key) assert.Equal(t, expectedNode.Type, node.Type) assert.Equal(t, expectedNode.Address, node.Address) assert.Equal(t, expectedNode.Status, node.Status) From 162b33fd8a25ee475b663cca5a74fbfae4a20fe4 Mon Sep 17 00:00:00 2001 From: Mikhail Podtserkovskiy Date: Wed, 18 Apr 2018 02:27:50 +0300 Subject: [PATCH 2/8] QA-7710: refactoring part 2 --- pool/node.go | 2 +- pool/pool.go | 4 +- proxy/transport.go | 2 +- storage/local/local_test.go | 62 +++++++++---------- storage/local/storage.go | 21 ++++--- .../mysql/20180417152820-add_unique_key.sql | 2 + storage/mysql/storage.go | 52 ++++++++-------- storage/tests/comon_test.go | 34 +++++----- 8 files changed, 92 insertions(+), 87 deletions(-) diff --git a/pool/node.go b/pool/node.go index 2a6cb2a..e45ef32 100644 --- a/pool/node.go +++ b/pool/node.go @@ -29,7 +29,7 @@ type Node struct { } func (n *Node) String() string { - return "Node [" + n.Address + "]" + return "Node [" + n.Key + "]" } func NewNode( diff --git a/pool/pool.go b/pool/pool.go index 
a567464..0357951 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -148,7 +148,7 @@ func (p *Pool) FixNodeStatuses() { continue } if isFixed { - log.Infof("Node [%s] status fixed", node.Address) + log.Infof("Node [%s] status fixed", node.Key) } } } @@ -169,7 +169,7 @@ func (p *Pool) fixNodeStatus(node *Node) (bool, error) { } err := p.strategyList.FixNodeStatus(*node) if err != nil { - return false, fmt.Errorf("Can't fix node [%s] status, %s", node.Address, err.Error()) + return false, fmt.Errorf("Can't fix node [%s] status, %s", node.Key, err.Error()) } return true, nil } diff --git a/proxy/transport.go b/proxy/transport.go index 59cfaea..282ece3 100644 --- a/proxy/transport.go +++ b/proxy/transport.go @@ -61,7 +61,7 @@ func (t *CreateSessionTransport) RoundTrip(request *http.Request) (*http.Respons err = fmt.Errorf("session not created, response: %s", string(b)) return nil, err } - log.Infof("register SessionID: %s on node %s", sessionID, t.node.Address) + log.Infof("register SessionID: %s on node %s", sessionID, t.node.Key) err = t.pool.RegisterSession(t.node, sessionID) if err != nil { err = fmt.Errorf("sessionId not registred in storage: %s", sessionID) diff --git a/storage/local/local_test.go b/storage/local/local_test.go index e2c775a..f0a1152 100644 --- a/storage/local/local_test.go +++ b/storage/local/local_test.go @@ -15,54 +15,54 @@ func TestStorage_Add_Positive(t *testing.T) { } func TestStorage_Add_Positive_Repeat(t *testing.T) { - s := Storage{db: map[string]*pool.Node{"1": {Address: "1"}}} - err := s.Add(pool.Node{Address: "1"}, 0) + s := Storage{db: map[string]*pool.Node{"1": {Key: "1"}}} + err := s.Add(pool.Node{Key: "1"}, 0) assert.NoError(t, err) assert.Len(t, s.db, 1) } func TestStorage_Add_Negative_LimitReached(t *testing.T) { - s := Storage{db: map[string]*pool.Node{"1": {Address: "1"}}} + s := Storage{db: map[string]*pool.Node{"1": {Key: "1"}}} limit := 1 - err := s.Add(pool.Node{Address: "2"}, limit) + err := s.Add(pool.Node{Key: "2"}, limit) 
assert.Error(t, err, "limit reached") assert.Len(t, s.db, limit) } func TestStorage_ReserveAvailable_Positive(t *testing.T) { - expectedNode := pool.Node{Address: "1", Status: pool.NodeStatusAvailable} - s := Storage{db: map[string]*pool.Node{expectedNode.Address: &expectedNode}} + expectedNode := pool.Node{Key: "1", Status: pool.NodeStatusAvailable} + s := Storage{db: map[string]*pool.Node{expectedNode.Key: &expectedNode}} node, err := s.ReserveAvailable([]pool.Node{expectedNode}) assert.NoError(t, err) assert.Equal(t, expectedNode, node) - assert.Equal(t, pool.NodeStatusReserved, s.db[node.Address].Status) + assert.Equal(t, pool.NodeStatusReserved, s.db[node.Key].Status) } func TestStorage_ReserveAvailable_Negative_NotFoundAvailableNodes(t *testing.T) { - expectedNode := pool.Node{Address: "1", Status: pool.NodeStatusBusy} - s := Storage{db: map[string]*pool.Node{expectedNode.Address: &expectedNode}} + expectedNode := pool.Node{Key: "1", Status: pool.NodeStatusBusy} + s := Storage{db: map[string]*pool.Node{expectedNode.Key: &expectedNode}} _, err := s.ReserveAvailable([]pool.Node{expectedNode}) assert.Error(t, err, storage.ErrNotFound) } func TestStorage_ReserveAvailable_Negative_InvalidNodeList(t *testing.T) { s := Storage{db: map[string]*pool.Node{}} - _, err := s.ReserveAvailable([]pool.Node{{Address: "awd"}}) + _, err := s.ReserveAvailable([]pool.Node{{Key: "awd"}}) assert.Error(t, err, storage.ErrNotFound) } func TestStorage_SetBusy_Positive(t *testing.T) { - expectedNode := pool.Node{Address: "1"} - s := Storage{db: map[string]*pool.Node{expectedNode.Address: &expectedNode}} + expectedNode := pool.Node{Key: "1"} + s := Storage{db: map[string]*pool.Node{expectedNode.Key: &expectedNode}} expectedSessionID := "expectedSessionID" err := s.SetBusy(expectedNode, expectedSessionID) assert.NoError(t, err) - assert.Equal(t, pool.NodeStatusBusy, s.db[expectedNode.Address].Status) - assert.Equal(t, expectedSessionID, s.db[expectedNode.Address].SessionID) + 
assert.Equal(t, pool.NodeStatusBusy, s.db[expectedNode.Key].Status) + assert.Equal(t, expectedSessionID, s.db[expectedNode.Key].SessionID) } func TestStorage_SetBusy_Negative(t *testing.T) { - expectedNode := pool.Node{Address: "1"} + expectedNode := pool.Node{Key: "1"} s := Storage{db: map[string]*pool.Node{}} expectedSessionID := "expectedSessionID" err := s.SetBusy(expectedNode, expectedSessionID) @@ -70,22 +70,22 @@ func TestStorage_SetBusy_Negative(t *testing.T) { } func TestStorage_SetAvailable_Positive(t *testing.T) { - expectedNode := pool.Node{Address: "1"} - s := Storage{db: map[string]*pool.Node{expectedNode.Address: &expectedNode}} + expectedNode := pool.Node{Key: "1"} + s := Storage{db: map[string]*pool.Node{expectedNode.Key: &expectedNode}} err := s.SetAvailable(expectedNode) assert.NoError(t, err) - assert.Equal(t, pool.NodeStatusAvailable, s.db[expectedNode.Address].Status) + assert.Equal(t, pool.NodeStatusAvailable, s.db[expectedNode.Key].Status) } func TestStorage_SetAvailable_Negative(t *testing.T) { - expectedNode := pool.Node{Address: "1"} + expectedNode := pool.Node{Key: "1"} s := Storage{db: map[string]*pool.Node{}} err := s.SetAvailable(expectedNode) assert.Error(t, err, storage.ErrNotFound) } func TestStorage_GetCountWithStatus_Positive_All(t *testing.T) { - s := Storage{db: map[string]*pool.Node{"1": {Address: "1"}, "2": {Address: "2"}}} + s := Storage{db: map[string]*pool.Node{"1": {Key: "1"}, "2": {Key: "2"}}} count, err := s.GetCountWithStatus(nil) assert.NoError(t, err) assert.Equal(t, count, len(s.db)) @@ -93,59 +93,59 @@ func TestStorage_GetCountWithStatus_Positive_All(t *testing.T) { func TestStorage_GetCountWithStatus_Positive_One(t *testing.T) { expectedStatus := pool.NodeStatusBusy - s := Storage{db: map[string]*pool.Node{"1": {Address: "1", Status: expectedStatus}, "2": {Address: "2"}}} + s := Storage{db: map[string]*pool.Node{"1": {Key: "1", Status: expectedStatus}, "2": {Key: "2"}}} count, err := 
s.GetCountWithStatus(&expectedStatus) assert.NoError(t, err) assert.Equal(t, count, 1) } func TestStorage_GetBySession_Positive(t *testing.T) { - expectedNode := pool.Node{Address: "1"} - s := Storage{db: map[string]*pool.Node{expectedNode.Address: &expectedNode}} + expectedNode := pool.Node{Key: "1"} + s := Storage{db: map[string]*pool.Node{expectedNode.Key: &expectedNode}} node, err := s.GetBySession(expectedNode.SessionID) assert.NoError(t, err) assert.Equal(t, expectedNode, node) } func TestStorage_GetBySession_Negative(t *testing.T) { - expectedNode := pool.Node{Address: "1"} + expectedNode := pool.Node{Key: "1"} s := Storage{db: map[string]*pool.Node{}} _, err := s.GetBySession(expectedNode.SessionID) assert.Error(t, err, storage.ErrNotFound) } func TestStorage_GetByAddress_Positive(t *testing.T) { - expectedNode := pool.Node{Address: "1"} - s := Storage{db: map[string]*pool.Node{expectedNode.Address: &expectedNode}} + expectedNode := pool.Node{Address: "1", Key: "12d"} + s := Storage{db: map[string]*pool.Node{expectedNode.Key: &expectedNode}} node, err := s.GetByAddress(expectedNode.Address) assert.NoError(t, err) assert.Equal(t, expectedNode, node) } func TestStorage_GetByAddress_Negative(t *testing.T) { - expectedNode := pool.Node{Address: "1"} + expectedNode := pool.Node{Address: "1", Key: "1234567890"} s := Storage{db: map[string]*pool.Node{}} _, err := s.GetByAddress(expectedNode.Address) assert.Error(t, err, storage.ErrNotFound) } func TestStorage_GetAll_Positive(t *testing.T) { - s := Storage{db: map[string]*pool.Node{"1": {Address: "1"}, "2": {Address: "2"}}} + s := Storage{db: map[string]*pool.Node{"1": {Key: "1"}, "2": {Key: "2"}}} nodeList, err := s.GetAll() assert.NoError(t, err) assert.Len(t, nodeList, 2) } func TestStorage_Remove_Positive(t *testing.T) { - node := pool.Node{Address: "1"} - s := Storage{db: map[string]*pool.Node{node.Address: &node}} + node := pool.Node{Key: "1"} + s := Storage{db: map[string]*pool.Node{node.Key: &node}} err := 
s.Remove(node) assert.NoError(t, err) } func TestStorage_Remove_Negative(t *testing.T) { s := Storage{db: map[string]*pool.Node{}} - node := pool.Node{Address: "1"} + node := pool.Node{Key: "1"} err := s.Remove(node) assert.Error(t, err, storage.ErrNotFound) } diff --git a/storage/local/storage.go b/storage/local/storage.go index ff08859..b80e4c7 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -28,7 +28,7 @@ func (s *Storage) Add(node pool.Node, limit int) error { } } - s.db[node.Address] = &node + s.db[node.Key] = &node return nil } @@ -36,7 +36,7 @@ func (s *Storage) ReserveAvailable(nodeList []pool.Node) (pool.Node, error) { s.mu.Lock() defer s.mu.Unlock() for _, node := range nodeList { - dbNode, ok := s.db[node.Address] + dbNode, ok := s.db[node.Key] if ok && dbNode.Status == pool.NodeStatusAvailable { dbNode.Status = pool.NodeStatusReserved dbNode.Updated = time.Now().Unix() @@ -49,7 +49,7 @@ func (s *Storage) ReserveAvailable(nodeList []pool.Node) (pool.Node, error) { func (s *Storage) SetBusy(node pool.Node, sessionID string) error { s.mu.Lock() defer s.mu.Unlock() - storedNode, ok := s.db[node.Address] + storedNode, ok := s.db[node.Key] if !ok { return storage.ErrNotFound } @@ -62,7 +62,7 @@ func (s *Storage) SetBusy(node pool.Node, sessionID string) error { func (s *Storage) SetAvailable(node pool.Node) error { s.mu.Lock() defer s.mu.Unlock() - storedNode, ok := s.db[node.Address] + storedNode, ok := s.db[node.Key] if !ok { return storage.ErrNotFound } @@ -100,11 +100,12 @@ func (s *Storage) GetBySession(sessionID string) (pool.Node, error) { func (s *Storage) GetByAddress(address string) (pool.Node, error) { s.mu.RLock() defer s.mu.RUnlock() - node, ok := s.db[address] - if !ok { - return pool.Node{}, storage.ErrNotFound + for _, node := range s.db { + if node.Address == address { + return *node, nil + } } - return *node, nil + return pool.Node{}, storage.ErrNotFound } func (s *Storage) GetAll() ([]pool.Node, error) { @@ -121,10 
+122,10 @@ func (s *Storage) GetAll() ([]pool.Node, error) { func (s *Storage) Remove(node pool.Node) error { s.mu.Lock() defer s.mu.Unlock() - _, ok := s.db[node.Address] + _, ok := s.db[node.Key] if !ok { return storage.ErrNotFound } - delete(s.db, node.Address) + delete(s.db, node.Key) return nil } diff --git a/storage/migrations/mysql/20180417152820-add_unique_key.sql b/storage/migrations/mysql/20180417152820-add_unique_key.sql index 9c366b0..fc85744 100644 --- a/storage/migrations/mysql/20180417152820-add_unique_key.sql +++ b/storage/migrations/mysql/20180417152820-add_unique_key.sql @@ -1,6 +1,8 @@ -- +migrate Up ALTER TABLE `node` ADD COLUMN `key` varchar(255) NOT NULL AFTER `id`; +ALTER TABLE `capabilities` CHANGE COLUMN `nodeAddress` `nodeKey` VARCHAR(255) NOT NULL; +CREATE UNIQUE INDEX `key` ON `node` (`key`); -- +migrate Down SIGNAL SQLSTATE '45000' SET message_text = 'Impossible down this migration'; \ No newline at end of file diff --git a/storage/mysql/storage.go b/storage/mysql/storage.go index b442b7b..eb0cab8 100644 --- a/storage/mysql/storage.go +++ b/storage/mysql/storage.go @@ -13,7 +13,7 @@ import ( type MysqlNodeModel struct { ID string `db:"id"` - Key string `db:"key"` + Key string `db:"key"` Type string `db:"type"` Address string `db:"address"` Status string `db:"status"` @@ -24,7 +24,7 @@ type MysqlNodeModel struct { type MysqlCapabilitiesModel struct { ID int `db:"id"` - NodeAddress string `db:"nodeAddress"` + NodeKey string `db:"nodeKey"` SetID string `db:"setId"` Name string `db:"name"` Value string `db:"value"` @@ -51,7 +51,7 @@ func (s *MysqlStorage) Add(node pool.Node, limit int) error { "FROM DUAL "+ "WHERE 0 = :limit OR EXISTS (SELECT TRUE FROM node WHERE type = :type HAVING count(*) < :limit)"+ "ON DUPLICATE KEY UPDATE "+ - "`key` = :key, type = :type, status = :status, sessionId = :sessionId, updated = :updated, registred = :registred", + "type = :type, address = :address, status = :status, sessionId = :sessionId, updated = 
:updated, registred = :registred", map[string]interface{}{ "key": node.Key, "type": string(node.Type), @@ -79,7 +79,7 @@ func (s *MysqlStorage) Add(node pool.Node, limit int) error { return errors.New("[MysqlStorage/Add] No rows was affected (may be limit reached)") } - _, err = tx.Exec("DELETE FROM capabilities WHERE nodeAddress = ?", node.Address) + _, err = tx.Exec("DELETE FROM capabilities WHERE nodeKey = ?", node.Key) if err != nil { _ = tx.Rollback() return errors.New("[MysqlStorage/Add] delete old capabilities: " + err.Error()) @@ -89,8 +89,8 @@ func (s *MysqlStorage) Add(node pool.Node, limit int) error { for i, caps := range node.CapabilitiesList { for name, value := range caps { preparedCapability = map[string]interface{}{ - "nodeAddress": node.Address, - "setId": node.Address + "|" + strconv.Itoa(i), // просто уникальное значение для сета + "nodeKey": node.Key, + "setId": node.Key + "|" + strconv.Itoa(i), // просто уникальное значение для сета "name": name, "value": value, } @@ -100,8 +100,8 @@ func (s *MysqlStorage) Add(node pool.Node, limit int) error { for _, preparedCapability := range preparedCapabilities { _, err = tx.NamedExec( - "INSERT INTO capabilities (nodeAddress, setId, name, value) "+ - "VALUES (:nodeAddress, :setId, :name, :value)", + "INSERT INTO capabilities (nodeKey, setId, name, value) "+ + "VALUES (:nodeKey, :setId, :name, :value)", preparedCapability, ) if err != nil { @@ -124,15 +124,15 @@ func (s *MysqlStorage) ReserveAvailable(nodeList []pool.Node) (pool.Node, error) return pool.Node{}, errors.New("[MysqlStorage/ReserveAvailable] start transaction: " + err.Error()) } - nodeAddressList := make([]string, 0, len(nodeList)) + nodeKeyList := make([]string, 0, len(nodeList)) for _, node := range nodeList { - nodeAddressList = append(nodeAddressList, node.Address) + nodeKeyList = append(nodeKeyList, node.Key) } - args := []interface{}{string(pool.NodeStatusAvailable), nodeAddressList} + args := 
[]interface{}{string(pool.NodeStatusAvailable), nodeKeyList} //var row *sqlx.Row query, args, err := sqlx.In( - "SELECT n.* FROM node n WHERE n.status = ? AND n.address IN (?) ORDER BY n.updated ASC LIMIT 1 FOR UPDATE;", args...) + "SELECT n.* FROM node n WHERE n.status = ? AND n.`key` IN (?) ORDER BY n.updated ASC LIMIT 1 FOR UPDATE;", args...) if err != nil { return pool.Node{}, errors.New("[MysqlStorage/ReserveAvailable] make select query: " + err.Error()) } @@ -154,10 +154,10 @@ func (s *MysqlStorage) ReserveAvailable(nodeList []pool.Node) (pool.Node, error) nodeModel.Updated = time.Now().Unix() nodeModel.Status = string(pool.NodeStatusReserved) res, err := tx.Exec( - "UPDATE node SET status = ?, updated = ? WHERE address = ?", + "UPDATE node SET status = ?, updated = ? WHERE `key` = ?", nodeModel.Status, nodeModel.Updated, - nodeModel.Address, + nodeModel.Key, ) if err != nil { _ = tx.Rollback() @@ -183,11 +183,11 @@ func (s *MysqlStorage) ReserveAvailable(nodeList []pool.Node) (pool.Node, error) func (s *MysqlStorage) SetBusy(node pool.Node, sessionID string) error { res, err := s.db.Exec( - "UPDATE node SET sessionID = ?, updated = ?, status = ? WHERE address = ?", + "UPDATE node SET sessionID = ?, updated = ?, status = ? WHERE `key` = ?", sessionID, time.Now().Unix(), string(pool.NodeStatusBusy), - node.Address, + node.Key, ) if err != nil { return errors.New("[MysqlStorage/SetBusy] update table `node`, " + err.Error()) @@ -204,10 +204,10 @@ func (s *MysqlStorage) SetBusy(node pool.Node, sessionID string) error { func (s *MysqlStorage) SetAvailable(node pool.Node) error { res, err := s.db.Exec( - "UPDATE node SET status = ?, updated = ? WHERE address = ?", + "UPDATE node SET status = ?, updated = ? 
WHERE `key` = ?", string(pool.NodeStatusAvailable), time.Now().Unix(), - node.Address, + node.Key, ) if err != nil { err = errors.New("[MysqlStorage/SetAvailable] update table `node`, " + err.Error()) @@ -284,21 +284,21 @@ func (s *MysqlStorage) GetAll() ([]pool.Node, error) { return nil, errors.New("[MysqlStorage/GetAll] get all capabilities from db, " + err.Error()) } for _, row := range rowsCaps { - _, ok := capsMap[row.NodeAddress] + _, ok := capsMap[row.NodeKey] if !ok { - capsMap[row.NodeAddress] = map[string]capabilities.Capabilities{} + capsMap[row.NodeKey] = map[string]capabilities.Capabilities{} } - currCaps, ok := capsMap[row.NodeAddress][row.SetID] + currCaps, ok := capsMap[row.NodeKey][row.SetID] if !ok { currCaps = capabilities.Capabilities{} } currCaps[row.Name] = row.Value - capsMap[row.NodeAddress][row.SetID] = currCaps + capsMap[row.NodeKey][row.SetID] = currCaps } for i, node := range nodeList { - capsList := make([]capabilities.Capabilities, 0, len(capsMap[node.Address])) - for _, currCaps := range capsMap[node.Address] { + capsList := make([]capabilities.Capabilities, 0, len(capsMap[node.Key])) + for _, currCaps := range capsMap[node.Key] { capsList = append(capsList, currCaps) } nodeList[i].CapabilitiesList = capsList @@ -308,7 +308,7 @@ func (s *MysqlStorage) GetAll() ([]pool.Node, error) { } func (s *MysqlStorage) Remove(node pool.Node) error { - res, err := s.db.Exec("DELETE FROM node WHERE address = ?", node.Address) + res, err := s.db.Exec("DELETE FROM node WHERE `key` = ?", node.Key) if err != nil { return errors.New("[MysqlStorage/Remove] delete from table `node`, " + err.Error()) } @@ -319,7 +319,7 @@ func (s *MysqlStorage) Remove(node pool.Node) error { if rowsAffected == 0 { return errors.New("[MysqlStorage/Remove] delete from node: affected 0 rows") } - res, err = s.db.Exec("DELETE FROM capabilities WHERE nodeAddress = ?", node.Address) + res, err = s.db.Exec("DELETE FROM capabilities WHERE `nodeKey` = ?", node.Key) if err != nil { 
return errors.New("[MysqlStorage/Remove] delete from table `capabilities`: " + err.Error()) } diff --git a/storage/tests/comon_test.go b/storage/tests/comon_test.go index 60432bb..7af1870 100644 --- a/storage/tests/comon_test.go +++ b/storage/tests/comon_test.go @@ -55,6 +55,7 @@ func testStorage_Add_Repeat(t *testing.T, p PrepareInterface) { storage, deferFunc := p.CreateStorage() defer deferFunc() node := pool.Node{ + Key: "ololo", Address: "ololo", CapabilitiesList: []capabilities.Capabilities{{"trololo": "lolo"}}, } @@ -76,6 +77,7 @@ func testStorage_Add_Limit_Overflow(t *testing.T, p PrepareInterface) { storage, deferFunc := p.CreateStorage() defer deferFunc() node := pool.Node{ + Key: "ololo", Address: "ololo", CapabilitiesList: []capabilities.Capabilities{{"trololo": "lolo"}}, Type: pool.NodeTypePersistent, @@ -83,7 +85,7 @@ func testStorage_Add_Limit_Overflow(t *testing.T, p PrepareInterface) { limit := 1 err := storage.Add(node, limit) assert.Nil(t, err) - node.Address = "ololo1" + node.Key = "ololo1" err = storage.Add(node, limit) assert.NotNil(t, err) nodeList, err := storage.GetAll() @@ -115,8 +117,7 @@ func testStorage_GetAll(t *testing.T, p PrepareInterface) { for _, expectedNode := range expectedNodeList { isNodeMatch := false for _, node := range nodeList { - if node.Address == expectedNode.Address { - assert.Equal(t, expectedNode.Key, node.Key) + if node.Key == expectedNode.Key { assert.Equal(t, expectedNode.Type, node.Type) assert.Equal(t, expectedNode.Address, node.Address) assert.Equal(t, expectedNode.Status, node.Status) @@ -137,7 +138,7 @@ func testStorage_GetByAddress(t *testing.T, p PrepareInterface) { t.Parallel() storage, deferFunc := p.CreateStorage() defer deferFunc() - expectedNode := pool.Node{Address: "mySuperAddress"} + expectedNode := pool.Node{Address: "mySuperAddress", Key: "mySuperAddress"} err := storage.Add(expectedNode, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) @@ -158,13 +159,14 @@ func 
testStorage_GetBySession(t *testing.T, p PrepareInterface) { t.Parallel() storage, deferFunc := p.CreateStorage() defer deferFunc() - expectedNode := pool.Node{Address: "mySuperAddress"} + expectedNode := pool.Node{Address: "mySuperAddress", Key: "mySuperAddress"} err := storage.Add(expectedNode, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) } node, err := storage.GetBySession(expectedNode.SessionID) assert.Nil(t, err) + assert.Equal(t, expectedNode.Key, node.Key) assert.Equal(t, expectedNode.Type, node.Type) assert.Equal(t, expectedNode.Address, node.Address) assert.Equal(t, expectedNode.Status, node.Status) @@ -179,15 +181,15 @@ func testStorage_GetCountWithStatus(t *testing.T, p PrepareInterface) { t.Parallel() storage, deferFunc := p.CreateStorage() defer deferFunc() - err := storage.Add(pool.Node{Status: pool.NodeStatusAvailable, Address: "1"}, 0) + err := storage.Add(pool.Node{Status: pool.NodeStatusAvailable, Address: "1", Key: "1"}, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) } - err = storage.Add(pool.Node{Status: pool.NodeStatusAvailable, Address: "2"}, 0) + err = storage.Add(pool.Node{Status: pool.NodeStatusAvailable, Address: "2", Key: "2"}, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) } - err = storage.Add(pool.Node{Status: pool.NodeStatusBusy, Address: "3"}, 0) + err = storage.Add(pool.Node{Status: pool.NodeStatusBusy, Address: "3", Key: "3"}, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) } @@ -202,7 +204,7 @@ func testStorage_Remove(t *testing.T, p PrepareInterface) { t.Parallel() storage, deferFunc := p.CreateStorage() defer deferFunc() - node := pool.Node{Status: pool.NodeStatusAvailable, Address: "1", CapabilitiesList: []capabilities.Capabilities{{"1": "2"}}} + node := pool.Node{Key: "123", Status: pool.NodeStatusAvailable, Address: "1", CapabilitiesList: []capabilities.Capabilities{{"1": "2"}}} err := storage.Add(node, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) @@ 
-218,12 +220,12 @@ func testStorage_ReserveAvailable_Positive(t *testing.T, p PrepareInterface) { t.Parallel() storage, deferFunc := p.CreateStorage() defer deferFunc() - node := pool.Node{Status: pool.NodeStatusAvailable, Address: "1", CapabilitiesList: []capabilities.Capabilities{{"cap1": "val1"}}} + node := pool.Node{Key: "123", Status: pool.NodeStatusAvailable, Address: "1", CapabilitiesList: []capabilities.Capabilities{{"cap1": "val1"}}} err := storage.Add(node, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) } - expectedNode := pool.Node{Status: pool.NodeStatusAvailable, Address: "2", CapabilitiesList: []capabilities.Capabilities{{"cap1": "val1", "cap2": "val2"}}} + expectedNode := pool.Node{Key: "123", Status: pool.NodeStatusAvailable, Address: "2", CapabilitiesList: []capabilities.Capabilities{{"cap1": "val1", "cap2": "val2"}}} err = storage.Add(expectedNode, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) @@ -231,7 +233,7 @@ func testStorage_ReserveAvailable_Positive(t *testing.T, p PrepareInterface) { node, err = storage.ReserveAvailable([]pool.Node{expectedNode}) assert.Nil(t, err) assert.Equal(t, pool.NodeStatusReserved, node.Status, "Node not Reserved") - assert.Equal(t, expectedNode.Address, node.Address, "Reserved unexpected node") + assert.Equal(t, expectedNode.Key, node.Key, "Reserved unexpected node") node, err = storage.GetByAddress(node.Address) if err != nil { t.Fatal("Error get node, " + err.Error()) @@ -244,12 +246,12 @@ func testStorage_ReserveAvailable_Negative(t *testing.T, p PrepareInterface) { t.Parallel() storage, deferFunc := p.CreateStorage() defer deferFunc() - node := pool.Node{Status: pool.NodeStatusBusy, Address: "qqqqqq", CapabilitiesList: []capabilities.Capabilities{{"1": "2"}}} + node := pool.Node{Key: "123", Status: pool.NodeStatusBusy, Address: "qqqqqq", CapabilitiesList: []capabilities.Capabilities{{"1": "2"}}} err := storage.Add(node, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) } - 
node, err = storage.ReserveAvailable([]pool.Node{{Address: "qqqqqq"}}) + node, err = storage.ReserveAvailable([]pool.Node{{Key: "qqqqqq"}}) assert.Error(t, err) } @@ -258,7 +260,7 @@ func testStorage_SetAvailable(t *testing.T, p PrepareInterface) { t.Parallel() storage, deferFunc := p.CreateStorage() defer deferFunc() - node := pool.Node{Status: pool.NodeStatusBusy, Address: "qqqqqq", CapabilitiesList: []capabilities.Capabilities{{"1": "2"}}} + node := pool.Node{Key: "123", Status: pool.NodeStatusBusy, Address: "qqqqqq", CapabilitiesList: []capabilities.Capabilities{{"1": "2"}}} err := storage.Add(node, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) @@ -277,7 +279,7 @@ func testStorage_SetBusy(t *testing.T, p PrepareInterface) { t.Parallel() storage, deferFunc := p.CreateStorage() defer deferFunc() - node := pool.Node{Status: pool.NodeStatusAvailable, Address: "qqqqqq", CapabilitiesList: []capabilities.Capabilities{{"1": "2"}}} + node := pool.Node{Key: "123", Status: pool.NodeStatusAvailable, Address: "qqqqqq", CapabilitiesList: []capabilities.Capabilities{{"1": "2"}}} err := storage.Add(node, 0) if err != nil { t.Fatal("Error add node, " + err.Error()) From 399bdcb01ff02c04090d4f9d84360f05a019df21 Mon Sep 17 00:00:00 2001 From: Mikhail Podtserkovskiy Date: Wed, 18 Apr 2018 12:29:54 +0300 Subject: [PATCH 3/8] QA-7710: app update node address method --- pool/mocks.go | 5 +++++ pool/pool.go | 1 + storage/local/local_test.go | 18 ++++++++++++++++++ storage/local/storage.go | 11 +++++++++++ storage/mysql/storage.go | 21 +++++++++++++++++++++ storage/tests/comon_test.go | 35 +++++++++++++++++++++++++++++++++++ storage/tests/mysql_test.go | 10 ++++++++++ 7 files changed, 101 insertions(+) diff --git a/pool/mocks.go b/pool/mocks.go index 7278fc3..896dd09 100644 --- a/pool/mocks.go +++ b/pool/mocks.go @@ -72,3 +72,8 @@ func (s *StrategyListMock) FixNodeStatus(node Node) error { args := s.Called(node) return args.Error(0) } + +func (s *StorageMock) 
UpdateAddress(node Node, newAddress string) error { + args := s.Called(node, newAddress) + return args.Error(0) +} diff --git a/pool/pool.go b/pool/pool.go index 0357951..6d9c44d 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -25,6 +25,7 @@ type StorageInterface interface { GetByAddress(string) (Node, error) GetAll() ([]Node, error) Remove(Node) error + UpdateAddress(node Node, newAddress string) error } type StrategyInterface interface { diff --git a/storage/local/local_test.go b/storage/local/local_test.go index f0a1152..c8374f6 100644 --- a/storage/local/local_test.go +++ b/storage/local/local_test.go @@ -149,3 +149,21 @@ func TestStorage_Remove_Negative(t *testing.T) { err := s.Remove(node) assert.Error(t, err, storage.ErrNotFound) } + +func TestStorage_UpdateAddress_UpdatesValue(t *testing.T) { + key := "1234567890" + node := pool.Node{Address: "1", Key: key} + s := Storage{db: map[string]*pool.Node{key: &node}} + expectedAddress := "2" + err := s.UpdateAddress(node, expectedAddress) + assert.NoError(t, err) + assert.Equal(t, expectedAddress, s.db[key].Address) +} + +func TestStorage_UpdateAddress_ReturnsErrNotFound(t *testing.T) { + key := "1234567890" + node := pool.Node{Address: "1", Key: key} + s := Storage{db: map[string]*pool.Node{key: &node}} + err := s.UpdateAddress(pool.Node{Key:"12345"}, "1234567890") + assert.Equal(t, storage.ErrNotFound, err) +} \ No newline at end of file diff --git a/storage/local/storage.go b/storage/local/storage.go index b80e4c7..2f99828 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -129,3 +129,14 @@ func (s *Storage) Remove(node pool.Node) error { delete(s.db, node.Key) return nil } + +func (s *Storage) UpdateAddress(node pool.Node, newAddress string) error { + s.mu.Lock() + defer s.mu.Unlock() + storedNode, ok := s.db[node.Key] + if !ok { + return storage.ErrNotFound + } + storedNode.Address = newAddress + return nil +} diff --git a/storage/mysql/storage.go b/storage/mysql/storage.go index 
eb0cab8..3cbc1f2 100644 --- a/storage/mysql/storage.go +++ b/storage/mysql/storage.go @@ -333,6 +333,27 @@ func (s *MysqlStorage) Remove(node pool.Node) error { return nil } +func (s *MysqlStorage) UpdateAddress(node pool.Node, newAddress string) error { + res, err := s.db.Exec( + "UPDATE node SET address = ? WHERE `key` = ?", + newAddress, + node.Key, + ) + if err != nil { + err = errors.New("[MysqlStorage/UpdateAddress] update table `node`, " + err.Error()) + return err + } + rowsAffected, err := res.RowsAffected() + if err != nil { + err = errors.New("[MysqlStorage/UpdateAddress] get affected rows, " + err.Error()) + return err + } + if rowsAffected == 0 { + return storage.ErrNotFound + } + return nil +} + func mapper(model *MysqlNodeModel) *pool.Node { node := pool.NewNode( model.Key, diff --git a/storage/tests/comon_test.go b/storage/tests/comon_test.go index 7af1870..4396298 100644 --- a/storage/tests/comon_test.go +++ b/storage/tests/comon_test.go @@ -1,6 +1,7 @@ package tests import ( + storageLib "github.com/qa-dev/jsonwire-grid/storage" "github.com/qa-dev/jsonwire-grid/pool" "github.com/qa-dev/jsonwire-grid/pool/capabilities" "github.com/stretchr/testify/assert" @@ -294,3 +295,37 @@ func testStorage_SetBusy(t *testing.T, p PrepareInterface) { assert.Equal(t, pool.NodeStatusBusy, node.Status, "Node not Busy") assert.Equal(t, expectedSessionID, node.SessionID, "Not saved sessionID") } + +// testStorage_UpdateAdderss_UpdatesValue успешное обновления адреса ноды +func testStorage_UpdateAdderss_UpdatesValue(t *testing.T, p PrepareInterface) { + t.Parallel() + storage, deferFunc := p.CreateStorage() + defer deferFunc() + node := pool.Node{SessionID: "sess", Key: "123", Status: pool.NodeStatusAvailable, Address: "qqqqqq", CapabilitiesList: []capabilities.Capabilities{{"1": "2"}}} + err := storage.Add(node, 0) + if err != nil { + t.Fatal("Error add node, " + err.Error()) + } + expectedAddress := "newAddressId" + err = storage.UpdateAddress(node, expectedAddress) 
+ assert.Nil(t, err) + node, err = storage.GetBySession("sess") + if err != nil { + t.Fatal("Error add node, " + err.Error()) + } + assert.Equal(t, expectedAddress, node.Address, "Not updated address") +} + +// testStorage_UpdateAdderss_ReturnsErrNotFound попытка обновить несуществующую ноду +func testStorage_UpdateAdderss_ReturnsErrNotFound(t *testing.T, p PrepareInterface) { + t.Parallel() + storage, deferFunc := p.CreateStorage() + defer deferFunc() + node := pool.Node{SessionID: "sess", Key: "123", Status: pool.NodeStatusAvailable, Address: "qqqqqq", CapabilitiesList: []capabilities.Capabilities{{"1": "2"}}} + err := storage.Add(pool.Node{Key:"12345"}, 0) + if err != nil { + t.Fatal("Error add node, " + err.Error()) + } + err = storage.UpdateAddress(node, "trololo") + assert.Equal(t, storageLib.ErrNotFound, err) +} \ No newline at end of file diff --git a/storage/tests/mysql_test.go b/storage/tests/mysql_test.go index 2d969f9..3d322e2 100644 --- a/storage/tests/mysql_test.go +++ b/storage/tests/mysql_test.go @@ -155,3 +155,13 @@ func TestMysqlStorage_SetAvailable(t *testing.T) { func TestMysqlStorage_SetBusy(t *testing.T) { testStorage_SetBusy(t, mv) } + +// TestMysqlStorage_UpdateAdderss_UpdatesValue see testStorage_UpdateAdderss_UpdatesValue +func TestMysqlStorage_UpdateAdderss_UpdatesValue(t *testing.T) { + testStorage_UpdateAdderss_UpdatesValue(t, mv) +} + +// TestMysqlStorage_UpdateAdderss_ReturnsErrNotFound see testStorage_UpdateAdderss_ReturnsErrNotFound +func TestMysqlStorage_UpdateAdderss_ReturnsErrNotFound(t *testing.T) { + testStorage_UpdateAdderss_ReturnsErrNotFound(t, mv) +} From 97d134c3e1079c1850ed8331b98238e8bceb59f7 Mon Sep 17 00:00:00 2001 From: Mikhail Podtserkovskiy Date: Wed, 18 Apr 2018 15:17:01 +0300 Subject: [PATCH 4/8] QA-7710: change service address to pod ip in kubernetes strategy --- pool/strategy/kubernetes/provider.go | 60 +++++++++++++---------- pool/strategy/kubernetes/strategy.go | 12 +++-- 
pool/strategy/kubernetes/strategy_test.go | 31 ++++++++++-- 3 files changed, 69 insertions(+), 34 deletions(-) diff --git a/pool/strategy/kubernetes/provider.go b/pool/strategy/kubernetes/provider.go index d5170df..2f468e1 100644 --- a/pool/strategy/kubernetes/provider.go +++ b/pool/strategy/kubernetes/provider.go @@ -10,10 +10,11 @@ import ( "strconv" "time" "strings" + "fmt" ) type kubernetesProviderInterface interface { - Create(podName string, nodeParams nodeParams) error + Create(podName string, nodeParams nodeParams) (nodeAddress string, err error) // idempotent operation Destroy(podName string) error } @@ -24,7 +25,7 @@ type kubDnsProvider struct { clientFactory jsonwire.ClientFactoryInterface } -func (p *kubDnsProvider) Create(podName string, nodeParams nodeParams) error { +func (p *kubDnsProvider) Create(podName string, nodeParams nodeParams) (nodeAddress string, err error) { pod := &apiV1.Pod{} pod.ObjectMeta.Name = podName pod.ObjectMeta.Labels = map[string]string{"name": podName} @@ -34,37 +35,51 @@ func (p *kubDnsProvider) Create(podName string, nodeParams nodeParams) error { container.Image = nodeParams.Image port, err := strconv.Atoi(nodeParams.Port) if err != nil { - return errors.New("convert to int nodeParams.Port, " + err.Error()) + return "", errors.New("convert to int nodeParams.Port, " + err.Error()) } container.Ports = []apiV1.ContainerPort{{ContainerPort: int32(port)}} pod.Spec.Containers = append(pod.Spec.Containers, container) _, err = p.clientset.CoreV1Client.Pods(p.namespace).Create(pod) if err != nil { - return errors.New("send command pod/create to k8s, " + err.Error()) + return "", errors.New("send command pod/create to k8s, " + err.Error()) } - service := &apiV1.Service{} - service.ObjectMeta.Name = podName - service.Spec.ClusterIP = "None" - service.Spec.Ports = []apiV1.ServicePort{{Port: int32(port)}} - service.Spec.Selector = map[string]string{"name": podName} - _, err = 
p.clientset.CoreV1Client.Services(p.namespace).Create(service) - if err != nil { - return errors.New("send command service/create to k8s, " + err.Error()) + stopWaitIP := time.After(40 * time.Second) + log.Debugf("start waiting pod ip") + var createdPodIP string +LoopWaitIP: + for { + select { + case <-stopWaitIP: + return "", fmt.Errorf("wait podIP stopped by timeout, %v", podName) + default: + time.Sleep(time.Second) + createdPod, err := p.clientset.CoreV1Client.Pods(p.namespace).Get(podName) + if err != nil { + log.Debugf("fail get created pod, %v, %v",podName, err) + continue + } + if createdPod.Status.PodIP == "" { + log.Debugf("empty pod ip, %v", podName) + continue + } + createdPodIP = createdPod.Status.PodIP + break LoopWaitIP + } } // todo: пока так ожидаем поднятие ноды, так как не понятно что конкретно означают статусы возвращаемые через апи - client := p.clientFactory.Create(net.JoinHostPort(podName, nodeParams.Port)) + nodeAddress = net.JoinHostPort(createdPodIP, nodeParams.Port) + client := p.clientFactory.Create(nodeAddress) stop := time.After(40 * time.Second) - log.Debugln("start waiting") -Loop: + log.Debugln("start waiting selenium") +LoopWaitSelenium: for { select { case <-stop: - return errors.New("wait stopped by timeout") + return "", fmt.Errorf("wait selenium stopped by timeout, %v", podName) default: time.Sleep(time.Second) - log.Debugln("start request") message, err := client.Health() if err != nil { log.Debugf("fail request, %v", err) @@ -72,12 +87,12 @@ Loop: } log.Debugf("done request, status: %v", message.Status) if message.Status == 0 { - break Loop + break LoopWaitSelenium } } } - return nil + return nodeAddress, nil } //Destroy - destroy all pod data (idempotent operation) @@ -90,12 +105,5 @@ func (p *kubDnsProvider) Destroy(podName string) error { err = errors.New("send command pod/delete to k8s, " + err.Error()) return err } - err = p.clientset.CoreV1Client.Services(p.namespace).Delete(podName, &apiV1.DeleteOptions{}) - switch { - 
case err != nil && strings.Contains(err.Error(), "not found"): - // service already deleted - case err != nil: - return errors.New("send command service/delete to k8s, " + err.Error()) - } return nil } diff --git a/pool/strategy/kubernetes/strategy.go b/pool/strategy/kubernetes/strategy.go index ab28428..cd1cd4b 100644 --- a/pool/strategy/kubernetes/strategy.go +++ b/pool/strategy/kubernetes/strategy.go @@ -6,7 +6,6 @@ import ( "github.com/qa-dev/jsonwire-grid/pool/capabilities" "github.com/qa-dev/jsonwire-grid/pool/strategy" "github.com/satori/go.uuid" - "net" "time" "fmt" ) @@ -25,13 +24,12 @@ func (s *Strategy) Reserve(desiredCaps capabilities.Capabilities) (pool.Node, er } podName := "wd-node-" + uuid.NewV4().String() ts := time.Now().Unix() - address := net.JoinHostPort(podName, nodeConfig.Params.Port) - node := pool.NewNode(podName, pool.NodeTypeKubernetes, address, pool.NodeStatusReserved, "", ts, ts, []capabilities.Capabilities{}) + node := pool.NewNode(podName, pool.NodeTypeKubernetes, "temp-value-replace-me", pool.NodeStatusReserved, "", ts, ts, []capabilities.Capabilities{}) err := s.storage.Add(*node, s.config.Limit) if err != nil { return pool.Node{}, errors.New("add node to storage, " + err.Error()) } - err = s.provider.Create(podName, nodeConfig.Params) + nodeAddress, err := s.provider.Create(podName, nodeConfig.Params) if err != nil { go func(podName string) { time.Sleep(time.Minute * 2) @@ -39,6 +37,12 @@ func (s *Strategy) Reserve(desiredCaps capabilities.Capabilities) (pool.Node, er }(podName) return pool.Node{}, errors.New("create node by provider, " + err.Error()) } + + err = s.storage.UpdateAddress(*node, nodeAddress) + if err != nil { + return pool.Node{}, errors.New("update node address in storage, " + err.Error()) + } + node.Address = nodeAddress return *node, nil } diff --git a/pool/strategy/kubernetes/strategy_test.go b/pool/strategy/kubernetes/strategy_test.go index 2ee2877..9e3527a 100644 --- a/pool/strategy/kubernetes/strategy_test.go 
+++ b/pool/strategy/kubernetes/strategy_test.go @@ -14,9 +14,9 @@ type providerMock struct { mock.Mock } -func (p *providerMock) Create(podName string, nodeParams nodeParams) error { +func (p *providerMock) Create(podName string, nodeParams nodeParams) (nodeAddress string, err error) { args := p.Called(podName, nodeParams) - return args.Error(0) + return args.String(0), args.Error(1) } func (p *providerMock) Destroy(podName string) error { @@ -32,14 +32,17 @@ func TestStrategy_Reserve_Positive(t *testing.T) { } sm := new(pool.StorageMock) sm.On("Add", mock.AnythingOfType("pool.Node"), mock.AnythingOfType("int")).Return(nil) + sm.On("UpdateAddress", mock.AnythingOfType("pool.Node"), mock.AnythingOfType("string")).Return(nil) cm := new(capabilities.ComparatorMock) cm.On("Compare", mock.AnythingOfType("capabilities.Capabilities"), mock.AnythingOfType("capabilities.Capabilities")).Return(true) pm := new(providerMock) - pm.On("Create", mock.AnythingOfType("string"), mock.AnythingOfType("nodeParams")).Return(nil) + expectedAddress := "addr" + pm.On("Create", mock.AnythingOfType("string"), mock.AnythingOfType("nodeParams")).Return(expectedAddress, nil) str := Strategy{storage: sm, provider: pm, config: strategyConfig, capsComparator: cm} node, err := str.Reserve(capabilities.Capabilities{}) assert.Nil(t, err) assert.NotNil(t, node) + assert.Equal(t, expectedAddress, node.Address) } func TestStrategy_Reserve_Negative_NotMatchCapabilities(t *testing.T) { @@ -61,7 +64,27 @@ func TestStrategy_Reserve_Negative_ReserveAvailable(t *testing.T) { cm.On("Compare", mock.AnythingOfType("capabilities.Capabilities"), mock.AnythingOfType("capabilities.Capabilities")).Return(true) pm := new(providerMock) eError := errors.New("Error") - pm.On("Create", mock.AnythingOfType("string"), mock.AnythingOfType("nodeParams")).Return(eError) + pm.On("Create", mock.AnythingOfType("string"), mock.AnythingOfType("nodeParams")).Return("", eError) + pm.On("Destroy", 
mock.AnythingOfType("string")).Return(nil) + str := Strategy{storage: sm, provider: pm, config: strategyConfig, capsComparator: cm} + _, err := str.Reserve(capabilities.Capabilities{}) + assert.NotNil(t, err) +} + +func TestStrategy_Reserve_Negative_UpdateAddress(t *testing.T) { + nodeCfg := nodeConfig{} + nodeCfg.CapabilitiesList = []map[string]interface{}{{"cap1": "cal1"}} + strategyConfig := strategyConfig{ + NodeList: []nodeConfig{nodeCfg}, + } + sm := new(pool.StorageMock) + sm.On("Add", mock.AnythingOfType("pool.Node"), mock.AnythingOfType("int")).Return(nil) + sm.On("UpdateAddress", mock.AnythingOfType("pool.Node"), mock.AnythingOfType("string")).Return(errors.New("muhaha-error")) + cm := new(capabilities.ComparatorMock) + cm.On("Compare", mock.AnythingOfType("capabilities.Capabilities"), mock.AnythingOfType("capabilities.Capabilities")).Return(true) + pm := new(providerMock) + eError := errors.New("Error") + pm.On("Create", mock.AnythingOfType("string"), mock.AnythingOfType("nodeParams")).Return("", eError) pm.On("Destroy", mock.AnythingOfType("string")).Return(nil) str := Strategy{storage: sm, provider: pm, config: strategyConfig, capsComparator: cm} _, err := str.Reserve(capabilities.Capabilities{}) From 08f3f09f5705032584187b80da87ea9f37de7698 Mon Sep 17 00:00:00 2001 From: Mikhail Podtserkovskiy Date: Wed, 18 Apr 2018 17:33:09 +0300 Subject: [PATCH 5/8] QA-7710: move hardcode to config --- pool/strategy/kubernetes/config.go | 24 ++++++++++++++++++++++++ pool/strategy/kubernetes/factory.go | 6 +++++- pool/strategy/kubernetes/provider.go | 6 +++--- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/pool/strategy/kubernetes/config.go b/pool/strategy/kubernetes/config.go index 138422a..c49b948 100644 --- a/pool/strategy/kubernetes/config.go +++ b/pool/strategy/kubernetes/config.go @@ -4,9 +4,33 @@ import ( "encoding/json" "errors" "github.com/qa-dev/jsonwire-grid/config" + "time" + "fmt" ) type strategyParams struct { + Namespace string + 
PodCreationTimeout time.Duration +} + +func (sp *strategyParams) UnmarshalJSON(b []byte) error { + tempStruct := struct{ + Namespace string `json:"namespace"` + PodCreationTimeout string `json:"pod_creation_timeout"` + } { + "default", + "1m", + } + if err := json.Unmarshal(b, &tempStruct); err != nil { + return err + } + podCreationTimeout, err := time.ParseDuration(tempStruct.PodCreationTimeout) + if err != nil { + return fmt.Errorf("invalid value strategy.pod_creation_timeout in config, given: %v", tempStruct.PodCreationTimeout) + } + sp.Namespace = tempStruct.Namespace + sp.PodCreationTimeout = podCreationTimeout + return nil } type strategyConfig struct { diff --git a/pool/strategy/kubernetes/factory.go b/pool/strategy/kubernetes/factory.go index 19d728a..90b8146 100644 --- a/pool/strategy/kubernetes/factory.go +++ b/pool/strategy/kubernetes/factory.go @@ -8,6 +8,7 @@ import ( "github.com/qa-dev/jsonwire-grid/pool/capabilities" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + log "github.com/Sirupsen/logrus" ) type StrategyFactory struct { @@ -31,6 +32,8 @@ func (f *StrategyFactory) Create( } } + log.Debugf("strategy kubernetes config, %+v", strategyConfig) + //todo: выпилить этот говноклиент, когда будет работать нормальный kubConfig, err := rest.InClusterConfig() if err != nil { @@ -44,7 +47,8 @@ func (f *StrategyFactory) Create( provider := &kubDnsProvider{ clientset: clientset, - namespace: "default", //todo: брать из конфига !!! 
+ namespace: strategyConfig.Params.Namespace, + podCreationTimeout: strategyConfig.Params.PodCreationTimeout, clientFactory: clientFactory, } diff --git a/pool/strategy/kubernetes/provider.go b/pool/strategy/kubernetes/provider.go index 2f468e1..f79f233 100644 --- a/pool/strategy/kubernetes/provider.go +++ b/pool/strategy/kubernetes/provider.go @@ -22,6 +22,7 @@ type kubernetesProviderInterface interface { type kubDnsProvider struct { clientset *kubernetes.Clientset namespace string + podCreationTimeout time.Duration clientFactory jsonwire.ClientFactoryInterface } @@ -44,13 +45,13 @@ func (p *kubDnsProvider) Create(podName string, nodeParams nodeParams) (nodeAddr return "", errors.New("send command pod/create to k8s, " + err.Error()) } - stopWaitIP := time.After(40 * time.Second) + stop := time.After(p.podCreationTimeout) log.Debugf("start waiting pod ip") var createdPodIP string LoopWaitIP: for { select { - case <-stopWaitIP: + case <-stop: return "", fmt.Errorf("wait podIP stopped by timeout, %v", podName) default: time.Sleep(time.Second) @@ -71,7 +72,6 @@ LoopWaitIP: // todo: пока так ожидаем поднятие ноды, так как не понятно что конкретно означают статусы возвращаемые через апи nodeAddress = net.JoinHostPort(createdPodIP, nodeParams.Port) client := p.clientFactory.Create(nodeAddress) - stop := time.After(40 * time.Second) log.Debugln("start waiting selenium") LoopWaitSelenium: for { From 82512d1f52daf962e84d3a596330d6b71a1502aa Mon Sep 17 00:00:00 2001 From: Mikhail Podtserkovskiy Date: Wed, 18 Apr 2018 17:50:48 +0300 Subject: [PATCH 6/8] QA-7710: update readme --- README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index b75f927..ee61bbc 100644 --- a/README.md +++ b/README.md @@ -112,8 +112,9 @@ Configurations are stored in json files. Example: | node_list | - | Omit this property. | ##### `kubernetes` - on-demand nodes in kubernetes cluster. 
-| Strategy option | Possible values | Description | -|-------------------------- | --------------- | --------------------------- | -| params | - | Omit this property. | -| node_list.[].params.image | string | Docker image with selenium. | -| node_list.[].params.port | string | Port of selenium. | +| Strategy option | Possible values | Description | +|-------------------------- | ---------------------- | ------------------------------------- | +| params.namespace | string | Namespace in k8s for on-demand nodes. | +| params.pod_creation_timeout | string as `12m`, `60s` | Max waiting time for creating a pod. | +| node_list.[].params.image | string | Docker image with selenium. | +| node_list.[].params.port | string | Port of selenium. | From d48004e2f47538f612fefec4145ede4906a5ce70 Mon Sep 17 00:00:00 2001 From: Mikhail Podtserkovskiy Date: Thu, 19 Apr 2018 16:16:59 +0300 Subject: [PATCH 7/8] QA-7710: add comments --- pool/node.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pool/node.go b/pool/node.go index e45ef32..423225d 100644 --- a/pool/node.go +++ b/pool/node.go @@ -18,6 +18,10 @@ const ( ) type Node struct { + // A unique key, by which we understand how to find this object in the outer world + for not adding the second time the same thing. 
+ // The value may depend on the strategy: + // - for persistent nodes: ip:port + // - for temporary (on-demand) nodes: pod name Key string Type NodeType Address string From 2a3a5e7bff1b029a19f3ca4f5992230fb0f991ef Mon Sep 17 00:00:00 2001 From: Mikhail Podtserkovskiy Date: Fri, 20 Apr 2018 10:52:51 +0300 Subject: [PATCH 8/8] QA-7710: fix --- pool/strategy/kubernetes/provider.go | 8 ++------ pool/strategy/kubernetes/strategy.go | 4 +++- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pool/strategy/kubernetes/provider.go b/pool/strategy/kubernetes/provider.go index f79f233..4bc430b 100644 --- a/pool/strategy/kubernetes/provider.go +++ b/pool/strategy/kubernetes/provider.go @@ -98,12 +98,8 @@ LoopWaitSelenium: //Destroy - destroy all pod data (idempotent operation) func (p *kubDnsProvider) Destroy(podName string) error { err := p.clientset.CoreV1Client.Pods(p.namespace).Delete(podName, &apiV1.DeleteOptions{}) - switch { - case err != nil && strings.Contains(err.Error(), "not found"): - // pod already deleted - case err != nil: - err = errors.New("send command pod/delete to k8s, " + err.Error()) - return err + if err != nil && !strings.Contains(err.Error(), "not found") { + return errors.New("send command pod/delete to k8s, " + err.Error()) } return nil } diff --git a/pool/strategy/kubernetes/strategy.go b/pool/strategy/kubernetes/strategy.go index cd1cd4b..1c25615 100644 --- a/pool/strategy/kubernetes/strategy.go +++ b/pool/strategy/kubernetes/strategy.go @@ -17,6 +17,8 @@ type Strategy struct { capsComparator capabilities.ComparatorInterface } +const cleanupFailedPodsTimeout = time.Minute * 2 + func (s *Strategy) Reserve(desiredCaps capabilities.Capabilities) (pool.Node, error) { nodeConfig := s.findApplicableConfig(s.config.NodeList, desiredCaps) if nodeConfig == nil { @@ -32,7 +34,7 @@ func (s *Strategy) Reserve(desiredCaps capabilities.Capabilities) (pool.Node, er nodeAddress, err := s.provider.Create(podName, nodeConfig.Params) if err != nil { go func(podName string) { 
- time.Sleep(time.Minute * 2) + time.Sleep(cleanupFailedPodsTimeout) _ = s.provider.Destroy(podName) // на случай если что то криво создалось }(podName) return pool.Node{}, errors.New("create node by provider, " + err.Error())