Skip to content

Commit

Permalink
add code and README
Browse files Browse the repository at this point in the history
  • Loading branch information
jaffee committed Jan 8, 2018
1 parent ed0bf32 commit 4d239f9
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 0 deletions.
70 changes: 70 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
`picap` is an example use case which collects network traffic and indexes it in
Pilosa. It uses tools from the [PDK](https://github.com/pilosa/pdk) to do this,
and so might be a nice example if you want to use the PDK in your own
architecture.

Note: a version of this functionality used to be included in the PDK and invoked
as `pdk net`. It has been separated because of dependencies which made use of
the PDK as a whole somewhat cumbersome. Namely, it requires libpcap development
headers and doesn't cross-compile easily due to reliance on the gopacket
library.

### Prerequisites
Install [Go](https://golang.org/doc/install), [dep](https://github.com/golang/dep#setup), and [Pilosa](https://www.pilosa.com/docs/latest/installation/).

### Install
```sh
go get github.com/pilosa/picap
cd $GOPATH/src/github.com/pilosa/picap
dep ensure
go install ./cmd/picap
```

### Use
You must be running a Pilosa cluster.

See `picap -h` for command line usage.


### Functionality
When invoked, `picap` reads network packet data, either from a live interface or
from a pcap file. It extracts information from each packet and indexes that information
in Pilosa. Each packet is assigned a new column in Pilosa, and picap extracts a
variety of fields, not all of which are necessarily present. See the
`picap.Packet` struct for the most up to date description of what data is
extracted.

Picap uses a PDK Translator to maintain mappings between values and their Pilosa
IDs. It also starts a proxy server which may be queried in place of Pilosa and
will map back and forth between values and Pilosa IDs. For example:

```
12:52:04~$ curl -XPOST localhost:11000/index/net/query -d'TopN(frame=http-hostname, n=3)' | jq
{
"results": [
[
{
"Key": "pilosa.com",
"Count": 1
},
{
"Key": "example.com",
"Count": 1
},
{
"Key": "readthedocs.org",
"Count": 1
}
]
]
}
```

Normally the "Key" values in a TopN response are integers, but the proxy has mapped them back to the hostnames that they represent. Similarly, on the query side the proxy accepts a hostname in place of the integer row ID:
```
curl -XPOST localhost:11000/index/net/query -d'Count(Bitmap(frame=http-hostname, rowID="pilosa.com"))' | jq
{
"results": [
1
]
}
```
17 changes: 17 additions & 0 deletions cmd/picap/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

import (
"log"
"os"

"github.com/jaffee/commandeer/cobrafy"
"github.com/pilosa/picap"
)

// main builds the picap command with picap.NewMain and hands it to
// cobrafy.Execute, exiting with status 1 if execution reports an error.
func main() {
	if err := cobrafy.Execute(picap.NewMain()); err != nil {
		// log.Fatalf terminates the process itself, which made the os.Exit
		// call that used to follow it unreachable. Log and exit explicitly.
		log.Printf("executing picap: %v", err)
		os.Exit(1)
	}
}
227 changes: 227 additions & 0 deletions picap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package picap

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"sync/atomic"
	"time"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/pilosa/pdk"
	"github.com/pkg/errors"
)

// Main holds the configuration for a picap run. Each field carries a `help`
// tag describing the option (presumably surfaced as command line help by the
// flag library used in cmd/picap — confirm). NewMain supplies the defaults.
type Main struct {
// Iface is the interface handed to pcap.OpenLive when Filename is empty.
Iface string `help:"Interface on which to listen."`
// Filename, when non-empty, makes NewNetSource read packets from this pcap
// file instead of capturing live.
Filename string `help:"File containing pcap data to read."`
// Snaplen, Promisc and Timeout are passed straight to pcap.OpenLive.
Snaplen int32 `help:"Maximum number of bytes to capture per packet."`
Promisc bool `help:"Put capture interface into promiscuous mode."`
Timeout time.Duration `help:"Timeout for capturing packets."`
// NOTE(review): Concurrency is not referenced by the code visible in this
// file — confirm where (or whether) it is consumed.
Concurrency int `help:"Number of goroutines parsing packets."`
// PilosaHosts is passed to pdk.SetupPilosa; the first entry is also the
// target of the mapping proxy's forwarder.
PilosaHosts []string `help:"Comma separated list of pilosa host:port"`
// Filter is applied to the capture handle with SetBPFFilter.
Filter string `help:"BPF style filter for packet capture."`
Index string `help:"Pilosa index name."`
// BindAddr is the address given to pdk.StartMappingProxy.
BindAddr string `help:"Local address for mapping proxy to bind."`
BufSize int `help:"Buffer size for Pilosa importer."`
// MappingDir is replaced with a freshly created temp dir by Run when empty.
MappingDir string `help:"Directory to store mapping data. Empty string uses a temp dir."`
// Debug turns on periodic progress logging in the packet source.
Debug bool `help:"Turn on debug logging."`
// Translator must be "mem" or "level"; Run rejects any other value.
Translator string `help:"How to store mappings. In memory(mem) or LevelDB(level)."`
}

// NewMain returns a Main pre-populated with picap's default settings. Fields
// not assigned here keep their zero values.
func NewMain() *Main {
	m := new(Main)

	// Capture defaults.
	m.Iface = "en0"
	m.Snaplen = 1500
	m.Timeout = time.Millisecond
	m.Concurrency = 1

	// Pilosa and mapping proxy defaults.
	m.PilosaHosts = []string{"localhost:10101"}
	m.Index = "net"
	m.BindAddr = "localhost:11000"
	m.BufSize = 100000
	m.Translator = "mem"

	return m
}

// Run opens the packet source, sets up the mapper, translator and Pilosa
// index, starts the mapping proxy in the background, and then runs the
// ingester, returning whatever error the ingester reports.
//
// If MappingDir is empty it is overwritten with a newly created temp
// directory. An error is returned when no Pilosa host is configured, when the
// Translator value is neither "mem" nor "level", or when any setup step fails.
func (m *Main) Run() error {
	// The mapping proxy forwards to the first host. Indexing an empty slice
	// inside the goroutine below would panic and take the process down, so
	// reject that configuration up front.
	if len(m.PilosaHosts) == 0 {
		return errors.New("no pilosa hosts specified")
	}

	src, err := m.NewNetSource()
	if err != nil {
		return errors.Wrap(err, "getting new net source")
	}
	src.debug = m.Debug
	np := pdk.NewDefaultGenericParser()
	// np.IncludeMethods = true
	// np.SkipMethods["Reverse"] = struct{}{}
	// np.SkipMethods["UTC"] = struct{}{}
	// np.SkipMethods["Local"] = struct{}{}
	nm := pdk.NewCollapsingMapper()

	if m.MappingDir == "" {
		m.MappingDir, err = ioutil.TempDir("", "")
		if err != nil {
			return errors.Wrap(err, "getting temp dir for mapping")
		}
		log.Printf("storing mapping data in %v", m.MappingDir)
	}

	switch m.Translator {
	case "mem":
		// Keep the translator the collapsing mapper was created with.
	case "level":
		lt, err := pdk.NewLevelTranslator(m.MappingDir)
		if err != nil {
			return errors.Wrap(err, "getting level translator")
		}
		nm.Translator = lt
	default:
		return errors.Errorf("unknown translator type: '%s'", m.Translator)
	}

	index, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, nil, uint(m.BufSize))
	if err != nil {
		return errors.Wrap(err, "setting up pilosa")
	}

	// The proxy runs for as long as StartMappingProxy does; its result is only
	// logged, so a proxy failure does not stop ingestion.
	go func() {
		err := pdk.StartMappingProxy(m.BindAddr, pdk.NewPilosaForwarder(m.PilosaHosts[0], nm.Translator))
		log.Printf("starting mapping proxy: %v", err)
	}()

	ingester := pdk.NewIngester(src, np, nm, index)
	return ingester.Run()
}

