Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Support for reconnects in the Gateway port forwarder #736

Merged
merged 19 commits into from Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a056e69
Add reconnect mechanism around the port forwarder.
laurentluce Jun 14, 2023
eca480b
Rename cleanUpRemovedServices to cleanupRemovedServices.
laurentluce Jun 15, 2023
54c7e5f
Remove uneeded log message.
laurentluce Jun 15, 2023
a3505f6
Fix comment.
laurentluce Jun 15, 2023
20ffeae
Re-use the same local ports when we reconnect the port forwarder.
laurentluce Jun 15, 2023
b008d34
Merge branch 'main' into laurent/gateway_port_forwarder_reconnect
laurentluce Jun 15, 2023
bb0d863
Linting.
laurentluce Jun 15, 2023
896b551
Move the local ports copy to when we loose a connection.
laurentluce Jun 15, 2023
10a815b
Merge branch 'main' into laurent/gateway_port_forwarder_reconnect
laurentluce Jun 15, 2023
02548fa
Update comment.
laurentluce Jun 15, 2023
ba9c736
Merge branch 'main' into laurent/gateway_port_forwarder_reconnect
laurentluce Jun 15, 2023
cf0d264
Add error message when we cannot parse the port forwarder addresses o…
laurentluce Jun 15, 2023
da0b988
Clarify comment.
laurentluce Jun 15, 2023
9a31809
Merge branch 'main' into laurent/gateway_port_forwarder_reconnect
laurentluce Jun 15, 2023
ba51d2c
Close pod port forwarding by calling the port forward close method, c…
laurentluce Jun 18, 2023
21072ed
Merge branch 'main' into laurent/gateway_port_forwarder_reconnect
laurentluce Jun 18, 2023
d22f2b7
Remove port forwarder close call from the gw connection stop method.
laurentluce Jun 18, 2023
dd93c1f
Call the port forward close method when we close the kurtosis connect…
laurentluce Jun 20, 2023
b54980d
Merge branch 'main' into laurent/gateway_port_forwarder_reconnect
laurentluce Jun 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
107 changes: 76 additions & 31 deletions cli/cli/kurtosis_gateway/connection/connection.go
Expand Up @@ -16,10 +16,11 @@ import (
)

const (
localHostIpStr = "127.0.0.1"
portForwardTimeoutDuration = 5 * time.Second
grpcPortId = "grpc"
emptyApplicationProtocol = ""
localHostIpStr = "127.0.0.1"
portForwardTimeoutDuration = 5 * time.Second
grpcPortId = "grpc"
emptyApplicationProtocol = ""
portForwardTimeBetweenRetries = 5 * time.Second
)

// GatewayConnectionToKurtosis represents a connection on localhost that can be used by the gateway to communicate with Kurtosis in the cluster
Expand All @@ -34,14 +35,11 @@ type GatewayConnectionToKurtosis interface {
type gatewayConnectionToKurtosisImpl struct {
localAddresses []string
localPorts map[string]*port_spec.PortSpec
// kubectl command to exec to to start the tunnel
portforwarder *portforward.PortForwarder
laurentluce marked this conversation as resolved.
Show resolved Hide resolved

portforwarderStdOut bytes.Buffer
portforwarderStdErr bytes.Buffer
portforwarder *portforward.PortForwarder
portforwarderStopChannel chan struct{}

portforwarderStopChannel chan struct{}
portforwarderReadyChannel chan struct{}
stopChannel chan struct{}

// RemotePort -> port-spec ID
remotePortNumberToPortSpecIdMap map[uint16]string
Expand All @@ -56,6 +54,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP
var portforwardStdErr bytes.Buffer
portforwardStopChannel := make(chan struct{}, 1)
portforwardReadyChannel := make(chan struct{}, 1)
stopChannel := make(chan struct{}, 1)
portForwardAddresses := []string{localHostIpStr}
remotePortNumberToPortSpecIdMapping := map[uint16]string{}

Expand All @@ -76,6 +75,16 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP
remotePortNumberToPortSpecIdMapping[portSpec.GetNumber()] = portspecId
}

connection := &gatewayConnectionToKurtosisImpl{
localAddresses: portForwardAddresses,
localPorts: nil,
portforwarder: nil,
portforwarderStopChannel: portforwardStopChannel,
stopChannel: stopChannel,
remotePortNumberToPortSpecIdMap: remotePortNumberToPortSpecIdMapping,
urlString: podProxyEndpointUrl.String(),
}

// Connection to pod portforwarder endpoint
transport, upgrader, err := spdy.RoundTripperFor(kubernetesRestConfig)
if err != nil {
Expand All @@ -89,25 +98,69 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP
Timeout: 0,
}, dialerMethod, podProxyEndpointUrl)

