/
portforwardcontroller.go
146 lines (122 loc) · 3.52 KB
/
portforwardcontroller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package engine
import (
"context"
"github.com/windmilleng/tilt/internal/k8s"
"github.com/windmilleng/tilt/internal/logger"
"github.com/windmilleng/tilt/internal/model"
"github.com/windmilleng/tilt/internal/store"
"k8s.io/api/core/v1"
)
// PortForwardController keeps track of the port-forwards it has set up,
// keyed by pod ID, and opens new ones through a k8s.Client.
type PortForwardController struct {
// kClient is the client whose ForwardPort method is used to open forwards.
kClient k8s.Client
// activeForwards holds one entry per pod that diff has decided to forward;
// entries are added and removed by diff.
activeForwards map[k8s.PodID]portForwardEntry
}
// NewPortForwardController returns a controller that opens port-forwards
// through kClient and starts out with no active forwards.
func NewPortForwardController(kClient k8s.Client) *PortForwardController {
	c := new(PortForwardController)
	c.kClient = kClient
	c.activeForwards = map[k8s.PodID]portForwardEntry{}
	return c
}
// diff compares the pods in the data store against the port-forwards this
// controller is already tracking.
//
// It returns the entries that still need to be started (toStart) and the
// entries that should be shut down (toShutdown). As a side effect, every
// toStart entry is recorded in m.activeForwards and every toShutdown entry is
// removed from it. The store's read lock is held for the whole call.
//
// NOTE(review): a pod that is not in the Running phase still passes the phase
// check below when it is marked Deleting; confirm that this is intended.
func (m *PortForwardController) diff(ctx context.Context, st store.RStore) (toStart []portForwardEntry, toShutdown []portForwardEntry) {
	state := st.RLockState()
	defer st.RUnlockState()

	// Pods from the store that should have port-forwards right now.
	wanted := make(map[k8s.PodID]bool, len(state.ManifestTargets))

	// Pass 1: collect the pods that want forwards and create entries for the
	// ones we are not tracking yet.
	for _, target := range state.Targets() {
		pod := target.State.MostRecentPod()
		id := pod.PodID
		if id == "" {
			continue
		}

		// Skip pods that are neither running nor being deleted.
		if !pod.Deleting && pod.Phase != v1.PodRunning {
			continue
		}

		forwards := PopulatePortForwards(target.Manifest, pod)
		if len(forwards) == 0 {
			continue
		}

		wanted[id] = true
		if _, tracked := m.activeForwards[id]; tracked {
			continue
		}

		// Every entry gets its own cancellable child of the caller's context,
		// so it can be torn down independently of the others.
		entryCtx, cancel := context.WithCancel(ctx)
		entry := portForwardEntry{
			podID:     id,
			name:      target.State.Name,
			namespace: pod.Namespace,
			forwards:  forwards,
			ctx:       entryCtx,
			cancel:    cancel,
		}
		m.activeForwards[id] = entry
		toStart = append(toStart, entry)
	}

	// Pass 2: anything we are tracking that the store no longer wants must be
	// shut down and forgotten.
	for id, entry := range m.activeForwards {
		if wanted[id] {
			continue
		}
		toShutdown = append(toShutdown, entry)
		delete(m.activeForwards, id)
	}

	return toStart, toShutdown
}
// OnChange reconciles the running port-forwards with the current store state.
//
// It cancels the context of every entry that diff reports as stale, then opens
// each forward of every new entry through kClient.ForwardPort. A forward that
// fails to open is logged and skipped. For each forward that opens, a
// goroutine waits for the entry's context to be done and then calls the
// closer returned by ForwardPort.
func (m *PortForwardController) OnChange(ctx context.Context, st store.RStore) {
	starting, stopping := m.diff(ctx, st)

	for i := range stopping {
		stopping[i].cancel()
	}

	for i := range starting {
		// Per-iteration copy so the goroutines below each see their own entry.
		pf := starting[i]
		for _, fwd := range pf.forwards {
			// TODO(nick): Handle the case where DockerForDesktop is handling
			// the port-forwarding natively already
			_, stop, err := m.kClient.ForwardPort(ctx, pf.namespace, pf.podID, fwd.LocalPort, fwd.ContainerPort)
			if err != nil {
				logger.Get(ctx).Infof("Error port-forwarding %s: %v", pf.name, err)
				continue
			}

			// Close this forward once the entry's context is cancelled.
			go func() {
				<-pf.ctx.Done()
				stop()
			}()
		}
	}
}
// Compile-time check that *PortForwardController implements store.Subscriber.
var _ store.Subscriber = (*PortForwardController)(nil)
// portForwardEntry describes the port-forwards requested for a single pod,
// together with the context used to shut them down.
type portForwardEntry struct {
// name is the manifest name the entry was created for; used in log messages.
name model.ManifestName
// namespace and podID identify the pod passed to ForwardPort.
namespace k8s.Namespace
podID k8s.PodID
// forwards is the list produced by PopulatePortForwards for this pod.
forwards []model.PortForward
// ctx is done once cancel is called (or its parent context ends); OnChange
// waits on it before closing each opened forward.
ctx context.Context
cancel func()
}
// PopulatePortForwards returns the port-forward specs of the manifest's
// Kubernetes target with their container ports filled in.
//
// A spec whose ContainerPort is 0 gets the first entry of pod.ContainerPorts.
// If the pod lists no container ports, such a spec is silently dropped. Specs
// with an explicit ContainerPort are returned unchanged. The result is always
// a non-nil slice.
func PopulatePortForwards(m model.Manifest, pod store.Pod) []model.PortForward {
	requested := m.K8sTarget().PortForwards
	populated := make([]model.PortForward, 0, len(requested))

	for _, pf := range requested {
		switch {
		case pf.ContainerPort != 0:
			// Explicit container port: keep the spec as-is.
		case len(pod.ContainerPorts) > 0:
			// Fall back to the pod's first declared container port.
			pf.ContainerPort = int(pod.ContainerPorts[0])
		default:
			// Nothing to fall back to; drop this spec.
			continue
		}
		populated = append(populated, pf)
	}

	return populated
}