/
ftp.go
96 lines (80 loc) · 1.93 KB
/
ftp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package port
import (
"bytes"
"context"
"fmt"
"sync"
"time"
"github.com/jlaffaye/ftp"
"github.com/pkg/errors"
"github.com/smallinsky/mtf/pkg/fswatch"
pb "github.com/smallinsky/mtf/proto/fswatch"
)
// FTPPort is a port implementation backed by an FTP connection. Outgoing
// messages are uploaded over conn; incoming messages arrive as fswatch events
// on ftpEventC.
type FTPPort struct {
// ftpEventC carries events pushed by the fswatch subscriber callback that
// NewFTP starts; Receive reads from it.
ftpEventC chan *pb.EventRequest
// conn is the logged-in FTP connection that Send uploads files through.
conn *ftp.ServerConn
}
// NewFTPPort builds an FTP-backed implementation via NewFTP and wraps it in a
// generic Port. Any error from NewFTP is returned unchanged.
func NewFTPPort(addr, user, pass string) (*Port, error) {
	ftpImpl, err := NewFTP(addr, user, pass)
	if err != nil {
		return nil, err
	}

	port := &Port{impl: ftpImpl}
	return port, nil
}
// NewFTP dials the FTP server at addr, logs in with user and pass, and starts
// a background fswatch subscriber on ":4441" whose events are forwarded to the
// port's internal channel for Receive to consume.
//
// The subscriber goroutine has no stop mechanism here: it lives as long as
// fswatch.Subscriber runs, and its callback blocks on the unbuffered channel
// until Receive picks the event up.
func NewFTP(addr, user, pass string) (*FTPPort, error) {
	// Dial the endpoint and credentials the caller asked for instead of a
	// fixed "localhost:21" / "test" / "test" triple.
	conn, err := dialFTP(addr, user, pass)
	if err != nil {
		return nil, fmt.Errorf("failed to dial ftp: %v", err)
	}

	ftpPort := &FTPPort{
		ftpEventC: make(chan *pb.EventRequest),
		conn:      conn,
	}

	// The WaitGroup only guarantees that the goroutine has started running
	// before NewFTP returns; it does NOT guarantee the subscriber is already
	// listening, because Done is signalled before Subscriber is called.
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		wg.Done()
		fswatch.Subscriber(":4441", func(event *pb.EventRequest) {
			ftpPort.ftpEventC <- event
		})
	}()
	wg.Wait()

	return ftpPort, nil
}
// FTPEvent is the message type exchanged through FTPPort: Send uploads one,
// and Receive returns one built from an fswatch event.
type FTPEvent struct {
// Path is the file path: the upload target for Send, or the path reported
// by the fswatch event for Receive.
Path string
// Payload is the file content as raw bytes.
Payload []byte
}
// Send uploads the given message to the FTP server. The message must be a
// non-nil *FTPEvent; its Payload is stored at its Path.
//
// It returns an error when i is not a *FTPEvent, when the event pointer is
// nil, or when the upload fails. ctx is currently not consulted because the
// underlying Stor call takes no context.
func (p *FTPPort) Send(ctx context.Context, i interface{}) error {
	event, ok := i.(*FTPEvent)
	if !ok {
		return fmt.Errorf("FTPPort send supports only FTPEvent type")
	}
	// A typed-nil *FTPEvent passes the assertion above; reject it explicitly
	// instead of panicking on the field access below.
	if event == nil {
		return fmt.Errorf("FTPPort send received nil FTPEvent")
	}

	// The payload is only read, so a Reader is sufficient.
	if err := p.conn.Stor(event.Path, bytes.NewReader(event.Payload)); err != nil {
		return fmt.Errorf("ftp file upload failed %v", err)
	}
	return nil
}
// Receive waits for the next fswatch event and returns it as a *FTPEvent.
//
// It gives up with an error when ctx is cancelled or its deadline passes, or
// after a fixed 7 second timeout, whichever happens first. A nil ctx is
// tolerated and leaves only the fixed timeout in effect.
func (p *FTPPort) Receive(ctx context.Context) (interface{}, error) {
	const receiveTimeout = 7 * time.Second

	// Stop the timer on every exit path so it does not linger after a
	// message has already been delivered.
	timer := time.NewTimer(receiveTimeout)
	defer timer.Stop()

	// A nil channel is never ready in a select, so callers that pass a nil
	// context keep the previous timeout-only behaviour.
	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}

	select {
	case msg := <-p.ftpEventC:
		return &FTPEvent{
			Path:    msg.GetPath(),
			Payload: msg.GetContent(),
		}, nil
	case <-done:
		return nil, errors.Wrap(ctx.Err(), "failed to receive message")
	case <-timer.C:
		return nil, errors.New("failed to receive message, deadline exceeded")
	}
}
// dialFTP connects to the FTP server at addr and logs in with user and pass.
// It returns the logged-in connection, or an error when either the connection
// or the login fails. On login failure the connection is closed before
// returning.
func dialFTP(addr string, user, pass string) (*ftp.ServerConn, error) {
	connection, err := ftp.Connect(addr)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to connect to %q", addr)
	}

	if err := connection.Login(user, pass); err != nil {
		// Best-effort close so the control connection is not leaked; the
		// login error is the one worth reporting, so Quit's error is dropped.
		_ = connection.Quit()
		return nil, fmt.Errorf("failed to login to %q: %v", addr, err)
	}

	return connection, nil
}