Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable streaming exec output in container engine [stream exec pt. 1] #1043

Merged
merged 18 commits into from Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -278,7 +278,6 @@ func (backend *DockerKurtosisBackend) GetUserServiceLogs(
return user_service_functions.GetUserServiceLogs(ctx, enclaveUuid, filters, shouldFollowLogs, backend.dockerManager)
}

// TODO Switch these to streaming so that huge command outputs don't blow up the API container memory
// NOTE: This function will block while the exec is ongoing; if we need more perf we can make it async
func (backend *DockerKurtosisBackend) RunUserServiceExecCommands(
gbouv marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
Expand All @@ -292,6 +291,15 @@ func (backend *DockerKurtosisBackend) RunUserServiceExecCommands(
return user_service_functions.RunUserServiceExecCommands(ctx, enclaveUuid, userServiceCommands, backend.dockerManager)
}

func (backend *DockerKurtosisBackend) RunUserServiceExecCommandWithStreamedOutput(
ctx context.Context,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
cmd []string,
) (chan string, chan *exec_result.ExecResult, error) {
return user_service_functions.RunUserServiceExecCommandWithStreamedOutput(ctx, enclaveUuid, serviceUuid, cmd, backend.dockerManager)
}

func (backend *DockerKurtosisBackend) GetShellOnUserService(ctx context.Context, enclaveUuid enclave.EnclaveUUID, serviceUuid service.ServiceUUID) error {
return user_service_functions.GetShellOnUserService(ctx, enclaveUuid, serviceUuid, backend.dockerManager)
}
Expand Down
@@ -0,0 +1,50 @@
package user_service_functions

import (
"context"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/shared_helpers"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
"github.com/kurtosis-tech/stacktrace"
)

func RunUserServiceExecCommandWithStreamedOutput(
ctx context.Context,
enclaveId enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
cmd []string,
dockerManager *docker_manager.DockerManager,
) (chan string, chan *exec_result.ExecResult, error) {
userServiceUuids := map[service.ServiceUUID]bool{}
userServiceUuids[serviceUuid] = true
filters := &service.ServiceFilters{
Names: nil,
UUIDs: userServiceUuids,
Statuses: nil,
}
_, allDockerResources, err := shared_helpers.GetMatchingUserServiceObjsAndDockerResourcesNoMutex(ctx, enclaveId, filters, dockerManager)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred getting user services matching filters '%+v'", filters)
}

var userServiceDockerResource *shared_helpers.UserServiceDockerResources
if dockerResource, found := allDockerResources[serviceUuid]; found {
userServiceDockerResource = dockerResource
} else {
return nil, nil, stacktrace.NewError("No docker resources were found for the service with identifier: '%v'", serviceUuid)
}

userServiceDockerContainer := userServiceDockerResource.ServiceContainer

execOutputLinesChan, finalExecChan, err := dockerManager.RunExecCommandWithStreamedOutput(
ctx,
userServiceDockerContainer.GetId(),
cmd)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred attempting to stream exec output from docker.")
}

return execOutputLinesChan, finalExecChan, nil
}
Expand Up @@ -6,6 +6,7 @@
package docker_manager