// NewNetSource opens a pcap handle and wraps its packet stream in a NetSource.
//
// When Filename is set the handle reads that pcap file; otherwise it captures
// live from Iface using Snaplen, Promisc and Timeout. Filter is applied to the
// handle as a BPF filter. An error is returned if the handle cannot be opened
// or the filter cannot be set.
func (m *Main) NewNetSource() (*NetSource, error) {
	var (
		h   *pcap.Handle
		err error
	)
	if m.Filename != "" {
		h, err = pcap.OpenOffline(m.Filename)
	} else {
		h, err = pcap.OpenLive(m.Iface, m.Snaplen, m.Promisc, m.Timeout)
	}
	if err != nil {
		return nil, fmt.Errorf("open error: %v", err)
	}

	if err = h.SetBPFFilter(m.Filter); err != nil {
		// The handle is not returned to the caller on this path, so release it
		// here rather than leaking the underlying capture resources.
		h.Close()
		return nil, fmt.Errorf("error setting bpf filter: %v", err)
	}

	// NOTE(review): on success the handle is not retained by NetSource, so
	// nothing visible here closes it later — confirm that is intended.
	packetSource := gopacket.NewPacketSource(h, h.LinkType())
	num := uint64(0)
	return &NetSource{
		num:     &num,
		packets: packetSource.Packets(),
	}, nil
}

// NetSource hands captured packets to the ingester one at a time through its
// Record method. Instances are created by Main.NewNetSource.
type NetSource struct {
// num counts calls to Record; it is updated with sync/atomic operations.
num *uint64
// debug enables periodic progress logging in Record.
debug bool
// packets is the channel Record receives packets from.
packets chan gopacket.Packet
}

