From 8c2b697548f06c1f7e8a1474e9ee2cb2922d5dea Mon Sep 17 00:00:00 2001 From: Peeeekay <15133250+Peeeekay@users.noreply.github.com> Date: Thu, 22 Jun 2023 22:20:50 -0400 Subject: [PATCH] feat: added wait to run_sh task. (#750) Added a wait field to the run_sh task. The intention is that the user can set a wait time-out; if the command does not complete before the time-out, the instruction fails. In this PR, I have also fixed the wait and ready-condition bug observed on k8s with exec-recipe. --- .../lsp/resource/kurtosis_starlark.json | 6 + .../run_user_service_exec_commands.go | 26 +--- .../kubernetes_manager/kubernetes_manager.go | 87 +++++++++++++ .../kurtosis_instruction/run_sh/run_sh.go | 115 ++++++++++++++---- docs/docs/starlark-reference/plan.md | 11 ++ .../run_task_sh_task_test.go | 14 +++ 6 files changed, 216 insertions(+), 43 deletions(-) diff --git a/cli/cli/commands/lsp/resource/kurtosis_starlark.json b/cli/cli/commands/lsp/resource/kurtosis_starlark.json index fde622ad9f..bf7fef9f41 100644 --- a/cli/cli/commands/lsp/resource/kurtosis_starlark.json +++ b/cli/cli/commands/lsp/resource/kurtosis_starlark.json @@ -208,6 +208,12 @@ "type": "list", "content": "store?", "detail": "List of paths to directories or files that will be copied to a file artifact" + }, + { + "name": "wait", + "type": "string", + "content": "wait?", + "detail": "The time to allow for the command to complete. If the command takes longer than this, the instruction will fail. In order to disable it, set wait=None. 
The default value is - 180s" } ] }, diff --git a/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_kurtosis_backend/user_services_functions/run_user_service_exec_commands.go b/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_kurtosis_backend/user_services_functions/run_user_service_exec_commands.go index af6f58e289..67b66bc9ff 100644 --- a/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_kurtosis_backend/user_services_functions/run_user_service_exec_commands.go +++ b/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_kurtosis_backend/user_services_functions/run_user_service_exec_commands.go @@ -49,23 +49,14 @@ func RunUserServiceExecCommands( return nil, nil, stacktrace.Propagate(err, "An error occurred getting user services matching the requested UUIDs: %+v", requestedGuids) } - successfulExecs, failedExecs, err := runExecOperationsInParallel(namespaceName, userServiceCommands, matchingObjectsAndResources, kubernetesManager) + successfulExecs, failedExecs, err := runExecOperationsInParallel(namespaceName, userServiceCommands, matchingObjectsAndResources, kubernetesManager, ctx) if err != nil { return nil, nil, stacktrace.Propagate(err, "An unexpected error occurred running the exec commands in parallel") } return successfulExecs, failedExecs, nil } -func runExecOperationsInParallel( - namespaceName string, - commandArgs map[service.ServiceUUID][]string, - userServiceKubernetesResources map[service.ServiceUUID]*shared_helpers.UserServiceObjectsAndKubernetesResources, - kubernetesManager *kubernetes_manager.KubernetesManager, -) ( - map[service.ServiceUUID]*exec_result.ExecResult, - map[service.ServiceUUID]error, - error, -) { +func runExecOperationsInParallel(namespaceName string, commandArgs map[service.ServiceUUID][]string, userServiceKubernetesResources map[service.ServiceUUID]*shared_helpers.UserServiceObjectsAndKubernetesResources, kubernetesManager *kubernetes_manager.KubernetesManager, ctx context.Context) 
(map[service.ServiceUUID]*exec_result.ExecResult, map[service.ServiceUUID]error, error) { successfulExecs := map[service.ServiceUUID]*exec_result.ExecResult{} failedExecs := map[service.ServiceUUID]error{} @@ -103,7 +94,7 @@ func runExecOperationsInParallel( userServiceKubernetesPod := userServiceKubernetesResource.KubernetesResources.Pod execOperationId := operation_parallelizer.OperationID(serviceUuid) - execOperation := createExecOperation(namespaceName, serviceUuid, userServiceKubernetesPod, commandArg, kubernetesManager) + execOperation := createExecOperation(namespaceName, serviceUuid, userServiceKubernetesPod, commandArg, kubernetesManager, ctx) execOperations[execOperationId] = execOperation } @@ -125,17 +116,12 @@ func runExecOperationsInParallel( return successfulExecs, failedExecs, nil } -func createExecOperation( - namespaceName string, - serviceUuid service.ServiceUUID, - servicePod *v1.Pod, - commandArg []string, - kubernetesManager *kubernetes_manager.KubernetesManager, -) operation_parallelizer.Operation { +func createExecOperation(namespaceName string, serviceUuid service.ServiceUUID, servicePod *v1.Pod, commandArg []string, kubernetesManager *kubernetes_manager.KubernetesManager, ctx context.Context) operation_parallelizer.Operation { return func() (interface{}, error) { outputBuffer := &bytes.Buffer{} concurrentBuffer := concurrent_writer.NewConcurrentWriter(outputBuffer) - exitCode, err := kubernetesManager.RunExecCommand( + exitCode, err := kubernetesManager.RunExecCommandWithContext( + ctx, namespaceName, servicePod.Name, userServiceContainerName, diff --git a/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_manager/kubernetes_manager.go b/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_manager/kubernetes_manager.go index a2ebf9021e..1a17dc0dab 100644 --- a/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_manager/kubernetes_manager.go +++ 
b/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_manager/kubernetes_manager.go @@ -61,6 +61,7 @@ const ( shouldAddTimestampsWhenPrintingPodInfo = true listOptionsTimeoutSeconds int64 = 10 + contextDeadlineExceeded = "context deadline exceeded" ) // We'll try to use the nicer-to-use shells first before we drop down to the lower shells @@ -1245,6 +1246,92 @@ func (manager *KubernetesManager) GetContainerLogs( return result, nil } +// RunExecCommandWithContext runs the exec on Kubernetes with a context, therefore +// when the context times out it stops the process. +// TODO: merge RunExecCommand and this into one method +// Doing this for now to unblock myself for wait workflows for k8s +// In the next PR, I will add context to the WaitForPortAvailabilityUsingNetstat and +// CopyFilesFromUserService methods. I am doing this to reduce the blast radius. +func (manager *KubernetesManager) RunExecCommandWithContext( + ctx context.Context, + namespaceName string, + podName string, + containerName string, + command []string, + stdOutOutput io.Writer, + stdErrOutput io.Writer, +) ( + resultExitCode int32, + resultErr error, +) { + execOptions := &apiv1.PodExecOptions{ + TypeMeta: metav1.TypeMeta{ + Kind: "", + APIVersion: "", + }, + Stdin: shouldAllocateStdinOnPodExec, + Stdout: shouldAllocatedStdoutOnPodExec, + Stderr: shouldAllocatedStderrOnPodExec, + TTY: shouldAllocateTtyOnPodExec, + Container: containerName, + Command: command, + } + + //Create a RESTful command request. + request := manager.kubernetesClientSet.CoreV1().RESTClient(). + Post(). + Namespace(namespaceName). + Resource("pods"). + Name(podName). + SubResource("exec"). 
+ VersionedParams(execOptions, scheme.ParameterCodec) + if request == nil { + return -1, stacktrace.NewError( + "Failed to build a working RESTful request for the command '%s'.", + execOptions.Command, + ) + } + + exec, err := remotecommand.NewSPDYExecutor(manager.kuberneteRestConfig, http.MethodPost, request.URL()) + if err != nil { + return -1, stacktrace.Propagate( + err, + "Failed to build an executor for the command '%s' with the RESTful endpoint '%s'.", + execOptions.Command, + request.URL().String(), + ) + } + + if err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: nil, + Stdout: stdOutOutput, + Stderr: stdErrOutput, + Tty: false, + TerminalSizeQueue: nil, + }); err != nil { + // Kubernetes returns the exit code of the command via a string in the error message, so we have to extract it + statusError := err.Error() + + // this means that context deadline has exceeded + if strings.Contains(statusError, contextDeadlineExceeded) { + return 1, stacktrace.Propagate(err, "There was an error occurred while executing commands on the container") + } + + exitCode, err := getExitCodeFromStatusMessage(statusError) + if err != nil { + return exitCode, stacktrace.Propagate( + err, + "There was an error trying to parse the message '%s' to an exit code.", + statusError, + ) + } + + return exitCode, nil + } + + return successExecCommandExitCode, nil +} + func (manager *KubernetesManager) RunExecCommand( namespaceName string, podName string, diff --git a/core/server/api_container/server/startosis_engine/kurtosis_instruction/run_sh/run_sh.go b/core/server/api_container/server/startosis_engine/kurtosis_instruction/run_sh/run_sh.go index e2d4bbf76f..af596d49d6 100644 --- a/core/server/api_container/server/startosis_engine/kurtosis_instruction/run_sh/run_sh.go +++ b/core/server/api_container/server/startosis_engine/kurtosis_instruction/run_sh/run_sh.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/kurtosis-tech/kurtosis/api/golang/core/lib/services" + 
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result" "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service" "github.com/kurtosis-tech/kurtosis/core/server/api_container/server/service_network" "github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/kurtosis_instruction/shared_helpers/magic_string_helper" @@ -21,16 +22,19 @@ import ( "go.starlark.net/starlarkstruct" "reflect" "strings" + "time" ) const ( RunShBuiltinName = "run_sh" - ImageNameArgName = "image" - RunArgName = "run" + ImageNameArgName = "image" + RunArgName = "run" + StoreFilesArgName = "store" + WaitArgName = "wait" + FilesArgName = "files" DefaultImageName = "badouralix/curl-jq" - FilesAttr = "files" runshCodeKey = "code" runshOutputKey = "output" @@ -39,7 +43,8 @@ const ( shellCommand = "/bin/sh" - storeFilesKey = "store" + DefaultWaitTimeoutDurationStr = "180s" + DisableWaitTimeoutDurationStr = "" ) var runTailCommandToPreventContainerToStopOnCreating = []string{"tail", "-f", "/dev/null"} @@ -64,15 +69,25 @@ func NewRunShService(serviceNetwork service_network.ServiceNetwork, runtimeValue }, }, { - Name: FilesAttr, + Name: FilesArgName, IsOptional: true, ZeroValueProvider: builtin_argument.ZeroValueProvider[*starlark.Dict], }, { - Name: storeFilesKey, + Name: StoreFilesArgName, IsOptional: true, ZeroValueProvider: builtin_argument.ZeroValueProvider[*starlark.List], }, + { + Name: WaitArgName, + IsOptional: true, + ZeroValueProvider: builtin_argument.ZeroValueProvider[starlark.Value], + // the value can be a string duration, or it can be a Starlark none value (because we are preparing + // the signature to receive a custom type in the future) when users want to disable it + Validator: func(value starlark.Value) *startosis_errors.InterpretationError { + return builtin_argument.DurationOrNone(value, WaitArgName) + }, + }, }, }, @@ -87,14 +102,16 @@ func NewRunShService(serviceNetwork 
service_network.ServiceNetwork, runtimeValue resultUuid: "", // populated at interpretation time fileArtifactNames: nil, pathToFileArtifacts: nil, + wait: DefaultWaitTimeoutDurationStr, } }, DefaultDisplayArguments: map[string]bool{ - RunArgName: true, - ImageNameArgName: true, - FilesAttr: true, - storeFilesKey: true, + RunArgName: true, + ImageNameArgName: true, + FilesArgName: true, + StoreFilesArgName: true, + WaitArgName: true, }, } } @@ -110,6 +127,7 @@ type RunShCapabilities struct { files map[string]string fileArtifactNames []string pathToFileArtifacts []string + wait string } func (builtin *RunShCapabilities) Interpret(arguments *builtin_argument.ArgumentValuesSet) (starlark.Value, *startosis_errors.InterpretationError) { @@ -127,13 +145,13 @@ func (builtin *RunShCapabilities) Interpret(arguments *builtin_argument.Argument builtin.image = imageStarlark.GoString() } - if arguments.IsSet(FilesAttr) { - filesStarlark, err := builtin_argument.ExtractArgumentValue[*starlark.Dict](arguments, FilesAttr) + if arguments.IsSet(FilesArgName) { + filesStarlark, err := builtin_argument.ExtractArgumentValue[*starlark.Dict](arguments, FilesArgName) if err != nil { - return nil, startosis_errors.WrapWithInterpretationError(err, "Unable to extract value for '%s' argument", FilesAttr) + return nil, startosis_errors.WrapWithInterpretationError(err, "Unable to extract value for '%s' argument", FilesArgName) } if filesStarlark.Len() > 0 { - filesArtifactMountDirPaths, interpretationErr := kurtosis_types.SafeCastToMapStringString(filesStarlark, FilesAttr) + filesArtifactMountDirPaths, interpretationErr := kurtosis_types.SafeCastToMapStringString(filesStarlark, FilesArgName) if interpretationErr != nil { return nil, interpretationErr } @@ -141,14 +159,14 @@ func (builtin *RunShCapabilities) Interpret(arguments *builtin_argument.Argument } } - if arguments.IsSet(storeFilesKey) { - storeFilesList, err := builtin_argument.ExtractArgumentValue[*starlark.List](arguments, 
storeFilesKey) + if arguments.IsSet(StoreFilesArgName) { + storeFilesList, err := builtin_argument.ExtractArgumentValue[*starlark.List](arguments, StoreFilesArgName) if err != nil { - return nil, startosis_errors.WrapWithInterpretationError(err, "Unable to extract value for '%s' argument", storeFilesKey) + return nil, startosis_errors.WrapWithInterpretationError(err, "Unable to extract value for '%s' argument", StoreFilesArgName) } if storeFilesList.Len() > 0 { - storeFilesArray, interpretationErr := kurtosis_types.SafeCastToStringSlice(storeFilesList, storeFilesKey) + storeFilesArray, interpretationErr := kurtosis_types.SafeCastToStringSlice(storeFilesList, StoreFilesArgName) if interpretationErr != nil { return nil, interpretationErr } @@ -169,6 +187,20 @@ func (builtin *RunShCapabilities) Interpret(arguments *builtin_argument.Argument } } + if arguments.IsSet(WaitArgName) { + var waitTimeout string + waitValue, err := builtin_argument.ExtractArgumentValue[starlark.Value](arguments, WaitArgName) + if err != nil { + return nil, startosis_errors.WrapWithInterpretationError(err, "error occurred while extracting wait information") + } + if waitValueStr, ok := waitValue.(starlark.String); ok { + waitTimeout = waitValueStr.GoString() + } else if _, ok := waitValue.(starlark.NoneType); ok { + waitTimeout = DisableWaitTimeoutDurationStr + } + builtin.wait = waitTimeout + } + resultUuid, err := builtin.runtimeValueStore.CreateValue() if err != nil { return nil, startosis_errors.NewInterpretationError("An error occurred while generating UUID for future reference for %v instruction", RunShBuiltinName) @@ -232,11 +264,11 @@ func (builtin *RunShCapabilities) Validate(_ *builtin_argument.ArgumentValuesSet // Make task as its own entity instead of currently shown under services func (builtin *RunShCapabilities) Execute(ctx context.Context, _ *builtin_argument.ArgumentValuesSet) (string, error) { // create work directory and cd into that directory - commandRunCommand, err := 
getCommandToRun(builtin) + commandToRun, err := getCommandToRun(builtin) if err != nil { return "", stacktrace.Propagate(err, "error occurred while preparing the sh command to execute on the image") } - createDefaultDirectory := []string{shellCommand, "-c", commandRunCommand} + fullCommandToRun := []string{shellCommand, "-c", commandToRun} serviceConfigBuilder := services.NewServiceConfigBuilder(builtin.image) serviceConfigBuilder.WithFilesArtifactMountDirpaths(builtin.files) // This make sure that the container does not stop as soon as it starts @@ -253,7 +285,7 @@ func (builtin *RunShCapabilities) Execute(ctx context.Context, _ *builtin_argume } // run the command passed in by user in the container - createDefaultDirectoryResult, err := builtin.serviceNetwork.RunExec(ctx, builtin.name, createDefaultDirectory) + createDefaultDirectoryResult, err := executeWithWait(ctx, builtin, fullCommandToRun) if err != nil { return "", stacktrace.Propagate(err, fmt.Sprintf("error occurred while executing one time task command: %v ", builtin.run)) } @@ -270,7 +302,7 @@ func (builtin *RunShCapabilities) Execute(ctx context.Context, _ *builtin_argume if createDefaultDirectoryResult.GetExitCode() != 0 { return "", stacktrace.NewError( "error occurred and shell command: %q exited with code %d with output %q", - commandRunCommand, createDefaultDirectoryResult.GetExitCode(), createDefaultDirectoryResult.GetOutput()) + commandToRun, createDefaultDirectoryResult.GetExitCode(), createDefaultDirectoryResult.GetOutput()) } if builtin.fileArtifactNames != nil && builtin.pathToFileArtifacts != nil { @@ -332,13 +364,50 @@ func getCommandToRun(builtin *RunShCapabilities) (string, error) { return commandWithNoNewLines, nil } +func executeWithWait(ctx context.Context, builtin *RunShCapabilities, commandToRun []string) (*exec_result.ExecResult, error) { + // Wait is set to None + if builtin.wait == DisableWaitTimeoutDurationStr { + return builtin.serviceNetwork.RunExec(ctx, builtin.name, 
commandToRun) + } + + resultChan := make(chan *exec_result.ExecResult, 1) + errChan := make(chan error, 1) + + timoutStr := builtin.wait + + // we validate timeout string during the validation stage so it cannot be invalid at this stage + parsedTimeout, _ := time.ParseDuration(timoutStr) + + timeDuration := time.After(parsedTimeout) + contextWithDeadline, cancelContext := context.WithTimeout(ctx, parsedTimeout) + defer cancelContext() + + go func() { + executionResult, err := builtin.serviceNetwork.RunExec(contextWithDeadline, builtin.name, commandToRun) + if err != nil { + errChan <- err + } else { + resultChan <- executionResult + } + }() + + select { + case result := <-resultChan: + return result, nil + case err := <-errChan: + return nil, err + case <-timeDuration: // Timeout duration + return nil, stacktrace.NewError("The exec request timed out after %v seconds", parsedTimeout.Seconds()) + } +} + func validatePathIsUniqueWhileCreatingFileArtifact(storeFiles []string) *startosis_errors.ValidationError { if len(storeFiles) > 0 { duplicates := map[string]uint16{} for _, filePath := range storeFiles { if duplicates[filePath] != 0 { return startosis_errors.NewValidationError( - "error occurred while validating field: %v. The file paths in the array must be unique. Found multiple instances of %v", storeFilesKey, filePath) + "error occurred while validating field: %v. The file paths in the array must be unique. Found multiple instances of %v", StoreFilesArgName, filePath) } duplicates[filePath] = 1 } diff --git a/docs/docs/starlark-reference/plan.md b/docs/docs/starlark-reference/plan.md index a689c771fa..9fe23e6fa0 100644 --- a/docs/docs/starlark-reference/plan.md +++ b/docs/docs/starlark-reference/plan.md @@ -409,6 +409,17 @@ The `run_sh` instruction executes a one-time execution task. It runs the bash co # copies the entire directory into a file artifact "/src", ], + + # The time to allow for the command to complete. 
If the command takes longer than this, + # Kurtosis will kill the command and mark it as failed. + # You may specify a custom wait timeout duration or disable the feature entirely. + # You may specify a custom wait timeout duration with a string: + # wait = "2m" + # Or, you can disable this feature by setting the value to None: + # wait = None + # The feature is enabled by default with a default timeout of 180s + # OPTIONAL (Default: "180s") + wait="180s" ) plan.print(result.code) # returns the future reference to the code diff --git a/internal_testsuites/golang/testsuite/startosis_run_sh_task_test/run_task_sh_task_test.go b/internal_testsuites/golang/testsuite/startosis_run_sh_task_test/run_task_sh_task_test.go index 22c7ad4b58..ed30f065f8 100644 --- a/internal_testsuites/golang/testsuite/startosis_run_sh_task_test/run_task_sh_task_test.go +++ b/internal_testsuites/golang/testsuite/startosis_run_sh_task_test/run_task_sh_task_test.go @@ -30,6 +30,12 @@ def run(plan): result = plan.run_sh(run="cat /tmp/kurtosis.txt") plan.assert(value=result.code, assertion="==", target_value="0") ` + + runshStarlarkWithTimeout = ` +def run(plan): + result = plan.run_sh(run="sleep 45s", wait="30s") + plan.assert(value=result.code, assertion="==", target_value="0") +` ) func TestStarlark_RunshTaskSimple(t *testing.T) { @@ -53,3 +59,11 @@ func TestStarlark_RunshTaskFileArtifactFailure(t *testing.T) { require.NotNil(t, runResult.ExecutionError) require.Contains(t, runResult.ExecutionError.GetErrorMessage(), expectedErrorMessage) } + +func TestStarlark_RunshTimesoutSuccess(t *testing.T) { + ctx := context.Background() + runResult, _ := test_helpers.SetupSimpleEnclaveAndRunScript(t, ctx, runshTest, runshStarlarkWithTimeout) + expectedErrorMessage := "The exec request timed out after 30 seconds" + require.NotNil(t, runResult.ExecutionError) + require.Contains(t, runResult.ExecutionError.GetErrorMessage(), expectedErrorMessage) +}