Skip to content

Commit

Permalink
fix: lower calls to backend by doing get all services more efficiently (
Browse files Browse the repository at this point in the history
#1143)

## Description:
We were imposed with Kubernetes rate limits due to how we were doing
GetServcies by fetching all individual services for plural runs as well,
this fetches once from the underlying backend

## Is this change user facing?
YES - user will get speed improvement but doesn't need docs

## References (if applicable):
Closes #1074
  • Loading branch information
h4ck3rk3y committed Aug 24, 2023
1 parent d359d3c commit a2c0dcc
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 53 deletions.
2 changes: 1 addition & 1 deletion api/golang/core/lib/enclaves/enclave_context.go
Expand Up @@ -267,7 +267,7 @@ func (enclaveCtx *EnclaveContext) GetServices() (map[services.ServiceName]servic
getServicesArgs := binding_constructors.NewGetServicesArgs(map[string]bool{})
response, err := enclaveCtx.client.GetServices(context.Background(), getServicesArgs)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred getting the service Names in the enclave")
return nil, stacktrace.Propagate(err, "An error occurred getting the services in the enclave")
}

serviceInfos := make(map[services.ServiceName]services.ServiceUUID, len(response.GetServiceInfo()))
Expand Down
118 changes: 67 additions & 51 deletions core/server/api_container/server/api_container_service.go
Expand Up @@ -12,6 +12,7 @@ import (
"compress/gzip"
"context"
"fmt"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/uuid_generator"
"io"
"math"
"net/http"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/container_status"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/port_spec"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/uuid_generator"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/service_network"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/startosis_constants"
Expand Down Expand Up @@ -284,10 +284,12 @@ func (apicService ApiContainerService) GetServices(ctx context.Context, args *ku
serviceInfos := map[string]*kurtosis_core_rpc_api_bindings.ServiceInfo{}
filterServiceIdentifiers := args.ServiceIdentifiers

// if there are any filters we fetch those services only
// if there are any filters we fetch those services only - this goes one by one
// TODO (maybe) - explore perf differences between individual fetches vs filtering on the APIC side
// Note as of 2023-08-23 I(gyani) has only seen instances of fetch everything & fetch one, so we don't need to optimize just yet
if len(filterServiceIdentifiers) > 0 {
for serviceIdentifier := range filterServiceIdentifiers {
serviceInfo, err := apicService.getServiceInfo(ctx, serviceIdentifier)
serviceInfo, err := apicService.getServiceInfoForIdentifier(ctx, serviceIdentifier)
if err != nil {
return nil, stacktrace.Propagate(err, "Failed to get service info for service '%v'", serviceIdentifier)
}
Expand All @@ -297,19 +299,13 @@ func (apicService ApiContainerService) GetServices(ctx context.Context, args *ku
return resp, nil
}

// otherwise we fetch everything
allServiceNames, err := apicService.serviceNetwork.GetServiceNames()
allServices, err := apicService.serviceNetwork.GetServices(ctx)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred getting all service names")
return nil, stacktrace.Propagate(err, "an error occurred while fetching all services from the backend")
}

for serviceName := range allServiceNames {
serviceNameStr := string(serviceName)
serviceInfo, err := apicService.getServiceInfo(ctx, serviceNameStr)
if err != nil {
return nil, stacktrace.Propagate(err, "Failed to get service info for service '%v'", serviceName)
}
serviceInfos[serviceNameStr] = serviceInfo
serviceInfos, err = getServiceInfosFromServiceObjs(allServices)
if err != nil {
return nil, stacktrace.Propagate(err, "an error occurred while converting the service obj into service info")
}

resp := binding_constructors.NewGetServicesResponse(serviceInfos)
Expand Down Expand Up @@ -623,52 +619,19 @@ func makeHttpRequest(httpMethod string, url string, body string) (*http.Response
return resp, nil
}

func (apicService ApiContainerService) getServiceInfo(ctx context.Context, serviceIdentifier string) (*kurtosis_core_rpc_api_bindings.ServiceInfo, error) {
func (apicService ApiContainerService) getServiceInfoForIdentifier(ctx context.Context, serviceIdentifier string) (*kurtosis_core_rpc_api_bindings.ServiceInfo, error) {
serviceObj, err := apicService.serviceNetwork.GetService(
ctx,
serviceIdentifier,
)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred getting info for service '%v'", serviceIdentifier)
}
privatePorts := serviceObj.GetPrivatePorts()
privateIp := serviceObj.GetRegistration().GetPrivateIP()
maybePublicIp := serviceObj.GetMaybePublicIP()
maybePublicPorts := serviceObj.GetMaybePublicPorts()
serviceUuidStr := string(serviceObj.GetRegistration().GetUUID())
serviceNameStr := string(serviceObj.GetRegistration().GetName())
serviceStatus, err := convertServiceStatusToServiceInfoStatus(serviceObj.GetRegistration().GetStatus())
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred converting the service status to a service info status")
}

privateApiPorts, err := transformPortSpecMapToApiPortsMap(privatePorts)
serviceInfo, err := getServiceInfoFromServiceObj(serviceObj)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred transforming the service's private port specs to API ports")
}
publicIpAddrStr := missingPublicIpAddrStr
if maybePublicIp != nil {
publicIpAddrStr = maybePublicIp.String()
return nil, stacktrace.Propagate(err, "an error occurred while converting service obj for service with id '%v' to service info", serviceIdentifier)
}
publicApiPorts := map[string]*kurtosis_core_rpc_api_bindings.Port{}
if maybePublicPorts != nil {
publicApiPorts, err = transformPortSpecMapToApiPortsMap(maybePublicPorts)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred transforming the service's public port spec ports to API ports")
}
}

serviceInfoResponse := binding_constructors.NewServiceInfo(
serviceUuidStr,
serviceNameStr,
uuid_generator.ShortenedUUIDString(serviceUuidStr),
privateIp.String(),
privateApiPorts,
publicIpAddrStr,
publicApiPorts,
serviceStatus,
)
return serviceInfoResponse, nil
return serviceInfo, nil
}

func (apicService ApiContainerService) runStarlarkPackageSetup(
Expand Down Expand Up @@ -742,6 +705,59 @@ func (apicService ApiContainerService) runStarlark(
}
}

func getServiceInfosFromServiceObjs(services map[service.ServiceUUID]*service.Service) (map[string]*kurtosis_core_rpc_api_bindings.ServiceInfo, error) {
serviceInfos := map[string]*kurtosis_core_rpc_api_bindings.ServiceInfo{}
for uuid, serviceObj := range services {
serviceInfo, err := getServiceInfoFromServiceObj(serviceObj)
if err != nil {
return nil, stacktrace.Propagate(err, "there was an error converting the service obj for service with uuid '%v' and name '%v' to service info", uuid, serviceObj.GetRegistration().GetName())
}
serviceInfos[serviceInfo.Name] = serviceInfo
}
return serviceInfos, nil
}

func getServiceInfoFromServiceObj(serviceObj *service.Service) (*kurtosis_core_rpc_api_bindings.ServiceInfo, error) {
privatePorts := serviceObj.GetPrivatePorts()
privateIp := serviceObj.GetRegistration().GetPrivateIP()
maybePublicIp := serviceObj.GetMaybePublicIP()
maybePublicPorts := serviceObj.GetMaybePublicPorts()
serviceUuidStr := string(serviceObj.GetRegistration().GetUUID())
serviceNameStr := string(serviceObj.GetRegistration().GetName())
serviceStatus, err := convertServiceStatusToServiceInfoStatus(serviceObj.GetRegistration().GetStatus())
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred converting the service status to a service info status")
}

privateApiPorts, err := transformPortSpecMapToApiPortsMap(privatePorts)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred transforming the service's private port specs to API ports")
}
publicIpAddrStr := missingPublicIpAddrStr
if maybePublicIp != nil {
publicIpAddrStr = maybePublicIp.String()
}
publicApiPorts := map[string]*kurtosis_core_rpc_api_bindings.Port{}
if maybePublicPorts != nil {
publicApiPorts, err = transformPortSpecMapToApiPortsMap(maybePublicPorts)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred transforming the service's public port spec ports to API ports")
}
}

