-
Notifications
You must be signed in to change notification settings - Fork 0
/
mgroup-processor.go
127 lines (115 loc) · 4.01 KB
/
mgroup-processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package agent
import (
"sync"
"time"
"github.com/toni-moreno/oracle_collector/pkg/agent/data"
"github.com/toni-moreno/oracle_collector/pkg/agent/oracle"
"github.com/toni-moreno/oracle_collector/pkg/agent/output"
"github.com/toni-moreno/oracle_collector/pkg/agent/selfmon"
"github.com/toni-moreno/oracle_collector/pkg/config"
)
// MGroupProcessor runs the queries of one Oracle metric group against a
// selection of Oracle instances.
type MGroupProcessor struct {
// InstanceList is the instance list handed to InitGroupProcessor; it is the
// source UpdateInstances reads from.
InstanceList *oracle.InstanceList
// OracleInstances is the selection produced by the last UpdateInstances call
// and the set ProcesQuery iterates over.
OracleInstances []*oracle.OracleInstance
// cfg is the metric group configuration (name, instance filter, query
// level/period/timeout and the metric queries themselves).
cfg *config.OracleMetricGroupConfig
// InstNames holds the names oracle.GetSidNames returns for OracleInstances;
// it is refreshed by UpdateInstances and used in log messages.
InstNames []string
}
// InitGroupProcessor builds a MGroupProcessor bound to the given metric group
// configuration and instance list. OracleInstances and InstNames are left at
// their zero value here; they are filled in by UpdateInstances.
func InitGroupProcessor(cfg *config.OracleMetricGroupConfig, oralist *oracle.InstanceList) *MGroupProcessor {
	return &MGroupProcessor{
		cfg:          cfg,
		InstanceList: oralist,
	}
}
// UpdateInstances refreshes the set of instances this processor works on.
//
// When the group configuration carries a non-empty InstanceFilter, the
// selection is whatever InstanceList.GetFilteredListBySid returns for it;
// otherwise every instance returned by InstanceList.GetList is selected.
// OracleInstances is replaced with the selection and InstNames with the
// result of oracle.GetSidNames for it.
//
// It returns the number of selected instances.
func (mgp *MGroupProcessor) UpdateInstances() int {
	// TODO: review whether instances should also be filtered by STATUS (OPEN);
	// some configurable queries might be doable with the instance in MOUNT status.
	instances := mgp.InstanceList.GetList()

	var selected []*oracle.OracleInstance
	if len(mgp.cfg.InstanceFilter) != 0 {
		selected = mgp.InstanceList.GetFilteredListBySid(mgp.cfg.InstanceFilter)
	} else {
		selected = instances
	}

	mgp.OracleInstances = selected
	// InstNames is always derived from the final selection, so there is no
	// need to collect names separately in either branch above.
	mgp.InstNames = oracle.GetSidNames(mgp.OracleInstances)

	ntotal := len(instances)
	nselected := len(selected)
	log.Debugf("[COLLECTOR] On update Number instances total [%d] Filtered [%d]", ntotal, nselected)
	return nselected
}
// checkVersions tests instance i against an optional version window.
//
// goet is the "greater or equal than" bound and lt the "less than" bound; an
// empty string means that bound is not configured. Depending on which bounds
// are set, the check is delegated to CheckVersionBetween,
// CheckVersionLessThan or CheckVersionGreaterThanOrEqual, and their
// (string, bool) result is returned unchanged. With no bounds at all the
// result is ("", true).
func checkVersions(i *oracle.OracleInstance, goet, lt string) (string, bool) {
	hasLower := len(goet) > 0
	hasUpper := len(lt) > 0

	if hasUpper && hasLower {
		return i.CheckVersionBetween(goet, lt)
	}
	if hasUpper {
		return i.CheckVersionLessThan(lt)
	}
	if hasLower {
		return i.CheckVersionGreaterThanOrEqual(goet)
	}
	// No bound configured: nothing to check, treat as a match.
	return "", true
}
// ProcesQuery runs one collection iteration for this metric group.
//
// It first refreshes the instance selection through UpdateInstances. Then,
// for every selected instance, it runs each configured metric query whose
// version filter matches, converts the resulting table into metrics, sends
// them to the output and reports a per-query stat to selfmon. A failing
// query or transformation is logged and skipped; the remaining queries and
// instances are still processed.
func (mgp *MGroupProcessor) ProcesQuery() {
	total := mgp.UpdateInstances()
	mgp.BroadCastInfof("Init Query Process on [%d] Instances [%+v] ", total, mgp.InstNames)
	log.Infof("[COLLECTOR] Processor [%s] new Iteration on [%d] Instances [%+v]", mgp.cfg.Name, total, mgp.InstNames)

	for _, inst := range mgp.OracleInstances {
		// With query level "db", instances not flagged as valid for DB
		// queries are skipped entirely.
		if mgp.cfg.QueryLevel == "db" {
			if !inst.GetIsValidForDBQuery() {
				mgp.Infof(inst, "QUERY IN DB MODE: SKIP querying instance %s : not smalest Instance in DB (Current %d)", inst.InstInfo.InstName, inst.InstInfo.InstNumber)
				continue
			}
		}

		labels := inst.GetExtraLabels()

		for _, metric := range mgp.cfg.OracleMetrics {
			// Version affinity: skip queries not meant for this instance version.
			minVer, maxVer := metric.OraVerGreaterOrEqualThan, metric.OraVerLessThan
			instVer, ok := checkVersions(inst, minVer, maxVer)
			if !ok {
				mgp.Infof(inst, "Metric Query: [%s] | version filter [ %s, %s ): NOT MATCH IN Instance [%s]version[%s]", metric.Context, minVer, maxVer, inst.GetInstanceName(), instVer)
				continue
			}

			mgp.Debugf(inst, "Begin Metric Query: [%s]", metric.Context)
			table := data.NewDatatableWithConfig(metric)

			rows, elapsed, err := inst.Query(mgp.cfg.QueryTimeout, metric.Request, table)
			if err != nil {
				mgp.Errorf(inst, "Error on query: %s (Duration: %s)", err, elapsed)
				continue
			}
			mgp.Infof(inst, "Oracle Metric Query: [%s] returned [%d] rows (Transposed by: %s)(Duration: %s)", metric.Context, rows, metric.FieldToAppend, elapsed)

			// Turn the raw result table into metrics tagged with the instance labels.
			metrics, err := table.GetMetrics(labels)
			if err != nil {
				mgp.Warnf(inst, "Oracle Metric Query: [%s] Error on metric transformation: %s", metric.Context, err)
				continue
			}

			output.SendMetrics(metrics)
			selfmon.SendQueryStat(labels, mgp.cfg, metric, rows, elapsed)
		}
	}
}
// StartCollection launches the collection goroutine for this metric group.
//
// The goroutine is registered on s before it starts and marks itself done
// when it exits. It calls ProcesQuery once via a pre-filled one-shot channel
// and then on every tick of a ticker set to cfg.QueryPeriod, until a value is
// received from done (or done is closed).
func (mgp *MGroupProcessor) StartCollection(done chan bool, s *sync.WaitGroup) {
	s.Add(1)
	go func() {
		defer s.Done()

		ticker := time.NewTicker(mgp.cfg.QueryPeriod)
		defer ticker.Stop()

		// Pre-filled, buffered channel: it is ready exactly once, so the
		// first pass does not have to wait for the first tick.
		kickoff := make(chan bool, 1)
		kickoff <- true

		for {
			select {
			case <-done:
				return
			case <-kickoff:
				log.Infof("[COLLECTOR] Start Query Processor for Group: %s ( Period: %s )", mgp.cfg.Name, mgp.cfg.QueryPeriod.String())
				mgp.ProcesQuery()
			case <-ticker.C:
				mgp.ProcesQuery()
			}
		}
	}()
}