From a056e6983b5f71d4696d004357a35038618b1383 Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Wed, 14 Jun 2023 16:19:11 -0400 Subject: [PATCH 01/13] Add reconnect mechanism around the port forwarder. --- .../kurtosis_gateway/connection/connection.go | 62 +++++++++++++------ .../api_container_gateway_service_server.go | 34 ++++++++-- 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index 833eac7009..b1db87244d 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -16,10 +16,11 @@ import ( ) const ( - localHostIpStr = "127.0.0.1" - portForwardTimeoutDuration = 5 * time.Second - grpcPortId = "grpc" - emptyApplicationProtocol = "" + localHostIpStr = "127.0.0.1" + portForwardTimeoutDuration = 5 * time.Second + grpcPortId = "grpc" + emptyApplicationProtocol = "" + portForwardTimeBetweenRetries = 5 * time.Second ) // GatewayConnectionToKurtosis represents a connection on localhost that can be used by the gateway to communicate with Kurtosis in the cluster @@ -34,14 +35,11 @@ type GatewayConnectionToKurtosis interface { type gatewayConnectionToKurtosisImpl struct { localAddresses []string localPorts map[string]*port_spec.PortSpec - // kubectl command to exec to to start the tunnel - portforwarder *portforward.PortForwarder portforwarderStdOut bytes.Buffer portforwarderStdErr bytes.Buffer portforwarderStopChannel chan struct{} - portforwarderReadyChannel chan struct{} // RemotePort -> port-spec ID remotePortNumberToPortSpecIdMap map[uint16]string @@ -89,17 +87,46 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP Timeout: 0, }, dialerMethod, podProxyEndpointUrl) - // Create our k8s port forwarder - portForwarder, err := portforward.NewOnAddresses(dialer, portForwardAddresses, portStrings, portforwardStopChannel, portforwardReadyChannel, &portforwardStdOut, &portforwardStdErr) - if err != nil { - return nil, err - } - // Start forwarding ports asynchronously + // Start forwarding ports asynchronously with reconnect logic. + // The reconnect logic tries to reconnect after a working connection is lost. + // The port forward process can be interrupted using the port forwarder stop channel. There is no retry after that. + var portForwarder *portforward.PortForwarder go func() { - if err := portForwarder.ForwardPorts(); err != nil { - logrus.Warnf("Expected to be able to start forwarding local ports to remote ports, instead our portforwarder has returned a non-nil err:\n%v", err) + retries := 0 + readyChannel := portforwardReadyChannel + for { + portForwarder, err = portforward.NewOnAddresses(dialer, portForwardAddresses, portStrings, portforwardStopChannel, readyChannel, &portforwardStdOut, &portforwardStdErr) + if err != nil { + // Addresses or ports cannot be parsed so there is nothing else to try + return + } else { + logrus.Infof("Opening connection to pod: %s", podProxyEndpointUrl.String()) + if err = portForwarder.ForwardPorts(); err != nil { + if err == portforward.ErrLostConnectionToPod { + logrus.Infof("Lost connection to pod: %s", podProxyEndpointUrl.String()) + } else { + if retries == 0 { + // Exit the retry logic if the first connection fails so we don't block the caller + logrus.Errorf("Expected to be able to start forwarding local ports to remote ports, instead our portforwarder has returned a non-nil err:\n%v", err) + return + } + } + } else { + // ForwardPorts() returns nil when we close the connection using the stop channel. + // Close the port forwarder and do not try to reconnect. + //portForwarder.Close() + logrus.Infof("ForwardPorts returned nil") + return + } + time.Sleep(portForwardTimeBetweenRetries) + retries += 1 + logrus.Debugf("Retrying (%d) connection to pod: %s", retries, podProxyEndpointUrl.String()) + // No need to be notified when the reconnect is successful. + readyChannel = nil + } } }() + // Wait for the portforwarder to be ready with timeout select { case <-portforwardReadyChannel: @@ -134,11 +161,9 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP connection := &gatewayConnectionToKurtosisImpl{ localAddresses: portForwardAddresses, localPorts: localPortSpecs, - portforwarder: portForwarder, portforwarderStdOut: portforwardStdOut, portforwarderStdErr: portforwardStdErr, portforwarderStopChannel: portforwardStopChannel, - portforwarderReadyChannel: portforwardReadyChannel, remotePortNumberToPortSpecIdMap: remotePortNumberToPortSpecIdMapping, urlString: podProxyEndpointUrl.String(), } @@ -147,7 +172,8 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP } func (connection *gatewayConnectionToKurtosisImpl) Stop() { - connection.portforwarder.Close() + logrus.Infof("Closing connection to pod: %s", connection.urlString) + close(connection.portforwarderStopChannel) } func (connection *gatewayConnectionToKurtosisImpl) GetLocalPorts() map[string]*port_spec.PortSpec { diff --git a/cli/cli/kurtosis_gateway/server/api_container_gateway/api_container_gateway_service_server.go b/cli/cli/kurtosis_gateway/server/api_container_gateway/api_container_gateway_service_server.go index 887b9c1976..082ba11494 100644 --- a/cli/cli/kurtosis_gateway/server/api_container_gateway/api_container_gateway_service_server.go +++ b/cli/cli/kurtosis_gateway/server/api_container_gateway/api_container_gateway_service_server.go @@ -145,12 +145,14 @@ func (service *ApiContainerGatewayServiceServer) GetServices(ctx context.Context if err != nil { return nil, stacktrace.Propagate(err, errorCallingRemoteApiContainerFromGateway) } - for serviceId, serviceInfo := range remoteApiContainerResponse.ServiceInfo { - if err := service.writeOverServiceInfoFieldsWithLocalConnectionInformation(serviceInfo); err != nil { - return nil, stacktrace.Propagate(err, "Expected to be able to write over service info fields for service '%v', instead a non-nil error was returned", serviceId) - } - } + // Clean up the removed services when we have the full list of running services + cleanUpRemovedServices := len(args.ServiceIdentifiers) == 0 + + if err := service.updateServicesLocalConnection(remoteApiContainerResponse.ServiceInfo, cleanUpRemovedServices); err != nil { + return nil, stacktrace.Propagate(err, "Error updating the services local connection") + } + return remoteApiContainerResponse, nil } @@ -432,6 +434,28 @@ func (service *ApiContainerGatewayServiceServer) idempotentKillRunningConnection delete(service.userServiceGuidToLocalConnectionMap, serviceUuid) } +func (service *ApiContainerGatewayServiceServer) updateServicesLocalConnection(serviceInfos map[string]*kurtosis_core_rpc_api_bindings.ServiceInfo, cleanUpRemovedServices bool) error { + + serviceUuids := map[string]bool{} + for serviceId, serviceInfo := range serviceInfos { + if err := service.writeOverServiceInfoFieldsWithLocalConnectionInformation(serviceInfo); err != nil { + return stacktrace.Propagate(err, "Expected to be able to write over service info fields for service '%v', instead a non-nil error was returned", serviceId) + } + serviceUuids[serviceInfo.GetServiceUuid()] = true + } + + if cleanUpRemovedServices { + // Clean up connection for removed services + for serviceUuid := range service.userServiceGuidToLocalConnectionMap { + if _, found := serviceUuids[serviceUuid]; !found { + service.idempotentKillRunningConnectionForServiceGuid(serviceUuid) + } + } + } + + return nil +} + func forwardKurtosisExecutionStream[T any](streamToReadFrom grpc.ClientStream, streamToWriteTo grpc.ServerStream) error { for { starlarkRunResponseLine := new(T) From eca480ba7cf8e089a9423f9d32e41652557f4838 Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Wed, 14 Jun 2023 23:24:12 -0400 Subject: [PATCH 02/13] Rename cleanUpRemovedServices to cleanupRemovedServices. --- .../api_container_gateway_service_server.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cli/cli/kurtosis_gateway/server/api_container_gateway/api_container_gateway_service_server.go b/cli/cli/kurtosis_gateway/server/api_container_gateway/api_container_gateway_service_server.go index 082ba11494..a9e87810e1 100644 --- a/cli/cli/kurtosis_gateway/server/api_container_gateway/api_container_gateway_service_server.go +++ b/cli/cli/kurtosis_gateway/server/api_container_gateway/api_container_gateway_service_server.go @@ -147,12 +147,12 @@ func (service *ApiContainerGatewayServiceServer) GetServices(ctx context.Context } // Clean up the removed services when we have the full list of running services - cleanUpRemovedServices := len(args.ServiceIdentifiers) == 0 + cleanupRemovedServices := len(args.ServiceIdentifiers) == 0 - if err := service.updateServicesLocalConnection(remoteApiContainerResponse.ServiceInfo, cleanUpRemovedServices); err != nil { + if err := service.updateServicesLocalConnection(remoteApiContainerResponse.ServiceInfo, cleanupRemovedServices); err != nil { return nil, stacktrace.Propagate(err, "Error updating the services local connection") } - + return remoteApiContainerResponse, nil } @@ -434,8 +434,8 @@ func (service *ApiContainerGatewayServiceServer) idempotentKillRunningConnection delete(service.userServiceGuidToLocalConnectionMap, serviceUuid) } -func (service *ApiContainerGatewayServiceServer) updateServicesLocalConnection(serviceInfos map[string]*kurtosis_core_rpc_api_bindings.ServiceInfo, cleanUpRemovedServices bool) error { - +func (service *ApiContainerGatewayServiceServer) updateServicesLocalConnection(serviceInfos map[string]*kurtosis_core_rpc_api_bindings.ServiceInfo, cleanupRemovedServices bool) error { + serviceUuids := map[string]bool{} for serviceId, serviceInfo := range serviceInfos { if err := service.writeOverServiceInfoFieldsWithLocalConnectionInformation(serviceInfo); err != nil { @@ -444,7 +444,7 @@ func (service *ApiContainerGatewayServiceServer) updateServicesLocalConnection(s serviceUuids[serviceInfo.GetServiceUuid()] = true } - if cleanUpRemovedServices { + if cleanupRemovedServices { // Clean up connection for removed services for serviceUuid := range service.userServiceGuidToLocalConnectionMap { if _, found := serviceUuids[serviceUuid]; !found { From 54c7e5f7fcfa8347da70f2c8d3f75d30e16e18d3 Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Wed, 14 Jun 2023 23:27:59 -0400 Subject: [PATCH 03/13] Remove uneeded log message. --- cli/cli/kurtosis_gateway/connection/connection.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index b1db87244d..f2bdb38056 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -114,8 +114,6 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP } else { // ForwardPorts() returns nil when we close the connection using the stop channel. // Close the port forwarder and do not try to reconnect. - //portForwarder.Close() - logrus.Infof("ForwardPorts returned nil") return } time.Sleep(portForwardTimeBetweenRetries) From a3505f67223e179ae9d9dc09dd2b62c65fe9a1fe Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Wed, 14 Jun 2023 23:29:51 -0400 Subject: [PATCH 04/13] Fix comment. --- cli/cli/kurtosis_gateway/connection/connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index f2bdb38056..7cbf307cae 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -113,7 +113,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP } } else { // ForwardPorts() returns nil when we close the connection using the stop channel. - // Close the port forwarder and do not try to reconnect. + // Do not try to reconnect. return } time.Sleep(portForwardTimeBetweenRetries) From 20ffeae1010f517a408f27cb9665d97e317752d3 Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Thu, 15 Jun 2023 10:12:52 -0400 Subject: [PATCH 05/13] Re-use the same local ports when we reconnect the port forwarder. --- .../kurtosis_gateway/connection/connection.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index 7cbf307cae..3559eb5a4d 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -100,7 +100,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP // Addresses or ports cannot be parsed so there is nothing else to try return } else { - logrus.Infof("Opening connection to pod: %s", podProxyEndpointUrl.String()) + logrus.Debugf("Opening connection to pod: %s", podProxyEndpointUrl.String()) if err = portForwarder.ForwardPorts(); err != nil { if err == portforward.ErrLostConnectionToPod { logrus.Infof("Lost connection to pod: %s", podProxyEndpointUrl.String()) @@ -110,6 +110,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP logrus.Errorf("Expected to be able to start forwarding local ports to remote ports, instead our portforwarder has returned a non-nil err:\n%v", err) return } + logrus.Debugf("Error trying to forwarding ports:\n%v", err) } } else { // ForwardPorts() returns nil when we close the connection using the stop channel. @@ -119,8 +120,18 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP time.Sleep(portForwardTimeBetweenRetries) retries += 1 logrus.Debugf("Retrying (%d) connection to pod: %s", retries, podProxyEndpointUrl.String()) - // No need to be notified when the reconnect is successful. - readyChannel = nil + readyChannel = make(chan struct{}, 1) + + // Re-use the same local ports + ports, err := portForwarder.GetPorts() + if err != nil { + logrus.Errorf("An error occured retrieving the local ports to remote ports mapping for our portforwarder:\n%v", err) + } + portStrings = nil + for _, port := range ports { + portString := fmt.Sprintf("%v:%v", port.Local, port.Remote) + portStrings = append(portStrings, portString) + } } } }() From bb0d8633ea3f2f7ed920411debc52e219ae5f2c1 Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Thu, 15 Jun 2023 10:41:27 -0400 Subject: [PATCH 06/13] Linting. --- cli/cli/kurtosis_gateway/connection/connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index 3559eb5a4d..1672ac6810 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -39,7 +39,7 @@ type gatewayConnectionToKurtosisImpl struct { portforwarderStdOut bytes.Buffer portforwarderStdErr bytes.Buffer - portforwarderStopChannel chan struct{} + portforwarderStopChannel chan struct{} // RemotePort -> port-spec ID remotePortNumberToPortSpecIdMap map[uint16]string From 896b55112eefd19d3f5b665e83602338224e53a9 Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Thu, 15 Jun 2023 10:59:28 -0400 Subject: [PATCH 07/13] Move the local ports copy to when we loose a connection. --- .../kurtosis_gateway/connection/connection.go | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index 1672ac6810..a6ebe6e41e 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -104,13 +104,24 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP if err = portForwarder.ForwardPorts(); err != nil { if err == portforward.ErrLostConnectionToPod { logrus.Infof("Lost connection to pod: %s", podProxyEndpointUrl.String()) + // Copy the port forwarder assigned local ports so we re-use the same local ports when we reconnect + ports, err := portForwarder.GetPorts() + if err != nil { + logrus.Errorf("An error occured retrieving the local ports to remote ports mapping for our portforwarder:\n%v", err) + return + } + portStrings = nil + for _, port := range ports { + portString := fmt.Sprintf("%v:%v", port.Local, port.Remote) + portStrings = append(portStrings, portString) + } } else { if retries == 0 { // Exit the retry logic if the first connection fails so we don't block the caller logrus.Errorf("Expected to be able to start forwarding local ports to remote ports, instead our portforwarder has returned a non-nil err:\n%v", err) return } - logrus.Debugf("Error trying to forwarding ports:\n%v", err) + logrus.Debugf("Error trying to forward ports:\n%v", err) } } else { // ForwardPorts() returns nil when we close the connection using the stop channel. @@ -121,17 +132,6 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP retries += 1 logrus.Debugf("Retrying (%d) connection to pod: %s", retries, podProxyEndpointUrl.String()) readyChannel = make(chan struct{}, 1) - - // Re-use the same local ports - ports, err := portForwarder.GetPorts() - if err != nil { - logrus.Errorf("An error occured retrieving the local ports to remote ports mapping for our portforwarder:\n%v", err) - } - portStrings = nil - for _, port := range ports { - portString := fmt.Sprintf("%v:%v", port.Local, port.Remote) - portStrings = append(portStrings, portString) - } } } }() From 02548fa1fa4b6052495a248a9b93c93bf8cbfb21 Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Thu, 15 Jun 2023 12:17:57 -0400 Subject: [PATCH 08/13] Update comment. --- cli/cli/kurtosis_gateway/connection/connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index a6ebe6e41e..fa9a4275bd 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -117,7 +117,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP } } else { if retries == 0 { - // Exit the retry logic if the first connection fails so we don't block the caller + // Exit the retry logic if the first connection fails logrus.Errorf("Expected to be able to start forwarding local ports to remote ports, instead our portforwarder has returned a non-nil err:\n%v", err) return } From cf0d2648f3fd0bfaa39138c62c91cd335127ecba Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Thu, 15 Jun 2023 12:28:42 -0400 Subject: [PATCH 09/13] Add error message when we cannot parse the port forwarder addresses or ports. --- cli/cli/kurtosis_gateway/connection/connection.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index fa9a4275bd..ce5ee62d97 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -98,6 +98,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP portForwarder, err = portforward.NewOnAddresses(dialer, portForwardAddresses, portStrings, portforwardStopChannel, readyChannel, &portforwardStdOut, &portforwardStdErr) if err != nil { // Addresses or ports cannot be parsed so there is nothing else to try + logrus.Errorf("An error occured parsing the port forwarder addresses or ports:\n%v", err) return } else { logrus.Debugf("Opening connection to pod: %s", podProxyEndpointUrl.String()) From da0b988a9f4506be1e09df3e7f00758e83a76e62 Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Thu, 15 Jun 2023 12:30:20 -0400 Subject: [PATCH 10/13] Clarify comment. --- cli/cli/kurtosis_gateway/connection/connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index ce5ee62d97..2aa8c5a68a 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -118,7 +118,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP } } else { if retries == 0 { - // Exit the retry logic if the first connection fails + // Exit the retry logic if the first try to connect fails logrus.Errorf("Expected to be able to start forwarding local ports to remote ports, instead our portforwarder has returned a non-nil err:\n%v", err) return } From ba51d2cd124814ffbe3e8b1a3bcb6151dbb4541a Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Sun, 18 Jun 2023 05:18:12 -0400 Subject: [PATCH 11/13] Close pod port forwarding by calling the port forward close method, closing the port forward stop channel and closing the reconnect logic stop channel. --- .../kurtosis_gateway/connection/connection.go | 46 +++++++++++-------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index 2aa8c5a68a..4a93659d52 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -36,11 +36,11 @@ type gatewayConnectionToKurtosisImpl struct { localAddresses []string localPorts map[string]*port_spec.PortSpec - portforwarderStdOut bytes.Buffer - portforwarderStdErr bytes.Buffer - + portforwarder *portforward.PortForwarder portforwarderStopChannel chan struct{} + stopChannel chan struct{} + // RemotePort -> port-spec ID remotePortNumberToPortSpecIdMap map[uint16]string @@ -54,6 +54,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP var portforwardStdErr bytes.Buffer portforwardStopChannel := make(chan struct{}, 1) portforwardReadyChannel := make(chan struct{}, 1) + stopChannel := make(chan struct{}, 1) portForwardAddresses := []string{localHostIpStr} remotePortNumberToPortSpecIdMapping := map[uint16]string{} @@ -74,6 +75,16 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP remotePortNumberToPortSpecIdMapping[portSpec.GetNumber()] = portspecId } + connection := &gatewayConnectionToKurtosisImpl{ + localAddresses: portForwardAddresses, + localPorts: nil, + portforwarder: nil, + portforwarderStopChannel: portforwardStopChannel, + stopChannel: stopChannel, + remotePortNumberToPortSpecIdMap: remotePortNumberToPortSpecIdMapping, + urlString: podProxyEndpointUrl.String(), + } + // Connection to pod portforwarder endpoint transport, upgrader, err := spdy.RoundTripperFor(kubernetesRestConfig) if err != nil { @@ -90,23 +101,22 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP // Start forwarding ports asynchronously with reconnect logic. // The reconnect logic tries to reconnect after a working connection is lost. // The port forward process can be interrupted using the port forwarder stop channel. There is no retry after that. - var portForwarder *portforward.PortForwarder go func() { retries := 0 readyChannel := portforwardReadyChannel for { - portForwarder, err = portforward.NewOnAddresses(dialer, portForwardAddresses, portStrings, portforwardStopChannel, readyChannel, &portforwardStdOut, &portforwardStdErr) + connection.portforwarder, err = portforward.NewOnAddresses(dialer, portForwardAddresses, portStrings, portforwardStopChannel, readyChannel, &portforwardStdOut, &portforwardStdErr) if err != nil { // Addresses or ports cannot be parsed so there is nothing else to try logrus.Errorf("An error occured parsing the port forwarder addresses or ports:\n%v", err) return } else { - logrus.Debugf("Opening connection to pod: %s", podProxyEndpointUrl.String()) - if err = portForwarder.ForwardPorts(); err != nil { + logrus.Debugf("Trying to forward ports for pod: %s", podProxyEndpointUrl.String()) + if err = connection.portforwarder.ForwardPorts(); err != nil { if err == portforward.ErrLostConnectionToPod { logrus.Infof("Lost connection to pod: %s", podProxyEndpointUrl.String()) // Copy the port forwarder assigned local ports so we re-use the same local ports when we reconnect - ports, err := portForwarder.GetPorts() + ports, err := connection.portforwarder.GetPorts() if err != nil { logrus.Errorf("An error occured retrieving the local ports to remote ports mapping for our portforwarder:\n%v", err) return @@ -129,6 +139,11 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP // Do not try to reconnect. return } + select { + case <-stopChannel: + return + default: + } time.Sleep(portForwardTimeBetweenRetries) retries += 1 logrus.Debugf("Retrying (%d) connection to pod: %s", retries, podProxyEndpointUrl.String()) @@ -144,7 +159,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP return nil, stacktrace.NewError("Expected Kubernetes portforwarder to open local ports to the pod exposed by the portforward api at URL '%v', instead the Kubernetes portforwarder timed out binding local ports", podProxyEndpointUrl) } // Get local forwarded ports - forwardedPorts, err := portForwarder.GetPorts() + forwardedPorts, err := connection.portforwarder.GetPorts() if err != nil { return nil, stacktrace.Propagate(err, "Expected to be able to get forwarded ports from our running portforwarder, instead a non-nil err was returned") } @@ -167,22 +182,15 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP localPortSpecs[portSpecId] = localPortSpec } - - connection := &gatewayConnectionToKurtosisImpl{ - localAddresses: portForwardAddresses, - localPorts: localPortSpecs, - portforwarderStdOut: portforwardStdOut, - portforwarderStdErr: portforwardStdErr, - portforwarderStopChannel: portforwardStopChannel, - remotePortNumberToPortSpecIdMap: remotePortNumberToPortSpecIdMapping, - urlString: podProxyEndpointUrl.String(), - } + connection.localPorts = localPortSpecs return connection, nil } func (connection *gatewayConnectionToKurtosisImpl) Stop() { logrus.Infof("Closing connection to pod: %s", connection.urlString) + connection.portforwarder.Close() + close(connection.stopChannel) close(connection.portforwarderStopChannel) } From d22f2b7f54c04b931a5b7f5b22d7cd204f0fe07e Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Sun, 18 Jun 2023 23:08:11 +0200 Subject: [PATCH 12/13] Remove port forwarder close call from the gw connection stop method. --- cli/cli/kurtosis_gateway/connection/connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index 4a93659d52..dc0b87fe18 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -115,6 +115,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP if err = connection.portforwarder.ForwardPorts(); err != nil { if err == portforward.ErrLostConnectionToPod { logrus.Infof("Lost connection to pod: %s", podProxyEndpointUrl.String()) + retries = 0 // Copy the port forwarder assigned local ports so we re-use the same local ports when we reconnect ports, err := connection.portforwarder.GetPorts() if err != nil { @@ -189,7 +190,6 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP func (connection *gatewayConnectionToKurtosisImpl) Stop() { logrus.Infof("Closing connection to pod: %s", connection.urlString) - connection.portforwarder.Close() close(connection.stopChannel) close(connection.portforwarderStopChannel) } From dd93c1f8fd271e1c1348377bb738c7ecb4fb3aa2 Mon Sep 17 00:00:00 2001 From: Laurent Luce Date: Tue, 20 Jun 2023 10:48:55 +0200 Subject: [PATCH 13/13] Call the port forward close method when we close the kurtosis connection. --- cli/cli/kurtosis_gateway/connection/connection.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cli/cli/kurtosis_gateway/connection/connection.go b/cli/cli/kurtosis_gateway/connection/connection.go index dc0b87fe18..e06bfeaca5 100644 --- a/cli/cli/kurtosis_gateway/connection/connection.go +++ b/cli/cli/kurtosis_gateway/connection/connection.go @@ -191,6 +191,7 @@ func newLocalPortToPodPortConnection(kubernetesRestConfig *k8s_rest.Config, podP func (connection *gatewayConnectionToKurtosisImpl) Stop() { logrus.Infof("Closing connection to pod: %s", connection.urlString) close(connection.stopChannel) + connection.portforwarder.Close() close(connection.portforwarderStopChannel) }