/
proc_status_name.go
132 lines (120 loc) · 2.66 KB
/
proc_status_name.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package proc_status_name
import (
"context"
"errors"
"os"
"regexp"
"sync"
"github.com/k1LoW/grouped_process_exporter/grouped_proc"
"github.com/k1LoW/grouped_process_exporter/metric"
"github.com/prometheus/procfs"
"golang.org/x/sync/semaphore"
)
type ProcStatusName struct {
nRe *regexp.Regexp // normalize regexp
eRe *regexp.Regexp // exclude regexp
procMountPoint string
}
func (g *ProcStatusName) Name() string {
return "proc_status_name"
}
func (g *ProcStatusName) Collect(gprocs *grouped_proc.GroupedProcs, enabled map[metric.MetricKey]bool, sem *semaphore.Weighted) error {
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = sem.Acquire(ctx, 1)
fs, err := procfs.NewFS(g.procMountPoint)
if err != nil {
sem.Release(1)
return err
}
procs, err := fs.AllProcs()
if err != nil {
sem.Release(1)
return err
}
sem.Release(1)
for _, proc := range procs {
_ = sem.Acquire(ctx, 1)
status, err := proc.NewStatus()
sem.Release(1)
if err != nil {
// TODO: Log
continue
}
// collect process only
if status.PID != status.TGID {
continue
}
pid := proc.PID
name := status.Name
if g.eRe != nil {
if g.eRe.MatchString(name) {
continue
}
}
if g.nRe != nil {
matches := g.nRe.FindStringSubmatch(name)
if len(matches) > 1 {
name = matches[1]
}
}
var (
gproc *grouped_proc.GroupedProc
ok bool
)
gproc, ok = gprocs.Load(name)
if !ok {
gproc = grouped_proc.NewGroupedProc(enabled)
gprocs.Store(name, gproc)
}
if err := gproc.Collect(name); err != nil {
return err
}
gproc.Exists = true
wg.Add(1)
_ = sem.Acquire(ctx, gproc.RequiredWeight)
go func(wg *sync.WaitGroup, pid int, gproc *grouped_proc.GroupedProc) {
_ = gproc.AppendProcAndCollect(pid)
sem.Release(gproc.RequiredWeight)
wg.Done()
}(wg, pid, gproc)
}
wg.Wait()
return nil
}
func (g *ProcStatusName) SetNormalizeRegexp(nReStr string) error {
if nReStr == "" {
return nil
}
nRe, err := regexp.Compile(nReStr)
if err != nil {
return err
}
if nRe.NumSubexp() != 1 {
return errors.New("number of parenthesized subexpressions in this regexp should be 1")
}
g.nRe = nRe
return nil
}
func (g *ProcStatusName) SetExcludeRegexp(eReStr string) error {
if eReStr == "" {
return nil
}
eRe, err := regexp.Compile(eReStr)
if err != nil {
return err
}
g.eRe = eRe
return nil
}
// NewProcStatusName
func NewProcStatusName() *ProcStatusName {
procMountPoint := os.Getenv("GROUPED_PROCESS_PROC_MOUNT_POINT")
if procMountPoint == "" {
procMountPoint = procfs.DefaultMountPoint
}
return &ProcStatusName{
procMountPoint: procMountPoint,
}
}