Skip to content

Commit

Permalink
feat: validate min cpu & min memory are well under what's available (#988
Browse files Browse the repository at this point in the history
)

## Description:
This is a best-effort implementation. For Docker we get a list of
all running containers and sum their resource usage; if one of the
containers in the list dies in the meantime, we ignore it.

## Is this change user facing?
YES/NO
<!-- If yes, please add the "user facing" label to the PR -->
<!-- If yes, don't forget to include docs changes where relevant -->

## References (if applicable):
<!-- Add relevant Github Issues, Discord threads, or other helpful
information. -->
  • Loading branch information
h4ck3rk3y committed Jul 27, 2023
1 parent 20b635a commit 768e95d
Show file tree
Hide file tree
Showing 13 changed files with 365 additions and 24 deletions.
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/object_attributes_provider"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/object_attributes_provider/label_key_consts"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/object_attributes_provider/label_value_consts"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/compute_resources"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/container_status"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/engine"
Expand All @@ -27,6 +28,10 @@ import (
"sync"
)

const (
	// isResourceInformationComplete reports whether the cpu & memory figures returned by
	// GetAvailableCPUAndMemory cover all resource usage. For the Docker backend the figures
	// are considered complete (the docker manager sums usage across all running containers).
	isResourceInformationComplete = true
)

