Skip to content

Commit

Permalink
Reduce tcpassembly code
Browse files Browse the repository at this point in the history
Todo check for gaps
  • Loading branch information
negbie committed Jul 28, 2018
1 parent ecde733 commit 9743d9f
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 194 deletions.
21 changes: 8 additions & 13 deletions decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/coocood/freecache"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/reassembly"
"github.com/google/gopacket/tcpassembly"
"github.com/negbie/heplify/config"
"github.com/negbie/heplify/ip4defrag"
"github.com/negbie/heplify/ip6defrag"
Expand All @@ -19,7 +19,7 @@ import (
)

type Decoder struct {
asm *reassembly.Assembler
asm *tcpassembly.Assembler
defrag4 *ip4defrag.IPv4Defragmenter
defrag6 *ip6defrag.IPv6Defragmenter
parser *gopacket.DecodingLayerParser
Expand Down Expand Up @@ -109,9 +109,11 @@ func NewDecoder(datalink layers.LinkType) *Decoder {
// TODO: make a flag for this
debug.SetGCPercent(50)

streamFactory := &tcpStreamFactory{}
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)
streamFactory := &sipStreamFactory{}
streamPool := tcpassembly.NewStreamPool(streamFactory)
assembler := tcpassembly.NewAssembler(streamPool)
assembler.MaxBufferedPagesPerConnection = 1
assembler.MaxBufferedPagesTotal = 1

decoder := gopacket.NewDecodingLayerParser(
lt, &sll, &d1q, &gre, &eth, &ip4, &ip6, &tcp, &udp, &dns, &payload,
Expand Down Expand Up @@ -332,14 +334,7 @@ func (d *Decoder) processTransport(foundLayerTypes *[]gopacket.LayerType, udp *l
logp.Debug("payload", "TCP:\n%s", pkt)

if config.Cfg.Reassembly {
d.asm.AssembleWithContext(flow, tcp, &Context{CaptureInfo: *ci})
flushOptions := reassembly.FlushOptions{
T: ci.Timestamp.Add(-2 * time.Second),
TC: ci.Timestamp.Add(-1 * time.Second),
}
if d.tcpCount%128 == 0 {
d.asm.FlushWithOptions(flushOptions)
}
d.asm.AssembleWithTimestamp(flow, tcp, ci.Timestamp)
return
}

Expand Down
180 changes: 0 additions & 180 deletions decoder/tcpassembler.go

This file was deleted.

106 changes: 106 additions & 0 deletions decoder/tcpassembly.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package decoder

import (
"bufio"
"bytes"
"encoding/binary"
"io"
"io/ioutil"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/tcpassembly"
"github.com/google/gopacket/tcpassembly/tcpreader"
"github.com/negbie/heplify/config"
"github.com/negbie/logp"
)

type sipStreamFactory struct{}

type sipStream struct {
net, transport gopacket.Flow
reader tcpreader.ReaderStream
}

func (s *sipStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
st := &sipStream{
net: net,
transport: transport,
reader: tcpreader.NewReaderStream(),
}
go st.run()
return &st.reader
}

func (s *sipStream) run() {
splitPackets(s.net, s.transport, &s.reader)
io.Copy(ioutil.Discard, &s.reader)
}

func splitPackets(netFlow, transFlow gopacket.Flow, r io.Reader) {
scanner := bufio.NewScanner(r)
scanner.Split(scanSIP)
for scanner.Scan() {
ts := time.Now()
pkt := &Packet{}
pkt.Version = 0x02
pkt.Protocol = 0x06
pkt.SrcIP = netFlow.Src().Raw()
pkt.DstIP = netFlow.Dst().Raw()
sp := transFlow.Src().Raw()
dp := transFlow.Dst().Raw()
if len(sp) == 2 && len(dp) == 2 {
pkt.SrcPort = binary.BigEndian.Uint16(sp)
pkt.DstPort = binary.BigEndian.Uint16(dp)
}
if len(pkt.SrcIP) > 4 || len(pkt.DstIP) > 4 {
pkt.Version = 0x0a
}
pkt.Tsec = uint32(ts.Unix())
pkt.Tmsec = uint32(ts.Nanosecond() / 1000)
pkt.NodeID = uint32(config.Cfg.HepNodeID)
pkt.NodePW = []byte(config.Cfg.HepNodePW)
pkt.Payload = scanner.Bytes()
if bytes.Contains(pkt.Payload, []byte("CSeq")) {
pkt.ProtoType = 1
PacketQueue <- pkt
cacheSDPIPPort(pkt.Payload)
}
logp.Debug("tcpassembly", "%s", pkt)
}
}

func scanSIP(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}

for k := range startSIP {
if bytes.HasPrefix(data, startSIP[k]) && bytes.HasSuffix(data, []byte("\r\n")) || bytes.HasSuffix(data, []byte("\r\n\r\n")) {
return len(data), data, nil
}
}

if atEOF {
return len(data), data, nil
}
return 0, nil, nil
}

var startSIP = [][]byte{
[]byte("INVITE "),
[]byte("REGISTER "),
[]byte("ACK "),
[]byte("BYE "),
[]byte("CANCEL "),
[]byte("OPTIONS "),
[]byte("INFO "),
[]byte("PRACK "),
[]byte("SUBSCRIBE "),
[]byte("NOTIFY "),
[]byte("UPDATE "),
[]byte("MESSAGE "),
[]byte("REFER "),
[]byte("PUBLISH "),
[]byte("SIP/"),
}
Binary file added example/pcap/sip_ipv4_tcp_fragmented.pcap
Binary file not shown.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func init() {
flag.UintVar(&config.Cfg.HepNodeID, "hi", 2002, "HEP node ID")
flag.StringVar(&config.Cfg.Network, "nt", "udp", "Network types are [udp, tcp, tls]")
flag.BoolVar(&config.Cfg.Protobuf, "protobuf", false, "Use Protobuf on wire")
flag.BoolVar(&config.Cfg.Reassembly, "tcpassembly", false, "If true, tcp assembly will be enabled")
flag.BoolVar(&config.Cfg.Reassembly, "tcpassembly", false, "If true, tcpassembly will be enabled")
flag.Parse()

config.Cfg.Iface = &ifaceConfig
Expand Down

0 comments on commit 9743d9f

Please sign in to comment.