Skip to content

Commit

Permalink
fix: made forwarding efficient by reducing calls to Kubernetes (#1200)
Browse files Browse the repository at this point in the history
This PR needs some work. This has been created for a visual diff
  • Loading branch information
h4ck3rk3y committed Aug 31, 2023
1 parent 3e42244 commit 4df6a1c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 72 deletions.
75 changes: 28 additions & 47 deletions cli/cli/kurtosis_gateway/connection/provider.go
Expand Up @@ -27,9 +27,10 @@ const (
var noWait *port_spec.Wait = nil

type GatewayConnectionProvider struct {
config *restclient.Config
kubernetesManager *kubernetes_manager.KubernetesManager
providerContext context.Context
config *restclient.Config
kubernetesManager *kubernetes_manager.KubernetesManager
providerContext context.Context
enclaveIdToEnclaveNamespaceName map[string]string
}

func NewGatewayConnectionProvider(ctx context.Context, kubernetesConfig *restclient.Config) (*GatewayConnectionProvider, error) {
Expand All @@ -44,9 +45,10 @@ func NewGatewayConnectionProvider(ctx context.Context, kubernetesConfig *restcli
kubernetesManager := kubernetes_manager.NewKubernetesManager(clientSet, kubernetesConfig)

return &GatewayConnectionProvider{
config: kubernetesConfig,
kubernetesManager: kubernetesManager,
providerContext: ctx,
config: kubernetesConfig,
kubernetesManager: kubernetesManager,
providerContext: ctx,
enclaveIdToEnclaveNamespaceName: map[string]string{},
}, nil
}

Expand Down Expand Up @@ -94,18 +96,16 @@ func (provider *GatewayConnectionProvider) ForEnclaveApiContainer(enclaveInfo *k
return apiContainerConnection, nil
}

func (provider *GatewayConnectionProvider) ForUserServiceIfRunning(enclaveId string, serviceUuid string, servicePortSpecs map[string]*port_spec.PortSpec) (GatewayConnectionToKurtosis, error) {
podPortforwardEndpoint, err := provider.getMaybeUserServicePodPortforwardEndpoint(enclaveId, serviceUuid)
func (provider *GatewayConnectionProvider) ForUserServiceIfRunning(enclaveId string, serviceName string, servicePortSpecs map[string]*port_spec.PortSpec) (GatewayConnectionToKurtosis, error) {
enclaveNamespaceName, err := provider.getEnclaveNamespaceNameForEnclaveId(enclaveId)
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to find an api endpoint for Kubernetes portforward to a Kurtosis user service with id '%v' in enclave '%v', instead a non-nil error was returned", enclaveId, serviceUuid)
} else if podPortforwardEndpoint == nil {
return nil, nil
return nil, stacktrace.Propagate(err, "an error occurred while getting the enclave namespace name")
}
podPortforwardEndpoint := provider.getUserServicePortForwardEndpoint(enclaveNamespaceName, serviceName)
userServiceConnection, err := newLocalPortToPodPortConnection(provider.config, podPortforwardEndpoint, servicePortSpecs)
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to connect to user service with id '%v', instead a non-nil error was returned", serviceUuid)
return nil, stacktrace.Propagate(err, "Expected to be able to connect to user service with name '%v', instead a non-nil error was returned", serviceName)
}

return userServiceConnection, nil
}

Expand Down Expand Up @@ -139,19 +139,10 @@ func (provider *GatewayConnectionProvider) getEnginePodPortforwardEndpoint(engin
}

func (provider *GatewayConnectionProvider) getApiContainerPodPortforwardEndpoint(enclaveId string) (*url.URL, error) {
enclaveLabels := map[string]string{
label_key_consts.EnclaveUUIDKubernetesLabelKey.GetString(): enclaveId,
label_key_consts.KurtosisResourceTypeKubernetesLabelKey.GetString(): label_value_consts.EnclaveKurtosisResourceTypeKubernetesLabelValue.GetString(),
label_key_consts.AppIDKubernetesLabelKey.GetString(): label_value_consts.AppIDKubernetesLabelValue.GetString(),
}
enclaveNamespaceList, err := provider.kubernetesManager.GetNamespacesByLabels(provider.providerContext, enclaveLabels)
enclaveNamespaceName, err := provider.getEnclaveNamespaceNameForEnclaveId(enclaveId)
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to get enclaves namespaces with labels '%+v`, instead a non-nil error was returned", enclaveLabels)
return nil, stacktrace.Propagate(err, "an error occurred while getting the enclave namespace name")
}
if len(enclaveNamespaceList.Items) != 1 {
return nil, stacktrace.NewError("Expected to find exactly 1 enclave namespace with enclaveId '%v', instead found '%v'", enclaveId, len(enclaveNamespaceList.Items))
}
enclaveNamespaceName := enclaveNamespaceList.Items[0].Name

// Get running API Container pods from Kubernetes
apiContainerPodLabels := map[string]string{
Expand All @@ -170,28 +161,8 @@ func (provider *GatewayConnectionProvider) getApiContainerPodPortforwardEndpoint
return provider.kubernetesManager.GetPodPortforwardEndpointUrl(enclaveNamespaceName, apiContainerPodName), nil
}

func (provider *GatewayConnectionProvider) getMaybeUserServicePodPortforwardEndpoint(enclaveId string, serviceUuid string) (*url.URL, error) {
userServiceNamespaceName, err := provider.getEnclaveNamespaceNameForEnclaveId(enclaveId)
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to get a Kubernetes namespace corresponding to a Kurtosis enclave with id '%v', instead a non-nil error was returned", enclaveId)
}
userServiceLabels := map[string]string{
label_key_consts.GUIDKubernetesLabelKey.GetString(): serviceUuid,
label_key_consts.KurtosisResourceTypeKubernetesLabelKey.GetString(): label_value_consts.UserServiceKurtosisResourceTypeKubernetesLabelValue.GetString(),
label_key_consts.AppIDKubernetesLabelKey.GetString(): label_value_consts.AppIDKubernetesLabelValue.GetString(),
}
runningUserServicePodNames, err := provider.getRunningPodNamesByLabels(userServiceNamespaceName, userServiceLabels)
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to get running user service pods with labels '%+v' in namespace '%v', instead a non nil error was returned", runningUserServicePodNames, userServiceNamespaceName)
}
if len(runningUserServicePodNames) == 0 {
// A stopped service has no pod running and no port forward endpoint
return nil, nil
} else if len(runningUserServicePodNames) != 1 {
return nil, stacktrace.NewError("Expected to find exactly 1 running user service pod with guid '%v' in enclave '%v', instead found '%v'", serviceUuid, enclaveId, len(runningUserServicePodNames))
}
userServicePodName := runningUserServicePodNames[0]
return provider.kubernetesManager.GetPodPortforwardEndpointUrl(userServiceNamespaceName, userServicePodName), nil
func (provider *GatewayConnectionProvider) getUserServicePortForwardEndpoint(enclaveNamespaceName string, serviceName string) *url.URL {
return provider.kubernetesManager.GetPodPortforwardEndpointUrl(enclaveNamespaceName, serviceName)
}

func (provider *GatewayConnectionProvider) getRunningPodNamesByLabels(namespace string, podLabels map[string]string) ([]string, error) {
Expand All @@ -211,7 +182,14 @@ func (provider *GatewayConnectionProvider) getRunningPodNamesByLabels(namespace
return runningPodNames, nil
}

// TODO - this function shouldn't exist when kt- + enclave name is the namespace inside kurtosis
// https://github.com/kurtosis-tech/kurtosis/issues/1203 - till then we cache it
func (provider *GatewayConnectionProvider) getEnclaveNamespaceNameForEnclaveId(enclaveId string) (string, error) {
enclaveNamespaceName, found := provider.enclaveIdToEnclaveNamespaceName[enclaveId]
if found {
return enclaveNamespaceName, nil
}

enclaveLabels := map[string]string{
label_key_consts.EnclaveUUIDKubernetesLabelKey.GetString(): enclaveId,
label_key_consts.KurtosisResourceTypeKubernetesLabelKey.GetString(): label_value_consts.EnclaveKurtosisResourceTypeKubernetesLabelValue.GetString(),
Expand All @@ -224,5 +202,8 @@ func (provider *GatewayConnectionProvider) getEnclaveNamespaceNameForEnclaveId(e
if len(enclaveNamespaceList.Items) != 1 {
return "", stacktrace.NewError("Expected to find exactly 1 enclave namespace with enclaveId '%v', instead found '%v'", enclaveId, len(enclaveNamespaceList.Items))
}
return enclaveNamespaceList.Items[0].Name, nil

enclaveNamespaceName = enclaveNamespaceList.Items[0].Name
provider.enclaveIdToEnclaveNamespaceName[enclaveId] = enclaveNamespaceName
return enclaveNamespaceName, nil
}
Expand Up @@ -30,7 +30,7 @@ type ApiContainerGatewayServiceServer struct {

// ServiceMap and mutex to protect it
mutex *sync.Mutex
userServiceGuidToLocalConnectionMap map[string]*runningLocalServiceConnection
userServiceNameToLocalConnectionMap map[string]*runningLocalServiceConnection
}

type runningLocalServiceConnection struct {
Expand All @@ -45,7 +45,7 @@ func NewEnclaveApiContainerGatewayServer(connectionProvider *connection.GatewayC

closeGatewayFunc := func() {
// Stop any port forwarding
for _, runningLocalServiceConnection := range resultCoreGatewayServerService.userServiceGuidToLocalConnectionMap {
for _, runningLocalServiceConnection := range resultCoreGatewayServerService.userServiceNameToLocalConnectionMap {
runningLocalServiceConnection.kurtosisConnection.Stop()
}
}
Expand All @@ -54,7 +54,7 @@ func NewEnclaveApiContainerGatewayServer(connectionProvider *connection.GatewayC
remoteApiContainerClient: remoteApiContainerClient,
connectionProvider: connectionProvider,
mutex: &sync.Mutex{},
userServiceGuidToLocalConnectionMap: userServiceToLocalConnectionMap,
userServiceNameToLocalConnectionMap: userServiceToLocalConnectionMap,
enclaveId: enclaveId,
}, closeGatewayFunc
}
Expand Down Expand Up @@ -220,21 +220,21 @@ func (service *ApiContainerGatewayServiceServer) writeOverServiceInfoFieldsWithL
return nil
}

serviceUuid := serviceInfo.GetServiceUuid()
serviceName := serviceInfo.GetName()
var localConnErr error
var runningLocalConnection *runningLocalServiceConnection
cleanUpConnection := true
runningLocalConnection, isFound := service.userServiceGuidToLocalConnectionMap[serviceUuid]
runningLocalConnection, isFound := service.userServiceNameToLocalConnectionMap[serviceName]
if !isFound {
runningLocalConnection, localConnErr = service.startRunningConnectionForKurtosisServiceIfRunning(serviceUuid, serviceInfo.PrivatePorts)
runningLocalConnection, localConnErr = service.startRunningConnectionForKurtosisServiceIfRunning(serviceName, serviceInfo.PrivatePorts)
if localConnErr != nil {
return stacktrace.Propagate(localConnErr, "Expected to be able to start a local connection to Kurtosis service '%v', instead a non-nil error was returned", serviceUuid)
return stacktrace.Propagate(localConnErr, "Expected to be able to start a local connection to Kurtosis service '%v', instead a non-nil error was returned", serviceName)
} else if runningLocalConnection == nil {
return nil
}
defer func() {
if cleanUpConnection {
service.idempotentKillRunningConnectionForServiceGuid(serviceUuid)
service.idempotentKillRunningConnectionForServiceName(serviceName)
}
}()
}
Expand All @@ -247,18 +247,18 @@ func (service *ApiContainerGatewayServiceServer) writeOverServiceInfoFieldsWithL

// startRunningConnectionForKurtosisServiceIfRunning starts a port forwarding process from kernel assigned local ports to the remote service ports specified
// If privatePortsFromApi is empty, an error is thrown
func (service *ApiContainerGatewayServiceServer) startRunningConnectionForKurtosisServiceIfRunning(serviceUuid string, privatePortsFromApi map[string]*kurtosis_core_rpc_api_bindings.Port) (*runningLocalServiceConnection, error) {
func (service *ApiContainerGatewayServiceServer) startRunningConnectionForKurtosisServiceIfRunning(serviceName string, privatePortsFromApi map[string]*kurtosis_core_rpc_api_bindings.Port) (*runningLocalServiceConnection, error) {
if len(privatePortsFromApi) == 0 {
return nil, stacktrace.NewError("Expected Kurtosis service to have private ports specified for port forwarding, instead no ports were provided")
}
remotePrivatePortSpecs := map[string]*port_spec.PortSpec{}
for portSpecId, coreApiPort := range privatePortsFromApi {
if coreApiPort.GetTransportProtocol() != kurtosis_core_rpc_api_bindings.Port_TCP {
logrus.Warnf(
"Will not be able to forward service port with id '%v' for service with guid '%v' in enclave '%v'. "+
"Will not be able to forward service port with id '%v' for service with name '%v' in enclave '%v'. "+
"The protocol of this port is '%v', but only '%v' is supported",
portSpecId,
serviceUuid,
serviceName,
service.enclaveId,
coreApiPort.GetTransportProtocol(),
kurtosis_core_rpc_api_bindings.Port_TCP.String(),
Expand All @@ -274,9 +274,10 @@ func (service *ApiContainerGatewayServiceServer) startRunningConnectionForKurtos
}

// Start listening
serviceConnection, err := service.connectionProvider.ForUserServiceIfRunning(service.enclaveId, serviceUuid, remotePrivatePortSpecs)
serviceConnection, err := service.connectionProvider.ForUserServiceIfRunning(service.enclaveId, serviceName, remotePrivatePortSpecs)
if err != nil {
return nil, stacktrace.Propagate(err, "Expected to be able to start a local connection service with guid '%v' in enclave '%v', instead a non-nil error was returned", serviceUuid, service.enclaveId)
logrus.Errorf("Tried forwarding ports for user service '%v' in enclave '%v' but failed with error:\n%v", serviceName, service.enclaveId, err)
return nil, nil
} else if serviceConnection == nil {
return nil, nil
}
Expand Down Expand Up @@ -309,11 +310,11 @@ func (service *ApiContainerGatewayServiceServer) startRunningConnectionForKurtos
}

// Store information about our running gateway
service.userServiceGuidToLocalConnectionMap[serviceUuid] = runingLocalServiceConnection
service.userServiceNameToLocalConnectionMap[serviceName] = runingLocalServiceConnection
cleanUpMapEntry := true
defer func() {
if cleanUpMapEntry {
delete(service.userServiceGuidToLocalConnectionMap, serviceUuid)
delete(service.userServiceNameToLocalConnectionMap, serviceName)
}
}()

Expand All @@ -322,8 +323,8 @@ func (service *ApiContainerGatewayServiceServer) startRunningConnectionForKurtos
return runingLocalServiceConnection, nil
}

func (service *ApiContainerGatewayServiceServer) idempotentKillRunningConnectionForServiceGuid(serviceUuid string) {
runningLocalConnection, isRunning := service.userServiceGuidToLocalConnectionMap[serviceUuid]
func (service *ApiContainerGatewayServiceServer) idempotentKillRunningConnectionForServiceName(serviceName string) {
runningLocalConnection, isRunning := service.userServiceNameToLocalConnectionMap[serviceName]
// Nothing running, nothing to kill
if !isRunning {
return
Expand All @@ -332,24 +333,24 @@ func (service *ApiContainerGatewayServiceServer) idempotentKillRunningConnection
// Close up the connection
runningLocalConnection.kurtosisConnection.Stop()
// delete the entry for the serve
delete(service.userServiceGuidToLocalConnectionMap, serviceUuid)
delete(service.userServiceNameToLocalConnectionMap, serviceName)
}

func (service *ApiContainerGatewayServiceServer) updateServicesLocalConnection(serviceInfos map[string]*kurtosis_core_rpc_api_bindings.ServiceInfo, cleanupRemovedServices bool) error {

serviceUuids := map[string]bool{}
for serviceId, serviceInfo := range serviceInfos {
serviceNames := map[string]bool{}
for _, serviceInfo := range serviceInfos {
if err := service.writeOverServiceInfoFieldsWithLocalConnectionInformationIfServiceRunning(serviceInfo); err != nil {
return stacktrace.Propagate(err, "Expected to be able to write over service info fields for service '%v', instead a non-nil error was returned", serviceId)
return stacktrace.Propagate(err, "Expected to be able to write over service info fields for service '%v', instead a non-nil error was returned", serviceInfo.Name)
}
serviceUuids[serviceInfo.GetServiceUuid()] = true
serviceNames[serviceInfo.GetName()] = true
}

if cleanupRemovedServices {
// Clean up connection for removed services
for serviceUuid := range service.userServiceGuidToLocalConnectionMap {
if _, found := serviceUuids[serviceUuid]; !found {
service.idempotentKillRunningConnectionForServiceGuid(serviceUuid)
for serviceName := range service.userServiceNameToLocalConnectionMap {
if _, found := serviceNames[serviceName]; !found {
service.idempotentKillRunningConnectionForServiceName(serviceName)
}
}
}
Expand Down

0 comments on commit 4df6a1c

Please sign in to comment.