Skip to content

Commit

Permalink
feat(scheme): support extra values in embedded schemes
Browse files Browse the repository at this point in the history
  • Loading branch information
macrat committed May 29, 2022
1 parent 172780c commit ef11476
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 49 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ For example, log lines look like below.
```
{"time":"2001-02-30T16:00:00+09:00", "status":"FAILURE", "latency":0.544, "target":"http://localhost", "message":"Get \"http://localhost\": dial tcp [::1]:80: connect: connection refused"}
{"time":"2001-02-30T16:05:00+09:00", "status":"UNKNOWN", "latency":0.000, "target":"tcp:somehost:1234", "message":"lookup somehost on 192.168.1.1:53: no such host"}
{"time":"2001-02-30T16:10:00+09:00", "status":"HEALTHY", "latency":0.375, "target":"ping:anotherhost", "message":"rtt(min/avg/max)=0.31/0.38/0.47 send/rcv=4/4"}
{"time":"2001-02-30T16:10:00+09:00", "status":"HEALTHY", "latency":0.375, "target":"ping:anotherhost", "message":"All packets came back", "packets_recv":3, "packets_sent:3, "rtt_avg":0.38, "rtt_max":0.47, "rtt_min":0.31}
```

Ayd will save the log file named `ayd.log` into the current directory in default.
Expand Down
13 changes: 13 additions & 0 deletions internal/scheme/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,25 @@ func (s ExecScheme) run(ctx context.Context, r Reporter, extraEnv []string) {
message, latency = getLatencyByMessage(message, latency)
message, status = getStatusByMessage(message, status)

exitCode := 0
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
exitCode = exitErr.ExitCode()
} else {
exitCode = -1
}
}

r.Report(s.target, timeoutOr(ctx, api.Record{
CheckedAt: stime,
Target: s.target,
Status: status,
Message: message,
Latency: latency,
Extra: map[string]interface{}{
"exit_code": exitCode,
},
}))
}

