Capture and mux packets on all interfaces. (#12)
* Muxes packets from all interfaces
pboothe committed Oct 31, 2019
1 parent 659514d commit d75cf83
Showing 4 changed files with 220 additions and 18 deletions.
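As a quick orientation before the per-file diffs, the sketch below (not part of the commit) shows how the new muxer API introduced here is intended to be driven: enumerate the machine's interfaces when none are named on the command line, start a TCP capture on each one, and funnel every capture into a single buffered packet channel for a downstream consumer. The package paths, the 1000-packet buffer, and the 256-byte snaplen are taken from the diffs that follow; everything else is illustrative.

package main

import (
	"context"
	"net"

	"github.com/google/gopacket"
	"github.com/google/gopacket/pcap"
	"github.com/m-lab/go/rtx"
	"github.com/m-lab/packet-headers/muxer"
)

func main() {
	ctx := context.Background()

	// With no -interface flags, every interface on the machine is captured.
	ifaces, err := net.Interfaces()
	rtx.Must(err, "Could not list interfaces")
	names := make([]string, 0, len(ifaces))
	for _, iface := range ifaces {
		names = append(names, iface.Name)
	}

	// One buffered channel receives the merged packet stream from all interfaces.
	packets := make(chan gopacket.Packet, 1000)
	go muxer.MustCaptureTCPOnInterfaces(ctx, names, packets, pcap.OpenLive, 256)

	for range packets {
		// In the real program the demuxer consumes this merged stream and
		// writes per-flow pcap files.
	}
}
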
47 changes: 29 additions & 18 deletions main.go
@@ -3,21 +3,22 @@ package main
import (
"context"
"flag"
"net"
"os"
"sync"
"time"

"github.com/m-lab/go/anonymize"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"

"github.com/m-lab/go/flagx"
"github.com/m-lab/go/prometheusx"
"github.com/m-lab/go/rtx"
"github.com/m-lab/go/warnonerror"
"github.com/m-lab/packet-headers/demuxer"
"github.com/m-lab/packet-headers/muxer"
"github.com/m-lab/packet-headers/tcpinfohandler"
"github.com/m-lab/tcp-info/eventsocket"
)
@@ -28,26 +29,44 @@ var (
captureDuration = flag.Duration("captureduration", 30*time.Second, "Only save the first captureduration of each flow, to prevent long-lived flows from spamming the hard drive.")
flowTimeout = flag.Duration("flowtimeout", 30*time.Second, "Once there have been no packets for a flow for at least flowtimeout, the flow can be assumed to be closed.")
maxHeaderSize = flag.Int("maxheadersize", 256, "The maximum size of packet headers allowed. A lower value allows the pcap process to be less wasteful but risks more esoteric IPv6 headers (which can theoretically be up to the full size of the packet but in practice seem to be under 128) getting truncated.")
netInterface = flag.String("interface", "eth0", "The interface on which to capture packets.")

interfaces flagx.StringArray

// Context and injected variables to allow smoke testing of main()
mainCtx, mainCancel = context.WithCancel(context.Background())
pcapOpenLive = pcap.OpenLive
)

func init() {
flag.Var(&interfaces, "interface", "The interface on which to capture traffic. May be repeated. If unset, will capture on all available interfaces.")
}

func main() {
defer mainCancel()

flag.Parse()
rtx.Must(flagx.ArgsFromEnv(flag.CommandLine), "Could not get args from env")

defer mainCancel()
// Special case for argument "-interface": if no interface was specified,
// then all of them were implicitly specified. If new interfaces are created
// after capture is started, traffic on those interfaces will be ignored.
if len(interfaces) == 0 {
ifaces, err := net.Interfaces()
rtx.Must(err, "Could not list interfaces")
for _, iface := range ifaces {
interfaces = append(interfaces, iface.Name)
}
}

psrv := prometheusx.MustServeMetrics()
defer warnonerror.Close(psrv, "Could not stop metric server")

rtx.Must(os.Chdir(*dir), "Could not cd to directory %q", *dir)

// A waitgroup to make sure main() doesn't exit before all its components
// A waitgroup to make sure main() doesn't return before all its components
// get cleaned up.
cleanupWG := sync.WaitGroup{}
defer cleanupWG.Wait()

// Get ready to save the incoming packets to files.
tcpdm := demuxer.NewTCP(anonymize.New(anonymize.IPAnonymizationFlag), *dir, *captureDuration)
@@ -60,28 +79,20 @@ func main() {
cleanupWG.Done()
}()

// Open a packet capture
handle, err := pcapOpenLive(*netInterface, int32(*maxHeaderSize), true, pcap.BlockForever)
rtx.Must(err, "Could not create libpcap client")
rtx.Must(handle.SetBPFFilter("tcp"), "Could not set up BPF filter for TCP")
// Stop packet capture when the context is canceled.
// A channel with a buffer to prevent tight coupling of captures with the demuxer.TCP goroutine's read loop.
packets := make(chan gopacket.Packet, 1000)

// Capture packets on every interface.
cleanupWG.Add(1)
go func() {
<-mainCtx.Done()
handle.Close()
muxer.MustCaptureTCPOnInterfaces(mainCtx, interfaces, packets, pcapOpenLive, int32(*maxHeaderSize))
cleanupWG.Done()
}()

// Set up the packet capture.
packetSource := gopacket.NewPacketSource(handle, layers.LinkTypeEthernet)

// Set up the timer for flow timeouts.
flowTimeoutTicker := time.NewTicker(*flowTimeout)
defer flowTimeoutTicker.Stop()

// Capture packets forever, or until mainCtx is cancelled.
tcpdm.CapturePackets(mainCtx, packetSource.Packets(), flowTimeoutTicker.C)

// Wait until all cleanup routines have terminated.
cleanupWG.Wait()
tcpdm.CapturePackets(mainCtx, packets, flowTimeoutTicker.C)
}
7 changes: 7 additions & 0 deletions metrics/metrics.go
@@ -86,5 +86,12 @@ var (
[]string{"reason"},
)

InterfacesBeingCaptured = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "pcap_muxer_interfaces_with_captures",
Help: "How many interfaces currently have a packet capture running on them, according to the muxer.",
},
)

// TODO(https://github.com/m-lab/packet-headers/issues/5) Create some histograms for SLIs
)
75 changes: 75 additions & 0 deletions muxer/interfaces.go
@@ -0,0 +1,75 @@
// Package muxer helps solve the problem that captures take place only on a
// per-interface basis, but tcp-info collects flow information with no reference
// to the underlying interface.
package muxer

import (
"context"
"sync"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/m-lab/go/rtx"
"github.com/m-lab/packet-headers/metrics"
)

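// forwardPackets copies every packet from in to out until in is closed or ctx
// is canceled, and keeps the InterfacesBeingCaptured gauge in sync with the
// number of running forwarders.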
func forwardPackets(ctx context.Context, in <-chan gopacket.Packet, out chan<- gopacket.Packet, wg *sync.WaitGroup) {
defer wg.Done()
metrics.InterfacesBeingCaptured.Inc()
defer metrics.InterfacesBeingCaptured.Dec()

for {
select {
case p, ok := <-in:
if !ok {
return
}
out <- p
case <-ctx.Done():
return
}
}
}

// muxPackets causes each packet on every input channel to be sent to the output channel.
func muxPackets(ctx context.Context, in []<-chan gopacket.Packet, out chan<- gopacket.Packet) {
wg := sync.WaitGroup{}
for _, inC := range in {
wg.Add(1)
go forwardPackets(ctx, inC, out, &wg)
}

wg.Wait()
close(out)
}

// PcapHandleOpener is a type to allow injection of fake packet captures to aid
// in testing. It is exactly the type of pcap.OpenLive, and in production code
// every variable of this type should be set to pcap.OpenLive.
type PcapHandleOpener func(device string, snaplen int32, promisc bool, timeout time.Duration) (handle *pcap.Handle, _ error)

// MustCaptureTCPOnInterfaces fires off a packet capture on every one of the
// passed-in list of interfaces, and then muxes the resulting packet streams to
// all be sent to the passed-in packets channel.
func MustCaptureTCPOnInterfaces(ctx context.Context, interfaces []string, packets chan<- gopacket.Packet, opener PcapHandleOpener, maxHeaderSize int32) {
// Capture packets on every interface.
packetCaptures := make([]<-chan gopacket.Packet, 0)
for _, iface := range interfaces {
// Open a packet capture
handle, err := opener(iface, maxHeaderSize, true, pcap.BlockForever)
rtx.Must(err, "Could not create libpcap client for %q", iface)
rtx.Must(handle.SetBPFFilter("tcp"), "Could not set up BPF filter for TCP")

// Stop packet capture when this function exits.
defer handle.Close()

// Save the packet capture channel.
packetCaptures = append(packetCaptures, gopacket.NewPacketSource(handle, layers.LinkTypeEthernet).Packets())
}

// multiplex packets until all packet sources are exhausted or the context
// is cancelled.
muxPackets(ctx, packetCaptures, packets)
}
109 changes: 109 additions & 0 deletions muxer/interfaces_test.go
@@ -0,0 +1,109 @@
package muxer

import (
"context"
"sync"
"testing"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/pcap"
"github.com/m-lab/go/rtx"
)

func channelFromFile(fname string) <-chan gopacket.Packet {
// Get packets from a wireshark-produced pcap file.
handle, err := pcap.OpenOffline(fname)
rtx.Must(err, "Could not open golden pcap file %s", fname)
ps := gopacket.NewPacketSource(handle, handle.LinkType())
return ps.Packets()
}

func TestMuxPacketsUntilSourceExhaustion(t *testing.T) {
// Open our two testfiles
ins := []<-chan gopacket.Packet{
channelFromFile("../testdata/v4.pcap"),
channelFromFile("../testdata/v6.pcap"),
}
out := make(chan gopacket.Packet)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
// Mux the packets from each.
muxPackets(context.Background(), ins, out)
wg.Done()
}()
// The only way that this will close is if we exhaust all the input channels
// and they each close. So let's do that.
pcount := 0
for range out {
pcount++
}
wg.Wait()
// Verify that the combined flow contains the right number of packets.
if pcount != 20 {
t.Errorf("pcount should be 20, not %d", pcount)
}
}

func TestMuxPacketsUntilContextCancellation(t *testing.T) {
ins := []<-chan gopacket.Packet{
make(chan gopacket.Packet),
make(chan gopacket.Packet),
}
out := make(chan gopacket.Packet)
wg := sync.WaitGroup{}
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
go func() {
// Mux the packets from each.
muxPackets(ctx, ins, out)
wg.Done()
}()
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
// The input channels will never close, so only context cancellation will work.
pcount := 0
for range out {
pcount++
}
wg.Wait()
// If we got to here, then muxPackets terminated! Hooray!

// Verify that the combined flow contained no packets.
if pcount != 0 {
t.Errorf("pcount should be 0, not %d", pcount)
}

}

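// fakePcapOpenLive matches the PcapHandleOpener signature but ignores the
// live-capture arguments and instead reads packets back from the named pcap file.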
func fakePcapOpenLive(filename string, _ int32, _ bool, _ time.Duration) (*pcap.Handle, error) {
return pcap.OpenOffline(filename)
}

func TestMustCaptureOnInterfaces(t *testing.T) {
wg := sync.WaitGroup{}
packets := make(chan gopacket.Packet)
wg.Add(1)
go func() {
MustCaptureTCPOnInterfaces(
context.Background(),
[]string{"../testdata/v4.pcap", "../testdata/v6.pcap"},
packets,
fakePcapOpenLive,
0,
)
wg.Done()
}()

count := 0
for range packets {
count++
}
wg.Wait()
if count != 20 {
t.Errorf("Was supposed to see 20 packets, but instead saw %d", count)
}
}
