-
Notifications
You must be signed in to change notification settings - Fork 23
/
local.go
87 lines (75 loc) · 1.78 KB
/
local.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package aws
import (
"bufio"
"os"
"github.com/kentik/ktranslate/pkg/kt"
)
// handleLocal reads the file at the given path line by line, parses every line
// with NewAws, wraps the parsed lines in a FlowSet and offers it to vpc.recs
// without blocking. When the record was queued, a record is received from
// vpc.recs, each of its lines is converted with ToFlow, and a summary is
// logged.
//
// Lines that fail to parse are logged (truncated to 80 bytes) and skipped.
//
// An error is returned when the file cannot be opened, when the scanner fails,
// or when ProcessKey fails for the file name. nil is returned on success, when
// the file produced no lines, and when the record could not be queued.
func (vpc *AwsVpc) handleLocal(file string) error {
	const (
		scanBufSize  = 1024 * 1024        // initial scanner buffer, in bytes
		maxTokenSize = scanBufSize * 1024 // longest line the scanner will accept
		maxLoggedLen = 80                 // bytes of a bad line echoed to the log
	)

	f, err := os.Open(file)
	if err != nil {
		return err
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)

	// The default scanner gives up with bufio.ErrTooLong on lines longer than
	// bufio.MaxScanTokenSize. Only a file bigger than that limit can contain
	// such a line, so the larger buffer is allocated just for those files. If
	// the size cannot be determined, raise the limit anyway to stay safe.
	if info, statErr := f.Stat(); statErr != nil || info.Size() > bufio.MaxScanTokenSize {
		scanner.Buffer(make([]byte, scanBufSize), maxTokenSize)
	}

	var res []*AWSLogLine
	lineMap := AwsLineMap{}
	for scanner.Scan() {
		rawLine := scanner.Text()
		lines, lineMapOut, err := NewAws(lineMap, &rawLine, vpc)
		if err != nil {
			// Keep the log readable: echo only the start of long lines.
			shown := rawLine
			if len(shown) > maxLoggedLen {
				shown = shown[:maxLoggedLen]
			}
			vpc.Errorf("Error reading line: %v -> %s", err, shown)
			continue
		}
		// Carry the updated line map forward into the following lines.
		if lineMapOut != nil {
			lineMap = lineMapOut
		}
		res = append(res, lines...)
	}
	if err := scanner.Err(); err != nil {
		vpc.Warnf("Could not scan %v", err)
		return err
	}

	if len(res) == 0 {
		// Nothing was parsed, so nothing will be queued. Return here instead
		// of waiting on vpc.recs for a record this call never sent.
		vpc.Warnf("No flow data devices found for %s.", file)
		return nil
	}

	record := FlowSet{
		Lines: res,
	}

	// Pull out all the info we can from the key path.
	if err := record.ProcessKey("", file); err != nil {
		vpc.Warnf("Could not process %s: %v.", file, err)
		return err
	}

	// Send this record on to be processed.
	select {
	case vpc.recs <- &record:
		vpc.metrics.Flows.Mark(int64(len(record.Lines)))
	default:
		// The record was not queued, so a receive below would either block
		// or pick up a record that did not come from this file.
		vpc.metrics.DroppedFlows.Mark(int64(len(record.Lines)))
		return nil
	}

	// Get the record back, turn it into flow.
	rec := <-vpc.recs
	dst := make([]*kt.JCHF, len(rec.Lines))
	vpc.Debugf("Found %d logs to send", len(rec.Lines))
	for i, l := range rec.Lines {
		dst[i] = l.ToFlow(vpc, vpc.topo)
	}

	vpc.Infof("Ready to send %d lines", len(dst))
	if len(dst) > 0 {
		vpc.Infof("Row: %v", dst[0])
	}

	return nil
}