Skip to content

Commit

Permalink
feat(scheme/ftp): support to use FTP/FTPS as an alert
Browse files Browse the repository at this point in the history
  • Loading branch information
macrat committed Dec 9, 2022
1 parent e4e261b commit 739e935
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 34 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ Ayd supports below URL schemes in default.
| scheme | as Target | as Alert |
|------------------------------------|:------------------:|:------------------:|
| [`http:` / `https:`](#http--https) | :heavy_check_mark: | :heavy_check_mark: |
| [`ftp:` / `ftps:`](#ftp--ftps) | :heavy_check_mark: | :heavy_minus_sign: |
| [`ftp:` / `ftps:`](#ftp--ftps) | :heavy_check_mark: | :heavy_check_mark: |
| [`ping:`](#ping) | :heavy_check_mark: | :heavy_minus_sign: |
| [`tcp:`](#tcp) | :heavy_check_mark: | :heavy_minus_sign: |
| [`dns:`](#dns) | :heavy_check_mark: | :heavy_minus_sign: |
Expand Down Expand Up @@ -169,7 +169,8 @@ examples:

##### as Alert

FTP/FTPS does not support to used as an alert URL.
Writes the same format logs as the [normal log file](#log-file), over FTP or FTPS, when the service status changed.
It is pretty same as [file:](#file) scheme for alert but uses FTP/FTPS.

#### ping:

Expand Down Expand Up @@ -250,7 +251,8 @@ examples:

##### as Alert

If you use this as an alert target, it writes logs that the same as [normal log](#log-file) to the target path, but only logs when status changed.
Writes the same format logs as the [normal log file](#log-file) to the target path, when the service status changed.
It is pretty same as [ftp: / ftps:](ftp--ftps) for alert but writes to a local file.

#### exec:

Expand Down
2 changes: 1 addition & 1 deletion internal/scheme/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewAlerterFromURL(u *api.URL) (Alerter, error) {
case "http", "https":
return NewHTTPScheme(u)
case "ftp", "ftps":
return nil, ErrUnsupportedAlertScheme
return NewFTPScheme(u)
case "ping", "ping4", "ping6":
return nil, ErrUnsupportedAlertScheme
case "tcp", "tcp4", "tcp6":
Expand Down
2 changes: 0 additions & 2 deletions internal/scheme/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ func TestAlerter(t *testing.T) {
{"exec:ayd-bar-probe", "alert:exec:ayd-bar-probe", "arg \"\"\nenv ayd_time=2001-02-03T16:05:06Z ayd_status=FAILURE ayd_latency=12.345 ayd_target=dummy:failure ayd_message=foobar ayd_extra={\"hello\":\"world\"}\n---\nexit_code: 0", ""},
{"bar:", "", "", "unsupported scheme"},

{"ftp://example.com", "", "", "unsupported scheme for alert"},
{"ftps://example.com", "", "", "unsupported scheme for alert"},
{"ping://example.com", "", "", "unsupported scheme for alert"},
{"ping4://example.com", "", "", "unsupported scheme for alert"},
{"ping6://example.com", "", "", "unsupported scheme for alert"},
Expand Down
72 changes: 53 additions & 19 deletions internal/scheme/ftp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheme

import (
"bytes"
"context"
"crypto/tls"
"errors"
Expand Down Expand Up @@ -92,13 +93,13 @@ func ftpConnectAndLogin(ctx context.Context, u *api.URL) (conn *ftp.ServerConn,
return conn, api.StatusHealthy, ""
}

// FTPProbe is a implementation for the FTP.
type FTPProbe struct {
// FTPScheme is a probe/alert implementation for the FTP.
type FTPScheme struct {
target *api.URL
}

func NewFTPProbe(u *api.URL) (FTPProbe, error) {
p := FTPProbe{
func NewFTPScheme(u *api.URL) (FTPScheme, error) {
s := FTPScheme{
target: &api.URL{
Scheme: u.Scheme,
User: u.User,
Expand All @@ -109,31 +110,31 @@ func NewFTPProbe(u *api.URL) (FTPProbe, error) {
}

if u.Host == "" {
return FTPProbe{}, ErrMissingHost
return FTPScheme{}, ErrMissingHost
}

if u.User != nil {
if u.User.Username() == "" {
return FTPProbe{}, ErrMissingUsername
return FTPScheme{}, ErrMissingUsername
}
if _, ok := u.User.Password(); !ok {
return FTPProbe{}, ErrMissingPassword
return FTPScheme{}, ErrMissingPassword
}
}

if u.Path == "" {
p.target.Path = "/"
s.target.Path = "/"
}

return p, nil
return s, nil
}

func (p FTPProbe) Target() *api.URL {
return p.target
func (s FTPScheme) Target() *api.URL {
return s.target
}

func (p FTPProbe) list(conn *ftp.ServerConn) (files []*ftp.Entry, status api.Status, message string) {
ls, err := conn.List(p.target.Path)
func (s FTPScheme) list(conn *ftp.ServerConn) (files []*ftp.Entry, status api.Status, message string) {
ls, err := conn.List(s.target.Path)
if err != nil {
return nil, api.StatusFailure, err.Error()
}
Expand All @@ -144,30 +145,30 @@ func (p FTPProbe) list(conn *ftp.ServerConn) (files []*ftp.Entry, status api.Sta
}

// Probe checks if the target FTP server is available.
func (p FTPProbe) Probe(ctx context.Context, r Reporter) {
func (s FTPScheme) Probe(ctx context.Context, r Reporter) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()

stime := time.Now()
report := func(status api.Status, message string, extra map[string]interface{}) {
r.Report(p.target, timeoutOr(ctx, api.Record{
r.Report(s.target, timeoutOr(ctx, api.Record{
Time: stime,
Status: status,
Latency: time.Since(stime),
Target: p.target,
Target: s.target,
Message: message,
Extra: extra,
}))
}

conn, status, message := ftpConnectAndLogin(ctx, p.target)
conn, status, message := ftpConnectAndLogin(ctx, s.target)
if status != api.StatusHealthy {
report(status, message, nil)
return
}
defer conn.Quit()

ls, status, message := p.list(conn)
ls, status, message := s.list(conn)
if status != api.StatusHealthy {
report(status, message, nil)
return
Expand All @@ -180,7 +181,7 @@ func (p FTPProbe) Probe(ctx context.Context, r Reporter) {
}
}

if n == 1 && path.Base(ls[0].Name) == path.Base(p.target.Path) {
if n == 1 && path.Base(ls[0].Name) == path.Base(s.target.Path) {
report(api.StatusHealthy, "file exists", map[string]interface{}{
"file_size": ls[0].Size,
"mtime": ls[0].Time.Format(time.RFC3339),
Expand All @@ -200,3 +201,36 @@ func (p FTPProbe) Probe(ctx context.Context, r Reporter) {
report(api.StatusHealthy, "directory exists", extra)
}
}

func (s FTPScheme) Alert(ctx context.Context, r Reporter, lastRecord api.Record) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()

stime := time.Now()
report := func(status api.Status, message string, extra map[string]interface{}) {
r.Report(s.target, timeoutOr(ctx, api.Record{
Time: stime,
Status: status,
Latency: time.Since(stime),
Target: s.target,
Message: message,
Extra: extra,
}))
}

conn, status, message := ftpConnectAndLogin(ctx, s.target)
if status != api.StatusHealthy {
report(status, message, nil)
return
}
defer conn.Quit()

line := lastRecord.String() + "\n"

err := conn.Append(s.target.Path, bytes.NewBufferString(line))
if err != nil {
report(api.StatusFailure, fmt.Sprintf("failed to upload record: %s", err), nil)
} else {
report(api.StatusHealthy, fmt.Sprintf("uploaded %d bytes to the server", len(line)), nil)
}
}
89 changes: 81 additions & 8 deletions internal/scheme/ftp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheme_test

import (
"bytes"
"context"
_ "embed"
"errors"
"fmt"
Expand All @@ -11,6 +12,9 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/macrat/ayd/internal/scheme"
"github.com/macrat/ayd/internal/testutil"
api "github.com/macrat/ayd/lib-ayd"
ftp "goftp.io/server/core"
)
Expand Down Expand Up @@ -56,7 +60,15 @@ func (i FTPFileInfo) Group() string {
return "hoge"
}

type FTPTestDriver struct{}
type FTPUploadedFile struct {
Name string
Data []byte
Append bool
}

type FTPTestDriver struct {
Uploaded []FTPUploadedFile
}

func (d FTPTestDriver) Stat(path string) (ftp.FileInfo, error) {
switch path {
Expand Down Expand Up @@ -127,11 +139,20 @@ func (d FTPTestDriver) GetFile(path string, i int64) (int64, io.ReadCloser, erro
return 0, nil, errors.New("not implemented")
}

func (d FTPTestDriver) PutFile(path string, f io.Reader, b bool) (int64, error) {
return 0, errors.New("not implemented")
func (d *FTPTestDriver) PutFile(path string, f io.Reader, append_ bool) (int64, error) {
if b, err := io.ReadAll(f); err != nil {
return 0, err
} else {
d.Uploaded = append(d.Uploaded, FTPUploadedFile{
Name: path,
Data: b,
Append: append_,
})
return int64(len(b)), nil
}
}

func (d FTPTestDriver) NewDriver() (ftp.Driver, error) {
func (d *FTPTestDriver) NewDriver() (ftp.Driver, error) {
return d, nil
}

Expand All @@ -150,10 +171,11 @@ func (a FTPTestAuth) CheckPasswd(username, password string) (ok bool, err error)
// StartFTPServer starts FTP server for test.
//
// XXX: randomize port and avoid conflict
func StartFTPServer(t *testing.T, port int) *ftp.Server {
func StartFTPServer(t *testing.T, port int) *FTPTestDriver {
t.Helper()
driver := &FTPTestDriver{}
server := ftp.NewServer(&ftp.ServerOpts{
Factory: FTPTestDriver{},
Factory: driver,
Auth: FTPTestAuth{},
Port: port,
Logger: &ftp.DiscardLogger{},
Expand All @@ -166,10 +188,10 @@ func StartFTPServer(t *testing.T, port int) *ftp.Server {
server.Shutdown()
})
}()
return server
return driver
}

func TestFTPProbe(t *testing.T) {
func TestFTPScheme_Probe(t *testing.T) {
t.Parallel()
StartFTPServer(t, 21021)

Expand Down Expand Up @@ -199,3 +221,54 @@ func TestFTPProbe(t *testing.T) {
}, 1)
}
}

func TestFTPScheme_Alert(t *testing.T) {
t.Parallel()

a, err := scheme.NewAlerter("ftp://hoge:fuga@localhost:21121/alert.json")
if err != nil {
t.Fatalf("failed to prepare FTPScheme: %s", err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

driver := StartFTPServer(t, 21121)

r := &testutil.DummyReporter{}
a.Alert(ctx, r, api.Record{
Time: time.Date(2021, 1, 2, 15, 4, 5, 0, time.UTC),
Status: api.StatusFailure,
Latency: 123456 * time.Microsecond,
Target: &api.URL{Scheme: "dummy", Fragment: "hello"},
Message: "hello world",
})

expected := `{"time":"2021-01-02T15:04:05Z", "status":"FAILURE", "latency":123.456, "target":"dummy:#hello", "message":"hello world"}` + "\n"

if len(r.Records) != 1 {
t.Errorf("unexpected number of records\n%v", r.Records)
} else {
if r.Records[0].Status != api.StatusHealthy {
t.Errorf("unexpected status: %s", r.Records[0].Status)
}
if r.Records[0].Message != fmt.Sprintf("uploaded %d bytes to the server", len(expected)) {
t.Errorf("unexpected message: %q", r.Records[0].Message)
}
}

if len(driver.Uploaded) != 1 {
t.Errorf("unexpected number of uploaded files found: %d", len(driver.Uploaded))
} else {
info := driver.Uploaded[0]
if info.Name != "/alert.json" {
t.Errorf("unexpected name file uploaded: %s", info.Name)
}
if diff := cmp.Diff(expected, string(info.Data)); diff != "" {
t.Errorf("unexpected file uploaded:\n%s", diff)
}
if !info.Append {
t.Errorf("the append flag was false")
}
}
}
2 changes: 1 addition & 1 deletion internal/scheme/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewProberFromURL(u *api.URL) (Prober, error) {
case "http", "https":
return NewHTTPScheme(u)
case "ftp", "ftps":
return NewFTPProbe(u)
return NewFTPScheme(u)
case "ping", "ping4", "ping6":
return NewPingProbe(u)
case "tcp", "tcp4", "tcp6":
Expand Down

0 comments on commit 739e935

Please sign in to comment.