-
-
Notifications
You must be signed in to change notification settings - Fork 517
/
connector_state.go
161 lines (143 loc) · 4.6 KB
/
connector_state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package cli
import (
"context"
"fmt"
"path/filepath"
"time"
"github.com/pkg/errors"
"google.golang.org/grpc"
empty "google.golang.org/protobuf/types/known/emptypb"
"github.com/telepresenceio/telepresence/rpc/v2/connector"
"github.com/telepresenceio/telepresence/rpc/v2/daemon"
"github.com/telepresenceio/telepresence/rpc/v2/manager"
"github.com/telepresenceio/telepresence/v2/pkg/client"
"github.com/telepresenceio/telepresence/v2/pkg/filelocation"
)
type connectorState struct {
*sessionInfo
daemonClient daemon.DaemonClient
connectorConn *grpc.ClientConn
connectorClient connector.ConnectorClient
managerClient manager.ManagerClient
info *connector.ConnectInfo
}
func NewConnectorState(sessionInfo *sessionInfo, daemonClient daemon.DaemonClient) *connectorState {
return &connectorState{
sessionInfo: sessionInfo,
daemonClient: daemonClient,
}
}
// Connect asks the daemon to connect to a cluster
func (cs *connectorState) EnsureState() (bool, error) {
if cs.isConnected() {
return false, cs.setConnectInfo()
}
for attempt := 0; ; attempt++ {
dr, err := cs.daemonClient.Status(cs.cmd.Context(), &empty.Empty{})
if err != nil {
return false, err
}
switch dr.Error {
case daemon.DaemonStatus_UNSPECIFIED:
case daemon.DaemonStatus_NOT_STARTED:
return false, errDaemonIsNotRunning
case daemon.DaemonStatus_NO_NETWORK:
if attempt >= 40 {
return false, errors.New("Unable to connect: Network overrides are not established")
}
time.Sleep(250 * time.Millisecond)
continue
}
break
}
err := start(cs.cmd.Context(), client.GetExe(), []string{"connector-foreground"}, false, nil, nil, nil)
if err != nil {
return false, errors.Wrap(err, "failed to launch the connector service")
}
fmt.Fprintln(cs.cmd.OutOrStdout(), "Connecting to traffic manager...")
if err = client.WaitUntilSocketAppears("connector", client.ConnectorSocketName, 10*time.Second); err != nil {
logDir, _ := filelocation.AppUserLogDir(cs.cmd.Context())
return false, fmt.Errorf("connector service did not start (see %q for more info)", filepath.Join(logDir, "connector.log"))
}
err = cs.connect()
if err != nil {
return true, err
}
return true, cs.setConnectInfo()
}
func (cs *connectorState) setConnectInfo() error {
r, err := cs.connectorClient.Connect(cs.cmd.Context(), &connector.ConnectRequest{
KubeFlags: kubeFlagMap(),
MappedNamespaces: mappedNamespaces,
})
if err != nil {
return err
}
cs.info = r
var msg string
switch r.Error {
case connector.ConnectInfo_UNSPECIFIED:
fmt.Fprintf(cs.cmd.OutOrStdout(), "Connected to context %s (%s)\n", r.ClusterContext, r.ClusterServer)
return nil
case connector.ConnectInfo_ALREADY_CONNECTED:
return nil
case connector.ConnectInfo_DISCONNECTED:
msg = "Not connected"
case connector.ConnectInfo_MUST_RESTART:
msg = "Cluster configuration changed, please quit telepresence and reconnect"
case connector.ConnectInfo_TRAFFIC_MANAGER_FAILED, connector.ConnectInfo_CLUSTER_FAILED, connector.ConnectInfo_DAEMON_FAILED:
msg = r.ErrorText
}
return fmt.Errorf("connector.Connect: %s", msg) // Return err != nil to ensure disconnect
}
func (cs *connectorState) DeactivateState() error {
if !cs.isConnected() {
return nil
}
out := cs.cmd.OutOrStdout()
fmt.Fprint(out, "Disconnecting...")
var err error
if client.SocketExists(client.ConnectorSocketName) {
// using context.Background() here since it's likely that the
// command context has been cancelled.
_, err = cs.connectorClient.Quit(context.Background(), &empty.Empty{})
}
cs.disconnect()
if err == nil {
err = client.WaitUntilSocketVanishes("connector", client.ConnectorSocketName, 5*time.Second)
}
if err == nil {
fmt.Fprintln(out, "done")
} else {
fmt.Fprintln(out, "failed")
}
return err
}
func assertConnectorStarted() error {
if client.SocketExists(client.ConnectorSocketName) {
return nil
}
return errConnectorIsNotRunning
}
// isConnected returns true if a connection has been established to the daemon
func (cs *connectorState) isConnected() bool {
return cs.connectorConn != nil
}
// connect opens the client connection to the daemon.
func (cs *connectorState) connect() (err error) {
if cs.connectorConn, err = client.DialSocket(cs.cmd.Context(), client.ConnectorSocketName); err == nil {
cs.connectorClient = connector.NewConnectorClient(cs.connectorConn)
cs.managerClient = manager.NewManagerClient(cs.connectorConn)
}
return
}
// disconnect closes the client connection to the daemon.
func (cs *connectorState) disconnect() {
conn := cs.connectorConn
cs.connectorConn = nil
cs.connectorClient = nil
cs.managerClient = nil
if conn != nil {
conn.Close()
}
}