feat: added wait to run_sh task. (#750)
Added a wait field to the run_sh task.
The intention is that the user can set a wait timeout; if the command has
not completed before the timeout, the instruction fails. This PR also
fixes the wait and ready-condition bug observed on k8s with exec-recipe.
Peeeekay committed Jun 23, 2023
1 parent f1570f7 commit 8c2b697
Showing 6 changed files with 216 additions and 43 deletions.
6 changes: 6 additions & 0 deletions cli/cli/commands/lsp/resource/kurtosis_starlark.json
@@ -208,6 +208,12 @@
"type": "list<string>",
"content": "store?",
"detail": "List of paths to directories or files that will be copied to a file artifact"
},
{
  "name": "wait",
  "type": "string",
  "content": "wait?",
  "detail": "The time to allow for the command to complete. If the command takes longer than this, the instruction will fail. To disable the timeout, set wait=None. The default value is 180s."
}
]
},
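The new wait argument accepts a Go-style duration string. A minimal sketch of how such a value could be translated into a deadline, assuming only the Go standard library (applyWait is illustrative, not the actual Kurtosis implementation):

package main

import (
	"context"
	"fmt"
	"time"
)

// applyWait illustrates the documented semantics: parse the wait string
// (e.g. the "180s" default) into a duration and bound the command with a
// context deadline. An empty string models wait=None, i.e. no deadline.
func applyWait(parent context.Context, wait string) (context.Context, context.CancelFunc, error) {
	if wait == "" {
		return parent, func() {}, nil // wait disabled: run unbounded
	}
	d, err := time.ParseDuration(wait)
	if err != nil {
		return nil, nil, fmt.Errorf("invalid wait value %q: %w", wait, err)
	}
	ctx, cancel := context.WithTimeout(parent, d)
	return ctx, cancel, nil
}

func main() {
	ctx, cancel, err := applyWait(context.Background(), "180s")
	if err != nil {
		panic(err)
	}
	defer cancel()
	deadline, _ := ctx.Deadline()
	fmt.Println("command must finish by:", deadline)
}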
@@ -49,23 +49,14 @@ func RunUserServiceExecCommands(
return nil, nil, stacktrace.Propagate(err, "An error occurred getting user services matching the requested UUIDs: %+v", requestedGuids)
}

-	successfulExecs, failedExecs, err := runExecOperationsInParallel(namespaceName, userServiceCommands, matchingObjectsAndResources, kubernetesManager)
+	successfulExecs, failedExecs, err := runExecOperationsInParallel(namespaceName, userServiceCommands, matchingObjectsAndResources, kubernetesManager, ctx)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An unexpected error occurred running the exec commands in parallel")
}
return successfulExecs, failedExecs, nil
}
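The hunks above and below thread the caller's ctx down through runExecOperationsInParallel and into each operation, so every exec shares one deadline. A self-contained sketch of that pattern (not the operation_parallelizer API), showing why each closure must capture the shared context:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	// One deadline governs all parallel operations, as with the wait timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) { // analogous to one exec operation per service
			defer wg.Done()
			select {
			case <-time.After(time.Second): // simulated long-running command
				fmt.Printf("op %d finished\n", id)
			case <-ctx.Done(): // shared deadline cancels every in-flight op
				fmt.Printf("op %d canceled: %v\n", id, ctx.Err())
			}
		}(i)
	}
	wg.Wait()
}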

-func runExecOperationsInParallel(
-	namespaceName string,
-	commandArgs map[service.ServiceUUID][]string,
-	userServiceKubernetesResources map[service.ServiceUUID]*shared_helpers.UserServiceObjectsAndKubernetesResources,
-	kubernetesManager *kubernetes_manager.KubernetesManager,
-) (
-	map[service.ServiceUUID]*exec_result.ExecResult,
-	map[service.ServiceUUID]error,
-	error,
-) {
+func runExecOperationsInParallel(namespaceName string, commandArgs map[service.ServiceUUID][]string, userServiceKubernetesResources map[service.ServiceUUID]*shared_helpers.UserServiceObjectsAndKubernetesResources, kubernetesManager *kubernetes_manager.KubernetesManager, ctx context.Context) (map[service.ServiceUUID]*exec_result.ExecResult, map[service.ServiceUUID]error, error) {
successfulExecs := map[service.ServiceUUID]*exec_result.ExecResult{}
failedExecs := map[service.ServiceUUID]error{}

@@ -103,7 +94,7 @@ func runExecOperationsInParallel(
userServiceKubernetesPod := userServiceKubernetesResource.KubernetesResources.Pod

execOperationId := operation_parallelizer.OperationID(serviceUuid)
-		execOperation := createExecOperation(namespaceName, serviceUuid, userServiceKubernetesPod, commandArg, kubernetesManager)
+		execOperation := createExecOperation(namespaceName, serviceUuid, userServiceKubernetesPod, commandArg, kubernetesManager, ctx)
execOperations[execOperationId] = execOperation
}

@@ -125,17 +116,12 @@
return successfulExecs, failedExecs, nil
}

-func createExecOperation(
-	namespaceName string,
-	serviceUuid service.ServiceUUID,
-	servicePod *v1.Pod,
-	commandArg []string,
-	kubernetesManager *kubernetes_manager.KubernetesManager,
-) operation_parallelizer.Operation {
+func createExecOperation(namespaceName string, serviceUuid service.ServiceUUID, servicePod *v1.Pod, commandArg []string, kubernetesManager *kubernetes_manager.KubernetesManager, ctx context.Context) operation_parallelizer.Operation {
return func() (interface{}, error) {
outputBuffer := &bytes.Buffer{}
concurrentBuffer := concurrent_writer.NewConcurrentWriter(outputBuffer)
-		exitCode, err := kubernetesManager.RunExecCommand(
+		exitCode, err := kubernetesManager.RunExecCommandWithContext(
+			ctx,
namespaceName,
servicePod.Name,
userServiceContainerName,
@@ -61,6 +61,7 @@ const (
shouldAddTimestampsWhenPrintingPodInfo = true

listOptionsTimeoutSeconds int64 = 10
contextDeadlineExceeded = "context deadline exceeded"
)
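The new contextDeadlineExceeded constant is matched against the error text because the SPDY executor surfaces the timeout inside an error string (see the new function below). When the raw error is available, Go's errors.Is is the sturdier check; a small runnable sketch contrasting the two:

package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	<-ctx.Done() // wait for the deadline to fire

	err := ctx.Err()
	fmt.Println(errors.Is(err, context.DeadlineExceeded))                    // robust sentinel check: true
	fmt.Println(strings.Contains(err.Error(), "context deadline exceeded")) // string match used in the diff: true
}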

// We'll try to use the nicer-to-use shells first before we drop down to the lower shells
@@ -1245,6 +1246,92 @@ func (manager *KubernetesManager) GetContainerLogs(
return result, nil
}

// RunExecCommandWithContext runs the exec against Kubernetes with a context, so
// the process is stopped when the context times out.
// TODO: merge RunExecCommand and this method into one.
// Doing this for now to unblock wait workflows for k8s.
// The next PR will add context to the WaitForPortAvailabilityUsingNetstat and
// CopyFilesFromUserService methods; keeping this change small reduces the blast radius.
func (manager *KubernetesManager) RunExecCommandWithContext(
	ctx context.Context,
	namespaceName string,
	podName string,
	containerName string,
	command []string,
	stdOutOutput io.Writer,
	stdErrOutput io.Writer,
) (
	resultExitCode int32,
	resultErr error,
) {
	execOptions := &apiv1.PodExecOptions{
		TypeMeta: metav1.TypeMeta{
			Kind:       "",
			APIVersion: "",
		},
		Stdin:     shouldAllocateStdinOnPodExec,
		Stdout:    shouldAllocatedStdoutOnPodExec,
		Stderr:    shouldAllocatedStderrOnPodExec,
		TTY:       shouldAllocateTtyOnPodExec,
		Container: containerName,
		Command:   command,
	}

	// Create a RESTful command request.
	request := manager.kubernetesClientSet.CoreV1().RESTClient().
		Post().
		Namespace(namespaceName).
		Resource("pods").
		Name(podName).
		SubResource("exec").
		VersionedParams(execOptions, scheme.ParameterCodec)
	if request == nil {
		return -1, stacktrace.NewError(
			"Failed to build a working RESTful request for the command '%s'.",
			execOptions.Command,
		)
	}

	exec, err := remotecommand.NewSPDYExecutor(manager.kuberneteRestConfig, http.MethodPost, request.URL())
	if err != nil {
		return -1, stacktrace.Propagate(
			err,
			"Failed to build an executor for the command '%s' with the RESTful endpoint '%s'.",
			execOptions.Command,
			request.URL().String(),
		)
	}

	if err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
		Stdin:             nil,
		Stdout:            stdOutOutput,
		Stderr:            stdErrOutput,
		Tty:               false,
		TerminalSizeQueue: nil,
	}); err != nil {
		// Kubernetes returns the exit code of the command via a string in the error message, so we have to extract it
		statusError := err.Error()

		// this means that the context deadline has been exceeded
		if strings.Contains(statusError, contextDeadlineExceeded) {
			return 1, stacktrace.Propagate(err, "An error occurred while executing commands on the container; the context deadline was exceeded")
		}

		exitCode, err := getExitCodeFromStatusMessage(statusError)
		if err != nil {
			return exitCode, stacktrace.Propagate(
				err,
				"There was an error trying to parse the message '%s' to an exit code.",
				statusError,
			)
		}

		return exitCode, nil
	}

	return successExecCommandExitCode, nil
}
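A hypothetical call site for the new method, wiring a wait timeout into the exec. The execer interface stands in for *KubernetesManager (which satisfies it) so the sketch compiles on its own; runWithWait is illustrative, not part of the commit:

package sketch

import (
	"bytes"
	"context"
	"io"
	"time"
)

// execer abstracts the method added above so this sketch is self-contained.
type execer interface {
	RunExecCommandWithContext(ctx context.Context, namespaceName, podName, containerName string, command []string, stdout, stderr io.Writer) (int32, error)
}

// runWithWait bounds the exec with the user's wait timeout and returns the
// captured combined output. A timeout surfaces as a non-nil error with exit code 1.
func runWithWait(k execer, namespace, pod, container string, cmd []string, wait time.Duration) (int32, string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), wait)
	defer cancel()

	var out bytes.Buffer
	exitCode, err := k.RunExecCommandWithContext(ctx, namespace, pod, container, cmd, &out, &out)
	return exitCode, out.String(), err
}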

func (manager *KubernetesManager) RunExecCommand(
namespaceName string,
podName string,
