This repository has been archived by the owner on Feb 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
external_forwarder.go
120 lines (106 loc) · 3.27 KB
/
external_forwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package externalforwarder
import (
"context"
"fmt"
"os"
"os/signal"
"time"
"github.com/google/wire"
"github.com/int128/kubectl-external-forward/pkg/portforwarder"
"github.com/int128/kubectl-external-forward/pkg/tunnel"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
)
var Set = wire.NewSet(
wire.Struct(new(ExternalForwarder), "*"),
wire.Bind(new(Interface), new(*ExternalForwarder)),
)
type Option struct {
Config *rest.Config
Tunnels []tunnel.Tunnel
Namespace string
PodImage string
}
type Interface interface {
Do(ctx context.Context, o Option) error
}
type ExternalForwarder struct {
PortForwarder portforwarder.Interface
}
func (f ExternalForwarder) Do(ctx context.Context, o Option) error {
clientset, err := kubernetes.NewForConfig(o.Config)
if err != nil {
return fmt.Errorf("could not create a client set: %w", err)
}
klog.Infof("creating a pod")
pod, err := newPod(o)
if err != nil {
return fmt.Errorf("could not generate pod spec: %w", err)
}
pod, err = clientset.CoreV1().Pods(o.Namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("could not create pod: %w", err)
}
klog.Infof("created pod %s/%s", pod.Namespace, pod.Name)
ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
defer stop()
var eg errgroup.Group
eg.Go(func() error {
<-ctx.Done()
// clean up the pod
ctx := context.Background()
ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
defer stop()
klog.Infof("deleting pod %s/%s...", pod.Namespace, pod.Name)
if err := deletePodWithRetry(ctx, clientset, pod.Namespace, pod.Name, 60*time.Second); err != nil {
return fmt.Errorf("you need to delete pod %s/%s manually: %w", pod.Namespace, pod.Name, err)
}
klog.Infof("deleted pod %s/%s", pod.Namespace, pod.Name)
return nil
})
eg.Go(func() error {
if err := waitForPodRunning(ctx, clientset, pod.Namespace, pod.Name, 60*time.Second); err != nil {
return fmt.Errorf("pod is not running: %w", err)
}
for _, container := range pod.Spec.Containers {
containerName := container.Name
eg.Go(func() error {
return tailPodLogs(ctx, clientset, pod.Namespace, pod.Name, containerName)
})
}
for _, t := range o.Tunnels {
f.startPortForwarder(ctx, &eg, o.Config, t, pod)
}
return nil
})
return eg.Wait()
}
func (f ExternalForwarder) startPortForwarder(ctx context.Context, eg *errgroup.Group, restConfig *rest.Config, tunnel tunnel.Tunnel, pod *corev1.Pod) {
stopChan := make(chan struct{})
eg.Go(func() error {
<-ctx.Done()
close(stopChan)
return nil
})
eg.Go(func() error {
klog.Infof("starting port-forwarder from %d to %s/%s:%d", tunnel.LocalPort, pod.Namespace, pod.Name, tunnel.LocalPort)
po := portforwarder.Option{
Config: restConfig,
SourceHost: tunnel.LocalHost,
SourcePort: tunnel.LocalPort,
TargetNamespace: pod.Namespace,
TargetPodName: pod.Name,
TargetContainerPort: tunnel.LocalPort,
}
if err := f.PortForwarder.Run(po, nil, stopChan); err != nil {
return fmt.Errorf("could not start port-forwarder")
}
klog.Info("stopped port-forwarder")
return nil
})
}