Skip to content

Commit

Permalink
Simplify currentHeapAboveThreshold using maxHeap
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-soltesz committed Nov 20, 2022
1 parent 04e7c52 commit 3d98d4f
Showing 1 changed file with 21 additions and 22 deletions.
43 changes: 21 additions & 22 deletions demuxer/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/m-lab/packet-headers/metrics"
"github.com/m-lab/packet-headers/saver"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/procfs"
)

// UUIDEvent is the datatype sent to a demuxer's UUIDChan to notify it about the
Expand Down Expand Up @@ -48,7 +47,7 @@ type TCP struct {
anon anonymize.IPAnonymizer
dataDir string
stream bool
maxRSSkb int
maxHeap uint64
}

type status interface {
Expand All @@ -62,23 +61,11 @@ func (promStatus) GC(stillPresent, discarded int) {
metrics.DemuxerSaverCount.Set(float64(stillPresent))
}

func currentRSSBelowThreshold(maxRSSkb int) (bool, error) {
p, err := procfs.Self()
if err != nil {
// This is a strange situation. The process cannot read /proc info about itself.
return false, err
}
s, err := p.Stat()
if err != nil {
// Again, this is out of the ordinary. The process cannot read /proc info about itself.
return false, err
}
// RSS is in pages. Assuming 4KB pages, return in KB.
if s.RSS*4 > maxRSSkb {
// count events.
return false, nil
}
return true, nil
// currentHeapAboveThreshold reports whether the HeapAlloc value from a fresh
// runtime.MemStats snapshot is strictly greater than maxHeap.
func currentHeapAboveThreshold(maxHeap uint64) bool {
	var stats runtime.MemStats
	// A new snapshot is taken on every call so that the comparison reflects
	// the heap usage at the moment of the check.
	runtime.ReadMemStats(&stats)
	return stats.HeapAlloc > maxHeap
}

// getSaver returns a saver with channels for packets and a uuid.
Expand All @@ -92,7 +79,19 @@ func (d *TCP) getSaver(ctx context.Context, flow FlowKey) *saver.TCP {
if ok {
delete(d.oldFlows, flow)
} else {
if below, err := currentRSSBelowThreshold(d.maxRSSkb); err != nil || !below {

// Only create a new Saver if we are below the configured maxHeap
// threshold. This condition is required because a SYN flood can
// cause packet-headers to allocate memory faster than partial
// connection timeouts allow the garbage collector to recover used
// memory. The result in that case would be RAM exhaustion.
//
// NOTE: When we are above the maxHeap threshold, we may lose some
// legitimate measurements. This check is a load shedding strategy
// to keep the process running and prevent resource usage by
// packet-headers from spilling over into other components.
if currentHeapAboveThreshold(d.maxHeap) {
// TODO: count events.
return nil
}
t = saver.StartNew(ctx, d.anon, d.dataDir, d.uuidWaitDuration, d.maxDuration, flow.Format(d.anon), d.stream)
Expand Down Expand Up @@ -197,7 +196,7 @@ func (d *TCP) CapturePackets(ctx context.Context, packets <-chan gopacket.Packet

// NewTCP creates a demuxer.TCP, which is the system which chooses which channel
// to send TCP/IP packets for subsequent saving to a file.
func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFlowDuration time.Duration, maxIdleRAM bytecount.ByteCount, stream bool, maxRSSkb int) *TCP {
func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFlowDuration time.Duration, maxIdleRAM bytecount.ByteCount, stream bool, maxHeap uint64) *TCP {
uuidc := make(chan UUIDEvent, 100)
return &TCP{
UUIDChan: uuidc,
Expand All @@ -213,6 +212,6 @@ func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFl
maxDuration: maxFlowDuration,
uuidWaitDuration: uuidWaitDuration,
stream: stream,
maxRSSkb: maxRSSkb,
maxHeap: maxHeap,
}
}

0 comments on commit 3d98d4f

Please sign in to comment.