Skip to content
This repository has been archived by the owner on Dec 15, 2020. It is now read-only.

Commit

Permalink
Simplified main loop. Split out PingRecord and PingState code. Beginn…
Browse files Browse the repository at this point in the history
…ing of geoip support.
  • Loading branch information
joshuar committed Sep 3, 2016
1 parent 334b576 commit 29fd48c
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 450 deletions.
17 changes: 17 additions & 0 deletions beat/PingRecord.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pingbeat

import (
"time"
)

type PingRecord struct {
Target string
Sent time.Time
}

func NewPingRecord(target string) *PingRecord {
return &PingRecord{
Target: target,
Sent: time.Now().UTC(),
}
}
2 changes: 0 additions & 2 deletions beat/PingRequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ type PingRequest struct {
text_payload *icmp.Message
binary_payload []byte
ping_type icmp.Type
target string
addr net.Addr
seq_no int
}

func NewPingRequest(seq_no int, ping_type icmp.Type, target string, network string) (*PingRequest, error) {
pr := &PingRequest{}
pr.target = target
pr.addr = pr.toAddr(target, network)
pr.ping_type = ping_type
pr.seq_no = seq_no
Expand Down
27 changes: 27 additions & 0 deletions beat/PingState.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package pingbeat

import (
"github.com/elastic/beats/libbeat/logp"
"sync"
)

type PingState struct {
MU sync.RWMutex
Pings map[int]*PingRecord
SeqNo int
}

func NewPingState() *PingState {
return &PingState{SeqNo: 0, Pings: make(map[int]*PingRecord)}
}

func (p *PingState) GetSeqNo() int {
s := p.SeqNo
p.SeqNo++
// reset sequence no if we go above a 32-bit value
if p.SeqNo > 65535 {
logp.Debug("pingbeat", "Resetting sequence number")
p.SeqNo = 0
}
return s
}
181 changes: 82 additions & 99 deletions beat/pingbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
cfg "github.com/joshuar/pingbeat/config"
"github.com/oschwald/geoip2-golang"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"gopkg.in/go-playground/pool.v3"
"net"
"os"
"sync"
"time"
)

Expand All @@ -30,22 +30,18 @@ type Pingbeat struct {
ipv6network string
ipv4targets map[string][2]string
ipv6targets map[string][2]string
geoipdb *geoip2.Reader
config cfg.ConfigSettings
events publisher.Client
done chan struct{}
}

type PingRecord struct {
Target string
Sent time.Time
}

type PingSent struct {
Seq int
Target string
Type icmp.Type
Sent time.Time
}
// type PingSent struct {
// Seq int
// Target string
// Type icmp.Type
// Sent time.Time
// }

type PingRecv struct {
Seq int
Expand All @@ -54,27 +50,6 @@ type PingRecv struct {
LossReason string
}

type PingState struct {
MU sync.RWMutex
Pings map[int]*PingRecord
SeqNo int
}

func NewPingState() *PingState {
return &PingState{SeqNo: 0, Pings: make(map[int]*PingRecord)}
}

func (p *PingState) GetSeqNo() int {
s := p.SeqNo
p.SeqNo++
// reset sequence no if we go above a 32-bit value
if p.SeqNo > 65535 {
logp.Debug("pingbeat", "Resetting sequence number")
p.SeqNo = 0
}
return s
}

func New() *Pingbeat {
return &Pingbeat{}
}
Expand Down Expand Up @@ -147,6 +122,16 @@ func (p *Pingbeat) Config(b *beat.Beat) error {
return err
}

// Check and load the GeoIP database
if p.config.Input.GeoIPDB != nil {
db, err := geoip2.Open(*p.config.Input.GeoIPDB)
if err != nil {
return err
}
p.geoipdb = db
defer db.Close()
}

return nil
}

