Skip to content

Commit

Permalink
feat: add source: prober
Browse files Browse the repository at this point in the history
  • Loading branch information
macrat committed Apr 13, 2021
1 parent 4da66f8 commit 5a1aeee
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 33 deletions.
22 changes: 17 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func Usage() {
fmt.Fprintf(out, " and you can set environment variable with query.\n")
fmt.Fprintf(out, " e.g. exec:/path/to/script?something_variable=awesome-value#argument-for-script\n")
fmt.Fprintf(out, "\n")
fmt.Fprintf(out, " source:\n")
fmt.Fprintf(out, " Load a file, and test target URIs of each lines.\n")
fmt.Fprintf(out, " Lines in the file that starts with \"#\" will ignore as comments.\n")
fmt.Fprintf(out, " e.g. source:/path/to/list.txt\n")
fmt.Fprintf(out, "\n")
fmt.Fprintf(out, "EXAMPLES:\n")
fmt.Fprintf(out, " Send ping to example.com in default interval(5m):\n")
fmt.Fprintf(out, " $ %s ping:example.com\n", os.Args[0])
Expand All @@ -78,6 +83,11 @@ func Usage() {
fmt.Fprintf(out, " and execute ./check.sh command every 15 minutes:\n")
fmt.Fprintf(out, " $ %s 1m ping:a.local http://b.local 15m exec:./check.sh\n", os.Args[0])
fmt.Fprintf(out, "\n")
fmt.Fprintf(out, " Check targets that listed in file named \"./list.txt\":\n")
fmt.Fprintf(out, " $ echo ping:a.local >> list.txt\n")
fmt.Fprintf(out, " $ echo ping:b.local >> list.txt\n")
fmt.Fprintf(out, " $ %s source:./list.txt\n", os.Args[0])
fmt.Fprintf(out, "\n")
fmt.Fprintf(out, " Listen on http://0.0.0.0:8080 (and connect to example.com:3306 for check):\n")
fmt.Fprintf(out, " $ %s -p 8080 1m tcp:example.com:3306\n", os.Args[0])
}
Expand Down Expand Up @@ -140,10 +150,12 @@ func RunOneshot(tasks []Task) {

f := t.Probe.Check
go func() {
r := f()
s.Append(r)
if r.Status == store.STATUS_FAILURE {
failed.Store(true)
rs := f()
s.Append(rs...)
for _, r := range rs {
if r.Status == store.STATUS_FAILURE {
failed.Store(true)
}
}
wg.Done()
}()
Expand Down Expand Up @@ -179,7 +191,7 @@ func RunServer(tasks []Task) {

f := t.Probe.Check
job := func() {
s.Append(f())
s.Append(f()...)
}

if t.Schedule.NeedKickWhenStart() {
Expand Down
4 changes: 2 additions & 2 deletions probe/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (p DNSProbe) Target() *url.URL {
return p.target
}

func (p DNSProbe) Check() store.Record {
func (p DNSProbe) Check() []store.Record {
st := time.Now()
addrs, err := net.LookupHost(p.target.Opaque)
d := time.Now().Sub(st)
Expand All @@ -44,5 +44,5 @@ func (p DNSProbe) Check() store.Record {
r.Message = strings.Join(addrs, ", ")
}

return r
return []store.Record{r}
}
6 changes: 3 additions & 3 deletions probe/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (p ExecuteProbe) Target() *url.URL {
return p.target
}

func (p ExecuteProbe) Check() store.Record {
func (p ExecuteProbe) Check() []store.Record {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

Expand Down Expand Up @@ -74,11 +74,11 @@ func (p ExecuteProbe) Check() store.Record {
}
}

return store.Record{
return []store.Record{{
CheckedAt: st,
Target: p.target,
Status: status,
Message: message,
Latency: d,
}
}}
}
6 changes: 3 additions & 3 deletions probe/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (p HTTPProbe) Target() *url.URL {
return p.target
}

func (p HTTPProbe) Check() store.Record {
func (p HTTPProbe) Check() []store.Record {
req := &http.Request{
Method: p.method,
URL: p.requrl,
Expand All @@ -98,11 +98,11 @@ func (p HTTPProbe) Check() store.Record {
}
}

return store.Record{
return []store.Record{{
CheckedAt: st,
Target: p.target,
Status: status,
Message: message,
Latency: d,
}
}}
}
10 changes: 5 additions & 5 deletions probe/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (p PingProbe) Target() *url.URL {
return p.target
}

func (p PingProbe) Check() store.Record {
func (p PingProbe) Check() []store.Record {
pinger, err := ping.NewPinger(p.target.Opaque)
if err != nil {
status := store.STATUS_FAILURE
Expand All @@ -35,12 +35,12 @@ func (p PingProbe) Check() store.Record {
status = store.STATUS_UNKNOWN
}

return store.Record{
return []store.Record{{
CheckedAt: time.Now(),
Target: p.target,
Status: status,
Message: err.Error(),
}
}}
}

pinger.Interval = 500 * time.Millisecond
Expand All @@ -59,7 +59,7 @@ func (p PingProbe) Check() store.Record {
status = store.STATUS_HEALTHY
}

return store.Record{
return []store.Record{{
CheckedAt: startTime,
Target: p.target,
Status: status,
Expand All @@ -72,5 +72,5 @@ func (p PingProbe) Check() store.Record {
pinger.PacketsRecv,
),
Latency: stat.AvgRtt,
}
}}
}
4 changes: 3 additions & 1 deletion probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (

type Probe interface {
Target() *url.URL
Check() store.Record
Check() []store.Record
}

func GetByURL(u *url.URL) (Probe, error) {
Expand All @@ -35,6 +35,8 @@ func GetByURL(u *url.URL) (Probe, error) {
return NewDNSProbe(u)
case "exec":
return NewExecuteProbe(u)
case "source":
return NewSourceProbe(u)
default:
return nil, ErrUnsupportedScheme
}
Expand Down
118 changes: 118 additions & 0 deletions probe/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package probe

import (
"bufio"
"fmt"
"net/url"
"os"
"strings"
"sync"
"time"

"github.com/macrat/ayd/store"
)

type invalidURIs []string

func (es invalidURIs) Error() string {
var ss []string
for _, e := range es {
ss = append(ss, e)
}
return "Invalid URI: " + strings.Join(ss, ", ")
}

type SourceProbe struct {
target *url.URL
}

func NewSourceProbe(u *url.URL) (SourceProbe, error) {
s := SourceProbe{}
if u.Opaque == "" {
s.target = &url.URL{Scheme: "source", Opaque: u.Path}
} else {
s.target = &url.URL{Scheme: "source", Opaque: u.Opaque}
}
_, err := s.load()
return s, err
}

func (p SourceProbe) Target() *url.URL {
return p.target
}

func (p SourceProbe) load() ([]Probe, error) {
f, err := os.Open(p.target.Opaque)
if err != nil {
return nil, err
}

var probes []Probe
var invalids invalidURIs

scanner := bufio.NewScanner(f)
for scanner.Scan() {
target := strings.TrimSpace(scanner.Text())

if target == "" || strings.HasPrefix(target, "#") {
continue
}

probe, err := Get(target)
if err != nil {
invalids = append(invalids, target)
} else {
probes = append(probes, probe)
}
}

if len(invalids) > 0 {
return nil, invalids
}

return probes, nil
}

func (p SourceProbe) Check() []store.Record {
stime := time.Now()

probes, err := p.load()
if err != nil {
d := time.Now().Sub(stime)
return []store.Record{{
CheckedAt: stime,
Target: p.target,
Status: store.STATUS_UNKNOWN,
Message: err.Error(),
Latency: d,
}}
}

ch := make(chan []store.Record, len(probes))
wg := &sync.WaitGroup{}

for _, p := range probes {
wg.Add(1)

go func(p Probe, ch chan []store.Record) {
ch <- p.Check()
wg.Done()
}(p, ch)
}
wg.Wait()
close(ch)

results := []store.Record{}
for rs := range ch {
results = append(results, rs...)
}

d := time.Now().Sub(stime)
return append(results, store.Record{
CheckedAt: stime,
Target: p.target,
Status: store.STATUS_HEALTHY,
Message: fmt.Sprintf("checked %d targets", len(probes)),
Latency: d,
})
}
4 changes: 2 additions & 2 deletions probe/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (p TCPProbe) Target() *url.URL {
return p.target
}

func (p TCPProbe) Check() store.Record {
func (p TCPProbe) Check() []store.Record {
st := time.Now()
conn, err := net.DialTimeout("tcp", p.target.Opaque, 10*time.Second)
d := time.Now().Sub(st)
Expand All @@ -55,5 +55,5 @@ func (p TCPProbe) Check() store.Record {
conn.Close()
}

return r
return []store.Record{r}
}
26 changes: 14 additions & 12 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ type ProbeHistory struct {

type ProbeHistoryMap map[string]*ProbeHistory

func (hs ProbeHistoryMap) append(r *Record) {
func (hs ProbeHistoryMap) append(r Record) {
target := r.Target.String()

if h, ok := hs[target]; ok {
if len(h.Results) >= PROBE_HISTORY_LEN {
h.Results = h.Results[1:]
}

h.Results = append(h.Results, r)
h.Results = append(h.Results, &r)
} else {
hs[target] = &ProbeHistory{
Target: r.Target,
Results: []*Record{r},
Results: []*Record{&r},
}
}
}
Expand Down Expand Up @@ -109,24 +109,26 @@ func (s *Store) setIncidentIfNeed(r Record) {
}
}

func (s *Store) Append(r Record) {
func (s *Store) Append(rs ...Record) {
s.Lock()
defer s.Unlock()

r = r.Sanitize()

if s.file == nil {
fmt.Fprintf(os.Stderr, "log file isn't opened. may be bug.")
return
}

str := r.String()
fmt.Println(str)
_, s.lastError = fmt.Fprintln(s.file, str)
for _, r := range rs {
r = r.Sanitize()

str := r.String()
fmt.Println(str)
_, s.lastError = fmt.Fprintln(s.file, str)

s.ProbeHistory.append(&r)
s.ProbeHistory.append(r)

s.setIncidentIfNeed(r)
s.setIncidentIfNeed(r)
}
}

func (s *Store) Restore() error {
Expand All @@ -149,7 +151,7 @@ func (s *Store) Restore() error {
continue
}

s.ProbeHistory.append(&r)
s.ProbeHistory.append(r)

s.setIncidentIfNeed(r)
}
Expand Down

0 comments on commit 5a1aeee

Please sign in to comment.