Skip to content

Qa 7710 #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ Configurations are stored in json files. Example:
| node_list | - | Omit this property. |

##### `kubernetes` - on-demand nodes in kubernetes cluster.
| Strategy option | Possible values | Description |
|-------------------------- | --------------- | --------------------------- |
| params | - | Omit this property. |
| node_list.[].params.image | string | Docker image with selenium. |
| node_list.[].params.port | string | Port of selenium. |
| Strategy option | Possible values | Description |
|-------------------------- | ---------------------- | ------------------------------------- |
| params.namespace | string | Namespace in k8s for on-demand nodes. |
| params.pod_creation_timeout | string as `12m`, `60s` | Max waiting time for creating a pod. |
| node_list.[].params.image | string | Docker image with selenium. |
| node_list.[].params.port | string | Port of selenium. |
4 changes: 3 additions & 1 deletion handlers/registerNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ func (h *RegisterNode) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
for i, value := range capabilitiesList {
poolCapabilitiesList[i] = capabilities.Capabilities(value)
}
hostPort := register.Configuration.Host + ":" + strconv.Itoa(register.Configuration.Port)
err = h.Pool.Add(
hostPort,
pool.NodeTypePersistent,
register.Configuration.Host+":"+strconv.Itoa(register.Configuration.Port),
hostPort,
poolCapabilitiesList,
)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pool/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,8 @@ func (s *StrategyListMock) FixNodeStatus(node Node) error {
args := s.Called(node)
return args.Error(0)
}

// UpdateAddress is a testify mock method: it records the invocation with its
// arguments and returns whatever error was configured via s.On("UpdateAddress", ...).
func (s *StorageMock) UpdateAddress(node Node, newAddress string) error {
	return s.Called(node, newAddress).Error(0)
}
9 changes: 8 additions & 1 deletion pool/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ const (
)