Expand Down Expand Up @@ -199,48 +184,41 @@ func (p *Pingbeat) Run(b *beat.Beat) error {
return nil
case <-ticker.C:
sendBatch := spool.Batch()

if p.useIPv4 {
go p.QueueRequests(state, c4, sendBatch)
}
if p.useIPv6 {
go p.QueueRequests(state, c6, sendBatch)
}

recvBatch := rpool.Batch()
for result := range sendBatch.Results() {
if err := result.Error(); err != nil {
logp.Err("Send unsuccessful: %v", err)
} else {
ping := result.Value().(*PingSent)
state.MU.Lock()
state.Pings[ping.Seq] = &PingRecord{Target: ping.Target, Sent: ping.Sent}
state.MU.Unlock()
// ping := result.Value().(icmp.Type)

switch ping.Type {
var recv pool.WorkUnit
switch result.Value().(icmp.Type) {
case ipv4.ICMPTypeEcho:
recvBatch.Queue(RecvPing(c4))
recv = rpool.Queue(RecvPing(c4))
case ipv6.ICMPTypeEchoRequest:
recvBatch.Queue(RecvPing(c6))
recv = rpool.Queue(RecvPing(c6))
default:
logp.Err("Invalid ICMP message type")
}
}
}
recvBatch.QueueComplete()

for result := range recvBatch.Results() {
if err := result.Error(); err != nil {
logp.Err("Recv unsuccessful: %v", err)
} else {
ping := result.Value().(*PingRecv)
if !ping.Loss {
target := ping.Target
state.MU.RLock()
rtt := time.Since(state.Pings[ping.Seq].Sent)
delete(state.Pings, ping.Seq)
state.MU.RUnlock()
p.ProcessPing(target, rtt)
recv.Wait()
if err := recv.Error(); err != nil {
logp.Err("Recv unsuccessful: %v", err)
} else {
ping := recv.Value().(*PingRecv)
if ping.Loss == false {
target := ping.Target
state.MU.Lock()
rtt := time.Since(state.Pings[ping.Seq].Sent)
delete(state.Pings, ping.Seq)
state.MU.Unlock()
go p.ProcessPing(target, rtt)
}
}
}
}
Expand Down Expand Up @@ -340,39 +318,6 @@ func FetchIPs(ip4addr, ip6addr chan string, target string) {
return
}

func (p *Pingbeat) QueueRequests(state *PingState, conn *icmp.PacketConn, batch pool.Batch) {
var network string
var ping_type icmp.Type
switch {
case conn.IPv4PacketConn() != nil:
ping_type = ipv4.ICMPTypeEcho
case conn.IPv4PacketConn() != nil:
ping_type = ipv6.ICMPTypeEchoRequest
default:
logp.Err("QueueRequests: Unknown connection type")
}
targets := make(map[string][2]string)
switch ping_type {
case ipv4.ICMPTypeEcho:
targets = p.ipv4targets
network = p.ipv4network
case ipv6.ICMPTypeEchoRequest:
targets = p.ipv6targets
network = p.ipv6network
default:
logp.Err("QueueTargets: Invalid ICMP message type")
}
for addr, _ := range targets {
req, err := NewPingRequest(state.GetSeqNo(), ping_type, addr, network)
if err != nil {
logp.Err("QueueTargets: %v", err)
}
batch.Queue(SendPing(conn, p.period, req, state))

}
batch.QueueComplete()
}

func (p *Pingbeat) ProcessPing(target string, rtt time.Duration) {
name, tag := p.FetchDetails(target)
event := common.MapStr{
Expand Down Expand Up @@ -404,7 +349,44 @@ func (p *Pingbeat) ProcessMissing(state *PingState) {
}
}

func SendPing(conn *icmp.PacketConn, timeout time.Duration, req *PingRequest, st *PingState) pool.WorkFunc {
func (p *Pingbeat) QueueRequests(state *PingState, conn *icmp.PacketConn, batch pool.Batch) {
var network string
var ping_type icmp.Type
switch {
case conn.IPv4PacketConn() != nil:
ping_type = ipv4.ICMPTypeEcho
case conn.IPv4PacketConn() != nil:
ping_type = ipv6.ICMPTypeEchoRequest
default:
logp.Err("QueueRequests: Unknown connection type")
}
targets := make(map[string][2]string)
switch ping_type {
case ipv4.ICMPTypeEcho:
targets = p.ipv4targets
network = p.ipv4network
case ipv6.ICMPTypeEchoRequest:
targets = p.ipv6targets
network = p.ipv6network
default:
logp.Err("QueueTargets: Invalid ICMP message type")
}
for addr, _ := range targets {
seq := state.GetSeqNo()
req, err := NewPingRequest(seq, ping_type, addr, network)
if err != nil {
logp.Err("QueueTargets: %v", err)
}
batch.Queue(SendPing(conn, p.period, req))
state.MU.Lock()
state.Pings[seq] = NewPingRecord(addr)
state.MU.Unlock()
}
batch.QueueComplete()

}

func SendPing(conn *icmp.PacketConn, timeout time.Duration, req *PingRequest) pool.WorkFunc {
return func(wu pool.WorkUnit) (interface{}, error) {
if wu.IsCancelled() {
logp.Debug("pingbeat", "SendPing: workunit cancelled")
Expand All @@ -416,12 +398,13 @@ func SendPing(conn *icmp.PacketConn, timeout time.Duration, req *PingRequest, st
if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
ping := &PingSent{}
ping.Seq = req.seq_no
ping.Target = req.addr.String()
ping.Type = req.ping_type
ping.Sent = time.Now().UTC()
return ping, nil
return req.ping_type, nil
// ping := &PingSent{}
// ping.Seq = req.seq_no
// ping.Target = req.addr.String()
// ping.Type = req.ping_type
// ping.Sent = time.Now().UTC()
// return ping, nil
}
}
}
Expand Down Expand Up @@ -471,7 +454,7 @@ func RecvPing(conn *icmp.PacketConn) pool.WorkFunc {
ping.LossReason = "Destination Unreachable"
var d []byte
d = rep.text_payload.Body.(*icmp.DstUnreach).Data
header, _ := ipv4.ParseHeader(d[len(d)-8:])
header, _ := ipv4.ParseHeader(d[:len(d)-8])
spew.Dump(header)
// rm, err := icmp.ParseMessage(ipv4.ICMPTypeEcho.Protocol(), d[len(d)-8:])
// spew.Dump(rm)
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

type PingConfig struct {
Period *string
GeoIPDB *string
UseIPv4 *bool
UseIPv6 *bool
Privileged *bool
Expand Down
2 changes: 2 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ import:
- ipv6
- package: gopkg.in/go-playground/pool.v3
version: ^3.1.0
- package: github.com/oschwald/geoip2-golang
version: ^0.1.0
Loading

0 comments on commit 29fd48c

Please sign in to comment.