Permalink
Browse files

pcapserver: Allow to handle PCAP frames over a zeromq interface

The moiji-mobile pcap capture client and collection server has a
ZeroMQ based interface to receive the datalink header and the
captured payload. Add support for this interface.

With each frame the datalink type can change. Modify the decoder
to be able to update itself on each iteration. The switch looks
cheap enough to execute per packet instead of trying to check if
the right type has already been set (and avoids storing the original
type).

Use by:

packetbeat.interfaces.type: pcapserver
packetbeat.interfaces.device: tcp://localhost:6666
packetbeat.interfaces.subscription: data.v1
  • Loading branch information...
1 parent 32446ed commit f317fbe988e47aa7d2e271f3705d7aabbba3ddb3 @zecke committed Aug 6, 2016
@@ -28,6 +28,7 @@ type InterfacesConfig struct {
Dumpfile string
OneAtATime bool
Loop int
+ Subscription string
}
type Flows struct {
@@ -54,6 +54,24 @@ const (
netBytesTotalCounter = "net_bytes_total"
)
+func (d *DecoderStruct) updateDataLink(datalink layers.LinkType) error {
+ switch datalink {
+ case layers.LinkTypeLinuxSLL:
+ d.linkLayerDecoder = &d.sll
+ d.linkLayerType = layers.LayerTypeLinuxSLL
+ case layers.LinkTypeEthernet:
+ d.linkLayerDecoder = &d.eth
+ d.linkLayerType = layers.LayerTypeEthernet
+ case layers.LinkTypeNull: // loopback on OSx
+ d.linkLayerDecoder = &d.lo
+ d.linkLayerType = layers.LayerTypeLoopback
+ default:
+ return fmt.Errorf("Unsupported link type: %s", datalink.String())
+ }
+
+ return nil
+}
+
// Creates and returns a new DecoderStruct.
func NewDecoder(
f *flows.Flows,
@@ -97,18 +115,9 @@ func NewDecoder(
debugf("Layer type: %s", datalink.String())
- switch datalink {
- case layers.LinkTypeLinuxSLL:
- d.linkLayerDecoder = &d.sll
- d.linkLayerType = layers.LayerTypeLinuxSLL
- case layers.LinkTypeEthernet:
- d.linkLayerDecoder = &d.eth
- d.linkLayerType = layers.LayerTypeEthernet
- case layers.LinkTypeNull: // loopback on OSx
- d.linkLayerDecoder = &d.lo
- d.linkLayerType = layers.LayerTypeLoopback
- default:
- return nil, fmt.Errorf("Unsupported link type: %s", datalink.String())
+ err := d.updateDataLink(datalink)
+ if err != nil {
+ return nil, err
}
return &d, nil
@@ -130,11 +139,19 @@ func (d *DecoderStruct) AddLayers(layers []gopacket.DecodingLayer) {
}
}
-func (d *DecoderStruct) OnPacket(data []byte, ci *gopacket.CaptureInfo) {
+func (d *DecoderStruct) OnPacket(datalink *layers.LinkType, data []byte, ci *gopacket.CaptureInfo) {
defer logp.Recover("packet decoding failed")
d.truncated = false
+ if datalink != nil {
+ err := d.updateDataLink(*datalink)
+ if err != nil {
+ logp.Err("Updating data link failed: %v", err)
+ return
+ }
+ }
+
current := d.linkLayerDecoder
currentType := d.linkLayerType
@@ -25,6 +25,8 @@ Currently Packetbeat has several options for traffic capturing:
* `pf_ring`, which makes use of an ntop.org
http://www.ntop.org/products/pf_ring/[project]. This setting provides the best
sniffing speed, but it requires a kernel module, and it's Linux-specific.
+ * `pcapserver`, which connects to a
+ http://github.com/moiji-mobile/pcap-client-server[moiji-mobile pcap storage server].
The `pf_ring` option is a good configuration to use when you have
dedicated servers for Packetbeat. It provides sniffing speeds in the order of
@@ -101,6 +101,8 @@ Packetbeat supports three sniffer types:
* `pf_ring`, which makes use of an ntop.org
http://www.ntop.org/products/pf_ring/[project]. This setting provides the best
sniffing speed, but it requires a kernel module, and it's Linux-specific.
+ * `pcapserver`, which connects to a
+ http://github.com/moiji-mobile/pcap-client-server[moiji-mobile pcap storage server].
The default sniffer type is `pcap`.
@@ -170,6 +172,13 @@ you use this setting, it's your responsibility to keep the BPF filters in sync w
ports defined in the `protocols` section.
+===== subscription
+
+Used by the pcapserver code to select which messages of the publisher to subscriber.
+By default data.v1 will be used but it allows by selecting the prefix of a client or
+a set of clients.
+
+
[[configuration-flows]]
=== Flows Configuration
@@ -0,0 +1,73 @@
+package sniffer
+
+import (
+ "github.com/tsg/gopacket"
+ "github.com/tsg/gopacket/layers"
+ "fmt"
+ "time"
+ "encoding/binary"
+ zmq "github.com/pebbe/zmq4"
+)
+
+type PcapSubscriber struct {
+ sub *zmq.Socket
+ datalink layers.LinkType
+ endian binary.ByteOrder
+}
+
+func NewPcapServerSubscriber(device string, subscription string) (*PcapSubscriber, error) {
+
+ sock, err := zmq.NewSocket(zmq.SUB)
+ if err != nil {
+ return nil, err
+ }
+
+ fmt.Printf("Using remote server %v with subscription %v\n", device, subscription)
+ sock.Connect(device)
+ sock.SetSubscribe(subscription)
+
+ return &PcapSubscriber{sub:sock, endian:binary.LittleEndian}, nil
+}
+
+func (sub* PcapSubscriber) extractLink(data []byte) layers.LinkType {
+ // struct pcap_file_header {
+ // 0-3 bpf_u_int32 magic;
+ // 4-5 u_short version_major;
+ // 6-7 u_short version_minor;
+ // 8-11 bpf_int32 thiszone; /* gmt to local correction */
+ // 12-15 bpf_u_int32 sigfigs; /* accuracy of timestamps */
+ // 16-19 bpf_u_int32 snaplen; /* max length saved portion of each pkt */
+ // 20-23 bpf_u_int32 linktype; /* data link type (LINKTYPE_*) */
+ // };
+
+ // TODO: use magic to determine endian? Assume LittleEndian now
+ return layers.LinkType(sub.endian.Uint32(data[20:24]))
+}
+
+func (sub *PcapSubscriber) extractData(pkt []byte) (data []byte, ci gopacket.CaptureInfo) {
+ ci.Timestamp = time.Unix(
+ int64(sub.endian.Uint32(pkt[0:4])),
+ int64(sub.endian.Uint32(pkt[4:8])) * 1000)
+ ci.CaptureLength = int(sub.endian.Uint32(pkt[8:12]))
+ ci.Length = int(sub.endian.Uint32(pkt[12:16]))
+ return pkt[16:], ci
+}
+
+func (sub *PcapSubscriber) Close() {
+ sub.sub.Close()
+}
+
+func (sub *PcapSubscriber) ReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) {
+ msg, _ := sub.sub.RecvMessageBytes(0)
+
+ // msg[0] == client name...
+ // msg[1] == pcap_file_header
+ // msg[2] == pcap_pkthdr + data
+ sub.datalink = sub.extractLink(msg[1])
+ data, ci = sub.extractData(msg[2])
+ return data, ci, nil
+}
+
+func (sub *PcapSubscriber) LastLinkType() *layers.LinkType {
+ return &sub.datalink
+}
@@ -25,6 +25,9 @@ type SnifferSetup struct {
isAlive bool
dumper *pcap.Dumper
+ // moiji-mobile pcap zeromq interface
+ pcapSubscriber *PcapSubscriber
+
// bpf filter
filter string
@@ -34,7 +37,7 @@ type SnifferSetup struct {
}
type Worker interface {
- OnPacket(data []byte, ci *gopacket.CaptureInfo)
+ OnPacket(linkType *layers.LinkType, data []byte, ci *gopacket.CaptureInfo)
}
type WorkerFactory func(layers.LinkType) (Worker, string, error)
@@ -209,6 +212,14 @@ func (sniffer *SnifferSetup) setFromConfig(config *config.InterfacesConfig) erro
sniffer.DataSource = gopacket.PacketDataSource(sniffer.pfringHandle)
+ case "pcapserver":
+ sniffer.pcapSubscriber, err = NewPcapServerSubscriber(
+ sniffer.config.Device,
+ sniffer.config.Subscription)
+ if err != nil {
+ return err
+ }
+ sniffer.DataSource = gopacket.PacketDataSource(sniffer.pcapSubscriber)
default:
return fmt.Errorf("Unknown sniffer type: %s", sniffer.config.Type)
}
@@ -238,6 +249,10 @@ func (sniffer *SnifferSetup) Datalink() layers.LinkType {
if sniffer.config.Type == "pcap" {
return sniffer.pcapHandle.LinkType()
}
+ if sniffer.config.Type == "pcapserver" {
+ // begin with a dummy one
+ return layers.LinkTypeEthernet
+ }
return layers.LinkTypeEthernet
}
@@ -287,6 +302,11 @@ func (sniffer *SnifferSetup) Run() error {
data, ci, err := sniffer.DataSource.ReadPacketData()
+ var linkType *layers.LinkType
+ if sniffer.pcapSubscriber != nil {
+ linkType = sniffer.pcapSubscriber.LastLinkType()
+ }
+
if err == pcap.NextErrorTimeoutExpired || err == syscall.EINTR {
logp.Debug("sniffer", "Interrupted")
continue
@@ -347,7 +367,7 @@ func (sniffer *SnifferSetup) Run() error {
}
logp.Debug("sniffer", "Packet number: %d", counter)
- sniffer.worker.OnPacket(data, &ci)
+ sniffer.worker.OnPacket(linkType, data, &ci)
}
logp.Info("Input finish. Processed %d packets. Have a nice day!", counter)
@@ -367,6 +387,8 @@ func (sniffer *SnifferSetup) Close() error {
sniffer.afpacketHandle.Close()
case "pfring", "pf_ring":
sniffer.pfringHandle.Close()
+ case "pcapserver":
+ sniffer.pcapSubscriber.Close()
}
return nil
}

0 comments on commit f317fbe

Please sign in to comment.