import (
"bufio"
"context"
"encoding/json"
"fmt"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/consts"
docker_manager_types "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager/types"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/compute_resources"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/concurrent_writer"
"github.com/kurtosis-tech/stacktrace"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -116,7 +118,8 @@ const (

successfulExitCode = 0

emptyNetworkAlias = ""
emptyNetworkAlias = ""
streamOutputDelimiter = '\n'

isDockerNetworkAttachable = true

Expand Down Expand Up @@ -916,7 +919,7 @@ func (manager *DockerManager) RunExecCommand(context context.Context, containerI
}

execStartConfig := types.ExecStartCheck{
// Because detach is false, we'll block until the command comes back
// Can not be run in detached mode or else response from ContainerExecAttach doesn't return output
Detach: false,
Tty: false,
ConsoleSize: nil,
Expand All @@ -929,26 +932,20 @@ func (manager *DockerManager) RunExecCommand(context context.Context, containerI
// Therefore, we ONLY call Attach, without Start
attachResp, err := dockerClient.ContainerExecAttach(context, execId, execStartConfig)
if err != nil {
return 0, stacktrace.Propagate(
err,
"An error occurred starting/attaching to the exec command")
return 0, stacktrace.Propagate(err, "An error occurred starting/attaching to the exec command")
}
defer attachResp.Close()

// NOTE: We have to demultiplex the logs that come back
// This will keep reading until it receives EOF
concurrentWriter := concurrent_writer.NewConcurrentWriter(logOutput)
if _, err := stdcopy.StdCopy(concurrentWriter, concurrentWriter, attachResp.Reader); err != nil {
return 0, stacktrace.Propagate(
err,
"An error occurred copying the exec command output to the given output writer")
return 0, stacktrace.Propagate(err, "An error occurred copying the exec command output to the given output writer")
}

inspectResponse, err := dockerClient.ContainerExecInspect(context, execId)
if err != nil {
return 0, stacktrace.Propagate(
err,
"An error occurred inspecting the exec to get the response code")
return 0, stacktrace.Propagate(err, "An error occurred inspecting the exec to get the response code")
}
if inspectResponse.Running {
return 0, stacktrace.NewError("Expected exec to have stopped, but it's still running!")
Expand All @@ -961,6 +958,97 @@ func (manager *DockerManager) RunExecCommand(context context.Context, containerI
return int32ExitCode, nil
}

func (manager *DockerManager) RunExecCommandWithStreamedOutput(context context.Context, containerId string, command []string) (chan string, chan *exec_result.ExecResult, error) {
dockerClient := manager.dockerClient
execConfig := types.ExecConfig{
User: "",
Privileged: false,
Tty: false,
ConsoleSize: nil,
AttachStdin: false,
AttachStderr: true,
AttachStdout: true,
Detach: false,
DetachKeys: "",
Env: nil,
WorkingDir: "",
Cmd: command,
}

createResp, err := dockerClient.ContainerExecCreate(context, containerId, execConfig)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred creating the exec process")
}

execId := createResp.ID
if execId == "" {
return nil, nil, stacktrace.NewError("Got back an empty exec ID when running '%v' on container '%v'", command, containerId)
}

execStartConfig := types.ExecStartCheck{
// Can not be run in detached mode or else response from ContainerExecAttach doesn't return output
Detach: false,
Tty: false,
ConsoleSize: nil,
}

execOutputChan := make(chan string)
finalExecResultChan := make(chan *exec_result.ExecResult)
go func() {
defer func() {
close(execOutputChan)
close(finalExecResultChan)
}()

// IMPORTANT NOTE:
// You'd think that we'd need to call ContainerExecStart separately after this ContainerExecAttach....
// ...but ContainerExecAttach **actually starts the exec command!!!!**
// We used to be doing them both, but then we were hitting this occasional race condition: https://github.com/moby/moby/issues/42408
// Therefore, we ONLY call Attach, without Start
attachResp, err := dockerClient.ContainerExecAttach(context, execId, execStartConfig)
if err != nil {
execOutputChan <- err.Error()
return
}
defer attachResp.Close()

// Stream output from docker through output channel
reader := bufio.NewReader(attachResp.Reader)
tedim52 marked this conversation as resolved.
Show resolved Hide resolved
for {
execOutputLine, err := reader.ReadString(streamOutputDelimiter)
if err != nil {
if err == io.EOF {
break
} else {
return
}
}

execOutputChan <- execOutputLine
}

inspectResponse, err := dockerClient.ContainerExecInspect(context, execId)
if err != nil {
execOutputChan <- err.Error()
return
}
if inspectResponse.Running {
execOutputChan <- stacktrace.NewError("Expected exec to have stopped, but it's still running!").Error()
return
}
unsizedExitCode := inspectResponse.ExitCode
if unsizedExitCode > math.MaxInt32 || unsizedExitCode < math.MinInt32 {
execOutputChan <- stacktrace.NewError("Could not cast unsized int '%v' to int32 because it does not fit", unsizedExitCode).Error()
return
}
int32ExitCode := int32(unsizedExitCode)

// Don't send output in final result because it was already streamed
finalExecResultChan <- exec_result.NewExecResult(int32ExitCode, "")
tedim52 marked this conversation as resolved.
Show resolved Hide resolved
}()
return execOutputChan, finalExecResultChan, nil
}

/*
ConnectContainerToNetwork
Connects the container with the given container ID to the network with the given network ID, using the given IP address
Expand Down
Expand Up @@ -329,7 +329,6 @@ func (backend *KubernetesKurtosisBackend) GetUserServiceLogs(
backend.kubernetesManager)
}

// TODO Switch these to streaming methods, so that huge command outputs don't blow up the memory of the API container
func (backend *KubernetesKurtosisBackend) RunUserServiceExecCommands(
ctx context.Context,
enclaveUuid enclave.EnclaveUUID,
Expand All @@ -349,6 +348,23 @@ func (backend *KubernetesKurtosisBackend) RunUserServiceExecCommands(
backend.kubernetesManager)
}

func (backend *KubernetesKurtosisBackend) RunUserServiceExecCommandWithStreamedOutput(
ctx context.Context,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
cmd []string,
) (chan string, chan *exec_result.ExecResult, error) {
return user_services_functions.RunUserServiceExecCommandWithStreamedOutput(
ctx,
enclaveUuid,
serviceUuid,
cmd,
backend.cliModeArgs,
backend.apiContainerModeArgs,
backend.engineServerModeArgs,
backend.kubernetesManager)
}

func (backend *KubernetesKurtosisBackend) GetShellOnUserService(ctx context.Context, enclaveUuid enclave.EnclaveUUID, serviceUuid service.ServiceUUID) (resultErr error) {
objectAndResources, err := shared_helpers.GetSingleUserServiceObjectsAndResources(ctx, enclaveUuid, serviceUuid, backend.cliModeArgs, backend.apiContainerModeArgs, backend.engineServerModeArgs, backend.kubernetesManager)
if err != nil {
Expand Down
@@ -0,0 +1,77 @@
package user_services_functions

import (
"context"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_kurtosis_backend/shared_helpers"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_manager"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/container_status"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
"github.com/kurtosis-tech/stacktrace"
)

func RunUserServiceExecCommandWithStreamedOutput(
ctx context.Context,
enclaveId enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
cmd []string,
cliModeArgs *shared_helpers.CliModeArgs,
apiContainerModeArgs *shared_helpers.ApiContainerModeArgs,
engineServerModeArgs *shared_helpers.EngineServerModeArgs,
kubernetesManager *kubernetes_manager.KubernetesManager,
) (chan string, chan *exec_result.ExecResult, error) {
tedim52 marked this conversation as resolved.
Show resolved Hide resolved
namespaceName, err := shared_helpers.GetEnclaveNamespaceName(ctx, enclaveId, cliModeArgs, apiContainerModeArgs, engineServerModeArgs, kubernetesManager)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred getting namespace name for enclave '%v'", enclaveId)
}

requestedGuids := map[service.ServiceUUID]bool{}
requestedGuids[serviceUuid] = true
matchingServicesFilters := &service.ServiceFilters{
Names: nil,
UUIDs: requestedGuids,
Statuses: nil,
}
matchingObjectsAndResources, err := shared_helpers.GetMatchingUserServiceObjectsAndKubernetesResources(ctx, enclaveId, matchingServicesFilters, cliModeArgs, apiContainerModeArgs, engineServerModeArgs, kubernetesManager)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred getting user services matching the requested UUIDs: %v", requestedGuids)
}

var userServiceKubernetesResource *shared_helpers.UserServiceObjectsAndKubernetesResources
if resource, found := matchingObjectsAndResources[serviceUuid]; found {
userServiceKubernetesResource = resource
} else {
return nil, nil, stacktrace.NewError(
"Cannot execute command '%+v' on service '%v' because no Kubernetes resources were found for it",
cmd,
serviceUuid)
}

userServiceKubernetesService := userServiceKubernetesResource.Service
if userServiceKubernetesService == nil {
return nil, nil, stacktrace.Propagate(err, "An error was found while running exec with streamed output over kubernetes for service '%s' and command '%v'.",
cmd,
serviceUuid)
}
if userServiceKubernetesService.GetStatus() != container_status.ContainerStatus_Running {
return nil, nil, stacktrace.NewError(
"Cannot execute command '%+v' on service '%v' because the service status is '%v'",
cmd,
serviceUuid,
userServiceKubernetesService.GetStatus().String())
}

userServiceKubernetesPod := userServiceKubernetesResource.KubernetesResources.Pod

execOutputLinesChan, finalResultChan, err := kubernetesManager.RunExecCommandWithStreamedOutput(
ctx,
namespaceName,
userServiceKubernetesPod.Name,
userServiceContainerName,
cmd)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred attempting to stream exec output from docker.")
}
return execOutputLinesChan, finalResultChan, nil
}