type Node struct {
// A unique key, by which we understand how to find this object in the outer world + for not adding the second time the same thing.
// The value may depend on the strategy:
// - for constant nodes ip: port
// - for temporary pod.name
Key string
Type NodeType
Address string
Status NodeStatus
Expand All @@ -28,10 +33,11 @@ type Node struct {
}

func (n *Node) String() string {
return "Node [" + n.Address + "]"
return "Node [" + n.Key + "]"
}

func NewNode(
key string,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А за что отвечает ключ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

добавил комент в код
уникальный ключ, по которому мы понимаем как нам найти этот объект во внешнем мире + за то чтобы не добавить второй раз одно и то же
значение может зависеть от стратегии
для постоянных нод ip:port
для временных pod.name

t NodeType,
address string,
status NodeStatus,
Expand All @@ -41,6 +47,7 @@ func NewNode(
capabilitiesList []capabilities.Capabilities,
) *Node {
return &Node{
key,
t,
address,
status,
Expand Down
9 changes: 5 additions & 4 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type StorageInterface interface {
GetByAddress(string) (Node, error)
GetAll() ([]Node, error)
Remove(Node) error
UpdateAddress(node Node, newAddress string) error
}

type StrategyInterface interface {
Expand Down Expand Up @@ -68,12 +69,12 @@ func (p *Pool) ReserveAvailableNode(caps capabilities.Capabilities) (*Node, erro
return &node, err
}

func (p *Pool) Add(t NodeType, address string, capabilitiesList []capabilities.Capabilities) error {
func (p *Pool) Add(key string, t NodeType, address string, capabilitiesList []capabilities.Capabilities) error {
if len(capabilitiesList) == 0 {
return errors.New("[Pool/Add] Capabilities must contains more one element")
}
ts := time.Now().Unix()
return p.storage.Add(*NewNode(t, address, NodeStatusAvailable, "", ts, ts, capabilitiesList), 0)
return p.storage.Add(*NewNode(key, t, address, NodeStatusAvailable, "", ts, ts, capabilitiesList), 0)
}

func (p *Pool) RegisterSession(node *Node, sessionID string) error {
Expand Down Expand Up @@ -148,7 +149,7 @@ func (p *Pool) FixNodeStatuses() {
continue
}
if isFixed {
log.Infof("Node [%s] status fixed", node.Address)
log.Infof("Node [%s] status fixed", node.Key)
}
}
}
Expand All @@ -169,7 +170,7 @@ func (p *Pool) fixNodeStatus(node *Node) (bool, error) {
}
err := p.strategyList.FixNodeStatus(*node)
if err != nil {
return false, fmt.Errorf("Can't fix node [%s] status, %s", node.Address, err.Error())
return false, fmt.Errorf("Can't fix node [%s] status, %s", node.Key, err.Error())
}
return true, nil
}
18 changes: 9 additions & 9 deletions pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestPool_Add_Positive(t *testing.T) {
p := NewPool(s, new(StrategyListMock))
eAddress := "127.0.0.1"
eNodeType := NodeTypePersistent
err := p.Add(eNodeType, eAddress, []capabilities.Capabilities{{"browserName": "ololo"}})
err := p.Add("1234", eNodeType, eAddress, []capabilities.Capabilities{{"browserName": "ololo"}})
a.Nil(err)
}

Expand All @@ -59,7 +59,7 @@ func TestPool_Add_Negative(t *testing.T) {
p := NewPool(s, new(StrategyListMock))
eAddress := "127.0.0.1"
eNodeType := NodeTypePersistent
err := p.Add(eNodeType, eAddress, []capabilities.Capabilities{})
err := p.Add("1234", eNodeType, eAddress, []capabilities.Capabilities{})
a.Error(err)
}

Expand Down Expand Up @@ -225,7 +225,7 @@ func TestPool_fixNodeStatus_Positive_BusyExpired(t *testing.T) {
slm := new(StrategyListMock)
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil)
p := NewPool(new(StorageMock), slm)
node := NewNode(NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{})
node := NewNode("123", NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{})
isFixed, err := p.fixNodeStatus(node)
a.True(isFixed)
a.Nil(err)
Expand All @@ -236,7 +236,7 @@ func TestPool_fixNodeStatus_Positive_ReservedExpired(t *testing.T) {
slm := new(StrategyListMock)
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil)
p := NewPool(new(StorageMock), slm)
node := NewNode(NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{})
node := NewNode("123", NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{})
isFixed, err := p.fixNodeStatus(node)
a.True(isFixed)
a.Nil(err)
Expand All @@ -247,7 +247,7 @@ func TestPool_fixNodeStatus_Positive_BusyNotNotExpired(t *testing.T) {
slm := new(StrategyListMock)
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil)
p := NewPool(new(StorageMock), slm)
node := NewNode(NodeTypePersistent, "", NodeStatusBusy, "", time.Now().Unix(), 0, []capabilities.Capabilities{})
node := NewNode("123", NodeTypePersistent, "", NodeStatusBusy, "", time.Now().Unix(), 0, []capabilities.Capabilities{})
isFixed, err := p.fixNodeStatus(node)
a.False(isFixed)
a.Nil(err)
Expand All @@ -258,7 +258,7 @@ func TestPool_fixNodeStatus_Positive_ReservedNotNotExpired(t *testing.T) {
slm := new(StrategyListMock)
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil)
p := NewPool(new(StorageMock), slm)
node := NewNode(NodeTypePersistent, "", NodeStatusReserved, "", time.Now().Unix(), 0, []capabilities.Capabilities{})
node := NewNode("123", NodeTypePersistent, "", NodeStatusReserved, "", time.Now().Unix(), 0, []capabilities.Capabilities{})
isFixed, err := p.fixNodeStatus(node)
a.False(isFixed)
a.Nil(err)
Expand All @@ -269,7 +269,7 @@ func TestPool_fixNodeStatus_Positive_AvailableExpired(t *testing.T) {
slm := new(StrategyListMock)
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil)
p := NewPool(new(StorageMock), slm)
node := NewNode(NodeTypePersistent, "", NodeStatusAvailable, "", 0, 0, []capabilities.Capabilities{})
node := NewNode("123", NodeTypePersistent, "", NodeStatusAvailable, "", 0, 0, []capabilities.Capabilities{})
isFixed, err := p.fixNodeStatus(node)
a.False(isFixed)
a.Nil(err)
Expand All @@ -281,7 +281,7 @@ func TestPool_fixNodeStatus_NegativeBusy(t *testing.T) {
slm := new(StrategyListMock)
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(eError)
p := NewPool(new(StorageMock), slm)
node := NewNode(NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{})
node := NewNode("123", NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{})
isFixed, err := p.fixNodeStatus(node)
a.False(isFixed)
a.Error(err)
Expand All @@ -293,7 +293,7 @@ func TestPool_fixNodeStatus_NegativeReserved(t *testing.T) {
slm := new(StrategyListMock)
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(eError)
p := NewPool(new(StorageMock), slm)
node := NewNode(NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{})
node := NewNode("123", NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{})
isFixed, err := p.fixNodeStatus(node)
a.False(isFixed)
a.Error(err)
Expand Down
24 changes: 24 additions & 0 deletions pool/strategy/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,33 @@ import (
"encoding/json"
"errors"
"github.com/qa-dev/jsonwire-grid/config"
"time"
"fmt"
)

type strategyParams struct {
Namespace string
PodCreationTimeout time.Duration
}

func (sp *strategyParams) UnmarshalJSON(b []byte) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Выглядит так, что тебе нужно добавить валидацию конфигов и тип type CfgDuration string с методом getStdlibDuration() time.Duration

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

нужно кардинально все связанное с конфигами переделать, пока предлагаю оставить так как временное решение и вернуться к этому позже

tempStruct := struct{
Namespace string `json:"namespace"`
PodCreationTimeout string `json:"pod_creation_timeout"`
} {
"default",
"1m",
}
if err := json.Unmarshal(b, &tempStruct); err != nil {
return err
}
podCreationTimeout, err := time.ParseDuration(tempStruct.PodCreationTimeout)
if err != nil {
return fmt.Errorf("invalid value strategy.pod_creation_timeout in config, given: %v", tempStruct.PodCreationTimeout)
}
sp.Namespace = tempStruct.Namespace
sp.PodCreationTimeout = podCreationTimeout
return nil
}

type strategyConfig struct {
Expand Down
8 changes: 6 additions & 2 deletions pool/strategy/kubernetes/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/qa-dev/jsonwire-grid/pool/capabilities"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
log "github.com/Sirupsen/logrus"
)

type StrategyFactory struct {
Expand All @@ -31,6 +32,8 @@ func (f *StrategyFactory) Create(
}
}

log.Debugf("strategy kubernetes config, %+v", strategyConfig)

//todo: выпилить этот говноклиент, когда будет работать нормальный
kubConfig, err := rest.InClusterConfig()
if err != nil {
Expand All @@ -42,9 +45,10 @@ func (f *StrategyFactory) Create(
return nil, errors.New("create k8s clientset, " + err.Error())
}

provider := &kubernetesProvider{
provider := &kubDnsProvider{
clientset: clientset,
namespace: "default", //todo: брать из конфига !!!
namespace: strategyConfig.Params.Namespace,
podCreationTimeout: strategyConfig.Params.PodCreationTimeout,
clientFactory: clientFactory,
}

Expand Down
72 changes: 42 additions & 30 deletions pool/strategy/kubernetes/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@ import (
"net"
"strconv"
"time"
"strings"
"fmt"
)

type kubernetesProviderInterface interface {
Create(podName string, nodeParams nodeParams) error
Create(podName string, nodeParams nodeParams) (nodeAddress string, err error)
// idempotent operation
Destroy(podName string) error
}

type kubernetesProvider struct {
type kubDnsProvider struct {
clientset *kubernetes.Clientset
namespace string
podCreationTimeout time.Duration
clientFactory jsonwire.ClientFactoryInterface
}

func (p *kubernetesProvider) Create(podName string, nodeParams nodeParams) error {
func (p *kubDnsProvider) Create(podName string, nodeParams nodeParams) (nodeAddress string, err error) {
pod := &apiV1.Pod{}
pod.ObjectMeta.Name = podName
pod.ObjectMeta.Labels = map[string]string{"name": podName}
Expand All @@ -32,62 +36,70 @@ func (p *kubernetesProvider) Create(podName string, nodeParams nodeParams) error
container.Image = nodeParams.Image
port, err := strconv.Atoi(nodeParams.Port)
if err != nil {
return errors.New("convert to int nodeParams.Port, " + err.Error())
return "", errors.New("convert to int nodeParams.Port, " + err.Error())
}
container.Ports = []apiV1.ContainerPort{{ContainerPort: int32(port)}}
pod.Spec.Containers = append(pod.Spec.Containers, container)
_, err = p.clientset.CoreV1Client.Pods(p.namespace).Create(pod)
if err != nil {
return errors.New("send command pod/create to k8s, " + err.Error())
return "", errors.New("send command pod/create to k8s, " + err.Error())
}

service := &apiV1.Service{}
service.ObjectMeta.Name = podName
service.Spec.ClusterIP = "None"
service.Spec.Ports = []apiV1.ServicePort{{Port: int32(port)}}
service.Spec.Selector = map[string]string{"name": podName}
_, err = p.clientset.CoreV1Client.Services(p.namespace).Create(service)
if err != nil {
return errors.New("send command service/create to k8s, " + err.Error())
stop := time.After(p.podCreationTimeout)
log.Debugf("start waiting pod ip")
var createdPodIP string
LoopWaitIP:
for {
select {
case <-stop:
return "", fmt.Errorf("wait podIP stopped by timeout, %v", podName)
default:
time.Sleep(time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А что будет если человек по не знанию поставит p.podCreationTimeout = 500 * time.MIliseconds )))) наверно надо тоже в конфиг засунуть

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

в конфиг засовывать не стоит, так как это его лишь усложнит его
сейчас пользователь просто повисит секунду + время на запрос в куб вместо 1/2 секунды, ничего страшного не пройзойдет
если мы хотим управлять частотой опроса можно делать это так req_f = podCreationTimeout / 3 если podCreationTimeout < 3 sec
но в этом тоже не много смысла, ибо время создания селениума в кубе всегда больше 1 секунды и если человек поставит маленький таймаут то он никогда не дождется результата

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Нет, он просто все время с таймаутом будет валиться, валидируй тогда значение на входе

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

возникает вопрос какой таймаут считать минимальным и нужно ли это?
до первого issue будем верить в осознанность пользователей

createdPod, err := p.clientset.CoreV1Client.Pods(p.namespace).Get(podName)
if err != nil {
log.Debugf("fail get created pod, %v, %v",podName, err)
continue
}
if createdPod.Status.PodIP == "" {
log.Debugf("empty pod ip, %v", podName)
continue
}
createdPodIP = createdPod.Status.PodIP
break LoopWaitIP
}
}

// todo: пока так ожидаем поднятие ноды, так как не понятно что конкретно означают статусы возвращаемые через апи
client := p.clientFactory.Create(net.JoinHostPort(podName, nodeParams.Port))
stop := time.After(40 * time.Second)
log.Debugln("start waiting")
Loop:
nodeAddress = net.JoinHostPort(createdPodIP, nodeParams.Port)
client := p.clientFactory.Create(nodeAddress)
log.Debugln("start waiting selenium")
LoopWaitSelenium:
for {
select {
case <-stop:
return errors.New("wait stopped by timeout")
return "", fmt.Errorf("wait selenium stopped by timeout, %v", podName)
default:
time.Sleep(time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Выше описал.

log.Debugln("start request")
message, err := client.Health()
if err != nil {
log.Debugf("fail request, %v", err)
continue
}
log.Debugf("done request, status: %v", message.Status)
if message.Status == 0 {
break Loop
break LoopWaitSelenium
}
}
}

return nil
return nodeAddress, nil
}

func (p *kubernetesProvider) Destroy(podName string) error {
//Destroy - destroy all pod data (idempotent operation)
func (p *kubDnsProvider) Destroy(podName string) error {
err := p.clientset.CoreV1Client.Pods(p.namespace).Delete(podName, &apiV1.DeleteOptions{})
if err != nil {
err = errors.New("send command pod/delete to k8s, " + err.Error())
return err
}
err = p.clientset.CoreV1Client.Services(p.namespace).Delete(podName, &apiV1.DeleteOptions{})
if err != nil {
err = errors.New("send command service/delete to k8s, " + err.Error())
return err
if err != nil && !strings.Contains(err.Error(), "not found") {
return errors.New("send command pod/delete to k8s, " + err.Error())
}
return nil
}
Loading