type DockerKurtosisBackend struct {
dockerManager *docker_manager.DockerManager

Expand Down Expand Up @@ -496,6 +501,14 @@ func (backend *DockerKurtosisBackend) DestroyDeprecatedCentralizedLogsResources(
return nil
}

// GetAvailableCPUAndMemory returns the free memory in megabytes, the free cpu in millicores,
// and a boolean indicating whether the resource information is complete for the Docker backend.
func (backend *DockerKurtosisBackend) GetAvailableCPUAndMemory(ctx context.Context) (compute_resources.MemoryInMegaBytes, compute_resources.CpuMilliCores, bool, error) {
	availableMemory, availableCpu, err := backend.dockerManager.GetAvailableCPUAndMemory(ctx)
	if err != nil {
		// Capitalized message for consistency with the rest of the backend's error messages
		return 0, 0, false, stacktrace.Propagate(err, "An error occurred fetching resource information from the docker backend")
	}
	return availableMemory, availableCpu, isResourceInformationComplete, nil
}

// ====================================================================================================
//
// Private helper functions shared by multiple subfunctions files
Expand Down
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/docker/go-connections/nat"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/consts"
docker_manager_types "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager/types"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/compute_resources"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/concurrent_writer"
"github.com/kurtosis-tech/stacktrace"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -121,6 +122,11 @@ const (
linuxAmd64 = "linux/amd64"
defaultPlatform = ""
architectureErrorString = "no matching manifest for linux/arm64/v8"

onlyReturnContainerIds = true
coresToMilliCores = 1000
osTypeWindows = "windows"
bytesInMegaBytes = 1000000
)

/*
Expand Down Expand Up @@ -1065,6 +1071,16 @@ func (manager *DockerManager) CopyFromContainer(ctx context.Context, containerId
return tarStreamReadCloser, nil
}

// GetAvailableCPUAndMemory returns the free memory in megabytes and the free cpu in millicores.
// Note: unlike the backend wrapper one layer up, this method does NOT return a "is the
// information complete" flag — the caller decides that (cpu accounting isn't complete on
// windows but is complete on linux).
func (manager *DockerManager) GetAvailableCPUAndMemory(ctx context.Context) (compute_resources.MemoryInMegaBytes, compute_resources.CpuMilliCores, error) {
	// getFreeMemoryAndCPU already returns megabytes & millicores in the typed form,
	// so no unit conversion is needed here (the previous local name "...InBytes" was misleading)
	availableMemoryInMegaBytes, availableCpuInMilliCores, err := getFreeMemoryAndCPU(ctx, manager.dockerClient)
	if err != nil {
		return 0, 0, stacktrace.Propagate(err, "an error occurred while getting available cpu and memory on docker")
	}
	return availableMemoryInMegaBytes, availableCpuInMilliCores, nil
}

// =================================================================================================================
//
// INSTANCE HELPER FUNCTIONS
Expand Down Expand Up @@ -1750,3 +1766,49 @@ func pullImage(ctx context.Context, dockerClient *client.Client, imageName strin
logrus.Tracef("No error pulling '%s' for platform '%s'. Returning", imageName, platform)
return nil, false
}

// getFreeMemoryAndCPU returns free memory in megabytes and free cpu in millicores.
// This is a best-effort calculation: it lists the currently-running containers and sums the
// resources they consume; a container that dies between the list call and the stats call is ignored.
func getFreeMemoryAndCPU(ctx context.Context, dockerClient *client.Client) (compute_resources.MemoryInMegaBytes, compute_resources.CpuMilliCores, error) {
	info, err := dockerClient.Info(ctx)
	if err != nil {
		return 0, 0, stacktrace.Propagate(err, "An error occurred while running info on docker")
	}
	containers, err := dockerClient.ContainerList(ctx, types.ContainerListOptions{
		// true - only show ids
		Quiet:   onlyReturnContainerIds,
		Size:    false,
		All:     false,
		Latest:  false,
		Since:   "",
		Before:  "",
		Limit:   0,
		Filters: filters.Args{},
	})
	if err != nil {
		return 0, 0, stacktrace.Propagate(err, "an error occurred while getting a list of all containers")
	}
	// info.MemTotal is the machine's total memory; usage summed below is subtracted from it
	totalMemory := uint64(info.MemTotal)
	totalUsedMemory := uint64(0)
	cpuUsageAsFractionOfAvailableCpu := float64(0)
	totalCPUs := info.NCPU
	for _, maybeRunningContainer := range containers {
		containerStatsResponse, err := dockerClient.ContainerStatsOneShot(ctx, maybeRunningContainer.ID)
		if err != nil {
			if strings.Contains(err.Error(), "No such container") {
				logrus.Warnf("Container with '%v' was in the list of containers for which we wanted to calculate consumed resources but it vanished in the meantime.", maybeRunningContainer.ID)
				continue
			}
			return 0, 0, stacktrace.Propagate(err, "An unexpected error occurred while getting resource usage information about the container with id '%v'", maybeRunningContainer.ID)
		}
		var containerStats types.Stats
		decodeErr := json.NewDecoder(containerStatsResponse.Body).Decode(&containerStats)
		// always close the stats response body — the original code leaked the underlying connection
		if closeErr := containerStatsResponse.Body.Close(); closeErr != nil {
			logrus.Warnf("An error occurred while closing the stats response body for container with id '%v': %v", maybeRunningContainer.ID, closeErr)
		}
		if decodeErr != nil {
			logrus.Errorf("an error occurred while unmarshalling stats response for container with id '%v':\n%v", maybeRunningContainer.ID, decodeErr)
			continue
		}
		totalUsedMemory += containerStats.MemoryStats.Usage
		// guard against a zero system-usage delta, which would make the division below NaN/Inf
		systemUsageDelta := containerStats.CPUStats.SystemUsage - containerStats.PreCPUStats.SystemUsage
		if systemUsageDelta == 0 {
			continue
		}
		cpuUsageAsFractionOfAvailableCpu += float64(containerStats.CPUStats.CPUUsage.TotalUsage-containerStats.PreCPUStats.CPUUsage.TotalUsage) / float64(systemUsageDelta)
	}
	// clamp to zero to avoid uint64 underflow if reported usage ever exceeds the total
	freeMemory := uint64(0)
	if totalMemory > totalUsedMemory {
		freeMemory = totalMemory - totalUsedMemory
	}
	// clamp to zero: a summed usage fraction > 1 would otherwise produce a negative value
	// that wraps around when converted to the unsigned CpuMilliCores type
	freeCpuFraction := 1 - cpuUsageAsFractionOfAvailableCpu
	if freeCpuFraction < 0 {
		freeCpuFraction = 0
	}
	return compute_resources.MemoryInMegaBytes(freeMemory / bytesInMegaBytes), compute_resources.CpuMilliCores(float64(totalCPUs*coresToMilliCores) * freeCpuFraction), nil
}
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/kubernetes/object_attributes_provider"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/kubernetes/object_attributes_provider/label_key_consts"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/kubernetes/object_attributes_provider/label_value_consts"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/compute_resources"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/engine"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result"
Expand All @@ -36,6 +37,10 @@ type KubernetesKurtosisBackend struct {
apiContainerModeArgs *shared_helpers.ApiContainerModeArgs
}

const (
	// isResourceInformationComplete reports whether the cpu & memory figures returned by
	// GetAvailableCPUAndMemory cover all resource usage. The Kubernetes backend doesn't yet
	// compute resource availability, so the information is marked incomplete.
	isResourceInformationComplete = false
)

func (backend *KubernetesKurtosisBackend) GetEngineLogs(ctx context.Context, outputDirpath string) error {
//TODO implement me
panic("implement me")
Expand Down Expand Up @@ -392,6 +397,11 @@ func (backend *KubernetesKurtosisBackend) DestroyUserServices(ctx context.Contex
backend.kubernetesManager)
}

// GetAvailableCPUAndMemory returns the free memory in megabytes, the free cpu in millicores,
// and a boolean indicating whether the information is complete.
// Resource calculation isn't implemented for Kubernetes yet, so this deliberately returns
// zeroes with the "incomplete" flag (and no error) so that callers skip resource validation
// rather than failing.
func (backend *KubernetesKurtosisBackend) GetAvailableCPUAndMemory(ctx context.Context) (compute_resources.MemoryInMegaBytes, compute_resources.CpuMilliCores, bool, error) {
	// TODO - implement resource calculation in kubernetes
	return 0, 0, isResourceInformationComplete, nil
}

func (backend *KubernetesKurtosisBackend) CreateLogsDatabase(ctx context.Context, logsDatabaseHttpPortNumber uint16) (*logs_database.LogsDatabase, error) {
// TODO IMPLEMENT
return nil, stacktrace.NewError("Creating the logs database isn't yet implemented on Kubernetes")
Expand Down
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/api_container"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/compute_resources"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/engine"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result"
Expand Down Expand Up @@ -528,3 +529,11 @@ func (backend *MetricsReportingKurtosisBackend) DestroyDeprecatedCentralizedLogs
}
return nil
}

// GetAvailableCPUAndMemory delegates to the underlying backend, passing through the free
// memory in megabytes, the free cpu in millicores, and the information-completeness flag.
func (backend *MetricsReportingKurtosisBackend) GetAvailableCPUAndMemory(ctx context.Context) (compute_resources.MemoryInMegaBytes, compute_resources.CpuMilliCores, bool, error) {
	memory, cpu, isInformationComplete, err := backend.underlying.GetAvailableCPUAndMemory(ctx)
	if err != nil {
		return 0, 0, false, stacktrace.Propagate(err, "An error occurred while fetching cpu & memory information from the underlying backend")
	}
	return memory, cpu, isInformationComplete, nil
}
Expand Up @@ -3,6 +3,7 @@ package backend_interface
import (
"context"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/api_container"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/compute_resources"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/engine"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result"
Expand Down Expand Up @@ -386,4 +387,7 @@ type KurtosisBackend interface {
// Destroy the centralized logs resources
// TODO(centralized-logs-resources-deprecation) remove this once we know people are on > 0.68.0
DestroyDeprecatedCentralizedLogsResources(ctx context.Context) error

// GetAvailableCPUAndMemory - gets available memory in megabytes and cpu in millicores, the boolean indicates whether the information is complete
GetAvailableCPUAndMemory(ctx context.Context) (compute_resources.MemoryInMegaBytes, compute_resources.CpuMilliCores, bool, error)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

@@ -0,0 +1,4 @@
// Package compute_resources holds typed units for machine resource quantities, so that
// callers can't accidentally mix up cpu and memory values or their units.
package compute_resources

// CpuMilliCores is a cpu quantity expressed in millicores (1 core = 1000 millicores).
type CpuMilliCores uint64

// MemoryInMegaBytes is a memory quantity expressed in megabytes.
type MemoryInMegaBytes uint64
Expand Up @@ -81,13 +81,24 @@ func validateSingleService(validatorEnvironment *startosis_validator.ValidatorEn
}
}
}

if validationErr := validatorEnvironment.HasEnoughCPU(serviceConfig.GetMinCPUAllocationMillicpus(), serviceName); validationErr != nil {
return validationErr
}

if validationErr := validatorEnvironment.HasEnoughMemory(serviceConfig.GetMinMemoryAllocationMegabytes(), serviceName); validationErr != nil {
return validationErr
}

validatorEnvironment.AddServiceName(serviceName)
validatorEnvironment.AppendRequiredContainerImage(serviceConfig.GetContainerImageName())
var portIds []string
for portId := range serviceConfig.GetPrivatePorts() {
portIds = append(portIds, portId)
}
validatorEnvironment.AddPrivatePortIDForService(portIds, serviceName)
validatorEnvironment.ConsumeMemory(serviceConfig.GetMinMemoryAllocationMegabytes(), serviceName)
validatorEnvironment.ConsumeCPU(serviceConfig.GetMinCPUAllocationMillicpus(), serviceName)
return nil
}

Expand Down
Expand Up @@ -75,6 +75,8 @@ func (builtin *RemoveServiceCapabilities) Validate(_ *builtin_argument.ArgumentV
}
validatorEnvironment.RemoveServiceName(builtin.serviceName)
validatorEnvironment.RemoveServiceFromPrivatePortIDMapping(builtin.serviceName)
validatorEnvironment.FreeMemory(builtin.serviceName)
validatorEnvironment.FreeCPU(builtin.serviceName)
return nil
}

Expand Down
Expand Up @@ -25,6 +25,8 @@ type StartosisValidator struct {

serviceNetwork service_network.ServiceNetwork
fileArtifactStore *enclave_data_directory.FilesArtifactStore

backend *backend_interface.KurtosisBackend
}

func NewStartosisValidator(kurtosisBackend *backend_interface.KurtosisBackend, serviceNetwork service_network.ServiceNetwork, fileArtifactStore *enclave_data_directory.FilesArtifactStore) *StartosisValidator {
Expand All @@ -33,6 +35,7 @@ func NewStartosisValidator(kurtosisBackend *backend_interface.KurtosisBackend, s
dockerImagesValidator,
serviceNetwork,
fileArtifactStore,
kurtosisBackend,
}
}

Expand All @@ -54,11 +57,22 @@ func (validator *StartosisValidator) Validate(ctx context.Context, instructionsS
return
}

availableMemoryInMegaBytes, availableCpuInMilliCores, isResourceInformationComplete, err := (*validator.backend).GetAvailableCPUAndMemory(ctx)
if err != nil {
wrappedValidationError := startosis_errors.WrapWithValidationError(err, "Couldn't create validator environment as we ran into errors fetching information about available cpu & memory")
starlarkRunResponseLineStream <- binding_constructors.NewStarlarkRunResponseLineFromValidationError(wrappedValidationError.ToAPIType())
starlarkRunResponseLineStream <- binding_constructors.NewStarlarkRunResponseLineFromRunFailureEvent()
return
}

environment := startosis_validator.NewValidatorEnvironment(
validator.serviceNetwork.IsNetworkPartitioningEnabled(),
serviceNames,
validator.fileArtifactStore.ListFiles(),
serviceNamePortIdMapping)
serviceNamePortIdMapping,
availableCpuInMilliCores,
availableMemoryInMegaBytes,
isResourceInformationComplete)

isValidationFailure = isValidationFailure ||
validator.validateAndUpdateEnvironment(instructionsSequence, environment, starlarkRunResponseLineStream)
Expand Down

0 comments on commit 768e95d

Please sign in to comment.