/
source.go
107 lines (84 loc) · 2.38 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package tcp
import (
"fmt"
"time"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/tcpassembly"
"github.com/rename-this/vhs/capture"
"github.com/rename-this/vhs/core"
)
// NewSource builds a TCP source whose stream channel is unbuffered.
// The context argument is ignored and the returned error is always nil.
func NewSource(_ core.Context) (core.Source, error) {
	src := &tcpSource{}
	src.streams = make(chan core.InputReader)
	return src, nil
}
// tcpSource is the concrete type that NewSource returns as a core.Source.
type tcpSource struct {
// streams is the channel exposed by Streams and handed to the stream
// factory created in read.
streams chan core.InputReader
}
// Streams returns the receive-only view of the source's channel of input
// readers.
func (s *tcpSource) Streams() <-chan core.InputReader {
	var out <-chan core.InputReader = s.streams
	return out
}
// Init runs the read loop with the capture package's constructors. It does
// not return until read returns.
func (s *tcpSource) Init(ctx core.Context) {
	var (
		mkCapture  newCaptureFn  = capture.NewCapture
		mkListener newListenerFn = capture.NewListener
	)
	s.read(ctx, mkCapture, mkListener)
}
// newCaptureFn is the constructor signature read uses to create a capture.
// read calls it with the flow's configured address and capture-response flag;
// Init supplies capture.NewCapture.
type newCaptureFn func(addr string, response bool) (*capture.Capture, error)

// newListenerFn is the constructor signature read uses to wrap a capture in a
// listener; Init supplies capture.NewListener.
type newListenerFn func(*capture.Capture) capture.Listener
// read creates a capture and a listener, then feeds every TCP packet the
// listener produces into a tcpassembly assembler whose streams are built by a
// stream factory writing to s.streams.
//
// newCapture and newListener are injected so the constructors can be swapped
// out; Init passes the capture package's implementations.
//
// If the capture cannot be created, the wrapped error is sent on ctx.Errors
// and read returns. Otherwise read blocks until one of the following happens:
//   - a nil packet is received from the listener,
//   - ctx.FlowConfig.SourceDuration elapses (the factory is closed first),
//   - ctx.StdContext is done.
//
// Every ctx.FlowConfig.TCPTimeout, streams older than that timeout are flushed
// from the assembler and the factory is pruned.
func (s *tcpSource) read(ctx core.Context, newCapture newCaptureFn, newListener newListenerFn) {
	ctx.Logger = ctx.Logger.With().
		Str(core.LoggerKeyComponent, "tcp_source").
		Logger()

	ctx.Logger.Debug().Msg("read")

	// Named pcap rather than cap so the builtin cap is not shadowed.
	pcap, err := newCapture(ctx.FlowConfig.Addr, ctx.FlowConfig.CaptureResponse)
	if err != nil {
		ctx.Errors <- fmt.Errorf("failed to initialize capture: %w", err)
		return
	}

	ctx.Logger.Debug().Interface("cap", pcap).Msg("capture created")

	listener := newListener(pcap)
	defer listener.Close()

	go listener.Listen(ctx)

	var (
		factory   = newStreamFactory(ctx, s.streams)
		pool      = tcpassembly.NewStreamPool(factory)
		assembler = tcpassembly.NewAssembler(pool)
	)

	// Use a stoppable ticker so it is released when read returns. A
	// non-positive timeout leaves flush nil, which never fires in the select
	// below; time.NewTicker would panic on such a duration.
	var flush <-chan time.Time
	if timeout := ctx.FlowConfig.TCPTimeout; timeout > 0 {
		ticker := time.NewTicker(timeout)
		defer ticker.Stop()
		flush = ticker.C
	}

	// A timer, unlike time.After, can be stopped on early return.
	complete := time.NewTimer(ctx.FlowConfig.SourceDuration)
	defer complete.Stop()

	packets := listener.Packets()

	for {
		select {
		case packet := <-packets:
			// A nil packet ends the read loop.
			if packet == nil {
				if ctx.Config.DebugPackets {
					ctx.Logger.Debug().Msg("nil packet")
				}
				return
			}

			// Skip anything that is not a TCP segment with a network layer.
			if packet.NetworkLayer() == nil ||
				packet.TransportLayer() == nil ||
				packet.TransportLayer().LayerType() != layers.LayerTypeTCP {
				if ctx.Config.DebugPackets {
					ctx.Logger.Debug().Str("p", packet.String()).Msg("wrong packet layers")
				}
				continue
			}

			var (
				tcp  = packet.TransportLayer().(*layers.TCP)
				flow = packet.NetworkLayer().NetworkFlow()
			)
			// Packets are stamped with the time they are processed here.
			assembler.AssembleWithTimestamp(flow, tcp, time.Now())

		case <-flush:
			ctx.Logger.Debug().Msg("flushing old streams")
			assembler.FlushOlderThan(time.Now().Add(-ctx.FlowConfig.TCPTimeout))
			factory.prune()

		case <-complete.C:
			// Only this exit path closes the factory.
			factory.Close()
			return

		case <-ctx.StdContext.Done():
			return
		}
	}
}