forked from gravitational/teleport
/
sftp.go
214 lines (187 loc) · 5.72 KB
/
sftp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
/*
* Teleport
* Copyright (C) 2023 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package regular
import (
"bufio"
"context"
"errors"
"io"
"os"
"os/exec"
"time"
"github.com/gogo/protobuf/jsonpb"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
"github.com/gravitational/teleport"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/srv"
"github.com/gravitational/teleport/lib/utils"
)
// copyingGoroutines is the number of goroutines that copy SFTP data
// between an SSH channel and the anonymous pipes, one per direction.
// Wait receives exactly this many results from sftpSubsys.errCh.
const copyingGoroutines = 2
// sftpSubsys runs an SFTP child process and relays data between that
// process and an SSH channel. Start launches the process and the relaying
// goroutines; Wait collects their results.
type sftpSubsys struct {
// sftpCmd is the child process command. It is assigned in Start and
// waited on in Wait.
sftpCmd *exec.Cmd
// serverCtx is the server context passed to Start. Wait uses it to send
// the exec result.
serverCtx *srv.ServerContext
// errCh receives the io.Copy result of each copying goroutine started
// in Start. It is buffered to copyingGoroutines.
errCh chan error
// log is the logger carrying the SFTP subsystem component field.
log *logrus.Entry
}
// newSFTPSubsys builds an sftpSubsys whose logger is tagged with the SFTP
// subsystem component. The remaining fields are populated by Start. The
// returned error is always nil.
func newSFTPSubsys() (*sftpSubsys, error) {
	// TODO: add prometheus collectors?
	fields := logrus.Fields{
		trace.Component: teleport.ComponentSubsystemSFTP,
	}
	subsys := &sftpSubsys{log: logrus.WithFields(fields)}
	return subsys, nil
}
// Start launches the SFTP child process that serves this subsystem request
// and connects it to the SSH channel ch.
//
// If serverCtx.AllowFileCopying is false, an SFTP audit event with
// events.SFTPDisallowedCode is emitted and srv.ErrNodeFileCopyingNotPermitted
// is returned. Otherwise three anonymous pipes are created (channel to
// child, child to channel, and audit), the command built by
// srv.ConfigureCommand is started, and three goroutines are launched: two
// copy data between ch and the pipes and report their result on s.errCh,
// and one reads audit events from the child and emits them.
//
// Setup errors are returned wrapped with trace.Wrap, and on any such error
// every pipe end opened so far is closed. Start returns as soon as the
// goroutines have been launched; it does not wait for the child to exit.
func (s *sftpSubsys) Start(ctx context.Context,
	serverConn *ssh.ServerConn,
	ch ssh.Channel, req *ssh.Request,
	serverCtx *srv.ServerContext,
) error {
	// Check that file copying is allowed Node-wide again here in case
	// this connection was proxied, the proxy doesn't know if file copying
	// is allowed for certain Nodes.
	if !serverCtx.AllowFileCopying {
		emitErr := serverCtx.GetServer().EmitAuditEvent(context.WithoutCancel(ctx), &apievents.SFTP{
			Metadata: apievents.Metadata{
				Code: events.SFTPDisallowedCode,
				Type: events.SFTPEvent,
				Time: time.Now(),
			},
			UserMetadata:   serverCtx.Identity.GetUserMetadata(),
			ServerMetadata: serverCtx.GetServerMetadata(),
			Error:          srv.ErrNodeFileCopyingNotPermitted.Error(),
		})
		if emitErr != nil {
			// The request is rejected either way; just make the lost
			// audit event visible.
			s.log.WithError(emitErr).Warn("Failed to emit SFTP event.")
		}
		return srv.ErrNodeFileCopyingNotPermitted
	}

	s.serverCtx = serverCtx

	// The pipe ends kept by this process are normally closed by the
	// goroutines launched at the end of this function. Until the child has
	// been started those goroutines do not exist, so every early return
	// must close these ends itself or the descriptors leak.
	started := false
	closeUnlessStarted := func(f *os.File) {
		if !started {
			f.Close()
		}
	}

	// Create two sets of anonymous pipes to give the child process
	// access to the SSH channel
	chReadPipeOut, chReadPipeIn, err := os.Pipe()
	if err != nil {
		return trace.Wrap(err)
	}
	// The ends handed to the child are always closed here when Start
	// returns; the child keeps its own copies.
	defer chReadPipeOut.Close()
	defer closeUnlessStarted(chReadPipeIn)

	chWritePipeOut, chWritePipeIn, err := os.Pipe()
	if err != nil {
		return trace.Wrap(err)
	}
	defer chWritePipeIn.Close()
	defer closeUnlessStarted(chWritePipeOut)

	// Create anonymous pipe that the child will send audit information
	// over
	auditPipeOut, auditPipeIn, err := os.Pipe()
	if err != nil {
		return trace.Wrap(err)
	}
	defer auditPipeIn.Close()
	defer closeUnlessStarted(auditPipeOut)

	// Create child process to handle SFTP connection
	execRequest, err := srv.NewExecRequest(serverCtx, teleport.SFTPSubsystem)
	if err != nil {
		return trace.Wrap(err)
	}
	if err := serverCtx.SetExecRequest(execRequest); err != nil {
		return trace.Wrap(err)
	}
	if err := serverCtx.SetSSHRequest(req); err != nil {
		return trace.Wrap(err)
	}

	s.sftpCmd, err = srv.ConfigureCommand(serverCtx, chReadPipeOut, chWritePipeIn, auditPipeIn)
	if err != nil {
		return trace.Wrap(err)
	}
	s.sftpCmd.Stdout = os.Stdout
	s.sftpCmd.Stderr = os.Stderr

	s.log.Debug("starting SFTP process")
	if err := s.sftpCmd.Start(); err != nil {
		return trace.Wrap(err)
	}
	// No error return is possible past this point; the goroutines below
	// take ownership of the remaining pipe ends.
	started = true

	// TODO: put in cgroup?
	execRequest.Continue()

	// Copy the SSH channel to and from the anonymous pipes
	s.errCh = make(chan error, copyingGoroutines)
	go func() {
		defer chReadPipeIn.Close()
		_, err := io.Copy(chReadPipeIn, ch)
		s.errCh <- err
	}()
	go func() {
		defer chWritePipeOut.Close()
		_, err := io.Copy(ch, chWritePipeOut)
		s.errCh <- err
	}()

	// Read and emit audit events from the child process
	go s.relayAuditEvents(ctx, serverConn, serverCtx, auditPipeOut)

	return nil
}

// relayAuditEvents reads NULL-delimited, JSON-encoded SFTP audit events from
// auditPipeOut, fills in the server, session, user and connection metadata,
// and emits each one through the server. It closes auditPipeOut and returns
// when the pipe reaches EOF or a read fails; events that fail to unmarshal
// or emit are logged and skipped.
func (s *sftpSubsys) relayAuditEvents(
	ctx context.Context,
	serverConn *ssh.ServerConn,
	serverCtx *srv.ServerContext,
	auditPipeOut *os.File,
) {
	defer auditPipeOut.Close()

	// Create common fields for events
	serverMeta := serverCtx.GetServerMetadata()
	sessionMeta := serverCtx.GetSessionMetadata()
	userMeta := serverCtx.Identity.GetUserMetadata()
	connectionMeta := apievents.ConnectionMetadata{
		RemoteAddr: serverConn.RemoteAddr().String(),
		LocalAddr:  serverConn.LocalAddr().String(),
	}

	r := bufio.NewReader(auditPipeOut)
	for {
		// Read up to a NULL byte, the child process uses this to
		// delimit audit events
		eventStr, err := r.ReadString(0x0)
		if err != nil {
			if !errors.Is(err, io.EOF) {
				s.log.WithError(err).Warn("Failed to read SFTP event.")
			}
			return
		}

		// On success ReadString includes the delimiter, so the string is
		// never empty here; strip the trailing NULL before decoding.
		var sftpEvent apievents.SFTP
		err = jsonpb.UnmarshalString(eventStr[:len(eventStr)-1], &sftpEvent)
		if err != nil {
			s.log.WithError(err).Warn("Failed to unmarshal SFTP event.")
			continue
		}

		sftpEvent.Metadata.ClusterName = serverCtx.ClusterName
		sftpEvent.ServerMetadata = serverMeta
		sftpEvent.SessionMetadata = sessionMeta
		sftpEvent.UserMetadata = userMeta
		sftpEvent.ConnectionMetadata = connectionMeta

		if err := serverCtx.GetServer().EmitAuditEvent(ctx, &sftpEvent); err != nil {
			s.log.WithError(err).Warn("Failed to emit SFTP event.")
		}
	}
}
// Wait blocks until the SFTP child process exits, sends its command string
// and exit code through the server context, and then collects the result of
// each copying goroutine started by Start. The returned aggregate combines
// the process wait error with every copy error that is not an expected
// network error (per utils.IsOKNetworkError).
func (s *sftpSubsys) Wait() error {
	procErr := s.sftpCmd.Wait()
	s.log.Debug("SFTP process finished")

	result := srv.ExecResult{
		Command: s.sftpCmd.String(),
		Code:    s.sftpCmd.ProcessState.ExitCode(),
	}
	s.serverCtx.SendExecResult(result)

	collected := []error{procErr}
	for pending := copyingGoroutines; pending > 0; pending-- {
		copyErr := <-s.errCh
		if copyErr == nil || utils.IsOKNetworkError(copyErr) {
			continue
		}
		s.log.WithError(copyErr).Warn("Connection problem.")
		collected = append(collected, copyErr)
	}

	return trace.NewAggregate(collected...)
}