Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC DO NOT MERGE: Implementation of swarmctl service exec #2359

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
93 changes: 93 additions & 0 deletions agent/session.go
@@ -1,10 +1,15 @@
package agent

import (
"bytes"
"errors"
"io"
"io/ioutil"
"sync"
"time"

"github.com/docker/docker/api/types"
engineapi "github.com/docker/docker/client"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/log"
Expand Down Expand Up @@ -99,6 +104,7 @@ func (s *session) run(ctx context.Context, delay time.Duration, description *api
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)
go runctx(ctx, s.closed, s.errs, s.logSubscriptions)
go runctx(ctx, s.closed, s.errs, s.taskExecutions)

close(s.registered)
}
Expand Down Expand Up @@ -217,6 +223,93 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess
}
}

//TODO(fntlnz): cancel with context
func handleExecutions(ctx context.Context, attachment api.TaskExec_AttachClient, containerID string, commands chan []byte) error {
// TODO(fntlnz): REMOVE THE CLIENT HERE find a way to have here the docker client instantiated in cmd/swarmd
// instead of creating a new one or at least use the same creation logic bcause the other spports containerd
dclient, err := engineapi.NewEnvClient()

if err != nil {
return err
}

execCfg := types.ExecConfig{
User: "root",
AttachStdout: true,
AttachStderr: true,
AttachStdin: true,
Cmd: []string{"/bin/sh"},
}

res, err := dclient.ContainerExecCreate(ctx, containerID, execCfg)
if err != nil {
return err
}
resp, err := dclient.ContainerExecAttach(ctx, res.ID, execCfg)

if err != nil {
return err
}
defer resp.Close()

// Send the output back to the client
go func(resp types.HijackedResponse, containerID string) {
for {
curLine, _ := resp.Reader.ReadByte() // TODO(fntlnz): It is ok to send byte by byte?

// sending the result back (in future we will have a tty)
attachment.Send(&api.TaskExecStream{
Message: []byte{curLine},
Containerid: containerID,
})

}
}(resp, containerID)

for {
select {
case cmd := <-commands:
in := ioutil.NopCloser(bytes.NewReader(cmd))
io.Copy(resp.Conn, in)
}
}

}

func (s *session) taskExecutions(ctx context.Context) error {
// TODO(fntlnz): rework this to support streaming over the same exec and not just sending/receiveing
// TODO(fntlnz): pass out errors instead of just conintuing

commandsReader := map[string]chan []byte{}
client := api.NewTaskExecClient(s.conn.ClientConn)
attachment, err := client.Attach(ctx)
if err != nil {
return err
}

for {
data, err := attachment.Recv()
if err != nil {
continue
}

if len(data.Containerid) == 0 {
continue
}

if _, ok := commandsReader[data.Containerid]; !ok {
logrus.Infof("current: %s", data.Containerid)
commandsReader[data.Containerid] = make(chan []byte)
go handleExecutions(ctx, attachment, data.Containerid, commandsReader[data.Containerid])
commandsReader[data.Containerid] <- data.Message
} else {
commandsReader[data.Containerid] <- data.Message
}

}

}

func (s *session) logSubscriptions(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
log.Debugf("")
Expand Down
55 changes: 55 additions & 0 deletions api/api.pb.txt
Expand Up @@ -4630,6 +4630,14 @@ file {
type_name: ".docker.swarmkit.v1.HealthConfig"
json_name: "healthcheck"
}
field {
name: "init_value"
number: 23
label: LABEL_OPTIONAL
type: TYPE_BOOL
oneof_index: 0
json_name: "initValue"
}
nested_type {
name: "LabelsEntry"
field {
Expand Down Expand Up @@ -4684,6 +4692,9 @@ file {
json_name: "options"
}
}
oneof_decl {
name: "init"
}
}
message_type {
name: "EndpointSpec"
Expand Down Expand Up @@ -6155,6 +6166,23 @@ file {
dependency: "github.com/docker/swarmkit/api/types.proto"
dependency: "gogoproto/gogo.proto"
dependency: "github.com/docker/swarmkit/protobuf/plugin/plugin.proto"
message_type {
name: "TaskExecStream"
field {
name: "message"
number: 1
label: LABEL_OPTIONAL
type: TYPE_BYTES
json_name: "message"
}
field {
name: "containerid"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "containerid"
}
}
message_type {
name: "GetNodeRequest"
field {
Expand Down Expand Up @@ -7321,6 +7349,21 @@ file {
message_type {
name: "RemoveConfigResponse"
}
service {
name: "TaskExec"
method {
name: "Attach"
input_type: ".docker.swarmkit.v1.TaskExecStream"
output_type: ".docker.swarmkit.v1.TaskExecStream"
options {
73626345 {
1: "swarm-manager"
}
}
client_streaming: true
server_streaming: true
}
}
service {
name: "Control"
method {
Expand Down Expand Up @@ -7383,6 +7426,18 @@ file {
}
}
}
method {
name: "Attach"
input_type: ".docker.swarmkit.v1.TaskExecStream"
output_type: ".docker.swarmkit.v1.TaskExecStream"
options {
73626345 {
1: "swarm-manager"
}
}
client_streaming: true
server_streaming: true
}
method {
name: "RemoveTask"
input_type: ".docker.swarmkit.v1.RemoveTaskRequest"
Expand Down
1 change: 1 addition & 0 deletions api/ca.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.