forked from Griesbacher/nagflux
-
Notifications
You must be signed in to change notification settings - Fork 0
/
nagfluxFileCollector.go
155 lines (142 loc) · 4.12 KB
/
nagfluxFileCollector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package nagflux
import (
"encoding/csv"
"github.com/griesbacher/nagflux/collector"
"github.com/griesbacher/nagflux/collector/spoolfile"
"github.com/griesbacher/nagflux/config"
"github.com/griesbacher/nagflux/helper"
"github.com/griesbacher/nagflux/logging"
"github.com/kdar/factorlog"
"os"
"time"
)
//FileCollector provides a interface to nagflux, in which you could insert influxdb queries.
type FileCollector struct {
quit chan bool
results collector.ResultQueues
folder string
log *factorlog.FactorLog
fieldSeparator rune
}
/*
table&target&time&f_value&t_foo
test&all&1489474756000&1.0&"""bar"""
*/
var requiredFields = []string{"table", "time"}
var optionalFields = []string{"target"}
//NewNagfluxFileCollector constructor, which also starts the collector.
func NewNagfluxFileCollector(results collector.ResultQueues, folder string, fieldSeparator rune) *FileCollector {
s := &FileCollector{
quit: make(chan bool, 1),
results: results,
folder: folder,
log: logging.GetLogger(),
fieldSeparator: fieldSeparator,
}
go s.run()
return s
}
//Stop stops the Collector.
func (nfc *FileCollector) Stop() {
nfc.quit <- true
<-nfc.quit
nfc.log.Debug("NagfluxFileCollector stoped")
}
//Checks if the files are old enough, if so they will be added in the queue
func (nfc FileCollector) run() {
for {
select {
case <-nfc.quit:
nfc.quit <- true
return
case <-time.After(spoolfile.IntervalToCheckDirectory):
pause := config.IsAnyTargetOnPause()
if pause {
logging.GetLogger().Debugln("NagfluxFileCollector in pause")
continue
}
for _, currentFile := range spoolfile.FilesInDirectoryOlderThanX(nfc.folder, spoolfile.MinFileAge) {
logging.GetLogger().Debug("Reading file: ", currentFile)
for _, p := range nfc.parseFile(currentFile) {
for _, r := range nfc.results {
select {
case <-nfc.quit:
nfc.quit <- true
return
case r <- p:
case <-time.After(time.Duration(1) * time.Minute):
nfc.log.Warn("NagfluxFileCollector: Could not write to buffer")
}
}
}
err := os.Remove(currentFile)
if err != nil {
logging.GetLogger().Warn(err)
}
}
}
}
}
func (nfc FileCollector) parseFile(filename string) []Printable {
result := []Printable{}
csvfile, err := os.Open(filename)
if err != nil {
nfc.log.Warn(err)
return result
}
defer csvfile.Close()
reader := csv.NewReader(csvfile)
reader.Comma = nfc.fieldSeparator
records, err := reader.ReadAll()
if err != nil {
nfc.log.Warn(err)
return result
}
if !helper.Contains(records[0], requiredFields) {
nfc.log.Warnf("The file %s doesn't contain all of these fields: %s", filename, requiredFields)
return result
}
tagIndices := map[int]string{}
fieldIndices := map[int]string{}
for i, v := range records[0] {
if len(v) > 1 && v[:2] == "t_" {
tagIndices[i] = v[2:]
} else if len(v) > 1 && v[:2] == "f_" {
fieldIndices[i] = v[2:]
} else if helper.Contains(requiredFields, []string{v}) {
continue
} else if helper.Contains(optionalFields, []string{v}) {
continue
} else {
nfc.log.Warnf("This column does not fit the requirements: %s. Tags should start with t_, fields with f_", v)
}
}
for i, r := range records {
if i == 0 {
continue
}
currentPrintable := Printable{tags: map[string]string{}, fields: map[string]string{}}
for i, v := range r {
if v != "" {
if records[0][i] == requiredFields[0] {
currentPrintable.Table = v
} else if records[0][i] == requiredFields[1] {
currentPrintable.Timestamp = v
} else if records[0][i] == optionalFields[0] {
currentPrintable.Filterable = collector.Filterable{Filter: v}
} else if val, ok := tagIndices[i]; ok {
currentPrintable.tags[val] = v
} else if val, ok := fieldIndices[i]; ok {
currentPrintable.fields[val] = v
} else {
nfc.log.Warnf("This should not happen: %s->%s", records[0][i], v)
}
}
}
if currentPrintable.Filterable == collector.EmptyFilterable {
currentPrintable.Filterable = collector.AllFilterable
}
result = append(result, currentPrintable)
}
return result
}