// Create our k8s port forwarder
portForwarder, err := portforward.NewOnAddresses(dialer, portForwardAddresses, portStrings, portforwardStopChannel, portforwardReadyChannel, &portforwardStdOut, &portforwardStdErr)
if err != nil {
return nil, err
}
// Start forwarding ports asynchronously
// Start forwarding ports asynchronously with reconnect logic.
// The reconnect logic tries to reconnect after a working connection is lost.
// The port forward process can be interrupted using the port forwarder stop channel. There is no retry after that.
go func() {
if err := portForwarder.ForwardPorts(); err != nil {
logrus.Warnf("Expected to be able to start forwarding local ports to remote ports, instead our portforwarder has returned a non-nil err:\n%v", err)
retries := 0
readyChannel := portforwardReadyChannel
for {
connection.portforwarder, err = portforward.NewOnAddresses(dialer, portForwardAddresses, portStrings, portforwardStopChannel, readyChannel, &portforwardStdOut, &portforwardStdErr)
if err != nil {
// Addresses or ports cannot be parsed so there is nothing else to try
logrus.Errorf("An error occured parsing the port forwarder addresses or ports:\n%v", err)
return
} else {
logrus.Debugf("Trying to forward ports for pod: %s", podProxyEndpointUrl.String())
if err = connection.portforwarder.ForwardPorts(); err != nil {
if err == portforward.ErrLostConnectionToPod {
logrus.Infof("Lost connection to pod: %s", podProxyEndpointUrl.String())
retries = 0
// Copy the port forwarder assigned local ports so we re-use the same local ports when we reconnect
ports, err := connection.portforwarder.GetPorts()
if err != nil {
logrus.Errorf("An error occured retrieving the local ports to remote ports mapping for our portforwarder:\n%v", err)
return
}
portStrings = nil
for _, port := range ports {
portString := fmt.Sprintf("%v:%v", port.Local, port.Remote)
portStrings = append(portStrings, portString)
}
} else {
if retries == 0 {
// Exit the retry logic if the first try to connect fails
logrus.Errorf("Expected to be able to start forwarding local ports to remote ports, instead our portforwarder has returned a non-nil err:\n%v", err)
return
}
logrus.Debugf("Error trying to forward ports:\n%v", err)
}
laurentluce marked this conversation as resolved.
Show resolved Hide resolved
} else {
// ForwardPorts() returns nil when we close the connection using the stop channel.
// Do not try to reconnect.
return
}
select {
case <-stopChannel:
return
default:
}
time.Sleep(portForwardTimeBetweenRetries)
retries += 1
logrus.Debugf("Retrying (%d) connection to pod: %s", retries, podProxyEndpointUrl.String())
readyChannel = make(chan struct{}, 1)
}
}
}()

// Wait for the portforwarder to be ready with timeout
select {
case <-portforwardReadyChannel:
case <-time.After(portForwardTimeoutDuration):
return nil, stacktrace.NewError("Expected Kubernetes portforwarder to open local ports to the pod exposed by the portforward api at URL '%v', instead the Kubernetes portforwarder timed out binding local ports", podProxyEndpointUrl)
}
// Get local forwarded ports
forwardedPorts, err := portForwarder.GetPorts()
forwardedPorts, err := connection.portforwarder.GetPorts()
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to get forwarded ports from our running portforwarder, instead a non-nil err was returned")
}
Expand All @@ -130,24 +183,16 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP

