Skip to content

Commit

Permalink
Add namedprocess_namegroup_tcp_socket_count metrics
Browse files Browse the repository at this point in the history
Some times, we need to know how many TCP socket are used by a process or
a group of process.
Adding this metrics allows to know precisely the number of socket
created and their states.
  • Loading branch information
xunleii committed Jan 26, 2024
1 parent 935906b commit af67d8b
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 63 deletions.
12 changes: 12 additions & 0 deletions collector/process_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ var (
[]string{"groupname", "state"},
nil)

processTcpSocketCountDesc = prometheus.NewDesc(
"namedprocess_namegroup_tcp_socket_count",
"Number of TCP sockets open by a process",
[]string{"groupname", "state"},
nil)

scrapeErrorsDesc = prometheus.NewDesc(
"namedprocess_scrape_errors",
"general scrape errors: no proc metrics collected during a cycle",
Expand Down Expand Up @@ -226,6 +232,7 @@ func (p *NamedProcessCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- scrapeErrorsDesc
ch <- scrapeProcReadErrorsDesc
ch <- scrapePartialErrorsDesc
ch <- processTcpSocketCountDesc
ch <- threadWchanDesc
ch <- threadCountDesc
ch <- threadCpuSecsDesc
Expand Down Expand Up @@ -301,6 +308,11 @@ func (p *NamedProcessCollector) scrape(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(statesDesc,
prometheus.GaugeValue, float64(gcounts.States.Other), gname, "Other")

for state, count := range gcounts.TCPSocketSummary {
ch <- prometheus.MustNewConstMetric(processTcpSocketCountDesc,
prometheus.GaugeValue, float64(count), gname, string(state))
}

for wchan, count := range gcounts.Wchans {
ch <- prometheus.MustNewConstMetric(threadWchanDesc,
prometheus.GaugeValue, float64(count), gname, wchan)
Expand Down
12 changes: 6 additions & 6 deletions proc/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,21 @@ func newProcParent(pid int, name string, ppid int) IDInfo {
return IDInfo{id, static, Metrics{}, nil}
}

func piinfot(pid int, name string, c Counts, m Memory, f Filedesc, threads []Thread) IDInfo {
pii := piinfo(pid, name, c, m, f, len(threads))
func piinfot(pid int, name string, c Counts, m Memory, f Filedesc, o TCPSocketSummary, threads []Thread) IDInfo {
pii := piinfo(pid, name, c, m, f, o, len(threads))
pii.Threads = threads
return pii
}

func piinfo(pid int, name string, c Counts, m Memory, f Filedesc, t int) IDInfo {
return piinfost(pid, name, c, m, f, t, States{})
func piinfo(pid int, name string, c Counts, m Memory, f Filedesc, o TCPSocketSummary, t int) IDInfo {
return piinfost(pid, name, c, m, f, o, t, States{})
}

func piinfost(pid int, name string, c Counts, m Memory, f Filedesc, t int, s States) IDInfo {
func piinfost(pid int, name string, c Counts, m Memory, f Filedesc, o TCPSocketSummary, t int, s States) IDInfo {
id, static := newProcIDStatic(pid, 0, 0, name, nil)
return IDInfo{
ID: id,
Static: static,
Metrics: Metrics{c, m, f, uint64(t), s, ""},
Metrics: Metrics{c, m, f, uint64(t), s, o, ""},
}
}
8 changes: 8 additions & 0 deletions proc/grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type (
Wchans map[string]int
Procs int
Memory
TCPSocketSummary
OldestStartTime time.Time
OpenFDs uint64
WorstFDratio float64
Expand Down Expand Up @@ -82,6 +83,13 @@ func groupadd(grp Group, ts Update) Group {
grp.OldestStartTime = ts.Start
}

if grp.TCPSocketSummary == nil {
grp.TCPSocketSummary = make(TCPSocketSummary, len(TCPSocketStates))
}
for state, count := range ts.TCPSocketSummary {
grp.TCPSocketSummary[state] += count
}

if grp.Wchans == nil {
grp.Wchans = make(map[string]int)
}
Expand Down
56 changes: 28 additions & 28 deletions proc/grouper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,29 @@ func TestGrouperBasic(t *testing.T) {
{
[]IDInfo{
piinfost(p1, n1, Counts{1, 2, 3, 4, 5, 6, 0, 0}, Memory{7, 8, 0, 0, 0},
Filedesc{4, 400}, 2, States{Other: 1}),
Filedesc{4, 400}, TCPSocketSummary{"CLOSED": 2}, 2, States{Other: 1}),
piinfost(p2, n2, Counts{2, 3, 4, 5, 6, 7, 0, 0}, Memory{8, 9, 0, 0, 0},
Filedesc{40, 400}, 3, States{Waiting: 1}),
Filedesc{40, 400}, TCPSocketSummary{"ESTABLISHED": 2}, 3, States{Waiting: 1}),
},
GroupByName{
"g1": Group{Counts{}, States{Other: 1}, msi{}, 1, Memory{7, 8, 0, 0, 0}, starttime,
"g1": Group{Counts{}, States{Other: 1}, msi{}, 1, Memory{7, 8, 0, 0, 0}, TCPSocketSummary{"CLOSED": 2}, starttime,
4, 0.01, 2, nil},
"g2": Group{Counts{}, States{Waiting: 1}, msi{}, 1, Memory{8, 9, 0, 0, 0}, starttime,
"g2": Group{Counts{}, States{Waiting: 1}, msi{}, 1, Memory{8, 9, 0, 0, 0}, TCPSocketSummary{"ESTABLISHED": 2}, starttime,
40, 0.1, 3, nil},
},
},
{
[]IDInfo{
piinfost(p1, n1, Counts{2, 3, 4, 5, 6, 7, 0, 0},
Memory{6, 7, 0, 0, 0}, Filedesc{100, 400}, 4, States{Zombie: 1}),
Memory{6, 7, 0, 0, 0}, Filedesc{100, 400}, TCPSocketSummary{"CLOSED": 2}, 4, States{Zombie: 1}),
piinfost(p2, n2, Counts{4, 5, 6, 7, 8, 9, 0, 0},
Memory{9, 8, 0, 0, 0}, Filedesc{400, 400}, 2, States{Running: 1}),
Memory{9, 8, 0, 0, 0}, Filedesc{400, 400}, TCPSocketSummary{"ESTABLISHED": 2}, 2, States{Running: 1}),
},
GroupByName{
"g1": Group{Counts{1, 1, 1, 1, 1, 1, 0, 0}, States{Zombie: 1}, msi{}, 1,
Memory{6, 7, 0, 0, 0}, starttime, 100, 0.25, 4, nil},
Memory{6, 7, 0, 0, 0}, TCPSocketSummary{"CLOSED": 2}, starttime, 100, 0.25, 4, nil},
"g2": Group{Counts{2, 2, 2, 2, 2, 2, 0, 0}, States{Running: 1}, msi{}, 1,
Memory{9, 8, 0, 0, 0}, starttime, 400, 1, 2, nil},
Memory{9, 8, 0, 0, 0}, TCPSocketSummary{"ESTABLISHED": 2}, starttime, 400, 1, 2, nil},
},
},
}
Expand All @@ -95,35 +95,35 @@ func TestGrouperProcJoin(t *testing.T) {
}{
{
[]IDInfo{
piinfo(p1, n1, Counts{1, 2, 3, 4, 5, 6, 0, 0}, Memory{3, 4, 0, 0, 0}, Filedesc{4, 400}, 2),
piinfo(p1, n1, Counts{1, 2, 3, 4, 5, 6, 0, 0}, Memory{3, 4, 0, 0, 0}, Filedesc{4, 400}, TCPSocketSummary{"CLOSED": 2}, 2),
},
GroupByName{
"g1": Group{Counts{}, States{}, msi{}, 1, Memory{3, 4, 0, 0, 0}, starttime, 4, 0.01, 2, nil},
"g1": Group{Counts{}, States{}, msi{}, 1, Memory{3, 4, 0, 0, 0}, TCPSocketSummary{"CLOSED": 2}, starttime, 4, 0.01, 2, nil},
},
}, {
// The counts for pid2 won't be factored into the total yet because we only add
// to counts starting with the second time we see a proc. Memory and FDs are
// affected though.
[]IDInfo{
piinfost(p1, n1, Counts{3, 4, 5, 6, 7, 8, 0, 0},
Memory{3, 4, 0, 0, 0}, Filedesc{4, 400}, 2, States{Running: 1}),
Memory{3, 4, 0, 0, 0}, Filedesc{4, 400}, TCPSocketSummary{"CLOSED": 2}, 2, States{Running: 1}),
piinfost(p2, n2, Counts{1, 1, 1, 1, 1, 1, 0, 0},
Memory{1, 2, 0, 0, 0}, Filedesc{40, 400}, 3, States{Sleeping: 1}),
Memory{1, 2, 0, 0, 0}, Filedesc{40, 400}, TCPSocketSummary{"CLOSED": 2}, 3, States{Sleeping: 1}),
},
GroupByName{
"g1": Group{Counts{2, 2, 2, 2, 2, 2, 0, 0}, States{Running: 1, Sleeping: 1}, msi{}, 2,
Memory{4, 6, 0, 0, 0}, starttime, 44, 0.1, 5, nil},
Memory{4, 6, 0, 0, 0}, TCPSocketSummary{"CLOSED": 4}, starttime, 44, 0.1, 5, nil},
},
}, {
[]IDInfo{
piinfost(p1, n1, Counts{4, 5, 6, 7, 8, 9, 0, 0},
Memory{1, 5, 0, 0, 0}, Filedesc{4, 400}, 2, States{Running: 1}),
Memory{1, 5, 0, 0, 0}, Filedesc{4, 400}, TCPSocketSummary{"CLOSED": 2}, 2, States{Running: 1}),
piinfost(p2, n2, Counts{2, 2, 2, 2, 2, 2, 0, 0},
Memory{2, 4, 0, 0, 0}, Filedesc{40, 400}, 3, States{Running: 1}),
Memory{2, 4, 0, 0, 0}, Filedesc{40, 400}, TCPSocketSummary{"CLOSED": 2}, 3, States{Running: 1}),
},
GroupByName{
"g1": Group{Counts{4, 4, 4, 4, 4, 4, 0, 0}, States{Running: 2}, msi{}, 2,
Memory{3, 9, 0, 0, 0}, starttime, 44, 0.1, 5, nil},
Memory{3, 9, 0, 0, 0}, TCPSocketSummary{"CLOSED": 4}, starttime, 44, 0.1, 5, nil},
},
},
}
Expand All @@ -150,23 +150,23 @@ func TestGrouperNonDecreasing(t *testing.T) {
}{
{
[]IDInfo{
piinfo(p1, n1, Counts{3, 4, 5, 6, 7, 8, 0, 0}, Memory{3, 4, 0, 0, 0}, Filedesc{4, 400}, 2),
piinfo(p2, n2, Counts{1, 1, 1, 1, 1, 1, 0, 0}, Memory{1, 2, 0, 0, 0}, Filedesc{40, 400}, 3),
piinfo(p1, n1, Counts{3, 4, 5, 6, 7, 8, 0, 0}, Memory{3, 4, 0, 0, 0}, Filedesc{4, 400}, TCPSocketSummary{"CLOSED": 2}, 2),
piinfo(p2, n2, Counts{1, 1, 1, 1, 1, 1, 0, 0}, Memory{1, 2, 0, 0, 0}, Filedesc{40, 400}, TCPSocketSummary{"CLOSED": 2}, 3),
},
GroupByName{
"g1": Group{Counts{}, States{}, msi{}, 2, Memory{4, 6, 0, 0, 0}, starttime, 44, 0.1, 5, nil},
"g1": Group{Counts{}, States{}, msi{}, 2, Memory{4, 6, 0, 0, 0}, TCPSocketSummary{"CLOSED": 4}, starttime, 44, 0.1, 5, nil},
},
}, {
[]IDInfo{
piinfo(p1, n1, Counts{4, 5, 6, 7, 8, 9, 0, 0}, Memory{1, 5, 0, 0, 0}, Filedesc{4, 400}, 2),
piinfo(p1, n1, Counts{4, 5, 6, 7, 8, 9, 0, 0}, Memory{1, 5, 0, 0, 0}, Filedesc{4, 400}, TCPSocketSummary{"CLOSED": 2}, 2),
},
GroupByName{
"g1": Group{Counts{1, 1, 1, 1, 1, 1, 0, 0}, States{}, msi{}, 1, Memory{1, 5, 0, 0, 0}, starttime, 4, 0.01, 2, nil},
"g1": Group{Counts{1, 1, 1, 1, 1, 1, 0, 0}, States{}, msi{}, 1, Memory{1, 5, 0, 0, 0}, TCPSocketSummary{"CLOSED": 2}, starttime, 4, 0.01, 2, nil},
},
}, {
[]IDInfo{},
GroupByName{
"g1": Group{Counts{1, 1, 1, 1, 1, 1, 0, 0}, States{}, nil, 0, Memory{}, time.Time{}, 0, 0, 0, nil},
"g1": Group{Counts{1, 1, 1, 1, 1, 1, 0, 0}, States{}, nil, 0, Memory{}, nil, time.Time{}, 0, 0, 0, nil},
},
},
}
Expand All @@ -188,35 +188,35 @@ func TestGrouperThreads(t *testing.T) {
want GroupByName
}{
{
piinfot(p, n, Counts{}, Memory{}, Filedesc{1, 1}, []Thread{
piinfot(p, n, Counts{}, Memory{}, Filedesc{1, 1}, TCPSocketSummary{"CLOSED": 2}, []Thread{
{ThreadID(ID{p, 0}), "t1", Counts{1, 2, 3, 4, 5, 6, 0, 0}, "", States{}},
{ThreadID(ID{p + 1, 0}), "t2", Counts{1, 1, 1, 1, 1, 1, 0, 0}, "", States{}},
}),
GroupByName{
"g1": Group{Counts{}, States{}, msi{}, 1, Memory{}, tm, 1, 1, 2, []Threads{
"g1": Group{Counts{}, States{}, msi{}, 1, Memory{}, TCPSocketSummary{"CLOSED": 2}, tm, 1, 1, 2, []Threads{
Threads{"t1", 1, Counts{}},
Threads{"t2", 1, Counts{}},
}},
},
}, {
piinfot(p, n, Counts{}, Memory{}, Filedesc{1, 1}, []Thread{
piinfot(p, n, Counts{}, Memory{}, Filedesc{1, 1}, TCPSocketSummary{"CLOSED": 2}, []Thread{
{ThreadID(ID{p, 0}), "t1", Counts{2, 3, 4, 5, 6, 7, 0, 0}, "", States{}},
{ThreadID(ID{p + 1, 0}), "t2", Counts{2, 2, 2, 2, 2, 2, 0, 0}, "", States{}},
{ThreadID(ID{p + 2, 0}), "t2", Counts{1, 1, 1, 1, 1, 1, 0, 0}, "", States{}},
}),
GroupByName{
"g1": Group{Counts{}, States{}, msi{}, 1, Memory{}, tm, 1, 1, 3, []Threads{
"g1": Group{Counts{}, States{}, msi{}, 1, Memory{}, TCPSocketSummary{"CLOSED": 2}, tm, 1, 1, 3, []Threads{
Threads{"t1", 1, Counts{1, 1, 1, 1, 1, 1, 0, 0}},
Threads{"t2", 2, Counts{1, 1, 1, 1, 1, 1, 0, 0}},
}},
},
}, {
piinfot(p, n, Counts{}, Memory{}, Filedesc{1, 1}, []Thread{
piinfot(p, n, Counts{}, Memory{}, Filedesc{1, 1}, TCPSocketSummary{"CLOSED": 2}, []Thread{
{ThreadID(ID{p + 1, 0}), "t2", Counts{4, 4, 4, 4, 4, 4, 0, 0}, "", States{}},
{ThreadID(ID{p + 2, 0}), "t2", Counts{2, 3, 4, 5, 6, 7, 0, 0}, "", States{}},
}),
GroupByName{
"g1": Group{Counts{}, States{}, msi{}, 1, Memory{}, tm, 1, 1, 2, []Threads{
"g1": Group{Counts{}, States{}, msi{}, 1, Memory{}, TCPSocketSummary{"CLOSED": 2}, tm, 1, 1, 2, []Threads{
Threads{"t2", 2, Counts{4, 5, 6, 7, 8, 9, 0, 0}},
}},
},
Expand Down
66 changes: 60 additions & 6 deletions proc/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,26 @@ import (
"github.com/prometheus/procfs"
)

// ErrProcNotExist indicates a process couldn't be read because it doesn't exist,
// typically because it disappeared while we were reading it.
var ErrProcNotExist = fmt.Errorf("process does not exist")
var (
// ErrProcNotExist indicates a process couldn't be read because it doesn't exist,
// typically because it disappeared while we were reading it.
ErrProcNotExist = fmt.Errorf("process does not exist")

// List of TCP socket states from https://github.com/torvalds/linux/blob/master/include/net/tcp_states.h
TCPSocketStates = [11]tcpSocketState{
"ESTABLISHED",
"SYN_SENT",
"SYN_RECV",
"FIN_WAIT1",
"FIN_WAIT2",
"TIME_WAIT",
"CLOSE",
"CLOSE_WAIT",
"LAST_ACK",
"LISTEN",
"CLOSING",
}
)

type (
// ID uniquely identifies a process.
Expand Down Expand Up @@ -74,13 +91,21 @@ type (
Other int
}

// tcpSocketState represents the state of a TCP socket.
tcpSocketState string

// TCPSocketSummary contains a summary of TCP socket states
// used by a process.
TCPSocketSummary map[tcpSocketState]int

// Metrics contains data read from /proc/pid/*
Metrics struct {
Counts
Memory
Filedesc
NumThreads uint64
States
TCPSocketSummary
Wchan string
}

Expand Down Expand Up @@ -495,6 +520,11 @@ func (p proc) GetMetrics() (Metrics, int, error) {
softerrors |= 1
}

tcpSocketSummary, err := p.getTCPSocketSummary()
if err != nil {
softerrors |= 1
}

limits, err := p.Proc.NewLimits()
if err != nil {
return Metrics{}, 0, err
Expand Down Expand Up @@ -528,9 +558,10 @@ func (p proc) GetMetrics() (Metrics, int, error) {
Open: int64(numfds),
Limit: uint64(limits.OpenFiles),
},
NumThreads: uint64(stat.NumThreads),
States: states,
Wchan: wchan,
NumThreads: uint64(stat.NumThreads),
States: states,
TCPSocketSummary: tcpSocketSummary,
Wchan: wchan,
}, softerrors, nil
}

Expand Down Expand Up @@ -583,6 +614,29 @@ func (p proc) GetThreads() ([]Thread, error) {
return threads, nil
}

func (p proc) getTCPSocketSummary() (TCPSocketSummary, error) {
pid, err := p.GetProcID()
if err != nil {
return TCPSocketSummary{}, err
}

sockets, err := NewNetIPSocket(fmt.Sprintf("/proc/%d/net/tcp", pid.Pid))
if err != nil {
return TCPSocketSummary{}, err
}

summary := TCPSocketSummary{}
for _, socket := range sockets {
if socket.St >= uint64(len(TCPSocketStates)) {
continue
}

summary[TCPSocketStates[socket.St]]++
}

return summary, nil
}

// See https://github.com/prometheus/procfs/blob/master/proc_stat.go for details on userHZ.
const userHZ = 100

Expand Down
Loading

0 comments on commit af67d8b

Please sign in to comment.