-
Notifications
You must be signed in to change notification settings - Fork 787
/
kube_test_helpers.go
131 lines (115 loc) · 3.62 KB
/
kube_test_helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package testhelpers
import (
	"bytes"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"sort"
	"strings"
	"time"

	"github.com/jenkins-x/jx/v2/pkg/cmd/clients"
	"github.com/jenkins-x/jx/v2/pkg/log"
	"github.com/pkg/errors"
	core_v1 "k8s.io/api/core/v1"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/portforward"
	"k8s.io/client-go/transport/spdy"
)
// GetFreePort returns a TCP port number on 127.0.0.1 that the kernel reported as free.
//
// The port is discovered by binding a listener to port 0 and reading back the port the
// kernel assigned. The listener is closed again before returning, so the port is only
// known to have been free at the time of the call.
func GetFreePort() (int, error) {
	loopback, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}

	listener, err := net.ListenTCP("tcp", loopback)
	if err != nil {
		return 0, err
	}

	port := listener.Addr().(*net.TCPAddr).Port
	// The listener only existed to learn the port number; a failure to close it is not
	// actionable for the caller and is deliberately ignored.
	_ = listener.Close()
	return port, nil
}
// WaitForPod waits up to timeout for the given Pod to leave the 'Pending' phase and
// succeeds only if it ends up in the 'Running' phase.
//
// A watch is opened on the pods of namespace that match labels, starting at
// pod.ResourceVersion. Every pod event delivered by that watch updates the tracked status,
// so labels should select only the pod of interest. The timeout is a single deadline for
// the whole wait; it is not restarted when an event arrives.
//
// An error is returned when the watch cannot be created, when the watch delivers an object
// that is not a Pod (for example an error status), or when the last observed phase is not
// 'Running' once the wait ends (timeout, closed watch, or terminal phase).
func WaitForPod(pod *core_v1.Pod, namespace string, labels map[string]string, timeout time.Duration, kubeClient kubernetes.Interface) error {
	watcher, err := kubeClient.CoreV1().Pods(namespace).Watch(meta_v1.ListOptions{
		Watch:           true,
		ResourceVersion: pod.ResourceVersion,
		LabelSelector:   LabelSelector(labels),
	})
	if err != nil {
		return errors.Wrapf(err, "unable to create watch for pod '%s'", pod.Name)
	}
	defer watcher.Stop()

	// One timer for the whole wait: creating a fresh time.After per loop iteration would
	// restart the timeout on every received event.
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	status := pod.Status

waiting:
	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				// The watch was closed; fall through to the final phase check.
				break waiting
			}
			observed, isPod := event.Object.(*core_v1.Pod)
			if !isPod {
				// Error events carry a non-Pod object; report it instead of panicking.
				return errors.Errorf("unexpected object of type %T received while watching pod '%s'", event.Object, pod.Name)
			}
			log.Logger().Debugf("Pod status: %v", observed.Status.Phase)
			status = observed.Status
			if status.Phase != core_v1.PodPending {
				break waiting
			}
		case <-deadline.C:
			log.Logger().Debugf("timeout to wait for pod active")
			break waiting
		}
	}

	if status.Phase != core_v1.PodRunning {
		return errors.Errorf("Pod '%s' should be running", pod.Name)
	}
	return nil
}
// PortForward forwards the container port of the specified pod in the given namespace to
// the specified local forwarding port.
//
// The Kubernetes connection settings are taken from factory.CreateKubeConfig. Forwarding
// runs in background goroutines; the function returns as soon as the forwarder has been
// created and does not wait for the tunnel to become ready. Output and errors produced by
// the forwarder are written to the log.
//
// The returned channel is the stop channel handed to the client-go port forwarder; close
// it to stop port forwarding. An error is returned when the kube config, the SPDY round
// tripper or the forwarder cannot be created.
func PortForward(namespace string, podName string, containerPort string, forwardPort string, factory clients.Factory) (chan struct{}, error) {
	config, err := factory.CreateKubeConfig()
	if err != nil {
		return nil, err
	}

	roundTripper, upgrader, err := spdy.RoundTripperFor(config)
	if err != nil {
		return nil, err
	}

	// config.Host is normally a URL such as "https://host:port". Strip only the scheme
	// prefix: strings.TrimLeft would treat "https:/" as a character set and also eat
	// leading host characters (e.g. "test.example.com" -> "est.example.com").
	host := config.Host
	if i := strings.Index(host, "://"); i >= 0 {
		host = host[i+len("://"):]
	}
	serverURL := url.URL{
		Scheme: "https",
		Host:   host,
		Path:   fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName),
	}
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL)

	stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1)
	out, errOut := new(bytes.Buffer), new(bytes.Buffer)
	ports := []string{fmt.Sprintf("%s:%s", forwardPort, containerPort)}
	forwarder, err := portforward.New(dialer, ports, stopChan, readyChan, out, errOut)
	if err != nil {
		return nil, err
	}

	// done is closed when ForwardPorts returns, so the reporting goroutine below cannot
	// block forever when forwarding fails before the ready channel is closed.
	done := make(chan struct{})
	go func() {
		defer close(done)
		if err := forwarder.ForwardPorts(); err != nil {
			log.Logger().Errorf("error during port forwarding: %v", err)
		}
	}()

	go func() {
		// Kubernetes closes readyChan when it has something to tell us.
		select {
		case <-readyChan:
		case <-done:
		}
		if errOut.Len() != 0 {
			log.Logger().Error(errOut.String())
		} else if out.Len() != 0 {
			log.Logger().Info(out.String())
		}
	}()

	return stopChan, nil
}
// LabelSelector builds a Kubernetes label selector from the specified map.
//
// Each entry becomes a "key=value" term and the terms are joined with commas. Keys are
// emitted in sorted order so the same map always yields the same selector string. A nil
// or empty map yields the empty string.
func LabelSelector(labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for key := range labels {
		keys = append(keys, key)
	}
	// Map iteration order is random; sort for a deterministic result.
	sort.Strings(keys)

	terms := make([]string, 0, len(keys))
	for _, key := range keys {
		terms = append(terms, key+"="+labels[key])
	}
	return strings.Join(terms, ",")
}