forked from cromon/ratchet2
/
sftp_writer.go
82 lines (69 loc) · 2.6 KB
/
sftp_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package processors
import (
"golang.org/x/crypto/ssh"
"github.com/pkg/sftp"
"github.com/rameshputalapattu/ratchet2/data"
"github.com/rameshputalapattu/ratchet2/logger"
"github.com/rameshputalapattu/ratchet2/util"
)
// SftpWriter is an inline writer to remote sftp server
type SftpWriter struct {
client *sftp.Client
file *sftp.File
parameters *util.SftpParameters
initialized bool
CloseOnFinish bool
}
// NewSftpWriter instantiates a new sftp writer, a connection to the remote server is delayed until data is recv'd by the writer
// By default, the connection to the remote client will be closed in the Finish() func.
// Set CloseOnFinish to false to manage the connection manually.
func NewSftpWriter(server string, username string, path string, authMethods ...ssh.AuthMethod) *SftpWriter {
return &SftpWriter{
parameters: &util.SftpParameters{
Server: server,
Username: username,
Path: path,
AuthMethods: authMethods,
},
initialized: false,
CloseOnFinish: true,
}
}
// NewSftpWriterByFile allows you to manually manage the connection to the remote file object.
// Use this if you want to write to the same file object across multiple pipelines.
// By default, the connection to the remote client will *not* be closed in the Finish() func.
// Set CloseOnFinish to true to have this processor clean up the connection when it's done.
func NewSftpWriterByFile(file *sftp.File) *SftpWriter {
return &SftpWriter{file: file, initialized: true, CloseOnFinish: false}
}
// ProcessData writes data as is directly to the output file
func (w *SftpWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) {
logger.Debug("SftpWriter Process data:", string(d))
w.ensureInitialized(killChan)
_, e := w.file.Write([]byte(d))
util.KillPipelineIfErr(e, killChan)
}
// Finish optionally closes open references to the remote file and server
func (w *SftpWriter) Finish(outputChan chan data.JSON, killChan chan error) {
if w.CloseOnFinish {
w.file.Close()
w.client.Close()
}
}
func (w *SftpWriter) String() string {
return "SftpWriter"
}
// ensureInitialized calls connect and then creates the output file on the sftp server at the specified path
func (w *SftpWriter) ensureInitialized(killChan chan error) {
if w.initialized {
return
}
client, err := util.SftpClient(w.parameters.Server, w.parameters.Username, w.parameters.AuthMethods)
util.KillPipelineIfErr(err, killChan)
logger.Info("Path", w.parameters.Path)
file, err := client.Create(w.parameters.Path)
util.KillPipelineIfErr(err, killChan)
w.client = client
w.file = file
w.initialized = true
}