Expand Down
15 changes: 10 additions & 5 deletions internal/scheme/ftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,26 +150,27 @@ func (p FTPProbe) Probe(ctx context.Context, r Reporter) {
defer cancel()

stime := time.Now()
report := func(status api.Status, message string) {
report := func(status api.Status, message string, extra map[string]interface{}) {
r.Report(p.target, timeoutOr(ctx, api.Record{
CheckedAt: stime,
Status: status,
Latency: time.Since(stime),
Target: p.target,
Message: message,
Extra: extra,
}))
}

conn, status, message := ftpConnectAndLogin(ctx, p.target)
if status != api.StatusHealthy {
report(status, message)
report(status, message, nil)
return
}
defer conn.Quit()

ls, status, message := p.list(conn)
if status != api.StatusHealthy {
report(status, message)
report(status, message, nil)
return
}

Expand All @@ -181,8 +182,12 @@ func (p FTPProbe) Probe(ctx context.Context, r Reporter) {
}

if n == 1 && ls[0].Name == path.Base(p.target.Path) {
report(api.StatusHealthy, fmt.Sprintf("type=file size=%d", ls[0].Size))
report(api.StatusHealthy, fmt.Sprintf("type=file size=%d", ls[0].Size), map[string]interface{}{
"file_size": ls[0].Size,
})
} else {
report(api.StatusHealthy, fmt.Sprintf("type=directory files=%d", n))
report(api.StatusHealthy, fmt.Sprintf("type=directory files=%d", n), map[string]interface{}{
"files": n,
})
}
}
9 changes: 8 additions & 1 deletion internal/scheme/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,18 @@ func (s HTTPScheme) Target() *api.URL {
func (s HTTPScheme) responseToRecord(resp *http.Response, err error) api.Record {
status := api.StatusFailure
message := ""
var extra map[string]interface{}

if err == nil {
message = fmt.Sprintf("proto=%s length=%d status=%s", resp.Proto, resp.ContentLength, strings.ReplaceAll(resp.Status, " ", "_"))
message = resp.Status
if 200 <= resp.StatusCode && resp.StatusCode <= 299 {
status = api.StatusHealthy
}
extra = map[string]interface{}{
"proto": resp.Proto,
"length": resp.ContentLength,
"status_code": resp.StatusCode,
}
} else {
message = err.Error()

Expand All @@ -119,6 +125,7 @@ func (s HTTPScheme) responseToRecord(resp *http.Response, err error) api.Record
Target: s.target,
Status: status,
Message: message,
Extra: extra,
}
}

Expand Down
18 changes: 9 additions & 9 deletions internal/scheme/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ func TestHTTPScheme_Probe(t *testing.T) {
defer server.Close()

AssertProbe(t, []ProbeTest{
{server.URL + "/ok", api.StatusHealthy, `proto=HTTP/1\.1 length=2 status=200_OK`, ""},
{server.URL + "/redirect/ok", api.StatusHealthy, `proto=HTTP/1\.1 length=2 status=200_OK`, ""},
{server.URL + "/error", api.StatusFailure, `proto=HTTP/1\.1 length=5 status=500_Internal_Server_Error`, ""},
{server.URL + "/redirect/error", api.StatusFailure, `proto=HTTP/1\.1 length=5 status=500_Internal_Server_Error`, ""},
{server.URL + "/ok", api.StatusHealthy, `200 OK`, ""},
{server.URL + "/redirect/ok", api.StatusHealthy, `200 OK`, ""},
{server.URL + "/error", api.StatusFailure, `500 Internal Server Error`, ""},
{server.URL + "/redirect/error", api.StatusFailure, `500 Internal Server Error`, ""},
{server.URL + "/redirect/loop", api.StatusFailure, `Get "/redirect/loop": redirect loop detected`, ""},
{strings.Replace(server.URL, "http", "http-get", 1) + "/only/get", api.StatusHealthy, `proto=HTTP/1\.1 length=0 status=200_OK`, ""},
{strings.Replace(server.URL, "http", "http-post", 1) + "/only/post", api.StatusHealthy, `proto=HTTP/1\.1 length=0 status=200_OK`, ""},
{strings.Replace(server.URL, "http", "http-head", 1) + "/only/head", api.StatusHealthy, `proto=HTTP/1\.1 length=-1 status=200_OK`, ""},
{strings.Replace(server.URL, "http", "http-options", 1) + "/only/options", api.StatusHealthy, `proto=HTTP/1\.1 length=0 status=200_OK`, ""},
{strings.Replace(server.URL, "http", "http-connect", 1) + "/only/connect", api.StatusHealthy, `proto=HTTP/1\.1 length=0 status=200_OK`, ""},
{strings.Replace(server.URL, "http", "http-get", 1) + "/only/get", api.StatusHealthy, `200 OK`, ""},
{strings.Replace(server.URL, "http", "http-post", 1) + "/only/post", api.StatusHealthy, `200 OK`, ""},
{strings.Replace(server.URL, "http", "http-head", 1) + "/only/head", api.StatusHealthy, `200 OK`, ""},
{strings.Replace(server.URL, "http", "http-options", 1) + "/only/options", api.StatusHealthy, `200 OK`, ""},
{strings.Replace(server.URL, "http", "http-connect", 1) + "/only/connect", api.StatusHealthy, `200 OK`, ""},
{server.URL + "/slow-page", api.StatusFailure, `probe timed out`, ""},
{"http://localhost:54321/", api.StatusFailure, `(127\.0\.0\.1|\[::1\]):54321: connection refused`, ""},
}, 5)
Expand Down
20 changes: 10 additions & 10 deletions internal/scheme/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scheme
import (
"context"
"errors"
"fmt"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -178,24 +177,25 @@ func pingResultToRecord(ctx context.Context, target *api.URL, startTime time.Tim
CheckedAt: startTime,
Latency: result.AvgRTT,
Target: target,
Message: fmt.Sprintf(
"ip=%s rtt(min/avg/max)=%.2f/%.2f/%.2f recv/sent=%d/%d",
result.Target,
float64(result.MinRTT.Microseconds())/1000,
float64(result.AvgRTT.Microseconds())/1000,
float64(result.MaxRTT.Microseconds())/1000,
result.Recv,
result.Sent,
),
Extra: map[string]interface{}{
"rtt_min": float64(result.MinRTT.Microseconds()) / 1000,
"rtt_avg": float64(result.AvgRTT.Microseconds()) / 1000,
"rtt_max": float64(result.MaxRTT.Microseconds()) / 1000,
"packets_recv": result.Recv,
"packets_sent": result.Sent,
},
}

switch {
case result.Loss == 0:
rec.Status = api.StatusHealthy
rec.Message = "All packets came back"
case result.Recv == 0:
rec.Status = api.StatusFailure
rec.Message = "All packets has dropped"
default:
rec.Status = api.StatusDegrade
rec.Message = "Some packets has dropped"
}

if ctx.Err() == context.Canceled {
Expand Down
40 changes: 37 additions & 3 deletions internal/scheme/ping_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
api "github.com/macrat/ayd/lib-ayd"
"github.com/macrat/go-parallel-pinger"
)
Expand Down Expand Up @@ -236,6 +237,7 @@ func TestPingResultToRecord(t *testing.T) {
StartTime time.Time
Result pinger.Result
Message string
Extra map[string]interface{}
Status api.Status
}{
{
Expand All @@ -251,7 +253,14 @@ func TestPingResultToRecord(t *testing.T) {
AvgRTT: 2345 * time.Microsecond,
MaxRTT: 3456 * time.Microsecond,
},
"ip=127.0.0.1 rtt(min/avg/max)=1.23/2.35/3.46 recv/sent=3/3",
"All packets came back",
map[string]interface{}{
"rtt_min": 1.234,
"rtt_avg": 2.345,
"rtt_max": 3.456,
"packets_recv": 3,
"packets_sent": 3,
},
api.StatusHealthy,
},
{
Expand All @@ -264,7 +273,14 @@ func TestPingResultToRecord(t *testing.T) {
Recv: 0,
Loss: 3,
},
"ip=127.1.2.3 rtt(min/avg/max)=0.00/0.00/0.00 recv/sent=0/3",
"All packets has dropped",
map[string]interface{}{
"rtt_min": 0.0,
"rtt_avg": 0.0,
"rtt_max": 0.0,
"packets_recv": 0,
"packets_sent": 3,
},
api.StatusFailure,
},
{
Expand All @@ -280,7 +296,14 @@ func TestPingResultToRecord(t *testing.T) {
AvgRTT: 2345 * time.Microsecond,
MaxRTT: 3456 * time.Microsecond,
},
"ip=127.3.2.1 rtt(min/avg/max)=1.23/2.35/3.46 recv/sent=2/3",
"Some packets has dropped",
map[string]interface{}{
"rtt_min": 1.234,
"rtt_avg": 2.345,
"rtt_max": 3.456,
"packets_recv": 2,
"packets_sent": 3,
},
api.StatusDegrade,
},
{
Expand All @@ -297,6 +320,13 @@ func TestPingResultToRecord(t *testing.T) {
MaxRTT: 3456 * time.Microsecond,
},
"probe aborted",
map[string]interface{}{
"rtt_min": 1.234,
"rtt_avg": 2.345,
"rtt_max": 3.456,
"packets_recv": 2,
"packets_sent": 3,
},
api.StatusAborted,
},
}
Expand Down Expand Up @@ -324,6 +354,10 @@ func TestPingResultToRecord(t *testing.T) {
if rec.Message != tt.Message {
t.Errorf("unexpected message\n--- expected ---\n%s\n--- actual ---\n%s", tt.Message, rec.Message)
}

if diff := cmp.Diff(tt.Extra, rec.Extra); diff != "" {
t.Errorf("unexpected extra\n%s", diff)
}
})
}
}
12 changes: 6 additions & 6 deletions internal/scheme/ping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ func TestPingProbe_Probe(t *testing.T) {
}

AssertProbe(t, []ProbeTest{
{"ping:localhost", api.StatusHealthy, `ip=(127.0.0.1|::1) rtt\(min/avg/max\)=[0-9.]*/[0-9.]*/[0-9.]* recv/sent=3/3`, ""},
{"ping:127.0.0.1", api.StatusHealthy, `ip=127.0.0.1 rtt\(min/avg/max\)=[0-9.]*/[0-9.]*/[0-9.]* recv/sent=3/3`, ""},
{"ping:::1", api.StatusHealthy, `ip=::1 rtt\(min/avg/max\)=[0-9.]*/[0-9.]*/[0-9.]* recv/sent=3/3`, ""},
{"ping4:localhost", api.StatusHealthy, `ip=127.0.0.1 rtt\(min/avg/max\)=[0-9.]*/[0-9.]*/[0-9.]* recv/sent=3/3`, ""},
{"ping6:localhost", api.StatusHealthy, `ip=::1 rtt\(min/avg/max\)=[0-9.]*/[0-9.]*/[0-9.]* recv/sent=3/3`, ""},
{"ping:localhost", api.StatusHealthy, `All packets came back`, ""},
{"ping:127.0.0.1", api.StatusHealthy, `All packets came back`, ""},
{"ping:::1", api.StatusHealthy, `All packets came back`, ""},
{"ping4:localhost", api.StatusHealthy, `All packets came back`, ""},
{"ping6:localhost", api.StatusHealthy, `All packets came back`, ""},
{"ping:of-course-definitely-no-such-host", api.StatusUnknown, `.*`, ""},
}, 2)

Expand Down Expand Up @@ -71,7 +71,7 @@ func TestPingProbe_Probe(t *testing.T) {
t.Setenv("AYD_PING_INTERVAL", "1ms")

AssertProbe(t, []ProbeTest{
{"ping:localhost", api.StatusHealthy, `ip=(127.0.0.1|::1) rtt\(min/avg/max\)=[0-9.]*/[0-9.]*/[0-9.]* recv/sent=10/10`, ""},
{"ping:localhost", api.StatusHealthy, `All packets came back`, ""},
}, 2)
})
}
Expand Down
24 changes: 15 additions & 9 deletions internal/scheme/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,20 +367,23 @@ func (p SourceScheme) Probe(ctx context.Context, r Reporter) {
if err != nil {
r.Report(p.target, timeoutOr(ctx, api.Record{
CheckedAt: stime,
Target: p.target,
Latency: d,
Status: api.StatusFailure,
Target: p.target,
Message: err.Error(),
Latency: d,
}))
return
}

r.Report(p.target, api.Record{
CheckedAt: stime,
Target: p.target,
Status: api.StatusHealthy,
Message: fmt.Sprintf("target_count=%d", len(probes)),
Latency: d,
Target: p.target,
Message: fmt.Sprintf("loaded %d targets", len(probes)),
Extra: map[string]interface{}{
"targets": len(probes),
},
})

r = p.tracker.PrepareReporter(p.target, r)
Expand Down Expand Up @@ -413,20 +416,23 @@ func (p SourceScheme) Alert(ctx context.Context, r Reporter, lastRecord api.Reco
if err != nil {
r.Report(p.target, timeoutOr(ctx, api.Record{
CheckedAt: stime,
Target: p.target,
Latency: d,
Status: api.StatusFailure,
Target: p.target,
Message: err.Error(),
Latency: d,
}))
return
}

r.Report(p.target, api.Record{
CheckedAt: stime,
Target: p.target,
Status: api.StatusHealthy,
Message: fmt.Sprintf("target_count=%d", len(alerters)),
Latency: d,
Status: api.StatusHealthy,
Target: p.target,
Message: fmt.Sprintf("loaded %d targets", len(alerters)),
Extra: map[string]interface{}{
"targets": len(alerters),
},
})

wg := &sync.WaitGroup{}
Expand Down
6 changes: 5 additions & 1 deletion internal/scheme/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ func (s TCPProbe) Probe(ctx context.Context, r Reporter) {
}
} else {
rec.Status = api.StatusHealthy
rec.Message = "source=" + conn.LocalAddr().String() + " target=" + conn.RemoteAddr().String()
rec.Message = "succeed to connect"
rec.Extra = map[string]interface{}{
"source": conn.LocalAddr().String(),
"target": conn.RemoteAddr().String(),
}
conn.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/scheme/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestTCPScheme_Probe(t *testing.T) {
defer server.Close()

AssertProbe(t, []ProbeTest{
{strings.Replace(server.URL, "http://", "tcp://", 1), api.StatusHealthy, `source=(127\.0\.0\.1|\[::1\]):[0-9]+ target=(127\.0\.0\.1|\[::1\]):[0-9]+`, ""},
{strings.Replace(server.URL, "http://", "tcp://", 1), api.StatusHealthy, `succeed to connect`, ""},

{"tcp://localhost", api.StatusUnknown, ``, "TCP target's port number is required"},
}, 5)
Expand Down
Loading

0 comments on commit ef11476

Please sign in to comment.