localPortSpecs[portSpecId] = localPortSpec
}

connection := &gatewayConnectionToKurtosisImpl{
localAddresses: portForwardAddresses,
localPorts: localPortSpecs,
portforwarder: portForwarder,
portforwarderStdOut: portforwardStdOut,
portforwarderStdErr: portforwardStdErr,
portforwarderStopChannel: portforwardStopChannel,
portforwarderReadyChannel: portforwardReadyChannel,
remotePortNumberToPortSpecIdMap: remotePortNumberToPortSpecIdMapping,
urlString: podProxyEndpointUrl.String(),
}
connection.localPorts = localPortSpecs

return connection, nil
}

func (connection *gatewayConnectionToKurtosisImpl) Stop() {
logrus.Infof("Closing connection to pod: %s", connection.urlString)
close(connection.stopChannel)
connection.portforwarder.Close()
close(connection.portforwarderStopChannel)
laurentluce marked this conversation as resolved.
Show resolved Hide resolved
}

func (connection *gatewayConnectionToKurtosisImpl) GetLocalPorts() map[string]*port_spec.PortSpec {
Expand Down
Expand Up @@ -145,10 +145,12 @@ func (service *ApiContainerGatewayServiceServer) GetServices(ctx context.Context
if err != nil {
return nil, stacktrace.Propagate(err, errorCallingRemoteApiContainerFromGateway)
}
for serviceId, serviceInfo := range remoteApiContainerResponse.ServiceInfo {
if err := service.writeOverServiceInfoFieldsWithLocalConnectionInformation(serviceInfo); err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to write over service info fields for service '%v', instead a non-nil error was returned", serviceId)
}

// Clean up the removed services when we have the full list of running services
cleanupRemovedServices := len(args.ServiceIdentifiers) == 0

if err := service.updateServicesLocalConnection(remoteApiContainerResponse.ServiceInfo, cleanupRemovedServices); err != nil {
return nil, stacktrace.Propagate(err, "Error updating the services local connection")
}

return remoteApiContainerResponse, nil
Expand Down Expand Up @@ -408,6 +410,28 @@ func (service *ApiContainerGatewayServiceServer) idempotentKillRunningConnection
delete(service.userServiceGuidToLocalConnectionMap, serviceUuid)
}

func (service *ApiContainerGatewayServiceServer) updateServicesLocalConnection(serviceInfos map[string]*kurtosis_core_rpc_api_bindings.ServiceInfo, cleanupRemovedServices bool) error {

serviceUuids := map[string]bool{}
for serviceId, serviceInfo := range serviceInfos {
if err := service.writeOverServiceInfoFieldsWithLocalConnectionInformation(serviceInfo); err != nil {
return stacktrace.Propagate(err, "Expected to be able to write over service info fields for service '%v', instead a non-nil error was returned", serviceId)
}
serviceUuids[serviceInfo.GetServiceUuid()] = true
}

if cleanupRemovedServices {
laurentluce marked this conversation as resolved.
Show resolved Hide resolved
// Clean up connection for removed services
for serviceUuid := range service.userServiceGuidToLocalConnectionMap {
if _, found := serviceUuids[serviceUuid]; !found {
service.idempotentKillRunningConnectionForServiceGuid(serviceUuid)
}
}
}

return nil
}

func forwardKurtosisExecutionStream[T any](streamToReadFrom grpc.ClientStream, streamToWriteTo grpc.ServerStream) error {
for {
starlarkRunResponseLine := new(T)
Expand Down