-
Notifications
You must be signed in to change notification settings - Fork 4
/
exec.go
197 lines (166 loc) · 5.63 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0
package server
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
"github.com/digitalocean/go-libvirt"
"github.com/go-logr/logr"
iri "github.com/ironcore-dev/ironcore/iri/apis/machine/v1alpha1"
remotecommandserver "github.com/ironcore-dev/ironcore/poollet/machinepoollet/iri/streaming/remotecommand"
"github.com/ironcore-dev/libvirt-provider/api"
libvirtutils "github.com/ironcore-dev/libvirt-provider/internal/libvirt/utils"
"github.com/ironcore-dev/libvirt-provider/internal/store"
"github.com/moby/term"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/client-go/tools/remotecommand"
"libvirt.org/go/libvirtxml"
)
// Timeouts handed to the streaming exec handler that ServeExec creates.
const (
	// StreamCreationTimeout is passed as ExecHandlerOptions.StreamCreationTimeout.
	StreamCreationTimeout time.Duration = 30 * time.Second
	// StreamIdleTimeout is passed as ExecHandlerOptions.StreamIdleTimeout.
	StreamIdleTimeout time.Duration = 2 * time.Minute
)
// executorExec bundles everything a single console session needs. ServeExec
// builds one per consumed token and hands it to remotecommandserver.NewExecHandler,
// which later invokes its Exec method.
type executorExec struct {
// Libvirt is the libvirt client used to look up the domain and fetch its XML description.
Libvirt *libvirt.Libvirt
// ExecRequest is the request taken from the exec request cache; its MachineId selects the target machine.
ExecRequest *iri.ExecRequest
// Machine is the machine loaded from the store; Exec returns an error when it is nil.
Machine *api.Machine
// activeConsoles is keyed by machine ID and marks machines that currently have a console session open.
activeConsoles *sync.Map
}
// Exec prepares a console session for the machine named in req.
//
// It checks that the machine is present in the machine store, stores the
// request in the exec request cache and returns a response whose Url embeds
// the token issued by the cache. A machine missing from the store yields a
// gRPC NotFound status; any other store error is wrapped and returned, and a
// cache insertion error is returned unchanged.
func (s *Server) Exec(ctx context.Context, req *iri.ExecRequest) (*iri.ExecResponse, error) {
	machineID := req.MachineId
	logger := s.loggerFrom(ctx, "MachineID", machineID)

	logger.V(1).Info("Verifying machine in the store")
	_, getErr := s.machineStore.Get(ctx, machineID)
	switch {
	case getErr == nil:
		// Machine is known; carry on.
	case errors.Is(getErr, store.ErrNotFound):
		return nil, status.Errorf(codes.NotFound, "machine %s not found", machineID)
	default:
		return nil, fmt.Errorf("error getting machine: %w", getErr)
	}

	logger.V(1).Info("Inserting request into cache")
	execToken, insertErr := s.execRequestCache.Insert(req)
	if insertErr != nil {
		return nil, insertErr
	}

	logger.V(1).Info("Returning url with token")
	resp := &iri.ExecResponse{Url: s.buildURL("exec", execToken)}
	return resp, nil
}
// ServeExec is the HTTP side of an exec session: it redeems token for the
// cached exec request, loads the machine from the store and passes the
// connection to a remotecommand exec handler backed by executorExec.
//
// Unknown or expired tokens and machines missing from the store are answered
// with 404; any other failure is answered with 500.
func (s *Server) ServeExec(w http.ResponseWriter, req *http.Request, token string) {
	ctx := req.Context()
	logger := logr.FromContextOrDiscard(ctx)

	// internalError writes the generic 500 response used by the failure paths below.
	internalError := func() {
		const code = http.StatusInternalServerError
		http.Error(w, http.StatusText(code), code)
	}

	execReq, found := s.execRequestCache.Consume(token)
	if !found {
		logger.V(1).Info("Rejecting unknown / expired token")
		http.NotFound(w, req)
		return
	}

	machine, err := s.machineStore.Get(ctx, execReq.MachineId)
	if err != nil {
		logger.Error(err, "error getting the apiMachine")
		if errors.Is(err, store.ErrNotFound) {
			http.NotFound(w, req)
		} else {
			internalError()
		}
		return
	}

	executor := executorExec{
		Libvirt:        s.libvirt,
		ExecRequest:    execReq,
		Machine:        machine,
		activeConsoles: &s.activeConsoles,
	}
	handlerOpts := remotecommandserver.ExecHandlerOptions{
		StreamCreationTimeout: StreamCreationTimeout,
		StreamIdleTimeout:     StreamIdleTimeout,
	}

	handler, err := remotecommandserver.NewExecHandler(executor, handlerOpts)
	if err != nil {
		logger.Error(err, "error creating exec handler")
		internalError()
		return
	}

	handler.Handle(w, req, remotecommandserver.ExecOptions{})
}
// Exec connects the caller's streams to the first console device of the
// machine's libvirt domain.
//
// Bytes read from in are written to the console TTY and bytes read from the
// TTY are copied to out, until the escape sequence Ctrl + ] is received, the
// input stream ends or fails, the TTY can no longer be read or written, or ctx
// is cancelled. Only one session per machine ID is admitted at a time; a
// second concurrent call returns an error immediately.
//
// The terminal size queue is ignored. Exec returns an error when the session
// cannot be set up (machine missing, domain lookup/XML failure, no console
// device, TTY cannot be opened) and nil once an established session has ended.
func (e executorExec) Exec(ctx context.Context, in io.Reader, out io.WriteCloser, _ remotecommand.TerminalSizeQueue) error {
	machineID := e.ExecRequest.MachineId

	// Check if a console is already active for this machine.
	if _, loaded := e.activeConsoles.LoadOrStore(machineID, true); loaded {
		return errors.New("operation failed: Active console session exists for this domain")
	}
	defer e.activeConsoles.Delete(machineID)

	// Check if the apiMachine doesn't exist, to avoid making the libvirt-lookup call.
	if e.Machine == nil {
		return fmt.Errorf("apiMachine %w in the store", store.ErrNotFound)
	}

	domain, err := e.Libvirt.DomainLookupByUUID(libvirtutils.UUIDStringToBytes(machineID))
	if err != nil {
		if !libvirtutils.IsErrorCode(err, libvirt.ErrNoDomain) {
			return fmt.Errorf("error looking up domain: %w", err)
		}
		return fmt.Errorf("machine %s has not yet been synced", machineID)
	}

	domainXMLData, err := e.Libvirt.DomainGetXMLDesc(domain, 0)
	if err != nil {
		return fmt.Errorf("failed to lookup domain: %w", err)
	}

	domainXML := &libvirtxml.Domain{}
	if err := domainXML.Unmarshal(domainXMLData); err != nil {
		return fmt.Errorf("failed to unmarshal domain: %w", err)
	}

	if domainXML.Devices == nil || len(domainXML.Devices.Consoles) == 0 {
		return errors.New("device console not set in machine domainXML")
	}

	ttyPath := domainXML.Devices.Consoles[0].TTY
	if ttyPath == "" {
		// Without this check os.OpenFile("") fails with a confusing "open :" error.
		return errors.New("console TTY path not set in machine domainXML")
	}

	f, err := os.OpenFile(ttyPath, os.O_RDWR, 0)
	if err != nil {
		return fmt.Errorf("error opening PTY: %w", err)
	}

	// closeTTY may be called from several goroutines and on every exit path;
	// the Once makes sure the descriptor is closed exactly once. Closing it is
	// also what unblocks the io.Copy below, which otherwise waits on the TTY
	// indefinitely.
	var closeOnce sync.Once
	closeTTY := func() {
		closeOnce.Do(func() { _ = f.Close() })
	}
	defer closeTTY()

	log := logr.FromContextOrDiscard(ctx).WithName(machineID)

	// End the session when the caller's context is cancelled. sessionDone
	// stops this watcher when Exec returns for any other reason.
	sessionDone := make(chan struct{})
	defer close(sessionDone)
	go func() {
		select {
		case <-ctx.Done():
			closeTTY()
		case <-sessionDone:
		}
	}()

	// Wrap the input stream with an escape proxy. Escape Sequence Ctrl + ] = 29
	inputReader := term.NewEscapeProxy(in, []byte{29})

	// Print escape character information to the exec console. This is a
	// best-effort hint, so a failure is logged rather than aborting the session.
	if _, err := fmt.Fprintf(out, "Escape character is ^] (Ctrl + ])\n"); err != nil {
		log.Error(err, "error writing escape character hint")
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// ReadInput: go routine to read the input from the reader, and write to the terminal.
	go func() {
		defer wg.Done()
		// Whatever ends the input side (escape sequence, EOF, read or write
		// error), close the TTY so the output goroutine's io.Copy returns.
		// Previously only the escape path did this, so any other exit left
		// Exec blocked in wg.Wait and the machine marked as having an active
		// console forever.
		defer closeTTY()

		buf := make([]byte, 1024)
		for {
			n, readErr := inputReader.Read(buf)
			// Per the io.Reader contract, forward any bytes returned
			// alongside an error before acting on the error.
			if n > 0 {
				if _, writeErr := f.Write(buf[:n]); writeErr != nil {
					log.Error(writeErr, "error writing to the file descriptor")
					return
				}
			}
			if readErr != nil {
				var escErr term.EscapeError
				switch {
				case errors.As(readErr, &escErr):
					log.Info("Closed reading the terminal. Escape sequence received")
				case errors.Is(readErr, io.EOF):
					log.Info("Closed reading the terminal. Input stream ended")
				default:
					log.Error(readErr, "error reading bytes")
				}
				return
			}
		}
	}()

	// WriteOutput: go routine for writing the output back to the Writer.
	go func() {
		defer wg.Done()
		// Once output can no longer be delivered, close the TTY so further
		// writes from the input goroutine fail instead of feeding a session
		// nobody can see.
		// NOTE(review): the input goroutine still has to return from its
		// pending Read on in before Exec can finish; in is a plain io.Reader
		// and cannot be closed from here.
		defer closeTTY()

		// Ignoring error to allow graceful shutdown without flagging as an error; not needed at this stage.
		_, _ = io.Copy(out, f)
		log.Info("Closed writing to the terminal")
	}()

	wg.Wait()
	log.Info("Closed console for the machine")
	return nil
}