Skip to content

Commit

Permalink
Merge c301fdd into 163dd10
Browse files Browse the repository at this point in the history
  • Loading branch information
pboothe committed Dec 6, 2019
2 parents 163dd10 + c301fdd commit 5d65c74
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ before_script:
script:
- go vet ./...
- go build ./...
- go test ./... -cover=1 -coverprofile=_c.cov
- go test ./... -cover=1 -coverprofile=_c.cov -race
- $GOPATH/bin/goveralls -service=travis-ci -coverprofile=_c.cov
15 changes: 15 additions & 0 deletions demuxer/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type TCP struct {
// saver.TCP objects are finalized.
currentFlows map[FlowKey]*saver.TCP
oldFlows map[FlowKey]*saver.TCP
status status

// Variables required for the construction of new Savers
maxDuration time.Duration
Expand All @@ -42,6 +43,17 @@ type TCP struct {
dataDir string
}

type status interface {
GC(stillPresent, discarded int)
}

type promStatus struct{}

func (promStatus) GC(stillPresent, discarded int) {
metrics.DemuxerGarbageCollected.Add(float64(discarded))
metrics.DemuxerSaverCount.Set(float64(stillPresent))
}

// GetSaver returns a saver with channels for packets and a uuid.
func (d *TCP) getSaver(ctx context.Context, flow FlowKey) *saver.TCP {
// Read the flow from the flows map, the oldFlows map, or create it.
Expand Down Expand Up @@ -100,6 +112,8 @@ func (d *TCP) collectGarbage() {
close(s.Pchan)
}
}(d.oldFlows)
// Record GC data.
d.status.GC(len(d.currentFlows), len(d.oldFlows))
// Advance the generation.
d.oldFlows = d.currentFlows
d.currentFlows = make(map[FlowKey]*saver.TCP)
Expand Down Expand Up @@ -149,6 +163,7 @@ func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFl

currentFlows: make(map[FlowKey]*saver.TCP),
oldFlows: make(map[FlowKey]*saver.TCP),
status: promStatus{},

anon: anon,
dataDir: dataDir,
Expand Down
46 changes: 37 additions & 9 deletions demuxer/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"github.com/m-lab/packet-headers/saver"
)

func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
}

type fakePacketSource struct {
packets []gopacket.Packet
c chan gopacket.Packet
Expand Down Expand Up @@ -55,12 +59,35 @@ func TestTCPDryRun(t *testing.T) {
// Does not run forever or crash == success
}

type statusTracker struct {
stillPresent, discarded int
mu sync.Mutex
}

func (s *statusTracker) GC(stillPresent, discarded int) {
s.mu.Lock()
defer s.mu.Unlock()
s.stillPresent = stillPresent
s.discarded = discarded
}

func (s *statusTracker) Get() statusTracker {
s.mu.Lock()
defer s.mu.Unlock()
return statusTracker{
stillPresent: s.stillPresent,
discarded: s.discarded,
}
}

func TestTCPWithRealPcaps(t *testing.T) {
dir, err := ioutil.TempDir("", "TestTCPWithRealPcaps")
rtx.Must(err, "Could not create directory")
defer os.RemoveAll(dir)

tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second)
st := &statusTracker{}
tcpdm.status = st
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

Expand Down Expand Up @@ -154,8 +181,9 @@ func TestTCPWithRealPcaps(t *testing.T) {
// Lose all race conditions again.
time.Sleep(100 * time.Millisecond)
// Verify that one flow was garbage collected.
if len(tcpdm.oldFlows) != 1 || len(tcpdm.currentFlows) != 0 {
t.Errorf("Should have 1 old flow, not %d and 0 currentFlows not %d", len(tcpdm.oldFlows), len(tcpdm.currentFlows))
s := st.Get()
if s.stillPresent != 1 || s.discarded != 1 {
t.Errorf("Should have 1 flow left and 1 flow collected, not %d and %d", s.stillPresent, s.discarded)
}
gc <- time.Now()
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -198,12 +226,12 @@ func TestTCPWithRealPcaps(t *testing.T) {

// After all that, also check that writes to an out-of-capacity Pchan will
// not block.
s := tcpdm.getSaver(ctx, flow1)
close(s.Pchan)
close(s.UUIDchan)
// This new channel assigned to s.Pchan will never be read, so if a blocking
sav := tcpdm.getSaver(ctx, flow1)
close(sav.Pchan)
close(sav.UUIDchan)
// This new channel assigned to sav.Pchan will never be read, so if a blocking
// write is performed then this goroutine will block.
s.Pchan = make(chan gopacket.Packet)
sav.Pchan = make(chan gopacket.Packet)
tcpdm.savePacket(ctx, flow1packets[0])
// If this doesn't block, then success!
}
Expand All @@ -230,19 +258,19 @@ func TestUUIDWontBlock(t *testing.T) {

var wg sync.WaitGroup
wg.Add(1)
gcTimer := make(chan time.Time)
go func() {
pChan := make(chan gopacket.Packet)
gcTimer := make(chan time.Time)
tcpdm.CapturePackets(ctx, pChan, gcTimer)
// Does not run forever or crash == success
wg.Done()
}()

// Write to the UUID channel 1000 times (more than exhausting its buffer)
for i := 0; i < 1000; i++ {
log.Println(i)
tcpdm.UUIDChan <- e
}
gcTimer <- time.Now()
// Lose all channel-read race conditions.
time.Sleep(100 * time.Millisecond)
// Ensure that reads of that channel never block. If the cancel() has an
Expand Down
12 changes: 12 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ var (
Help: "How many UUIDs has the demuxer been told about. Should match pcap_saver_starts_total very closely",
},
)
DemuxerGarbageCollected = promauto.NewCounter(
prometheus.CounterOpts{
Name: "pcap_demuxer_savers_garbage_collected_total",
Help: "How many flows have been garbage collected.",
},
)
DemuxerSaverCount = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "pcap_demuxer_savers_active",
Help: "How many savers were still active after the most recent garbage collection round",
},
)

BadEventsFromTCPInfo = promauto.NewCounterVec(
prometheus.CounterOpts{
Expand Down
9 changes: 8 additions & 1 deletion saver/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,26 +75,33 @@ type statusSetter interface {

type status struct {
status string
mu sync.Mutex
}

func newStatus(beginstate string) *status {
metrics.SaverCount.WithLabelValues(beginstate).Inc()
return &status{beginstate}
return &status{status: beginstate}
}

func (s *status) Set(newstatus string) {
s.mu.Lock()
defer s.mu.Unlock()
var oldstatus string
oldstatus, s.status = s.status, newstatus
metrics.SaverCount.WithLabelValues(oldstatus).Dec()
metrics.SaverCount.WithLabelValues(newstatus).Inc()
}

func (s *status) Done() {
s.mu.Lock()
defer s.mu.Unlock()
metrics.SaverCount.WithLabelValues(s.status).Dec()
s.status = "stopped"
}

func (s *status) Get() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.status
}

Expand Down
6 changes: 6 additions & 0 deletions saver/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -61,9 +62,12 @@ func TestAnonymizationWontCrashOnNil(t *testing.T) {
type statusTracker struct {
status string
past []string
mu sync.Mutex
}

func (s *statusTracker) Set(state string) {
s.mu.Lock()
defer s.mu.Unlock()
if s.status == state {
return
}
Expand All @@ -76,6 +80,8 @@ func (s *statusTracker) Done() {
}

func (s *statusTracker) Get() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.status
}

Expand Down

0 comments on commit 5d65c74

Please sign in to comment.