serviceInfoResponse := binding_constructors.NewServiceInfo(
serviceUuidStr,
serviceNameStr,
uuid_generator.ShortenedUUIDString(serviceUuidStr),
privateIp.String(),
privateApiPorts,
publicIpAddrStr,
publicApiPorts,
serviceStatus,
)
return serviceInfoResponse, nil
}

func getFileDescriptionsFromArtifact(artifactPath string) ([]*kurtosis_core_rpc_api_bindings.FileArtifactContentsFileDescription, error) {
file, err := os.Open(artifactPath)
if err != nil {
Expand Down
Expand Up @@ -659,6 +659,45 @@ func (network *DefaultServiceNetwork) HttpRequestService(ctx context.Context, se
return resp, nil
}

func (network *DefaultServiceNetwork) GetServices(ctx context.Context) (map[service.ServiceUUID]*service.Service, error) {
network.mutex.Lock()
defer network.mutex.Unlock()

registeredServices, err := network.serviceRegistrationRepository.GetAll()
if err != nil {
return nil, stacktrace.Propagate(err, "an error occurred getting registered services from the repository")
}
registeredServiceNames := map[service.ServiceName]bool{}
for name := range registeredServices {
registeredServiceNames[name] = true
}

registeredServiceUuidsFilters := &service.ServiceFilters{
Names: registeredServiceNames,
UUIDs: nil,
Statuses: nil,
}

allServices, err := network.kurtosisBackend.GetUserServices(ctx, network.enclaveUuid, registeredServiceUuidsFilters)
if err != nil {
return nil, stacktrace.Propagate(err, "an error occurred while fetching services from the backend")
}

filteredServicesToRegisteredServices := map[service.ServiceUUID]*service.Service{}

for name, registration := range registeredServices {
uuid := registration.GetUUID()
serviceObj, found := allServices[uuid]
if !found {
return nil, stacktrace.NewError("couldn't find service with uuid '%v' and name '%v' in backend", uuid, name)
}
serviceObj.GetRegistration().SetStatus(registration.GetStatus())
filteredServicesToRegisteredServices[uuid] = serviceObj
}

return filteredServicesToRegisteredServices, nil
}

func (network *DefaultServiceNetwork) GetService(ctx context.Context, serviceIdentifier string) (*service.Service, error) {
network.mutex.Lock()
defer network.mutex.Unlock()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -93,6 +93,8 @@ type ServiceNetwork interface {

GetService(ctx context.Context, serviceIdentifier string) (*service.Service, error)

GetServices(ctx context.Context) (map[service.ServiceUUID]*service.Service, error)

CopyFilesFromService(ctx context.Context, serviceIdentifier string, srcPath string, artifactName string) (enclave_data_directory.FilesArtifactUUID, error)

GetServiceNames() (map[service.ServiceName]bool, error)
Expand Down

0 comments on commit a2c0dcc

Please sign in to comment.