/
service_execute.go
132 lines (112 loc) · 3.58 KB
/
service_execute.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package runnerv2service
import (
"io"
"os"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
runnerv2alpha1 "github.com/stateful/runme/v3/internal/gen/proto/go/runme/runner/v2alpha1"
"github.com/stateful/runme/v3/internal/ulid"
)
// Execute serves a single bidirectional Execute stream.
//
// The first request received on the stream is used to resolve the session
// (via getOrCreateSessionFromRequest), apply req.Config.Env to it, and build
// the execution from req.Config. Once the command has been started, a response
// carrying its PID is sent. All further requests, as well as the non-config
// fields of the first one (Winsize, InputData, Stop), are processed by a
// background goroutine. When the command finishes, a final response with the
// exit code is sent; the exit code is left unset when it is negative.
//
// It returns nil if the client closes the stream before sending the initial
// request, the error from any setup step that fails, and otherwise the error
// returned by exec.Wait.
func (r *runnerService) Execute(srv runnerv2alpha1.RunnerService_ExecuteServer) error {
	ctx := srv.Context()
	id := ulid.GenerateID()
	logger := r.logger.With(zap.String("id", id))

	logger.Info("running Execute in runnerService")

	// Get the initial request.
	req, err := srv.Recv()
	if err != nil {
		if errors.Is(err, io.EOF) {
			logger.Info("client closed the connection while getting initial request")
			return nil
		}
		logger.Info("failed to receive a request", zap.Error(err))
		return errors.WithStack(err)
	}
	logger.Info("received initial request", zap.Any("req", req))

	// Manage the session.
	session, existed, err := r.getOrCreateSessionFromRequest(req)
	if err != nil {
		return err
	}
	if err := session.SetEnv(req.Config.Env...); err != nil {
		return err
	}
	// Only register the session once its environment was set successfully.
	if !existed {
		r.sessions.Add(session)
	}

	// TODO: extend session with the project, if present.

	exec, err := newExecution(
		id,
		req.Config,
		session,
		req.StoreStdoutInEnv,
		logger,
	)
	if err != nil {
		return err
	}

	// Start the command and send the initial response with PID.
	if err := exec.Cmd.Start(ctx); err != nil {
		return err
	}
	if err := srv.Send(&runnerv2alpha1.ExecuteResponse{
		Pid: &wrapperspb.UInt32Value{Value: uint32(exec.Cmd.Pid())},
	}); err != nil {
		return err
	}

	// From the initial request, only the config is used to create a new execution.
	// The rest of fields like InputData, Winsize, Stop are handled in this goroutine,
	// and then the goroutine continues to read the next requests.
	//
	// The goroutine exits on the first error returned by srv.Recv, so req is
	// never nil at the top of the loop.
	go func(req *runnerv2alpha1.ExecuteRequest) {
		for {
			// Failures while applying a request are logged and ignored so that
			// a single bad request does not end the whole execution.
			if err := exec.SetWinsize(req.Winsize); err != nil {
				logger.Info("failed to set winsize; ignoring", zap.Error(err))
			}

			if _, err := exec.Write(req.InputData); err != nil {
				logger.Info("failed to write to stdin; ignoring", zap.Error(err))
			}

			if err := exec.Stop(req.Stop); err != nil {
				logger.Info("failed to stop program; ignoring", zap.Error(err))
			}

			var err error
			req, err = srv.Recv()
			logger.Info("received request", zap.Any("req", req), zap.Error(err))

			switch {
			case err == nil:
				// continue
			case errors.Is(err, io.EOF):
				logger.Info("client closed its send direction; stopping the program")
				if err := exec.Cmd.Signal(os.Interrupt); err != nil {
					logger.Info("failed to stop the command with interrupt signal", zap.Error(err))
				}
				return
			case status.Convert(err).Code() == codes.Canceled || status.Convert(err).Code() == codes.DeadlineExceeded:
				if !exec.Cmd.Running() {
					logger.Info("stream canceled after the process finished; ignoring")
				} else {
					logger.Info("stream canceled while the process is still running; program will be stopped if non-background")
					if err := exec.Cmd.Signal(os.Kill); err != nil {
						logger.Info("failed to stop program with kill signal", zap.Error(err))
					}
				}
				return
			default:
				// Any other receive error leaves req nil; looping again would
				// dereference it and panic. No further requests can arrive on
				// this stream, so handle it like a cancellation and stop reading.
				logger.Info("failed to receive a request; stopping the program if it is still running", zap.Error(err))
				if exec.Cmd.Running() {
					if err := exec.Cmd.Signal(os.Kill); err != nil {
						logger.Info("failed to stop program with kill signal", zap.Error(err))
					}
				}
				return
			}
		}
	}(req)

	exitCode, waitErr := exec.Wait(ctx, srv)

	logger.Info("command finished", zap.Int("exitCode", exitCode), zap.Error(waitErr))

	// A negative exit code is not representable as uint32; leave the field
	// unset in that case instead of sending a wrapped-around value.
	var finalExitCode *wrapperspb.UInt32Value
	if exitCode > -1 {
		finalExitCode = wrapperspb.UInt32(uint32(exitCode))
	}

	// Sending the exit code is best-effort: the client may already be gone.
	if err := srv.Send(&runnerv2alpha1.ExecuteResponse{
		ExitCode: finalExitCode,
	}); err != nil {
		logger.Info("failed to send exit code", zap.Error(err))
	}

	return waitErr
}