// Record blocks until the next packet is available and returns it converted
// to a *Packet, along with any decoding error reported for that packet.
//
// When the packet channel has been closed (for example after the end of a
// pcap file) it returns io.EOF instead of passing a nil packet on to be
// decoded, which would panic. NOTE(review): io.EOF is the conventional
// end-of-stream signal; confirm the ingester treats it as a clean stop.
func (n *NetSource) Record() (interface{}, error) {
	// Use the value AddUint64 returns: a separate load could also observe
	// increments made by other goroutines in the meantime.
	num := atomic.AddUint64(n.num, 1)
	if n.debug && num%1000 == 20 {
		log.Println("Record has reported", num, "packets")
	}

	pkt, ok := <-n.packets
	if !ok {
		return nil, io.EOF
	}
	return reifyPacket(pkt)
}

// Packet is the flattened record that reifyPacket builds from a captured
// packet. Fields belonging to a layer the packet does not have are left at
// their zero values.
type Packet struct {
// Length is taken from the packet metadata's Length.
Length int
// NetProto, NetSrc and NetDst are the network layer's type name and the
// string forms of its flow endpoints.
NetProto string
NetSrc string
NetDst string

// TransProto, TransSrc and TransDst are the transport layer's type name and
// the string forms of its flow endpoints.
TransProto string
TransSrc string
TransDst string

// TCP mirrors the flag fields of the TCP layer; it is only filled in when
// the transport layer is TCP.
TCP struct {
FIN bool
SYN bool
RST bool
PSH bool
ACK bool
URG bool
ECE bool
CWR bool
NS bool
}

// AppProto is the application layer's type name.
AppProto string

// HTTP is filled in only when the application payload parses as an HTTP
// request.
HTTP struct {
Hostname string
UserAgent string
Method string
}
}

// reifyPacket converts a captured packet into a Packet record.
//
// Layers are processed outermost first (network, transport, application) and
// the function returns as soon as a layer is missing, leaving the remaining
// fields at their zero values. If the application payload parses as an HTTP
// request, its host, method and user agent are recorded as well.
//
// An error is returned, together with an otherwise empty Packet, when pkt is
// nil or when the packet carries a decoding error.
func reifyPacket(pkt gopacket.Packet) (*Packet, error) {
	pr := &Packet{}
	// A nil packet (e.g. received from a closed channel) would panic on the
	// first method call below, so report it as an error instead.
	if pkt == nil {
		return pr, errors.New("decoding packet: nil packet")
	}
	if errl := pkt.ErrorLayer(); errl != nil && errl.Error() != nil {
		return pr, errors.Wrap(errl.Error(), "decoding packet")
	}
	pr.Length = pkt.Metadata().Length

	netLayer := pkt.NetworkLayer()
	if netLayer == nil {
		return pr, nil
	}
	pr.NetProto = netLayer.LayerType().String()
	netSrc, netDst := netLayer.NetworkFlow().Endpoints()
	pr.NetSrc = netSrc.String()
	pr.NetDst = netDst.String()

	transLayer := pkt.TransportLayer()
	if transLayer == nil {
		return pr, nil
	}
	pr.TransProto = transLayer.LayerType().String()
	transSrc, transDst := transLayer.TransportFlow().Endpoints()
	pr.TransSrc = transSrc.String()
	pr.TransDst = transDst.String()

	if tcpLayer, ok := transLayer.(*layers.TCP); ok {
		pr.TCP.FIN = tcpLayer.FIN
		pr.TCP.SYN = tcpLayer.SYN
		pr.TCP.RST = tcpLayer.RST
		pr.TCP.PSH = tcpLayer.PSH
		pr.TCP.ACK = tcpLayer.ACK
		pr.TCP.URG = tcpLayer.URG
		pr.TCP.ECE = tcpLayer.ECE
		pr.TCP.CWR = tcpLayer.CWR
		pr.TCP.NS = tcpLayer.NS
	}

	appLayer := pkt.ApplicationLayer()
	if appLayer == nil {
		return pr, nil
	}
	pr.AppProto = appLayer.LayerType().String()

	// Best effort: a payload that is not an HTTP request is not an error, the
	// HTTP fields simply stay empty.
	req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(appLayer.Payload())))
	if err != nil {
		// TODO: try HTTP response?
		// resp, err := http.ReadResponse(bufio.NewReader(buf))
		return pr, nil
	}
	pr.HTTP.UserAgent = req.UserAgent()
	pr.HTTP.Method = req.Method
	pr.HTTP.Hostname = req.Host
	return pr, nil
}

0 comments on commit 4d239f9

Please sign in to comment.